Forráskód Böngészése

合并删除数据回流

xufei 3 éve
szülő
commit
fa3a7e0f28

+ 31 - 17
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -24,6 +24,7 @@ case class inc_company_relation_v2(s: SparkSession,
   val inc_ads_company_legal_entity = "winhc_ng.inc_ads_company_legal_entity"
   val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
   val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
+  val inc_ads_company_human_relation_merge = "winhc_ng.inc_ads_company_human_relation_merge"
   val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
 
   val ads_change_extract = "winhc_ng.bds_change_extract"
@@ -141,8 +142,6 @@ case class inc_company_relation_v2(s: SparkSession,
          |         ,ds
          | FROM    $inc_ads_company
          | WHERE   ds > '0'
-         | AND     legal_entity_id IS NOT NULL
-         | AND     length(trim(legal_entity_name)) > 0
          | -- AND     deleted <> 9
          | union all
          | SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
@@ -159,8 +158,6 @@ case class inc_company_relation_v2(s: SparkSession,
          |         ,ds
          | FROM    $ads_company
          | WHERE   ds > '0'
-         | AND     legal_entity_id IS NOT NULL
-         | AND     length(trim(legal_entity_name)) > 0
          | -- AND     deleted <> 9
          |)
          |)c
@@ -168,6 +165,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |""".stripMargin).createOrReplaceTempView("company_view_all")
 
     //新增人员关系表
+    //TODO 过滤 deleted = 1
     sql(
       s"""
          |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
@@ -187,7 +185,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |FROM    (
          |            SELECT  *,concat('p',md5(uuid())) human_pid
          |            FROM    (
-         |                        SELECT  company_id,company_name,human_name,hid,status,create_time,update_time,0 as deleted
+         |                        SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time,0 as deleted
          |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
          |                        FROM    (
          |                                    --股东
@@ -211,15 +209,23 @@ case class inc_company_relation_v2(s: SparkSession,
          |LEFT JOIN (
          |    SELECT  company_id,human_name,human_pid
          |    FROM    (
-         |              SELECT  company_id,human_name,human_pid
-         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
-         |               FROM    $inc_ads_company_human_relation
-         |               WHERE   ds > '0'
+         |               SELECT  company_id,human_name,human_pid
+         |                                     ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |               FROM(
+         |                    SELECT  company_id,human_name,human_pid,ds,update_time
+         |                    FROM    $inc_ads_company_human_relation
+         |                    WHERE   ds > '0'
+         |                    UNION ALL
+         |                    SELECT  company_id,human_name,human_pid,ds,update_time
+         |                    FROM    $inc_ads_company_human_relation_merge
+         |                    WHERE   ds > '0'
+         |                    AND     deleted = 0
+         |                 )
          |              )
          |    WHERE   num = 1
          |) b
          |ON      a.company_id = b.company_id
-         |AND     a.human_name = b.human_name
+         |AND     name_cleanup(a.human_name) = name_cleanup(b.human_name)
          |JOIN (
          |SELECT * FROM company_view_all
          |) c
@@ -232,10 +238,18 @@ case class inc_company_relation_v2(s: SparkSession,
       s"""
          |SELECT  company_id,human_name,human_pid
          |FROM    (
-         |          SELECT  company_id,human_name,human_pid
-         |                            ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
-         |           FROM    $inc_ads_company_human_relation
-         |           WHERE   ds > '0'
+         |           SELECT  company_id,human_name,human_pid
+         |                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |           FROM  (
+         |                     SELECT  company_id,human_name,human_pid,ds,update_time
+         |                     FROM    $inc_ads_company_human_relation
+         |                     WHERE   ds > '0'
+         |                     UNION ALL
+         |                     SELECT  company_id,human_name,human_pid,ds,update_time
+         |                     FROM    $inc_ads_company_human_relation_merge
+         |                     WHERE   ds > '0'
+         |                     AND     deleted = 0
+         |              )
          |          )
          |WHERE   num = 1
          |""".stripMargin).createOrReplaceTempView("company_human_relation_all")
@@ -259,7 +273,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |(
          |SELECT * FROM company_human_relation_all
          |) c
-         |ON a.company_id = c.company_id and a.holder_name = c.human_name
+         |ON a.company_id = c.company_id and name_cleanup(a.holder_name) = name_cleanup(c.human_name)
          |JOIN
          |  (
          |    SELECT rowkey
@@ -308,7 +322,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |(
          |SELECT * FROM company_human_relation_all
          |) c
-         |ON a.company_id = c.company_id and a.staff_name   = c.human_name
+         |ON a.company_id = c.company_id and name_cleanup(a.staff_name)   = name_cleanup(c.human_name)
          |JOIN
          |  (
          |    SELECT rowkey
@@ -342,7 +356,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |(
          |SELECT * FROM company_human_relation_all
          |) c
-         |ON a.company_id = c.company_id and a.legal_entity_name = c.human_name
+         |ON a.company_id = c.company_id and name_cleanup(a.legal_entity_name) = name_cleanup(c.human_name)
          |UNION ALL
          |SELECT legal_entity_id start_id,name_cleanup(legal_entity_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,deleted,legal_entity_type,label
          |from (

+ 180 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_merge.scala

@@ -0,0 +1,180 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{CompanyRelationUtils, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Spark job that writes one `ds` partition of
+ * `winhc_ng.inc_ads_company_human_relation_merge` from the person-merge feed
+ * `winhc_ng.inc_ods_company_human_relation_merge`.
+ *
+ * `inc` runs a single INSERT whose SELECT is a UNION ALL of two branches:
+ *
+ *  1. Feed rows of partition `ds` with `deleted = 0`, inner-joined on
+ *     `company_id` to the newest company row (ROW_NUMBER ordered by
+ *     `ds DESC, update_time DESC` over `inc_ads_company` UNION ALL
+ *     `ads_company`). `STATUS` is the literal 2, `hid` is `hash(person_name)`
+ *     and `rowkey` is `company_id` and `person_id` joined with `_`.
+ *  2. Feed rows of partition `ds` with `deleted = 1`, inner-joined on
+ *     `person_id = human_pid` to earlier partitions (`ds` less than the current
+ *     one) of the merge table UNION ALL every partition of
+ *     `inc_ads_company_human_relation`. Company columns and `STATUS` come from
+ *     the matched historical row; person columns, timestamps and the `deleted`
+ *     flag come from the feed row.
+ *
+ * The statement is `INSERT INTO` when `isWindows` is true and
+ * `INSERT OVERWRITE` otherwise (presumably to keep local runs from replacing
+ * the partition - TODO confirm).
+ *
+ * `register_fun` delegates to `prepareFunctions(spark)`, presumably inherited
+ * from one of the mixed-in traits; `main` calls it before `inc`.
+ *
+ * NOTE(review): branch 2 joins against `SELECT *` from two different tables
+ * combined with UNION ALL, which only works if both tables have the same
+ * column count and order - confirm the schemas match.
+ * NOTE(review): that history side is not de-duplicated, so one deleted
+ * `person_id` produces one output row per matching historical row - confirm
+ * this is intended.
+ * NOTE(review): `project` and several of the table-name vals are not
+ * referenced anywhere in this class; they are left in place because they are
+ * public members.
+ *
+ * @param s       Spark session, exposed as `spark`
+ * @param project name of the project that holds the tables
+ * @param ds      partition to process
+ * @author: π
+ * @date: 2021/01/10 09:36
+ */
+case class inc_human_relation_merge(s: SparkSession,
+                                    project: String, // name of the project that holds the tables
+                                    ds: String // partition
+                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  val inc_ads_company = "winhc_ng.inc_ads_company"
+  val ads_company = "winhc_ng.ads_company"
+  val inc_ads_company_legal_entity = "winhc_ng.inc_ads_company_legal_entity"
+  val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
+  val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
+  val inc_ods_company_human_relation_merge = "winhc_ng.inc_ods_company_human_relation_merge"
+  val inc_ads_company_human_relation_merge = "winhc_ng.inc_ads_company_human_relation_merge"
+  val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
+
+  val ads_change_extract = "winhc_ng.bds_change_extract"
+
+  val inc_ads_company_node = "winhc_ng.inc_ads_company_node"
+  val inc_ads_relation_holder = "winhc_ng.inc_ads_relation_holder"
+  val inc_ads_relation_staff = "winhc_ng.inc_ads_relation_staff"
+  val inc_ads_relation_legal_entity = "winhc_ng.inc_ads_relation_legal_entity"
+
+  def register_fun(): Unit = {
+    prepareFunctions(spark)
+  }
+
+  def inc(): Unit = {
+
+    // Merge the feed's live (deleted = 0) and deleted (deleted = 1) person rows into the target partition
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_human_relation_merge PARTITION (ds='$ds')
+         |SELECT  CONCAT_WS('_',a.company_id,a.person_id) AS rowkey
+         |        ,a.company_id
+         |        ,b.company_name
+         |        ,a.person_name human_name
+         |        ,hash(a.person_name) hid
+         |        ,a.person_id human_pid
+         |        ,2 STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,b.province_code
+         |        ,b.city_code
+         |        ,b.county_code
+         |        ,b.reg_capital_amount
+         |        ,b.cate_first_code
+         |        ,b.cate_second_code
+         |        ,b.cate_third_code
+         |FROM    (
+         |            SELECT  company_id
+         |                    ,person_name
+         |                    ,person_id
+         |                    ,create_time
+         |                    ,update_time
+         |                    ,deleted
+         |            FROM    $inc_ods_company_human_relation_merge
+         |            WHERE   ds = '$ds'
+         |            AND     deleted = 0
+         |        ) a
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
+         |                        FROM    (
+         |                                    SELECT  company_id
+         |                                            ,name AS company_name
+         |                                            ,province_code
+         |                                            ,city_code
+         |                                            ,county_code
+         |                                            ,reg_capital_amount
+         |                                            ,cate_first_code
+         |                                            ,cate_second_code
+         |                                            ,cate_third_code
+         |                                            ,update_time
+         |                                            ,ds
+         |                                    FROM    $inc_ads_company
+         |                                    WHERE   ds > '0'
+         |                                    UNION ALL
+         |                                    SELECT  company_id
+         |                                            ,name AS company_name
+         |                                            ,province_code
+         |                                            ,city_code
+         |                                            ,county_code
+         |                                            ,reg_capital_amount
+         |                                            ,cate_first_code
+         |                                            ,cate_second_code
+         |                                            ,cate_third_code
+         |                                            ,update_time
+         |                                            ,ds
+         |                                    FROM    $ads_company
+         |                                    WHERE   ds > '0'
+         |                                )
+         |                    ) c
+         |            WHERE   num = 1
+         |        ) b
+         |ON      a.company_id = b.company_id
+         |UNION ALL
+         |SELECT
+         |         CONCAT_WS('_',company_id,a.person_id) AS rowkey
+         |        ,company_id
+         |        ,company_name
+         |        ,a.person_name human_name
+         |        ,hash(a.person_name) hid
+         |        ,a.person_id human_pid
+         |        ,STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,province_code
+         |        ,city_code
+         |        ,county_code
+         |        ,reg_capital_amount
+         |        ,cate_first_code
+         |        ,cate_second_code
+         |        ,cate_third_code
+         |from (
+         |  SELECT person_name
+         |        ,person_id
+         |        ,create_time
+         |        ,update_time
+         |        ,deleted
+         |FROM    $inc_ods_company_human_relation_merge
+         |WHERE   ds = '$ds'
+         |AND     deleted = 1
+         |) a
+         |JOIN (
+         |    SELECT *
+         |    FROM  $inc_ads_company_human_relation_merge
+         |    WHERE   ds < '$ds'
+         |    UNION ALL
+         |    SELECT *
+         |    FROM  $inc_ads_company_human_relation
+         |    WHERE   ds > '0'
+         |) b
+         |on a.person_id = b.human_pid
+         |""".stripMargin)
+
+  }
+
+}
+
+
+/**
+ * Command-line entry point for the `inc_human_relation_merge` job.
+ *
+ * Expects exactly two arguments, `project` and `ds`; with any other argument
+ * count it prints a usage hint and exits with status -1. Otherwise it builds
+ * the Spark configuration on top of `EsConfig.getEsConfigMap`, creates the
+ * session through `SparkUtils.InitEnv`, registers the job's functions and
+ * runs `inc()`.
+ */
+object inc_human_relation_merge {
+  def main(args: Array[String]): Unit = {
+    // `length` reads the array field directly; `size` goes through an implicit wrapper.
+    if (args.length != 2) {
+      println("please set project ds.")
+      sys.exit(-1)
+    }
+    val Array(project, ds) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    // Release the session even when registration or the INSERT throws;
+    // the exception still propagates to the caller after the session is stopped.
+    try {
+      val re = inc_human_relation_merge(s = spark, project = project, ds = ds)
+      re.register_fun()
+      re.inc()
+    } finally {
+      spark.stop()
+    }
+  }
+}
+
+