
Adapt to cids and other company-id column names

晏永年 4 years ago
parent commit 4a4d2e4d5f

+ 20 - 16
src/main/scala/com/winhc/bigdata/spark/etl/PersonIncrSync.scala

@@ -21,6 +21,7 @@ case class PersonIncrSync(s: SparkSession,
                          tableName: String, //table name (without prefix/suffix)
                          idCardCol: String, //ID-card number column
                          dupliCols: Seq[String], // deduplication columns
+                          cidCol: String, //name of the company-id column; defaults to "cid"
                          updateCol: String = "update_time" //ORDER BY column for the ROW_NUMBER window function; defaults (may be omitted) to update_time
                                               ) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
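
The new cidCol parameter slots in between dupliCols and the defaulted updateCol. A minimal sketch of a post-change call site, with placeholder project/table/column values, mirroring the constructor call in main() at the bottom of this diff (the project parameter is not visible in this hunk but appears there):

// Hypothetical invocation; project, table, and column names are placeholders.
PersonIncrSync(
  spark,
  "winhc_test_project",    // project
  "company_zxr",           // tableName (no prefix/suffix)
  "identity_num",          // idCardCol
  Seq("name", "case_no"),  // dupliCols
  "cids",                  // cidCol: zxr-style tables store the company id as "cids"
  "update_time"            // updateCol (optional; this is the default)
).calc()
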
@@ -51,7 +52,7 @@ case class PersonIncrSync(s: SparkSession,
     var runDs = ""
    //table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
-      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey")
+      !s.equals("ds") && !s.equals(s"new_$cidCol") && !s.equals("rowkey")
     })
    //first run
     if (StringUtils.isBlank(lastDsIncAds)) {
@@ -60,7 +61,7 @@ case class PersonIncrSync(s: SparkSession,
       runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
     }
 
-    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+    val cols_md5 = dupliCols.filter(!_.equals(s"new_$cidCol"))
 
    //if the last partition of incremental ods equals that of incremental ads, bail out
     if (lastDsIncOds.equals(lastDsIncAds)) {
@@ -84,22 +85,22 @@ case class PersonIncrSync(s: SparkSession,
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${inc_ads_company_tb} PARTITION(ds=$lastDsIncOds)
          |SELECT  rowkey
          |        ,flag
-         |        ,new_cid
-         |        ,cid
-         |        ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
+         |        ,new_$cidCol
+         |        ,$cidCol
+         |        ,${columns.filter(s=>{s!=s"${cidCol}" && s!=s"new_${cidCol}"}).mkString(",")}
          |FROM    (
          |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})),MD5(cleanup(${cols_md5.drop(1).map(s => s"a.$s").mkString("")}))) AS rowkey
          |                    ,a.flag
-         |                    ,MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})) AS new_cid
-         |                    ,null AS cid
-         |                    ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).map(s => s"a.$s").mkString(",").replace("${idCardCol}","IF(a.${idCardCol} IS NULL AND c.identity_num IS NOT NULL,c.identity_num,a.${idCardCol}) AS ${idCardCol}")}
+         |                    ,MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})) AS new_$cidCol
+         |                    ,null AS $cidCol
+         |                    ,${columns.filter(s=>{s!=s"${cidCol}" && s!=s"new_${cidCol}"}).map(s => s"a.$s").mkString(",").replace("${idCardCol}","IF(a.${idCardCol} IS NULL AND c.identity_num IS NOT NULL,c.identity_num,a.${idCardCol}) AS ${idCardCol}")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(s => s"a.$s").mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
          |            FROM    (
          |                        SELECT  "1" AS flag
          |                                ,${columns.mkString(",")}
          |                        FROM    ${inc_ods_company_tb}
          |                        WHERE   ds >= ${runDs}
-         |                        AND     cid IS NULL
+         |                        AND     $cidCol IS NULL
          |                        AND     ${dupliCols.mkString(""," IS NOT NULL AND "," IS NOT NULL")}
          |                    ) a
          |             LEFT JOIN
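
The rowkey assembled in this SQL is CONCAT_WS('_', MD5(first dedup column), MD5(remaining dedup columns)), with the first hash doubling as new_$cidCol. A minimal self-contained sketch of the expression the interpolation produces, using hypothetical dedup columns (cleanup is the project's SQL UDF, mixed in through CompanyMapping):

object RowkeySketch extends App {
  // hypothetical values; real jobs pass these in as arguments
  val cidCol    = "cids"
  val dupliCols = Seq("name", "case_no")
  // columns feeding the second MD5; new_<cidCol> never participates
  val colsMd5 = dupliCols.filterNot(_ == s"new_$cidCol")
  // first dedup column hashed alone (it also becomes new_<cidCol>),
  // the rest concatenated and hashed together
  val rowkeyExpr =
    s"CONCAT_WS('_',MD5(cleanup(${dupliCols.map(c => s"a.$c").head}))," +
      s"MD5(cleanup(${colsMd5.drop(1).map(c => s"a.$c").mkString("")})))"
  println(rowkeyExpr)
  // => CONCAT_WS('_',MD5(cleanup(a.name)),MD5(cleanup(a.case_no)))
}

Note that mkString("") joins with no separator, so with three or more dedup columns the second hash argument would read a.col2a.col3; the scheme effectively assumes at most two dedup columns.
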
@@ -115,7 +116,7 @@ case class PersonIncrSync(s: SparkSession,
          |AND     trim(CONCAT_WS('',${cols_md5.mkString(",")})) <> ''
          |""".stripMargin)
 
-    val colsTotal = columns ++ Seq("new_cid")
+    val colsTotal = columns ++ Seq(s"new_$cidCol")
 
     MaxComputer2Phoenix(
       spark,
@@ -123,7 +124,7 @@ case class PersonIncrSync(s: SparkSession,
       inc_ads_company_tb,
       tableName+"_PERSON",
       lastDsIncOds,
-      s"CONCAT_WS('_',new_cid,MD5(cleanup(${cols_md5.drop(1).mkString("")})))"
+      s"CONCAT_WS('_',new_$cidCol,MD5(cleanup(${cols_md5.drop(1).mkString("")})))"
     ).syn()
 
 //    CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
@@ -135,21 +136,24 @@ object PersonIncrSync {
 
   def main(args: Array[String]): Unit = {
 
-    val Array(project, tableName, idCardCol, dupliCols, flag, _) = args
+    val Array(project, tableName, idCardCol, dupliCols, _*) = args
     println(
       s"""
          |project: $project
          |tableName: $tableName
          |idCardCol: $idCardCol
          |dupliCols: $dupliCols
-         |flag: $flag
          |""".stripMargin)
     if (args.length < 5) {
-      println("Please provide project:project name, tableName:table name, idCardCol:ID-card number column, dupliCols:dedup columns, flag:marker, [updateCol:sort column]!!!")
+      println("Please provide project:project name, tableName:table name, idCardCol:ID-card number column, dupliCols:dedup columns, [cidCol:company-id column, [updateCol:sort column]]!!!")
       sys.exit(-1)
     }
-    //ORDER BY column for the ROW_NUMBER window function; defaults (may be omitted) to update_time
+    //compat: zxr tables name the company-id column "cids"; the ROW_NUMBER ORDER BY column defaults (may be omitted) to update_time
+    var cidCol: String ="cid"
     var updateCol: String = "update_time"
+    if (args.length >= 5 && !args(4).endsWith("}")) {
+      cidCol = args(4)
+    }
     if (args.length == 6 && !args(5).endsWith("}")) {
       updateCol = args(5)
     }
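
The endsWith("}") checks presumably guard against unsubstituted scheduler placeholders, i.e. an argument handed through verbatim as "${cidCol}" or "${updateCol}". A minimal self-contained sketch of how the two optional trailing arguments resolve under that assumption:

object ArgsSketch extends App {
  // mirrors the resolution logic above; placeholders are detected by a trailing "}"
  def resolve(args: Array[String]): (String, String) = {
    var cidCol    = "cid"
    var updateCol = "update_time"
    if (args.length >= 5 && !args(4).endsWith("}")) cidCol = args(4)
    if (args.length == 6 && !args(5).endsWith("}")) updateCol = args(5)
    (cidCol, updateCol)
  }
  println(resolve(Array("proj", "tbl", "identity_num", "name,case_no", "cids")))
  // => (cids,update_time)
  println(resolve(Array("proj", "tbl", "identity_num", "name,case_no", "${cidCol}", "${updateCol}")))
  // => (cid,update_time)
}
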
@@ -158,7 +162,7 @@ object PersonIncrSync {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    PersonIncrSync(spark, project, tableName, idCardCol, dupliCols.split(",").toSeq, updateCol).calc()
+    PersonIncrSync(spark, project, tableName, idCardCol, dupliCols.split(",").toSeq, cidCol, updateCol).calc()
     spark.stop()
   }
 }