Просмотр исходного кода

Merge remote-tracking branch 'origin/master'

许家凯 3 лет назад
Родитель
Сommit
6bb0adf28c

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -177,9 +177,9 @@ object NgChangeExtract {
 
 
   private val startArgs = Seq(
-    Args(tableName = "company_holder", primaryFields = "percent,amount,deleted")
+    Args(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted")
     , Args(tableName = "company_staff", primaryFields = "staff_type,deleted")
-    , Args(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,deleted")
+    , Args(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,deleted")
     , Args(tableName = "company_tm", primaryFields = "status")
     , Args(tableName = "company_icp", primaryFields = "domain")
   )

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation.scala

@@ -39,15 +39,15 @@ case class inc_company_relation(s: SparkSession,
 
     def get_relation_holder(start_id: String, start_name: String, end_id: String,
                             end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
-      CompanyRelationUtils.get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type)
+      CompanyRelationUtils.get_relation_holder(start_id, start_name, end_id, end_name, percent.toString, deleted.toString, holder_type.toString, topic_type)
 
     def get_relation_staff(start_id: String, start_name: String, end_id: String,
                            end_name: String, staff_type: String, deleted: Int, topic_type: String): String =
-      CompanyRelationUtils.get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted, topic_type)
+      CompanyRelationUtils.get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted.toString, topic_type)
 
     def get_relation_legal_entity(start_id: String, start_name: String, end_id: String,
                                   end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
-      CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type)
+      CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted.toString, legal_entity_type, topic_type)
 
 
     spark.udf.register("get_company_node", get_company_node _)

+ 71 - 82
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -47,15 +47,15 @@ case class inc_company_relation_v2(s: SparkSession,
 
     def get_relation_holder(start_id: String, start_name: String, end_id: String,
                             end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
-      CompanyRelationUtils.get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type)
+      CompanyRelationUtils.get_relation_holder(start_id, start_name, end_id, end_name, percent.toString, deleted.toString, holder_type.toString, topic_type)
 
     def get_relation_staff(start_id: String, start_name: String, end_id: String,
                            end_name: String, staff_type: String, deleted: Int, topic_type: String): String =
-      CompanyRelationUtils.get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted, topic_type)
+      CompanyRelationUtils.get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted.toString, topic_type)
 
     def get_relation_legal_entity(start_id: String, start_name: String, end_id: String,
                                   end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
-      CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type)
+      CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted.toString, legal_entity_type, topic_type)
 
     spark.udf.register("get_company_node", get_company_node _)
     spark.udf.register("get_person_node", get_person_node _)
@@ -105,7 +105,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |        ,new_data['deleted'] AS deleted
          |FROM    $ads_change_extract
          |WHERE   ds = '$ds' AND     tn = 'company'
-         |AND     (change_fields LIKE '%legal_entity_name%' or change_fields is null or change_fields LIKE '%deleted%')
+         |AND     (change_fields LIKE '%legal_entity_name%' or change_fields is null or change_fields LIKE '%deleted%' OR change_fields LIKE '%legal_entity_id%')
          |AND     length(trim(new_data['legal_entity_name'])) > 0
          |UNION ALL
          |SELECT  CONCAT_WS('_',company_id,hash(old_data['legal_entity_name'])) AS rowkey
@@ -201,36 +201,40 @@ case class inc_company_relation_v2(s: SparkSession,
          |  SELECT *
          |  FROM (
          |         SELECT company_id,name_cleanup(human_name) human_name,create_time,update_time
-         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
          |             ,MIN(deleted) OVER (PARTITION BY company_id,human_name) deleted
+         |             ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num2
          |              FROM (
-         |                    --股东
-         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                    FROM    $inc_ads_company_holder
-         |                    WHERE   ds > '0'
-         |                    AND holder_type = 1
-         |                    UNION ALL
-         |                    SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
-         |                    FROM    $ads_company_holder
-         |                    WHERE   ds > '0'
-         |                    AND holder_type = 1
-         |                    UNION ALL
-         |                    --主要成员
-         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                    FROM    $inc_ads_company_staff
-         |                    WHERE   ds > '0'
-         |                    UNION ALL
-         |                    SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
-         |                    FROM    $ads_company_staff
-         |                    WHERE   ds > '0'
-         |                    UNION ALL
-         |                    --法人
-         |                    SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
-         |                    FROM    $inc_ads_company_legal_entity
-         |                    WHERE   ds > '0'
-         |                    AND legal_entity_type = 1
-         |                 )
-         |      ) WHERE NUM = 1
+         |                   SELECT rowkey,company_id,company_name,human_name,hid,status,create_time,update_time,deleted,ds,label
+         |                         ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name,label ORDER BY ds desc,update_time desc) num
+         |                   FROM (
+         |                          --股东
+         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                          FROM    $inc_ads_company_holder
+         |                          WHERE   ds > '0'
+         |                          AND holder_type = 1
+         |                          UNION ALL
+         |                          SELECT  rowkey,company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,deleted,ds,1 label
+         |                          FROM    $ads_company_holder
+         |                          WHERE   ds > '0'
+         |                          AND holder_type = 1
+         |                          UNION ALL
+         |                          --主要成员
+         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                          FROM    $inc_ads_company_staff
+         |                          WHERE   ds > '0'
+         |                          UNION ALL
+         |                          SELECT  rowkey,company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,deleted,ds,2 label
+         |                          FROM    $ads_company_staff
+         |                          WHERE   ds > '0'
+         |                          UNION ALL
+         |                          --法人
+         |                          SELECT  rowkey,company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,deleted,ds,3 label
+         |                          FROM    $inc_ads_company_legal_entity
+         |                          WHERE   ds > '0'
+         |                          AND legal_entity_type = 1
+         |                      )
+         |                 ) WHERE NUM = 1
+         |      ) WHERE NUM2 = 1
          |) b on a.company_id = b.company_id
          |""".stripMargin).createOrReplaceTempView("incr_company_human_all")
 
@@ -276,12 +280,12 @@ case class inc_company_relation_v2(s: SparkSession,
          |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
          |                        FROM    $inc_ads_company_human_relation
          |                        WHERE   ds > '0'
-         |                        -- AND deleted <> 9
+         |                        AND deleted <> 9
          |                        UNION ALL
          |                        SELECT  company_id,human_name,human_pid,ds,update_time,deleted
          |                        FROM    $inc_ads_company_human_relation_merge
          |                        WHERE   ds > '0'
-         |                        -- AND deleted <> 9
+         |                        AND deleted <> 9
          |                        )
          |                )
          |        WHERE   num =1
@@ -357,7 +361,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |    SELECT rowkey
          |    FROM $ads_change_extract
          |    WHERE   ds = '$ds' AND tn = 'company_holder'
-         |    AND    (change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%')
+         |    AND    (change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%' OR change_fields LIKE '%holder_id%')
          |   ) b
          |ON    a.rowkey = b.rowkey
          |)
@@ -526,6 +530,34 @@ case class inc_company_relation_v2(s: SparkSession,
   //导出ES数据准备
   def exportESPre(): Unit = {
 
+    //重新计算新增,更新人员所属公司
+    sql(
+      s"""
+         |SELECT  b.*
+         |FROM    (
+         |        SELECT  human_pid
+         |        FROM    $inc_ads_company_human_relation
+         |        WHERE   ds = '$ds'
+         |        GROUP BY human_pid
+         |        ) a
+         |JOIN (
+         |        SELECT  *
+         |        FROM    (
+         |                SELECT  * ,ROW_NUMBER()OVER (PARTITION BY company_id, human_pid ORDER BY ds DESC , update_time DESC ) num
+         |                FROM    (
+         |                        SELECT  *
+         |                        FROM    $inc_ads_company_human_relation_merge
+         |                        WHERE   ds > '0'
+         |                        UNION ALL
+         |                        SELECT  *
+         |                        FROM    $inc_ads_company_human_relation
+         |                        WHERE   ds > '0'
+         |                        )
+         |                ) WHERE   num =1
+         |     ) b
+         |ON      a.human_pid = b.human_pid
+        |""".stripMargin).createOrReplaceTempView("inc_update_deleted_person_company")
+
     //删除数据
     sql(
       s"""
@@ -537,7 +569,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |        SELECT
          |                human_pid
          |                ,deleted
-         |        FROM    $inc_ads_company_human_relation
+         |        FROM    inc_update_deleted_person_company
          |        WHERE   ds = '$ds'
          |        AND deleted = 9
          |        UNION ALL
@@ -546,7 +578,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |                SELECT
          |                        human_pid
          |                        ,MIN(deleted) OVER(PARTITION BY human_pid ) deleted
-         |                FROM    $inc_ads_company_human_relation
+         |                FROM    inc_update_deleted_person_company
          |                WHERE   ds = '$ds'
          |                AND deleted <> 9
          |                )
@@ -557,7 +589,7 @@ case class inc_company_relation_v2(s: SparkSession,
 
     //合并数据
     inc_human_relation_util
-      .merge(s, ds, inc_ads_company_human_relation, inc_ads_company_human_relation_update)
+      .merge(s, ds, "inc_update_deleted_person_company", inc_ads_company_human_relation_update)
   }
 
 }
@@ -583,47 +615,4 @@ object inc_company_relation_v2 {
     re.exportESPre()
     spark.stop()
   }
-}
-
-//    sql(
-//      s"""
-//         |SELECT  a.*
-//         |FROM (
-//         |SELECT  *
-//         |FROM    (
-//         |           SELECT  *
-//         |                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_pid ORDER BY ds desc,update_time desc) num
-//         |           FROM  (
-//         |                     SELECT  *
-//         |                     FROM    $inc_ads_company_human_relation
-//         |                     WHERE   ds > '0'
-//         |                     -- AND     deleted <> 9
-//         |                     UNION ALL
-//         |                     SELECT  *
-//         |                     FROM    $inc_ads_company_human_relation_merge
-//         |                     WHERE   ds > '0'
-//         |                     -- AND     deleted <> 9
-//         |              )
-//         |          )
-//         |WHERE   num = 1
-//         |) a
-//         |JOIN (
-//         |              SELECT  start_id
-//         |              FROM    (
-//         |                          SELECT  start_id
-//         |                          FROM    $inc_ads_relation_holder
-//         |                          WHERE   ds = '$ds'
-//         |                          AND     holder_type = 1
-//         |                          UNION ALL
-//         |                          SELECT  start_id
-//         |                          FROM    $inc_ads_relation_staff
-//         |                          WHERE   ds = '$ds'
-//         |                          UNION ALL
-//         |                          SELECT  start_id
-//         |                          FROM    $inc_ads_relation_legal_entity
-//         |                          WHERE   ds = '$ds'
-//         |                          AND     legal_entity_type = 1
-//         |                      )
-//         |              GROUP BY start_id
-//         | ) b on a.human_pid = b.start_id
-//         |""".stripMargin).createOrReplaceTempView("update_tab")
+}

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_merge.scala

@@ -152,12 +152,12 @@ case class inc_human_relation_merge(s: SparkSession,
          |    SELECT *
          |    FROM  $inc_ads_company_human_relation_merge
          |    WHERE   ds < '$ds'
-         |    AND     deleted <> 9
+         |    -- AND     deleted <> 9
          |    UNION ALL
          |    SELECT *
          |    FROM  $inc_ads_company_human_relation
          |    WHERE   ds > '0'
-         |    AND     deleted <> 9
+         |    -- AND     deleted <> 9
          |) b
          |on a.person_id = b.human_pid
          |""".stripMargin)

+ 6 - 8
src/main/scala/com/winhc/bigdata/spark/utils/CompanyRelationUtils.scala

@@ -1,8 +1,6 @@
 package com.winhc.bigdata.spark.utils
 
 import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
-import com.winhc.bigdata.spark.utils.BaseUtil.cleanup
-import org.apache.commons.lang3.StringUtils
 
 /**
  * @author: π
@@ -13,13 +11,13 @@ case class company_node(id: String, name: String, deleted: String, topic_type: S
 case class person_node(id: String, name: String, deleted: String, topic_type: String)
 
 case class relation_holder(start_id: String, start_name: String, end_id: String,
-                           end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String)
+                           end_name: String, percent: String, deleted: String, holder_type: String, topic_type: String)
 
 case class relation_staff(start_id: String, start_name: String, end_id: String,
-                          end_name: String, staff_type: String, deleted: Int, topic_type: String)
+                          end_name: String, staff_type: String, deleted: String, topic_type: String)
 
 case class relation_legal_entity(start_id: String, start_name: String, end_id: String,
-                                 end_name: String, deleted: Int, legal_entity_type: String, topic_type: String)
+                                 end_name: String, deleted: String, legal_entity_type: String, topic_type: String)
 
 
 object CompanyRelationUtils {
@@ -31,15 +29,15 @@ object CompanyRelationUtils {
     person_node(id, name, deleted, topic_type).toJson()
 
   def get_relation_holder(start_id: String, start_name: String, end_id: String,
-                          end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
+                          end_name: String, percent: String, deleted: String, holder_type: String, topic_type: String): String =
     relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type).toJson()
 
   def get_relation_staff(start_id: String, start_name: String, end_id: String,
-                         end_name: String, staff_type: String, deleted: Int, topic_type: String): String =
+                         end_name: String, staff_type: String, deleted: String, topic_type: String): String =
     relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted, topic_type).toJson()
 
   def get_relation_legal_entity(start_id: String, start_name: String, end_id: String,
-                                end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
+                                end_name: String, deleted: String, legal_entity_type: String, topic_type: String): String =
     relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type).toJson()
 
 }