xufei il y a 4 ans
Parent
commit
0264d490b9

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -178,7 +178,7 @@ object NgChangeExtract {
 
 
   private val startArgs = Seq(
-    Args(tableName = "company_holder", primaryFields = "amount,deleted")
+    Args(tableName = "company_holder", primaryFields = "percent,amount,deleted")
     , Args(tableName = "company_staff", primaryFields = "staff_type,deleted")
     , Args(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,deleted")
     , Args(tableName = "company_tm", primaryFields = "status")

+ 31 - 88
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -80,11 +80,8 @@ case class inc_company_relation_v2(s: SparkSession,
          |JOIN (
          |              SELECT *
          |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields is null
-         |              UNION ALL
-         |              SELECT *
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields like '%name%'
+         |              WHERE   ds = '$ds' AND tn = 'company'
+         |              AND (change_fields is null OR change_fields like '%name%')
          |          ) b
          |ON      a.company_id = b.company_id
          |""".stripMargin)
@@ -93,79 +90,33 @@ case class inc_company_relation_v2(s: SparkSession,
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_legal_entity PARTITION(ds= '$ds')
-         |SELECT  c.rowkey,c.company_id,c.company_name,c.legal_entity_id,c.legal_entity_name,c.legal_entity_type,c.create_time,c.update_time,c.deleted
-         |FROM    (
-         |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
-         |                    ,company_id
-         |                    ,name AS company_name
-         |                    ,legal_entity_id
-         |                    ,legal_entity_name
-         |                    ,legal_entity_type
-         |                    ,create_time
-         |                    ,update_time
-         |                    ,0 AS deleted
-         |            FROM    $inc_ads_company
-         |            WHERE   ds = '$ds'
-         |            AND     legal_entity_id IS NOT NULL
-         |            AND     length(trim(legal_entity_name)) > 0
-         |            -- AND     deleted <> 9
-         |        ) c
-         |JOIN (
-         |              SELECT company_id
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields is null
-         |              UNION ALL
-         |              SELECT company_id
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields like '%legal_entity_name%'
-         |          ) d
-         |ON      c.company_id = d.company_id
+         |SELECT  CONCAT_WS('_',company_id,hash(new_data['legal_entity_name'])) AS rowkey
+         |        ,company_id
+         |        ,new_data['name'] AS company_name
+         |        ,new_data['legal_entity_id'] AS legal_entity_id
+         |        ,new_data['legal_entity_name'] AS legal_entity_name
+         |        ,new_data['legal_entity_type'] AS legal_entity_type
+         |        ,new_data['create_time'] AS create_time
+         |        ,new_data['update_time'] AS update_time
+         |        ,new_data['deleted'] AS deleted
+         |FROM    $ads_change_extract
+         |WHERE   ds = '$ds' AND     tn = 'company'
+         |AND     (change_fields LIKE '%legal_entity_name%' or change_fields is null or change_fields LIKE '%deleted%')
+         |AND     length(trim(new_data['legal_entity_name'])) > 0
          |UNION ALL
-         |SELECT  c.rowkey,c.company_id,c.company_name,c.legal_entity_id,c.legal_entity_name,c.legal_entity_type,c.create_time,c.update_time,c.deleted
-         |FROM    (
-         |         SELECT *
-         |          ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
-         |         FROM (
-         |             SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
-         |                    ,company_id
-         |                    ,name AS company_name
-         |                    ,legal_entity_id
-         |                    ,legal_entity_name
-         |                    ,legal_entity_type
-         |                    ,create_time
-         |                    ,update_time
-         |                    ,1 AS deleted
-         |                    ,ds
-         |            FROM    $inc_ads_company
-         |            WHERE   ds < '$ds'
-         |            AND     legal_entity_id IS NOT NULL
-         |            AND     length(trim(legal_entity_name)) > 0
-         |            -- AND     deleted <> 9
-         |            union all
-         |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
-         |                    ,company_id
-         |                    ,name AS company_name
-         |                    ,legal_entity_id
-         |                    ,legal_entity_name
-         |                    ,legal_entity_type
-         |                    ,create_time
-         |                    ,update_time
-         |                    ,1 AS deleted
-         |                    ,ds
-         |            FROM    $ads_company
-         |            WHERE   ds > '0'
-         |            AND     legal_entity_id IS NOT NULL
-         |            AND     length(trim(legal_entity_name)) > 0
-         |            -- AND     deleted <> 9
-         |          )
-         |        ) c
-         |JOIN (
-         |              SELECT company_id
-         |              FROM $ads_change_extract
-         |              WHERE   ds = '$ds' AND tn = 'company' AND change_fields like '%legal_entity_name%'
-         |          ) d
-         |ON      c.company_id = d.company_id
-         |WHERE c.num = 1
+         |SELECT  CONCAT_WS('_',company_id,hash(old_data['legal_entity_name'])) AS rowkey
+         |        ,company_id
+         |        ,old_data['name'] AS company_name
+         |        ,old_data['legal_entity_id'] AS legal_entity_id
+         |        ,old_data['legal_entity_name'] AS legal_entity_name
+         |        ,old_data['legal_entity_type'] AS legal_entity_type
+         |        ,old_data['create_time'] AS create_time
+         |        ,new_data['update_time'] AS update_time
+         |        ,1 AS deleted
+         |FROM    $ads_change_extract
+         |WHERE   ds = '$ds' AND     tn = 'company'
+         |AND     change_fields LIKE '%legal_entity_name%'
+         |AND     length(trim(old_data['legal_entity_name'])) > 0
          |""".stripMargin)
 
     //全量公司
@@ -314,6 +265,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |    SELECT rowkey
          |    FROM $ads_change_extract
          |    WHERE   ds = '$ds' AND tn = 'company_holder'
+         |    AND    (change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%')
          |   ) b
          |ON    a.rowkey = b.rowkey
          |UNION ALL
@@ -329,6 +281,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |    SELECT rowkey
          |    FROM $ads_change_extract
          |    WHERE   ds = '$ds' AND tn = 'company_holder'
+         |    AND    (change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%')
          |   ) b
          |ON    a.rowkey = b.rowkey
          |)
@@ -361,6 +314,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |    SELECT rowkey
          |    FROM $ads_change_extract
          |    WHERE   ds = '$ds' AND tn = 'company_staff'
+         |    AND    (change_fields LIKE '%staff_type%' OR change_fields is null OR change_fields LIKE '%deleted%')
          |   ) b
          |ON    a.rowkey = b.rowkey
          |)
@@ -532,17 +486,6 @@ case class inc_company_relation_v2(s: SparkSession,
          |where ds = '$ds'
          |""".stripMargin).show(20, false)
 
-    //新增人员打标签
-    //    sql(
-    //      s"""
-    //         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_person_node_label_kafka  PARTITION (ds='$ds')
-    //         |select
-    //         |human_pid key,
-    //         |get_person_node(human_pid, human_name, deleted, '7') message
-    //         |from $inc_ads_company_human_relation
-    //         |where ds = '$ds'
-    //         |""".stripMargin).show(20, false)
-
     //新增或者更新-投资人-打标签
     sql(
       s"""