Prechádzať zdrojové kódy

Merge remote-tracking branch 'origin/master'

许家凯 3 rokov pred
rodič
commit
6a23770f43

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation.scala

@@ -13,6 +13,7 @@ import scala.collection.mutable
  * @author: π
  * @date: 2021/01/10 09:36
  */
+@deprecated
 case class inc_company_relation(s: SparkSession,
                                 project: String, //表所在工程名
                                 ds: String //分区

+ 219 - 128
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -22,9 +22,14 @@ case class inc_company_relation_v2(s: SparkSession,
   val inc_ads_company = "winhc_ng.inc_ads_company"
   val ads_company = "winhc_ng.ads_company"
   val inc_ads_company_legal_entity = "winhc_ng.inc_ads_company_legal_entity"
+  val ads_company_holder = "winhc_ng.ads_company_holder"
   val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
   val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
+  val inc_ads_company_human_relation_merge = "winhc_ng.inc_ads_company_human_relation_merge"
+  val inc_ads_company_human_relation_deleted = "winhc_ng.inc_ads_company_human_relation_deleted"
+  val inc_ads_company_human_relation_update = "winhc_ng.inc_ads_company_human_relation_update"
   val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
+  val ads_company_staff = "winhc_ng.ads_company_staff"
 
   val ads_change_extract = "winhc_ng.bds_change_extract"
 
@@ -52,7 +57,6 @@ case class inc_company_relation_v2(s: SparkSession,
                                   end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
       CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type)
 
-
     spark.udf.register("get_company_node", get_company_node _)
     spark.udf.register("get_person_node", get_person_node _)
     spark.udf.register("get_relation_holder", get_relation_holder _)
@@ -141,11 +145,9 @@ case class inc_company_relation_v2(s: SparkSession,
          |         ,ds
          | FROM    $inc_ads_company
          | WHERE   ds > '0'
-         | AND     legal_entity_id IS NOT NULL
-         | AND     length(trim(legal_entity_name)) > 0
          | -- AND     deleted <> 9
          | union all
-         | SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         | SELECT   CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
          |         ,company_id
          |         ,name AS company_name
          |         ,legal_entity_id
@@ -153,78 +155,143 @@ case class inc_company_relation_v2(s: SparkSession,
          |         ,legal_entity_type
          |         ,create_time
          |         ,update_time
-         |          ,province_code,city_code,county_code
+         |         ,province_code,city_code,county_code
          |         ,reg_capital_amount,cate_first_code
          |         ,cate_second_code,cate_third_code
          |         ,ds
          | FROM    $ads_company
          | WHERE   ds > '0'
-         | AND     legal_entity_id IS NOT NULL
-         | AND     length(trim(legal_entity_name)) > 0
          | -- AND     deleted <> 9
          |)
          |)c
          |WHERE num = 1
          |""".stripMargin).createOrReplaceTempView("company_view_all")
 
-    //新增人员关系表
+    //TODO GET 新增-更新-关系表
     sql(
       s"""
-         |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
-         |SELECT  md5(CONCAT_WS('_',company_id,human_pid)) as rowkey
-         |        ,a.company_id
-         |        ,a.company_name
-         |        ,a.human_name
-         |        ,hash(a.human_name) AS hid
-         |        ,concat('p',md5(uuid())) human_pid
-         |        ,a.STATUS
-         |        ,a.create_time
-         |        ,a.update_time
-         |        ,a.deleted
-         |        ,c.province_code,c.city_code,c.county_code
-         |        ,c.reg_capital_amount,c.cate_first_code
-         |        ,c.cate_second_code,c.cate_third_code
+         |SELECT  b.company_id,b.human_name,b.deleted,b.create_time,b.update_time
+         |FROM
+         |(
+         |   SELECT  *
+         |   FROM    (
+         |               SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time
+         |                       ,ROW_NUMBER() OVER (PARTITION BY company_id ORDER BY ds desc,update_time desc) num
+         |               FROM    (
+         |                           --股东
+         |                           SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,ds
+         |                           FROM    $inc_ads_company_holder
+         |                           WHERE   ds = '$ds' AND holder_type = 1
+         |                           UNION ALL
+         |                           --主要成员
+         |                           SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,ds
+         |                           FROM    $inc_ads_company_staff
+         |                           WHERE   ds = '$ds'
+         |                           UNION ALL
+         |                           --法人
+         |                           SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,ds
+         |                           FROM    $inc_ads_company_legal_entity
+         |                           WHERE   ds = '$ds' AND legal_entity_type = 1
+         |                       )
+         |           )
+         |   WHERE   num = 1
+         |   -- AND name_cleanup(human_name) <> ''
+         |) a JOIN
+         |(
+         |  SELECT *
+         |  FROM (
+         |         SELECT company_id,name_cleanup(human_name) human_name,create_time,update_time
+         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |             ,MIN(deleted) OVER (PARTITION BY company_id,human_name) deleted
+         |              FROM (
+         |                    --股东
+         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                    FROM    $inc_ads_company_holder
+         |                    WHERE   ds > '0'
+         |                    AND holder_type = 1
+         |                    UNION ALL
+         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                    FROM    $ads_company_holder
+         |                    WHERE   ds > '0'
+         |                    AND holder_type = 1
+         |                    UNION ALL
+         |                    --主要成员
+         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                    FROM    $inc_ads_company_staff
+         |                    WHERE   ds > '0'
+         |                    UNION ALL
+         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                    FROM    $ads_company_staff
+         |                    WHERE   ds > '0'
+         |                    UNION ALL
+         |                    --法人
+         |                    SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
+         |                    FROM    $inc_ads_company_legal_entity
+         |                    WHERE   ds > '0'
+         |                    AND legal_entity_type = 1
+         |                 )
+         |      ) WHERE NUM = 1
+         |) b on a.company_id = b.company_id
+         |""".stripMargin).createOrReplaceTempView("incr_company_human_all")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT
+         |     CONCAT_WS('_',a.company_id,COALESCE(b.human_pid, a.human_pid)) AS rowkey
+         |     ,a.company_id
+         |     ,name_cleanup(c.company_name) company_name
+         |     ,a.human_name
+         |     ,hash(a.human_name) hid
+         |     ,COALESCE(b.human_pid, a.human_pid)  human_pid
+         |     ,2 STATUS
+         |     ,a.create_time
+         |     ,a.update_time
+         |     ,a.deleted
+         |     ,c.province_code
+         |     ,c.city_code
+         |     ,c.county_code
+         |     ,c.reg_capital_amount
+         |     ,c.cate_first_code
+         |     ,c.cate_second_code
+         |     ,c.cate_third_code
          |FROM    (
-         |            SELECT  *
-         |            FROM    (
-         |                        SELECT  company_id,company_name,human_name,hid,status,create_time,update_time,0 as deleted
-         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
-         |                        FROM    (
-         |                                    --股东
-         |                                    SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,0 as deleted,ds
-         |                                    FROM    $inc_ads_company_holder
-         |                                    WHERE   ds = '$ds' AND holder_type = 1
-         |                                    UNION ALL
-         |                                    --主要成员
-         |                                    SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,0 as deleted,ds
-         |                                    FROM    $inc_ads_company_staff
-         |                                    WHERE   ds = '$ds'
-         |                                    UNION ALL
-         |                                    --法人
-         |                                    SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,0 as deleted,ds
-         |                                    FROM    $inc_ads_company_legal_entity
-         |                                    WHERE   ds = '$ds' AND legal_entity_type = 1
-         |                                )
-         |                    )
-         |            WHERE   num = 1 AND name_cleanup(human_name) <> ''
+         |        SELECT  company_id
+         |                ,human_name
+         |                ,deleted
+         |                ,create_time
+         |                ,update_time
+         |                ,concat('p',md5(uuid())) human_pid
+         |        FROM    incr_company_human_all
+         |        WHERE   length(human_name) > 0 AND  length(company_id) >= 32
          |        ) a
          |LEFT JOIN (
-         |    SELECT  company_id,human_name,human_pid
-         |    FROM    (
-         |              SELECT  company_id,human_name,human_pid
-         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
-         |               FROM    $inc_ads_company_human_relation
-         |               WHERE   ds > '0'
-         |              )
-         |    WHERE   num = 1
-         |) b
+         |        SELECT  company_id
+         |                ,human_name
+         |                ,human_pid
+         |        FROM    (
+         |                SELECT  company_id,human_name,human_pid
+         |                        ,ROW_NUMBER()OVER (PARTITION BY company_id, human_name ORDER BY ds DESC , update_time DESC ) num
+         |                FROM    (
+         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
+         |                        FROM    $inc_ads_company_human_relation
+         |                        WHERE   ds > '0'
+         |                        -- AND deleted <> 9
+         |                        UNION ALL
+         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
+         |                        FROM    $inc_ads_company_human_relation_merge
+         |                        WHERE   ds > '0'
+         |                        -- AND deleted <> 9
+         |                        )
+         |                )
+         |        WHERE   num =1
+         |          ) b
          |ON      a.company_id = b.company_id
-         |AND     a.human_name = b.human_name
+         |AND     name_cleanup(a.human_name) = name_cleanup(b.human_name)
          |JOIN (
          |SELECT * FROM company_view_all
          |) c
          |ON      a.company_id = c.company_id
-         |WHERE   b.company_id IS NULL
          |""".stripMargin)
 
     //全量人员关系表
@@ -232,10 +299,19 @@ case class inc_company_relation_v2(s: SparkSession,
       s"""
          |SELECT  company_id,human_name,human_pid
          |FROM    (
-         |          SELECT  company_id,human_name,human_pid
-         |                            ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
-         |           FROM    $inc_ads_company_human_relation
-         |           WHERE   ds > '0'
+         |           SELECT  company_id,human_name,human_pid
+         |                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |           FROM  (
+         |                     SELECT  company_id,human_name,human_pid,ds,update_time
+         |                     FROM    $inc_ads_company_human_relation
+         |                     WHERE   ds > '0'
+         |                     AND     deleted <> 9
+         |                     UNION ALL
+         |                     SELECT  company_id,human_name,human_pid,ds,update_time
+         |                     FROM    $inc_ads_company_human_relation_merge
+         |                     WHERE   ds > '0'
+         |                     AND     deleted <> 9
+         |              )
          |          )
          |WHERE   num = 1
          |""".stripMargin).createOrReplaceTempView("company_human_relation_all")
@@ -259,7 +335,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |(
          |SELECT * FROM company_human_relation_all
          |) c
-         |ON a.company_id = c.company_id and a.holder_name = c.human_name
+         |ON a.company_id = c.company_id and name_cleanup(a.holder_name) = name_cleanup(c.human_name)
          |JOIN
          |  (
          |    SELECT rowkey
@@ -308,7 +384,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |(
          |SELECT * FROM company_human_relation_all
          |) c
-         |ON a.company_id = c.company_id and a.staff_name   = c.human_name
+         |ON a.company_id = c.company_id and name_cleanup(a.staff_name)   = name_cleanup(c.human_name)
          |JOIN
          |  (
          |    SELECT rowkey
@@ -342,7 +418,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |(
          |SELECT * FROM company_human_relation_all
          |) c
-         |ON a.company_id = c.company_id and a.legal_entity_name = c.human_name
+         |ON a.company_id = c.company_id and name_cleanup(a.legal_entity_name) = name_cleanup(c.human_name)
          |UNION ALL
          |SELECT legal_entity_id start_id,name_cleanup(legal_entity_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,deleted,legal_entity_type,label
          |from (
@@ -358,54 +434,6 @@ case class inc_company_relation_v2(s: SparkSession,
          |AND length(start_name) <> 0  AND length(end_name) <> 0
          |""".stripMargin)
 
-    //二次修正human表,保证和增量人对齐
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $inc_ads_company_human_relation PARTITION(ds='$ds')
-         |SELECT   a.rowkey
-         |        ,a.company_id
-         |        ,a.company_name
-         |        ,a.human_name
-         |        ,a.hid
-         |        ,a.human_pid
-         |        ,a.STATUS
-         |        ,a.create_time
-         |        ,a.update_time
-         |        ,a.deleted
-         |        ,a.province_code
-         |        ,a.city_code
-         |        ,a.county_code
-         |        ,a.reg_capital_amount
-         |        ,a.cate_first_code
-         |        ,a.cate_second_code
-         |        ,a.cate_third_code
-         |FROM    (
-         |            SELECT  *
-         |            FROM    $inc_ads_company_human_relation
-         |            WHERE   ds = '$ds'
-         |        ) a
-         |JOIN (
-         |              SELECT  start_id
-         |              FROM    (
-         |                          SELECT  start_id
-         |                          FROM    $inc_ads_relation_holder
-         |                          WHERE   ds = '$ds'
-         |                          AND     holder_type = 1
-         |                          UNION ALL
-         |                          SELECT  start_id
-         |                          FROM    $inc_ads_relation_staff
-         |                          WHERE   ds = '$ds'
-         |                          UNION ALL
-         |                          SELECT  start_id
-         |                          FROM    $inc_ads_relation_legal_entity
-         |                          WHERE   ds = '$ds'
-         |                          AND     legal_entity_type = 1
-         |                      )
-         |              GROUP BY start_id
-         |          ) b
-         |ON      a.human_pid = b.start_id
-         |""".stripMargin)
-
   }
 
   def sendKafkaPre(): Unit = {
@@ -415,7 +443,6 @@ case class inc_company_relation_v2(s: SparkSession,
     val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
     val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
     val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
-    val inc_ads_person_node_label_kafka = "winhc_ng.inc_ads_person_node_label_kafka"
     //公司节点
     sql(
       s"""
@@ -486,22 +513,6 @@ case class inc_company_relation_v2(s: SparkSession,
          |where ds = '$ds'
          |""".stripMargin).show(20, false)
 
-//    //新增或者更新-投资人-打标签
-//    sql(
-//      s"""
-//         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_person_node_label_kafka  PARTITION (ds='$ds')
-//         |select key,max(message) message
-//         |from
-//         |(
-//         |  select
-//         |  start_id key,
-//         |  get_person_node(start_id, start_name, deleted, '7') message
-//         |  from $inc_ads_relation_holder
-//         |  where ds = '$ds' and holder_type = 1
-//         |)
-//         |group by key
-//         |""".stripMargin).show(20, false)
-
     //防止空分区
     addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
@@ -509,10 +520,46 @@ case class inc_company_relation_v2(s: SparkSession,
     addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
-    addEmptyPartitionOrSkip(inc_ads_person_node_label_kafka, ds)
 
   }
 
+  /**
+   * Prepares the data for the ES export (translation of the original note).
+   *
+   * Step 1 writes one row per `human_pid` (with `max(deleted)`) into
+   * `inc_ads_company_human_relation_deleted` for partition `ds`. The input is the
+   * union of (a) rows of today's partition of `inc_ads_company_human_relation`
+   * with `deleted = 9` and (b) rows with `deleted <> 9` whose smallest `deleted`
+   * value per `human_pid` is greater than 0.
+   * The statement uses INSERT INTO when `isWindows` is true and INSERT OVERWRITE
+   * otherwise.
+   *
+   * Step 2 hands `inc_ads_company_human_relation` and
+   * `inc_ads_company_human_relation_update` to `inc_human_relation_util.merge`.
+   */
+  def exportESPre(): Unit = {
+
+    // Deleted records
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation_deleted PARTITION (ds='$ds')
+         |SELECT
+         |        human_pid
+         |        ,max(deleted) deleted
+         |FROM    (
+         |        SELECT
+         |                human_pid
+         |                ,deleted
+         |        FROM    $inc_ads_company_human_relation
+         |        WHERE   ds = '$ds'
+         |        AND deleted = 9
+         |        UNION ALL
+         |        SELECT  human_pid,deleted
+         |        FROM    (
+         |                SELECT
+         |                        human_pid
+         |                        ,MIN(deleted) OVER(PARTITION BY human_pid ) deleted
+         |                FROM    $inc_ads_company_human_relation
+         |                WHERE   ds = '$ds'
+         |                AND deleted <> 9
+         |                )
+         |        WHERE   deleted > 0
+         |        )
+         |GROUP BY human_pid
+         |""".stripMargin)
+
+    // Merge the data (source table -> update table)
+    inc_human_relation_util
+      .merge(s, ds, inc_ads_company_human_relation, inc_ads_company_human_relation_update)
+  }
+
 }
 
 
@@ -533,6 +580,50 @@ object inc_company_relation_v2 {
     re.register_fun()
     re.inc()
     re.sendKafkaPre()
+    re.exportESPre()
     spark.stop()
   }
 }
+
+//    sql(
+//      s"""
+//         |SELECT  a.*
+//         |FROM (
+//         |SELECT  *
+//         |FROM    (
+//         |           SELECT  *
+//         |                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_pid ORDER BY ds desc,update_time desc) num
+//         |           FROM  (
+//         |                     SELECT  *
+//         |                     FROM    $inc_ads_company_human_relation
+//         |                     WHERE   ds > '0'
+//         |                     -- AND     deleted <> 9
+//         |                     UNION ALL
+//         |                     SELECT  *
+//         |                     FROM    $inc_ads_company_human_relation_merge
+//         |                     WHERE   ds > '0'
+//         |                     -- AND     deleted <> 9
+//         |              )
+//         |          )
+//         |WHERE   num = 1
+//         |) a
+//         |JOIN (
+//         |              SELECT  start_id
+//         |              FROM    (
+//         |                          SELECT  start_id
+//         |                          FROM    $inc_ads_relation_holder
+//         |                          WHERE   ds = '$ds'
+//         |                          AND     holder_type = 1
+//         |                          UNION ALL
+//         |                          SELECT  start_id
+//         |                          FROM    $inc_ads_relation_staff
+//         |                          WHERE   ds = '$ds'
+//         |                          UNION ALL
+//         |                          SELECT  start_id
+//         |                          FROM    $inc_ads_relation_legal_entity
+//         |                          WHERE   ds = '$ds'
+//         |                          AND     legal_entity_type = 1
+//         |                      )
+//         |              GROUP BY start_id
+//         | ) b on a.human_pid = b.start_id
+//         |""".stripMargin).createOrReplaceTempView("update_tab")

+ 333 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_merge.scala

@@ -0,0 +1,333 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{CompanyRelationUtils, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Incremental job that writes merged company/person relation rows into
+ * `winhc_ng.inc_ads_company_human_relation_merge` for one `ds` partition and
+ * then fills the derived "deleted" and "update" tables used for the ES export.
+ *
+ * @param s       Spark session used to run every statement
+ * @param project name of the project that owns the tables (not referenced inside this class body)
+ * @param ds      partition value to process
+ * @author: π
+ * @date: 2021/01/10 09:36
+ */
+case class inc_human_relation_merge(s: SparkSession,
+                                    project: String, // name of the project that owns the tables
+                                    ds: String // partition
+                                   ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  val inc_ads_company = "winhc_ng.inc_ads_company"
+  val ads_company = "winhc_ng.ads_company"
+  val inc_ads_company_legal_entity = "winhc_ng.inc_ads_company_legal_entity"
+  val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
+  val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
+  val inc_ods_company_human_relation_merge = "winhc_ng.inc_ods_company_human_relation_merge"
+  val inc_ads_company_human_relation_merge = "winhc_ng.inc_ads_company_human_relation_merge"
+  val inc_ads_company_human_relation_merge_deleted = "winhc_ng.inc_ads_company_human_relation_merge_deleted"
+  val inc_ads_company_human_relation_merge_update = "winhc_ng.inc_ads_company_human_relation_merge_update"
+  val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
+
+  val ads_change_extract = "winhc_ng.bds_change_extract"
+
+  val inc_ads_company_node = "winhc_ng.inc_ads_company_node"
+  val inc_ads_relation_holder = "winhc_ng.inc_ads_relation_holder"
+  val inc_ads_relation_staff = "winhc_ng.inc_ads_relation_staff"
+  val inc_ads_relation_legal_entity = "winhc_ng.inc_ads_relation_legal_entity"
+  /** Calls `prepareFunctions(spark)`; that helper is defined outside this file. */
+  def register_fun(): Unit = {
+    prepareFunctions(spark)
+  }
+  /**
+   * Writes partition `ds` of `inc_ads_company_human_relation_merge` as the
+   * UNION ALL of two branches read from `inc_ods_company_human_relation_merge`:
+   *
+   * Branch 1 takes rows of partition `ds` with `length(company_id) = 32`. Per
+   * (company_id, person_id) it keeps the row with the lowest `label` and uses
+   * the minimum `deleted` of the group. Company name, region codes, registered
+   * capital and category codes come from the latest company row per company_id
+   * (ordered by ds desc, update_time desc) in `inc_ads_company` + `ads_company`.
+   *
+   * Branch 2 takes rows of partition `ds` with `length(company_id) = 0`. They
+   * are joined on person_id = human_pid to rows with `deleted <> 9` from earlier
+   * partitions of the merge table and from all partitions of
+   * `inc_ads_company_human_relation`. Company columns come from the matched
+   * row; name, timestamps and `deleted` come from the incoming row.
+   *
+   * NOTE(review): the join side of branch 2 is not de-duplicated, so a person
+   * matching several stored rows yields several output rows - confirm intended.
+   *
+   * The statement uses INSERT INTO when `isWindows` is true, else INSERT OVERWRITE.
+   */
+  def inc(): Unit = {
+
+    // Merge / delete handling
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_human_relation_merge PARTITION (ds='$ds')
+         |SELECT  CONCAT_WS('_',a.company_id,a.person_id) AS rowkey
+         |        ,a.company_id
+         |        ,b.company_name
+         |        ,a.person_name human_name
+         |        ,hash(a.person_name) hid
+         |        ,a.person_id human_pid
+         |        ,2 STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,b.province_code
+         |        ,b.city_code
+         |        ,b.county_code
+         |        ,b.reg_capital_amount
+         |        ,b.cate_first_code
+         |        ,b.cate_second_code
+         |        ,b.cate_third_code
+         |FROM    (
+         |           SELECT  *
+         |           FROM    (
+         |                       SELECT  company_id
+         |                               ,person_name
+         |                               ,person_id
+         |                               ,create_time
+         |                               ,update_time
+         |                               ,label
+         |                               ,ROW_NUMBER() OVER (PARTITION BY company_id,person_id ORDER BY label) num
+         |                               ,MIN(deleted) OVER (PARTITION BY company_id,person_id) deleted
+         |                       FROM    $inc_ods_company_human_relation_merge
+         |                       WHERE   ds = '$ds'
+         |                       AND     length(company_id) = 32
+         |                   )
+         |           WHERE num = 1
+         |        ) a
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
+         |                        FROM    (
+         |                                    SELECT  company_id
+         |                                            ,name AS company_name
+         |                                            ,province_code
+         |                                            ,city_code
+         |                                            ,county_code
+         |                                            ,reg_capital_amount
+         |                                            ,cate_first_code
+         |                                            ,cate_second_code
+         |                                            ,cate_third_code
+         |                                            ,update_time
+         |                                            ,ds
+         |                                    FROM    $inc_ads_company
+         |                                    WHERE   ds > '0'
+         |                                    UNION ALL
+         |                                    SELECT  company_id
+         |                                            ,name AS company_name
+         |                                            ,province_code
+         |                                            ,city_code
+         |                                            ,county_code
+         |                                            ,reg_capital_amount
+         |                                            ,cate_first_code
+         |                                            ,cate_second_code
+         |                                            ,cate_third_code
+         |                                            ,update_time
+         |                                            ,ds
+         |                                    FROM    $ads_company
+         |                                    WHERE   ds > '0'
+         |                                )
+         |                    ) c
+         |            WHERE   num = 1
+         |        ) b
+         |ON      a.company_id = b.company_id
+         |UNION ALL
+         |SELECT
+         |         CONCAT_WS('_',company_id,a.person_id) AS rowkey
+         |        ,company_id
+         |        ,company_name
+         |        ,a.person_name human_name
+         |        ,hash(a.person_name) hid
+         |        ,a.person_id human_pid
+         |        ,STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,province_code
+         |        ,city_code
+         |        ,county_code
+         |        ,reg_capital_amount
+         |        ,cate_first_code
+         |        ,cate_second_code
+         |        ,cate_third_code
+         |from (
+         |  SELECT person_name
+         |        ,person_id
+         |        ,create_time
+         |        ,update_time
+         |        ,deleted
+         |FROM    $inc_ods_company_human_relation_merge
+         |WHERE   ds = '$ds'
+         |AND     length(company_id) = 0
+         |) a
+         |JOIN (
+         |    SELECT *
+         |    FROM  $inc_ads_company_human_relation_merge
+         |    WHERE   ds < '$ds'
+         |    AND     deleted <> 9
+         |    UNION ALL
+         |    SELECT *
+         |    FROM  $inc_ads_company_human_relation
+         |    WHERE   ds > '0'
+         |    AND     deleted <> 9
+         |) b
+         |on a.person_id = b.human_pid
+         |""".stripMargin)
+
+  }
+
+  /**
+   * Prepares the data for the ES export (translation of the original note).
+   *
+   * First writes one row per `human_pid` (with `max(deleted)`) into
+   * `inc_ads_company_human_relation_merge_deleted` for partition `ds`. The
+   * input is the union of today's merge rows with `deleted = 9` and today's
+   * rows with `deleted <> 9` whose smallest `deleted` value per `human_pid`
+   * is greater than 0. Then hands the merge table and the merge-update table
+   * to `inc_human_relation_util.merge`.
+   */
+  def exportESPre(): Unit = {
+    // Deleted records
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation_merge_deleted PARTITION (ds='$ds')
+         |SELECT
+         |        human_pid
+         |        ,max(deleted) deleted
+         |FROM    (
+         |        SELECT
+         |                human_pid
+         |                ,deleted
+         |        FROM    $inc_ads_company_human_relation_merge
+         |        WHERE   ds = '$ds'
+         |        AND deleted = 9
+         |        UNION ALL
+         |        SELECT  human_pid,deleted
+         |        FROM    (
+         |                SELECT
+         |                        human_pid
+         |                        ,MIN(deleted) OVER(PARTITION BY human_pid ) deleted
+         |                FROM    $inc_ads_company_human_relation_merge
+         |                WHERE   ds = '$ds'
+         |                AND deleted <> 9
+         |                )
+         |        WHERE   deleted > 0
+         |        )
+         |GROUP BY human_pid
+         |""".stripMargin)
+
+    // Incremental merge of the data (merge table -> merge-update table)
+    inc_human_relation_util
+      .merge(s, ds, inc_ads_company_human_relation_merge, inc_ads_company_human_relation_merge_update)
+
+  }
+
+}
+
+
+/**
+ * Command-line entry point of the `inc_human_relation_merge` job.
+ *
+ * Expects exactly two arguments, `project` and `ds`; otherwise it prints a usage hint and
+ * exits with status -1. The Spark configuration is `EsConfig.getEsConfigMap` extended with
+ * the ODPS project name, `spark.debug.maxToStringFields` and the local partition amount.
+ * The job then runs `register_fun()`, `inc()` and `exportESPre()` in that order.
+ */
+object inc_human_relation_merge {
+  def main(args: Array[String]): Unit = {
+    if (args.size != 2) {
+      println("please set project ds.")
+      sys.exit(-1)
+    }
+    val Array(project, ds) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    // Stop the session even when one of the steps throws, so a failed run does not leave
+    // the Spark session open; the exception itself still propagates to the caller.
+    try {
+      val re = inc_human_relation_merge(s = spark, project = project, ds = ds)
+      re.register_fun()
+      re.inc()
+      re.exportESPre()
+    } finally {
+      spark.stop()
+    }
+  }
+}
+
+/**
+ * Helper that collapses the per-company rows of a person into one aggregated row per
+ * `human_pid`.
+ */
+object inc_human_relation_util {
+  /**
+   * Aggregates partition `ds` of `source_tab` (rows with `deleted = 0` only) and writes one
+   * row per `human_pid` into partition `ds` of `target_tab` (INSERT INTO when `isWindows`
+   * is true, INSERT OVERWRITE otherwise).
+   *
+   * Output columns: `human_pid`, `human_name`, comma-joined `area_code` and `cate_code`
+   * sets, `code_company_num_map` (a JSON-like array of
+   * `{"province_code", "province_num", "company_id"}` entries), `deleted` fixed to 0,
+   * `update_time` (the value of `ds`, interpolated as an unquoted literal) and
+   * `total_compny_cnt`.
+   *
+   * Side effect: registers/replaces the temp view `tmp_tab` on `spark`.
+   *
+   * @param spark      session used to run both statements
+   * @param ds         partition value read from `source_tab` and written to `target_tab`
+   * @param source_tab table providing the per-company rows
+   * @param target_tab table receiving the aggregated rows
+   */
+  def merge(spark: SparkSession, ds: String, source_tab: String, target_tab: String): Unit = {
+    // Stage 1: per (human_pid, province) aggregates, ranked per person, exposed as tmp_tab.
+    //  - Empty, '0' and NULL province codes are folded into the bucket '0' in the innermost
+    //    query (norm_province_code). The FIRST_VALUE windows partition by that normalised
+    //    value, i.e. by the same key as the GROUP BY, so each group has exactly one
+    //    first_company / first_company_reg_capital_amount pair.
+    //  - num_rank orders known provinces (tmp_val = 1) first, then larger province_num;
+    //    province_code is the last sort key so that ties are ranked reproducibly.
+    // NOTE(review): the ordering by reg_capital_amount is numeric only if that column is
+    // numeric in source_tab — confirm against the table schema.
+    spark.sql(
+      s"""
+         |SELECT  *
+         |        ,ROW_NUMBER() OVER(PARTITION BY human_pid ORDER BY tmp_val desc,province_num DESC ,province_code) AS num_rank
+         |FROM    (
+         |            SELECT  human_pid
+         |                    ,case
+         |                     when province_code = '' then 0
+         |                     when province_code = '0' then 0
+         |                     when province_code is null then 0
+         |                     else 1
+         |                     end as tmp_val
+         |                    ,COLLECT_SET(human_name)[0] AS human_name
+         |                    ,province_code
+         |                    ,COUNT(1) AS province_num
+         |                    ,CONCAT_WS(',',collect_set(area_code)) AS area_code
+         |                    ,CONCAT_WS(',',collect_set(cate_code)) AS cate_code
+         |                    ,COLLECT_SET(first_company)[0] AS first_company
+         |                    ,COLLECT_SET(first_company_reg_capital_amount)[0] AS first_company_reg_capital_amount
+         |                    ,COLLECT_SET(total_compny_cnt)[0] AS total_compny_cnt
+         |            FROM    (
+         |                        SELECT  company_id
+         |                                ,human_name
+         |                                ,human_pid
+         |                                ,deleted
+         |                                ,CONCAT_WS('|', province_code,city_code,county_code) AS area_code
+         |                                ,norm_province_code AS province_code
+         |                                ,CONCAT_WS('|', cate_first_code,cate_second_code,cate_third_code) AS cate_code
+         |                                ,FIRST_VALUE(company_id) OVER (PARTITION BY human_pid,norm_province_code ORDER BY reg_capital_amount DESC) AS first_company
+         |                                ,FIRST_VALUE(reg_capital_amount) OVER (PARTITION BY human_pid,norm_province_code ORDER BY reg_capital_amount DESC) AS first_company_reg_capital_amount
+         |                                ,COUNT(company_id) OVER(PARTITION BY human_pid) AS total_compny_cnt
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                            ,case
+         |                                               when province_code = '' then '0'
+         |                                               when province_code = '0' then '0'
+         |                                               when province_code is null then '0'
+         |                                             else province_code
+         |                                             end as norm_province_code
+         |                                    FROM    $source_tab
+         |                                    WHERE   ds = "$ds"
+         |                                    AND deleted = 0
+         |                                )
+         |                    )
+         |            GROUP BY human_pid
+         |                     ,province_code
+         |        )
+         |""".stripMargin).createOrReplaceTempView("tmp_tab")
+
+    // Stage 2: one output row per person.
+    //  - The two best-ranked provinces (num_rank <= 2) each contribute their own JSON entry.
+    //  - All remaining provinces (num_rank > 2) are summed into a single entry with
+    //    province_code "0", carrying the first_company with the highest
+    //    first_company_reg_capital_amount among them.
+    spark.sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $target_tab PARTITION (ds='$ds')
+         |SELECT  human_pid
+         |        ,COLLECT_SET(human_name)[0] AS human_name
+         |        ,CONCAT_ws(',',COLLECT_SET(area_code)) AS area_code
+         |        ,CONCAT_ws(',',COLLECT_SET(cate_code)) AS cate_code
+         |        ,CONCAT('[',CONCAT_WS(',',collect_set(code_company_num_tup)),']') AS code_company_num_map
+         |        ,0 deleted
+         |        ,$ds as update_time
+         |        ,COLLECT_SET(total_compny_cnt)[0] AS total_compny_cnt
+         |FROM    (
+         |            SELECT  human_pid
+         |                    ,COLLECT_SET(human_name)[0] AS human_name
+         |                    ,CONCAT_WS(',',COLLECT_SET(area_code)) AS area_code
+         |                    ,CONCAT_WS(',',COLLECT_SET(cate_code)) AS cate_code
+         |                    ,CONCAT(
+         |                        '{"province_code":"0","province_num":"',sum(province_num),'","company_id":"',collect_set(first_company)[0],'"}'
+         |                    ) AS code_company_num_tup
+         |                    ,COLLECT_SET(total_compny_cnt)[0] AS total_compny_cnt
+         |            FROM    (
+         |                        SELECT  human_pid
+         |                                ,human_name
+         |                                ,province_num
+         |                                ,area_code
+         |                                ,cate_code
+         |                                ,first_company_reg_capital_amount
+         |                                ,FIRST_VALUE(first_company) OVER (PARTITION BY human_pid ORDER BY first_company_reg_capital_amount DESC) AS first_company
+         |                                ,total_compny_cnt
+         |                        FROM    tmp_tab
+         |                        WHERE   num_rank > 2
+         |                    )
+         |            GROUP BY human_pid
+         |            UNION ALL
+         |            SELECT  human_pid
+         |                    ,human_name
+         |                    ,area_code
+         |                    ,cate_code
+         |                    ,CONCAT(
+         |                        '{"province_code":"'
+         |                        ,province_code
+         |                        ,'","province_num":"',province_num
+         |                        ,'","company_id":"',first_company
+         |                        ,'"}'
+         |                    ) AS code_company_num_tup
+         |                    ,total_compny_cnt
+         |            FROM    tmp_tab
+         |            WHERE   num_rank <= 2
+         |        )
+         |GROUP BY human_pid
+         |""".stripMargin)
+  }
+}
+
+