Explorar o código

Merge remote-tracking branch 'origin/master'

xufei %!s(int64=4) %!d(string=hai) anos
pai
achega
c56cd0fff8

+ 50 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -57,14 +57,18 @@ object CompanyDynamic {
 
     //表名(不加前后辍)
     def calc(tableName: String
-             , bName: Boolean = false //是否补充cname字段
+             , bName: Int = 0 //是否补充cname字段
             ): Unit = {
       val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
 
       val types = handle.org_type()
+      val colsExclusiveSome = spark.table(s"${project}.ads_change_extract").columns.filter(s => {
+        !s.equals("cid") && !s.equals("data") && !s.equals("old_data") && !s.equals("ds") && !s.equals("tn")
+      }).seq
       val rdd = sql(
         bName match {
-          case false =>
+//默认:无需补全cname字段
+          case 0 =>
             s"""
                |SELECT  *,null AS cname
                |FROM    ${project}.ads_change_extract
@@ -72,7 +76,8 @@ object CompanyDynamic {
                |AND     tn = '$tableName'
                |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
                |""".stripMargin
-          case true =>
+//需根据cid补全cname字段数据
+          case 1 =>
             s"""
                |SELECT A.*,B.cname AS cname
                |FROM(
@@ -88,6 +93,42 @@ object CompanyDynamic {
                |) AS B
                |ON A.cid = B.cid
                |""".stripMargin
+//既需根据cid补全cname字段数据;还需在data字段中根据其中的cid添加对应cname数据
+          case 2 =>
+            s"""
+               |SELECT cid, ${colsExclusiveSome.mkString(",")},old_data, cname, str_to_map(replace(concat_ws(',',data),'},{',',')) AS data
+               |FROM(
+               |  SELECT new_cid AS cid, ${colsExclusiveSome.mkString(",")},to_json(old_data) AS old_data, COLLECT_SET(cname)[0] AS cname, COLLECT_SET(to_json(data)) AS data
+               |  FROM(
+               |    SELECT new_cid, ${colsExclusiveSome.map("A." + _).mkString(",")}, A.cid, IF(A.data IS NULL, STR_TO_MAP(CONCAT('cname:',cname),',',':'), A.data) AS data, old_data, IF(A.new_cid=A.cid,B.cname,null) AS cname
+               |    FROM(
+               |      SELECT cid AS new_cid, cid, ${colsExclusiveSome.mkString(",")},data, old_data
+               |      FROM(
+               |        SELECT  *
+               |        FROM    ${project}.ads_change_extract
+               |        WHERE   ds = '$ds'
+               |        AND     tn = '$tableName'
+               |        AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |        )
+               |      UNION ALL
+               |      SELECT cid AS new_cid, data['cid'] AS cid, ${colsExclusiveSome.mkString(",")},null AS data, old_data
+               |      FROM(
+               |        SELECT  *
+               |        FROM    ${project}.ads_change_extract
+               |        WHERE   ds = '$ds'
+               |        AND     tn = '$tableName'
+               |        AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |        )
+               |    ) AS A
+               |    LEFT JOIN (
+               |        SELECT cid,cname FROM  $project.base_company_mapping
+               |        WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
+               |    ) AS B
+               |    ON A.cid = B.cid
+               |  )
+               |  GROUP BY new_cid, ${colsExclusiveSome.mkString(",")},to_json(old_data)
+               |)
+               |""".stripMargin
         })
         .rdd.flatMap(r => {
         val rowkey = r.getAs[String]("rowkey")
@@ -145,7 +186,7 @@ object CompanyDynamic {
       sys.exit(-99)
     }
 
-    val Array(project, ds, tableName, bName_table) = if (args.length == 4) args else args :+ ""
+    val Array(project, tableName, ds, bName_table) = if (args.length == 4) args else args :+ "0"
 
     println(
       s"""
@@ -162,15 +203,17 @@ object CompanyDynamic {
     val cd = CompanyDynamicUtil(spark, project, ds)
 
     cd.init()
+/*
 
     for (e <- tableName.split(",")) {
       if (e.length > 2) {
-        cd.calc(e)
+        cd.calc(e, bName_table.toInt)
       }
     }
 
-    for (e <- bName_table.split(",")) {
-      cd.calc(e, bName = true)
+*/
+    for (e <- tableName.split(",")) {
+      cd.calc(e, bName_table.toInt)
     }
     spark.stop()
   }

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -117,7 +117,6 @@ trait CompanyDynamicHandle {
     ))
   }
 
-
   /**
    * 来源表的变更类型,insert or update
    *