Pārlūkot izejas kodu

Merge remote-tracking branch 'origin/master'

许家凯 4 gadi atpakaļ
vecāks
revīzija
e758160626

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -177,7 +177,7 @@ object NgChangeExtract {
 
 
   private val startArgs = Seq(
-    Args(tableName = "company_holder", primaryFields = "amount,deleted")
+    Args(tableName = "company_holder", primaryFields = "percent,amount,deleted")
     , Args(tableName = "company_staff", primaryFields = "staff_type,deleted")
     , Args(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,deleted")
     , Args(tableName = "company_tm", primaryFields = "status")

+ 107 - 85
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -38,6 +38,8 @@ case class inc_company_relation_v2(s: SparkSession,
 
     // Local forwarder to CompanyRelationUtils.get_company_node with the same four String arguments.
     // It is handed to spark.udf.register under the name "get_company_node" so SQL text can call it.
     def get_company_node(id: String, name: String, deleted: String, topic_type: String): String = CompanyRelationUtils.get_company_node(id, name, deleted, topic_type)
 
+    def get_person_node(id: String, name: String, deleted: String, topic_type: String): String = CompanyRelationUtils.get_person_node(id, name, deleted, topic_type)
+
     // Local forwarder to CompanyRelationUtils.get_relation_holder; arguments are passed through
     // unchanged and in the same order. Registered as the "get_relation_holder" Spark SQL UDF.
     def get_relation_holder(start_id: String, start_name: String, end_id: String,
                             end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
       CompanyRelationUtils.get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type)
@@ -52,6 +54,7 @@ case class inc_company_relation_v2(s: SparkSession,
 
 
     spark.udf.register("get_company_node", get_company_node _)
+    spark.udf.register("get_person_node", get_person_node _)
     spark.udf.register("get_relation_holder", get_relation_holder _)
     spark.udf.register("get_relation_staff", get_relation_staff _)
     spark.udf.register("get_relation_legal_entity", get_relation_legal_entity _)
@@ -71,16 +74,14 @@ case class inc_company_relation_v2(s: SparkSession,
          |            SELECT  company_id
          |                    ,name
          |            FROM    $inc_ads_company
-         |            WHERE   ds = '$ds' AND deleted <> 9
+         |            WHERE   ds = '$ds'
+         |            -- AND deleted <> 9
          |        ) a
          |JOIN (
          |              SELECT *
          |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type = 'insert'
-         |              UNION ALL
-         |              SELECT *
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%name%'
+         |              WHERE   ds = '$ds' AND tn = 'company'
+         |              AND (change_fields is null OR change_fields like '%name%')
          |          ) b
          |ON      a.company_id = b.company_id
          |""".stripMargin)
@@ -89,79 +90,33 @@ case class inc_company_relation_v2(s: SparkSession,
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_legal_entity PARTITION(ds= '$ds')
-         |SELECT  c.rowkey,c.company_id,c.company_name,c.legal_entity_id,c.legal_entity_name,c.legal_entity_type,c.create_time,c.update_time,c.deleted
-         |FROM    (
-         |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
-         |                    ,company_id
-         |                    ,name AS company_name
-         |                    ,legal_entity_id
-         |                    ,legal_entity_name
-         |                    ,legal_entity_type
-         |                    ,create_time
-         |                    ,update_time
-         |                    ,0 AS deleted
-         |            FROM    $inc_ads_company
-         |            WHERE   ds = '$ds'
-         |            AND     legal_entity_id IS NOT NULL
-         |            AND     length(trim(legal_entity_name)) > 0
-         |            AND     deleted <> 9
-         |        ) c
-         |JOIN (
-         |              SELECT company_id
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type = 'insert'
-         |              UNION ALL
-         |              SELECT company_id
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%legal_entity_name%'
-         |          ) d
-         |ON      c.company_id = d.company_id
+         |SELECT  CONCAT_WS('_',company_id,hash(new_data['legal_entity_name'])) AS rowkey
+         |        ,company_id
+         |        ,new_data['name'] AS company_name
+         |        ,new_data['legal_entity_id'] AS legal_entity_id
+         |        ,new_data['legal_entity_name'] AS legal_entity_name
+         |        ,new_data['legal_entity_type'] AS legal_entity_type
+         |        ,new_data['create_time'] AS create_time
+         |        ,new_data['update_time'] AS update_time
+         |        ,new_data['deleted'] AS deleted
+         |FROM    $ads_change_extract
+         |WHERE   ds = '$ds' AND     tn = 'company'
+         |AND     (change_fields LIKE '%legal_entity_name%' or change_fields is null or change_fields LIKE '%deleted%')
+         |AND     length(trim(new_data['legal_entity_name'])) > 0
          |UNION ALL
-         |SELECT  c.rowkey,c.company_id,c.company_name,c.legal_entity_id,c.legal_entity_name,c.legal_entity_type,c.create_time,c.update_time,c.deleted
-         |FROM    (
-         |         SELECT *
-         |          ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
-         |         FROM (
-         |             SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
-         |                    ,company_id
-         |                    ,name AS company_name
-         |                    ,legal_entity_id
-         |                    ,legal_entity_name
-         |                    ,legal_entity_type
-         |                    ,create_time
-         |                    ,update_time
-         |                    ,1 AS deleted
-         |                    ,ds
-         |            FROM    $inc_ads_company
-         |            WHERE   ds < '$ds'
-         |            AND     legal_entity_id IS NOT NULL
-         |            AND     length(trim(legal_entity_name)) > 0
-         |            AND     deleted <> 9
-         |            union all
-         |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
-         |                    ,company_id
-         |                    ,name AS company_name
-         |                    ,legal_entity_id
-         |                    ,legal_entity_name
-         |                    ,legal_entity_type
-         |                    ,create_time
-         |                    ,update_time
-         |                    ,1 AS deleted
-         |                    ,ds
-         |            FROM    $ads_company
-         |            WHERE   ds > '0'
-         |            AND     legal_entity_id IS NOT NULL
-         |            AND     length(trim(legal_entity_name)) > 0
-         |            AND     deleted <> 9
-         |          )
-         |        ) c
-         |JOIN (
-         |              SELECT company_id
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%legal_entity_name%'
-         |          ) d
-         |ON      c.company_id = d.company_id
-         |WHERE c.num = 1
+         |SELECT  CONCAT_WS('_',company_id,hash(old_data['legal_entity_name'])) AS rowkey
+         |        ,company_id
+         |        ,old_data['name'] AS company_name
+         |        ,old_data['legal_entity_id'] AS legal_entity_id
+         |        ,old_data['legal_entity_name'] AS legal_entity_name
+         |        ,old_data['legal_entity_type'] AS legal_entity_type
+         |        ,old_data['create_time'] AS create_time
+         |        ,new_data['update_time'] AS update_time
+         |        ,1 AS deleted
+         |FROM    $ads_change_extract
+         |WHERE   ds = '$ds' AND     tn = 'company'
+         |AND     change_fields LIKE '%legal_entity_name%'
+         |AND     length(trim(old_data['legal_entity_name'])) > 0
          |""".stripMargin)
 
     //全量公司
@@ -188,7 +143,7 @@ case class inc_company_relation_v2(s: SparkSession,
          | WHERE   ds > '0'
          | AND     legal_entity_id IS NOT NULL
          | AND     length(trim(legal_entity_name)) > 0
-         | AND     deleted <> 9
+         | -- AND     deleted <> 9
          | union all
          | SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
          |         ,company_id
@@ -206,7 +161,7 @@ case class inc_company_relation_v2(s: SparkSession,
          | WHERE   ds > '0'
          | AND     legal_entity_id IS NOT NULL
          | AND     length(trim(legal_entity_name)) > 0
-         | AND     deleted <> 9
+         | -- AND     deleted <> 9
          |)
          |)c
          |WHERE num = 1
@@ -215,8 +170,8 @@ case class inc_company_relation_v2(s: SparkSession,
     //新增人员关系表
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
-         |SELECT  CONCAT_WS('_',a.company_id,hash(a.human_name)) AS rowkey
+         |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT  md5(CONCAT_WS('_',company_id,human_pid)) as rowkey
          |        ,a.company_id
          |        ,a.company_name
          |        ,a.human_name
@@ -259,7 +214,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |              SELECT  company_id,human_name,human_pid
          |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
          |               FROM    $inc_ads_company_human_relation
-         |               WHERE   ds < '$ds'
+         |               WHERE   ds > '0'
          |              )
          |    WHERE   num = 1
          |) b
@@ -310,6 +265,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |    SELECT rowkey
          |    FROM $ads_change_extract
          |    WHERE   ds = '$ds' AND tn = 'company_holder'
+         |    AND    (change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%')
          |   ) b
          |ON    a.rowkey = b.rowkey
          |UNION ALL
@@ -325,6 +281,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |    SELECT rowkey
          |    FROM $ads_change_extract
          |    WHERE   ds = '$ds' AND tn = 'company_holder'
+         |    AND    (change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%')
          |   ) b
          |ON    a.rowkey = b.rowkey
          |)
@@ -357,6 +314,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |    SELECT rowkey
          |    FROM $ads_change_extract
          |    WHERE   ds = '$ds' AND tn = 'company_staff'
+         |    AND    (change_fields LIKE '%staff_type%' OR change_fields is null OR change_fields LIKE '%deleted%')
          |   ) b
          |ON    a.rowkey = b.rowkey
          |)
@@ -400,6 +358,54 @@ case class inc_company_relation_v2(s: SparkSession,
          |AND length(start_name) <> 0  AND length(end_name) <> 0
          |""".stripMargin)
 
+    //二次修正human表,保证和增量人对齐
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $inc_ads_company_human_relation PARTITION(ds='$ds')
+         |SELECT   a.rowkey
+         |        ,a.company_id
+         |        ,a.company_name
+         |        ,a.human_name
+         |        ,a.hid
+         |        ,a.human_pid
+         |        ,a.STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,a.province_code
+         |        ,a.city_code
+         |        ,a.county_code
+         |        ,a.reg_capital_amount
+         |        ,a.cate_first_code
+         |        ,a.cate_second_code
+         |        ,a.cate_third_code
+         |FROM    (
+         |            SELECT  *
+         |            FROM    $inc_ads_company_human_relation
+         |            WHERE   ds = '$ds'
+         |        ) a
+         |JOIN (
+         |              SELECT  start_id
+         |              FROM    (
+         |                          SELECT  start_id
+         |                          FROM    $inc_ads_relation_holder
+         |                          WHERE   ds = '$ds'
+         |                          AND     holder_type = 1
+         |                          UNION ALL
+         |                          SELECT  start_id
+         |                          FROM    $inc_ads_relation_staff
+         |                          WHERE   ds = '$ds'
+         |                          UNION ALL
+         |                          SELECT  start_id
+         |                          FROM    $inc_ads_relation_legal_entity
+         |                          WHERE   ds = '$ds'
+         |                          AND     legal_entity_type = 1
+         |                      )
+         |              GROUP BY start_id
+         |          ) b
+         |ON      a.human_pid = b.start_id
+         |""".stripMargin)
+
   }
 
   def sendKafkaPre(): Unit = {
@@ -409,6 +415,7 @@ case class inc_company_relation_v2(s: SparkSession,
     val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
     val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
     val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
+    val inc_ads_person_node_label_kafka = "winhc_ng.inc_ads_person_node_label_kafka"
     //公司节点
     sql(
       s"""
@@ -479,6 +486,22 @@ case class inc_company_relation_v2(s: SparkSession,
          |where ds = '$ds'
          |""".stripMargin).show(20, false)
 
+//    //新增或者更新-投资人-打标签
+//    sql(
+//      s"""
+//         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_person_node_label_kafka  PARTITION (ds='$ds')
+//         |select key,max(message) message
+//         |from
+//         |(
+//         |  select
+//         |  start_id key,
+//         |  get_person_node(start_id, start_name, deleted, '7') message
+//         |  from $inc_ads_relation_holder
+//         |  where ds = '$ds' and holder_type = 1
+//         |)
+//         |group by key
+//         |""".stripMargin).show(20, false)
+
     //防止空分区
     addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
@@ -486,6 +509,7 @@ case class inc_company_relation_v2(s: SparkSession,
     addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_person_node_label_kafka, ds)
 
   }
 
@@ -499,8 +523,6 @@ object inc_company_relation_v2 {
       sys.exit(-1)
     }
     val Array(project, ds) = args
-    //    val project = "winhc_ng"
-    //    val ds = "20210106"
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.debug.maxToStringFields" -> "200",

+ 5 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyRelationUtils.scala

@@ -10,6 +10,8 @@ import org.apache.commons.lang3.StringUtils
  */
 case class company_node(id: String, name: String, deleted: String, topic_type: String)
 
+case class person_node(id: String, name: String, deleted: String, topic_type: String)
+
 // Value holder for one "holder" relation: start/end ids and names, a percent value, a deleted flag,
 // a holder_type code and a topic_type tag. CompanyRelationUtils.get_relation_holder builds an
 // instance and calls toJson() on it (toJson is defined elsewhere — presumably a JSON serializer).
 case class relation_holder(start_id: String, start_name: String, end_id: String,
                            end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String)
 
@@ -25,6 +27,9 @@ object CompanyRelationUtils {
   /**
    * Wraps the four arguments in a `company_node` and returns the String produced by its `toJson()`.
    *
    * @param id         stored in `company_node.id`
    * @param name       stored in `company_node.name`
    * @param deleted    stored in `company_node.deleted` (kept as a String here)
    * @param topic_type stored in `company_node.topic_type`
    * @return the result of `toJson()` on the constructed `company_node`
    */
   def get_company_node(id: String, name: String, deleted: String, topic_type: String): String =
     company_node(id, name, deleted, topic_type).toJson()
 
+  def get_person_node(id: String, name: String, deleted: String, topic_type: String): String =
+    person_node(id, name, deleted, topic_type).toJson()
+
   /**
    * Wraps the arguments in a `relation_holder` and returns the String produced by its `toJson()`.
    *
    * All eight arguments are passed to the case class constructor unchanged and in declaration order;
    * no validation or normalisation happens here.
    *
    * @return the result of `toJson()` on the constructed `relation_holder`
    */
   def get_relation_holder(start_id: String, start_name: String, end_id: String,
                           end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
     relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type).toJson()