xufei 3 年 前
コミット
3b1e68cc1d

+ 142 - 49
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggs.scala

@@ -163,6 +163,7 @@ object JudicialCaseRelationAggs {
       case "pre_calc" => r.pre_calc()
       case "calc_mapping" => r.calc_mapping()
       case "calc" => r.calc()
+      case "calc_relation" => r.calc_relation()
       case _ => {
         println("not fun to run !")
         sys.exit(-1)
@@ -183,12 +184,20 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
   val ads_judicial_case_relation_id = s" $project.ads_judicial_case_relation_id"
   //id映射表
   val ads_judicial_case_id_mapping = s" $project.ads_judicial_case_id_mapping"
+  //id映射表(原始表)
+  val ods_judicial_case_id_mapping = s" $project.ods_judicial_case_id_mapping"
   //主表
   val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1"
   //明细表
   val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
   //案件移除表
-  val ads_judicial_case_id_mapping_deleted = s" $project.ads_judicial_case_id_mapping_deleted"
+  val ads_judicial_case_id_mapping_r1_deleted = s" $project.ads_judicial_case_id_mapping_r1_deleted"
+  //案件移除表
+  val ads_judicial_case_id_mapping_r2_deleted = s" $project.ads_judicial_case_id_mapping_r2_deleted"
+  //案件关系表
+  val bds_judicial_case_relation = s" $project.bds_judicial_case_relation"
+  val ads_judicial_case_node_kafka = s" $project.ads_judicial_case_node_kafka"
+  val ads_judicial_case_relation_kafka = s" $project.ads_judicial_case_relation_kafka"
 
   private val cols_map: Map[String, String] = args_case.cols_map
   private val rowkey: String = args_case.rowkey
@@ -254,12 +263,75 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
     addEmptyPartitionOrSkipPlus(ads_judicial_case_relation_pre, calc_ds, tableName)
   }
 
+  /**
+   * Builds the case-graph node and relation tables that feed the Kafka export.
+   *
+   * Nodes: one row per distinct (rowkey, tn) endpoint appearing on either side of an
+   * edge in `bds_judicial_case_relation`; `start_id` is "<rowkey>_<tn_flag(tn)>" and
+   * `topic_type` is fixed to "500". Relations: one row per edge with both endpoints
+   * non-null, `topic_type` fixed to "400". Both results are serialized to JSON via
+   * `to_json(MAP(...))` and written into the `ds='$calc_ds'` partition.
+   */
+  def calc_relation(): Unit = {
+
+    // Node extraction: UNION both endpoints of each edge, then dedupe per (start_id, tn)
+    // with ROW_NUMBER. NOTE(review): the source partition is hard-coded to ds = '20210604'
+    // while the OUTPUT partition uses $calc_ds — this looks like a leftover from a test
+    // run; confirm it should not read ds = '$calc_ds' as well.
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node_kafka PARTITION(ds='$calc_ds')
+        |SELECT  *
+        |        ,to_json(MAP('start_id',start_id,"topic_type",topic_type)) node_json
+        |FROM    (
+        |            SELECT  concat_ws('_',start_id, tn_flag(tn)) start_id
+        |                    ,"500" AS topic_type
+        |            FROM    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER (PARTITION BY start_id,tn ORDER BY start_id DESC) num
+        |                        FROM    (
+        |                                    SELECT  rowkey_1 AS start_id,tn_1 AS tn
+        |                                    FROM    $bds_judicial_case_relation
+        |                                    WHERE   ds = '20210604'
+        |                                    AND     rowkey_1 IS NOT NULL AND     tn_1 IS NOT NULL
+        |                                    UNION ALL
+        |                                    SELECT  rowkey_2 AS start_id,tn_2 AS tn
+        |                                    FROM    $bds_judicial_case_relation
+        |                                    WHERE   ds = '20210604'
+        |                                    AND     rowkey_2 IS NOT NULL AND     tn_2 IS NOT NULL
+        |                                )
+        |                    )
+        |            WHERE   num = 1
+        |        )
+        |${if (isWindows) "LIMIT 1000" else ""}
+        |""".stripMargin).show(1000,false)
+
+    // Relation extraction: keep only edges where both rowkeys and both table names are
+    // present. Same hard-coded ds = '20210604' caveat as above — TODO confirm.
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds')
+        |SELECT  *
+        |        ,to_json(MAP('start_id',start_id,'end_id',end_id,"topic_type",topic_type,"connect_type",connect_type)) relation_json
+        |FROM    (
+        |            SELECT  concat_ws('_',rowkey_1, tn_flag(tn_1)) start_id
+        |                    ,concat_ws('_',rowkey_2, tn_flag(tn_2)) end_id
+        |                    ,"400" AS topic_type
+        |                    ,connect_type
+        |            FROM    $bds_judicial_case_relation
+        |            WHERE   ds = '20210604'
+        |            AND     rowkey_1 IS NOT NULL AND  rowkey_2 IS NOT NULL
+        |            AND     tn_1 IS NOT NULL AND  tn_2 IS NOT NULL
+        |        )
+        |${if (isWindows) "LIMIT 1000" else ""}
+        |""".stripMargin).show(1000,false)
+
+    // Create the partition if missing so downstream readers don't fail on an absent ds.
+    addEmptyPartitionOrSkip(ads_judicial_case_node_kafka, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_kafka, calc_ds)
+  }
+
   def calc_mapping(): Unit = {
+    //ods 转换 ads
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping PARTITION(ds='$calc_ds')
+         |SELECT  component_id
+         |        ,rowkey
+         |        ,flag_tn(tn)
+         |FROM    $ods_judicial_case_id_mapping
+         |WHERE   ds = '$calc_ds'
+         |""".stripMargin)
 
-    //找出删除id
+    //主表删除
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_deleted PARTITION(ds='$calc_ds')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_r1_deleted PARTITION(ds='$calc_ds')
          |SELECT
          |         a.id new_id
          |        ,b.id old_id
@@ -287,7 +359,28 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |WHERE   a.id <> b.id
          |""".stripMargin)
 
-
+    //明细表删除
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_r2_deleted PARTITION(ds='$calc_ds')
+         |SELECT  b.id,a.rowkey,a.tn, 1 as deleted
+         |FROM    (
+         |            SELECT  rowkey, tn, old_id
+         |            FROM    $ads_judicial_case_id_mapping_r1_deleted
+         |            WHERE   ds = '$calc_ds'
+         |        ) a
+         |JOIN    (
+         |            SELECT  id, judicase_id
+         |            FROM    (
+         |                        SELECT  id, judicase_id
+         |                                ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY ds DESC) num
+         |                        FROM    $ads_judicial_case_relation_r2
+         |                        WHERE   ds < '$calc_ds'
+         |                    )
+         |            WHERE   num = 1
+         |        ) b
+         |ON      a.old_id = b.judicase_id
+         |""".stripMargin)
   }
 
   def calc(): Unit = {
@@ -300,51 +393,51 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
 
     //detail 文书id
     //替换司法案件id
-        sql(
-          s"""
-             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_id PARTITION(ds='$calc_ds')
-             |SELECT  b.id
-             |        ,a.flag,a.title,a.case_type,a.case_reason,a.case_no,a.court_name,a.case_stage,a.lable,a.detail
-             |        ,a.yg_name,a.bg_name,a.all_name,a.date,a.detail_id,a.case_amt,a.case_id,a.tn,a.data
-             |FROM    (
-             |            SELECT   flag
-             |                    ,title
-             |                    ,case_type(case_no) case_type
-             |                    ,case_reason
-             |                    ,case_no_trim(case_no) as case_no
-             |                    ,court_name
-             |                    ,case_stage(case_no) case_stage
-             |                    ,case_label(flag) lable
-             |                    ,to_json(named_struct('flag', flag, 'date',date, 'detail_id', coalesce(case_id, detail_id), 'name', json_array(bg_name)) ) detail
-             |                    ,yg_name
-             |                    ,bg_name
-             |                    ,merge_json(yg_name, bg_name, all_name) all_name
-             |                    ,date
-             |                    ,detail_id
-             |                    ,case_amt
-             |                    ,case_id
-             |                    ,tn
-             |                    ,data
-             |                    ,row_id
-             |            FROM    (
-             |                        SELECT  *, md5(concat_ws('',detail_id,tn)) row_id
-             |                                ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
-             |                        FROM    $ads_judicial_case_relation_pre
-             |                        WHERE   ds > ${calc_last_ds(ads_judicial_case_relation_id)}
-             |                    )
-             |            WHERE   num = 1
-             |        ) a
-             |JOIN    (
-             |           SELECT id, row_id
-             |           FROM (
-             |                    SELECT  id, md5(concat_ws('',rowkey,tn)) row_id
-             |                           ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num2
-             |                    FROM    $ads_judicial_case_id_mapping
-             |                    WHERE   ds > 0
-             |                ) WHERE num2 = 1
-             |        ) b
-             |ON      a.row_id = b.row_id
-             |""".stripMargin)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_id PARTITION(ds='$calc_ds')
+         |SELECT  b.id
+         |        ,a.flag,a.title,a.case_type,a.case_reason,a.case_no,a.court_name,a.case_stage,a.lable,a.detail
+         |        ,a.yg_name,a.bg_name,a.all_name,a.date,a.detail_id,a.case_amt,a.case_id,a.tn,a.data
+         |FROM    (
+         |            SELECT   flag
+         |                    ,title
+         |                    ,case_type(case_no) case_type
+         |                    ,case_reason
+         |                    ,case_no_trim(case_no) as case_no
+         |                    ,court_name
+         |                    ,case_stage(case_no) case_stage
+         |                    ,case_label(flag) lable
+         |                    ,to_json(named_struct('flag', flag, 'date',date, 'detail_id', coalesce(case_id, detail_id), 'name', json_array(bg_name)) ) detail
+         |                    ,yg_name
+         |                    ,bg_name
+         |                    ,merge_json(yg_name, bg_name, all_name) all_name
+         |                    ,date
+         |                    ,detail_id
+         |                    ,case_amt
+         |                    ,case_id
+         |                    ,tn
+         |                    ,data
+         |                    ,row_id
+         |            FROM    (
+         |                        SELECT  *, md5(concat_ws('',detail_id,tn)) row_id
+         |                                ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
+         |                        FROM    $ads_judicial_case_relation_pre
+         |                        WHERE   ds > ${calc_last_ds(ads_judicial_case_relation_id)}
+         |                    )
+         |            WHERE   num = 1
+         |        ) a
+         |JOIN    (
+         |           SELECT id, row_id
+         |           FROM (
+         |                    SELECT  id, md5(concat_ws('',rowkey,tn)) row_id
+         |                           ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num2
+         |                    FROM    $ads_judicial_case_id_mapping
+         |                    WHERE   ds > 0
+         |                ) WHERE num2 = 1
+         |        ) b
+         |ON      a.row_id = b.row_id
+         |""".stripMargin)
 
     //司法案件主表
     sql(

+ 11 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -78,9 +78,20 @@ trait CompanyMapping {
     spark.udf.register("merge_json", (yg_name: String, bg_name: String, all_name: String) => {
       merge_json(yg_name, bg_name, all_name)
     })
+
     spark.udf.register("all_name", (name: String) => {
       all_name(name)
     })
+
+    spark.udf.register("flag_tn", (name: String) => {
+      flag_tn(name)
+    })
+
+    spark.udf.register("tn_flag", (name: String) => {
+      tn_flag(name)
+    })
+
+
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 27 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -655,7 +655,34 @@ object BaseUtil {
     r
   }
 
+  // Table-name -> numeric flag lookup used to build compact graph node ids
+  // (see tn_flag / flag_tn). "-1" is the out-of-band value for unknown names.
+  val tn_mapping = Map[String, String](
+    "company_lawsuit" -> "0"
+    , "company_court_open_announcement" -> "1"
+    , "company_court_announcement" -> "2"
+    , "company_dishonest_info" -> "3"
+    , "company_send_announcement" -> "4"
+    , "company_zxr_restrict" -> "5"
+    , "company_zxr_final_case" -> "6"
+    , "company_zxr" -> "7"
+    , "company_court_register" -> "8"
+  )
+
+  // Inverted view of tn_mapping (flag digit -> table name), built once.
+  // The original rebuilt this map on every call, and flag_tn is registered as a
+  // per-row Spark UDF — that made the inversion run once per processed row.
+  private lazy val flag_mapping: Map[String, String] = tn_mapping.map { case (tn, flag) => (flag, tn) }
+
+  /**
+   * Maps a numeric flag back to its table name via tn_mapping's inverse.
+   *
+   * @param flag numeric flag such as "0".."8"
+   * @return the matching table name, or "-1" for blank/unknown input
+   */
+  def flag_tn(flag: String): String =
+    if (StringUtils.isBlank(flag)) "-1"
+    else flag_mapping.getOrElse(flag, "-1")
+
+  /**
+   * Maps a table name to its numeric flag via tn_mapping.
+   *
+   * @param tn table name such as "company_zxr"
+   * @return the matching flag digit, or "-1" for blank/unknown input
+   */
+  def tn_flag(tn: String): String =
+    if (StringUtils.isBlank(tn)) "-1"
+    else tn_mapping.getOrElse(tn, "-1")
+
   def main(args: Array[String]): Unit = {
+    println(flag_tn("10"))
+    println(tn_flag("company_zxr1"))
     val s = "[{\"name\":\"史某某\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]\u0001[{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]"
     println(all_name(s))
     val s1 = "[{\"name\":\"\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]"