Pārlūkot izejas kodu

feat: 司法案件失信人处理

- 失信人关于人部分数据处理
- 失信人输出到司法案件下游
许家凯 4 gadi atpakaļ
vecāks
revīzija
86f657fc88

+ 38 - 17
src/main/scala/com/winhc/bigdata/spark/jobs/human/company_judicial_human.scala

@@ -34,15 +34,26 @@ case class company_judicial_human(s: SparkSession,
 
   }
 
-  def etl(tableName: String, dupliCols: Seq[String], is_inc: Boolean = true): Unit = {
+  def etl(tableName: String, dupliCols: Seq[String], case_no_fields: Seq[String], id_card_field: String = null, is_inc: Boolean = true): Unit = {
+    case_no_trim_udf()
     cleanup()
     is_id_card()
+    id_card_trim_udf()
     init(tableName)
     val ods_tableName = s"$project.ods_$tableName"
     val inc_ods_tableName = s"$project.inc_ods_$tableName"
-    val target_table = s"$project.ads_${tableName}_human"
+    val case_no_f_set = case_no_fields.toSet
+    val target_table = s"$project.inc_ads_${tableName}_human"
     val targetCols = getColumns(target_table).filter(f => {
       !f.equals("ds")
+    }).map(f => {
+      if (case_no_f_set.contains(f)) {
+        s"case_no_trim($f) as $f"
+      } else if (id_card_field.equals(f)) {
+        s"id_card_trim($f) as $f"
+      }
+      else
+        f
     })
     val intersectCols = getColumns(ods_tableName).toSet & getColumns(inc_ods_tableName).toSet
 
@@ -50,13 +61,18 @@ case class company_judicial_human(s: SparkSession,
       f.equals("cid") || f.equals("cids")
     }).max
 
+    val case_no_f = case_no_fields.map(f => {
+      s"AND case_no_trim($f) IS NOT NULL "
+    }).mkString(" ")
+    val id_card_f = if (id_card_field == null) "" else s" AND is_id_card($id_card_field)"
+
     def all(): Unit = {
-      val inc_last_ds = getLastPartitionsOrElse(inc_ods_tableName, "0")
       val last_ds = getLastPartitionsOrElse(ods_tableName, "0")
+      val ads_target_table = s"$project.ads_${tableName}_human"
 
       sql(
         s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_table PARTITION(ds='$inc_last_ds')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_target_table PARTITION(ds='$last_ds')
            |SELECT  ${targetCols.mkString(",")}
            |FROM    (
            |            SELECT  *
@@ -68,15 +84,8 @@ case class company_judicial_human(s: SparkSession,
            |                        WHERE   ds = '$last_ds'
            |                        AND     $cid_name IS NULL
            |                        AND     trim(cleanup(concat(${dupliCols.mkString(",")}))) <> ''
-           |                        AND     is_id_card(card_num)
-           |                        UNION ALL
-           |                        SELECT  MD5(cleanup(concat(${dupliCols.mkString(",")}))) AS rowkey
-           |                                ,${intersectCols.mkString(",")}
-           |                        FROM    $inc_ods_tableName
-           |                        WHERE   ds > '$last_ds'
-           |                        AND     $cid_name IS NULL
-           |                        AND     trim(cleanup(concat(${dupliCols.mkString(",")}))) <> ''
-           |                        AND     is_id_card(card_num)
+           |                        $case_no_f
+           |                        $id_card_f
            |                    ) AS t1
            |        ) AS t2
            |WHERE   t2.num = 1
@@ -85,12 +94,13 @@ case class company_judicial_human(s: SparkSession,
     }
 
     def inc(): Unit = {
+      val inc_target_table = s"$project.inc_ads_${tableName}_human"
       val inc_last_ds = getLastPartitionsOrElse(inc_ods_tableName, "0")
-      val ads_last_ds = getLastPartitionsOrElse(target_table, "0")
+      val ads_last_ds = getLastPartitionsOrElse(inc_target_table, "0")
 
       sql(
         s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_table PARTITION(ds='$inc_last_ds')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_target_table PARTITION(ds='$inc_last_ds')
            |SELECT  ${targetCols.mkString(",")}
            |FROM    (
            |            SELECT  *
@@ -102,11 +112,22 @@ case class company_judicial_human(s: SparkSession,
            |                        WHERE   ds > '$ads_last_ds'
            |                        AND     $cid_name IS NULL
            |                        AND     trim(cleanup(concat(${dupliCols.mkString(",")}))) <> ''
-           |                        AND     is_id_card(card_num)
+           |                        $case_no_f
+           |                        $id_card_f
            |                    ) AS t1
            |        ) AS t2
            |WHERE   t2.num = 1
            |""".stripMargin)
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+
+      val write_cols = getColumns(inc_target_table).diff(Seq("rowkey", "ds"))
+      sql(
+        s"""
+           |select *
+           |from $inc_target_table
+           |where ds = $inc_last_ds
+           |""".stripMargin)
+        .save2HBase(s"${tableName}_human".toUpperCase, "rowkey", write_cols)
     }
 
     if (is_inc)
@@ -125,7 +146,7 @@ object company_judicial_human {
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
     company_judicial_human(spark, "winhc_eci_dev")
-      .etl("company_dishonest_info", Seq("name", "case_no"), is_inc = false)
+      .etl("company_dishonest_info", Seq("name", "case_no"), case_no_fields = Seq("case_no", "gist_dd"), id_card_field = "card_num", is_inc = true)
 
     spark.stop()
   }

+ 37 - 26
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JCR_pre3.scala

@@ -16,30 +16,35 @@ import scala.collection.mutable
  */
 case class JCR_pre3(s: SparkSession,
                     project: String //表所在工程名
-                    ) extends LoggingUtils with Logging with BaseFunc with CompanyMapping {
+                   ) extends LoggingUtils with Logging with BaseFunc with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
   private val target_table = "ads_judicial_case_relation_pre"
 
+  val flag_map  =Map("company_dishonest_info"->"3"
+    ,"company_dishonest_info_human"->"9")
+
 
   def company_dishonest_info(is_inc: Boolean = true): Unit = {
     prepareFunctions(spark)
+    case_no_trim_udf()
+
 
-    def all(): Unit = {
-      val ads_ds = getLastPartitionsOrElse(s"$project.ads_company_dishonest_info", "0")
-      val inc_last_ds = getLastPartitionsOrElse(s"$project.inc_ads_company_dishonest_info", "0")
+    def all(table_name: String): Unit = {
+      val ads_ds = getLastPartitionsOrElse(s"$project.ads_$table_name", "0")
+      val inc_last_ds = getLastPartitionsOrElse(s"$project.inc_ads_$table_name", "0")
 
-      val intersectCols = getColumns(s"$project.ads_company_dishonest_info").toSet & getColumns(s"$project.inc_ads_company_dishonest_info").toSet
+      val intersectCols = getColumns(s"$project.ads_$table_name").toSet & getColumns(s"$project.inc_ads_$table_name").toSet
 
       sql(
         s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='company_dishonest_info')
-           |SELECT  md5(CONCAT(rowkey,'company_dishonest_info')) as judicase_id
-           |        ,3 as flag
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='$table_name')
+           |SELECT  md5(CONCAT(rowkey,'$table_name')) as judicase_id
+           |        ,${flag_map(table_name)} as flag
            |        ,CONCAT('关于',name,'的失信信息') as title
            |        ,case_type(gist_dd) as case_type
            |        ,null as case_reason
-           |        ,case_no
+           |        ,case_no_trim(gist_dd) as case_no
            |        ,court as court_name
            |        ,'执行' as case_stage
            |        ,null as yg_name
@@ -52,32 +57,36 @@ case class JCR_pre3(s: SparkSession,
            |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
            |            FROM    (
            |                        SELECT  ${intersectCols.mkString(",")}
-           |                        FROM    winhc_eci_dev.ads_company_dishonest_info
+           |                        FROM    winhc_eci_dev.ads_$table_name
            |                        WHERE   ds = '$ads_ds'
+           |                        AND     case_no_trim(gist_dd) is not null
            |                        UNION ALL
            |                        SELECT  ${intersectCols.mkString(",")}
-           |                        FROM    winhc_eci_dev.inc_ads_company_dishonest_info
+           |                        FROM    winhc_eci_dev.inc_ads_$table_name
            |                        WHERE   ds > '$ads_ds'
+           |                        AND     case_no_trim(gist_dd) is not null
            |                    ) AS t1
            |        ) AS t2
            |WHERE   t2.num = 1
            |""".stripMargin)
     }
 
-    def inc(): Unit = {
+    def inc(table_name: String): Unit = {
 
       val last_ds = sql(s"show partitions $project.$target_table")
         .collect()
         .map(r => r.getString(0))
-        .filter(str => str.contains("company_dishonest_info"))
-        .map(str => str.split("/")(0).split("=")(1))
+        .filter(str => str.contains(table_name))
+        .flatMap(str => str.split("/"))
+        .filter(str => str.contains("ds"))
+        .map(str => str.split("=")(1))
         .max
 
-      val inc_last_ds = getLastPartitionsOrElse(s"$project.inc_ads_company_dishonest_info", "0")
+      val inc_last_ds = getLastPartitionsOrElse(s"$project.inc_ads_$table_name", "0")
 
       sql(
         s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='company_dishonest_info')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='$table_name')
            |SELECT  judicase_id
            |        ,flag
            |        ,title
@@ -111,14 +120,14 @@ case class JCR_pre3(s: SparkSession,
            |                                ,0 AS xjk_f
            |                        FROM    winhc_eci_dev.ads_judicial_case_relation_pre
            |                        WHERE   ds = '$last_ds'
-           |                        AND     tn = 'company_dishonest_info'
+           |                        AND     tn = '$table_name'
            |                        UNION ALL
-           |                        SELECT  md5(CONCAT(rowkey,'company_dishonest_info')) as judicase_id
-           |                                ,3 as flag
+           |                        SELECT  md5(CONCAT(rowkey,'$table_name')) as judicase_id
+           |                                ,${flag_map(table_name)} as flag
            |                                ,CONCAT('关于',name,'的失信信息') as title
            |                                ,case_type(gist_dd) as case_type
            |                                ,null as case_reason
-           |                                ,case_no
+           |                                ,case_no_trim(gist_dd) as case_no
            |                                ,court as court_name
            |                                ,'执行' as case_stage
            |                                ,null as yg_name
@@ -127,8 +136,9 @@ case class JCR_pre3(s: SparkSession,
            |                                ,rowkey as detail_id
            |                                ,null as case_amt
            |                                ,1 AS xjk_f
-           |                        FROM    winhc_eci_dev.inc_ads_company_dishonest_info
+           |                        FROM    winhc_eci_dev.inc_ads_$table_name
            |                        WHERE   ds > '$last_ds'
+           |                        AND     case_no_trim(gist_dd) is not null
            |                    ) AS t1
            |        ) AS t2
            |WHERE   t2.num = 1
@@ -136,10 +146,11 @@ case class JCR_pre3(s: SparkSession,
 
     }
 
-    if (is_inc)
-      inc()
-    else
-      all()
+    for (t <- Seq("company_dishonest_info", "company_dishonest_info_human"))
+      if (is_inc)
+        inc(t)
+      else
+        all(t)
   }
 }
 
@@ -151,7 +162,7 @@ object JCR_pre3 {
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
-    JCR_pre3(s = spark, project = "winhc_eci_dev").company_dishonest_info(true)
+    JCR_pre3(s = spark, project = "winhc_eci_dev").company_dishonest_info(false)
     spark.stop()
   }
 }