xufei 3 years ago
parent
commit
e26e336adc

+ 65 - 33
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -201,36 +201,40 @@ case class inc_company_relation_v2(s: SparkSession,
          |  SELECT *
          |  FROM (
          |         SELECT company_id,name_cleanup(human_name) human_name,create_time,update_time
-         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
          |             ,MIN(deleted) OVER (PARTITION BY company_id,human_name) deleted
+         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num2
          |              FROM (
-         |                    --股东
-         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                    FROM    $inc_ads_company_holder
-         |                    WHERE   ds > '0'
-         |                    AND holder_type = 1
-         |                    UNION ALL
-         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                    FROM    $ads_company_holder
-         |                    WHERE   ds > '0'
-         |                    AND holder_type = 1
-         |                    UNION ALL
-         |                    --主要成员
-         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                    FROM    $inc_ads_company_staff
-         |                    WHERE   ds > '0'
-         |                    UNION ALL
-         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                    FROM    $ads_company_staff
-         |                    WHERE   ds > '0'
-         |                    UNION ALL
-         |                    --法人
-         |                    SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
-         |                    FROM    $inc_ads_company_legal_entity
-         |                    WHERE   ds > '0'
-         |                    AND legal_entity_type = 1
-         |                 )
-         |      ) WHERE NUM = 1
+         |                   SELECT rowkey,company_id,company_name,human_name,hid,status,create_time,update_time,deleted,ds,label
+         |                         ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name,label ORDER BY ds desc,update_time desc) num
+         |                   FROM (
+         |                          --股东
+         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                          FROM    $inc_ads_company_holder
+         |                          WHERE   ds > '0'
+         |                          AND holder_type = 1
+         |                          UNION ALL
+         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                          FROM    $ads_company_holder
+         |                          WHERE   ds > '0'
+         |                          AND holder_type = 1
+         |                          UNION ALL
+         |                          --主要成员
+         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                          FROM    $inc_ads_company_staff
+         |                          WHERE   ds > '0'
+         |                          UNION ALL
+         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                          FROM    $ads_company_staff
+         |                          WHERE   ds > '0'
+         |                          UNION ALL
+         |                          --法人
+         |                          SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
+         |                          FROM    $inc_ads_company_legal_entity
+         |                          WHERE   ds > '0'
+         |                          AND legal_entity_type = 1
+         |                      )
+         |                 ) WHERE NUM = 1
+         |      ) WHERE NUM2 = 1
          |) b on a.company_id = b.company_id
          |""".stripMargin).createOrReplaceTempView("incr_company_human_all")
 
@@ -276,12 +280,12 @@ case class inc_company_relation_v2(s: SparkSession,
          |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
          |                        FROM    $inc_ads_company_human_relation
          |                        WHERE   ds > '0'
-         |                        -- AND deleted <> 9
+         |                        AND deleted <> 9
          |                        UNION ALL
          |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
          |                        FROM    $inc_ads_company_human_relation_merge
          |                        WHERE   ds > '0'
-         |                        -- AND deleted <> 9
+         |                        AND deleted <> 9
          |                        )
          |                )
          |        WHERE   num =1
@@ -526,6 +530,34 @@ case class inc_company_relation_v2(s: SparkSession,
   //导出ES数据准备
   def exportESPre(): Unit = {
 
+    //重新计算新增,更新人员所属公司
+    sql(
+      s"""
+         |SELECT  b.*
+         |FROM    (
+         |        SELECT  human_pid
+         |        FROM    $inc_ads_company_human_relation
+         |        WHERE   ds = '$ds'
+         |        GROUP BY human_pid
+         |        ) a
+         |JOIN (
+         |        SELECT  *
+         |        FROM    (
+         |                SELECT  * ,ROW_NUMBER()OVER (PARTITION BY company_id, human_pid ORDER BY ds DESC , update_time DESC ) num
+         |                FROM    (
+         |                        SELECT  *
+         |                        FROM    $inc_ads_company_human_relation_merge
+         |                        WHERE   ds > '0'
+         |                        UNION ALL
+         |                        SELECT  *
+         |                        FROM    $inc_ads_company_human_relation
+         |                        WHERE   ds > '0'
+         |                        )
+         |                ) WHERE   num =1
+         |     ) b
+         |ON      a.human_pid = b.human_pid
+        |""".stripMargin).createOrReplaceTempView("inc_update_deleted_person_company")
+
     //删除数据
     sql(
       s"""
@@ -537,7 +569,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |        SELECT
          |                human_pid
          |                ,deleted
-         |        FROM    $inc_ads_company_human_relation
+         |        FROM    inc_update_deleted_person_company
          |        WHERE   ds = '$ds'
          |        AND deleted = 9
          |        UNION ALL
@@ -546,7 +578,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |                SELECT
          |                        human_pid
          |                        ,MIN(deleted) OVER(PARTITION BY human_pid ) deleted
-         |                FROM    $inc_ads_company_human_relation
+         |                FROM    inc_update_deleted_person_company
          |                WHERE   ds = '$ds'
          |                AND deleted <> 9
          |                )
@@ -557,7 +589,7 @@ case class inc_company_relation_v2(s: SparkSession,
 
     //合并数据
     inc_human_relation_util
-      .merge(s, ds, inc_ads_company_human_relation, inc_ads_company_human_relation_update)
+      .merge(s, ds, "inc_update_deleted_person_company", inc_ads_company_human_relation_update)
   }
 
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_merge.scala

@@ -152,12 +152,12 @@ case class inc_human_relation_merge(s: SparkSession,
          |    SELECT *
          |    FROM  $inc_ads_company_human_relation_merge
          |    WHERE   ds < '$ds'
-         |    AND     deleted <> 9
+         |    -- AND     deleted <> 9
          |    UNION ALL
          |    SELECT *
          |    FROM  $inc_ads_company_human_relation
          |    WHERE   ds > '0'
-         |    AND     deleted <> 9
+         |    -- AND     deleted <> 9
          |) b
          |on a.person_id = b.human_pid
          |""".stripMargin)