Browse Source

fix: 司法案件上游接入失信和被执数据

- 司法案件pre表关联司法案件id
许家凯 4 years ago
parent
commit
b49fb54c24

+ 77 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre39.scala

@@ -36,9 +36,17 @@ case class JudicialCaseRelationPre39(s: SparkSession,
 
       val intersectCols = getColumns(s"$project.ads_$table_name").toSet & getColumns(s"$project.inc_ads_$table_name").toSet
 
+
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    winhc_eci_dev.ads_judicial_case_relation_graph
+           |WHERE   flag = '$table_name'
+           |""".stripMargin)
+        .createOrReplaceTempView("all_judicial_mapping_tmp")
+
       sql(
         s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='$table_name')
            |SELECT  md5(CONCAT(rowkey,'$table_name')) as judicase_id
            |        ,${flag_map(table_name)} as flag
            |        ,CONCAT('关于',name,'的失信信息') as title
@@ -52,6 +60,7 @@ case class JudicialCaseRelationPre39(s: SparkSession,
            |        ,pub_date as date
            |        ,rowkey as detail_id
            |        ,null as case_amt
+           |        ,case_no_trim(case_no) as xjk_case_no
            |FROM    (
            |            SELECT  *
            |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
@@ -69,6 +78,73 @@ case class JudicialCaseRelationPre39(s: SparkSession,
            |        ) AS t2
            |WHERE   t2.num = 1
            |""".stripMargin)
+        .createOrReplaceTempView(s"all_${table_name}_tmp")
+
+      sql(
+        s"""
+           |SELECT  coalesce(t2.judicase_id,t1.judicase_id) AS judicase_id
+           |        ,flag
+           |        ,title
+           |        ,case_type
+           |        ,case_reason
+           |        ,case_no
+           |        ,court_name
+           |        ,case_stage
+           |        ,yg_name
+           |        ,bg_name
+           |        ,date
+           |        ,detail_id
+           |        ,case_amt
+           |        ,xjk_case_no
+           |FROM    (
+           |            SELECT  *
+           |            FROM    all_${table_name}_tmp
+           |        ) AS t1
+           |LEFT JOIN (
+           |              SELECT  *
+           |              FROM    all_judicial_mapping_tmp
+           |          ) AS t2
+           |ON      t1.detail_id = t2.id
+           |""".stripMargin)
+        .createOrReplaceTempView(s"ok_table_$table_name")
+
+
+
+      //fixme  替换司法案件id
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='$table_name')
+           |SELECT  judicase_id
+           |        ,flag
+           |        ,title
+           |        ,case_type
+           |        ,case_reason
+           |        ,case_no
+           |        ,court_name
+           |        ,case_stage
+           |        ,yg_name
+           |        ,bg_name
+           |        ,date
+           |        ,detail_id
+           |        ,case_amt
+           |FROM    ok_table_$table_name
+           |UNION ALL
+           |SELECT  judicase_id
+           |        ,flag
+           |        ,title
+           |        ,case_type
+           |        ,case_reason
+           |        ,xjk_case_no AS case_no
+           |        ,court_name
+           |        ,case_stage
+           |        ,yg_name
+           |        ,bg_name
+           |        ,date
+           |        ,detail_id
+           |        ,case_amt
+           |FROM    ok_table_$table_name
+           |WHERE   xjk_case_no IS NOT NULL
+           |""".stripMargin)
     }
 
     def inc(table_name: String): Unit = {

+ 213 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs.judicial
 import com.winhc.bigdata.spark.implicits.RegexUtils.RichRegex
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils, case_connect_utils}
+import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils, case_connect_utils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
@@ -23,13 +23,217 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
   private val table_id_map = Map("wenshu_detail" -> "case_id")
   private val pat = ".*\\d+.*".r
 
+  private val separation = "@@"
+
   import spark.implicits._
 
+  is_id_card_udf()
+
   def etl(ds: String): Unit = {
-    etl_wenshu(ds)
+    AsyncExtract.startAndWait(spark, Seq(
+      ("dishonest etl...", () => {
+        etl_company_dishonest_info(ds)
+        true
+      })
+      , ("wenshu etl...", () => {
+        etl_wenshu(ds)
+        true
+      })
+      ,
+      ("company_zxr etl...", () => {
+        etl_company_zxf(ds)
+        true
+      })
+    ))
     relationByGroup()
   }
 
+  private def etl_company_zxf(ds: String): Unit = {
+    val view =
+      s"""
+         | rowkey as id
+         | ,court as court_name
+         | ,case_no
+         | ,court as connect_court_name
+         | ,gist_id as connect_case_no
+         | ,null as yg_name
+         | ,null as bg_name
+         | ,ds
+         |""".stripMargin
+
+    /*val companyCidAndNameUtils = utils.CompanyCidAndNameUtils(spark)
+      def detail_zxr(tn: String): Unit = {
+      val org_ads_tab = s"$project.ads_$tn"
+      val org_inc_ads_tab = s"$project.inc_ads_$tn"
+      val ads_last_ds = getLastPartitionsOrElse(org_ads_tab, "0")
+
+      val intersect_cols = getColumns(org_ads_tab).intersect(getColumns(org_inc_ads_tab))
+
+      val verify = tn.contains("person") match {
+        case true => s"AND     is_id_card(t2.card)"
+        case false => ""
+      }
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${intersect_cols.mkString(",")}
+           |                        FROM    $org_ads_tab
+           |                        WHERE   ds = '$ads_last_ds'
+           |                        UNION ALL
+           |                        SELECT  ${intersect_cols.mkString(",")}
+           |                        FROM    $org_inc_ads_tab
+           |                        WHERE   ds > '$ads_last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |$verify
+           |""".stripMargin).createOrReplaceTempView("xjk_tmp")
+
+      var other_cols: Seq[String] = null
+      var result_tab: String = null
+
+      if (!tn.contains("person")) {
+        val res_tab = companyCidAndNameUtils.replaceNewNameByCid("xjk_tmp", "new_cid", "cname")
+        result_tab = s"${res_tab}_tmp"
+        sql(
+          s"""
+             |SELECT  $view
+             |FROM    $res_tab
+             |""".stripMargin)
+          .cache()
+          .createOrReplaceTempView(result_tab)
+      } else {
+        result_tab = s"xjk_tmp_xjk_tmp"
+        sql(
+          s"""
+             |SELECT  $view
+             |FROM    xjk_tmp
+             |""".stripMargin)
+          .createOrReplaceTempView(result_tab)
+      }
+      other_cols = getColumns(result_tab)
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tn')
+           |SELECT  id
+           |        , 1 as main_case_no
+           |        ,case_no
+           |        ,${getStrToMap(other_cols)} as case_attribute
+           |FROM    $result_tab
+           |UNION ALL
+           |SELECT  id
+           |        , 0 as main_case_no
+           |        ,connect_case_no as case_no
+           |        ,${getStrToMap(other_cols)} as case_attribute
+           |FROM    $result_tab
+           |WHERE   connect_case_no is not null
+           |""".stripMargin)
+
+    }
+
+    AsyncExtract.startAndWait(spark, Seq(
+      ("company_zxr", () => {
+        detail_zxr("company_zxr_list")
+        true
+      })
+      , ("company_zxr_person", () => {
+        detail_zxr("company_zxr_person")
+        true
+      })
+    ))*/
+
+    AsyncExtract.startAndWait(spark, Seq(
+      ("company_zxr", () => {
+        detail_etl(ds, "company_zxr", view)
+        true
+      })
+      , ("company_zxr_person", () => {
+        detail_etl(ds, "company_zxr_person", view)
+        true
+      })
+    ))
+  }
+
+  private def detail_etl(ds: String, tn: String, view: String): Unit = {
+    val tmp_tab = s"all_etl_${tn}_tmp"
+
+    val org_ads_tab = s"$project.ads_$tn"
+    val org_inc_ads_tab = s"$project.inc_ads_$tn"
+    val ads_last_ds = getLastPartitionsOrElse(org_ads_tab, "0")
+
+    val intersect_cols = getColumns(org_ads_tab).intersect(getColumns(org_inc_ads_tab))
+
+    sql(
+      s"""
+         |SELECT $view
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                        SELECT  ${intersect_cols.mkString(",")}
+         |                        FROM    $org_ads_tab
+         |                        WHERE   ds = '$ads_last_ds'
+         |                        UNION ALL
+         |                        SELECT  ${intersect_cols.mkString(",")}
+         |                        FROM    $org_inc_ads_tab
+         |                        WHERE   ds > '$ads_last_ds'
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
+         |""".stripMargin)
+      .createOrReplaceTempView(tmp_tab)
+    val other_cols = getColumns(tmp_tab)
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tn')
+         |SELECT  id
+         |        , 1 as main_case_no
+         |        ,case_no
+         |        ,${getStrToMap(other_cols)} as case_attribute
+         |FROM    $tmp_tab
+         |UNION ALL
+         |SELECT  id
+         |        , 0 as main_case_no
+         |        ,connect_case_no as case_no
+         |        ,${getStrToMap(other_cols)} as case_attribute
+         |FROM    $tmp_tab
+         |WHERE   connect_case_no is not null
+         |""".stripMargin)
+  }
+
+
+  private def etl_company_dishonest_info(ds: String): Unit = {
+    val view =
+      s"""
+         | rowkey as id
+         | ,court as court_name
+         | ,case_no
+         | ,gist_unit as connect_court_name
+         | ,gist_dd as connect_case_no
+         | ,name as yg_name
+         | ,name as bg_name
+         | ,ds
+         |""".stripMargin
+
+
+    AsyncExtract.startAndWait(spark, Seq(
+      ("company_dishonest_info", () => {
+        detail_etl(ds, "company_dishonest_info", view)
+        true
+      })
+      , ("company_dishonest_info_person", () => {
+        detail_etl(ds, "company_dishonest_info_person", view)
+        true
+      })
+    ))
+  }
+
   private def etl_wenshu(ds: String): Unit = {
     val tableName = "wenshu_detail"
 
@@ -51,11 +255,6 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |                        SELECT  ${other_cols.mkString(",")}
          |                        FROM    $org_tab
          |                        WHERE   ds > 0
-         | ---                        WHERE   ds = '$ods_end_ds'
-         | ---                        UNION ALL
-         | ---                        SELECT  ${other_cols.mkString(",")}
-         | ---                        FROM    $org_tab
-         | ---                        WHERE   ds > $ods_end_ds
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
@@ -68,7 +267,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |SELECT  *
          |FROM    $tmp_tab lateral view explode(split(connect_case_no,'\\n')) t as single_connect_case_no
          |""".stripMargin)
-      .cache()
+//      .cache()
       .createOrReplaceTempView(s"explode_$tmp_tab")
 
     sql(
@@ -105,7 +304,8 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          | AND     case_no <> ''
          | AND     match_case_no(case_no)
          |""".stripMargin)
-      .cache()
+      .repartition(500)
+//      .cache()
       .createOrReplaceTempView("dwd_judicial_case_tmp")
 
     //需要区分group by ,只用一个
@@ -113,7 +313,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
       s"""
          |SELECT  case_no,party,collect_set(id) as connect_case_id
          |FROM    (
-         |            SELECT  concat_ws('_',id,tn) as id
+         |            SELECT  concat_ws('$separation',id,tn) as id
          |                    ,case_no
          |                    ,tn
          |                    ,main_case_no
@@ -132,12 +332,12 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
         val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
         if (connect_case_id.length < 2) {
-          val e_1 = connect_case_id.head.split("_")
+          val e_1 = connect_case_id.head.split(separation)
           list.append((e_1(0), null, case_no, null, e_1(1), null, 2))
         }
         for (i <- 0 to connect_case_id.length - 2) {
-          val e_1 = connect_case_id(i).split("_")
-          val e_2 = connect_case_id(i + 1).split("_")
+          val e_1 = connect_case_id(i).split(separation)
+          val e_2 = connect_case_id(i + 1).split(separation)
           list.append((e_1(0), e_2(0), case_no, case_no, e_1(1), e_2(1), 2))
         }
         list