Pārlūkot izejas kodu

过滤重复关系和节点

xufei 4 gadi atpakaļ
vecāks
revīzija
4e7b70ca73

+ 5 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -197,7 +197,11 @@ object NgChangeExtract {
 
   def main(args: Array[String]): Unit = {
     val Array(tableName, inc_ds) = args
-
+    if (args.size != 2) {
+      println("please set tableName ds.")
+      sys.exit(-1)
+    }
+    println(s"table : $tableName , ds : $inc_ds")
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_ng",
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"

+ 516 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -0,0 +1,516 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{CompanyRelationUtils, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: π
+ * @date: 2021/01/10 09:36
+ */
+case class inc_company_relation_v2(s: SparkSession,
+                                   project: String, //表所在工程名
+                                   ds: String //分区
+                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  val inc_ads_company = "winhc_ng.inc_ads_company"
+  val ads_company = "winhc_ng.ads_company"
+  val inc_ads_company_legal_entity = "winhc_ng.inc_ads_company_legal_entity"
+  val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
+  val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
+  val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
+
+  val ads_change_extract = "winhc_ng.bds_change_extract"
+
+  val inc_ads_company_node = "winhc_ng.inc_ads_company_node"
+  val inc_ads_relation_holder = "winhc_ng.inc_ads_relation_holder"
+  val inc_ads_relation_staff = "winhc_ng.inc_ads_relation_staff"
+  val inc_ads_relation_legal_entity = "winhc_ng.inc_ads_relation_legal_entity"
+
+  /**
+   * Registers the SQL functions used by this job.
+   *
+   * Calls `prepareFunctions(spark)` first, then registers four UDFs
+   * (`get_company_node`, `get_relation_holder`, `get_relation_staff`,
+   * `get_relation_legal_entity`) that forward their arguments unchanged to the
+   * same-named methods of `CompanyRelationUtils`.
+   */
+  def register_fun(): Unit = {
+    prepareFunctions(spark)
+
+    // Each UDF is a typed function value delegating straight to CompanyRelationUtils.
+    val companyNode: (String, String, String, String) => String =
+      (id, name, deleted, topicType) =>
+        CompanyRelationUtils.get_company_node(id, name, deleted, topicType)
+
+    val holderRelation: (String, String, String, String, Double, Int, Int, String) => String =
+      (startId, startName, endId, endName, percent, deleted, holderType, topicType) =>
+        CompanyRelationUtils.get_relation_holder(startId, startName, endId, endName, percent, deleted, holderType, topicType)
+
+    val staffRelation: (String, String, String, String, String, Int, String) => String =
+      (startId, startName, endId, endName, staffType, deleted, topicType) =>
+        CompanyRelationUtils.get_relation_staff(startId, startName, endId, endName, staffType, deleted, topicType)
+
+    val legalEntityRelation: (String, String, String, String, Int, String, String) => String =
+      (startId, startName, endId, endName, deleted, legalEntityType, topicType) =>
+        CompanyRelationUtils.get_relation_legal_entity(startId, startName, endId, endName, deleted, legalEntityType, topicType)
+
+    spark.udf.register("get_company_node", companyNode)
+    spark.udf.register("get_relation_holder", holderRelation)
+    spark.udf.register("get_relation_staff", staffRelation)
+    spark.udf.register("get_relation_legal_entity", legalEntityRelation)
+  }
+
+  /**
+   * Builds the incremental company-graph tables for partition `ds`.
+   *
+   * The statements run in a fixed order because later ones read what earlier ones wrote:
+   *  1. company nodes -> `inc_ads_company_node`
+   *  2. legal entities (current and superseded) -> `inc_ads_company_legal_entity`
+   *  3. temp view `company_view_all`: latest row per company_id over the incremental
+   *     and full company tables
+   *  4. persons not seen in an earlier partition -> `inc_ads_company_human_relation`
+   *     (each gets a freshly generated `human_pid`)
+   *  5. temp view `company_human_relation_all`: latest row per (company_id, human_name)
+   *     over all partitions, including the one written in step 4
+   *  6. holder, staff and legal-entity relations -> the `inc_ads_relation_*` tables
+   *
+   * Every INSERT uses `INTO` when `isWindows` is true and `OVERWRITE` otherwise.
+   * The SQL function `name_cleanup` is not defined in this class; presumably it is
+   * provided by `prepareFunctions` (see `register_fun`) - TODO confirm.
+   */
+  def inc(): Unit = {
+
+    // Company nodes: companies of this partition (deleted <> 9) that the change-extract
+    // table reports either as inserted or as updated with a change_fields value
+    // containing "name".
+    // NOTE(review): '%name%' is a substring match, so it also matches fields such as
+    // legal_entity_name - confirm this is intended.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_node PARTITION(ds = '$ds')
+         |SELECT  a.company_id id
+         |        ,name_cleanup(a.name) name
+         |        ,'企业'AS LABEL
+         |        ,'0' deleted
+         |FROM    (
+         |            SELECT  company_id
+         |                    ,name
+         |            FROM    $inc_ads_company
+         |            WHERE   ds = '$ds' AND deleted <> 9
+         |        ) a
+         |JOIN (
+         |              SELECT *
+         |              FROM $ads_change_extract
+         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type = 'insert'
+         |              UNION ALL
+         |              SELECT *
+         |              FROM $ads_change_extract
+         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%name%'
+         |          ) b
+         |ON      a.company_id = b.company_id
+         |""".stripMargin)
+
+    // Incremental legal-entity table (added and removed legal entities).
+    // First branch: the current legal entity (deleted = 0) of companies that were inserted
+    // or whose legal_entity_name changed. Second branch: the most recent earlier record
+    // (num = 1 over previous incremental partitions plus the full table) of companies whose
+    // legal_entity_name changed, emitted with deleted = 1.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_legal_entity PARTITION(ds= '$ds')
+         |SELECT  c.rowkey,c.company_id,c.company_name,c.legal_entity_id,c.legal_entity_name,c.legal_entity_type,c.create_time,c.update_time,c.deleted
+         |FROM    (
+         |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |                    ,company_id
+         |                    ,name AS company_name
+         |                    ,legal_entity_id
+         |                    ,legal_entity_name
+         |                    ,legal_entity_type
+         |                    ,create_time
+         |                    ,update_time
+         |                    ,0 AS deleted
+         |            FROM    $inc_ads_company
+         |            WHERE   ds = '$ds'
+         |            AND     legal_entity_id IS NOT NULL
+         |            AND     length(trim(legal_entity_name)) > 0
+         |            AND     deleted <> 9
+         |        ) c
+         |JOIN (
+         |              SELECT company_id
+         |              FROM $ads_change_extract
+         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type = 'insert'
+         |              UNION ALL
+         |              SELECT company_id
+         |              FROM $ads_change_extract
+         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%legal_entity_name%'
+         |          ) d
+         |ON      c.company_id = d.company_id
+         |UNION ALL
+         |SELECT  c.rowkey,c.company_id,c.company_name,c.legal_entity_id,c.legal_entity_name,c.legal_entity_type,c.create_time,c.update_time,c.deleted
+         |FROM    (
+         |         SELECT *
+         |          ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
+         |         FROM (
+         |             SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |                    ,company_id
+         |                    ,name AS company_name
+         |                    ,legal_entity_id
+         |                    ,legal_entity_name
+         |                    ,legal_entity_type
+         |                    ,create_time
+         |                    ,update_time
+         |                    ,1 AS deleted
+         |                    ,ds
+         |            FROM    $inc_ads_company
+         |            WHERE   ds < '$ds'
+         |            AND     legal_entity_id IS NOT NULL
+         |            AND     length(trim(legal_entity_name)) > 0
+         |            AND     deleted <> 9
+         |            union all
+         |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |                    ,company_id
+         |                    ,name AS company_name
+         |                    ,legal_entity_id
+         |                    ,legal_entity_name
+         |                    ,legal_entity_type
+         |                    ,create_time
+         |                    ,update_time
+         |                    ,1 AS deleted
+         |                    ,ds
+         |            FROM    $ads_company
+         |            WHERE   ds > '0'
+         |            AND     legal_entity_id IS NOT NULL
+         |            AND     length(trim(legal_entity_name)) > 0
+         |            AND     deleted <> 9
+         |          )
+         |        ) c
+         |JOIN (
+         |              SELECT company_id
+         |              FROM $ads_change_extract
+         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%legal_entity_name%'
+         |          ) d
+         |ON      c.company_id = d.company_id
+         |WHERE c.num = 1
+         |""".stripMargin)
+
+    // Full company snapshot: latest row per company_id across the incremental and full
+    // company tables, exposed as temp view company_view_all.
+    sql(
+      s"""
+         |SELECT *
+         |from (
+         |SELECT *
+         |,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
+         |FROM (
+         |  SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |         ,company_id
+         |         ,name AS company_name
+         |         ,legal_entity_id
+         |         ,legal_entity_name
+         |         ,legal_entity_type
+         |         ,create_time
+         |         ,update_time
+         |         ,province_code,city_code,county_code
+         |         ,reg_capital_amount,cate_first_code
+         |         ,cate_second_code,cate_third_code
+         |         ,ds
+         | FROM    $inc_ads_company
+         | WHERE   ds > '0'
+         | AND     legal_entity_id IS NOT NULL
+         | AND     length(trim(legal_entity_name)) > 0
+         | AND     deleted <> 9
+         | union all
+         | SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |         ,company_id
+         |         ,name AS company_name
+         |         ,legal_entity_id
+         |         ,legal_entity_name
+         |         ,legal_entity_type
+         |         ,create_time
+         |         ,update_time
+         |          ,province_code,city_code,county_code
+         |         ,reg_capital_amount,cate_first_code
+         |         ,cate_second_code,cate_third_code
+         |         ,ds
+         | FROM    $ads_company
+         | WHERE   ds > '0'
+         | AND     legal_entity_id IS NOT NULL
+         | AND     length(trim(legal_entity_name)) > 0
+         | AND     deleted <> 9
+         |)
+         |)c
+         |WHERE num = 1
+         |""".stripMargin).createOrReplaceTempView("company_view_all")
+
+    // Newly added person relations: persons of this partition (holders with holder_type = 1,
+    // staff, legal entities with legal_entity_type = 1), de-duplicated per
+    // (company_id, human_name). The LEFT JOIN plus "b.company_id IS NULL" keeps only
+    // persons with no record in an earlier partition; each kept row gets a new human_pid
+    // built as 'p' + md5(uuid()).
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT  CONCAT_WS('_',a.company_id,hash(a.human_name)) AS rowkey
+         |        ,a.company_id
+         |        ,a.company_name
+         |        ,a.human_name
+         |        ,hash(a.human_name) AS hid
+         |        ,concat('p',md5(uuid())) human_pid
+         |        ,a.STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,c.province_code,c.city_code,c.county_code
+         |        ,c.reg_capital_amount,c.cate_first_code
+         |        ,c.cate_second_code,c.cate_third_code
+         |FROM    (
+         |            SELECT  *
+         |            FROM    (
+         |                        SELECT  company_id,company_name,human_name,hid,status,create_time,update_time,0 as deleted
+         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |                        FROM    (
+         |                                    --股东
+         |                                    SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_holder
+         |                                    WHERE   ds = '$ds' AND holder_type = 1
+         |                                    UNION ALL
+         |                                    --主要成员
+         |                                    SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_staff
+         |                                    WHERE   ds = '$ds'
+         |                                    UNION ALL
+         |                                    --法人
+         |                                    SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_legal_entity
+         |                                    WHERE   ds = '$ds' AND legal_entity_type = 1
+         |                                )
+         |                    )
+         |            WHERE   num = 1 AND name_cleanup(human_name) <> ''
+         |        ) a
+         |LEFT JOIN (
+         |    SELECT  company_id,human_name,human_pid
+         |    FROM    (
+         |              SELECT  company_id,human_name,human_pid
+         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |               FROM    $inc_ads_company_human_relation
+         |               WHERE   ds < '$ds'
+         |              )
+         |    WHERE   num = 1
+         |) b
+         |ON      a.company_id = b.company_id
+         |AND     a.human_name = b.human_name
+         |JOIN (
+         |SELECT * FROM company_view_all
+         |) c
+         |ON      a.company_id = c.company_id
+         |WHERE   b.company_id IS NULL
+         |""".stripMargin)
+
+    // Full person relation view: latest row per (company_id, human_name) over all
+    // partitions (ds > '0'), which includes the partition written just above.
+    sql(
+      s"""
+         |SELECT  company_id,human_name,human_pid
+         |FROM    (
+         |          SELECT  company_id,human_name,human_pid
+         |                            ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |           FROM    $inc_ads_company_human_relation
+         |           WHERE   ds > '0'
+         |          )
+         |WHERE   num = 1
+         |""".stripMargin).createOrReplaceTempView("company_human_relation_all")
+
+    // Shareholder relations, restricted to rowkeys present in the change-extract table
+    // (tn = 'company_holder'). holder_type = 1 rows take start_id from the person view's
+    // human_pid; holder_type = 2 rows use holder_id directly. The outer filter drops
+    // self-relations, ids shorter than 32 characters and empty names.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  table $inc_ads_relation_holder  PARTITION (ds= '$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,percent,deleted,holder_type,label
+         |FROM (
+         |SELECT coalesce(c.human_pid,'') start_id,name_cleanup(c.human_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,percent,deleted,holder_type,label
+         |from (
+         |SELECT * ,
+         |    '投资' as LABEL
+         | FROM    $inc_ads_company_holder
+         | WHERE   ds = '$ds'
+         |and holder_type = 1
+         |)a
+         |JOIN
+         |(
+         |SELECT * FROM company_human_relation_all
+         |) c
+         |ON a.company_id = c.company_id and a.holder_name = c.human_name
+         |JOIN
+         |  (
+         |    SELECT rowkey
+         |    FROM $ads_change_extract
+         |    WHERE   ds = '$ds' AND tn = 'company_holder'
+         |   ) b
+         |ON    a.rowkey = b.rowkey
+         |UNION ALL
+         |SELECT holder_id start_id,name_cleanup(holder_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,percent,deleted,holder_type,label
+         |from (
+         |SELECT * ,
+         |    '投资' as LABEL
+         | FROM    $inc_ads_company_holder
+         |WHERE   ds = '$ds'
+         |and holder_type = 2
+         |)a JOIN
+         |  (
+         |    SELECT rowkey
+         |    FROM $ads_change_extract
+         |    WHERE   ds = '$ds' AND tn = 'company_holder'
+         |   ) b
+         |ON    a.rowkey = b.rowkey
+         |)
+         |WHERE start_id <> end_id
+         |AND length(start_id) >= 32  AND length(end_id) >= 32
+         |AND length(start_name) <> 0  AND length(end_name) <> 0
+         |""".stripMargin)
+
+    // Key staff relations: staff rows of this partition joined to the person view for
+    // human_pid and restricted to rowkeys present in the change-extract table
+    // (tn = 'company_staff'); same outer filter as the shareholder relations.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  table $inc_ads_relation_staff  PARTITION (ds= '$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,staff_type,deleted,label
+         |FROM (
+         |SELECT c.human_pid start_id,name_cleanup(c.human_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,name_cleanup(staff_type) staff_type,deleted,label
+         |from (
+         |SELECT * ,
+         |    '高管' as LABEL
+         | FROM    $inc_ads_company_staff
+         |WHERE   ds = '$ds'
+         |)a
+         |JOIN
+         |(
+         |SELECT * FROM company_human_relation_all
+         |) c
+         |ON a.company_id = c.company_id and a.staff_name   = c.human_name
+         |JOIN
+         |  (
+         |    SELECT rowkey
+         |    FROM $ads_change_extract
+         |    WHERE   ds = '$ds' AND tn = 'company_staff'
+         |   ) b
+         |ON    a.rowkey = b.rowkey
+         |)
+         |WHERE start_id <> end_id
+         |AND length(start_id) >= 32  AND length(end_id) >= 32
+         |AND length(start_name) <> 0  AND length(end_name) <> 0
+         |""".stripMargin)
+
+    // Legal entity relations: legal_entity_type = 1 rows take start_id from the person
+    // view's human_pid; legal_entity_type = 2 rows use legal_entity_id directly.
+    // Unlike the two statements above, there is no join against the change-extract table
+    // here; the input partition was already built from change-extract rows earlier in
+    // this method.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_legal_entity   PARTITION (ds='$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,deleted,legal_entity_type,label
+         |FROM (
+         |SELECT c.human_pid start_id,name_cleanup(c.human_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,deleted,legal_entity_type,label
+         |from (
+         |SELECT * ,
+         |    '法人' as LABEL
+         | FROM    $inc_ads_company_legal_entity
+         |WHERE   ds = '$ds'
+         |and legal_entity_type = 1
+         |)a
+         |JOIN
+         |(
+         |SELECT * FROM company_human_relation_all
+         |) c
+         |ON a.company_id = c.company_id and a.legal_entity_name = c.human_name
+         |UNION ALL
+         |SELECT legal_entity_id start_id,name_cleanup(legal_entity_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,deleted,legal_entity_type,label
+         |from (
+         |SELECT * ,
+         |    '法人' as LABEL
+         | FROM    $inc_ads_company_legal_entity
+         |WHERE   ds = '$ds'
+         |and legal_entity_type = 2
+         |)a
+         |)
+         |WHERE start_id <> end_id
+         |AND length(start_id) >= 32  AND length(end_id) >= 32
+         |AND length(start_name) <> 0  AND length(end_name) <> 0
+         |""".stripMargin)
+
+  }
+
+  /**
+   * Stages partition `ds` of the node and relation tables into the matching `*_kafka`
+   * tables as (key, message) rows.
+   *
+   * The message column is produced by the `get_company_node` / `get_relation_*` SQL
+   * functions, each called with a distinct topic-type literal ('1' to '6'). After the six
+   * inserts, `addEmptyPartitionOrSkip` is invoked once per target table for `ds`.
+   */
+  def sendKafkaPre(): Unit = {
+    // INSERT INTO when isWindows is true, INSERT OVERWRITE otherwise.
+    val writeMode = if (isWindows) "INTO" else "OVERWRITE"
+
+    val nodeSink = "winhc_ng.inc_ads_company_node_kafka"
+    val holderPersonSink = "winhc_ng.inc_ads_relation_holder_v1_kafka"
+    val holderCompanySink = "winhc_ng.inc_ads_relation_holder_v2_kafka"
+    val staffSink = "winhc_ng.inc_ads_relation_staff_kafka"
+    val legalPersonSink = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
+    val legalCompanySink = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
+
+    // Company nodes
+    val nodeSql =
+      s"""
+         |INSERT $writeMode table $nodeSink  PARTITION (ds='$ds')
+         |select
+         |id key,
+         |get_company_node(id, name, deleted, '1') message
+         |from $inc_ads_company_node
+         |where ds = '$ds'
+         |""".stripMargin
+    sql(nodeSql).show(20, false)
+
+    // Shareholder relations (person -> company)
+    val holderPersonSql =
+      s"""
+         |INSERT $writeMode table $holderPersonSink  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, '2') message
+         |from $inc_ads_relation_holder
+         |where ds = '$ds'
+         |and holder_type = 1
+         |""".stripMargin
+    sql(holderPersonSql).show(20, false)
+
+    // Shareholder relations (company -> company)
+    val holderCompanySql =
+      s"""
+         |INSERT $writeMode table $holderCompanySink  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, '3') message
+         |from $inc_ads_relation_holder
+         |where ds = '$ds'
+         |and holder_type = 2
+         |""".stripMargin
+    sql(holderCompanySql).show(20, false)
+
+    // Legal entity relations (person -> company)
+    val legalPersonSql =
+      s"""
+         |INSERT $writeMode table $legalPersonSink  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, '4') message
+         |from $inc_ads_relation_legal_entity
+         |where ds = '$ds'
+         |and legal_entity_type = '1'
+         |""".stripMargin
+    sql(legalPersonSql).show(20, false)
+
+    // Legal entity relations (company -> company)
+    val legalCompanySql =
+      s"""
+         |INSERT $writeMode table $legalCompanySink  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, '5') message
+         |from $inc_ads_relation_legal_entity
+         |where ds = '$ds'
+         |and legal_entity_type = '2'
+         |""".stripMargin
+    sql(legalCompanySql).show(20, false)
+
+    // Key staff relations (person -> company)
+    val staffSql =
+      s"""
+         |INSERT $writeMode table $staffSink  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted, '6') message
+         |from $inc_ads_relation_staff
+         |where ds = '$ds'
+         |""".stripMargin
+    sql(staffSql).show(20, false)
+
+    // Guard against empty partitions, in the same table order as the original calls.
+    Seq(
+      nodeSink,
+      holderPersonSink,
+      holderCompanySink,
+      staffSink,
+      legalPersonSink,
+      legalCompanySink
+    ).foreach(table => addEmptyPartitionOrSkip(table, ds))
+
+  }
+
+}
+
+
+/**
+ * Command-line entry point for the incremental company-relation job.
+ */
+object inc_company_relation_v2 {
+  /**
+   * Runs the job: registers the UDFs, builds the incremental tables and stages the
+   * Kafka tables for one partition.
+   *
+   * @param args exactly two values: the ODPS project name and the partition `ds`.
+   *             With any other count a usage message is printed and the JVM exits
+   *             with status -1.
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      println("please set project ds.")
+      sys.exit(-1)
+    }
+    val Array(project, ds) = args
+    // Example values for a manual run: project = "winhc_ng", ds = "20210106"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    // Stop the session even when a step throws, so a failed run does not leave the
+    // Spark session running; the exception still propagates to the caller.
+    try {
+      val re = inc_company_relation_v2(s = spark, project = project, ds = ds)
+      re.register_fun()
+      re.inc()
+      re.sendKafkaPre()
+    } finally {
+      spark.stop()
+    }
+  }
+}