xufei 3 years ago
parent
commit
5a028eefea

+ 52 - 83
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggsV2.scala

@@ -2,7 +2,7 @@ package com.winhc.bigdata.spark.ng.judicial
 
 import com.winhc.bigdata.spark.udf._
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{AsyncExtract, BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
@@ -21,7 +21,7 @@ case class args_case_v2(tableName: String = ""
 
 object args_case_v2 {
   val tn_mapping = Map[String, String](
-    "company_lawsuit" -> "0"
+    "wenshu_detail_v2" -> "0"
     , "company_court_open_announcement" -> "1"
     , "company_court_announcement" -> "2"
     , "company_dishonest_info" -> "3"
@@ -37,9 +37,9 @@ object args_case_v2 {
       , cols_map = Map[String, String]("flag" -> "0", "case_stage" -> "case_stage(case_no)"
         , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "judge_date"
         , "detail_id" -> "rowkey", "case_amt" -> "case_amt", "judge_amt" -> "judge_amt", "exec_amt" -> "null"
-        , "data" -> "map('date',judge_date,'party_title',party_title)"
+        , "data" -> "map('date',judge_date,'party_title',party_title,'case_end',case_end(judge_result) )"
         , "all_name" -> "litigant_info"
-        , "detail_info" -> "to_json(named_struct('flag', '0', 'date',judge_date, 'detail_id', rowkey, 'doc_type', doc_type, 'judge_result', judge_result))"
+        , "detail_info" -> "to_json(named_struct('flag', '0', 'date', cast(judge_date as string), 'detail_id', rowkey, 'doc_type', doc_type, 'judge_result', judge_result))"
       ))
    //court hearing announcement
     , args_case_v2(tableName = "company_court_open_announcement"
@@ -49,7 +49,7 @@ object args_case_v2 {
         , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
         , "data" -> "map('date',start_date)"
         , "all_name" -> "litigant_info"
-        , "detail_info" -> "to_json(named_struct('flag', '1', 'date',start_date, 'detail_id', rowkey, 'court', court, 'court_room',court_room))"
+        , "detail_info" -> "to_json(named_struct('flag', '1', 'date', cast(start_date as string), 'detail_id', rowkey, 'court', court, 'court_room',court_room))"
       ))
    //court announcement
     , args_case_v2(tableName = "company_court_announcement"
@@ -67,9 +67,9 @@ object args_case_v2 {
         , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "null"
         , "yg_name" -> "null", "bg_name" -> " to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))", "date" -> "pub_date"
         , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
-        , "data" -> "map('date',reg_time)"
+        , "data" -> "map('date',reg_time,'case_end',if(deleted = 1,'1',null) )"
         , "all_name" -> "null"
-        , "detail_info" -> "to_json(named_struct('flag', '3', 'date', pub_date, 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'performance',  performance, 'action_content', action_content ))"
+        , "detail_info" -> "to_json(named_struct('flag', '3', 'date', cast(pub_date as string), 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'performance',  performance, 'action_content', action_content ))"
       ))
    //service-of-process announcement
     , args_case_v2(tableName = "company_send_announcement"
@@ -79,17 +79,17 @@ object args_case_v2 {
         , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
         , "data" -> "map('date',start_date)"
         , "all_name" -> "litigant_info"
-        , "detail_info" -> "to_json(named_struct('flag', '4', 'date',start_date, 'detail_id', rowkey, 'defendant_info', json_array(defendant_info), 'plaintiff_info', json_array(plaintiff_info)))"
+        , "detail_info" -> "to_json(named_struct('flag', '4', 'date', cast(start_date as string), 'detail_id', rowkey, 'defendant_info', json_array(defendant_info), 'plaintiff_info', json_array(plaintiff_info)))"
       ))
-    //high-consumption restriction
+    //high-consumption restriction //todo: take the company if present, otherwise the person
     , args_case_v2(tableName = "company_zxr_restrict"
       , cols_map = Map[String, String]("flag" -> "5", "title" -> "null", "case_type" -> "case_type(case_no)"
         , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null", "yg_name" -> "null"
-        , "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(company_id,'') ,'name',COALESCE(company_name,''))  ,named_struct('litigant_id',COALESCE(pid,''),'name',COALESCE(person_name,''))  ))"
+        , "bg_name" -> "to_json(array(named_struct('litigant_id',if(length(company_name)  = 0 or company_name is NULL ,pid,company_id) ,'name', if(length(company_name)  = 0 or company_name is NULL ,person_name,company_name) )  ))"
         , "date" -> "case_create_time", "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
-        , "data" -> "map('date',case_create_time)"
+        , "data" -> "map('date',case_create_time,'case_end',if(deleted = 1,'1',null) )"
         , "all_name" -> "null"
-        , "detail_info" -> "to_json(named_struct('flag', '5', 'date', case_create_time, 'detail_id', rowkey, 'person', array(named_struct('litigant_id',COALESCE(pid,''),'person_name',person_name)), 'company', array(named_struct('litigant_id', company_id, 'company_name',company_name)) ))"
+        , "detail_info" -> "to_json(named_struct('flag', '5', 'date', cast(case_create_time as string), 'detail_id', rowkey, 'person', array(named_struct('litigant_id',COALESCE(pid,''),'person_name',person_name)), 'company', array(named_struct('litigant_id', company_id, 'company_name',company_name)),'applicant_info', json_array(applicant_info) ))"
       ))
    //final case (enforcement terminated)
     , args_case_v2(tableName = "company_zxr_final_case"
@@ -98,9 +98,9 @@ object args_case_v2 {
         , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))"
         , "date" -> "case_create_time"
         , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
-        , "data" -> "map('date',case_create_time)"
+        , "data" -> "map('date',case_create_time,'case_end',if(deleted = 0,'1',null) )"
         , "all_name" -> "null"
-        , "detail_info" -> "to_json(named_struct('flag', '6', 'date', case_create_time, 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''), 'name',name)), 'exec_amount', amt_div(exec_amount, 10000), 'no_exec_amount', amt_div(no_exec_amount, 10000) ))"
+        , "detail_info" -> "to_json(named_struct('flag', '6', 'date', cast(case_create_time as string), 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''), 'name',name)), 'exec_amount', amt_div(exec_amount, 10000), 'no_exec_amount', amt_div(no_exec_amount, 10000) ))"
       ))
    //person subject to enforcement
     , args_case_v2(tableName = "company_zxr"
@@ -110,7 +110,7 @@ object args_case_v2 {
         , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "amt_div(exec_money,10000)"
         , "data" -> "map('date', case_create_time, 'exec_info', to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name,'exec_money',amt_div(exec_money,10000),'date',case_create_time  ))) )"
         , "all_name" -> "null"
-        , "detail_info" -> "to_json(named_struct('flag', '7', 'date', case_create_time, 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'exec_money', amt_div(exec_money,10000) ))"
+        , "detail_info" -> "to_json(named_struct('flag', '7', 'date', cast(case_create_time as string), 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'exec_money', amt_div(exec_money,10000) ))"
       ))
    //case filing information
     , args_case_v2(tableName = "company_court_register"
@@ -120,7 +120,7 @@ object args_case_v2 {
         , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
         , "data" -> "map('date',filing_date)"
         , "all_name" -> "litigant_info"
-        , "detail_info" -> "to_json(named_struct('flag', '8', 'date',filing_date, 'detail_id', rowkey, 'court', court, 'judge', judge))"
+        , "detail_info" -> "to_json(named_struct('flag', '8', 'date', cast(filing_date as string), 'detail_id', rowkey, 'court', court, 'judge', judge))"
       ))
   )
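A note on the pattern repeated through the hunks above: every `detail_info` expression now wraps its date column in cast(... as string) before to_json. The commit does not say why, but a plausible reason is serialization: to_json renders a raw timestamp in ISO-8601, session-timezone-dependent form, while the cast fixes the plain "yyyy-MM-dd HH:mm:ss" text. A self-contained check (output shown under an assumed UTC+8 session timezone):

```scala
import org.apache.spark.sql.SparkSession

object CastInsideToJsonDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cast-demo").getOrCreate()
    // Same struct, with and without the cast the commit adds everywhere.
    spark.sql(
      """SELECT to_json(named_struct('date', d))                 AS uncast,
        |       to_json(named_struct('date', cast(d as string))) AS cast_to_string
        |FROM   (SELECT cast('2021-06-01 10:30:00' as timestamp) AS d)
        |""".stripMargin).show(false)
    // uncast         -> {"date":"2021-06-01T10:30:00.000+08:00"} (timezone-dependent)
    // cast_to_string -> {"date":"2021-06-01 10:30:00"}
    spark.stop()
  }
}
```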
 
@@ -168,13 +168,31 @@ object JudicialCaseRelationAggsV2 {
       tn = "wenshu_detail_v2"
     }
     if ("all".equals(tn)) {
-      args_case_v2.tab_args.map(_.tableName).foreach(t => {
-        run(project, t, c, spark)
+      //      args_case_v2.tab_args.map(_.tableName).foreach(t => {
+      //        run(project, t, c, spark)
+      //      })
+      val seq = args_case_v2.tab_args.map(x => {
+        (x.tableName, () => {
+          run(project, x.tableName, c, spark)
+          true
+        })
       })
+      AsyncExtract.startAndWait(spark, seq)
+    } else if (tn.contains(",")) {
+      val arr = tn.split(",", -1)
+      //      args_case_v2.tab_args.map(_.tableName).filter(arr.contains(_)).foreach(t => {
+      //        run(project, t, c, spark)
+      //      })
+      val seq = args_case_v2.tab_args.map(_.tableName).filter(arr.contains(_)).map(x => {
+        (x, () => {
+          run(project, x, c, spark)
+          true
+        })
+      })
+      AsyncExtract.startAndWait(spark, seq)
     } else {
       run(project, tn, c, spark)
     }
-
     spark.stop()
   }
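The old sequential per-table loop survives above only as a comment; its replacement builds a (tableName, () => Boolean) closure per table and hands the list to AsyncExtract.startAndWait, so the tables are processed concurrently and main blocks until all finish. AsyncExtract itself (newly imported at the top of the file) is not part of this diff; the sketch below only guesses its shape from these two call sites and is not the repo's implementation.

```scala
import org.apache.spark.sql.SparkSession

object AsyncExtractSketch {
  // Assumed signature, inferred from the call sites: one named task per table.
  def startAndWait(spark: SparkSession, seq: Seq[(String, () => Boolean)]): Unit = {
    val threads = seq.map { case (name, task) =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          // A job group per table keeps the concurrent runs apart in the Spark UI.
          spark.sparkContext.setJobGroup(name, name)
          task()
        }
      }, name)
      t.start()
      t
    }
    // Join every worker, mirroring the blocking behavior of the old loop.
    threads.foreach(_.join())
  }
}
```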
 
@@ -201,9 +219,9 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
  //ID replacement table
   val ads_judicial_case_relation_id = s" $project.ads_judicial_case_relation_id_v2"
  //ID mapping table
-  val ads_judicial_case_id_mapping = s" $project.ads_judicial_case_id_mapping"
+  val ads_judicial_case_id_mapping = s" $project.ads_judicial_case_id_mapping_v2"
  //ID mapping table (source table)
-  val ods_judicial_case_id_mapping = s" $project.ods_judicial_case_id_mapping"
+  val ods_judicial_case_id_mapping = s" $project.ods_judicial_case_id_mapping_v2"
  //main table
   val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1_v2"
  //  //detail table
@@ -211,19 +229,9 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
  //detail table (enhanced)
   val ads_judicial_case_relation_r3 = s" $project.ads_judicial_case_relation_r3_v2"
  //case removal table
-  val ads_judicial_case_id_mapping_r1_deleted = s" $project.ads_judicial_case_id_mapping_r1_deleted"
+  val ads_judicial_case_id_mapping_r1_deleted = s" $project.ads_judicial_case_id_mapping_r1_deleted_v2"
  //case removal table
-  val ads_judicial_case_id_mapping_r3_deleted = s" $project.ads_judicial_case_id_mapping_r3_deleted"
-  //case relation table
-  val bds_judicial_case_relation = s" $project.bds_judicial_case_relation"
-  val ads_judicial_case_node_kafka = s" $project.ads_judicial_case_node_kafka"
-  val ads_judicial_case_relation_kafka = s" $project.ads_judicial_case_relation_kafka"
-
-  val ads_judicial_case_node = s" $project.ads_judicial_case_node"
-  val ads_judicial_case_relation = s" $project.ads_judicial_case_relation"
-
-  //blacklist table
-  val ads_case_id_big = s"winhc_ng.ads_case_id_big"
+  val ads_judicial_case_id_mapping_r3_deleted = s" $project.ads_judicial_case_id_mapping_r3_deleted_v2"
 
   val update = s"update"
   val incr = s"incr"
@@ -389,50 +397,6 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
     spark.udf.register("all_name_plus_v2", new AllNamePlusV2(1000))
     spark.udf.register("case_amt_plus_v2", new CaseAmtAggsPlusV2(1000))
 
-    //detail: judgment document (wenshu) id
-    //replace judicial case id
-//    sql(
-//      s"""
-//         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_id PARTITION(ds='$calc_ds')
-//         |SELECT  a.id
-//         |        ,b.flag,b.title,b.case_type,b.case_reason,b.case_no,b.court_name,b.case_stage,b.lable,b.detail
-//         |        ,b.yg_name,b.bg_name,b.all_name,b.date,b.detail_id,b.case_amt,b.case_id,b.tn,b.data
-//         |FROM    (
-//         |           SELECT  id, concat_ws('',rowkey,tn) row_id
-//         |           FROM    $ads_judicial_case_id_mapping
-//         |           WHERE   ds = '$calc_ds'
-//         |        ) a
-//         |JOIN    (
-//         |            SELECT   flag
-//         |                    ,title
-//         |                    ,case_type(case_no) case_type
-//         |                    ,adjust_reason(case_reason) case_reason
-//         |                    ,case_no_trim(case_no) as case_no
-//         |                    ,court_name
-//         |                    ,case_stage(case_no) case_stage
-//         |                    ,case_label(flag) lable
-//         |                    ,to_json(named_struct('flag', flag, 'date',date, 'detail_id', detail_id, 'name', json_array(bg_name)) ) detail
-//         |                    ,yg_name
-//         |                    ,bg_name
-//         |                    ,merge_json(yg_name, bg_name, all_name) all_name
-//         |                    ,date
-//         |                    ,detail_id
-//         |                    ,case_amt
-//         |                    ,case_id
-//         |                    ,tn
-//         |                    ,data
-//         |                    ,row_id
-//         |            FROM    (
-//         |                        SELECT  *, concat_ws('',detail_id,tn) row_id
-//         |                                ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
-//         |                        FROM    $ads_judicial_case_relation_pre
-//         |                        WHERE   ds > 0 AND  case_no_trim(case_no) is not null AND  date is not null
-//         |                    )
-//         |            WHERE   num = 1
-//         |        ) b
-//         |ON      a.row_id = b.row_id
-//         |""".stripMargin).show(20, false)
-
    //detail table
     sql(
       s"""
@@ -456,7 +420,8 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
          |    court_level,
          |    case_amt,
          |    judge_amt,
-         |    exec_info
+         |    exec_info,
+         |    case_end
          |FROM
          |(
          |SELECT  md5(concat_ws('',concat_ws('',judicase_id),CLEANUP(case_no))) id
@@ -467,15 +432,16 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
          |        ,case_no
          |        ,concat_ws(',',collect_set(court_name)) court_name
          |        ,case_stage(max(case_no)) as case_stage
-         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+         |        ,trim_black(concat_ws(',', collect_set(lable))) lable
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(date) last_date
-         |        ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
+         |        ,name_aggs(yg_name,bg_name,flag,data['date'],detail_id) name_aggs
          |        ,all_name_plus_v2(all_name) all_name
          |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
          |        ,max(case_amt) as case_amt
          |        ,max(judge_amt) as judge_amt
          |        ,case_amt_plus_v2(data['exec_info']) as exec_info
+         |        ,max(case_end) as case_end
          |FROM    (
          |        SELECT  a.*,court_level(court_name) court_level
          |        FROM    (
@@ -498,6 +464,7 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
          |                           ,judge_amt
          |                           ,tn
          |                           ,data
+         |                           ,case_end
          |                   FROM    $ads_judicial_case_relation_id
          |                   WHERE   ds = '$calc_ds' AND length(case_label(flag)) > 0 AND  case_no_trim(case_no) is not null AND  date is not null
          |                )a
@@ -528,7 +495,8 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
          |    exec_info    ,
          |    date         ,
          |    court_level  ,
-         |    0 deleted
+         |    0 deleted    ,
+         |    case_end
          |FROM
          |(
          |SELECT  judicase_id
@@ -542,15 +510,16 @@ case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_cas
          |        -- ,max(first_case_amt) case_amt
          |        ,max(date) AS date
          |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
-         |        ,name_aggs(yg_name,bg_name,'0',date) name_aggs
+         |        ,name_aggs(yg_name,bg_name,'0',date,'0') name_aggs
          |        ,all_name_plus_v2(all_name) all_name
          |        ,amt_merge(concat_ws('&',collect_set(case_info))) case_info
          |        ,amt_merge(concat_ws('&',collect_set(judge_info))) judge_info
          |        ,case_amt_plus_v2(exec_info) as exec_info
+         |        ,max(case_end) as case_end
          |FROM    (
          |        SELECT  a.*
          |        FROM    (
-         |                   SELECT  judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt,judge_amt,exec_info
+         |                   SELECT  judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt,judge_amt,exec_info,case_end
          |                   ,court_level(court_name) court_level
          |                   ,concat_ws('|',case_stage,coalesce(case_amt,0))  as case_info
          |                   ,concat_ws('|',case_stage,coalesce(judge_amt,0)) as judge_info
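Both aggregation queries roll the new column up with max(case_end). Since every per-record expression in this commit emits either '1' or NULL, and SQL MAX ignores NULLs, the group-level value is '1' exactly when at least one record marks the case closed; in other words an any-of flag rather than a numeric maximum. Runnable in spark-shell, where `spark` is predefined:

```scala
// MAX over {'1', NULL} ignores the NULL, so one closed record closes the case.
spark.sql("SELECT max(x) AS case_end FROM VALUES ('1'), (cast(null AS string)) AS t(x)").show()
// +--------+
// |case_end|
// +--------+
// |       1|
// +--------+
```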

+ 3 - 1
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -113,7 +113,9 @@ trait CompanyMapping {
     spark.udf.register("amt_merge", (info:String) => {
       amt_merge(info)
     })
-
+    spark.udf.register("case_end", (info:String) => {
+      case_end(info)
+    })
 
   }
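The UDF is registered here, but case_end itself is defined in BaseUtil.scala, whose diff is suppressed at the bottom of this commit. From its call site, case_end(judge_result) feeding the '1'/NULL case_end column, it plausibly scans the judgment-result text for case-closing language. A purely illustrative sketch; the keyword list is hypothetical, not the repo's:

```scala
import org.apache.commons.lang3.StringUtils

// Hypothetical reconstruction: returns '1' when the judgment text suggests the
// case is finished, NULL otherwise, matching how the column is consumed upstream.
def case_end(judgeResult: String): String = {
  val closingKeywords = Seq("撤诉", "终结", "驳回起诉") // hypothetical markers
  if (StringUtils.isNotBlank(judgeResult) && closingKeywords.exists(k => judgeResult.contains(k))) "1"
  else null
}
```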
 

+ 30 - 13
src/main/scala/com/winhc/bigdata/spark/udf/NameAggsPlusV2.scala

@@ -1,14 +1,12 @@
 package com.winhc.bigdata.spark.udf
 
 import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.utils.BaseUtil.{json_array, list_json}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
 
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-
 /**
 * @Description: prefer judgment-document (wenshu) data
  * @author π
@@ -21,11 +19,14 @@ class NameAggsPlusV2(max: Int) extends UserDefinedAggregateFunction {
   val split = "\u0001"
   val empty_col = "[]"
 
+  val contain_flags = Seq("3", "5", "6", "7")
+
   override def inputSchema: StructType = StructType(Array[StructField](
     StructField("yg_name", DataTypes.StringType)
     , StructField("bg_name", DataTypes.StringType)
     , StructField("flag", DataTypes.StringType)
     , StructField("bus_date", DataTypes.StringType)
+    , StructField("detail_id", DataTypes.StringType)
   ))
 
   override def bufferSchema: StructType = StructType(
@@ -46,11 +47,12 @@ class NameAggsPlusV2(max: Int) extends UserDefinedAggregateFunction {
     if (buffer.size >= max) {
       return
     }
-    if (input.size != 4) return
+    if (input.size != 5) return
     val yg_name = input.getAs[String](0)
     val bg_name = input.getAs[String](1)
     val flag = input.getAs[String](2)
     val bus_date = input.getAs[String](3)
+    val detail_id = input.getAs[String](4)
     if (StringUtils.isBlank(bus_date)) {
       return
     }
@@ -66,7 +68,7 @@ class NameAggsPlusV2(max: Int) extends UserDefinedAggregateFunction {
     }
     val map0 = buffer.getMap[String, String](0).toMap
     var map_new0 = scala.collection.mutable.Map[String, String](map0.toSeq: _*)
-    map_new0 ++= Map(s"$bus_date$split$flag" -> s"$yg_name1$split$bg_name1$split$flag")
+    map_new0 ++= Map(s"$bus_date$split$flag$split$detail_id" -> s"$yg_name1$split$bg_name1$split$flag")
     buffer.update(0, map_new0)
   }
 
@@ -78,11 +80,26 @@ class NameAggsPlusV2(max: Int) extends UserDefinedAggregateFunction {
     var yg_name = empty_col
     var bg_name = empty_col
     val m0: Map[String, String] = buffer.getAs[Map[String, String]](0)
-    println(s"m0: $m0")
+    //println(s"m0: $m0")
     var has = false
+    var merge = true
     if (m0.isEmpty) {
       return Map("yg_name" -> yg_name, "bg_name" -> bg_name)
     } else if (m0.nonEmpty) {
+      //only these flags present (high-consumption restriction, enforcee, final case, dishonest debtor): aggregate defendants only
+      val tmp0 = m0.map(x => {
+        val Array(yg_name1, bg_name1, flag) = x._2.split(s"$split", -1)
+        if (!contain_flags.contains(flag)) {
+          merge = false
+        }
+        (yg_name1, bg_name1, flag)
+      })
+      //println(s"merge: $merge")
+      if (merge) {
+        //val ygs = tmp0.map(x=> json_array(x._1)).reduce(_ ++ _)
+        val bgs = tmp0.map(x => json_array(x._2)).reduce(_ ++ _)
+        return Map("yg_name" -> yg_name, "bg_name" -> list_json(bgs))
+      }
       val tmp: List[(String, String, String, String, String)] = m0.map(x => {
         val Array(yg_name1, bg_name1, flag) = x._2.split(s"$split", -1)
         if (flag.equals("0")) {
@@ -95,14 +112,14 @@ class NameAggsPlusV2(max: Int) extends UserDefinedAggregateFunction {
         (x._1, yg_name1, bg_name1, flag, weight)
       }).
         filter(x => {
-        if (has) {
-          x._4.equals("0")
-        } else {
-          !has
-        }
-      }).
+          if (has) {
+            x._4.equals("0")
+          } else {
+            !has
+          }
+        }).
         toList.sortBy(x => (x._5, x._1))(Ordering.Tuple2(Ordering.String.reverse, Ordering.String))
-      println(s"tmp: $tmp")
+      //println(s"tmp: $tmp")
       val (_, yg_name2, bg_name2, _, _) = tmp.head
       yg_name = yg_name2
       bg_name = bg_name2
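Two behavior changes land in this UDAF. First, detail_id joins the buffer key (bus_date + flag + detail_id), so two records that share a date and flag no longer overwrite each other. Second, evaluate() gains a short-circuit: when every buffered flag is in contain_flags (dishonest debtor, high-consumption restriction, final case, enforcee, all defendant-only sources), it concatenates every bg_name array instead of picking a single best record. That path leans on json_array and list_json from BaseUtil, whose diff is suppressed below; a hedged reconstruction of their shape (json4s 3.x here, and the real helpers may also deduplicate litigants):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Parse a "[...]" JSON string into a list, so that
// tmp0.map(x => json_array(x._2)).reduce(_ ++ _) concatenates defendant arrays.
def json_array(s: String): List[JValue] =
  if (s == null || s.trim.isEmpty || s == "[]") Nil
  else parse(s) match {
    case JArray(items) => items
    case other         => List(other)
  }

// Re-serialize the concatenated list in one pass.
def list_json(items: List[JValue]): String = compact(render(JArray(items)))
```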

File diff suppressed because it is too large
+ 14 - 4
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala