فهرست منبع

工商id融入

xufei 3 سال پیش
والد
کامیت
fb42e2c71e

+ 464 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v3.scala

@@ -0,0 +1,464 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{CompanyRelationUtils, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: π
+ * @date: 2021/07/27 09:36
+ * @desc: 人员id
+ */
+case class inc_company_relation_v3(s: SparkSession,
+                                   project: String, //表所在工程名
+                                   ds: String //分区
+                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  //val pro = "winhc_eci_dev"
+  val pro = "winhc_ng"
+
+  val inc_ads_company = "winhc_ng.inc_ads_company"
+  val ads_company = "winhc_ng.ads_company"
+  val inc_ads_company_legal_entity = s"${pro}.inc_ads_company_legal_entity"
+  val ads_company_holder = "winhc_ng.ads_company_holder"
+  val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
+  //todo 测试table
+  //val inc_ads_company_human_relation = s"${pro}.inc_ads_company_human_relation_dev"
+  val inc_ads_company_human_relation = s"${pro}.inc_ads_company_human_relation"
+  val inc_ads_company_human_relation_merge = "winhc_ng.inc_ads_company_human_relation_merge"
+  val inc_ads_company_human_relation_deleted = "winhc_ng.inc_ads_company_human_relation_deleted"
+  val inc_ads_company_human_relation_update = "winhc_ng.inc_ads_company_human_relation_update"
+  val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
+  val ads_company_staff = "winhc_ng.ads_company_staff"
+
+  val ads_change_extract = "winhc_ng.bds_change_extract"
+
+  val inc_ads_company_node = s"${pro}.inc_ads_company_node"
+  val inc_ads_relation_holder = s"${pro}.inc_ads_relation_holder"
+  val inc_ads_relation_staff = s"${pro}.inc_ads_relation_staff"
+  val inc_ads_relation_legal_entity = s"${pro}.inc_ads_relation_legal_entity"
+
+  //val change_field = " change_fields LIKE '%update_time%' OR "
+  val change_field = ""
+
+  def register_fun(): Unit = {
+    prepareFunctions(spark)
+
+    def get_company_node(id: String, name: String, deleted: String, topic_type: String): String = CompanyRelationUtils.get_company_node(id, name, deleted, topic_type)
+
+    def get_person_node(id: String, name: String, deleted: String, topic_type: String): String = CompanyRelationUtils.get_person_node(id, name, deleted, topic_type)
+
+    def get_relation_holder(start_id: String, start_name: String, end_id: String,
+                            end_name: String, percent: Double, deleted: Int, holder_type: Int, topic_type: String): String =
+      CompanyRelationUtils.get_relation_holder(start_id, start_name, end_id, end_name, percent.toString, deleted.toString, holder_type.toString, topic_type)
+
+    def get_relation_staff(start_id: String, start_name: String, end_id: String,
+                           end_name: String, staff_type: String, deleted: Int, topic_type: String): String =
+      CompanyRelationUtils.get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted.toString, topic_type)
+
+    def get_relation_legal_entity(start_id: String, start_name: String, end_id: String,
+                                  end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
+      CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted.toString, legal_entity_type, topic_type)
+
+    def get_success_status(ds: String, status: String, topic_type: String): String = CompanyRelationUtils.get_success_status(ds, status, topic_type)
+
+    spark.udf.register("get_company_node", get_company_node _)
+    spark.udf.register("get_person_node", get_person_node _)
+    spark.udf.register("get_relation_holder", get_relation_holder _)
+    spark.udf.register("get_relation_staff", get_relation_staff _)
+    spark.udf.register("get_relation_legal_entity", get_relation_legal_entity _)
+    spark.udf.register("get_success_status", get_success_status _)
+  }
+
+  def inc(): Unit = {
+
+    //公司节点
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_node PARTITION(ds = '$ds')
+         |SELECT  a.company_id id
+         |        ,name_cleanup(a.name) name
+         |        ,'企业'AS LABEL
+         |        ,'0' deleted
+         |FROM    (
+         |            SELECT  company_id
+         |                    ,name
+         |            FROM    $inc_ads_company
+         |            WHERE   ds = '$ds'
+         |            -- AND deleted <> 9
+         |        ) a
+         |JOIN (
+         |              SELECT *
+         |              FROM $ads_change_extract
+         |              WHERE   ds = '$ds' AND tn = 'company'
+         |              AND (change_fields is null OR change_fields like '%name%')
+         |          ) b
+         |ON      a.company_id = b.company_id
+         |""".stripMargin).show(100, false)
+
+    //增量法人表(新增,移除法人)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_legal_entity PARTITION(ds= '$ds')
+         |SELECT  CONCAT_WS('_',company_id,hash(new_data['legal_entity_name'])) AS rowkey
+         |        ,company_id
+         |        ,new_data['name'] AS company_name
+         |        ,new_data['legal_entity_id'] AS legal_entity_id
+         |        ,new_data['legal_entity_name'] AS legal_entity_name
+         |        ,new_data['legal_entity_type'] AS legal_entity_type
+         |        ,new_data['create_time'] AS create_time
+         |        ,new_data['update_time'] AS update_time
+         |        ,new_data['deleted'] AS deleted
+         |FROM    $ads_change_extract
+         |WHERE   ds = '$ds' AND     tn = 'company'
+         |AND     ($change_field change_fields LIKE '%legal_entity_name%' or change_fields is null or change_fields LIKE '%deleted%')
+         |AND     length(trim(new_data['legal_entity_name'])) > 0
+         |UNION ALL
+         |SELECT  CONCAT_WS('_',company_id,hash(old_data['legal_entity_name'])) AS rowkey
+         |        ,company_id
+         |        ,old_data['name'] AS company_name
+         |        ,old_data['legal_entity_id'] AS legal_entity_id
+         |        ,old_data['legal_entity_name'] AS legal_entity_name
+         |        ,old_data['legal_entity_type'] AS legal_entity_type
+         |        ,old_data['create_time'] AS create_time
+         |        ,new_data['update_time'] AS update_time
+         |        ,1 AS deleted
+         |FROM    $ads_change_extract
+         |WHERE   ds = '$ds' AND     tn = 'company'
+         |AND     change_fields LIKE '%legal_entity_name%'
+         |AND     length(trim(old_data['legal_entity_name'])) > 0
+         |""".stripMargin).show(100, false)
+
+    //todo 新增人员逻辑剔除
+    sql(
+      s"""
+         |SELECT *
+         |from (
+         |SELECT *
+         |,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
+         |FROM (
+         |  SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |         ,company_id
+         |         ,name AS company_name
+         |         ,legal_entity_id
+         |         ,legal_entity_name
+         |         ,legal_entity_type
+         |         ,create_time
+         |         ,update_time
+         |         ,province_code,city_code,county_code
+         |         ,reg_capital_amount,cate_first_code
+         |         ,cate_second_code,cate_third_code
+         |         ,ds
+         | FROM    $inc_ads_company
+         | WHERE   ds > '0'
+         | -- AND     deleted <> 9
+         | union all
+         | SELECT   CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |         ,company_id
+         |         ,name AS company_name
+         |         ,legal_entity_id
+         |         ,legal_entity_name
+         |         ,legal_entity_type
+         |         ,create_time
+         |         ,update_time
+         |         ,province_code,city_code,county_code
+         |         ,reg_capital_amount,cate_first_code
+         |         ,cate_second_code,cate_third_code
+         |         ,ds
+         | FROM    $ads_company
+         | WHERE   ds > '0'
+         | -- AND     deleted <> 9
+         |)
+         |)c
+         |WHERE num = 1
+         |""".stripMargin).createOrReplaceTempView("company_view_all")
+
+    //新增人员关系表(只更新新增人员)
+    //TODO 过滤 deleted = 1
+    sql(
+      s"""
+         |INSERT INTO TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT  md5(CONCAT_WS('_',a.company_id,a.human_pid)) as rowkey
+         |        ,a.company_id
+         |        ,a.company_name
+         |        ,a.human_name
+         |        ,hash(a.human_name) AS hid
+         |        ,a.human_pid
+         |        ,a.STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |        ,c.province_code,c.city_code,c.county_code
+         |        ,c.reg_capital_amount,c.cate_first_code
+         |        ,c.cate_second_code,c.cate_third_code
+         |FROM    (
+         |            SELECT  *
+         |            FROM    (
+         |                        SELECT  company_id,company_name,name_cleanup(human_name) human_name,hid,status,create_time,update_time,0 as deleted,human_pid
+         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |                        FROM    (
+         |                                    --股东
+         |                                    SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,0 as deleted,ds,holder_id as human_pid
+         |                                    FROM    $inc_ads_company_holder
+         |                                    WHERE   ds = '$ds' AND holder_type = 1
+         |                                    UNION ALL
+         |                                    --主要成员
+         |                                    SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,0 as deleted,ds,hid as human_pid
+         |                                    FROM    $inc_ads_company_staff
+         |                                    WHERE   ds = '$ds'
+         |                                    UNION ALL
+         |                                    --法人
+         |                                    SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,0 as deleted,ds,legal_entity_id as human_pid
+         |                                    FROM    $inc_ads_company_legal_entity
+         |                                    WHERE   ds = '$ds' AND legal_entity_type = 1
+         |                                )
+         |                    )
+         |            WHERE   num = 1 AND name_cleanup(human_name) <> '' AND length(company_id) >= 32 AND length(human_pid) = 33
+         |        ) a
+         |JOIN (
+         |SELECT * FROM company_view_all
+         |) c
+         |ON      a.company_id = c.company_id
+         |""".stripMargin).show(100, false)
+
+    //股东关系
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  table $inc_ads_relation_holder  PARTITION (ds= '$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,percent,deleted,holder_type,label
+         |FROM (
+         |SELECT coalesce(holder_id,'') start_id,name_cleanup(holder_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,percent,deleted,holder_type,label
+         |from (
+         |SELECT * ,
+         |    '投资' as LABEL
+         | FROM    $inc_ads_company_holder
+         | WHERE   ds = '$ds'
+         |and holder_type = 1
+         |)a
+         |JOIN
+         |  (
+         |     SELECT rowkey
+         |     FROM (
+         |         SELECT rowkey
+         |         FROM $ads_change_extract
+         |         WHERE   ds = '$ds' AND tn = 'company_holder'
+         |         AND    ($change_field change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%')
+         |        )
+         |     GROUP BY rowkey
+         |   ) b
+         |ON    a.rowkey = b.rowkey
+         |UNION ALL
+         |SELECT holder_id start_id,name_cleanup(holder_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,percent,deleted,holder_type,label
+         |from (
+         |SELECT * ,
+         |    '投资' as LABEL
+         | FROM    $inc_ads_company_holder
+         |WHERE   ds = '$ds'
+         |and holder_type = 2
+         |)a JOIN
+         |  (
+         |    SELECT rowkey
+         |    FROM $ads_change_extract
+         |    WHERE   ds = '$ds' AND tn = 'company_holder'
+         |    AND    ($change_field change_fields LIKE '%percent%' OR change_fields is null OR change_fields LIKE '%deleted%' OR change_fields LIKE '%holder_id%')
+         |   ) b
+         |ON    a.rowkey = b.rowkey
+         |)
+         |WHERE start_id <> end_id
+         |AND length(start_id) >= 32  AND length(end_id) >= 32
+         |AND length(start_name) <> 0  AND length(end_name) <> 0
+         |""".stripMargin).show(100, false)
+
+    //主要成员关系
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  table $inc_ads_relation_staff  PARTITION (ds= '$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,staff_type,deleted,label
+         |FROM (
+         |SELECT hid start_id,name_cleanup(staff_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,name_cleanup(staff_type) staff_type,deleted,label
+         |from (
+         |SELECT * ,
+         |    '高管' as LABEL
+         | FROM    $inc_ads_company_staff
+         |WHERE   ds = '$ds'
+         |)a
+         |JOIN
+         |  (
+         |    SELECT rowkey
+         |     FROM (
+         |       SELECT rowkey
+         |         FROM $ads_change_extract
+         |         WHERE   ds = '$ds' AND tn = 'company_staff'
+         |       AND    ($change_field change_fields LIKE '%staff_type%' OR change_fields is null OR change_fields LIKE '%deleted%')
+         |       )
+         |       GROUP BY rowkey
+         |   ) b
+         |ON    a.rowkey = b.rowkey
+         |)
+         |WHERE start_id <> end_id
+         |AND length(start_id) >= 32  AND length(end_id) >= 32
+         |AND length(start_name) <> 0  AND length(end_name) <> 0
+         |""".stripMargin).show(100, false)
+
+    //法人关系
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_legal_entity   PARTITION (ds='$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,deleted,legal_entity_type,label
+         |FROM (
+         |SELECT legal_entity_id start_id,name_cleanup(legal_entity_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,deleted,legal_entity_type,label
+         |from (
+         |SELECT * ,
+         |    '法人' as LABEL
+         | FROM    $inc_ads_company_legal_entity
+         |WHERE   ds = '$ds'
+         |and legal_entity_type = 1
+         |)a
+         |UNION ALL
+         |SELECT legal_entity_id start_id,name_cleanup(legal_entity_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,deleted,legal_entity_type,label
+         |from (
+         |SELECT * ,
+         |    '法人' as LABEL
+         | FROM    $inc_ads_company_legal_entity
+         |WHERE   ds = '$ds'
+         |and legal_entity_type = 2
+         |)a
+         |)
+         |WHERE start_id <> end_id
+         |AND length(start_id) >= 32  AND length(end_id) >= 32
+         |AND length(start_name) <> 0  AND length(end_name) <> 0
+         |""".stripMargin).show(100, false)
+
+  }
+
+  def sendKafkaPre(): Unit = {
+    val inc_ads_company_node_kafka = "winhc_ng.inc_ads_company_node_kafka"
+    val inc_ads_relation_holder_v1_kafka = "winhc_ng.inc_ads_relation_holder_v1_kafka"
+    val inc_ads_relation_holder_v2_kafka = "winhc_ng.inc_ads_relation_holder_v2_kafka"
+    val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
+    val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
+    val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
+    val inc_ads_node_relation_success_status = "winhc_ng.inc_ads_node_relation_success_status"
+    //公司节点
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_node_kafka  PARTITION (ds='$ds')
+         |select
+         |id key,
+         |get_company_node(id, name, deleted, '1') message
+         |from $inc_ads_company_node
+         |where ds = '$ds'
+         |""".stripMargin).show(20, false)
+
+    //股东关系(人 -> 公司)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_holder_v1_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, '2') message
+         |from $inc_ads_relation_holder
+         |where ds = '$ds'
+         |and holder_type = 1
+         |""".stripMargin).show(20, false)
+
+    //股东关系(公司 -> 公司)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_holder_v2_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, '3') message
+         |from $inc_ads_relation_holder
+         |where ds = '$ds'
+         |and holder_type = 2
+         |""".stripMargin).show(20, false)
+
+    //法人关系(人 -> 公司)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_legal_entity_v1_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, '4') message
+         |from $inc_ads_relation_legal_entity
+         |where ds = '$ds'
+         |and legal_entity_type = '1'
+         |""".stripMargin).show(20, false)
+
+    //法人关系(公司 -> 公司)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_legal_entity_v2_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, '5') message
+         |from $inc_ads_relation_legal_entity
+         |where ds = '$ds'
+         |and legal_entity_type = '2'
+         |""".stripMargin).show(20, false)
+
+    //主要成员关系(人 -> 公司)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_staff_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted, '6') message
+         |from $inc_ads_relation_staff
+         |where ds = '$ds'
+         |""".stripMargin).show(20, false)
+
+    //发送成功状态标识
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_node_relation_success_status  PARTITION (ds='$ds')
+         |select
+         |$ds key,
+         |get_success_status($ds, '1', '100') message
+         |""".stripMargin).show(20, false)
+
+    //防止空分区
+    addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_holder_v2_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_node_relation_success_status, ds)
+
+  }
+
+}
+
+
+object inc_company_relation_v3 {
+  def main(args: Array[String]): Unit = {
+    if (args.size != 2) {
+      println("please set project ds.")
+      sys.exit(-1)
+    }
+    val Array(project, ds) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val re = inc_company_relation_v3(s = spark, project = project, ds = ds)
+    re.register_fun()
+    re.inc()
+    re.sendKafkaPre()
+    spark.stop()
+  }
+}
+

+ 42 - 5
src/main/scala/com/winhc/bigdata/spark/ng/relation/lookup_tab_pid.scala

@@ -20,6 +20,8 @@ case class args_job(tableName: String
                     , rowkey: String = "rowkey" //主键rowkey
                     , explode: String = "" // 炸开语句
                     , keyno: String = "" // id字段
+                    , other_args: Seq[String] = Seq.empty //补充字段
+                    , tar_tab_flag: String = "0" //表类型
                    )
 
 object args_job {
@@ -67,6 +69,25 @@ object args_job {
       , explode = "LATERAL VIEW explode(jsonall_2_array('$.litigant_id', concat_ws('\u0001', defendant_info, plaintiff_info, litigant_info)) ) key AS key_no"
       , keyno = "key_no"
     )
+
+    , args_job(tableName = "company"
+      , rowkey = "company_id"
+      , keyno = "legal_entity_id"
+      , tar_tab_flag = "1"
+      , other_args = Seq("company_id")
+    )
+
+    , args_job(tableName = "company_staff"
+      , keyno = "hid"
+      , tar_tab_flag = "1"
+      , other_args = Seq("company_id")
+    )
+
+    , args_job(tableName = "company_holder"
+      , keyno = "holder_id"
+      , tar_tab_flag = "1"
+      , other_args = Seq("company_id")
+    )
   )
 
   def get_args_company_job(tn: String): args_job = {
@@ -87,11 +108,19 @@ case class lookup_tab_pid(s: SparkSession
   val keyno: String = args_job.keyno
 
   val tab_back_deleted = s" $project.inc_ads_company_human_relation_back_deleted"
-  val tar_tab = s"$project.tmp_xf_person_id_rowkey"
+  var tar_tab = ""
+  args_job.tar_tab_flag match {
+    case "0" => tar_tab = s"$project.tmp_xf_person_id_rowkey"
+    case "1" => tar_tab = s"$project.inc_ads_person_id_rowkey"
+    case _ => tar_tab = s"$project.tmp_xf_person_id_rowkey"
+  }
 
-  var tar_cols: Seq[String] = getColumns(tar_tab).diff(Seq("ds", "tn", "message"))
+  var tar_cols: Seq[String] = (getColumns(tar_tab).diff(Seq("ds", "tn", "message")) ++ args_job.other_args).distinct
   val sort: String = get_partition_order_by()
-  val inter_cols: Seq[String] = getColumns(ads_tab).intersect(getColumns(inc_ads_tab))
+  var inter_cols: Seq[String] = getColumns(ads_tab).intersect(getColumns(inc_ads_tab))
+  if (!inter_cols.contains("rowkey")) {
+    inter_cols ++= Seq(s"${rowkey} as rowkey")
+  }
 
   val ds: String = {
     var par = BaseUtil.getPartion(tar_tab, tn, spark)
@@ -183,10 +212,11 @@ case class lookup_tab_pid(s: SparkSession
       s"""
          |INSERT OVERWRITE TABLE $tar_tab PARTITION(ds='$lastDs',tn='$tn')
          |SELECT  ${tar_cols.mkString(",")},
-         |        get_table_message(${tar_cols.mkString(",")}, '$tn') message
+         |        -- get_table_message(${tar_cols.mkString(",")}, '$tn') message
+         |        ${to_json(tar_cols ++ Seq("tn"))}
          |FROM    mapping a
          |JOIN    (
-         |            SELECT  ${rowkey},$keyno
+         |            SELECT  *,'$tn' as tn
          |            FROM  tab_tmp
          |            ${args_job.explode}
          |        ) b
@@ -200,6 +230,13 @@ case class lookup_tab_pid(s: SparkSession
 
   }
 
+  private def to_json(seq:Seq[String]): String = {
+    val r1 = seq.map(x => {
+      s"'$x',$x"
+    }).mkString(",")
+   s"to_json(map($r1)) message"
+  }
+
   private def get_partition_order_by(): String = {
     if (ads_tab.contains("update_time")) {
       " ds DESC,update_time DESC "

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -279,6 +279,8 @@ object BaseUtil {
         return "民事一审"
       } else if (StrUtil.containsAny(caseNo, "执恢")) {
         return "恢复执行"
+      } else if (StrUtil.containsAny(caseNo, "执异")) {
+        return "执行异议"
       } else if (StrUtil.containsAny(caseNo, "执")) {
         return "首次执行"
       } else if (StrUtil.containsAny(caseNo, "刑初")) {
@@ -287,8 +289,6 @@ object BaseUtil {
         return "民事二审"
       } else if (StrUtil.containsAny(caseNo, "特")) {
         return "特别程序"
-      } else if (StrUtil.containsAny(caseNo, "执异")) {
-        return "执行异议"
       } else if (StrUtil.containsAny(caseNo, "民申")) {
         return "民事申请再审审查"
       } else if (StrUtil.containsAny(caseNo, "刑终")) {