xufei преди 3 години
родител
ревизия
3cdfb93857
променени са 1 файла, в които са добавени 267 реда и са изтрити 103 реда
  1. 267 103
      src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggs.scala

+ 267 - 103
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggs.scala

@@ -195,12 +195,21 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
   //案件移除表
   val ads_judicial_case_id_mapping_r1_deleted = s" $project.ads_judicial_case_id_mapping_r1_deleted"
   //案件移除表
-  val ads_judicial_case_id_mapping_r2_deleted = s" $project.ads_judicial_case_id_mapping_r2_deleted"
+  val ads_judicial_case_id_mapping_r3_deleted = s" $project.ads_judicial_case_id_mapping_r3_deleted"
   //案件关系表
   val bds_judicial_case_relation = s" $project.bds_judicial_case_relation"
   val ads_judicial_case_node_kafka = s" $project.ads_judicial_case_node_kafka"
   val ads_judicial_case_relation_kafka = s" $project.ads_judicial_case_relation_kafka"
 
+  val ads_judicial_case_node = s" $project.ads_judicial_case_node"
+  val ads_judicial_case_relation = s" $project.ads_judicial_case_relation"
+
+  //黑名单表
+  val ads_case_id_big = s"winhc_ng.ads_case_id_big"
+
+  val update = s"update"
+  val incr = s"incr"
+
   private val cols_map: Map[String, String] = args_case.cols_map
   private val rowkey: String = args_case.rowkey
   private val tableName: String = args_case.tableName
@@ -267,60 +276,204 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
 
   def calc_relation(): Unit = {
 
+    //增量关系和节点
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node_kafka PARTITION(ds='$calc_ds')
-        |SELECT  *
-        |        ,to_json(MAP('start_id',start_id,"topic_type",topic_type)) node_json
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node PARTITION(ds='$calc_ds')
+        |SELECT  concat_ws('_',start_id,tn_flag(tn)) start_id
+        |        ,'CASE' LABEL
         |FROM    (
-        |            SELECT  concat_ws('_',start_id, tn_flag(tn)) start_id
-        |                    ,"500" AS topic_type
+        |            SELECT  *
+        |                    ,ROW_NUMBER() OVER (PARTITION BY start_id,tn ORDER BY start_id DESC) num
         |            FROM    (
-        |                        SELECT  *
-        |                                ,ROW_NUMBER() OVER (PARTITION BY start_id,tn ORDER BY start_id DESC) num
-        |                        FROM    (
-        |                                    SELECT  rowkey_1 AS start_id,tn_1 AS tn
-        |                                    FROM    $bds_judicial_case_relation
-        |                                    WHERE   ds = '$calc_ds'
-        |                                    AND     rowkey_1 IS NOT NULL AND     tn_1 IS NOT NULL
-        |                                    UNION ALL
-        |                                    SELECT  rowkey_2 AS start_id,tn_2 AS tn
-        |                                    FROM    $bds_judicial_case_relation
-        |                                    WHERE   ds = '$calc_ds'
-        |                                    AND     rowkey_2 IS NOT NULL AND     tn_2 IS NOT NULL
-        |                                )
+        |                        SELECT  rowkey_1 AS start_id
+        |                                ,tn_1 AS tn
+        |                        FROM    $bds_judicial_case_relation
+        |                        WHERE   ds = '$calc_ds'
+        |                        AND     rowkey_1 IS NOT NULL
+        |                        AND     tn_1 IS NOT NULL
+        |                        UNION ALL
+        |                        SELECT  rowkey_2 AS start_id
+        |                                ,tn_2 AS tn
+        |                        FROM    $bds_judicial_case_relation
+        |                        WHERE   ds = '$calc_ds'
+        |                        AND     rowkey_2 IS NOT NULL
+        |                        AND     tn_2 IS NOT NULL
         |                    )
-        |            WHERE   num = 1
         |        )
-        |${if (isWindows) "LIMIT 1000" else ""}
-        |""".stripMargin).show(1000,false)
+        |WHERE   num = 1
+        |""".stripMargin)
 
-    //todo 去重
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds')
-        |SELECT start_id, end_id, topic_type, connect_type, relation_json
-        |FROM (
-        |   SELECT  start_id, end_id, topic_type, connect_type
-        |           ,to_json(MAP('start_id',start_id,'end_id',end_id,"topic_type",topic_type,"connect_type",connect_type)) relation_json
-        |           ,ROW_NUMBER() OVER (PARTITION by combine_id(start_id,end_id) ORDER by start_id desc) num
-        |   FROM    (
-        |               SELECT  concat_ws('_',rowkey_1, tn_flag(tn_1)) start_id
-        |                       ,concat_ws('_',rowkey_2, tn_flag(tn_2)) end_id
-        |                       ,"400" AS topic_type
-        |                       ,connect_type
-        |               FROM    $bds_judicial_case_relation
-        |               WHERE   ds = '$calc_ds'
-        |               AND     rowkey_1 IS NOT NULL AND  rowkey_2 IS NOT NULL
-        |               AND     tn_1 IS NOT NULL AND  tn_2 IS NOT NULL
-        |           )
-        |) WHERE num = 1
-        |${if (isWindows) "LIMIT 1000" else ""}
-        |""".stripMargin).show(1000,false)
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $ads_judicial_case_relation PARTITION (ds='$calc_ds')
+        |SELECT start_id,end_id,connect_type,'RELATION' as TYPE
+        |from (
+        |SELECT  start_id,end_id,connect_type
+        |        ,ROW_NUMBER() OVER (PARTITION BY combine_id(start_id,end_id) ORDER BY start_id DESC) num
+        |FROM    (
+        |            SELECT  concat_ws('_',rowkey_1,tn_flag(tn_1)) start_id
+        |                    ,concat_ws('_',rowkey_2,tn_flag(tn_2)) end_id
+        |                    ,connect_type
+        |            FROM    $bds_judicial_case_relation
+        |            WHERE   ds = '$calc_ds'
+        |            AND     rowkey_1 IS NOT NULL AND  rowkey_2 IS NOT NULL
+        |            AND     tn_1 IS NOT NULL AND  tn_2 IS NOT NULL
+        |        )
+        |)
+        |WHERE  num = 1
+        |""".stripMargin)
+
+    //发送kafka关系和节点
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds',tn='$incr')
+        |SELECT  a.start_id
+        |        ,a.end_id
+        |        ,IF(b.row_id IS NULL,"400","600") topic_type
+        |        ,connect_type
+        |        ,to_json(MAP('start_id',a.start_id,'end_id',a.end_id,"topic_type",IF(b.row_id IS NULL,"400","600"),"connect_type",connect_type)) relation_json
+        |FROM    (
+        |            SELECT  start_id
+        |                    ,end_id
+        |                    ,connect_type
+        |                    ,combine_id(start_id,end_id) row_id
+        |            FROM    $ads_judicial_case_relation
+        |            WHERE   ds = '$calc_ds'
+        |        ) a
+        |LEFT JOIN (
+        |              SELECT   combine_id(start_id,end_id) row_id
+        |              FROM    $ads_judicial_case_relation
+        |              WHERE   ds < '$calc_ds'
+        |              GROUP BY combine_id(start_id,end_id)
+        |          ) b
+        |ON      a.row_id = b.row_id
+        |LEFT JOIN    (
+        |            SELECT  case_id
+        |            FROM    $ads_case_id_big
+        |            GROUP by case_id
+        |        ) c
+        |ON      a.start_id = c.case_id
+        |LEFT JOIN (
+        |              SELECT  case_id
+        |              FROM    $ads_case_id_big
+        |              GROUP by case_id
+        |          ) d
+        |ON      a.end_id = d.case_id
+        |WHERE   c.case_id IS NULL
+        |AND     d.case_id IS NULL
+        |AND     a.start_id IS NOT NULL
+        |AND     a.end_id IS NOT NULL
+        |AND     b.row_id IS NULL
+        |""".stripMargin)
+
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds',tn='$update')
+        |SELECT  a.start_id
+        |        ,a.end_id
+        |        ,"600" topic_type
+        |        ,connect_type
+        |        ,to_json(MAP('start_id',a.start_id,'end_id',a.end_id,"topic_type","600","connect_type",connect_type)) relation_json
+        |FROM    (
+        |            SELECT  start_id
+        |                    ,end_id
+        |                    ,connect_type
+        |                    ,combine_id(start_id,end_id) row_id
+        |            FROM    $ads_judicial_case_relation
+        |            WHERE   ds = '$calc_ds'
+        |        ) a
+        |LEFT JOIN (
+        |              SELECT  case_id
+        |              FROM    (
+        |                          SELECT  case_id
+        |                          FROM    $ads_case_id_big
+        |                          UNION ALL
+        |                          SELECT  start_id case_id
+        |                          FROM    $ads_judicial_case_relation_kafka
+        |                          WHERE   ds = '$calc_ds'
+        |                          AND     tn = '$incr'
+        |                          UNION ALL
+        |                          SELECT  end_id case_id
+        |                          FROM    $ads_judicial_case_relation_kafka
+        |                          WHERE   ds = '$calc_ds'
+        |                          AND     tn = '$incr'
+        |                      )
+        |              GROUP BY case_id
+        |          ) c
+        |ON      a.start_id = c.case_id
+        |LEFT  JOIN    (
+        |            SELECT  case_id
+        |            FROM    (
+        |                        SELECT  case_id
+        |                        FROM    $ads_case_id_big
+        |                        UNION ALL
+        |                        SELECT  start_id case_id
+        |                        FROM    $ads_judicial_case_relation_kafka
+        |                        WHERE   ds = '$calc_ds'
+        |                        AND     tn = '$incr'
+        |                        UNION ALL
+        |                        SELECT  end_id case_id
+        |                        FROM    $ads_judicial_case_relation_kafka
+        |                        WHERE   ds = '$calc_ds'
+        |                        AND     tn = '$incr'
+        |                    )
+        |            GROUP BY case_id
+        |        ) d
+        |ON      a.end_id = d.case_id
+        |WHERE   c.case_id IS NULL
+        |AND     d.case_id IS NULL
+        |AND     a.start_id IS NOT NULL
+        |AND     a.end_id IS NOT NULL
+        |""".stripMargin)
+
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node_kafka PARTITION(ds,tn)
+        |SELECT  a.start_id
+        |        ,IF(b.start_id IS NULL,"500","700") topic_type
+        |        ,to_json(MAP('start_id',a.start_id,"topic_type",IF(b.start_id IS NULL,"500","700"))) node_json
+        |        ,'$calc_ds' ds
+        |        ,IF(b.start_id IS NULL,"$incr","$update") tn
+        |FROM    (
+        |            SELECT  start_id
+        |            FROM    $ads_judicial_case_node
+        |            WHERE   ds = '$calc_ds'
+        |            GROUP by start_id
+        |        ) a
+        |LEFT JOIN (
+        |              SELECT  start_id
+        |              FROM    $ads_judicial_case_node
+        |              WHERE   ds < '$calc_ds'
+        |              GROUP BY start_id
+        |          ) b
+        |ON      a.start_id = b.start_id
+        |LEFT JOIN    (
+        |            SELECT  case_id
+        |            FROM    (
+        |                        SELECT  case_id
+        |                        FROM    $ads_case_id_big
+        |                        UNION ALL
+        |                        SELECT  start_id case_id
+        |                        FROM    $ads_judicial_case_relation_kafka
+        |                        WHERE   ds = '$calc_ds'
+        |                        UNION ALL
+        |                        SELECT  end_id case_id
+        |                        FROM    $ads_judicial_case_relation_kafka
+        |                        WHERE   ds = '$calc_ds'
+        |                    )
+        |            GROUP BY case_id
+        |        ) c
+        |ON      a.start_id = c.case_id
+        |WHERE   c.case_id IS NULL
+        |AND     a.start_id IS NOT NULL
+        |""".stripMargin)
 
     //分区不存在
-    addEmptyPartitionOrSkip(ads_judicial_case_node_kafka, calc_ds)
-    addEmptyPartitionOrSkip(ads_judicial_case_relation_kafka, calc_ds)
+    addEmptyPartitionOrSkipPlus(ads_judicial_case_node_kafka, calc_ds, incr)
+    addEmptyPartitionOrSkipPlus(ads_judicial_case_node_kafka, calc_ds, update)
+    addEmptyPartitionOrSkipPlus(ads_judicial_case_relation_kafka, calc_ds, incr)
+    addEmptyPartitionOrSkipPlus(ads_judicial_case_relation_kafka, calc_ds, update)
   }
 
   def calc_mapping(): Unit = {
@@ -328,70 +481,86 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping PARTITION(ds='$calc_ds')
-         |SELECT  component_id
-         |        ,rowkey
-         |        ,flag_tn(tn)
-         |FROM    $ods_judicial_case_id_mapping
-         |WHERE   ds = '$calc_ds'
+         |SELECT  id,rowkey,tn
+         |FROM (
+         |     SELECT  component_id id
+         |             ,rowkey
+         |             ,flag_tn(tn) tn
+         |             ,ROW_NUMBER() OVER (PARTITION BY rowkey,flag_tn(tn) ORDER BY ds DESC, update_time DESC) num
+         |     FROM    $ods_judicial_case_id_mapping
+         |     WHERE   ds = '$calc_ds'
+         |)
+         |WHERE num = 1
          |""".stripMargin)
 
+    //可能有重复 TODO
     //主表删除
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_r1_deleted PARTITION(ds='$calc_ds')
-         |SELECT
-         |         a.id new_id
-         |        ,b.id old_id
-         |        ,a.rowkey
-         |        ,a.tn
-         |        ,1 AS deleted
-         |FROM    (
-         |            SELECT  *
-         |                    ,md5(concat_ws('',rowkey,tn)) row_id
-         |            FROM    $ads_judicial_case_id_mapping
-         |            WHERE   ds = '$calc_ds'
-         |        ) a
-         |JOIN    (
-         |            SELECT  id, row_id, num
-         |            FROM    (
-         |                        SELECT  id
-         |                                ,md5(concat_ws('',rowkey,tn)) row_id
-         |                                ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num
-         |                        FROM    $ads_judicial_case_id_mapping
-         |                        WHERE   ds < '$calc_ds'
-         |                    )
-         |            WHERE   num = 1
-         |        ) b
-         |ON      a.row_id = b.row_id
-         |WHERE   a.id <> b.id
+         |SELECT new_id, old_id, rowkey, tn, deleted
+         |FROM (
+         |      SELECT
+         |               a.id new_id
+         |              ,b.id old_id
+         |              ,a.rowkey
+         |              ,a.tn
+         |              ,1 AS deleted
+         |              ,ROW_NUMBER() OVER (PARTITION BY b.id ORDER BY ds DESC) num2
+         |      FROM    (
+         |                  SELECT  *
+         |                          ,md5(concat_ws('',rowkey,tn)) row_id
+         |                  FROM    $ads_judicial_case_id_mapping
+         |                  WHERE   ds = '$calc_ds'
+         |              ) a
+         |      JOIN    (
+         |                  SELECT  id, row_id, num
+         |                  FROM    (
+         |                              SELECT  id
+         |                                      ,md5(concat_ws('',rowkey,tn)) row_id
+         |                                      ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num
+         |                              FROM    $ads_judicial_case_id_mapping
+         |                              WHERE   ds < '$calc_ds'
+         |                          )
+         |                  WHERE   num = 1
+         |              ) b
+         |      ON      a.row_id = b.row_id
+         |      WHERE   a.id <> b.id
+         |)
+         |WHERE  num2  =  1
          |""".stripMargin)
 
     //明细表删除
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_r2_deleted PARTITION(ds='$calc_ds')
-         |SELECT  b.id,a.rowkey,a.tn, 1 as deleted
-         |FROM    (
-         |            SELECT  rowkey, tn, old_id
-         |            FROM    $ads_judicial_case_id_mapping_r1_deleted
-         |            WHERE   ds = '$calc_ds'
-         |        ) a
-         |JOIN    (
-         |            SELECT  id, judicase_id
-         |            FROM    (
-         |                        SELECT  id, judicase_id
-         |                                ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY ds DESC) num
-         |                        FROM    $ads_judicial_case_relation_r3
-         |                        WHERE   ds < '$calc_ds'
-         |                    )
-         |            WHERE   num = 1
-         |        ) b
-         |ON      a.old_id = b.judicase_id
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_r3_deleted PARTITION(ds='$calc_ds')
+         |SELECT id, rowkey, tn, deleted
+         |FROM (
+         |      SELECT  b.id,a.rowkey,a.tn, 1 as deleted
+         |              ,ROW_NUMBER() OVER (PARTITION BY b.id ORDER BY tn DESC) num2
+         |      FROM    (
+         |                  SELECT  rowkey, tn, old_id
+         |                  FROM    $ads_judicial_case_id_mapping_r1_deleted
+         |                  WHERE   ds = '$calc_ds'
+         |              ) a
+         |      JOIN    (
+         |                  SELECT  id, judicase_id
+         |                  FROM    (
+         |                              SELECT  id, judicase_id
+         |                                      ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY ds DESC) num
+         |                              FROM    $ads_judicial_case_relation_r3
+         |                              WHERE   ds < '$calc_ds'
+         |                          )
+         |                  WHERE   num = 1
+         |              ) b
+         |      ON      a.old_id = b.judicase_id
+         |)
+         |WHERE  num2  =  1
          |""".stripMargin)
 
     //分区不存在
     addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r1_deleted, calc_ds)
-    addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r2_deleted, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r3_deleted, calc_ds)
 
   }
 
@@ -435,21 +604,17 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |                    ,data
          |                    ,row_id
          |            FROM    (
-         |                        SELECT  *, md5(concat_ws('',detail_id,tn)) row_id
+         |                        SELECT  *, concat_ws('',detail_id,tn) row_id
          |                                ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
          |                        FROM    $ads_judicial_case_relation_pre
-         |                        WHERE   ds > ${calc_last_ds(ads_judicial_case_relation_id)} AND  case_no_trim(case_no) is not null AND  date is not null
+         |                        WHERE   ds > 0 AND  case_no_trim(case_no) is not null AND  date is not null
          |                    )
          |            WHERE   num = 1
          |        ) a
          |JOIN    (
-         |           SELECT id, row_id
-         |           FROM (
-         |                    SELECT  id, md5(concat_ws('',rowkey,tn)) row_id
-         |                           ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num2
-         |                    FROM    $ads_judicial_case_id_mapping
-         |                    WHERE   ds > 0
-         |                ) WHERE num2 = 1
+         |           SELECT  id, concat_ws('',rowkey,tn) row_id
+         |           FROM    $ads_judicial_case_id_mapping
+         |           WHERE   ds = '$calc_ds'
          |        ) b
          |ON      a.row_id = b.row_id
          |""".stripMargin)
@@ -512,7 +677,6 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |) x
          |""".stripMargin).show(10, false)
 
-
     //司法案件主表
         sql(
           s"""