
Case consolidation

xufei 3 years ago
parent
commit
5f9ae8bf12

+ 103 - 72
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggs.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.ng.judicial
 
-import com.winhc.bigdata.spark.udf.{BaseFunc, CaseAmtAggs, CaseReasonAggs, CompanyMapping, CourtRank, NameAggs, NameAggsPlus}
+import com.winhc.bigdata.spark.udf.{AllNamePlus, AllNamePlusV2, BaseFunc, CaseAmtAggs, CaseAmtAggsPlus, CaseReasonAggs, CompanyMapping, CourtRank, NameAggs, NameAggsPlus}
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
@@ -188,8 +188,10 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
   val ods_judicial_case_id_mapping = s" $project.ods_judicial_case_id_mapping"
   //main table
   val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1"
-  //detail table
-  val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
+//  //detail table
+//  val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
+  //detail table (enhanced)
+  val ads_judicial_case_relation_r3 = s" $project.ads_judicial_case_relation_r3"
   //case removal table
   val ads_judicial_case_id_mapping_r1_deleted = s" $project.ads_judicial_case_id_mapping_r1_deleted"
   //case removal table
@@ -279,12 +281,12 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
         |                        FROM    (
         |                                    SELECT  rowkey_1 AS start_id,tn_1 AS tn
         |                                    FROM    $bds_judicial_case_relation
-        |                                    WHERE   ds = '20210604'
+        |                                    WHERE   ds = '$calc_ds'
         |                                    AND     rowkey_1 IS NOT NULL AND     tn_1 IS NOT NULL
         |                                    UNION ALL
         |                                    SELECT  rowkey_2 AS start_id,tn_2 AS tn
         |                                    FROM    $bds_judicial_case_relation
-        |                                    WHERE   ds = '20210604'
+        |                                    WHERE   ds = '$calc_ds'
         |                                    AND     rowkey_2 IS NOT NULL AND     tn_2 IS NOT NULL
         |                                )
         |                    )
@@ -293,21 +295,26 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
         |${if (isWindows) "LIMIT 1000" else ""}
         |""".stripMargin).show(1000,false)
 
+    //todo deduplicate
     sql(
       s"""
         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds')
-        |SELECT  *
-        |        ,to_json(MAP('start_id',start_id,'end_id',end_id,"topic_type",topic_type,"connect_type",connect_type)) relation_json
-        |FROM    (
-        |            SELECT  concat_ws('_',rowkey_1, tn_flag(tn_1)) start_id
-        |                    ,concat_ws('_',rowkey_2, tn_flag(tn_2)) end_id
-        |                    ,"400" AS topic_type
-        |                    ,connect_type
-        |            FROM    $bds_judicial_case_relation
-        |            WHERE   ds = '20210604'
-        |            AND     rowkey_1 IS NOT NULL AND  rowkey_2 IS NOT NULL
-        |            AND     tn_1 IS NOT NULL AND  tn_2 IS NOT NULL
-        |        )
+        |SELECT start_id, end_id, topic_type, connect_type, relation_json
+        |FROM (
+        |   SELECT  start_id, end_id, topic_type, connect_type
+        |           ,to_json(MAP('start_id',start_id,'end_id',end_id,"topic_type",topic_type,"connect_type",connect_type)) relation_json
+        |           ,ROW_NUMBER() OVER (PARTITION by combine_id(start_id,end_id) ORDER by start_id desc) num
+        |   FROM    (
+        |               SELECT  concat_ws('_',rowkey_1, tn_flag(tn_1)) start_id
+        |                       ,concat_ws('_',rowkey_2, tn_flag(tn_2)) end_id
+        |                       ,"400" AS topic_type
+        |                       ,connect_type
+        |               FROM    $bds_judicial_case_relation
+        |               WHERE   ds = '$calc_ds'
+        |               AND     rowkey_1 IS NOT NULL AND  rowkey_2 IS NOT NULL
+        |               AND     tn_1 IS NOT NULL AND  tn_2 IS NOT NULL
+        |           )
+        |) WHERE num = 1
         |${if (isWindows) "LIMIT 1000" else ""}
         |""".stripMargin).show(1000,false)
 
@@ -374,13 +381,18 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |            FROM    (
          |                        SELECT  id, judicase_id
          |                                ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY ds DESC) num
-         |                        FROM    $ads_judicial_case_relation_r2
+         |                        FROM    $ads_judicial_case_relation_r3
          |                        WHERE   ds < '$calc_ds'
          |                    )
          |            WHERE   num = 1
          |        ) b
          |ON      a.old_id = b.judicase_id
          |""".stripMargin)
+
+    //if the partition does not exist, add an empty one
+    addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r1_deleted, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r2_deleted, calc_ds)
+
   }
 
   def calc(): Unit = {
@@ -390,6 +402,9 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
     spark.udf.register("name_aggs", new NameAggsPlus(1000))
     spark.udf.register("case_reason", new CaseReasonAggs(1000))
     spark.udf.register("case_amt", new CaseAmtAggs(1000))
+    spark.udf.register("case_amt_plus", new CaseAmtAggsPlus(1000))
+    spark.udf.register("all_name_plus", new AllNamePlus(1000))
+    spark.udf.register("all_name_plus_v2", new AllNamePlusV2(100))
 
     //detail: document ids
     //replace judicial case id
@@ -423,7 +438,7 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |                        SELECT  *, md5(concat_ws('',detail_id,tn)) row_id
          |                                ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
          |                        FROM    $ads_judicial_case_relation_pre
-         |                        WHERE   ds > ${calc_last_ds(ads_judicial_case_relation_id)}
+         |                        WHERE   ds > ${calc_last_ds(ads_judicial_case_relation_id)} AND  case_no_trim(case_no) is not null AND  date is not null
          |                    )
          |            WHERE   num = 1
          |        ) a
@@ -439,59 +454,10 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |ON      a.row_id = b.row_id
          |""".stripMargin)
 
-    //judicial case main table
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
-         |SELECT
-         |    judicase_id,
-         |    title       ,
-         |    case_type   ,
-         |    case_reason ,
-         |    case_no     ,
-         |    court_name  ,
-         |    case_stage  ,
-         |    lable       ,
-         |    name_aggs['yg_name'] yg_name,
-         |    name_aggs['bg_name'] bg_name,
-         |    all_name,
-         |    case_amt    ,
-         |    date        ,
-         |    court_level ,
-         |    0 deleted
-         |FROM
-         |(
-         |SELECT  judicase_id
-         |        ,max(title) title
-         |        ,concat_ws(',',collect_set(case_type)) case_type
-         |        ,case_reason(case_reason,date,flag) case_reason
-         |        ,concat_ws(',',collect_set(case_no)) case_no
-         |        ,concat_ws(',',collect_set(court_name)) court_name
-         |        ,max(last_stage) case_stage
-         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
-         |        ,case_amt(case_amt) AS case_amt
-         |        ,max(date) AS date
-         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
-         |        ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
-         |        ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
-         |FROM    (
-         |        SELECT  a.*
-         |        FROM    (
-         |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt
-         |                   ,court_level(court_name) court_level,data
-         |                   ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY data['date'] DESC ) AS last_stage
-         |                   FROM    $ads_judicial_case_relation_id
-         |                   WHERE   ds = '$calc_ds'
-         |                ) a
-         |        )
-         |GROUP BY judicase_id
-         |)x
-         |""".stripMargin).show(20, false)
-
     //detail table
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r2 PARTITION(ds='$calc_ds')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r3 PARTITION(ds='$calc_ds')
          |SELECT
          |    id,
          |    judicase_id,
@@ -506,7 +472,10 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |    name_aggs['yg_name'] yg_name,
          |    name_aggs['bg_name'] bg_name,
          |    last_date   ,
-         |    0 deleted
+         |    0 deleted   ,
+         |    all_name    ,
+         |    case_amt    ,
+         |    court_level
          |FROM
          |(
          |SELECT  md5(concat_ws('',concat_ws('',judicase_id),CLEANUP(case_no))) id
@@ -515,16 +484,23 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |        ,case_type(max(case_no)) as case_type
          |        ,case_reason(case_reason,date,flag) case_reason
          |        ,case_no
-         |        ,max(court_name) court_name
+         |        -- ,max(court_name) court_name
+         |        ,concat_ws(',',collect_set(court_name)) court_name
          |        ,case_stage(max(case_no)) as case_stage
          |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(last_date) last_date
          |        ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
+         |        -- ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
+         |        ,all_name_plus_v2(all_name) all_name
+         |        -- ,case_amt(case_amt, cast(data['date'] as string), flag) AS case_amt
+         |        ,case_amt_plus(cast(case_amt as string), cast(data['date'] as string), flag) AS case_amt
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
          |FROM    (
          |        SELECT  a.*
          |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
          |                ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
+         |                ,court_level(court_name) court_level
          |        FROM    (
          |                   SELECT  *
          |                   FROM    $ads_judicial_case_relation_id
@@ -536,9 +512,64 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |) x
          |""".stripMargin).show(10, false)
 
+
+    //judicial case main table
+        sql(
+          s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
+             |SELECT
+             |    judicase_id,
+             |    title       ,
+             |    case_type   ,
+             |    case_reason ,
+             |    case_no     ,
+             |    court_name  ,
+             |    case_stage  ,
+             |    lable       ,
+             |    name_aggs['yg_name'] yg_name,
+             |    name_aggs['bg_name'] bg_name,
+             |    all_name,
+             |    case_amt    ,
+             |    date        ,
+             |    court_level ,
+             |    0 deleted
+             |FROM
+             |(
+             |SELECT  judicase_id
+             |        ,max(title) title
+             |        ,concat_ws(',',collect_set(case_type)) case_type
+             |        ,case_reason(case_reason,date,'0') case_reason
+             |        ,concat_ws(',',collect_set(case_no)) case_no
+             |        ,trim_black(concat_ws(',',collect_set(court_name))) court_name
+             |        ,max(last_stage) case_stage
+             |        ,trim_black(concat_ws(',', collect_set(lable)) ) lable
+             |        -- ,max(case_amt) AS case_amt
+             |        ,max(first_case_amt) case_amt
+             |        -- ,cast(case_amt_plus(case_amt['case_amt'], case_amt['date'], case_amt['flag'])['case_amt'] as double) AS case_amt
+             |        ,max(date) AS date
+             |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+             |        ,name_aggs(yg_name,bg_name,'0',date) name_aggs
+             |        -- ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
+             |        ,all_name_plus_v2(all_name) all_name
+             |        -- ,null all_name
+             |FROM    (
+             |        SELECT  a.*
+             |        FROM    (
+             |                   SELECT  judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt
+             |                   ,court_level(court_name) court_level
+             |                   ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY date DESC ) AS last_stage
+             |                   ,first_value(case_amt['case_amt']) OVER (PARTITION BY judicase_id ORDER BY case_amt['flag'] DESC ) AS first_case_amt
+             |                   FROM    $ads_judicial_case_relation_r3
+             |                   WHERE   ds = '$calc_ds'
+             |                ) a
+             |        )
+             |GROUP BY judicase_id
+             |)x
+             |""".stripMargin).show(20, false)
+
     //if the partition does not exist, insert an empty partition
     addEmptyPartitionOrSkip(ads_judicial_case_relation_r1, calc_ds)
-    addEmptyPartitionOrSkip(ads_judicial_case_relation_r2, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_r3, calc_ds)
   }
 
   private def get_partition_order_by(): String = {

+ 73 - 0
src/main/scala/com/winhc/bigdata/spark/udf/AllNamePlus.scala

@@ -0,0 +1,73 @@
+package com.winhc.bigdata.spark.udf
+
+import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.utils.BaseUtil.{json_array, list_json}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ *
+ * @Description: merge person names
+ * @author π
+ * @date 2020/10/26 15:15
+ */
+
+class AllNamePlus(max: Int) extends UserDefinedAggregateFunction {
+
+  //val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+  val empty_col = "[]"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("all_name", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.StringType)
+    )
+  )
+
+  override def dataType: DataType = DataTypes.StringType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, "[]")
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    if (buffer.size >= max) {
+      return
+    }
+    if (input.size != 1) return
+    var all_name = input.getAs[String](0)
+    all_name = BaseUtil.filter_json(all_name, "name")
+    if (empty_col.equals(all_name)) {
+      return
+    }
+    val buf_name: String = buffer.getAs[String](0)
+    buffer(0) = list_json(json_array(buf_name) ++ json_array(all_name))
+
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    if (buffer1.size >= max) {
+      return
+    }
+    update(buffer1, buffer2)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    var all_name = "[]"
+    val tmp_name = buffer.getAs[String](0)
+
+    if (StringUtils.isNotBlank(tmp_name)) all_name = tmp_name
+    all_name
+  }
+
+}

+ 74 - 0
src/main/scala/com/winhc/bigdata/spark/udf/AllNamePlusV2.scala

@@ -0,0 +1,74 @@
+package com.winhc.bigdata.spark.udf
+
+import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.utils.BaseUtil.{json_array, list_json}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+/**
+ *
+ * @Description: merge person names
+ * @author π
+ * @date 2020/10/26 15:15
+ */
+
+class AllNamePlusV2(max: Int) extends UserDefinedAggregateFunction {
+
+  //val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+  val empty_col = "[]"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("all_name", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createArrayType(DataTypes.StringType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.StringType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Seq.empty)
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val buf_name = buffer.getSeq[String](0)
+    if (buf_name.size >= max) {
+      return
+    }
+    if (input.size != 1) return
+    var all_name = input.getAs[String](0)
+    all_name = BaseUtil.filter_json(all_name, "name")
+    if (empty_col.equals(all_name)) {
+      return
+    }
+    buffer(0) = buf_name ++ Seq(all_name)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+
+    val buf_name = buffer1.getSeq[String](0)
+    if (buf_name.size >= max) {
+      return
+    }
+    val all_name = buffer2.getSeq[String](0)
+    buffer1(0) = buf_name ++ all_name
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val tmp_name = buffer.getSeq[String](0)
+    if (tmp_name == null || tmp_name.isEmpty) return "[]"
+    val list = tmp_name.map(x => {
+      json_array(x)
+    }).reduce(_ ++ _)
+    list_json(list)
+  }
+
+}
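
Note: AllNamePlus and AllNamePlusV2 both merge per-row JSON name arrays, but V1 parses and re-serializes the accumulated JSON on every update, while V2 only appends the raw strings to an array buffer and parses them once in evaluate; V2 also caps the buffer by the number of accumulated entries (buf_name.size), whereas V1 checks buffer.size, i.e. the Row's field count. The snippet below is an illustrative usage sketch only; the table and column names are assumptions, not from this commit.

    // hedged usage sketch -- `spark` is an existing SparkSession, `some_detail_source` is hypothetical
    import com.winhc.bigdata.spark.udf.AllNamePlusV2

    spark.udf.register("all_name_plus_v2", new AllNamePlusV2(100))
    spark.sql(
      """
        |SELECT judicase_id
        |       ,all_name_plus_v2(all_name) AS all_name   -- one merged JSON array of names per case
        |FROM   some_detail_source
        |GROUP  BY judicase_id
        |""".stripMargin).show(false)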

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/udf/CaseAmtAggs.scala

@@ -54,7 +54,7 @@ class CaseAmtAggs(max: Int) extends UserDefinedAggregateFunction {
     val bus_date = input.getAs[String](1)
     val flag = input.getAs[String](2)
 
-    if (case_amt == null) {
+    if (case_amt == 0 || bus_date == null || flag == null) {
       return
     }
     //    if (!flags.contains(flag)) {

+ 111 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CaseAmtAggsPlus.scala

@@ -0,0 +1,111 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ * 1. If there is an enforced person (被执行人) record, take its enforcement target amount.
+ * 2. Otherwise, if there is a terminated enforcement case (终本案件), take its enforcement target amount.
+ * 3. Otherwise, if there is a judgment document (裁判文书), take its amount (first instance).
+ *
+ * @Description: case amount aggregation
+ * @author π
+ * @date 2020/10/26 15:15
+ */
+
+class CaseAmtAggsPlus(max: Int) extends UserDefinedAggregateFunction {
+
+  //val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("case_amt", DataTypes.StringType)
+    , StructField("bus_date", DataTypes.StringType)
+    , StructField("flag", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.DoubleType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, Double]())
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val tmp_buff = buffer.getAs[Map[String, Double]](0)
+
+    if (buffer.size >= max) {
+      return
+    }
+    if (input.size != 3) return
+    val case_amt0 = input.getAs[String](0)
+    val bus_date = input.getAs[String](1)
+    val flag = input.getAs[String](2)
+
+    if (StringUtils.isBlank(case_amt0) || StringUtils.isBlank(bus_date) || StringUtils.isBlank(flag) || case_amt0 == "0") {
+      return
+    }
+    val case_amt = case_amt0.toDouble
+    buffer(0) = mutable.Map(bus_date + split + flag -> case_amt) ++= scala.collection.mutable.Map[String, Double](tmp_buff.toSeq: _*)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val tmp_buff = buffer1.getAs[Map[String, Double]](0)
+    if (tmp_buff.size >= max) {
+      return
+    }
+    buffer1(0) = tmp_buff ++ buffer2.getAs[Map[String, Double]](0)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    var bus_date = ""
+    var flag = ""
+    var case_amt = ""
+    val m0: Map[String, Double] = buffer.getAs[Map[String, Double]](0)
+    if (m0.isEmpty) {
+      return Map("flag" -> flag, "date" -> bus_date, "case_amt" -> case_amt)
+    } else {
+      val keys = m0.keySet.toSeq
+      val tup: Seq[(String, String, Double)] = keys.map(x => {
+        val arr = x.split(split, -1)
+        (arr(1), arr(0), m0(x))
+      })
+      val flags = Seq[String]("7", "6", "0")
+      val seq = flags.map(x => {
+        get_value(tup, x)
+      }).filter(_._3 != "0")
+      if (seq.nonEmpty) {
+        flag = seq.head._1
+        bus_date = seq.head._2
+        case_amt = seq.head._3
+      }
+    }
+    Map("flag" -> flag, "date" -> bus_date, "case_amt" -> case_amt)
+  }
+
+  def get_value(tup: Seq[(String, String, Double)], flag: String) = {
+    var bus_date: String = ""
+    var case_amt:String  = ""
+    val t1 = tup.filter(x => {
+      x._1.equals(flag) && x._3 > 0
+    })
+    if (t1.nonEmpty) {
+      val t2 = t1.map(x => x._2 -> x._3).toMap
+      val key = t2.keySet.toSeq.min
+      case_amt = t2(key).toString
+      bus_date = key
+    }
+    (flag, bus_date, case_amt)
+  }
+}
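
The doc comment above is the selection rule CaseAmtAggsPlus implements: amounts are keyed by date + flag during aggregation, and evaluate walks the flags in priority order "7", "6", "0", taking the earliest-dated positive amount (mapping "7"/"6"/"0" to enforced person / terminated enforcement / judgment document is an assumption based on that comment). The result is a string map, which is why the r3 and r1 queries read case_amt['case_amt'], case_amt['date'] and case_amt['flag']. A hedged usage sketch with a hypothetical source table:

    // hedged usage sketch -- `spark` is an existing SparkSession, `some_case_source` is hypothetical
    import com.winhc.bigdata.spark.udf.CaseAmtAggsPlus

    spark.udf.register("case_amt_plus", new CaseAmtAggsPlus(1000))
    spark.sql(
      """
        |SELECT judicase_id
        |       ,case_amt_plus(cast(case_amt as string), cast(data['date'] as string), flag) AS case_amt
        |FROM   some_case_source
        |GROUP  BY judicase_id
        |""".stripMargin)
    // case_amt['case_amt'] -> chosen amount (string), case_amt['date'] -> its business date, case_amt['flag'] -> winning flag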

+ 12 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -91,6 +91,18 @@ trait CompanyMapping {
       tn_flag(name)
     })
 
+    spark.udf.register("add_pre", (id: String) => {
+      add_pre(id)
+    })
+
+    spark.udf.register("remove_pre", (id: String) => {
+      remove_pre(id)
+    })
+
+    spark.udf.register("combine_id", (rw1: String, rw2: String) => {
+      combine_id(rw1, rw2)
+    })
+
 
   }
 

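The three helpers registered here are plain string utilities defined in BaseUtil (diff below). combine_id drops blank arguments, sorts the rest and joins them with '_', giving an order-insensitive key; that is what lets the kafka-relation query above collapse (rowkey_1, rowkey_2) and (rowkey_2, rowkey_1) into one edge with a single ROW_NUMBER window. add_pre prepends a random 0-99 prefix and remove_pre returns the second '_'-separated segment; they are not used elsewhere in this commit, and their purpose (presumably key salting) is an assumption. A small illustration, mirroring the println checks added to BaseUtil.main:

    // behavior read from the BaseUtil changes below; values are illustrative
    import com.winhc.bigdata.spark.utils.BaseUtil.{add_pre, combine_id, remove_pre}

    println(combine_id("case_b", "case_a"))   // case_a_case_b  -- sorted, so argument order does not matter
    println(combine_id("case_a", null))       // case_a         -- blank/null side is dropped
    println(remove_pre(add_pre("rowkey")))    // rowkey         -- add_pre prefixed e.g. "37_"
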
+ 27 - 1
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -349,7 +349,7 @@ object BaseUtil {
   def trimBlack(s: String): String = {
     var r = ""
     if (StringUtils.isNotBlank(s)) {
-      r = s.split(",").filter(StringUtils.isNotBlank(_)).mkString(",")
+      r = s.split(",").filter(StringUtils.isNotBlank(_)).toSet.mkString(",")
     }
     r
   }
@@ -680,7 +680,33 @@ object BaseUtil {
     "-1"
   }
 
+  def add_pre(id: String): String = {
+    import scala.util.Random
+    val r = new Random().nextInt(100)
+    s"${r}_${id}"
+  }
+
+  def remove_pre(id: String): String = {
+    if (StringUtils.isBlank(id)) return ""
+    id.split("_", -1)(1)
+  }
+
+  def combine_id(rw1: String, rw2: String): String = {
+    val listBuffer = new ListBuffer[String]()
+    if(StringUtils.isNotBlank(rw1)){
+      listBuffer.append(rw1)
+    }
+    if(StringUtils.isNotBlank(rw2)){
+      listBuffer.append(rw2)
+    }
+    listBuffer.toList.sorted.mkString("_")
+  }
+
   def main(args: Array[String]): Unit = {
+    println(combine_id("111",""))
+    println(combine_id(null,""))
+    println(add_pre("123"))
+    println(remove_pre("1_123"))
     println(flag_tn("10"))
     println(tn_flag("company_zxr1"))
     val s = "[{\"name\":\"史某某\",\"litigant_id\":\"xx\"},{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]\u0001[{\"name\":\"伍新贵\",\"litigant_id\":\"\"}]"