Kaynağa Gözat

合并逻辑修复

xufei 3 yıl önce
ebeveyn
işleme
a59a44e17d

+ 58 - 119
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -167,135 +167,74 @@ case class inc_company_relation_v2(s: SparkSession,
          |WHERE num = 1
          |""".stripMargin).createOrReplaceTempView("company_view_all")
 
-    //TODO GET 新增-更新-关系表
+    //新增人员关系表(只更新新增人员)
+    //TODO 过滤 deleted = 1
     sql(
       s"""
-         |SELECT  b.company_id,b.human_name,b.deleted,b.create_time,b.update_time
-         |FROM
-         |(
-         |   SELECT  *
-         |   FROM    (
-         |               SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time
-         |                       ,ROW_NUMBER() OVER (PARTITION BY company_id ORDER BY ds desc,update_time desc) num
-         |               FROM    (
-         |                           --股东
-         |                           SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,ds
-         |                           FROM    $inc_ads_company_holder
-         |                           WHERE   ds = '$ds' AND holder_type = 1
-         |                           UNION ALL
-         |                           --主要成员
-         |                           SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,ds
-         |                           FROM    $inc_ads_company_staff
-         |                           WHERE   ds = '$ds'
-         |                           UNION ALL
-         |                           --法人
-         |                           SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,ds
-         |                           FROM    $inc_ads_company_legal_entity
-         |                           WHERE   ds = '$ds' AND legal_entity_type = 1
-         |                       )
-         |           )
-         |   WHERE   num = 1
-         |   -- AND name_cleanup(human_name) <> ''
-         |) a JOIN
-         |(
-         |  SELECT *
-         |  FROM (
-         |         SELECT company_id,name_cleanup(human_name) human_name,create_time,update_time
-         |             ,MIN(deleted) OVER (PARTITION BY company_id,human_name) deleted
-         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num2
-         |              FROM (
-         |                   SELECT rowkey,company_id,company_name,human_name,hid,status,create_time,update_time,deleted,ds,label
-         |                         ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name,label ORDER BY ds desc,update_time desc) num
-         |                   FROM (
-         |                          --股东
-         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                          FROM    $inc_ads_company_holder
-         |                          WHERE   ds > '0'
-         |                          AND holder_type = 1
-         |                          UNION ALL
-         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                          FROM    $ads_company_holder
-         |                          WHERE   ds > '0'
-         |                          AND holder_type = 1
-         |                          UNION ALL
-         |                          --主要成员
-         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                          FROM    $inc_ads_company_staff
-         |                          WHERE   ds > '0'
-         |                          UNION ALL
-         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                          FROM    $ads_company_staff
-         |                          WHERE   ds > '0'
-         |                          UNION ALL
-         |                          --法人
-         |                          SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
-         |                          FROM    $inc_ads_company_legal_entity
-         |                          WHERE   ds > '0'
-         |                          AND legal_entity_type = 1
-         |                      )
-         |                 ) WHERE NUM = 1
-         |      ) WHERE NUM2 = 1
-         |) b on a.company_id = b.company_id
-         |""".stripMargin).createOrReplaceTempView("incr_company_human_all")
-
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
-         |SELECT
-         |     CONCAT_WS('_',a.company_id,COALESCE(b.human_pid, a.human_pid)) AS rowkey
-         |     ,a.company_id
-         |     ,name_cleanup(c.company_name) company_name
-         |     ,a.human_name
-         |     ,hash(a.human_name) hid
-         |     ,COALESCE(b.human_pid, a.human_pid)  human_pid
-         |     ,2 STATUS
-         |     ,a.create_time
-         |     ,a.update_time
-         |     ,a.deleted
-         |     ,c.province_code
-         |     ,c.city_code
-         |     ,c.county_code
-         |     ,c.reg_capital_amount
-         |     ,c.cate_first_code
-         |     ,c.cate_second_code
-         |     ,c.cate_third_code
+         |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT  md5(CONCAT_WS('_',a.company_id,a.human_pid)) as rowkey
+         |        ,a.company_id
+         |        ,a.company_name
+         |        ,a.human_name
+         |        ,hash(a.human_name) AS hid
+         |        ,a.human_pid
+         |        ,a.STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,c.province_code,c.city_code,c.county_code
+         |        ,c.reg_capital_amount,c.cate_first_code
+         |        ,c.cate_second_code,c.cate_third_code
          |FROM    (
-         |        SELECT  company_id
-         |                ,human_name
-         |                ,deleted
-         |                ,create_time
-         |                ,update_time
-         |                ,concat('p',md5(uuid())) human_pid
-         |        FROM    incr_company_human_all
-         |        WHERE   length(human_name) > 0 AND  length(company_id) >= 32
+         |            SELECT  *,concat('p',md5(uuid())) human_pid
+         |            FROM    (
+         |                        SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time,0 as deleted
+         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |                        FROM    (
+         |                                    --股东
+         |                                    SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_holder
+         |                                    WHERE   ds = '$ds' AND holder_type = 1
+         |                                    UNION ALL
+         |                                    --主要成员
+         |                                    SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_staff
+         |                                    WHERE   ds = '$ds'
+         |                                    UNION ALL
+         |                                    --法人
+         |                                    SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_legal_entity
+         |                                    WHERE   ds = '$ds' AND legal_entity_type = 1
+         |                                )
+         |                    )
+         |            WHERE   num = 1 AND name_cleanup(human_name) <> '' AND length(company_id) >= 32
          |        ) a
          |LEFT JOIN (
-         |        SELECT  company_id
-         |                ,human_name
-         |                ,human_pid
-         |        FROM    (
-         |                SELECT  company_id,human_name,human_pid
-         |                        ,ROW_NUMBER()OVER (PARTITION BY company_id, human_name ORDER BY ds DESC , update_time DESC ) num
-         |                FROM    (
-         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
-         |                        FROM    $inc_ads_company_human_relation
-         |                        WHERE   ds > '0'
-         |                        AND deleted <> 9
-         |                        UNION ALL
-         |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
-         |                        FROM    $inc_ads_company_human_relation_merge
-         |                        WHERE   ds > '0'
-         |                        AND deleted <> 9
-         |                        )
-         |                )
-         |        WHERE   num =1
-         |          ) b
+         |    SELECT  company_id,human_name,human_pid
+         |    FROM    (
+         |               SELECT  company_id,human_name,human_pid
+         |                                     ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |               FROM(
+         |                    SELECT  company_id,human_name,human_pid,ds,update_time
+         |                    FROM    $inc_ads_company_human_relation
+         |                    WHERE   ds > '0'
+         |                    AND     deleted <> 9
+         |                    UNION ALL
+         |                    SELECT  company_id,human_name,human_pid,ds,update_time
+         |                    FROM    $inc_ads_company_human_relation_merge
+         |                    WHERE   ds > '0'
+         |                    AND     deleted <> 9
+         |                 )
+         |              )
+         |    WHERE   num = 1
+         |) b
          |ON      a.company_id = b.company_id
          |AND     name_cleanup(a.human_name) = name_cleanup(b.human_name)
          |JOIN (
          |SELECT * FROM company_view_all
          |) c
          |ON      a.company_id = c.company_id
+         |WHERE   b.company_id IS NULL
          |""".stripMargin)
 
     //全量人员关系表
@@ -612,7 +551,7 @@ object inc_company_relation_v2 {
     re.register_fun()
     re.inc()
     re.sendKafkaPre()
-    re.exportESPre()
+    //re.exportESPre()//后置生成
     spark.stop()
   }
 }