فهرست منبع

Merge remote-tracking branch 'origin/master'

许家凯 3 سال پیش
والد
کامیت
d69e894494

+ 73 - 120
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -57,11 +57,14 @@ case class inc_company_relation_v2(s: SparkSession,
                                   end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
       CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted.toString, legal_entity_type, topic_type)
 
+    def get_success_status(ds: String, status: String, topic_type: String): String = CompanyRelationUtils.get_success_status(ds, status, topic_type)
+
     spark.udf.register("get_company_node", get_company_node _)
     spark.udf.register("get_person_node", get_person_node _)
     spark.udf.register("get_relation_holder", get_relation_holder _)
     spark.udf.register("get_relation_staff", get_relation_staff _)
     spark.udf.register("get_relation_legal_entity", get_relation_legal_entity _)
+    spark.udf.register("get_success_status", get_success_status _)
   }
 
   def inc(): Unit = {
@@ -167,135 +170,74 @@ case class inc_company_relation_v2(s: SparkSession,
          |WHERE num = 1
          |""".stripMargin).createOrReplaceTempView("company_view_all")
 
-    //TODO GET 新增-更新-关系表
-    sql(
-      s"""
-         |SELECT  b.company_id,b.human_name,b.deleted,b.create_time,b.update_time
-         |FROM
-         |(
-         |   SELECT  *
-         |   FROM    (
-         |               SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time
-         |                       ,ROW_NUMBER() OVER (PARTITION BY company_id ORDER BY ds desc,update_time desc) num
-         |               FROM    (
-         |                           --股东
-         |                           SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,ds
-         |                           FROM    $inc_ads_company_holder
-         |                           WHERE   ds = '$ds' AND holder_type = 1
-         |                           UNION ALL
-         |                           --主要成员
-         |                           SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,ds
-         |                           FROM    $inc_ads_company_staff
-         |                           WHERE   ds = '$ds'
-         |                           UNION ALL
-         |                           --法人
-         |                           SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,ds
-         |                           FROM    $inc_ads_company_legal_entity
-         |                           WHERE   ds = '$ds' AND legal_entity_type = 1
-         |                       )
-         |           )
-         |   WHERE   num = 1
-         |   -- AND name_cleanup(human_name) <> ''
-         |) a JOIN
-         |(
-         |  SELECT *
-         |  FROM (
-         |         SELECT company_id,name_cleanup(human_name) human_name,create_time,update_time
-         |             ,MIN(deleted) OVER (PARTITION BY company_id,human_name) deleted
-         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num2
-         |              FROM (
-         |                   SELECT rowkey,company_id,company_name,human_name,hid,status,create_time,update_time,deleted,ds,label
-         |                         ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name,label ORDER BY ds desc,update_time desc) num
-         |                   FROM (
-         |                          --股东
-         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                          FROM    $inc_ads_company_holder
-         |                          WHERE   ds > '0'
-         |                          AND holder_type = 1
-         |                          UNION ALL
-         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                          FROM    $ads_company_holder
-         |                          WHERE   ds > '0'
-         |                          AND holder_type = 1
-         |                          UNION ALL
-         |                          --主要成员
-         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                          FROM    $inc_ads_company_staff
-         |                          WHERE   ds > '0'
-         |                          UNION ALL
-         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                          FROM    $ads_company_staff
-         |                          WHERE   ds > '0'
-         |                          UNION ALL
-         |                          --法人
-         |                          SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
-         |                          FROM    $inc_ads_company_legal_entity
-         |                          WHERE   ds > '0'
-         |                          AND legal_entity_type = 1
-         |                      )
-         |                 ) WHERE NUM = 1
-         |      ) WHERE NUM2 = 1
-         |) b on a.company_id = b.company_id
-         |""".stripMargin).createOrReplaceTempView("incr_company_human_all")
-
+    //新增人员关系表(只更新新增人员)
+    //TODO 过滤 deleted = 1
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
-         |SELECT
-         |     CONCAT_WS('_',a.company_id,COALESCE(b.human_pid, a.human_pid)) AS rowkey
-         |     ,a.company_id
-         |     ,name_cleanup(c.company_name) company_name
-         |     ,a.human_name
-         |     ,hash(a.human_name) hid
-         |     ,COALESCE(b.human_pid, a.human_pid)  human_pid
-         |     ,2 STATUS
-         |     ,a.create_time
-         |     ,a.update_time
-         |     ,a.deleted
-         |     ,c.province_code
-         |     ,c.city_code
-         |     ,c.county_code
-         |     ,c.reg_capital_amount
-         |     ,c.cate_first_code
-         |     ,c.cate_second_code
-         |     ,c.cate_third_code
+         |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT  md5(CONCAT_WS('_',a.company_id,a.human_pid)) as rowkey
+         |        ,a.company_id
+         |        ,a.company_name
+         |        ,a.human_name
+         |        ,hash(a.human_name) AS hid
+         |        ,a.human_pid
+         |        ,a.STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,c.province_code,c.city_code,c.county_code
+         |        ,c.reg_capital_amount,c.cate_first_code
+         |        ,c.cate_second_code,c.cate_third_code
          |FROM    (
-         |        SELECT  company_id
-         |                ,human_name
-         |                ,deleted
-         |                ,create_time
-         |                ,update_time
-         |                ,concat('p',md5(uuid())) human_pid
-         |        FROM    incr_company_human_all
-         |        WHERE   length(human_name) > 0 AND  length(company_id) >= 32
+         |            SELECT  *,concat('p',md5(uuid())) human_pid
+         |            FROM    (
+         |                        SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time,0 as deleted
+         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |                        FROM    (
+         |                                    --股东
+         |                                    SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_holder
+         |                                    WHERE   ds = '$ds' AND holder_type = 1
+         |                                    UNION ALL
+         |                                    --主要成员
+         |                                    SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_staff
+         |                                    WHERE   ds = '$ds'
+         |                                    UNION ALL
+         |                                    --法人
+         |                                    SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_legal_entity
+         |                                    WHERE   ds = '$ds' AND legal_entity_type = 1
+         |                                )
+         |                    )
+         |            WHERE   num = 1 AND name_cleanup(human_name) <> '' AND length(company_id) >= 32
          |        ) a
          |LEFT JOIN (
-         |        SELECT  company_id
-         |                ,human_name
-         |                ,human_pid
-         |        FROM    (
-         |                SELECT  company_id,human_name,human_pid
-         |                        ,ROW_NUMBER()OVER (PARTITION BY company_id, human_name ORDER BY ds DESC , update_time DESC ) num
-         |                FROM    (
-         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
-         |                        FROM    $inc_ads_company_human_relation
-         |                        WHERE   ds > '0'
-         |                        AND deleted <> 9
-         |                        UNION ALL
-         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
-         |                        FROM    $inc_ads_company_human_relation_merge
-         |                        WHERE   ds > '0'
-         |                        AND deleted <> 9
-         |                        )
-         |                )
-         |        WHERE   num =1
-         |          ) b
+         |    SELECT  company_id,human_name,human_pid
+         |    FROM    (
+         |               SELECT  company_id,human_name,human_pid
+         |                                     ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |               FROM(
+         |                    SELECT  company_id,human_name,human_pid,ds,update_time
+         |                    FROM    $inc_ads_company_human_relation
+         |                    WHERE   ds > '0'
+         |                    AND     deleted <> 9
+         |                    UNION ALL
+         |                    SELECT  company_id,human_name,human_pid,ds,update_time
+         |                    FROM    $inc_ads_company_human_relation_merge
+         |                    WHERE   ds > '0'
+         |                    AND     deleted <> 9
+         |                 )
+         |              )
+         |    WHERE   num = 1
+         |) b
          |ON      a.company_id = b.company_id
          |AND     name_cleanup(a.human_name) = name_cleanup(b.human_name)
          |JOIN (
          |SELECT * FROM company_view_all
          |) c
          |ON      a.company_id = c.company_id
+         |WHERE   b.company_id IS NULL
          |""".stripMargin)
 
     //全量人员关系表
@@ -447,6 +389,7 @@ case class inc_company_relation_v2(s: SparkSession,
     val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
     val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
     val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
+    val inc_ads_node_relation_success_status = "winhc_ng.inc_ads_node_relation_success_status"
     //公司节点
     sql(
       s"""
@@ -517,6 +460,15 @@ case class inc_company_relation_v2(s: SparkSession,
          |where ds = '$ds'
          |""".stripMargin).show(20, false)
 
+    //发送成功状态标识
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_node_relation_success_status  PARTITION (ds='$ds')
+         |select
+         |$ds key,
+         |get_success_status($ds, '1', '100') message
+         |""".stripMargin).show(20, false)
+
     //防止空分区
     addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
@@ -524,6 +476,7 @@ case class inc_company_relation_v2(s: SparkSession,
     addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_node_relation_success_status, ds)
 
   }
 
@@ -556,7 +509,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |                ) WHERE   num =1
          |     ) b
          |ON      a.human_pid = b.human_pid
-        |""".stripMargin).createOrReplaceTempView("inc_update_deleted_person_company")
+         |""".stripMargin).createOrReplaceTempView("inc_update_deleted_person_company")
 
     //删除数据
     sql(
@@ -612,7 +565,7 @@ object inc_company_relation_v2 {
     re.register_fun()
     re.inc()
     re.sendKafkaPre()
-    re.exportESPre()
+    //re.exportESPre()//后置生成
     spark.stop()
   }
 }

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyRelationUtils.scala

@@ -19,6 +19,7 @@ case class relation_staff(start_id: String, start_name: String, end_id: String,
 case class relation_legal_entity(start_id: String, start_name: String, end_id: String,
                                  end_name: String, deleted: String, legal_entity_type: String, topic_type: String)
 
+case class success_status(ds: String, status: String, topic_type: String)
 
 object CompanyRelationUtils {
 
@@ -40,4 +41,7 @@ object CompanyRelationUtils {
                                 end_name: String, deleted: String, legal_entity_type: String, topic_type: String): String =
     relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type).toJson()
 
+  def get_success_status(ds: String, status: String, topic_type: String): String =
+    success_status(ds, status, topic_type).toJson()
+
 }