Ver Fonte

增加人员移除逻辑

xufei há 3 anos atrás
pai
commit
3087637842

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation.scala

@@ -13,6 +13,7 @@ import scala.collection.mutable
  * @author: π
  * @date: 2021/01/10 09:36
  */
+@deprecated
 case class inc_company_relation(s: SparkSession,
                                 project: String, //表所在工程名
                                 ds: String //分区

+ 201 - 125
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -22,10 +22,14 @@ case class inc_company_relation_v2(s: SparkSession,
   val inc_ads_company = "winhc_ng.inc_ads_company"
   val ads_company = "winhc_ng.ads_company"
   val inc_ads_company_legal_entity = "winhc_ng.inc_ads_company_legal_entity"
+  val ads_company_holder = "winhc_ng.ads_company_holder"
   val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
   val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
   val inc_ads_company_human_relation_merge = "winhc_ng.inc_ads_company_human_relation_merge"
+  val inc_ads_company_human_relation_deleted = "winhc_ng.inc_ads_company_human_relation_deleted"
+  val inc_ads_company_human_relation_update = "winhc_ng.inc_ads_company_human_relation_update"
   val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
+  val ads_company_staff = "winhc_ng.ads_company_staff"
 
   val ads_change_extract = "winhc_ng.bds_change_extract"
 
@@ -53,7 +57,6 @@ case class inc_company_relation_v2(s: SparkSession,
                                   end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
       CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type)
 
-
     spark.udf.register("get_company_node", get_company_node _)
     spark.udf.register("get_person_node", get_person_node _)
     spark.udf.register("get_relation_holder", get_relation_holder _)
@@ -144,7 +147,7 @@ case class inc_company_relation_v2(s: SparkSession,
          | WHERE   ds > '0'
          | -- AND     deleted <> 9
          | union all
-         | SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         | SELECT   CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
          |         ,company_id
          |         ,name AS company_name
          |         ,legal_entity_id
@@ -152,7 +155,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |         ,legal_entity_type
          |         ,create_time
          |         ,update_time
-         |          ,province_code,city_code,county_code
+         |         ,province_code,city_code,county_code
          |         ,reg_capital_amount,cate_first_code
          |         ,cate_second_code,cate_third_code
          |         ,ds
@@ -164,73 +167,130 @@ case class inc_company_relation_v2(s: SparkSession,
          |WHERE num = 1
          |""".stripMargin).createOrReplaceTempView("company_view_all")
 
-    //新增人员关系表
-    //TODO 过滤 deleted = 1
+    //TODO GET 新增-更新-关系表
     sql(
       s"""
-         |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
-         |SELECT  md5(CONCAT_WS('_',a.company_id,a.human_pid)) as rowkey
-         |        ,a.company_id
-         |        ,a.company_name
-         |        ,a.human_name
-         |        ,hash(a.human_name) AS hid
-         |        ,a.human_pid
-         |        ,a.STATUS
-         |        ,a.create_time
-         |        ,a.update_time
-         |        ,a.deleted
-         |        ,c.province_code,c.city_code,c.county_code
-         |        ,c.reg_capital_amount,c.cate_first_code
-         |        ,c.cate_second_code,c.cate_third_code
-         |FROM    (
-         |            SELECT  *,concat('p',md5(uuid())) human_pid
-         |            FROM    (
-         |                        SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time,0 as deleted
-         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
-         |                        FROM    (
-         |                                    --股东
-         |                                    SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,0 as deleted,ds
-         |                                    FROM    $inc_ads_company_holder
-         |                                    WHERE   ds = '$ds' AND holder_type = 1
-         |                                    UNION ALL
-         |                                    --主要成员
-         |                                    SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,0 as deleted,ds
-         |                                    FROM    $inc_ads_company_staff
-         |                                    WHERE   ds = '$ds'
-         |                                    UNION ALL
-         |                                    --法人
-         |                                    SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,0 as deleted,ds
-         |                                    FROM    $inc_ads_company_legal_entity
-         |                                    WHERE   ds = '$ds' AND legal_entity_type = 1
-         |                                )
-         |                    )
-         |            WHERE   num = 1 AND name_cleanup(human_name) <> ''
-         |        ) a
-         |LEFT JOIN (
-         |    SELECT  company_id,human_name,human_pid
-         |    FROM    (
-         |               SELECT  company_id,human_name,human_pid
-         |                                     ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
-         |               FROM(
-         |                    SELECT  company_id,human_name,human_pid,ds,update_time
-         |                    FROM    $inc_ads_company_human_relation
+         |SELECT  b.company_id,b.human_name,b.deleted,b.create_time,b.update_time
+         |FROM
+         |(
+         |   SELECT  *
+         |   FROM    (
+         |               SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time
+         |                       ,ROW_NUMBER() OVER (PARTITION BY company_id ORDER BY ds desc,update_time desc) num
+         |               FROM    (
+         |                           --股东
+         |                           SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,ds
+         |                           FROM    $inc_ads_company_holder
+         |                           WHERE   ds = '$ds' AND holder_type = 1
+         |                           UNION ALL
+         |                           --主要成员
+         |                           SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,ds
+         |                           FROM    $inc_ads_company_staff
+         |                           WHERE   ds = '$ds'
+         |                           UNION ALL
+         |                           --法人
+         |                           SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,ds
+         |                           FROM    $inc_ads_company_legal_entity
+         |                           WHERE   ds = '$ds' AND legal_entity_type = 1
+         |                       )
+         |           )
+         |   WHERE   num = 1
+         |   -- AND name_cleanup(human_name) <> ''
+         |) a JOIN
+         |(
+         |  SELECT *
+         |  FROM (
+         |         SELECT company_id,name_cleanup(human_name) human_name,create_time,update_time
+         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |             ,MIN(deleted) OVER (PARTITION BY company_id,human_name) deleted
+         |              FROM (
+         |                    --股东
+         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                    FROM    $inc_ads_company_holder
+         |                    WHERE   ds > '0'
+         |                    AND holder_type = 1
+         |                    UNION ALL
+         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                    FROM    $ads_company_holder
          |                    WHERE   ds > '0'
+         |                    AND holder_type = 1
          |                    UNION ALL
-         |                    SELECT  company_id,human_name,human_pid,ds,update_time
-         |                    FROM    $inc_ads_company_human_relation_merge
+         |                    --主要成员
+         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                    FROM    $inc_ads_company_staff
          |                    WHERE   ds > '0'
-         |                    AND     deleted = 0
+         |                    UNION ALL
+         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                    FROM    $ads_company_staff
+         |                    WHERE   ds > '0'
+         |                    UNION ALL
+         |                    --法人
+         |                    SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
+         |                    FROM    $inc_ads_company_legal_entity
+         |                    WHERE   ds > '0'
+         |                    AND legal_entity_type = 1
          |                 )
-         |              )
-         |    WHERE   num = 1
-         |) b
+         |      ) WHERE NUM = 1
+         |) b on a.company_id = b.company_id
+         |""".stripMargin).createOrReplaceTempView("incr_company_human_all")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT
+         |     CONCAT_WS('_',a.company_id,COALESCE(b.human_pid, a.human_pid)) AS rowkey
+         |     ,a.company_id
+         |     ,name_cleanup(c.company_name) company_name
+         |     ,a.human_name
+         |     ,hash(a.human_name) hid
+         |     ,COALESCE(b.human_pid, a.human_pid)  human_pid
+         |     ,2 STATUS
+         |     ,a.create_time
+         |     ,a.update_time
+         |     ,a.deleted
+         |     ,c.province_code
+         |     ,c.city_code
+         |     ,c.county_code
+         |     ,c.reg_capital_amount
+         |     ,c.cate_first_code
+         |     ,c.cate_second_code
+         |     ,c.cate_third_code
+         |FROM    (
+         |        SELECT  company_id
+         |                ,human_name
+         |                ,deleted
+         |                ,create_time
+         |                ,update_time
+         |                ,concat('p',md5(uuid())) human_pid
+         |        FROM    incr_company_human_all
+         |        ) a
+         |LEFT JOIN (
+         |        SELECT  company_id
+         |                ,human_name
+         |                ,human_pid
+         |        FROM    (
+         |                SELECT  company_id,human_name,human_pid
+         |                        ,ROW_NUMBER()OVER (PARTITION BY company_id, human_name ORDER BY ds DESC , update_time DESC ) num
+         |                FROM    (
+         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
+         |                        FROM    $inc_ads_company_human_relation
+         |                        WHERE   ds > '0'
+         |                        AND deleted <> 9
+         |                        UNION ALL
+         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
+         |                        FROM    $inc_ads_company_human_relation_merge
+         |                        WHERE   ds > '0'
+         |                        AND deleted <> 9
+         |                        )
+         |                )
+         |        WHERE   num =1
+         |          ) b
          |ON      a.company_id = b.company_id
          |AND     name_cleanup(a.human_name) = name_cleanup(b.human_name)
          |JOIN (
          |SELECT * FROM company_view_all
          |) c
          |ON      a.company_id = c.company_id
-         |WHERE   b.company_id IS NULL
          |""".stripMargin)
 
     //全量人员关系表
@@ -244,11 +304,12 @@ case class inc_company_relation_v2(s: SparkSession,
          |                     SELECT  company_id,human_name,human_pid,ds,update_time
          |                     FROM    $inc_ads_company_human_relation
          |                     WHERE   ds > '0'
+         |                     AND     deleted <> 9
          |                     UNION ALL
          |                     SELECT  company_id,human_name,human_pid,ds,update_time
          |                     FROM    $inc_ads_company_human_relation_merge
          |                     WHERE   ds > '0'
-         |                     AND     deleted = 0
+         |                     AND     deleted <> 9
          |              )
          |          )
          |WHERE   num = 1
@@ -372,54 +433,6 @@ case class inc_company_relation_v2(s: SparkSession,
          |AND length(start_name) <> 0  AND length(end_name) <> 0
          |""".stripMargin)
 
-    //二次修正human表,保证和增量人对齐
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $inc_ads_company_human_relation PARTITION(ds='$ds')
-         |SELECT   a.rowkey
-         |        ,a.company_id
-         |        ,a.company_name
-         |        ,a.human_name
-         |        ,a.hid
-         |        ,a.human_pid
-         |        ,a.STATUS
-         |        ,a.create_time
-         |        ,a.update_time
-         |        ,a.deleted
-         |        ,a.province_code
-         |        ,a.city_code
-         |        ,a.county_code
-         |        ,a.reg_capital_amount
-         |        ,a.cate_first_code
-         |        ,a.cate_second_code
-         |        ,a.cate_third_code
-         |FROM    (
-         |            SELECT  *
-         |            FROM    $inc_ads_company_human_relation
-         |            WHERE   ds = '$ds'
-         |        ) a
-         |JOIN (
-         |              SELECT  start_id
-         |              FROM    (
-         |                          SELECT  start_id
-         |                          FROM    $inc_ads_relation_holder
-         |                          WHERE   ds = '$ds'
-         |                          AND     holder_type = 1
-         |                          UNION ALL
-         |                          SELECT  start_id
-         |                          FROM    $inc_ads_relation_staff
-         |                          WHERE   ds = '$ds'
-         |                          UNION ALL
-         |                          SELECT  start_id
-         |                          FROM    $inc_ads_relation_legal_entity
-         |                          WHERE   ds = '$ds'
-         |                          AND     legal_entity_type = 1
-         |                      )
-         |              GROUP BY start_id
-         |          ) b
-         |ON      a.human_pid = b.start_id
-         |""".stripMargin)
-
   }
 
   def sendKafkaPre(): Unit = {
@@ -429,7 +442,6 @@ case class inc_company_relation_v2(s: SparkSession,
     val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
     val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
     val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
-    val inc_ads_person_node_label_kafka = "winhc_ng.inc_ads_person_node_label_kafka"
     //公司节点
     sql(
       s"""
@@ -500,22 +512,6 @@ case class inc_company_relation_v2(s: SparkSession,
          |where ds = '$ds'
          |""".stripMargin).show(20, false)
 
-//    //新增或者更新-投资人-打标签
-//    sql(
-//      s"""
-//         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_person_node_label_kafka  PARTITION (ds='$ds')
-//         |select key,max(message) message
-//         |from
-//         |(
-//         |  select
-//         |  start_id key,
-//         |  get_person_node(start_id, start_name, deleted, '7') message
-//         |  from $inc_ads_relation_holder
-//         |  where ds = '$ds' and holder_type = 1
-//         |)
-//         |group by key
-//         |""".stripMargin).show(20, false)
-
     //防止空分区
     addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
@@ -523,10 +519,46 @@ case class inc_company_relation_v2(s: SparkSession,
     addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
-    //addEmptyPartitionOrSkip(inc_ads_person_node_label_kafka, ds)
 
   }
 
+  // Prepares the ES export for partition ds: writes the removed-person list, then rebuilds the per-person merged table.
+  def exportESPre(): Unit = {
+    // Step 1 - removal list: a human_pid is emitted if it has a row with deleted = 9 in this partition,
+    // or if the minimum deleted over its non-9 rows is > 0; max(deleted) per human_pid is stored. INSERT INTO is used only when isWindows.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation_deleted PARTITION (ds='$ds')
+         |SELECT
+         |        human_pid
+         |        ,max(deleted) deleted
+         |FROM    (
+         |        SELECT
+         |                human_pid
+         |                ,deleted
+         |        FROM    $inc_ads_company_human_relation
+         |        WHERE   ds = '$ds'
+         |        AND deleted = 9
+         |        UNION ALL
+         |        SELECT  human_pid,deleted
+         |        FROM    (
+         |                SELECT
+         |                        human_pid
+         |                        ,MIN(deleted) OVER(PARTITION BY human_pid ) deleted
+         |                FROM    $inc_ads_company_human_relation
+         |                WHERE   ds = '$ds'
+         |                AND deleted <> 9
+         |                )
+         |        WHERE   deleted > 0
+         |        )
+         |GROUP BY human_pid
+         |""".stripMargin)
+
+    // Step 2 - merge: delegate to inc_human_relation_util.merge with this job's relation table as source and the _update table as target.
+    inc_human_relation_util
+      .merge(s, ds, inc_ads_company_human_relation, inc_ads_company_human_relation_update)
+  }
+
 }
 
 
@@ -547,6 +579,50 @@ object inc_company_relation_v2 {
     re.register_fun()
     re.inc()
     re.sendKafkaPre()
+    re.exportESPre()
     spark.stop()
   }
 }
+
+//    sql(
+//      s"""
+//         |SELECT  a.*
+//         |FROM (
+//         |SELECT  *
+//         |FROM    (
+//         |           SELECT  *
+//         |                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_pid ORDER BY ds desc,update_time desc) num
+//         |           FROM  (
+//         |                     SELECT  *
+//         |                     FROM    $inc_ads_company_human_relation
+//         |                     WHERE   ds > '0'
+//         |                     -- AND     deleted <> 9
+//         |                     UNION ALL
+//         |                     SELECT  *
+//         |                     FROM    $inc_ads_company_human_relation_merge
+//         |                     WHERE   ds > '0'
+//         |                     -- AND     deleted <> 9
+//         |              )
+//         |          )
+//         |WHERE   num = 1
+//         |) a
+//         |JOIN (
+//         |              SELECT  start_id
+//         |              FROM    (
+//         |                          SELECT  start_id
+//         |                          FROM    $inc_ads_relation_holder
+//         |                          WHERE   ds = '$ds'
+//         |                          AND     holder_type = 1
+//         |                          UNION ALL
+//         |                          SELECT  start_id
+//         |                          FROM    $inc_ads_relation_staff
+//         |                          WHERE   ds = '$ds'
+//         |                          UNION ALL
+//         |                          SELECT  start_id
+//         |                          FROM    $inc_ads_relation_legal_entity
+//         |                          WHERE   ds = '$ds'
+//         |                          AND     legal_entity_type = 1
+//         |                      )
+//         |              GROUP BY start_id
+//         | ) b on a.human_pid = b.start_id
+//         |""".stripMargin).createOrReplaceTempView("update_tab")

+ 164 - 11
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_merge.scala

@@ -16,7 +16,7 @@ import scala.collection.mutable
 case class inc_human_relation_merge(s: SparkSession,
                                     project: String, //表所在工程名
                                     ds: String //分区
-                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+                                   ) extends LoggingUtils with BaseFunc with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
   val inc_ads_company = "winhc_ng.inc_ads_company"
@@ -26,6 +26,8 @@ case class inc_human_relation_merge(s: SparkSession,
   val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
   val inc_ods_company_human_relation_merge = "winhc_ng.inc_ods_company_human_relation_merge"
   val inc_ads_company_human_relation_merge = "winhc_ng.inc_ads_company_human_relation_merge"
+  val inc_ads_company_human_relation_merge_deleted = "winhc_ng.inc_ads_company_human_relation_merge_deleted"
+  val inc_ads_company_human_relation_merge_update = "winhc_ng.inc_ads_company_human_relation_merge_update"
   val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
 
   val ads_change_extract = "winhc_ng.bds_change_extract"
@@ -63,15 +65,21 @@ case class inc_human_relation_merge(s: SparkSession,
          |        ,b.cate_second_code
          |        ,b.cate_third_code
          |FROM    (
-         |            SELECT  company_id
-         |                    ,person_name
-         |                    ,person_id
-         |                    ,create_time
-         |                    ,update_time
-         |                    ,deleted
-         |            FROM    $inc_ods_company_human_relation_merge
-         |            WHERE   ds = '$ds'
-         |            AND     deleted = 0
+         |           SELECT  *
+         |           FROM    (
+         |                       SELECT  company_id
+         |                               ,person_name
+         |                               ,person_id
+         |                               ,create_time
+         |                               ,update_time
+         |                               ,label
+         |                               ,ROW_NUMBER() OVER (PARTITION BY company_id,person_id ORDER BY label) num
+         |                               ,MIN(deleted) OVER (PARTITION BY company_id,person_id) deleted
+         |                       FROM    $inc_ods_company_human_relation_merge
+         |                       WHERE   ds = '$ds'
+         |                       AND     length(company_id) = 32
+         |                   )
+         |           WHERE num = 1
          |        ) a
          |JOIN    (
          |            SELECT  *
@@ -138,22 +146,61 @@ case class inc_human_relation_merge(s: SparkSession,
          |        ,deleted
          |FROM    $inc_ods_company_human_relation_merge
          |WHERE   ds = '$ds'
-         |AND     deleted = 1
+         |AND     length(company_id) = 0
          |) a
          |JOIN (
          |    SELECT *
          |    FROM  $inc_ads_company_human_relation_merge
          |    WHERE   ds < '$ds'
+         |    AND     deleted <> 9
          |    UNION ALL
          |    SELECT *
          |    FROM  $inc_ads_company_human_relation
          |    WHERE   ds > '0'
+         |    AND     deleted <> 9
          |) b
          |on a.person_id = b.human_pid
          |""".stripMargin)
 
   }
 
+  // Prepares the ES export for partition ds: writes the removed-person list, then rebuilds the per-person merged table.
+  def exportESPre(): Unit = {
+    // Removal list: human_pids with a deleted = 9 row in this partition, or whose minimum deleted over non-9 rows is > 0; max(deleted) is stored.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation_merge_deleted PARTITION (ds='$ds')
+         |SELECT
+         |        human_pid
+         |        ,max(deleted) deleted
+         |FROM    (
+         |        SELECT
+         |                human_pid
+         |                ,deleted
+         |        FROM    $inc_ads_company_human_relation_merge
+         |        WHERE   ds = '$ds'
+         |        AND deleted = 9
+         |        UNION ALL
+         |        SELECT  human_pid,deleted
+         |        FROM    (
+         |                SELECT
+         |                        human_pid
+         |                        ,MIN(deleted) OVER(PARTITION BY human_pid ) deleted
+         |                FROM    $inc_ads_company_human_relation_merge
+         |                WHERE   ds = '$ds'
+         |                AND deleted <> 9
+         |                )
+         |        WHERE   deleted > 0
+         |        )
+         |GROUP BY human_pid
+         |""".stripMargin)
+
+    // Incremental merge: delegate to inc_human_relation_util.merge with the _merge table as source and the _merge_update table as target.
+    inc_human_relation_util
+      .merge(s, ds, inc_ads_company_human_relation_merge, inc_ads_company_human_relation_merge_update)
+
+  }
+
 }
 
 
@@ -173,8 +220,114 @@ object inc_human_relation_merge {
     val re = inc_human_relation_merge(s = spark, project = project, ds = ds)
     re.register_fun()
     re.inc()
+    re.exportESPre()
     spark.stop()
   }
 }
 
+object inc_human_relation_util { // Per-person aggregation helper called from the exportESPre steps of the relation jobs.
+  def merge(spark: SparkSession, ds: String, source_tab: String, target_tab: String): Unit = { // reads source_tab at partition ds, writes target_tab at partition ds
+    // Stage 1 (temp view tmp_tab): rows with deleted = 0 are grouped by (human_pid, province_code), with ''/'0'/NULL provinces folded to '0'; num_rank orders a person's groups by non-'0' province first, then larger province_num.
+    spark.sql( // NOTE(review): the FIRST_VALUE windows appear to partition by the source province_code, not the folded value used in GROUP BY - confirm intended.
+      s"""
+         |SELECT  *
+         |        ,ROW_NUMBER() OVER(PARTITION BY human_pid ORDER BY tmp_val desc,province_num DESC ,human_pid) AS num_rank
+         |FROM    (
+         |            SELECT  human_pid
+         |                    ,case
+         |                     when province_code = '' then 0
+         |                     when province_code = '0' then 0
+         |                     when province_code is null then 0
+         |                     else 1
+         |                     end as tmp_val
+         |                    ,COLLECT_SET(human_name)[0] AS human_name
+         |                    ,province_code
+         |                    ,COUNT(1) AS province_num
+         |                    ,CONCAT_WS(',',collect_set(area_code)) AS area_code
+         |                    ,CONCAT_WS(',',collect_set(cate_code)) AS cate_code
+         |                    ,COLLECT_SET(first_company)[0] AS first_company
+         |                    ,COLLECT_SET(first_company_reg_capital_amount)[0] AS first_company_reg_capital_amount
+         |                    ,COLLECT_SET(total_compny_cnt)[0] AS total_compny_cnt
+         |            FROM    (
+         |                        SELECT  company_id
+         |                                ,human_name
+         |                                ,human_pid
+         |                                ,deleted
+         |                                ,CONCAT_WS('|', province_code,city_code,county_code) AS area_code
+         |                                ,case
+         |                                   when province_code = '' then '0'
+         |                                   when province_code = '0' then '0'
+         |                                   when province_code is null then '0'
+         |                                else province_code
+         |                                end as  province_code
+         |                                ,CONCAT_WS('|', cate_first_code,cate_second_code,cate_third_code) AS cate_code
+         |                                ,FIRST_VALUE(company_id) OVER (PARTITION BY human_pid,province_code ORDER BY reg_capital_amount DESC) AS first_company
+         |                                ,FIRST_VALUE(reg_capital_amount) OVER (PARTITION BY human_pid,province_code ORDER BY reg_capital_amount DESC) AS first_company_reg_capital_amount
+         |                                ,COUNT(company_id) OVER(PARTITION BY human_pid) AS total_compny_cnt
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    $source_tab
+         |                                    WHERE   ds = "$ds"
+         |                                    AND deleted = 0
+         |                                )
+         |                    )
+         |            GROUP BY human_pid
+         |                     ,province_code
+         |        )
+         |""".stripMargin).createOrReplaceTempView("tmp_tab")
+    // Stage 2: per person, groups with num_rank <= 2 are kept as-is, the remaining groups are folded into one province_code "0" entry, and all entries are collected into code_company_num_map.
+    spark.sql( // NOTE(review): isWindows has no visible definition/import in this object - confirm it is in scope; $ds is interpolated unquoted for update_time (quoted at the ds filter above).
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $target_tab PARTITION (ds='$ds')
+         |SELECT  human_pid
+         |        ,COLLECT_SET(human_name)[0] AS human_name
+         |        ,CONCAT_ws(',',COLLECT_SET(area_code)) AS area_code
+         |        ,CONCAT_ws(',',COLLECT_SET(cate_code)) AS cate_code
+         |        ,CONCAT('[',CONCAT_WS(',',collect_set(code_company_num_tup)),']') AS code_company_num_map
+         |        ,0 deleted
+         |        ,$ds as update_time
+         |        ,COLLECT_SET(total_compny_cnt)[0] AS total_compny_cnt
+         |FROM    (
+         |            SELECT  human_pid
+         |                    ,COLLECT_SET(human_name)[0] AS human_name
+         |                    ,CONCAT_WS(',',COLLECT_SET(area_code)) AS area_code
+         |                    ,CONCAT_WS(',',COLLECT_SET(cate_code)) AS cate_code
+         |                    ,CONCAT(
+         |                        '{"province_code":"0","province_num":"',sum(province_num),'","company_id":"',collect_set(first_company)[0],'"}'
+         |                    ) AS code_company_num_tup
+         |                    ,COLLECT_SET(total_compny_cnt)[0] AS total_compny_cnt
+         |            FROM    (
+         |                        SELECT  human_pid
+         |                                ,human_name
+         |                                ,province_num
+         |                                ,area_code
+         |                                ,cate_code
+         |                                ,first_company_reg_capital_amount
+         |                                ,FIRST_VALUE(first_company) OVER (PARTITION BY human_pid ORDER BY first_company_reg_capital_amount DESC) AS first_company
+         |                                ,total_compny_cnt
+         |                        FROM    tmp_tab
+         |                        WHERE   num_rank > 2
+         |                    )
+         |            GROUP BY human_pid
+         |            UNION ALL
+         |            SELECT  human_pid
+         |                    ,human_name
+         |                    ,area_code
+         |                    ,cate_code
+         |                    ,CONCAT(
+         |                        '{"province_code":"'
+         |                        ,province_code
+         |                        ,'","province_num":"',province_num
+         |                        ,'","company_id":"',first_company
+         |                        ,'"}'
+         |                    ) AS code_company_num_tup
+         |                    ,total_compny_cnt
+         |            FROM    tmp_tab
+         |            WHERE   num_rank <= 2
+         |        )
+         |GROUP BY human_pid
+         |""".stripMargin)
+  }
+}
+