Преглед на файлове

feat: 存量企业动态从存量变更信息中提取

许家凯 преди 4 години
родител
ревизия
3e55dbcd88
променени са 1 файла, в които са добавени 122 реда и са изтрити 0 реда
  1. 122 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/all_company_dynamic.scala

+ 122 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/all_company_dynamic.scala

@@ -0,0 +1,122 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/10 11:40
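+ * @Description: Builds company dynamic records from the latest partition of the ads_company_change table and writes them to the company dynamic target table.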
+ */
+object all_company_dynamic {
+
+  /**
+   * Value produced per change record by the change UDF in this file.
+   *
+   * Only the fields computed inside the UDF live here; the remaining output
+   * columns (cid, cname, info_type, biz_id, sub_info_type, info_risk_level,
+   * winhc_suggest) are not part of this class.
+   *
+   * @param id             identifier of the dynamic record
+   * @param rta_desc       description of the record (the change item)
+   * @param change_content JSON text describing the change
+   * @param change_time    time of the change
+   */
+  case class dynamic(
+    id: String,
+    rta_desc: String,
+    change_content: String,
+    change_time: String
+  )
+
+  /**
+   * Builds company dynamic rows from the `ads_company_change` table of `project`
+   * and writes them into the company dynamic target table.
+   *
+   * @param s       Spark session used to run every query
+   * @param project name of the project that holds the source tables
+   */
+  case class AllCompanyDynamicUtils(s: SparkSession,
+                                    project: String // name of the project that holds the tables
+                                   ) extends LoggingUtils with Logging {
+    @(transient@getter) val spark: SparkSession = s
+
+    /**
+     * Reads one partition of `ads_company_change` (the one returned by
+     * `getLastPartitionsOrElse`, falling back to "0"), keeps one row per rowkey,
+     * converts every row into a dynamic record and inserts the result into the
+     * target table partition `ds=<that partition>, tn='company'`.
+     */
+    def all(): Unit = {
+      // Resolve the source table through `project` everywhere, so the column
+      // lookup and the actual read always hit the same table.
+      val sourceTab = s"$project.ads_company_change"
+      val lastDs = getLastPartitionsOrElse(sourceTab, "0")
+      // Only columns present in both the full and the incremental table are read.
+      val intersectCols = getColumns(sourceTab).toSet & getColumns(s"$project.inc_ads_company_change").toSet
+
+      import com.winhc.bigdata.spark.implicits.BaseHelper._
+      // Argument order: rowkey, change time, change item, content BEFORE, content AFTER.
+      val change: (String, String, String, String, String) => dynamic = (rowkey, changeTime, changeItem, changeBefore, changeAfter) => {
+        // The company id is the part of the rowkey before the first underscore.
+        val cid = rowkey.split("_")(0)
+        val rta_desc = changeItem
+        val change_time = changeTime
+        val change_content = s"""{"变更后内容": ${changeAfter.getOrNull()},"变更事项": ${changeItem.getOrNull()},"变更日期": "$changeTime","变更前内容": ${changeBefore.getOrNull()}}"""
+        // The company id is also used as the business id of the record.
+        val id = CompanyDynamicHandleUtils.getDynamicId(cid, rta_desc, cid, change_time)
+        dynamic(id, rta_desc, change_content, change_time)
+      }
+
+      import org.apache.spark.sql.functions._
+      val changeUdf = udf(change)
+      sql(
+        s"""
+           |SELECT  t2.*,now() as xjk
+           |FROM    (
+           |            SELECT  t1.*
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+           |            FROM    (
+           |                        SELECT  ${intersectCols.mkString(",")}
+           |                        FROM    $sourceTab
+           |                        WHERE   ds = '${lastDs}'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .select(
+          split(col("rowkey"), "_").getItem(0).as("cid")
+          , split(col("rowkey"), "_").getItem(0).as("biz_id")
+          // UDF arguments are positional: content_before must precede content_after
+          // to match the (changeBefore, changeAfter) parameters of `change`.
+          , changeUdf(col("rowkey"), col("change_time"), col("change_item"), col("content_before"), col("content_after")).as("change_cols")
+          , col("xjk").as("create_time")
+        )
+        .withColumn("cname", lit(""))
+        .withColumn("info_type", lit("eci_detail"))
+        .withColumn("sub_info_type", lit("1"))
+        .withColumn("info_risk_level", lit("2"))
+        .withColumn("winhc_suggest", lit(""))
+        .select(
+          col("change_cols").getField("id").as("id")
+          , col("cid")
+          , col("cname")
+          , col("info_type")
+          , col("change_cols").getField("rta_desc").as("rta_desc")
+          , col("change_cols").getField("change_content").as("change_content")
+          , col("change_cols").getField("change_time").as("change_time")
+          , col("biz_id")
+          , col("sub_info_type")
+          , col("info_risk_level")
+          , col("winhc_suggest")
+          , col("create_time").cast("string")
+        )
+        // Replace rather than create, so a second call in the same session
+        // does not fail because the view already exists.
+        .createOrReplaceTempView("xjk_company_change_tmp")
+
+      val targetTab = s"${getEnvProjectName(CompanyDynamic.env, project)}.${CompanyDynamic.targetTab}"
+      // The partition columns are supplied by the PARTITION clause, not the SELECT list.
+      val writeCols = getColumns(targetTab).diff(Seq("ds", "tn"))
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $targetTab PARTITION(ds='$lastDs',tn='company')
+           |SELECT ${writeCols.mkString(",")}
+           |FROM
+           |    xjk_company_change_tmp
+           |""".stripMargin)
+    }
+  }
+
+
+  /**
+   * Job entry point: creates the Spark session for the `winhc_eci_dev` project,
+   * runs [[AllCompanyDynamicUtils.all]] once and stops the session.
+   *
+   * @param args command-line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+
+    val project = "winhc_eci_dev"
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+
+    val spark = SparkUtils.InitEnv("all_company_dynamic", config)
+    try {
+      AllCompanyDynamicUtils(spark, project).all()
+    } finally {
+      // Stop the session even when the job throws, so it is never left running.
+      spark.stop()
+    }
+  }
+}