Browse Source

案件聚合逻辑

xufei 3 years ago
parent
commit
c013990ecf

+ 467 - 0
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggs.scala

@@ -0,0 +1,467 @@
+package com.winhc.bigdata.spark.ng.judicial
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CaseAmtAggs, CaseReasonAggs, CompanyMapping, CourtRank, NameAggs, NameAggsPlus}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description:司法案件新版本聚合
+ * @author π
+ * @date 2021/5/19 10:46
+ */
+
+case class args_case(tableName: String = ""
+                     , rowkey: String = "rowkey"
+                     , cols_map: Map[String, String] = Map.empty
+                    )
+
+object args_case {
+  val tn_mapping = Map[String, String](
+    "company_lawsuit" -> "0"
+    , "company_court_open_announcement" -> "1"
+    , "company_court_announcement" -> "2"
+    , "company_dishonest_info" -> "3"
+    , "company_send_announcement" -> "4"
+    , "company_zxr_restrict" -> "5"
+    , "company_zxr_final_case" -> "6"
+    , "company_zxr" -> "7"
+    , "company_court_register" -> "8"
+  )
+  val tab_args = Seq(
+    //文书
+    args_case(tableName = "company_lawsuit"
+      , cols_map = Map[String, String]("flag" -> "0", "case_stage" -> "case_stage(case_no)"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "judge_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "case_amt*10000"
+        , "data" -> "map('date',judge_date)"
+        , "all_name" -> "litigant_info"
+      ))
+    //开庭公告
+    , args_case(tableName = "company_court_open_announcement"
+      , cols_map = Map[String, String]("flag" -> "1", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "start_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "case_id" -> "null"
+        , "data" -> "map('date',start_date)"
+        , "all_name" -> "litigant_info"
+      ))
+    //法院公告
+    , args_case(tableName = "company_court_announcement"
+      , cols_map = Map[String, String]("flag" -> "2", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "litigant_info", "date" -> "concat_ws(' ',publish_date,'00:00:00')"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "case_id" -> "null"
+        , "data" -> "map('date',concat_ws(' ',publish_date,'00:00:00'))"
+        , "all_name" -> "null"
+      ))
+    //失信人
+    , args_case(tableName = "company_dishonest_info"
+      , cols_map = Map[String, String]("flag" -> "3", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))", "date" -> "pub_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "case_id" -> "null"
+        , "data" -> "map('date',reg_time)"
+        , "all_name" -> "null"
+      ))
+    //送达公告
+    , args_case(tableName = "company_send_announcement"
+      , cols_map = Map[String, String]("flag" -> "4", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "start_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "case_id" -> "null"
+        , "data" -> "map('date',start_date)"
+        , "all_name" -> "litigant_info"
+      ))
+    //限高
+    , args_case(tableName = "company_zxr_restrict"
+      , cols_map = Map[String, String]("flag" -> "5", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null", "yg_name" -> "null"
+        , "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(company_id,'') ,'name',COALESCE(company_name,''))  ,named_struct('litigant_id',COALESCE(pid,''),'name',COALESCE(person_name,''))  ))"
+        , "date" -> "case_create_time", "detail_id" -> "rowkey", "case_amt" -> "null", "case_id" -> "null"
+        , "data" -> "map('date',case_create_time)"
+        , "all_name" -> "null"
+      ))
+    //终本
+    , args_case(tableName = "company_zxr_final_case"
+      , cols_map = Map[String, String]("flag" -> "6", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))"
+        , "date" -> "case_create_time"
+        , "detail_id" -> "rowkey", "case_amt" -> "exec_amount", "case_id" -> "null"
+        , "data" -> "map('date',case_create_time)"
+        , "all_name" -> "null"
+      ))
+    //被执
+    , args_case(tableName = "company_zxr"
+      , cols_map = Map[String, String]("flag" -> "7", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))", "date" -> "case_create_time"
+        , "detail_id" -> "rowkey", "case_amt" -> "exec_money", "case_id" -> "null"
+        , "data" -> "map('date',case_create_time)"
+        , "all_name" -> "null"
+      ))
+    //立案信息
+    , args_case(tableName = "company_court_register"
+      , cols_map = Map[String, String]("flag" -> "8", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "filing_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "case_id" -> "null"
+        , "data" -> "map('date',filing_date)"
+        , "all_name" -> "litigant_info"
+      ))
+  )
+
+  def get_job_args(tn: String): args_case = {
+    tab_args.find(p => tn.equals(p.tableName)).getOrElse(throw new NullPointerException("tn is not fount"))
+  }
+
+  def get_job_args(): args_case = {
+    args_case()
+  }
+}
+
+
+object JudicialCaseRelationAggs {
+  def main(args: Array[String]): Unit = {
+    var project = ""
+    var tn = ""
+    var c = ""
+    if (args.length == 3) {
+      val Array(p1, p2, p3) = args
+      project = p1
+      tn = p2
+      c = p3
+    } else if (args.length == 2) {
+      val Array(p1, p2) = args
+      project = p1
+      c = p2
+    } else {
+      println("please check project tn c!")
+      sys.exit(-1)
+    }
+    println(
+      s"""
+         |project: $project
+         |tn: $tn
+         |c: $c
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    if (StringUtils.isBlank(tn)) {
+      tn = "company_lawsuit"
+    }
+    val r = JudicialCaseRelationAggs(spark, project, args_case.get_job_args(tn))
+    c match {
+      case "pre_calc" => r.pre_calc()
+      case "calc_mapping" => r.calc_mapping()
+      case "calc" => r.calc()
+      case _ => {
+        println("not fun to run !")
+        sys.exit(-1)
+      }
+    }
+    spark.stop()
+  }
+
+}
+
+case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case: args_case
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+  override protected val spark: SparkSession = s
+
+  //预处理表
+  val ads_judicial_case_relation_pre = s" $project.ads_judicial_case_relation_pre"
+  //替换id表
+  val ads_judicial_case_relation_id = s" $project.ads_judicial_case_relation_id"
+  //id映射表
+  val ads_judicial_case_id_mapping = s" $project.ads_judicial_case_id_mapping"
+  //主表
+  val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1"
+  //明细表
+  val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
+  //案件移除表
+  val ads_judicial_case_id_mapping_deleted = s" $project.ads_judicial_case_id_mapping_deleted"
+
+  private val cols_map: Map[String, String] = args_case.cols_map
+  private val rowkey: String = args_case.rowkey
+  private val tableName: String = args_case.tableName
+
+  val ads_table = s" $project.ads_$tableName"
+  val inc_ads_table = s" $project.inc_ads_$tableName"
+
+
+  val pre_cols = getColumns(ads_judicial_case_relation_pre).diff(Seq("ds", "tn"))
+  var last_ds = BaseUtil.getPartion(ads_judicial_case_relation_pre, tableName, spark)
+  val calc_ds = BaseUtil.getYesterday()
+
+  if (calc_ds.equals(last_ds)) {
+    last_ds = BaseUtil.getSecondPartion(ads_judicial_case_relation_pre, tableName, spark)
+  }
+  val is_incr = if (StringUtils.isBlank(last_ds)) false else true
+
+  val cols = pre_cols.map(c => {
+    if (cols_map.contains(c)) {
+      s"${cols_map(c)} as $c"
+    } else c
+  })
+
+  case_no_trim_udf_v2()
+  prepareFunctions(spark)
+
+  val sort = get_partition_order_by()
+
+  def pre_calc(): Unit = {
+    var all_sql = ""
+    if (!is_incr) {
+      all_sql =
+        s"""
+           |SELECT  *
+           |FROM    $ads_table
+           |WHERE   ${if (is_incr) "ds = -1" else "ds > 0"}
+           |UNION ALL
+           |""".stripMargin
+    }
+
+    //裁判文书
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_pre PARTITION(ds='$calc_ds',tn='$tableName')
+         |SELECT
+         |${pre_cols.mkString(",")}
+         |from    (
+         |            SELECT  ${cols.mkString(",")}
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $rowkey ORDER BY $sort) AS num
+         |            from    (
+         |                       $all_sql
+         |                        SELECT  *
+         |                        FROM    $inc_ads_table
+         |                        WHERE    ${if (is_incr) s"ds > $last_ds" else "ds > 0"}
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |${if (isWindows) "LIMIT 1000" else ""}
+         |""".stripMargin).show(100, false)
+
+    //分区不存在,插入空分区
+    addEmptyPartitionOrSkipPlus(ads_judicial_case_relation_pre, calc_ds, tableName)
+  }
+
+  def calc_mapping(): Unit = {
+
+    //找出删除id
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_id_mapping_deleted PARTITION(ds='$calc_ds')
+         |SELECT
+         |         a.id new_id
+         |        ,b.id old_id
+         |        ,a.rowkey
+         |        ,a.tn
+         |        ,1 AS deleted
+         |FROM    (
+         |            SELECT  *
+         |                    ,md5(concat_ws('',rowkey,tn)) row_id
+         |            FROM    $ads_judicial_case_id_mapping
+         |            WHERE   ds = '$calc_ds'
+         |        ) a
+         |JOIN    (
+         |            SELECT  id, row_id, num
+         |            FROM    (
+         |                        SELECT  id
+         |                                ,md5(concat_ws('',rowkey,tn)) row_id
+         |                                ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num
+         |                        FROM    $ads_judicial_case_id_mapping
+         |                        WHERE   ds < '$calc_ds'
+         |                    )
+         |            WHERE   num = 1
+         |        ) b
+         |ON      a.row_id = b.row_id
+         |WHERE   a.id <> b.id
+         |""".stripMargin)
+
+
+  }
+
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf_v2()
+    registerCourtRank()
+    spark.udf.register("name_aggs", new NameAggsPlus(1000))
+    spark.udf.register("case_reason", new CaseReasonAggs(1000))
+    spark.udf.register("case_amt", new CaseAmtAggs(1000))
+
+    //detail 文书id
+    //替换司法案件id
+        sql(
+          s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_id PARTITION(ds='$calc_ds')
+             |SELECT  b.id
+             |        ,a.flag,a.title,a.case_type,a.case_reason,a.case_no,a.court_name,a.case_stage,a.lable,a.detail
+             |        ,a.yg_name,a.bg_name,a.all_name,a.date,a.detail_id,a.case_amt,a.case_id,a.tn,a.data
+             |FROM    (
+             |            SELECT   flag
+             |                    ,title
+             |                    ,case_type(case_no) case_type
+             |                    ,case_reason
+             |                    ,case_no_trim(case_no) as case_no
+             |                    ,court_name
+             |                    ,case_stage(case_no) case_stage
+             |                    ,case_label(flag) lable
+             |                    ,to_json(named_struct('flag', flag, 'date',date, 'detail_id', coalesce(case_id, detail_id), 'name', json_array(bg_name)) ) detail
+             |                    ,yg_name
+             |                    ,bg_name
+             |                    ,merge_json(yg_name, bg_name, all_name) all_name
+             |                    ,date
+             |                    ,detail_id
+             |                    ,case_amt
+             |                    ,case_id
+             |                    ,tn
+             |                    ,data
+             |                    ,row_id
+             |            FROM    (
+             |                        SELECT  *, md5(concat_ws('',detail_id,tn)) row_id
+             |                                ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
+             |                        FROM    $ads_judicial_case_relation_pre
+             |                        WHERE   ds > ${calc_last_ds(ads_judicial_case_relation_id)}
+             |                    )
+             |            WHERE   num = 1
+             |        ) a
+             |JOIN    (
+             |           SELECT id, row_id
+             |           FROM (
+             |                    SELECT  id, md5(concat_ws('',rowkey,tn)) row_id
+             |                           ,ROW_NUMBER() OVER (PARTITION BY rowkey,tn ORDER BY ds DESC) num2
+             |                    FROM    $ads_judicial_case_id_mapping
+             |                    WHERE   ds > 0
+             |                ) WHERE num2 = 1
+             |        ) b
+             |ON      a.row_id = b.row_id
+             |""".stripMargin)
+
+    //司法案件主表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
+         |SELECT
+         |    judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    all_name,
+         |    case_amt    ,
+         |    date        ,
+         |    court_level ,
+         |    0 deleted
+         |FROM
+         |(
+         |SELECT  judicase_id
+         |        ,max(title) title
+         |        ,concat_ws(',',collect_set(case_type)) case_type
+         |        ,case_reason(case_reason,date,flag) case_reason
+         |        ,concat_ws(',',collect_set(case_no)) case_no
+         |        ,concat_ws(',',collect_set(court_name)) court_name
+         |        ,max(last_stage) case_stage
+         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+         |        ,case_amt(case_amt) AS case_amt
+         |        ,max(date) AS date
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
+         |        ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
+         |FROM    (
+         |        SELECT  a.*
+         |        FROM    (
+         |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt
+         |                   ,court_level(court_name) court_level,data
+         |                   ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY data['date'] DESC ) AS last_stage
+         |                   FROM    $ads_judicial_case_relation_id
+         |                   WHERE   ds = '$calc_ds'
+         |                ) a
+         |        )
+         |GROUP BY judicase_id
+         |)x
+         |""".stripMargin).show(20, false)
+
+    //明细表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r2 PARTITION(ds='$calc_ds')
+         |SELECT
+         |    id,
+         |    judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    detail      ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    last_date   ,
+         |    0 deleted
+         |FROM
+         |(
+         |SELECT  md5(concat_ws('',concat_ws('',judicase_id),CLEANUP(case_no))) id
+         |        ,judicase_id
+         |        ,max(first_title) title
+         |        ,case_type(max(case_no)) as case_type
+         |        ,case_reason(case_reason,date,flag) case_reason
+         |        ,case_no
+         |        ,max(court_name) court_name
+         |        ,case_stage(max(case_no)) as case_stage
+         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
+         |        ,max(last_date) last_date
+         |        ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
+         |FROM    (
+         |        SELECT  a.*
+         |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
+         |                ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
+         |        FROM    (
+         |                   SELECT  *
+         |                   FROM    $ads_judicial_case_relation_id
+         |                   WHERE   ds = '$calc_ds' AND length(lable) > 0
+         |                )a
+         |)
+         |GROUP BY judicase_id
+         |         ,case_no
+         |) x
+         |""".stripMargin).show(10, false)
+
+    //分区不存在,插入空分区
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_r1, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_r2, calc_ds)
+  }
+
+  private def get_partition_order_by(): String = {
+    if (pre_cols.contains("update_time") || pre_cols.contains("update_date")) {
+      " ds DESC,update_time DESC "
+    } else {
+      " ds DESC "
+    }
+  }
+
+  def calc_last_ds(tabName: String, default: String = "0"): String = {
+    var d1 = getLastPartitionsOrElse(tabName, default)
+    val d2 = BaseUtil.getYesterday()
+    if (d1.equals(d2)) {
+      d1 = getSecondLastPartitionOrElse(tabName, default)
+    }
+    d1
+  }
+}

+ 12 - 10
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_back.scala

@@ -27,16 +27,18 @@ case class inc_human_relation_back(s: SparkSession,
 
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_human_relation_back_deleted PARTITION (ds='$ds')
-        |SELECT  person_name
-        |        ,person_id
-        |        ,create_time
-        |        ,update_time
-        |        ,deleted
-        |FROM    $inc_ods_company_human_relation_merge
-        |WHERE   ds = '$ds'
-        |AND     length(company_id) = 0   AND   length(person_id) = 33
-        |""".stripMargin)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_human_relation_back_deleted PARTITION (ds='$ds')
+         |SELECT  person_name
+         |        ,person_id
+         |        ,create_time
+         |        ,update_time
+         |        ,deleted
+         |FROM    $inc_ods_company_human_relation_merge
+         |WHERE   ds = '$ds'
+         |AND     deleted = 9
+         |""".stripMargin).show(100, false)
+
+    //AND     length(company_id) = 0   AND   length(person_id) = 33
   }
 
 }

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -66,6 +66,13 @@ trait BaseFunc extends LoggingUtils {
          |""".stripMargin)
   }
 
+  def addEmptyPartitionOrSkipPlus(tab: String, ds: String, tn: String): Unit = {
+    sql(
+      s"""
+         |ALTER TABLE $tab ADD IF NOT EXISTS PARTITION(ds='$ds',tn='$tn')
+         |""".stripMargin)
+  }
+
   @deprecated
   def case_no_trim_udf(): Unit = {
     spark.udf.register("case_no_trim", case_no_trim _)

+ 108 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CaseAmtAggs.scala

@@ -0,0 +1,108 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ * 1、如果有被执行人,那么取被执行人的执行标的额
+ * 2、如果有终本案件,取终本案件的执行标的
+ * 3、如果有裁判文书取裁判文书的金额(取一审)
+ *
+ * @Description:案件金额聚合
+ * @author π
+ * @date 2020/10/26 15:15
+ */
+
+class CaseAmtAggs(max: Int) extends UserDefinedAggregateFunction {
+
+  //val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("case_amt", DataTypes.DoubleType)
+    , StructField("bus_date", DataTypes.StringType)
+    , StructField("flag", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.DoubleType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.DoubleType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, Double]())
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    if (buffer.size >= max) {
+      return
+    }
+    //val case_amt: Double = input.getDouble(0)
+    //val bus_date = input.getString(1)
+    //val flag = input.getString(2)
+    if(input.size != 3) return
+    val case_amt: Double = input.getAs[Double](0)
+    val bus_date = input.getAs[String](1)
+    val flag = input.getAs[String](2)
+
+    if (case_amt == null) {
+      return
+    }
+    //    if (!flags.contains(flag)) {
+    //      return
+    //    }
+
+    buffer(0) = mutable.Map(bus_date + split + flag -> case_amt) ++= scala.collection.mutable.Map[String, Double](buffer.getMap[String, Double](0).toMap.toSeq: _*)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    if (buffer1.size >= max) {
+      return
+    }
+    buffer1(0) = buffer1.getAs[Map[String, Double]](0) ++ buffer2.getAs[Map[String, Double]](0)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    var case_amt: Double = 0d
+    val m0: Map[String, Double] = buffer.getAs[Map[String, Double]](0)
+    if (m0.isEmpty) {
+      return case_amt
+    } else {
+      val keys = m0.keySet.toSeq
+      val tup: Seq[(String, String, Double)] = keys.map(x => {
+        val arr = x.split(split, -1)
+        (arr(1), arr(0), m0(x))
+      })
+      val flags = Seq[String]("7", "6", "0")
+      val seq = flags.map(x => {
+        get_value(tup, x)
+      }).filter(_ > 0)
+      if (seq.size > 0) {
+        case_amt = seq.head
+      }
+    }
+    case_amt
+  }
+
+  def get_value(tup: Seq[(String, String, Double)], flag: String): Double = {
+    var case_amt: Double = 0d
+    val t1 = tup.filter(x => {
+      x._1.equals(flag) && x._3 > 0
+    })
+    if (t1.size > 0) {
+      val t2 = t1.map(x => x._2 -> x._3).toMap
+      val key = t2.keySet.toSeq.sorted.head
+      case_amt = t2(key)
+    }
+    case_amt
+  }
+}

+ 6 - 4
src/main/scala/com/winhc/bigdata/spark/udf/CaseReasonAggs.scala

@@ -41,10 +41,12 @@ class CaseReasonAggs(max: Int) extends UserDefinedAggregateFunction {
     if (buffer.size >= max) {
       return
     }
-    val case_reason = input.getString(0)
-    val bus_date = input.getString(1)
-    val flag = input.getString(2)
-    if (StringUtils.isBlank(case_reason)) {
+    if(input.size != 3) return
+    val case_reason = input.getAs[String](0)
+    val bus_date = input.getAs[String](1)
+    val flag = input.getAs[String](2)
+
+    if (StringUtils.isBlank(case_reason) || StringUtils.isBlank(bus_date)) {
       return
     }
 //    if (!flags.contains(flag)) {

+ 11 - 1
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -4,7 +4,7 @@ import com.winhc.bigdata.calc.DimScore
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
-import com.winhc.bigdata.spark.utils.BaseUtil._
+import com.winhc.bigdata.spark.utils.BaseUtil.{json_array, _}
 import com.winhc.bigdata.spark.utils.RowkeyRuleUtils._
 
 trait CompanyMapping {
@@ -71,6 +71,16 @@ trait CompanyMapping {
       split_names(json_array, json_path)
     })
 
+    spark.udf.register("json_array", (json: String) => {
+      json_array(json)
+    })
+
+    spark.udf.register("merge_json", (yg_name: String, bg_name: String, all_name: String) => {
+      merge_json(yg_name, bg_name, all_name)
+    })
+    spark.udf.register("all_name", (name: String) => {
+      all_name(name)
+    })
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 111 - 0
src/main/scala/com/winhc/bigdata/spark/udf/NameAggsPlus.scala

@@ -0,0 +1,111 @@
+package com.winhc.bigdata.spark.udf
+
+import java.util
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.utils.BaseUtil
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+/**
+ * @Description:原告,被告聚合v8版本
+ * @author π
+ * @date 2021/05/24 14:40
+ */
+
+class NameAggsPlus(max: Int) extends UserDefinedAggregateFunction {
+
+  //val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+  val empty_col = "[]"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("yg_name", DataTypes.StringType)
+    , StructField("bg_name", DataTypes.StringType)
+    , StructField("flag", DataTypes.StringType)
+    , StructField("bus_date", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
+      , StructField("t2", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, String]())
+    buffer.update(1, Map[String, String]())
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+
+    if (buffer.size >= max) {
+      return
+    }
+    if(input.size != 4) return
+    val yg_name = input.getAs[String](0)
+    val bg_name = input.getAs[String](1)
+    val flag = input.getAs[String](2)
+    val bus_date = input.getAs[String](3)
+
+    if (StringUtils.isBlank(bus_date)) {
+      return
+    }
+    if (StringUtils.isBlank(yg_name) && StringUtils.isBlank(bg_name)) {
+      return
+    }
+
+    //原被告转换
+    val yg_name1 = BaseUtil.filter_json(yg_name, "name")
+    val bg_name1 = BaseUtil.filter_json(bg_name, "name")
+
+    if (empty_col.equals(yg_name1) && empty_col.equals(bg_name1)) {
+      return
+    }
+
+    val map0 = buffer.getMap[String, String](0).toMap
+    val map1 = buffer.getMap[String, String](1).toMap
+    var map_new0 = scala.collection.mutable.Map[String, String](map0.toSeq: _*)
+    var map_new1 = scala.collection.mutable.Map[String, String](map1.toSeq: _*)
+    if (!empty_col.equals(yg_name1) && !empty_col.equals(bg_name1)) {
+      map_new0 ++= Map(bus_date -> s"$yg_name1$split$bg_name1")
+    } else {
+      map_new1 ++= Map(bus_date -> s"$yg_name1$split$bg_name1")
+    }
+    buffer.update(0, map_new0)
+    buffer.update(1, map_new1)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1(0) = buffer1.getAs[Map[String, String]](0) ++ buffer2.getAs[Map[String, String]](0)
+    buffer1(1) = buffer1.getAs[Map[String, String]](1) ++ buffer2.getAs[Map[String, String]](1)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    var yg_name = empty_col
+    var bg_name = empty_col
+    val m0: Map[String, String] = buffer.getAs[Map[String, String]](0)
+    val m1: Map[String, String] = buffer.getAs[Map[String, String]](1)
+    if (m0.isEmpty && m1.isEmpty) {
+      return Map("yg_name" -> yg_name, "bg_name" -> bg_name)
+    } else if (m0.nonEmpty) {
+      val key = m0.keySet.toSeq.sorted.head
+      val arr = m0(key).split(s"$split", -1)
+      yg_name = arr(0)
+      bg_name = arr(1)
+    } else if(m1.nonEmpty){
+      val key = m1.keySet.toSeq.sorted.head
+      val arr = m1(key).split(s"$split", -1)
+      yg_name = arr(0)
+      bg_name = arr(1)
+    }
+    Map("yg_name" -> yg_name, "bg_name" -> bg_name)
+  }
+}

+ 170 - 19
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -5,13 +5,19 @@ import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
 import com.winhc.bigdata.spark.implicits.RegexUtils._
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.time.DateFormatUtils
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
 import java.security.MessageDigest
 import java.text.SimpleDateFormat
+import java.util
 import java.util.regex.Pattern
 import java.util.{Calendar, Date, Locale}
 
+import com.alibaba.fastjson.serializer.SerializeFilter
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Json
+
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 /**
  * @Author: XuJiakai
@@ -47,7 +53,8 @@ object BaseUtil {
     sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
   }
 
-  def getPartion(t: String, p: String, @transient spark: SparkSession): String = {
+
+  def getPartionsPlus(t: String, p: String, @transient spark: SparkSession): Seq[String] = {
     import spark._
     val sql_s = s"show partitions " + t
     val ps = sql(sql_s).collect.toList.map(r => {
@@ -62,6 +69,11 @@ object BaseUtil {
       }
       r1
     }).filter(_._2.equals(p)).map(_._1).seq
+    ps
+  }
+
+  def getPartion(t: String, p: String, @transient spark: SparkSession): String = {
+    val ps = getPartionsPlus(t, p, spark)
     if (ps.size > 0) {
       ps.sorted.last
     } else {
@@ -69,6 +81,15 @@ object BaseUtil {
     }
   }
 
+  def getSecondPartion(t: String, p: String, @transient spark: SparkSession, default: String = ""): String = {
+    val ps = getPartionsPlus(t, p, spark)
+    if (ps.length >= 2) {
+      ps(ps.length - 2)
+    } else {
+      default
+    }
+  }
+
   def getPartion(t: String, @transient spark: SparkSession) = {
     val ps = getPartitions(t, spark)
     if (ps.size > 0) {
@@ -232,7 +253,7 @@ object BaseUtil {
     "其它案件"
   }
 
-  def caseStage(caseNo: String): String = {
+  def caseStageOld(caseNo: String): String = {
     if (StringUtils.isNotBlank(caseNo)) {
       val casePre = caseType(caseNo).replaceAll("案件", "")
       if (StrUtil.containsAny(caseNo, "破申", "商申") || casePre.contains("其它")) {
@@ -252,6 +273,43 @@ object BaseUtil {
     "其它阶段"
   }
 
+  def caseStage(caseNo: String): String = {
+    if (StringUtils.isNotBlank(caseNo)) {
+      if (StrUtil.containsAny(caseNo, "民初")) {
+        return "民事一审"
+      } else if (StrUtil.containsAny(caseNo, "执恢")) {
+        return "恢复执行"
+      } else if (StrUtil.containsAny(caseNo, "执")) {
+        return "首次执行"
+      } else if (StrUtil.containsAny(caseNo, "刑初")) {
+        return "刑事一审"
+      } else if (StrUtil.containsAny(caseNo, "民终")) {
+        return "民事二审"
+      } else if (StrUtil.containsAny(caseNo, "特")) {
+        return "特别程序"
+      } else if (StrUtil.containsAny(caseNo, "执异")) {
+        return "执行异议"
+      } else if (StrUtil.containsAny(caseNo, "民申")) {
+        return "民事申请再审审查"
+      } else if (StrUtil.containsAny(caseNo, "刑终")) {
+        return "刑事二审"
+      } else if (StrUtil.containsAny(caseNo, "行终")) {
+        return "行政二审"
+      } else if (StrUtil.containsAny(caseNo, "财保")) {
+        return "财产保全"
+      } else if (StrUtil.containsAny(caseNo, "行初")) {
+        return "行政一审"
+      } else if (StrUtil.containsAny(caseNo, "民辖")) {
+        return "民事管辖"
+      } else if (StrUtil.containsAny(caseNo, "民再")) {
+        return "民事再审"
+      } else {
+        return "其它阶段"
+      }
+    }
+    "其它阶段"
+  }
+
   def lastStage(s: String): String = {
     if (StringUtils.isNotBlank(s)) {
       val arr = s.split(" ")
@@ -338,13 +396,18 @@ object BaseUtil {
   }
 
   @deprecated
-  private val case_pat = ".*([(\\(]\\d{4}[)\\)][^号]*号?).*".r
+  private val case_pat =
+    ".*([(\\(]\\d{4}[)\\)][^号]*号?).*".r
   @deprecated
-  private val case_pat2 = "(((20\\d{2}|19\\d{2})|\\((20\\d{2}|19\\d{2})\\))[^号]{3,}号?).*".r
-  private val case_pat3 = ".*([(\\(]\\d{4}[)\\)][^号]*号.*?)".r
+  private val case_pat2 =
+    "(((20\\d{2}|19\\d{2})|\\((20\\d{2}|19\\d{2})\\))[^号]{3,}号?).*".r
+  private val case_pat3 =
+    ".*([(\\(]\\d{4}[)\\)][^号]*号.*?)".r
   @deprecated
-  private val year_pat = "(\\d{4}?)年".r
-  private val year_pat_2 = "\\(?(\\d{4}?)\\)?年".r
+  private val year_pat =
+    "(\\d{4}?)年".r
+  private val year_pat_2 =
+    "\\(?(\\d{4}?)\\)?年".r
 
   /**
    * 案号格式规整
@@ -418,9 +481,11 @@ object BaseUtil {
     null
   }
 
-  private val id_card_pattern = "^[1-9]\\d{5}(19|20)\\d{2}((0[1-9])|(1[0-2])|([0-1]\\*{1,2})|\\*{2})(([0-2][1-9])|10|20|30|31|\\*{2})\\d{3}[0-9Xx]$".r
+  private val id_card_pattern =
+    "^[1-9]\\d{5}(19|20)\\d{2}((0[1-9])|(1[0-2])|([0-1]\\*{1,2})|\\*{2})(([0-2][1-9])|10|20|30|31|\\*{2})\\d{3}[0-9Xx]$".r
 
-  private lazy val maxYear = BaseUtil.nowDate(pattern = "yyyy").toInt
+  private lazy val maxYear =
+    BaseUtil.nowDate(pattern = "yyyy").toInt
 
   def is_id_card(str: String): Boolean = {
     if (StringUtils.isEmpty(str))
@@ -452,8 +517,10 @@ object BaseUtil {
     (province, city, district)
   }
 
-  private val LONGITUDE = "^[\\-\\+]?(0?\\d{1,2}\\.\\d{1,15}|1[0-7]?\\d{1}\\.\\d{1,15}|180\\.0{1,15})$"
-  private val LATITUDE = "^[\\-\\+]?([0-8]?\\d{1}\\.\\d{1,15}|90\\.0{1,15})$"
+  private val LONGITUDE =
+    "^[\\-\\+]?(0?\\d{1,2}\\.\\d{1,15}|1[0-7]?\\d{1}\\.\\d{1,15}|180\\.0{1,15})$"
+  private val LATITUDE =
+    "^[\\-\\+]?([0-8]?\\d{1}\\.\\d{1,15}|90\\.0{1,15})$"
 
   /**
    * 经纬度转换
@@ -473,7 +540,8 @@ object BaseUtil {
     s.toString()
   }
 
-  private val name_pat = Pattern.compile("[^\\u4e00-\\u9fa50-9a-zA-Z()() ·,]")
+  private val name_pat =
+    Pattern.compile("[^\\u4e00-\\u9fa50-9a-zA-Z()() ·,]")
 
   def nameCleanup(name: String): String = {
     var re = ""
@@ -516,17 +584,100 @@ object BaseUtil {
     }
   }
 
+  def filter_json(json: String, col: String): String = {
+    if (StringUtils.isBlank(json) || StringUtils.isBlank(col)) return "[]"
+    import scala.collection.JavaConverters._
+    try {
+      val list1 = JSON.parseArray(json, classOf[util.Map[String, String]]).asScala
+      val list2 = list1.filter(x => x.containsKey(col) && StringUtils.isNotBlank(x.get(col))).asJava
+      JSON.toJSONString(list2, new Array[SerializeFilter](0))
+    } catch {
+      case e: Exception => {
+        "[]"
+      }
+    }
+  }
+
+
+  def regJson(json: Option[Any]) = json match {
+    case Some(map: Map[String, Any]) => map
+    case Some(map: List[Map[String, Any]]) => map
+    case _ => List[Map[String, String]]()
+  }
+
+  def json_array(json: String, col: String = "name"): List[Map[String, String]] = {
+    try {
+      if (StringUtils.isBlank(json)) return List[Map[String, String]]()
+      regJson(scala.util.parsing.json.JSON.parseFull(filter_json(json, col))).asInstanceOf[List[Map[String, String]]]
+    } catch {
+      case e: Exception => {
+        //println("error--json "+json)
+        List[Map[String, String]]()
+        // List.empty
+      }
+    }
+  }
+
+  def list_json(list: List[Map[String, String]]): String = {
+    import org.json4s.DefaultFormats
+    var m = mutable.Map[String, mutable.Map[String, String]]()
+    try {
+      list.foreach(x => {
+        val n = collection.mutable.Map(x.toSeq: _*)
+        val name = x.getOrElse("name", "")
+        if (name.length > 0 && !m.contains(name)) {
+          m ++= mutable.Map(name -> n)
+        }
+      })
+      if (m.isEmpty) return "[]"
+      Json(DefaultFormats).write(m.values)
+    } catch {
+      case e: Exception => {
+        return "[]"
+      }
+    }
+  }
+
+  def merge_json(yg_name: String, bg_name: String, all_name: String): String = {
+    list_json(json_array(yg_name) ++ json_array(bg_name) ++ json_array(all_name))
+  }
+
+  def all_name(s: String): String = {
+    val r = "[]"
+    if (StringUtils.isNotBlank(s)) {
+      val names = s.split("\\u0001", -1)
+      val list = ListBuffer[Map[String, String]]()
+      for (n <- names) {
+        list ++= json_array(n)
+      }
+      return list_json(list.toList)
+    }
+    r
+  }
+
   def main(args: Array[String]): Unit = {
-    val json_array = "[{\"name\": \"史某某\", \"litigant_id\": \"\"}, {\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]&[{\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"史某某\", \"litigant_id\": \"\"}]&[{\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]"
-    val json_path = "$.name"
-    println(split_names(json_array,json_path))
-//    println(nameCleanup("小米科技.;有,@限公  司  雷军"))
+    val s = "[{\"name\":\"史某某\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]\u0001[{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]"
+    println(all_name(s))
+    val s1 = "[{\"name\":\"\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]"
+    val s2 = "[{\"name\":\"\",\"litigant_id\":\"xx\"},{\"name\":\"何安平\",\"litigant_id\":\"\"}]"
+    val s3 = "[{\"name\":\"xxx\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"},{\"name\":\"何安平\",\"litigant_id\":\"\"}]"
+
+    //    val s1 = null
+    //    val s2 = ""
+    //    val s3 = ""
+
+    println(merge_json(s1, s2, s3))
+    //val str = ""
+    //    val json_array = "[{\"name\": \"史某某\", \"litigant_id\": \"\"}, {\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]&[{\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"史某某\", \"litigant_id\": \"\"}]&[{\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]"
+    //    val json_path = "$.name"
+    //    println(split_names(json_array, json_path))
+    //    println(nameCleanup("小米科技.;有,@限公  司  雷军"))
     //    println(title("xx", null, "reason"))
     //    println(parseAddress("大石桥市人民法院"))
-      //  println(case_no_trim_v2("(2019)年中国贸仲京裁字第0394号号"))
+    //  println(case_no_trim_v2("(2019)年中国贸仲京裁字第0394号号"))
     //    val seq = Seq("1", "3", "2", "7").mkString("\001")
     //    println(sortString(seq))
-    //println(caseStage("(2019)鄂初7号"))
+    //println(caseStage("(2019)鄂初7号"))
     /*val yg_name = ",,"
     val bg_name = "张三,,小米,;"
     println(compareName(yg_name, bg_name))*/