Browse Source

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/etl/PersonIncrSync.scala
晏永年 4 years ago
parent
commit
0a2593fa62

+ 207 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre10.scala

@@ -0,0 +1,207 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+object JudicialCaseRelationPre10 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelationPre10(spark, project).precalc()
+    spark.stop()
+  }
+}
+
+
+// Judicial cases: preprocessing of person-subject-to-enforcement (zxr) data
+case class JudicialCaseRelationPre10(s: SparkSession, project: String
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
+  override protected val spark: SparkSession = s
+
+  def precalc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf()
+    // Persons subject to enforcement (companies)
+    var lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr", spark)
+    spark.sparkContext.setJobDescription(s"processing zxr($lastDsIncAds)")
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"7" as flag
+         |      ,concat_ws('',cname,'被执行人') as title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,null as case_reason
+         |      ,case_no
+         |      ,court as court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,null as yg_name
+         |      ,cname as bg_name
+         |      ,case_create_time as date
+         |      ,rowkey as detail_id
+         |      ,exec_money as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.inc_ads_company_zxr
+         |      where length(case_no) > 0 and ds > '0'
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+
+    // Persons subject to enforcement: preprocessing (individuals)
+    val columns: Seq[String] = spark.table(s"$project.inc_ads_company_zxr_person").schema.map(_.name).filter(_!="flag")
+    lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_person", spark)
+    spark.sparkContext.setJobDescription(s"processing zxr_person($lastDsIncAds)")
+    // 1. First backfill ID card numbers by joining on name and case_no against the enforcement table
+    sql(
+      s"""
+         |--First pull from the company_zxr_person full and incremental tables
+         |SELECT rowkey
+         |    ,0 AS flag
+         |    ,new_cid
+         |    ,cid
+         |    ,id
+         |    ,cname
+         |    ,sex
+         |    ,card
+         |    ,court
+         |    ,case_create_time
+         |    ,A.case_no AS case_no
+         |    ,gist_id
+         |    ,case_state
+         |    ,exec_money
+         |    ,type
+         |    ,source
+         |    ,status
+         |    ,appro_time
+         |    ,A.create_time AS create_time
+         |    ,A.update_time AS update_time
+         |    ,A.deleted AS deleted
+         |FROM(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        FROM(
+         |            SELECT *
+         |            ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',cname,case_no)) ORDER BY update_time DESC ) num
+         |            FROM(
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.ads_company_zxr_person
+         |                WHERE ds>'0'
+         |                UNION ALL
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.inc_ads_company_zxr_person
+         |                WHERE ds>'0'
+         |            )
+         |        )
+         |        WHERE num=1
+         |    )
+         | ) A
+         |
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_person_cloze_1")
+
+    // 3. Then backfill ID card numbers from this table itself, matching on name and company_name
+//    sql(
+//      s"""
+//         |--先从前面结果表获取
+//         |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_zxr_person_cloze PARTITION(ds=$lastDsIncAds)
+//         |SELECT A.rowkey
+//         |    ,IF(A.card IS NULL AND D.card IS NOT NULL,3,0) AS flag
+//         |    ,A.new_cid
+//         |    ,A.cid
+//         |    ,A.id
+//         |    ,A.cname
+//         |    ,A.sex
+//         |    ,COALESCE(A.card,D.card) AS card
+//         |    ,A.court
+//         |    ,A.case_create_time
+//         |    ,A.case_no
+//         |    ,A.gist_id
+//         |    ,A.case_state
+//         |    ,A.exec_money
+//         |    ,A.type
+//         |    ,A.source
+//         |    ,A.status
+//         |    ,A.appro_time
+//         |    ,A.create_time
+//         |    ,A.update_time
+//         |    ,A.deleted
+//         |FROM tmp_person_cloze_1 A
+//         |LEFT JOIN tmp_person_cloze_1 D
+//         |ON A.cname=D.cname AND D.card IS NOT NULL  --adjust to actual data
+//         |""".stripMargin
+//    )//.createOrReplaceTempView("tmp_person_cloze_3")
+
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_person')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"10" as flag
+         |      ,concat_ws('',cname,'被执行人') AS title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL AS case_reason
+         |      ,case_no
+         |      ,court
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,cname as bg_name
+         |      ,case_create_time as date
+         |      ,rowkey as detail_id
+         |      ,exec_money as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from tmp_person_cloze_1
+         |      where length(case_no) > 0
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+  }
+}
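
Both INSERT OVERWRITE statements in this file dedup with the same idiom: row_number() over a (rowkey, update_time) window, keeping only num = 1. Below is a minimal standalone sketch of that idiom in DataFrame form; the table data and column values are illustrative, not from the repo.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object DedupLatestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DedupLatestSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val records = Seq(
      ("rk1", "2020-09-01", "stale"),
      ("rk1", "2020-09-28", "latest"),
      ("rk2", "2020-09-10", "only")
    ).toDF("rowkey", "update_time", "payload")

    // Same shape as the SQL above: row_number() over(partition by rowkey
    // order by update_time desc) ... where num = 1
    val w = Window.partitionBy("rowkey").orderBy(col("update_time").desc)
    records.withColumn("num", row_number().over(w))
      .where(col("num") === 1)
      .drop("num")
      .show(false) // one row per rowkey: ("rk1","latest"), ("rk2","only")

    spark.stop()
  }
}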

+ 226 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre12.scala

@@ -0,0 +1,226 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+object JudicialCaseRelationPre12 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelationPre12(spark, project).precalc()
+    spark.stop()
+  }
+}
+
+
+// Judicial cases: preprocessing of final-case (zhongben) data
+case class JudicialCaseRelationPre12(s: SparkSession, project: String
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
+  override protected val spark: SparkSession = s
+
+  def precalc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf()
+    // Final cases: companies
+    var lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_final_case", spark)
+    spark.sparkContext.setJobDescription(s"processing company_zxr_final_case($lastDsIncAds)")
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_final_case')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"6" as flag
+         |      ,concat_ws('',name,'终本案件') as title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL as case_reason
+         |      ,case_no
+         |      ,court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,name as bg_name
+         |      ,case_create_time as date
+         |      ,rowkey as detail_id
+         |      ,exec_amount as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.inc_ads_company_zxr_final_case
+         |      where length(case_no) > 0 and ds > '0'
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+
+    // Final cases: preprocessing (individuals)
+    val columns: Seq[String] = spark.table(s"$project.inc_ads_company_zxr_final_case_person").schema.map(_.name).filter(_!="flag")
+    lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_final_case_person", spark)
+    spark.sparkContext.setJobDescription(s"processing zxr_final_case_person($lastDsIncAds)")
+    // 1. First backfill ID card numbers by joining on name and case_no against the enforcement table
+    sql(
+      s"""
+         |--First pull from the company_zxr_final_case_person full and incremental tables
+         |SELECT rowkey
+         |    ,IF(A.identity_num IS NULL AND B.card IS NOT NULL,1,0) AS flag
+         |    ,new_cid
+         |    ,cid
+         |    ,id
+         |    ,name
+         |    ,sex
+         |    ,COALESCE(A.identity_num,B.card) AS identity_num
+         |    ,court_name
+         |    ,case_final_time
+         |    ,case_create_time
+         |    ,A.case_no AS case_no
+         |    ,description
+         |    ,no_exec_amount
+         |    ,exec_amount
+         |    ,source
+         |    ,status
+         |    ,appro_time
+         |    ,A.create_time AS create_time
+         |    ,A.update_time AS update_time
+         |    ,A.deleted AS deleted
+         |FROM(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        FROM(
+         |            SELECT *
+         |            ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',name,case_no)) ORDER BY update_time DESC ) num
+         |            FROM(
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.ads_company_zxr_final_case_person
+         |                WHERE ds>'0'
+         |                UNION ALL
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.inc_ads_company_zxr_final_case_person
+         |                WHERE ds>'0'
+         |            )
+         |        )
+         |        WHERE num=1
+         |    )
+         | ) A
+         |LEFT JOIN
+         |(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',cname,case_no)) ORDER BY update_time DESC ) num
+         |        FROM (
+         |            SELECT cname,card,case_no,type,update_time
+         |            FROM $project.ods_company_zxr
+         |            WHERE ds>'0'
+         |            UNION ALL
+         |            SELECT cname,card,case_no,type,update_time
+         |            FROM $project.inc_ods_company_zxr
+         |            WHERE ds>'0'
+         |        )
+         |    )
+         |    WHERE num=1 AND type='1'
+         |) B
+         |ON A.name=B.cname AND A.case_no=B.case_no
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_person_cloze_1")
+    // 2. Then backfill ID card numbers by joining on name and case_no against the dishonest-debtor (shixin) list
+
+    // 3. Then backfill ID card numbers from this table itself, matching on name and company_name
+//    sql(
+//      s"""
+//         |--First pull from the preceding result table
+//         |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_zxr_final_case_person_cloze PARTITION(ds=$lastDsIncAds)
+//         |SELECT A.rowkey
+//         |    ,IF(A.identity_num IS NULL AND D.identity_num IS NOT NULL,3,0) AS flag
+//         |    ,A.new_cid
+//         |    ,A.cid
+//         |    ,A.id
+//         |    ,A.name
+//         |    ,A.sex
+//         |    ,COALESCE(A.identity_num,D.identity_num) AS identity_num
+//         |    ,A.court_name
+//         |    ,A.case_final_time
+//         |    ,A.case_create_time
+//         |    ,A.case_no
+//         |    ,A.description
+//         |    ,A.no_exec_amount
+//         |    ,A.exec_amount
+//         |    ,A.source
+//         |    ,A.status
+//         |    ,A.appro_time
+//         |    ,A.create_time
+//         |    ,A.update_time
+//         |    ,A.deleted
+//         |FROM tmp_person_cloze_1 A
+//         |LEFT JOIN
+//         |tmp_person_cloze_1 D
+//         |ON A.name=D.name --adjust to actual data
+//         |""".stripMargin
+//    )//.createOrReplaceTempView("tmp_person_cloze_3")
+
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_final_case_person')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"12" as flag
+         |      ,concat_ws('',name,'终本案件') AS title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL AS case_reason
+         |      ,case_no
+         |      ,court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,name as bg_name
+         |      ,case_create_time as date --Tianyancha data has no execution date yet, so case_create_time stands in
+         |      ,rowkey as detail_id
+         |      ,exec_amount as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from tmp_person_cloze_1
+         |      where length(case_no) > 0
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+  }
+}
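
The first query in this file backfills missing identity_num values by left-joining final-case person rows against company_zxr on (name, case_no), coalescing the card from whichever side has it, and flagging rows the join actually filled in. Below is a DataFrame sketch of just that step with illustrative data (the cleanup and dedup wrapping from the real query is omitted).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit, when}

object IdCardBackfillSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("IdCardBackfillSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val persons = Seq(("p1", "case-001", null.asInstanceOf[String]),
                      ("p2", "case-002", "id-known"))
      .toDF("name", "case_no", "identity_num")
    val zxr = Seq(("p1", "case-001", "id-from-zxr"))
      .toDF("cname", "case_no", "card")

    persons.alias("A")
      .join(zxr.alias("B"),
        col("A.name") === col("B.cname") && col("A.case_no") === col("B.case_no"),
        "left")
      .select(
        col("A.name"),
        col("A.case_no"),
        // take the person's own id if present, otherwise the one from zxr
        coalesce(col("A.identity_num"), col("B.card")).as("identity_num"),
        // flag = 1 only when the join supplied a previously missing card
        when(col("A.identity_num").isNull && col("B.card").isNotNull, lit(1))
          .otherwise(lit(0)).as("flag"))
      .show(false)

    spark.stop()
  }
}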

+ 15 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelationPreNew.scala

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.spark.jobs
+package com.winhc.bigdata.spark.jobs.judicial
 
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, CourtRank}
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
@@ -131,7 +131,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$t1_ds',tn='wenshu')
-         |SELECT  a.judicase_id
+         |SELECT
+         |        COALESCE(a.judicase_id,b.new_judicase_id) judicase_id
          |        ,'0' as flag
          |        ,title
          |        ,case_type
@@ -148,11 +149,12 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |            SELECT  *
          |            FROM $project.xjk_ads_judicial_case_relation1_tmp
          |        ) a
-         |JOIN    (
-         |            SELECT * FROM
+         |RIGHT JOIN (
+         |            SELECT *,md5(cleanup(case_no)) as new_judicase_id
+         |             FROM
          |            (
          |              SELECT  *,row_number() over(partition by docid order by judge_date desc) num
-         |              FROM    $project.ods_justicase
+         |              FROM    $project.ods_wenshu_detail
          |              WHERE   ds > '0'
          |            )c
          |            where num = 1
@@ -337,8 +339,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |     ,case_no_trim(case_no) as case_no
          |     ,court_name
          |     ,case_stage
-         |     ,yg_name
-         |     ,bg_name
+         |     ,replace_char(yg_name) as yg_name
+         |     ,replace_char(bg_name) as bg_name
          |     ,date
          |     ,detail_id
          |     ,case_amt
@@ -365,8 +367,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,case_stage
          |        ,case_label(flag) lable
          |        ,map_2_json(${getStrToMap(cols)}) as detail
-         |        ,yg_name
-         |        ,bg_name
+         |        ,replace_char(yg_name) as yg_name
+         |        ,replace_char(bg_name) as bg_name
          |        ,date
          |        ,detail_id
          |        ,case_amt
@@ -402,7 +404,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     // Judicial case master table
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_r1
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.tmp_xf_judicial_case_relation_r1
          |SELECT  judicase_id
          |        ,max(first_title) title
          |        ,max(case_type) case_type
@@ -426,7 +428,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        FROM    (
          |                   SELECT  *,court_level(court_name) court_level
          |                   FROM    $project.$t3
-         |                   WHERE   ds = '$t1_ds'
+         |                   WHERE   ds >= '$second_ds'
          |                ) a JOIN
          |                (
          |                   select *
@@ -439,7 +441,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     // Detail table
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_r2
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.tmp_xf_judicial_case_relation_r2
          |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no),case_stage)) id
          |        ,judicase_id
          |        ,max(first_title) title
@@ -461,7 +463,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        FROM    (
          |                   SELECT  *
          |                   FROM    $project.$t3
-         |                   WHERE   ds = '$t1_ds'
+         |                   WHERE   ds >= '$second_ds'
          |                )a JOIN
          |                (
          |                   select *

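The JOIN-to-RIGHT-JOIN change above, paired with COALESCE(a.judicase_id, b.new_judicase_id), keeps wenshu rows that have no precomputed relation and assigns them a stable grouping id hashed from the cleaned case number. Here is a sketch of that fallback; the regex below merely stands in for the project's cleanup() UDF, which is assumed to strip non-identifying characters before hashing.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, md5, regexp_replace}

object FallbackJudicaseIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FallbackJudicaseIdSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // stand-in for cleanup(): keep only letters, digits, and CJK characters
    def cleanup(c: org.apache.spark.sql.Column) =
      regexp_replace(c, "[^0-9A-Za-z\\u4e00-\\u9fa5]", "")

    val relations = Seq(("J-1", "case-001")).toDF("judicase_id", "case_no")
    val wenshu = Seq(("case-001", "doc-a"), ("case-002", "doc-b"))
      .toDF("case_no", "docid")

    relations.alias("a")
      .join(wenshu.alias("b"), col("a.case_no") === col("b.case_no"), "right")
      .select(
        // matched rows keep their precomputed id; unmatched wenshu rows
        // get one derived from md5(cleanup(case_no)), as in the diff above
        coalesce(col("a.judicase_id"), md5(cleanup(col("b.case_no"))))
          .as("judicase_id"),
        col("b.docid"))
      .show(false)

    spark.stop()
  }
}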
+ 129 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForPersonCidsUtils.scala

@@ -0,0 +1,129 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author lyb
+ * @Date 2020/9/28
+ * @Description Incremental ods-to-ads sync restricted to person records
+ */
+case class CompanyIncrForPersonCidsUtils(s: SparkSession,
+                                         project: String, //project the tables live in
+                                         tableName: String, //table name (without prefix/suffix)
+                                         dupliCols: Seq[String], // dedup columns
+                                         updateCol: String = "update_time" //ORDER BY column for the ROW_NUMBER window, defaults to update_time
+                                              ) extends LoggingUtils with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
+
+    val inc_ods_company = s"${project}.inc_ods_company" //daily incremental company base info
+    val ads_company_tb = s"${project}.ads_${tableName}" //full (historical) ads table
+    val ods_company_tb = s"${project}.ods_$tableName" //full ods table
+    val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //incremental ods table
+    val inc_ads_company_tb = s"${project}.inc_ads_${tableName}_person" //incremental ads table
+
+    //latest partition of the full ads table
+    val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
+
+    //last partition of the incremental ads table
+    val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company_tb").collect.toList.map(_.getString(0).split("=")(1))
+    //first partition of the incremental ods table
+    val firstDsIncOds = list.head
+    //last partition of the incremental ods table (also the output partition)
+    val lastDsIncOds = list.last
+    //partition to start processing from
+    var runDs = ""
+    //table columns
+    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("cids") && !s.equals("rowkey")
+    })
+    //first run
+    if (StringUtils.isBlank(lastDsIncAds)) {
+      runDs = firstDsIncOds
+    } else { //subsequent runs: last ads partition + 1 day
+      runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+    }
+
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+
+    //last incremental ods and ads partitions are equal: warn and re-run that partition (the exit below is disabled)
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
+      runDs = lastDsIncOds
+      //sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |cols_md5:$cols_md5
+         |remainDs:$remainDs
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${inc_ads_company_tb} PARTITION(ds=$lastDsIncOds)
+         |SELECT  rowkey
+         |        ,flag
+         |        ,new_cid
+         |        ,cid
+         |        ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
+         |FROM    (
+         |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols(0)})),MD5(cleanup(${cols_md5.drop(1).mkString("")}))) AS rowkey
+         |                    ,flag
+         |                    ,MD5(cleanup(${dupliCols(0)})) AS new_cid
+         |                    ,null AS cid
+         |                    ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
+         |            FROM    (
+         |                        SELECT  "1" AS flag
+         |                                ,${columns.mkString(",")}
+         |                        FROM    ${inc_ods_company_tb}
+         |                        WHERE   ds >= ${runDs}
+         |                        AND     cids IS NULL
+         |                        AND     ${dupliCols.mkString(""," IS NOT NULL AND "," IS NOT NULL")}
+         |                    ) a
+         |        ) b
+         |WHERE   num = 1
+         |AND     CONCAT_WS('',${cols_md5.mkString(",")}) IS NOT NULL
+         |AND     trim(CONCAT_WS('',${cols_md5.mkString(",")})) <> ''
+         |""".stripMargin)
+
+    val colsTotal = columns ++ Seq("new_cid")
+
+    MaxComputer2Phoenix(
+      spark,
+      colsTotal,
+      inc_ads_company_tb,
+      tableName+"_PERSON",
+      lastDsIncOds,
+      s"CONCAT_WS('_',new_cid,MD5(cleanup(${cols_md5.drop(1).mkString("")})))"
+    ).syn()
+
+//    CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}
+
+
+
+
+
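
The partition bookkeeping in calc() reduces to a small pure function: start from the first inc_ods partition on the first run, otherwise resume the day after the last inc_ads partition, and re-run the last partition when ods and ads are already level. A standalone sketch of that logic, assuming partition ids are yyyyMMdd strings and that BaseUtil.atDaysAfter behaves like plusDays on such a string:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object RunDsSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

  // mirrors the runDs selection in CompanyIncrForPersonCidsUtils.calc()
  def nextRunDs(firstDsIncOds: String,
                lastDsIncOds: String,
                lastDsIncAds: String): String =
    if (lastDsIncAds == null || lastDsIncAds.trim.isEmpty)
      firstDsIncOds // first run: start at the earliest ods partition
    else if (lastDsIncOds == lastDsIncAds)
      lastDsIncOds // already level: re-run the last partition
    else
      LocalDate.parse(lastDsIncAds, fmt).plusDays(1).format(fmt)

  def main(args: Array[String]): Unit = {
    println(nextRunDs("20200901", "20200928", ""))         // 20200901
    println(nextRunDs("20200901", "20200928", "20200927")) // 20200928
    println(nextRunDs("20200901", "20200928", "20200928")) // 20200928
  }
}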