Parcourir la source

放开deleted限制

xufei il y a 4 ans
Parent
commit
407718c423

+ 94 - 15
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -38,6 +38,8 @@ case class inc_company_relation_v2(s: SparkSession,
 
     def get_company_node(id: String, name: String, deleted: String, topic_type: String): String = CompanyRelationUtils.get_company_node(id, name, deleted, topic_type)
 
+    def get_person_node(id: String, name: String, deleted: String, topic_type: String): String = CompanyRelationUtils.get_person_node(id, name, deleted, topic_type)
+
     def get_relation_holder(start_id: String, start_name: String, end_id: String,
                             end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
       CompanyRelationUtils.get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type)
@@ -52,6 +54,7 @@ case class inc_company_relation_v2(s: SparkSession,
 
 
     spark.udf.register("get_company_node", get_company_node _)
+    spark.udf.register("get_person_node", get_person_node _)
     spark.udf.register("get_relation_holder", get_relation_holder _)
     spark.udf.register("get_relation_staff", get_relation_staff _)
     spark.udf.register("get_relation_legal_entity", get_relation_legal_entity _)
@@ -71,16 +74,17 @@ case class inc_company_relation_v2(s: SparkSession,
          |            SELECT  company_id
          |                    ,name
          |            FROM    $inc_ads_company
-         |            WHERE   ds = '$ds' AND deleted <> 9
+         |            WHERE   ds = '$ds'
+         |            -- AND deleted <> 9
          |        ) a
          |JOIN (
          |              SELECT *
          |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type = 'insert'
+         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields is null
          |              UNION ALL
          |              SELECT *
          |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%name%'
+         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields like '%name%'
          |          ) b
          |ON      a.company_id = b.company_id
          |""".stripMargin)
@@ -104,16 +108,16 @@ case class inc_company_relation_v2(s: SparkSession,
          |            WHERE   ds = '$ds'
          |            AND     legal_entity_id IS NOT NULL
          |            AND     length(trim(legal_entity_name)) > 0
-         |            AND     deleted <> 9
+         |            -- AND     deleted <> 9
          |        ) c
          |JOIN (
          |              SELECT company_id
          |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type = 'insert'
+         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields is null
          |              UNION ALL
          |              SELECT company_id
          |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%legal_entity_name%'
+         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields like '%legal_entity_name%'
          |          ) d
          |ON      c.company_id = d.company_id
          |UNION ALL
@@ -136,7 +140,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |            WHERE   ds < '$ds'
          |            AND     legal_entity_id IS NOT NULL
          |            AND     length(trim(legal_entity_name)) > 0
-         |            AND     deleted <> 9
+         |            -- AND     deleted <> 9
          |            union all
          |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
          |                    ,company_id
@@ -152,13 +156,13 @@ case class inc_company_relation_v2(s: SparkSession,
          |            WHERE   ds > '0'
          |            AND     legal_entity_id IS NOT NULL
          |            AND     length(trim(legal_entity_name)) > 0
-         |            AND     deleted <> 9
+         |            -- AND     deleted <> 9
          |          )
          |        ) c
          |JOIN (
          |              SELECT company_id
          |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND update_type <> 'insert' AND change_fields like '%legal_entity_name%'
+         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields like '%legal_entity_name%'
          |          ) d
          |ON      c.company_id = d.company_id
          |WHERE c.num = 1
@@ -188,7 +192,7 @@ case class inc_company_relation_v2(s: SparkSession,
          | WHERE   ds > '0'
          | AND     legal_entity_id IS NOT NULL
          | AND     length(trim(legal_entity_name)) > 0
-         | AND     deleted <> 9
+         | -- AND     deleted <> 9
          | union all
          | SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
          |         ,company_id
@@ -206,7 +210,7 @@ case class inc_company_relation_v2(s: SparkSession,
          | WHERE   ds > '0'
          | AND     legal_entity_id IS NOT NULL
          | AND     length(trim(legal_entity_name)) > 0
-         | AND     deleted <> 9
+         | -- AND     deleted <> 9
          |)
          |)c
          |WHERE num = 1
@@ -215,7 +219,7 @@ case class inc_company_relation_v2(s: SparkSession,
     //新增人员关系表
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
          |SELECT  CONCAT_WS('_',a.company_id,hash(a.human_name)) AS rowkey
          |        ,a.company_id
          |        ,a.company_name
@@ -259,7 +263,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |              SELECT  company_id,human_name,human_pid
          |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
          |               FROM    $inc_ads_company_human_relation
-         |               WHERE   ds < '$ds'
+         |               WHERE   ds > '0'
          |              )
          |    WHERE   num = 1
          |) b
@@ -400,6 +404,54 @@ case class inc_company_relation_v2(s: SparkSession,
          |AND length(start_name) <> 0  AND length(end_name) <> 0
          |""".stripMargin)
 
+    //二次修正human表,保证和增量人对齐
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $inc_ads_company_human_relation PARTITION(ds='$ds')
+         |SELECT   a.rowkey
+         |        ,a.company_id
+         |        ,a.company_name
+         |        ,a.human_name
+         |        ,a.hid
+         |        ,a.human_pid
+         |        ,a.STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,a.province_code
+         |        ,a.city_code
+         |        ,a.county_code
+         |        ,a.reg_capital_amount
+         |        ,a.cate_first_code
+         |        ,a.cate_second_code
+         |        ,a.cate_third_code
+         |FROM    (
+         |            SELECT  *
+         |            FROM    $inc_ads_company_human_relation
+         |            WHERE   ds = '$ds'
+         |        ) a
+         |JOIN (
+         |              SELECT  start_id
+         |              FROM    (
+         |                          SELECT  start_id
+         |                          FROM    $inc_ads_relation_holder
+         |                          WHERE   ds = '$ds'
+         |                          AND     holder_type = 1
+         |                          UNION ALL
+         |                          SELECT  start_id
+         |                          FROM    $inc_ads_relation_staff
+         |                          WHERE   ds = '$ds'
+         |                          UNION ALL
+         |                          SELECT  start_id
+         |                          FROM    $inc_ads_relation_legal_entity
+         |                          WHERE   ds = '$ds'
+         |                          AND     legal_entity_type = 1
+         |                      )
+         |              GROUP BY start_id
+         |          ) b
+         |ON      a.human_pid = b.start_id
+         |""".stripMargin)
+
   }
 
   def sendKafkaPre(): Unit = {
@@ -409,6 +461,7 @@ case class inc_company_relation_v2(s: SparkSession,
     val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
     val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
     val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
+    val inc_ads_person_node_label_kafka = "winhc_ng.inc_ads_person_node_label_kafka"
     //公司节点
     sql(
       s"""
@@ -479,6 +532,33 @@ case class inc_company_relation_v2(s: SparkSession,
          |where ds = '$ds'
          |""".stripMargin).show(20, false)
 
+    //新增人员打标签
+    //    sql(
+    //      s"""
+    //         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_person_node_label_kafka  PARTITION (ds='$ds')
+    //         |select
+    //         |human_pid key,
+    //         |get_person_node(human_pid, human_name, deleted, '7') message
+    //         |from $inc_ads_company_human_relation
+    //         |where ds = '$ds'
+    //         |""".stripMargin).show(20, false)
+
+    //新增或者更新-投资人-打标签
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_person_node_label_kafka  PARTITION (ds='$ds')
+         |select key,max(message) message
+         |from
+         |(
+         |  select
+         |  start_id key,
+         |  get_person_node(start_id, start_name, deleted, '7') message
+         |  from $inc_ads_relation_holder
+         |  where ds = '$ds' and holder_type = 1
+         |)
+         |group by key
+         |""".stripMargin).show(20, false)
+
     //防止空分区
     addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
@@ -486,6 +566,7 @@ case class inc_company_relation_v2(s: SparkSession,
     addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_person_node_label_kafka, ds)
 
   }
 
@@ -499,8 +580,6 @@ object inc_company_relation_v2 {
       sys.exit(-1)
     }
     val Array(project, ds) = args
-    //    val project = "winhc_ng"
-    //    val ds = "20210106"
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.debug.maxToStringFields" -> "200",

+ 5 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyRelationUtils.scala

@@ -10,6 +10,8 @@ import org.apache.commons.lang3.StringUtils
  */
 case class company_node(id: String, name: String, deleted: String, topic_type: String)
 
+case class person_node(id: String, name: String, deleted: String, topic_type: String)
+
 case class relation_holder(start_id: String, start_name: String, end_id: String,
                            end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String)
 
@@ -25,6 +27,9 @@ object CompanyRelationUtils {
   def get_company_node(id: String, name: String, deleted: String, topic_type: String): String =
     company_node(id, name, deleted, topic_type).toJson()
 
+  def get_person_node(id: String, name: String, deleted: String, topic_type: String): String =
+    person_node(id, name, deleted, topic_type).toJson()
+
   def get_relation_holder(start_id: String, start_name: String, end_id: String,
                           end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
     relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type).toJson()