
Aggregation logic modification

xufei 3 years ago
Parent commit
e55314a9bf

+ 587 - 0
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggsV2.scala

@@ -0,0 +1,587 @@
+package com.winhc.bigdata.spark.ng.judicial
+
+import com.winhc.bigdata.spark.udf._
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: Judicial case aggregation, new version (iterative)
+ * @author π
+ * @date 2021/8/12 16:46
+ */
+
+case class args_case_v2(tableName: String = ""
+                        , rowkey: String = "rowkey"
+                        , cols_map: Map[String, String] = Map.empty
+                       )
+
+object args_case_v2 {
+  val tn_mapping = Map[String, String](
+    "company_lawsuit" -> "0"
+    , "company_court_open_announcement" -> "1"
+    , "company_court_announcement" -> "2"
+    , "company_dishonest_info" -> "3"
+    , "company_send_announcement" -> "4"
+    , "company_zxr_restrict" -> "5"
+    , "company_zxr_final_case" -> "6"
+    , "company_zxr" -> "7"
+    , "company_court_register" -> "8"
+  )
+  val tab_args = Seq(
+    // Judgment documents (amounts in units of 10,000 yuan)
+    args_case_v2(tableName = "wenshu_detail_v2"
+      , cols_map = Map[String, String]("flag" -> "0", "case_stage" -> "case_stage(case_no)"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "judge_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "case_amt", "judge_amt" -> "judge_amt", "exec_amt" -> "null"
+        , "data" -> "map('date',judge_date,'party_title',party_title)"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '0', 'date',judge_date, 'detail_id', rowkey, 'doc_type', doc_type, 'judge_result', judge_result))"
+      ))
+    // Court hearing announcements
+    , args_case_v2(tableName = "company_court_open_announcement"
+      , cols_map = Map[String, String]("flag" -> "1", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "start_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',start_date)"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '1', 'date',start_date, 'detail_id', rowkey, 'court', court, 'court_room',court_room))"
+      ))
+    // Court announcements
+    , args_case_v2(tableName = "company_court_announcement"
+      , cols_map = Map[String, String]("flag" -> "2", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "litigant_info", "date" -> "concat_ws(' ',publish_date,'00:00:00')"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',concat_ws(' ',publish_date,'00:00:00'))"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '2', 'date',concat_ws(' ',publish_date,'00:00:00'), 'detail_id', rowkey, 'announcement_type', announcement_type, 'court_name', court_name))"
+      ))
+    // Dishonest persons (defaulters)
+    , args_case_v2(tableName = "company_dishonest_info"
+      , cols_map = Map[String, String]("flag" -> "3", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> " to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))", "date" -> "pub_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',reg_time)"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '3', 'date', pub_date, 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'performance',  performance, 'action_content', action_content ))"
+      ))
+    // Service announcements
+    , args_case_v2(tableName = "company_send_announcement"
+      , cols_map = Map[String, String]("flag" -> "4", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "start_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',start_date)"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '4', 'date',start_date, 'detail_id', rowkey, 'defendant_info', json_array(defendant_info), 'plaintiff_info', json_array(plaintiff_info)))"
+      ))
+    // High-consumption restrictions
+    , args_case_v2(tableName = "company_zxr_restrict"
+      , cols_map = Map[String, String]("flag" -> "5", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null", "yg_name" -> "null"
+        , "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(company_id,'') ,'name',COALESCE(company_name,''))  ,named_struct('litigant_id',COALESCE(pid,''),'name',COALESCE(person_name,''))  ))"
+        , "date" -> "case_create_time", "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',case_create_time)"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '5', 'date', case_create_time, 'detail_id', rowkey, 'person', array(named_struct('litigant_id',COALESCE(pid,''),'person_name',person_name)), 'company', array(named_struct('litigant_id', company_id, 'company_name',company_name)) ))"
+      ))
+    // Final cases (enforcement terminated)
+    , args_case_v2(tableName = "company_zxr_final_case"
+      , cols_map = Map[String, String]("flag" -> "6", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))"
+        , "date" -> "case_create_time"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',case_create_time)"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '6', 'date', case_create_time, 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''), 'name',name)), 'exec_amount', amt_div(exec_amount, 10000), 'no_exec_amount', amt_div(no_exec_amount, 10000) ))"
+      ))
+    // Persons subject to enforcement
+    , args_case_v2(tableName = "company_zxr"
+      , cols_map = Map[String, String]("flag" -> "7", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))", "date" -> "case_create_time"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "amt_div(exec_money,10000)"
+        , "data" -> "map('date', case_create_time, 'exec_info', to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name,'exec_money',amt_div(exec_money,10000),'date',case_create_time  ))) )"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '7', 'date', case_create_time, 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'exec_money', amt_div(exec_money,10000) ))"
+      ))
+    // Case filing records
+    , args_case_v2(tableName = "company_court_register"
+      , cols_map = Map[String, String]("flag" -> "8", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "filing_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',filing_date)"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '8', 'date',filing_date, 'detail_id', rowkey, 'court', court, 'judge', judge))"
+      ))
+  )
+
+  def get_job_args(tn: String): args_case_v2 = {
+    tab_args.find(p => tn.equals(p.tableName)).getOrElse(throw new NullPointerException("tn is not found"))
+  }
+
+  def get_job_args(): args_case_v2 = {
+    args_case_v2()
+  }
+}
+
+
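+// Entry point. Expected arguments (assumed invocation): <project> [<tn>] <c>, where <tn> is a
+// source table name or "all" (defaults to wenshu_detail_v2 when omitted) and <c> is one of
+// pre_calc / calc_mapping / calc, e.g. "winhc_ng company_zxr pre_calc" or "winhc_ng all calc".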
+object JudicialCaseRelationAggsV2 {
+  def main(args: Array[String]): Unit = {
+    var project = ""
+    var tn = ""
+    var c = ""
+    if (args.length == 3) {
+      val Array(p1, p2, p3) = args
+      project = p1
+      tn = p2
+      c = p3
+    } else if (args.length == 2) {
+      val Array(p1, p2) = args
+      project = p1
+      c = p2
+    } else {
+      println("please check project tn c!")
+      sys.exit(-1)
+    }
+    println(
+      s"""
+         |project: $project
+         |tn: $tn
+         |c: $c
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    if (StringUtils.isBlank(tn)) {
+      tn = "wenshu_detail_v2"
+    }
+    if ("all".equals(tn)) {
+      args_case_v2.tab_args.map(_.tableName).foreach(t => {
+        run(project, t, c, spark)
+      })
+    } else {
+      run(project, tn, c, spark)
+    }
+
+    spark.stop()
+  }
+
+  private def run(project: String, tn: String, c: String, spark: SparkSession) = {
+    val r = JudicialCaseRelationAggsV2(spark, project, args_case_v2.get_job_args(tn))
+    c match {
+      case "pre_calc" => r.pre_calc()
+      case "calc_mapping" => r.calc_mapping()
+      case "calc" => r.calc()
+      case _ => {
+        println("not fun to run !")
+        sys.exit(-1)
+      }
+    }
+  }
+}
+
+case class JudicialCaseRelationAggsV2(s: SparkSession, project: String, args_case_v2: args_case_v2
+                                     ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+  override protected val spark: SparkSession = s
+
+  // Preprocessing table
+  val ads_judicial_case_relation_pre = s" $project.ads_judicial_case_relation_pre_v2"
+  // ID-replacement table
+  val ads_judicial_case_relation_id = s" $project.ads_judicial_case_relation_id_v2"
+  // ID mapping table
+  val ads_judicial_case_id_mapping = s" $project.ads_judicial_case_id_mapping"
+  // ID mapping table (ODS source)
+  val ods_judicial_case_id_mapping = s" $project.ods_judicial_case_id_mapping"
+  // Main table
+  val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1_v2"
+  //  // Detail table
+  //  val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
+  // Detail table (enhanced)
+  val ads_judicial_case_relation_r3 = s" $project.ads_judicial_case_relation_r3_v2"
+  // Case removal table (main)
+  val ads_judicial_case_id_mapping_r1_deleted = s" $project.ads_judicial_case_id_mapping_r1_deleted"
+  // Case removal table (detail)
+  val ads_judicial_case_id_mapping_r3_deleted = s" $project.ads_judicial_case_id_mapping_r3_deleted"
+  // Case relation table
+  val bds_judicial_case_relation = s" $project.bds_judicial_case_relation"
+  val ads_judicial_case_node_kafka = s" $project.ads_judicial_case_node_kafka"
+  val ads_judicial_case_relation_kafka = s" $project.ads_judicial_case_relation_kafka"
+
+  val ads_judicial_case_node = s" $project.ads_judicial_case_node"
+  val ads_judicial_case_relation = s" $project.ads_judicial_case_relation"
+
+  // Blacklist table
+  val ads_case_id_big = s"winhc_ng.ads_case_id_big"
+
+  val update = s"update"
+  val incr = s"incr"
+
+  private val cols_map: Map[String, String] = args_case_v2.cols_map
+  private val rowkey: String = args_case_v2.rowkey
+  private val tableName: String = args_case_v2.tableName
+
+  val ads_table = s" $project.ads_$tableName"
+  val inc_ads_table = s" $project.inc_ads_$tableName"
+
+
+  val pre_cols = getColumns(ads_judicial_case_relation_pre).diff(Seq("ds", "tn"))
+  var last_ds = BaseUtil.getPartion(ads_judicial_case_relation_pre, tableName, spark)
+  val calc_ds = BaseUtil.getYesterday()
+
+  if (calc_ds.equals(last_ds)) {
+    last_ds = BaseUtil.getSecondPartion(ads_judicial_case_relation_pre, tableName, spark)
+  }
+  val is_incr = if (StringUtils.isBlank(last_ds)) false else true
+
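+  // Build the SELECT expressions for this source table: every pre-table column with an entry in
+  // cols_map is rewritten as "<expr> as <col>" (e.g. "plaintiff_info as yg_name" for
+  // company_court_open_announcement); columns without a mapping are selected unchanged.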
+  val cols = pre_cols.map(c => {
+    if (cols_map.contains(c)) {
+      s"${cols_map(c)} as $c"
+    } else c
+  })
+
+  case_no_trim_udf_v2()
+  prepareFunctions(spark)
+
+  val sort = get_partition_order_by()
+
+  def pre_calc(): Unit = {
+    var all_sql = ""
+    if (!is_incr) {
+      all_sql =
+        s"""
+           |SELECT  *
+           |FROM    $ads_table
+           |WHERE   ${if (is_incr) "ds = -1" else "ds > 0"}
+           |UNION ALL
+           |""".stripMargin
+    }
+
+    // Court judgment documents
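+    // Union the full-history ADS table (first run only) with the incremental inc_ads table,
+    // keep one row per rowkey (latest by partition/update time), and write the mapped columns
+    // into this table's partition of the pre table.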
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_pre PARTITION(ds='$calc_ds',tn='$tableName')
+         |SELECT
+         |${pre_cols.mkString(",")}
+         |from    (
+         |            SELECT  ${cols.mkString(",")}
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $rowkey ORDER BY $sort) AS num
+         |            from    (
+         |                       $all_sql
+         |                        SELECT  *
+         |                        FROM    $inc_ads_table
+         |                        WHERE    ${if (is_incr) s"ds > $last_ds" else "ds > 0"}
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |${if (isWindows) "LIMIT 1000" else ""}
+         |""".stripMargin).show(100, false)
+
+    // If the partition does not exist, insert an empty partition
+    addEmptyPartitionOrSkipPlus(ads_judicial_case_relation_pre, calc_ds, tableName)
+  }
+
+  def calc_mapping(): Unit = {
+    // ODS -> ADS conversion
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping PARTITION(ds='$calc_ds')
+         |SELECT  id,rowkey,tn
+         |FROM (
+         |     SELECT  component_id id
+         |             ,rowkey
+         |             ,flag_tn(tn) tn
+         |             ,ROW_NUMBER() OVER (PARTITION BY rowkey,flag_tn(tn) ORDER BY ds DESC, update_time DESC) num
+         |     FROM    $ods_judicial_case_id_mapping
+         |     WHERE   ds = '$calc_ds'
+         |)
+         |WHERE num = 1
+         |""".stripMargin)
+
+    // TODO: possible duplicates
+    // Main-table deletions
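+    // A rowkey+tn whose case id differs from the latest earlier partition is emitted with the
+    // old id and deleted = 1, so the stale main-table record can be removed downstream.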
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_r1_deleted PARTITION(ds='$calc_ds')
+         |SELECT new_id, old_id, rowkey, tn, deleted
+         |FROM (
+         |      SELECT
+         |               a.id new_id
+         |              ,b.id old_id
+         |              ,a.rowkey
+         |              ,a.tn
+         |              ,1 AS deleted
+         |              ,ROW_NUMBER() OVER (PARTITION BY b.id ORDER BY ds DESC) num2
+         |      FROM    (
+         |                  SELECT  *
+         |                          ,md5(concat_ws('',rowkey,tn)) row_id
+         |                  FROM    $ads_judicial_case_id_mapping
+         |                  WHERE   ds = '$calc_ds'
+         |              ) a
+         |      JOIN    (
+         |                  SELECT  id, row_id, num
+         |                  FROM    (
+         |                              SELECT  id
+         |                                      ,md5(concat_ws('',rowkey,tn)) row_id
+         |                                      ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num
+         |                              FROM    $ads_judicial_case_id_mapping
+         |                              WHERE   ds < '$calc_ds'
+         |                          )
+         |                  WHERE   num = 1
+         |              ) b
+         |      ON      a.row_id = b.row_id
+         |      WHERE   a.id <> b.id
+         |)
+         |WHERE  num2  =  1
+         |""".stripMargin)
+
+    // Detail-table deletions
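+    // Old case ids flagged above are joined back to the previous detail rows, so the matching
+    // detail records are marked deleted = 1 as well.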
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_r3_deleted PARTITION(ds='$calc_ds')
+         |SELECT id, rowkey, tn, deleted
+         |FROM (
+         |      SELECT  b.id,a.rowkey,a.tn, 1 as deleted
+         |              ,ROW_NUMBER() OVER (PARTITION BY b.id ORDER BY tn DESC) num2
+         |      FROM    (
+         |                  SELECT  rowkey, tn, old_id
+         |                  FROM    $ads_judicial_case_id_mapping_r1_deleted
+         |                  WHERE   ds = '$calc_ds'
+         |              ) a
+         |      JOIN    (
+         |                  SELECT  id, judicase_id
+         |                  FROM    (
+         |                              SELECT  id, judicase_id
+         |                                      ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY ds DESC) num
+         |                              FROM    $ads_judicial_case_relation_r3
+         |                              WHERE   ds < '$calc_ds'
+         |                          )
+         |                  WHERE   num = 1
+         |              ) b
+         |      ON      a.old_id = b.judicase_id
+         |)
+         |WHERE  num2  =  1
+         |""".stripMargin)
+
+    // Add empty partitions if they do not exist
+    addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r1_deleted, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r3_deleted, calc_ds)
+
+  }
+
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf_v2()
+    registerCourtRank()
+    spark.udf.register("name_aggs", new NameAggsPlusV2(1000))
+    spark.udf.register("case_reason", new CaseReasonAggs(1000))
+    spark.udf.register("all_name_plus_v2", new AllNamePlusV2(1000))
+    spark.udf.register("case_amt_plus_v2", new CaseAmtAggsPlusV2(1000))
+
+    // detail: document (wenshu) id
+    // Replace judicial case ids
+//    sql(
+//      s"""
+//         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_id PARTITION(ds='$calc_ds')
+//         |SELECT  a.id
+//         |        ,b.flag,b.title,b.case_type,b.case_reason,b.case_no,b.court_name,b.case_stage,b.lable,b.detail
+//         |        ,b.yg_name,b.bg_name,b.all_name,b.date,b.detail_id,b.case_amt,b.case_id,b.tn,b.data
+//         |FROM    (
+//         |           SELECT  id, concat_ws('',rowkey,tn) row_id
+//         |           FROM    $ads_judicial_case_id_mapping
+//         |           WHERE   ds = '$calc_ds'
+//         |        ) a
+//         |JOIN    (
+//         |            SELECT   flag
+//         |                    ,title
+//         |                    ,case_type(case_no) case_type
+//         |                    ,adjust_reason(case_reason) case_reason
+//         |                    ,case_no_trim(case_no) as case_no
+//         |                    ,court_name
+//         |                    ,case_stage(case_no) case_stage
+//         |                    ,case_label(flag) lable
+//         |                    ,to_json(named_struct('flag', flag, 'date',date, 'detail_id', detail_id, 'name', json_array(bg_name)) ) detail
+//         |                    ,yg_name
+//         |                    ,bg_name
+//         |                    ,merge_json(yg_name, bg_name, all_name) all_name
+//         |                    ,date
+//         |                    ,detail_id
+//         |                    ,case_amt
+//         |                    ,case_id
+//         |                    ,tn
+//         |                    ,data
+//         |                    ,row_id
+//         |            FROM    (
+//         |                        SELECT  *, concat_ws('',detail_id,tn) row_id
+//         |                                ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
+//         |                        FROM    $ads_judicial_case_relation_pre
+//         |                        WHERE   ds > 0 AND  case_no_trim(case_no) is not null AND  date is not null
+//         |                    )
+//         |            WHERE   num = 1
+//         |        ) b
+//         |ON      a.row_id = b.row_id
+//         |""".stripMargin).show(20, false)
+
+    // Detail table
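+    // One row per (judicase_id, case_no): aggregate titles, labels, party names and amounts from
+    // the id-replaced pre data into the enhanced detail table.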
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r3 PARTITION(ds='$calc_ds')
+         |SELECT
+         |    id,
+         |    judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    detail      ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    last_date   ,
+         |    0 deleted   ,
+         |    all_name    ,
+         |    court_level,
+         |    case_amt,
+         |    judge_amt,
+         |    exec_info
+         |FROM
+         |(
+         |SELECT  md5(concat_ws('',concat_ws('',judicase_id),CLEANUP(case_no))) id
+         |        ,judicase_id
+         |        ,max(title) title
+         |        ,case_type(max(case_no)) as case_type
+         |        ,case_reason(case_reason,date,flag) case_reason
+         |        ,case_no
+         |        ,concat_ws(',',collect_set(court_name)) court_name
+         |        ,case_stage(max(case_no)) as case_stage
+         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
+         |        ,max(date) last_date
+         |        ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
+         |        ,all_name_plus_v2(all_name) all_name
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,max(case_amt) as case_amt
+         |        ,max(judge_amt) as judge_amt
+         |        ,case_amt_plus_v2(data['exec_info']) as exec_info
+         |FROM    (
+         |        SELECT  a.*,court_level(court_name) court_level
+         |        FROM    (
+         |                   SELECT   judicase_id
+         |                           ,flag
+         |                           ,title
+         |                           ,case_type(case_no) case_type
+         |                           ,adjust_reason(case_reason) case_reason
+         |                           ,case_no_trim(case_no) as case_no
+         |                           ,court_name
+         |                           ,case_stage(case_no) case_stage
+         |                           ,case_label(flag) lable
+         |                           ,detail
+         |                           ,yg_name
+         |                           ,bg_name
+         |                           ,all_name
+         |                           ,date
+         |                           ,detail_id
+         |                           ,case_amt
+         |                           ,judge_amt
+         |                           ,tn
+         |                           ,data
+         |                   FROM    $ads_judicial_case_relation_id
+         |                   WHERE   ds = '$calc_ds' AND length(case_label(flag)) > 0 AND  case_no_trim(case_no) is not null AND  date is not null
+         |                )a
+         |)
+         |GROUP BY judicase_id
+         |         ,case_no
+         |) x
+         |""".stripMargin).show(10, false)
+
+    // Judicial case main table
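+    // One row per judicase_id: roll the detail rows up into the judicial-case main table, merging
+    // stages, courts, labels and the stage-level case/judge amounts.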
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
+         |SELECT
+         |    judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    all_name,
+         |    case_info    ,
+         |    judge_info   ,
+         |    exec_info    ,
+         |    date         ,
+         |    court_level  ,
+         |    0 deleted
+         |FROM
+         |(
+         |SELECT  judicase_id
+         |        ,max(title) title
+         |        ,concat_ws(',',collect_set(case_type)) case_type
+         |        ,case_reason(case_reason,date,'0') case_reason
+         |        ,concat_ws(',',collect_set(case_no)) case_no
+         |        ,trim_black(concat_ws(',',collect_set(court_name))) court_name
+         |        ,max(last_stage) case_stage
+         |        ,trim_black(concat_ws(',', collect_set(lable)) ) lable
+         |        -- ,max(first_case_amt) case_amt
+         |        ,max(date) AS date
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,name_aggs(yg_name,bg_name,'0',date) name_aggs
+         |        ,all_name_plus_v2(all_name) all_name
+         |        ,amt_merge(concat_ws('&',collect_set(case_info))) case_info
+         |        ,amt_merge(concat_ws('&',collect_set(judge_info))) judge_info
+         |        ,case_amt_plus_v2(exec_info) as exec_info
+         |FROM    (
+         |        SELECT  a.*
+         |        FROM    (
+         |                   SELECT  judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt,judge_amt,exec_info
+         |                   ,court_level(court_name) court_level
+         |                   ,concat_ws('|',case_stage,coalesce(case_amt,0))  as case_info
+         |                   ,concat_ws('|',case_stage,coalesce(judge_amt,0)) as judge_info
+         |                   ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY date DESC ) AS last_stage
+         |                   FROM    $ads_judicial_case_relation_r3
+         |                   WHERE   ds = '$calc_ds'
+         |                ) a
+         |        )
+         |GROUP BY judicase_id
+         |)x
+         |""".stripMargin).show(20, false)
+
+    // If the partitions do not exist, insert empty partitions
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_r1, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_r3, calc_ds)
+  }
+
+  private def get_partition_order_by(): String = {
+    if (pre_cols.contains("update_time") || pre_cols.contains("update_date")) {
+      " ds DESC,update_time DESC "
+    } else {
+      " ds DESC "
+    }
+  }
+
+  def calc_last_ds(tabName: String, default: String = "0"): String = {
+    var d1 = getLastPartitionsOrElse(tabName, default)
+    val d2 = BaseUtil.getYesterday()
+    if (d1.equals(d2)) {
+      d1 = getSecondLastPartitionOrElse(tabName, default)
+    }
+    d1
+  }
+}

+ 76 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CaseAmtAggsPlusV2.scala

@@ -0,0 +1,76 @@
+package com.winhc.bigdata.spark.udf
+
+import com.winhc.bigdata.spark.utils.BaseUtil._
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ * Takes the enforcement dimension and aggregates it by litigant into
+ * [{'name': 'person subject to enforcement', 'exec_money': 'enforced amount (in units of 10,000 yuan)'}].
+ * If the same litigant is enforced against multiple times within one judicial case,
+ * the earliest record by case_create_time is kept.
+ *
+ * @Description: Enforced-amount aggregation
+ * @author π
+ * @date 2021/08/16 15:15
+ */
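+// Illustrative sketch (assumed shape of the json_array output): given two buffered rows
+//   [{"name":"张三","litigant_id":"x1","date":"2020-01-01","exec_money":5.0}]
+//   [{"name":"张三","litigant_id":"x1","date":"2021-01-01","exec_money":8.0}]
+// evaluate() concatenates them and merge_exec_json keeps the earliest record per name, yielding
+//   [{"name":"张三","litigant_id":"x1","date":"2020-01-01","exec_money":5.0}]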
+class CaseAmtAggsPlusV2(max: Int) extends UserDefinedAggregateFunction {
+
+  val flags = Seq("7")
+  val split = "\u0001"
+  val empty_col = "[]"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("exec_info", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createArrayType(DataTypes.StringType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.StringType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Seq.empty)
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val buf_info = buffer.getSeq[String](0)
+    if (buf_info.size >= max) {
+      return
+    }
+    if (input.size != 1) return
+    val input_info = input.getAs[String](0)
+    if (StringUtils.isBlank(input_info) || empty_col.equals(input_info)) {
+      return
+    }
+    buffer(0) = buf_info ++ Seq(input_info)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+
+    val buf_info = buffer1.getSeq[String](0)
+    if (buf_info.size >= max) {
+      return
+    }
+    val input_info = buffer2.getSeq[String](0)
+    buffer1(0) = buf_info ++ input_info
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val tmp_name: Seq[String] = buffer.getSeq[String](0)
+    //println("tmp_name|  " + tmp_name)
+    if (tmp_name == null || tmp_name.isEmpty) return "[]"
+    val list = tmp_name.map(x => {
+      json_array(x)
+    }).reduce(_ ++ _)
+    //println("list|  " + list)
+    merge_exec_json(list)
+  }
+
+}

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -106,6 +106,13 @@ trait CompanyMapping {
     spark.udf.register("adjust_reason", (reason: String) => {
       adjust_reason(reason)
     })
+    spark.udf.register("amt_div", (amt1: String, amt2: String) => {
+      amt_div(amt1, amt2)
+    })
+
+    spark.udf.register("amt_merge", (info:String) => {
+      amt_merge(info)
+    })
 
 
   }

+ 112 - 0
src/main/scala/com/winhc/bigdata/spark/udf/NameAggsPlusV2.scala

@@ -0,0 +1,112 @@
+package com.winhc.bigdata.spark.udf
+
+import com.winhc.bigdata.spark.utils.BaseUtil
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+/**
+ * @Description: Prefer party names from judgment documents (wenshu)
+ * @author π
+ * @date 2021/08/19 14:40
+ */
+
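+// Selection rule (see evaluate): if any buffered entry comes from judgment documents (flag "0"),
+// only those entries are considered; among the candidates, entries with both plaintiff and
+// defendant present win, with ties broken by the earliest bus_date.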
+class NameAggsPlusV2(max: Int) extends UserDefinedAggregateFunction {
+
+  //val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+  val empty_col = "[]"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("yg_name", DataTypes.StringType)
+    , StructField("bg_name", DataTypes.StringType)
+    , StructField("flag", DataTypes.StringType)
+    , StructField("bus_date", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, String]())
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    if (buffer.getMap[String, String](0).size >= max) {
+      return
+    }
+    if (input.size != 4) return
+    val yg_name = input.getAs[String](0)
+    val bg_name = input.getAs[String](1)
+    val flag = input.getAs[String](2)
+    val bus_date = input.getAs[String](3)
+    if (StringUtils.isBlank(bus_date)) {
+      return
+    }
+    if (StringUtils.isBlank(yg_name) && StringUtils.isBlank(bg_name)) {
+      return
+    }
+    // Plaintiff / defendant conversion
+    val yg_name1 = BaseUtil.filter_json(yg_name, "name")
+    val bg_name1 = BaseUtil.filter_json(bg_name, "name")
+
+    if (empty_col.equals(yg_name1) && empty_col.equals(bg_name1)) {
+      return
+    }
+    val map0 = buffer.getMap[String, String](0).toMap
+    var map_new0 = scala.collection.mutable.Map[String, String](map0.toSeq: _*)
+    map_new0 ++= Map(s"$bus_date$split$flag" -> s"$yg_name1$split$bg_name1$split$flag")
+    buffer.update(0, map_new0)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1(0) = buffer1.getAs[Map[String, String]](0) ++ buffer2.getAs[Map[String, String]](0)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    var yg_name = empty_col
+    var bg_name = empty_col
+    val m0: Map[String, String] = buffer.getAs[Map[String, String]](0)
+    println(s"m0: $m0")
+    var has = false
+    if (m0.isEmpty) {
+      return Map("yg_name" -> yg_name, "bg_name" -> bg_name)
+    } else if (m0.nonEmpty) {
+      val tmp: List[(String, String, String, String, String)] = m0.map(x => {
+        val Array(yg_name1, bg_name1, flag) = x._2.split(s"$split", -1)
+        if (flag.equals("0")) {
+          has = true
+        }
+        var weight = "0"
+        if (!empty_col.equals(yg_name1) && !empty_col.equals(bg_name1)) {
+          weight = "1"
+        }
+        (x._1, yg_name1, bg_name1, flag, weight)
+      }).filter(x => !has || x._4.equals("0"))
+        .toList
+        .sortBy(x => (x._5, x._1))(Ordering.Tuple2(Ordering.String.reverse, Ordering.String))
+      println(s"tmp: $tmp")
+      val (_, yg_name2, bg_name2, _, _) = tmp.head
+      yg_name = yg_name2
+      bg_name = bg_name2
+    }
+    Map("yg_name" -> yg_name, "bg_name" -> bg_name)
+  }
+}

+ 133 - 8
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -16,7 +16,7 @@ import com.alibaba.fastjson.serializer.SerializeFilter
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Json
 
-import scala.collection.mutable
+import scala.collection.{immutable, mutable}
 import scala.collection.mutable.ListBuffer
 
 /**
@@ -275,12 +275,22 @@ object BaseUtil {
 
   def caseStage(caseNo: String): String = {
     if (StringUtils.isNotBlank(caseNo)) {
-      if (StrUtil.containsAny(caseNo, "民初")) {
+      if (StrUtil.containsAny(caseNo, "民") && StrUtil.containsAny(caseNo, "初")) {//特殊
         return "民事一审"
       } else if (StrUtil.containsAny(caseNo, "执恢")) {
         return "恢复执行"
       } else if (StrUtil.containsAny(caseNo, "执异")) {
         return "执行异议"
+      } else if (StrUtil.containsAny(caseNo, "执复")) {
+        return "执行复议"
+      } else if (StrUtil.containsAny(caseNo, "司救执")) {
+        return "执行司法救助"
+      } else if (StrUtil.containsAny(caseNo, "司救民")) {
+        return "民事司法救助"
+      } else if (StrUtil.containsAny(caseNo, "执监")) {
+        return "执行监督"
+      } else if (StrUtil.containsAny(caseNo, "执他")) {
+        return "其他执行"
       } else if (StrUtil.containsAny(caseNo, "执")) {
         return "首次执行"
       } else if (StrUtil.containsAny(caseNo, "刑初")) {
@@ -299,10 +309,60 @@ object BaseUtil {
         return "财产保全"
       } else if (StrUtil.containsAny(caseNo, "行初")) {
         return "行政一审"
+      } else if (StrUtil.containsAny(caseNo, "民辖终")) {
+        return "民事管辖上诉"
       } else if (StrUtil.containsAny(caseNo, "民辖")) {
         return "民事管辖"
       } else if (StrUtil.containsAny(caseNo, "民再")) {
         return "民事再审"
+      } else if (StrUtil.containsAny(caseNo, "诉前调")) {
+        return "诉前调解"
+      } else if (StrUtil.containsAny(caseNo, "刑更")) {
+        return "刑罚与执行变更"
+      } else if (StrUtil.containsAny(caseNo, "行审复")) {
+        return "非诉行政行为申请执行审查复议"
+      } else if (StrUtil.containsAny(caseNo, "行审")) {
+        return "非诉行政行为申请执行审查"
+      } else if (StrUtil.containsAny(caseNo, "民督")) {
+        return "申请支付令审查"
+      } else if (StrUtil.containsAny(caseNo, "行申")) {
+        return "行政申请再审审查"
+      } else if (StrUtil.containsAny(caseNo, "刑再")) {
+        return "刑事审判监督"
+      } else if (StrUtil.containsAny(caseNo, "行赔初")) {
+        return "行政赔偿一审"
+      } else if (StrUtil.containsAny(caseNo, "行赔终")) {
+        return "行政赔偿二审"
+      } else if (StrUtil.containsAny(caseNo, "行赔申")) {
+        return "行政赔偿申请再审审查"
+      } else if (StrUtil.containsAny(caseNo, "刑辖")) {
+        return "刑事管辖"
+      } else if (StrUtil.containsAny(caseNo, "司惩")) {
+        return "司法制裁审查"
+      } else if (StrUtil.containsAny(caseNo, "民监")) {
+        return "民事依职权再审审查"
+      } else if (StrUtil.containsAny(caseNo, "刑他")) {
+        return "其他刑事"
+      } else if (StrUtil.containsAny(caseNo, "民抗")) {
+        return "民事抗诉再审审查"
+      } else if (StrUtil.containsAny(caseNo, "民催")) {
+        return "催告"
+      } else if (StrUtil.containsAny(caseNo, "民他")) {
+        return "其他民事"
+      } else if (StrUtil.containsAny(caseNo, "行辖")) {
+        return "行政管辖"
+      } else if (StrUtil.containsAny(caseNo, "民撤")) {
+        return "第三人撤销之诉"
+      } else if (StrUtil.containsAny(caseNo, "行再")) {
+        return "行政再审"
+      } else if (StrUtil.containsAny(caseNo, "委赔监")) {
+        return "司法赔偿监督审查"
+      } else if (StrUtil.containsAny(caseNo, "委赔")) {
+        return "赔偿委员会审理赔偿"
+      } else if (StrUtil.containsAny(caseNo, "刑医")) {
+        return "强制医疗"
+      } else if (StrUtil.containsAny(caseNo, "法赔")) {
+        return "法院作为赔偿义务机关自赔"
       } else {
         return "其它阶段"
       }
@@ -625,7 +685,8 @@ object BaseUtil {
       list.foreach(x => {
         val n = collection.mutable.Map(x.toSeq: _*)
         val name = x.getOrElse("name", "")
-        if (name.length > 0 && !m.contains(name)) {
+        val litigant_id = x.getOrElse("litigant_id", "")
+        if (name.length > 0 && (!m.contains(name) || litigant_id.length > 0)) {
           m ++= mutable.Map(name -> n)
         }
       })
@@ -638,6 +699,43 @@ object BaseUtil {
     }
   }
 
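+  // Groups execution records by litigant name; within each group keeps the earliest dated record's
+  // exec_money (and any non-blank litigant_id) and returns the result as a JSON array string.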
+  def merge_exec_json(list: List[Map[String, String]]): String = {
+    import org.json4s.DefaultFormats
+    //    try {
+    val list1: Map[String, List[(String, String, String, Double)]] = list.map(x => {
+      val name = x.getOrElse("name", "")
+      val litigant_id = x.getOrElse("litigant_id", "")
+      val date = x.getOrElse("date", "")
+      val exec_money = x.getOrElse("exec_money", 0).asInstanceOf[Double]
+      (name, litigant_id, date, exec_money)
+    }).filter(x => {
+      x._4 > 0 && StringUtils.isNotBlank(x._1) && StringUtils.isNotBlank(x._3)
+    }).groupBy(_._1)
+    //println("list1|  " + list)
+    val list2: Iterable[Map[String, Any]] = list1.mapValues(l => {
+      var name: String = l.head._1
+      var litigant_id: String = l.head._2
+      var date: String = l.head._3
+      var exec_money: Double = l.head._4
+      l.foreach(x => {
+        if (StringUtils.isNotBlank(x._1)) name = x._1
+        if (StringUtils.isNotBlank(x._2)) litigant_id = x._2
+        if (date.compare(x._3) > 0) {
+          date = x._3
+          exec_money = x._4
+        }
+      })
+      Map("name" -> name, "litigant_id" -> litigant_id, "date" -> date, "exec_money" -> exec_money)
+    }).values
+    if (list2.isEmpty) return "[]"
+    Json(DefaultFormats).write(list2)
+    //    } catch {
+    //      case e: Exception => {
+    //        return "[]"
+    //      }
+    //    }
+  }
+
   def merge_json(yg_name: String, bg_name: String, all_name: String): String = {
     list_json(json_array(yg_name) ++ json_array(bg_name) ++ json_array(all_name))
   }
@@ -708,7 +806,34 @@ object BaseUtil {
     reason
   }
 
+  def amt_div(amt1: String, amt2: String): Double = {
+    var r1 = 0d
+    if (StringUtils.isBlank(amt1) || StringUtils.isBlank(amt2)) return 0d
+    try {
+      r1 = BigDecimalUtil.div(amt1, amt2, 6)
+    } catch {
+      case e: Exception => r1 = 0d
+    }
+    r1
+  }
+
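+  // Parses "stage|amount" pairs joined by '&' (e.g. "一审|20.3&二审|30.3"), keeps the maximum amount
+  // per stage, and returns a JSON array of {case_stage, amt} objects.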
+  def amt_merge(amt_info: String): String = {
+    if (StringUtils.isBlank(amt_info)) return "[]"
+    val a1 = amt_info.split("&", -1)
+    val m: immutable.Seq[Map[String, Any]] = a1.map(x => {
+      val Array(case_stage, amt) = x.split("\\|", -1)
+      (case_stage, amt.toDouble)
+    }).filter(_._2 > 0).groupBy(_._1).mapValues(x => {
+      x.map(_._2).max
+    }).toList.map(x => {
+      Map("case_stage" -> x._1, "amt" -> x._2)
+    })
+    Json(DefaultFormats).write(m)
+  }
+
   def main(args: Array[String]): Unit = {
+    println(amt_merge("一审|20.3&二审|30.3&再审|40.3&再审|30.3&一审|30.3"))
+    println(amt_div("1998301", "10000"))
     println(adjust_reason("其他"))
     println(adjust_reason("其它"))
     println(combine_id(null, ""))
@@ -718,9 +843,9 @@ object BaseUtil {
     println(tn_flag("company_zxr1"))
     val s = "[{\"name\":\"史某某\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]\u0001[{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]"
     println(all_name(s))
-    val s1 = "[{\"name\":\"\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]"
-    val s2 = "[{\"name\":\"\",\"litigant_id\":\"xx\"},{\"name\":\"何安平\",\"litigant_id\":\"\"}]"
-    val s3 = "[{\"name\":\"xxx\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"},{\"name\":\"何安平\",\"litigant_id\":\"\"}]"
+    val s1 = "[{\"name\":\"xxx\",\"litigant_id\":\"\",\"date\":\"2021\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\",\"date\":\"2022\"}]"
+    val s2 = "[{\"name\":\"\",\"litigant_id\":\"xx\"},{\"name\":\"何安平\",\"litigant_id\":\"222\"}]"
+    val s3 = "[{\"name\":\"xxx\",\"litigant_id\":\"\",\"date\":\"2021\"},{\"name\":\"伍新贵\",\"litigant_id\":\"333\"},{\"name\":\"何安平\",\"litigant_id\":\"\"}]"
 
     //    val s1 = null
     //    val s2 = ""
@@ -734,10 +859,10 @@ object BaseUtil {
     //    println(nameCleanup("小米科技.;有,@限公  司  雷军"))
     //    println(title("xx", null, "reason"))
     //    println(parseAddress("大石桥市人民法院"))
-    //  println(case_no_trim_v2("(2019)年中国贸仲京裁字第0394号号"))
+    println(case_no_trim_v2("(2020)豫1728执541号之一"))
     //    val seq = Seq("1", "3", "2", "7").mkString("\001")
     //    println(sortString(seq))
-    //println(caseStage("(2019)鄂刑初7号"))
+    println(caseStage("(2019)鄂民申7号"))
     /*val yg_name = ",,"
     val bg_name = "张三,,小米,;"
     println(compareName(yg_name, bg_name))*/