
feat: credit punishment query related code

许家凯 3 years ago
parent
commit c5552932db

+ 38 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishment.scala

@@ -0,0 +1,38 @@
+package com.winhc.bigdata.spark.ng.credit_punishment
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.{BaseUtil, SparkUtils}
+
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/8 10:41
+ *        Credit punishment (信用惩戒)
+ */
+object CreditPunishment {
+  val extractionTargetTab = "winhc_ng.bds_credit_punishment_data_extraction"
+  val caseTargetTab = "winhc_ng.bds_credit_punishment_case_info"
+  val entityTargetTab = "winhc_ng.bds_credit_punishment_entity_info"
+  val entityRemoveTargetTab = "winhc_ng.bds_credit_punishment_entity_info_remove"
+
+
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_ng"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":step.01", config)
+
+    val ds = BaseUtil.getYesterday()
+
+    CreditPunishmentDataExtraction(spark).calc()
+    CreditPunishmentCaseAgg(spark).calc(ds)
+    CreditPunishmentEntityAgg(spark).calc(ds)
+    CreditPunishmentEntityRemove(spark).calc(ds)
+
+    spark.stop()
+  }
+}

+ 145 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentCaseAgg.scala

@@ -0,0 +1,145 @@
+package com.winhc.bigdata.spark.ng.credit_punishment
+
+import com.winhc.bigdata.spark.ng.credit_punishment.udf.CreditPunishmentCaseAggUDF
+import com.winhc.bigdata.spark.ng.utils.StartAndEndDsUtils
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/10 18:23
+ *
+ *        Aggregate records by case (name + court_name + case_no)
+ */
+case class CreditPunishmentCaseAgg(s: SparkSession
+                                  ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val target_tab = CreditPunishment.caseTargetTab
+
+  init()
+
+  private def init(): Unit = {
+    spark.udf.register("credit_punishment_case_agg", new CreditPunishmentCaseAggUDF)
+
+    def generate_entity_id(keyno: String, card_num: String, person_id: String, credit_punishment_case_id: String): String = {
+
+      if (StringUtils.isNotEmpty(keyno) && keyno.length == 32) {
+        return keyno
+      }
+      if (BaseUtil.is_id_card(card_num)) {
+        return person_id
+      }
+      credit_punishment_case_id
+    }
+
+    spark.udf.register("generate_entity_id", generate_entity_id _)
+
+
+    cleanup()
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    credit_punishment_case_id STRING COMMENT '信用惩戒查询案件id'
+         |    ,credit_punishment_entity_id STRING COMMENT '信用惩戒对象主体id'
+         |    ,name STRING COMMENT '惩戒主体名称'
+         |    ,court_name STRING COMMENT '法院名称'
+         |    ,case_no STRING COMMENT '案号'
+         |    ,rowkey STRING COMMENT '涉案rowkey'
+         |    ,card_num STRING COMMENT '身份证号或工商注册号'
+         |    ,keyno STRING COMMENT '惩戒主体id'
+         |    ,label STRING COMMENT '标签'
+         |    ,case_create_time STRING COMMENT '立案时间'
+         |    ,record_num bigint COMMENT '记录条数'
+         |    ,total_exec_amount STRING COMMENT '累计被执行总金额'
+         |    ,total_no_exec_amount STRING COMMENT '疑似当前欠款总金额'
+         |    ,zxr_total_exec_amount STRING COMMENT '被执行人当前被执行总金额'
+         |    ,final_case_exec_amount STRING COMMENT '终本案件执行标的总金额'
+         |    ,final_case_no_exec_amount STRING COMMENT '终本案件未履行总金额'
+         |    ,company_dishonest_info_num BIGINT COMMENT '失信人条数'
+         |    ,company_zxr_num BIGINT COMMENT '被执条数'
+         |    ,company_zxr_final_case_num BIGINT COMMENT '终本案件条数'
+         |    ,company_zxr_restrict_num BIGINT COMMENT '限高条数'
+         |    ,deleted BIGINT COMMENT '0:未删除,1:删除'
+         |)
+         |COMMENT '信用惩戒查询案件表'
+         |PARTITIONED BY
+         |(
+         |    ds STRING COMMENT '分区'
+         |)
+         |""".stripMargin)
+
+  }
+
+
+  private def getCreditPunishmentDataExtraction(ds: String): String = {
+    val tmp_view = "credit_punishment_data_extraction_calc"
+    if (ds == null) {
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    ${CreditPunishment.extractionTargetTab}
+           |WHERE   ds > '0'
+           |""".stripMargin)
+        .createTempView(tmp_view)
+    } else {
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    ${CreditPunishment.extractionTargetTab}
+           |WHERE   ds > '$ds'
+           |""".stripMargin)
+        .createTempView(tmp_view)
+    }
+    tmp_view
+  }
+
+
+  def calc(ds: String): Unit = {
+    val args = StartAndEndDsUtils(spark).get_gt_ds(CreditPunishment.extractionTargetTab, CreditPunishment.caseTargetTab)
+
+    val org_tab = getCreditPunishmentDataExtraction(args.inc_tab_gt_ds)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
+         |SELECT  md5(cleanup(CONCAT_WS('',name,court_name,case_no))) AS credit_punishment_case_id
+         |        ,generate_entity_id(mm['keyno'],mm['card_num'],md5(CONCAT_WS('',mm['card_num'],name)),md5(cleanup(CONCAT_WS('',name,court_name,case_no)))) as credit_punishment_entity_id
+         |        ,name
+         |        ,court_name
+         |        ,case_no
+         |        ,mm['rowkey'] AS rowkey
+         |        ,mm['card_num'] AS card_num
+         |        ,mm['keyno'] AS keyno
+         |        ,mm['label'] AS label
+         |        ,mm['case_create_time'] AS case_create_time
+         |        ,cast(mm['record_num'] as bigint) AS record_num
+         |        ,mm['total_exec_amount'] AS total_exec_amount
+         |        ,mm['total_no_exec_amount'] AS total_no_exec_amount
+         |        ,mm['zxr_total_exec_amount'] AS zxr_total_exec_amount
+         |        ,mm['final_case_exec_amount'] AS final_case_exec_amount
+         |        ,mm['final_case_no_exec_amount'] AS final_case_no_exec_amount
+         |        ,CAST(mm['company_dishonest_info_num'] AS BIGINT) AS company_dishonest_info_num
+         |        ,CAST(mm['company_zxr_num'] AS BIGINT) AS company_zxr_num
+         |        ,CAST(mm['company_zxr_final_case_num'] AS BIGINT) AS company_zxr_final_case_num
+         |        ,CAST(mm['company_zxr_restrict_num'] AS BIGINT) AS company_zxr_restrict_num
+         |        ,cast(mm['deleted'] as bigint) AS deleted
+         |FROM    (
+         |            SELECT  name
+         |                    ,court_name
+         |                    ,case_no
+         |                    ,credit_punishment_case_agg(rowkey,tn,keyno,card_num,case_create_time,deleted,detail_data) AS mm
+         |            FROM    $org_tab
+         |            GROUP BY name
+         |                     ,court_name
+         |                     ,case_no
+         |        )
+         |WHERE   name IS NOT NULL and trim(name) <> ''
+         |""".stripMargin)
+  }
+}
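
For orientation, the entity-id fallback registered above as generate_entity_id resolves in a fixed priority. A minimal standalone sketch of that priority (isIdCard here is a simplified stand-in for BaseUtil.is_id_card, not the project's real validation):

object EntityIdSketch {
  // simplified stand-in for BaseUtil.is_id_card (length check only)
  private def isIdCard(s: String): Boolean =
    s != null && (s.length == 15 || s.length == 18)

  // priority: already-resolved 32-char keyno > person id derived from a valid ID card > case id
  def pickEntityId(keyno: String, cardNum: String, personId: String, caseId: String): String =
    if (keyno != null && keyno.length == 32) keyno
    else if (isIdCard(cardNum)) personId
    else caseId
}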

+ 269 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentDataExtraction.scala

@@ -0,0 +1,269 @@
+package com.winhc.bigdata.spark.ng.credit_punishment
+
+import com.winhc.bigdata.spark.ng.utils.StartAndEndDsUtils
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.LoggingUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, struct, to_json}
+
+import scala.annotation.meta.getter
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/10 09:48
+ *        step.01 Extract data from the four source dimensions
+ *
+ */
+case class CreditPunishmentDataExtraction(s: SparkSession
+                                         ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+
+  private val target_tab = CreditPunishment.extractionTargetTab
+  init()
+
+  private def init(): Unit = {
+
+    def case_no_replace(case_no: String): String = if (StringUtils.isEmpty(case_no)) null else case_no.replace("（", "(").replace("）", ")").trim
+
+    spark.udf.register("case_no_replace", case_no_replace _)
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    rowkey STRING COMMENT 'hbase中rowkey'
+         |    ,tn STRING COMMENT '数据维度'
+         |    ,keyno STRING COMMENT '公司id或人的id'
+         |    ,name STRING COMMENT '公司名或人名'
+         |    ,card_num STRING COMMENT '身份证号或工商注册号'
+         |    ,court_name STRING COMMENT '法院名称'
+         |    ,case_no STRING COMMENT '案号'
+         |    ,case_create_time DATETIME COMMENT '立案时间'
+         |    ,deleted BIGINT COMMENT '是否删除'
+         |    ,detail_data STRING COMMENT '详情数据'
+         |)
+         |COMMENT '信用惩戒查询数据抽取表'
+         |PARTITIONED BY
+         |(
+         |    ds STRING COMMENT '分区'
+         |)
+         |""".stripMargin)
+  }
+
+
+  private def getData(tab: String, ds: String): String = {
+    val ads_tab = s"winhc_ng.ads_$tab"
+    val inc_ads_tab = s"winhc_ng.inc_ads_$tab"
+    val cols = getColumns(ads_tab)
+
+    if (ds == null) {
+      sql(
+        s"""
+           |SELECT  ${cols.mkString(",")}
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $ads_tab
+           |                        WHERE   ds > 0
+           |                        UNION ALL
+           |                        SELECT  *
+           |                        FROM    $inc_ads_tab
+           |                        WHERE   ds > 0
+           |                    )
+           |        )
+           |WHERE   num = 1
+           |""".stripMargin)
+        .createTempView(s"${tab}_all_data")
+    } else {
+      sql(
+        s"""
+           |SELECT  ${cols.mkString(",")}
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $inc_ads_tab
+           |                        WHERE   ds > '$ds'
+           |                    )
+           |        )
+           |WHERE   num = 1
+           |""".stripMargin)
+        .createTempView(s"${tab}_all_data")
+    }
+    s"${tab}_all_data"
+  }
+
+
+  private def dataExtraction(ds: String, target_ds: String): Unit = {
+    val company_dishonest_info = getData("company_dishonest_info", ds)
+    val company_zxr = getData("company_zxr", ds)
+    val company_zxr_final_case = getData("company_zxr_final_case", ds)
+    val company_zxr_restrict = getData("company_zxr_restrict", ds)
+
+
+    val company_dishonest_info_cols = getColumns(s"winhc_ng.ads_company_dishonest_info")
+    val company_zxr_cols = getColumns(s"winhc_ng.ads_company_zxr")
+    val company_zxr_final_case_cols = getColumns(s"winhc_ng.ads_company_zxr_final_case")
+    val company_zxr_restrict_cols = getColumns(s"winhc_ng.ads_company_zxr_restrict")
+
+
+    val company_dishonest_info_df = sql(
+      s"""
+         |SELECT rowkey as xjk_rowkey
+         |       ,'company_dishonest_info' as tn
+         |       ,keyno as xjk_keyno
+         |       ,name as xjk_name
+         |       ,card_num as xjk_card_num
+         |       ,court as xjk_court_name
+         |       ,case_no as xjk_case_no
+         |       ,reg_time as xjk_case_create_time
+         |       ,deleted as xjk_deleted
+         |       ,*
+         |from   $company_dishonest_info
+         |""".stripMargin)
+      .withColumn("detail_data", to_json(struct(company_dishonest_info_cols.map(col): _*)))
+      .select("xjk_rowkey"
+        , "tn"
+        , "xjk_keyno"
+        , "xjk_name"
+        , "xjk_card_num"
+        , "xjk_court_name"
+        , "xjk_case_no"
+        , "xjk_case_create_time"
+        , "xjk_deleted"
+        , "detail_data"
+      )
+
+
+    val company_zxr_df = sql(
+      s"""
+         |SELECT rowkey as xjk_rowkey
+         |       ,'company_zxr' as tn
+         |       ,keyno as xjk_keyno
+         |       ,name as xjk_name
+         |       ,card as xjk_card_num
+         |       ,court as xjk_court_name
+         |       ,case_no as xjk_case_no
+         |       ,case_create_time as xjk_case_create_time
+         |       ,deleted as xjk_deleted
+         |       ,*
+         |from   $company_zxr
+         |""".stripMargin)
+      .withColumn("detail_data", to_json(struct(company_zxr_cols.map(col): _*)))
+      .select("xjk_rowkey"
+        , "tn"
+        , "xjk_keyno"
+        , "xjk_name"
+        , "xjk_card_num"
+        , "xjk_court_name"
+        , "xjk_case_no"
+        , "xjk_case_create_time"
+        , "xjk_deleted"
+        , "detail_data"
+      )
+
+    val company_zxr_final_case_df = sql(
+      s"""
+         |SELECT rowkey as xjk_rowkey
+         |       ,'company_zxr_final_case' as tn
+         |       ,keyno as xjk_keyno
+         |       ,name as xjk_name
+         |       ,identity_num as xjk_card_num
+         |       ,court_name as xjk_court_name
+         |       ,case_no as xjk_case_no
+         |       ,case_create_time as xjk_case_create_time
+         |       ,deleted as xjk_deleted
+         |       ,*
+         |from   $company_zxr_final_case
+         |""".stripMargin)
+      .withColumn("detail_data", to_json(struct(company_zxr_final_case_cols.map(col): _*)))
+      .select("xjk_rowkey"
+        , "tn"
+        , "xjk_keyno"
+        , "xjk_name"
+        , "xjk_card_num"
+        , "xjk_court_name"
+        , "xjk_case_no"
+        , "xjk_case_create_time"
+        , "xjk_deleted"
+        , "detail_data"
+      )
+
+    val company_zxr_restrict_df = sql(
+      s"""
+         |SELECT rowkey as xjk_rowkey
+         |       ,'company_zxr_restrict' as tn
+         |       ,pid as xjk_keyno
+         |       ,person_name as xjk_name
+         |       ,coalesce(card_num,identity_num) as xjk_card_num
+         |       ,court_name as xjk_court_name
+         |       ,case_no as xjk_case_no
+         |       ,case_create_time as xjk_case_create_time
+         |       ,deleted as xjk_deleted
+         |       ,*
+         |from   $company_zxr_restrict
+         |
+         |UNION ALL
+         |
+         |SELECT rowkey as xjk_rowkey
+         |       ,'company_zxr_restrict' as tn
+         |       ,company_id as xjk_keyno
+         |       ,company_name as xjk_name
+         |       ,coalesce(card_num,identity_num) as xjk_card_num
+         |       ,court_name as xjk_court_name
+         |       ,case_no as xjk_case_no
+         |       ,case_create_time as xjk_case_create_time
+         |       ,deleted as xjk_deleted
+         |       ,*
+         |from   $company_zxr_restrict
+         |""".stripMargin)
+      .withColumn("detail_data", to_json(struct(company_zxr_restrict_cols.map(col): _*)))
+      .select("xjk_rowkey"
+        , "tn"
+        , "xjk_keyno"
+        , "xjk_name"
+        , "xjk_card_num"
+        , "xjk_court_name"
+        , "xjk_case_no"
+        , "xjk_case_create_time"
+        , "xjk_deleted"
+        , "detail_data"
+      )
+
+
+    val df = company_dishonest_info_df
+      .union(company_zxr_df)
+      .union(company_zxr_final_case_df)
+      .union(company_zxr_restrict_df)
+
+    df.createOrReplaceTempView(s"credit_punishment_tmp_view")
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${target_tab} PARTITION (ds='$target_ds')
+         |SELECT xjk_rowkey as rowkey
+         |       ,tn as tn
+         |       ,xjk_keyno as keyno
+         |       ,xjk_name as name
+         |       ,xjk_card_num as card_num
+         |       ,xjk_court_name as court_name
+         |       ,case_no_replace(xjk_case_no) as case_no
+         |       ,xjk_case_create_time as case_create_time
+         |       ,xjk_deleted as deleted
+         |       ,detail_data
+         |FROM   credit_punishment_tmp_view
+         |""".stripMargin)
+  }
+
+
+  def calc(): Unit = {
+    val c = StartAndEndDsUtils(spark).get_start_and_end_args("winhc_ng.ads_company_dishonest_info", "winhc_ng.inc_ads_company_dishonest_info", target_tab)
+    dataExtraction(c.inc_tab_gt_ds, c.target_ds)
+  }
+
+}
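
The detail_data column above packs every column of a source row into one JSON string via to_json(struct(cols.map(col): _*)). A self-contained sketch of the same Spark idiom on a toy DataFrame (the schema and values below are made up, not project data):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}

object DetailDataSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("detail_data_sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("rk_1", "某某公司", "（2021）京01执1号")).toDF("rowkey", "name", "case_no")
    val cols = df.columns // in the job above this list comes from getColumns(ads_tab)

    // serialize every column of the row into a single JSON string column
    df.withColumn("detail_data", to_json(struct(cols.map(col): _*)))
      .show(truncate = false) // detail_data holds {"rowkey":...,"name":...,"case_no":...}

    spark.stop()
  }
}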

File diff suppressed because it is too large
+ 285 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentEntityAgg.scala


+ 67 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentEntityRemove.scala

@@ -0,0 +1,67 @@
+package com.winhc.bigdata.spark.ng.credit_punishment
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.LoggingUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/13 14:59
+ *
+ *        Mark credit punishment entity ids as deleted
+ */
+case class CreditPunishmentEntityRemove(s: SparkSession
+                                       ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+
+  val target_tab = CreditPunishment.entityRemoveTargetTab
+
+  init()
+
+  def init(): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    credit_punishment_case_id  STRING COMMENT '信用惩戒对象主体id,没有人的id采用的是案件id'
+         |    ,deleted BIGINT COMMENT '0未删除,1删除'
+         |)
+         |COMMENT 'bds_credit_punishment_entity_info表需要移除的主体id'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |""".stripMargin)
+  }
+
+
+  def calc(ds: String): Unit = {
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$ds')
+         |SELECT  t1.credit_punishment_case_id
+         |        ,1 AS deleted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    ${CreditPunishment.caseTargetTab}
+         |            WHERE   ds = '$ds'
+         |        ) AS t1
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY credit_punishment_case_id ORDER BY ds DESC) AS num
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    ${CreditPunishment.caseTargetTab}
+         |                                    WHERE   ds < '$ds'
+         |                                )
+         |                    )
+         |            WHERE   num = 1
+         |        ) AS t2
+         |ON      t1.credit_punishment_case_id = t2.credit_punishment_case_id
+         |AND     t1.credit_punishment_entity_id <> t2.credit_punishment_entity_id
+         |""".stripMargin)
+
+  }
+
+}
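
A short worked illustration of the removal join above (the ids are made up): a case whose credit_punishment_entity_id in today's partition differs from its value in the most recent earlier partition is written out with deleted = 1, so the stale entity record can be dropped downstream.

// latest earlier partition (ds < today, one row per case via ROW_NUMBER):
//   credit_punishment_case_id = C1, credit_punishment_entity_id = E_old
// today's partition (ds = today):
//   credit_punishment_case_id = C1, credit_punishment_entity_id = E_new   // e.g. a keyno was resolved for the first time
//
// join on equal case id and differing entity id  ->  output row: (C1, deleted = 1)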

+ 205 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/udf/CreditPunishmentCaseAggUDF.scala

@@ -0,0 +1,205 @@
+package com.winhc.bigdata.spark.ng.credit_punishment.udf
+
+import com.alibaba.fastjson.{JSON, JSONPath}
+import com.winhc.bigdata.spark.utils.{BaseUtil, RegCapitalAmount}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import java.sql.Timestamp
+import java.text.DecimalFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
+import java.util.Locale
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/11 09:23
+ */
+case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
+
+  private val tn_map = Map(
+    "company_dishonest_info" -> "1"
+    , "company_zxr" -> "2"
+    , "company_zxr_final_case" -> "3"
+    , "company_zxr_restrict" -> "4"
+  )
+  private val tn_name_map = Map(
+    "company_dishonest_info" -> "失信被执行人"
+    , "company_zxr" -> "被执行人"
+    , "company_zxr_final_case" -> "终本案件"
+    , "company_zxr_restrict" -> "限制高消费"
+  )
+
+  private val delimiter = "@@"
+
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("rowkey", StringType)
+    , StructField("tn", StringType)
+    , StructField("keyno", StringType)
+    , StructField("card_num", StringType)
+    , StructField("case_create_time", TimestampType)
+    , StructField("deleted", LongType)
+    , StructField("detail_data", StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(Array(
+    StructField("rowkey", ArrayType(StringType, containsNull = false)) // 0
+    , StructField("card_num", ArrayType(StringType, containsNull = false)) // 1
+    , StructField("keyno", ArrayType(StringType, containsNull = false)) // 2
+    , StructField("label", ArrayType(StringType, containsNull = false)) // 3
+    , StructField("case_create_time", TimestampType) // 4
+    , StructField("total_exec_amount", DoubleType) // cumulative executed amount 5
+    , StructField("total_no_exec_amount", DoubleType) // suspected current outstanding amount 6
+    , StructField("zxr_total_exec_amount", DoubleType) // current amount under enforcement against the debtor 7
+    , StructField("final_case_exec_amount", DoubleType) // total execution target amount of final (终本) cases 8
+    , StructField("final_case_no_exec_amount", DoubleType) // total unfulfilled amount of final (终本) cases 9
+  ))
+
+  override def dataType: DataType = DataTypes.createMapType(StringType, StringType)
+
+  override def deterministic: Boolean = false
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Seq.empty[String])
+    buffer.update(1, Seq.empty[String])
+    buffer.update(2, Seq.empty[String])
+    buffer.update(3, Seq.empty[String])
+    buffer.update(4, null)
+    buffer.update(5, 0d)
+    buffer.update(6, 0d)
+    buffer.update(7, 0d)
+    buffer.update(8, 0d)
+    buffer.update(9, 0d)
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val rowkey = input.getString(0)
+    val tn = input.getString(1)
+    val keyno = input.getString(2)
+    val card_num = input.getString(3)
+    val case_create_time = input.getTimestamp(4)
+    val deleted = input.getLong(5)
+    val detail_data = input.getString(6)
+
+    deleted match {
+      // do not count historical (deleted) information:
+      case 0 => {
+        buffer(0) = s"$tn$delimiter$rowkey" +: buffer.getSeq[String](0)
+        buffer(1) = card_num +: buffer.getSeq[String](1)
+        buffer(2) = keyno +: buffer.getSeq[String](2)
+        buffer(3) = tn +: buffer.getSeq[String](3)
+        buffer(4) = getCaseCreateTime(buffer.getTimestamp(4), case_create_time)
+        buffer(6) = buffer.getDouble(6) + getAmount(detail_data, "$.no_exec_amount")
+      }
+      case 1 => {
+      }
+      case _ => return
+    }
+
+    tn match {
+      case "company_zxr" => {
+        buffer(5) = buffer.getDouble(5) + getAmount(detail_data, "$.exec_money")
+        buffer(7) = buffer.getDouble(7) + getAmount(detail_data, "$.exec_money")
+      }
+      case "company_zxr_final_case" => {
+        buffer(8) = buffer.getDouble(8) + getAmount(detail_data, "$.exec_money")
+        buffer(9) = buffer.getDouble(9) + getAmount(detail_data, "$.no_exec_amount")
+      }
+      case _ => {}
+    }
+
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
+    buffer1(1) = buffer1.getSeq[String](1) ++ buffer2.getSeq[String](1)
+    buffer1(2) = buffer1.getSeq[String](2) ++ buffer2.getSeq[String](2)
+    buffer1(3) = buffer1.getSeq[String](3) ++ buffer2.getSeq[String](3)
+    buffer1(4) = getCaseCreateTime(buffer1.getTimestamp(4), buffer2.getTimestamp(4))
+    buffer1(5) = buffer1.getDouble(5) + buffer2.getDouble(5)
+    buffer1(6) = buffer1.getDouble(6) + buffer2.getDouble(6)
+    buffer1(7) = buffer1.getDouble(7) + buffer2.getDouble(7)
+    buffer1(8) = buffer1.getDouble(8) + buffer2.getDouble(8)
+    buffer1(9) = buffer1.getDouble(9) + buffer2.getDouble(9)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val strings = buffer.getSeq[String](0).distinct.filter(StringUtils.isNotBlank)
+
+    val tnDistribution: Map[String, Int] = strings.map(r => r.split(delimiter)(0)).groupBy(f => f).mapValues(_.size)
+
+    val rowkey = strings
+      .map(r => {
+        val s = r.split(delimiter)
+        s"${tn_map(s(0))}$delimiter${s(1)}"
+      })
+    val card_num: Seq[String] = buffer.getSeq[String](1).distinct
+      .filter(BaseUtil.is_id_card(_)).map(BaseUtil.id_card_trim).filter(StringUtils.isNotBlank)
+
+    val keyno = buffer.getSeq[String](2).distinct.filter(StringUtils.isNotBlank)
+    val label = buffer.getSeq[String](3).distinct.map(tn_name_map(_))
+    val case_create_time = buffer.getTimestamp(4)
+    val total_exec_amount = buffer.getDouble(5)
+    val total_no_exec_amount = buffer.getDouble(6)
+    val zxr_total_exec_amount = buffer.getDouble(7)
+    val final_case_exec_amount = buffer.getDouble(8)
+    val final_case_no_exec_amount = buffer.getDouble(9)
+    val df = DateTimeFormatter.ofPattern("yyyy-MM-dd").withLocale(Locale.CHINA).withZone(ZoneId.systemDefault)
+
+    val cct = if (case_create_time == null) null else df.format(case_create_time.toInstant)
+    val deleted = if (rowkey.isEmpty) "1" else "0"
+    Map(
+      "rowkey" -> rowkey.mkString(",")
+      , "card_num" -> card_num.headOption.getOrElse(null)
+      , "keyno" -> keyno.headOption.getOrElse(null)
+      , "label" -> label.mkString(",")
+      , "case_create_time" -> cct
+      , "record_num" -> s"${rowkey.length}"
+      , "total_exec_amount" -> double2String(total_exec_amount)
+      , "total_no_exec_amount" -> double2String(total_no_exec_amount)
+      , "zxr_total_exec_amount" -> double2String(zxr_total_exec_amount)
+      , "final_case_exec_amount" -> double2String(final_case_exec_amount)
+      , "final_case_no_exec_amount" -> double2String(final_case_no_exec_amount)
+
+      , "company_dishonest_info_num" -> s"${tnDistribution.getOrElse("company_dishonest_info", 0)}"
+      , "company_zxr_num" -> s"${tnDistribution.getOrElse("company_zxr", 0)}"
+      , "company_zxr_final_case_num" -> s"${tnDistribution.getOrElse("company_zxr_final_case", 0)}"
+      , "company_zxr_restrict_num" -> s"${tnDistribution.getOrElse("company_zxr_restrict", 0)}"
+
+      , "deleted" -> deleted
+    )
+
+  }
+
+
+  private def getAmount(json: String, jsonPath: String): Double = {
+    val str = JSONPath.eval(JSON.parseObject(json), jsonPath).asInstanceOf[String]
+    try {
+      RegCapitalAmount.getAmount(str).toDouble / 100
+    } catch {
+      case ex: Exception => 0d
+    }
+  }
+
+
+  private def getCaseCreateTime(time1: Timestamp, time2: Timestamp): Timestamp = {
+    if (time2 == null && time1 == null)
+      return null
+    if (time2 == null)
+      return time1
+    if (time1 == null)
+      return time2
+
+    if (time2.compareTo(time1) > 0)
+      time1
+    else
+      time2
+  }
+
+  private val dof = new DecimalFormat("0.##")
+
+  private def double2String(d: Double): String = dof.format(d)
+}
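
The aggregate above is consumed as a registered SQL UDAF; a minimal sketch of the registration and group-by call, mirroring CreditPunishmentCaseAgg.init()/calc() (it assumes the extraction temp view created by that job already exists; mm is a map<string,string> whose keys are listed in evaluate()):

import org.apache.spark.sql.SparkSession
import com.winhc.bigdata.spark.ng.credit_punishment.udf.CreditPunishmentCaseAggUDF

object CaseAggSketch {
  def aggregateByCase(spark: SparkSession): Unit = {
    spark.udf.register("credit_punishment_case_agg", new CreditPunishmentCaseAggUDF)

    spark.sql(
      """
        |SELECT  name, court_name, case_no
        |        ,credit_punishment_case_agg(rowkey, tn, keyno, card_num, case_create_time, deleted, detail_data) AS mm
        |FROM    credit_punishment_data_extraction_calc
        |GROUP BY name, court_name, case_no
        |""".stripMargin).show(truncate = false)
  }
}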

+ 46 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/udf/DeletedMergeUDF.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.spark.ng.credit_punishment.udf
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/12 10:26
+ *
+ *        Merge the deleted flag within a group:
+ *        0 1 => 0
+ *        1 1 => 1
+ *        0 0 => 0
+ */
+class DeletedMergeUDF extends UserDefinedAggregateFunction {
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("deleted", LongType)
+  ))
+
+  override def bufferSchema: StructType = StructType(Array[StructField](
+    StructField("deleted", LongType)
+  ))
+
+  override def dataType: DataType = LongType
+
+  override def deterministic: Boolean = false
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, 1l)
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val del = input.getLong(0)
+    if (del == 0l) {
+      buffer(0) = 0l
+    }
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val del = buffer2.getLong(0)
+    if (del == 0l) {
+      buffer1(0) = 0l
+    }
+  }
+
+  override def evaluate(buffer: Row): Any = buffer.getLong(0)
+}
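
A short self-contained sketch of how the aggregate behaves on a toy DataFrame (any 0 in a group wins, otherwise 1); the registration name deleted_merge is chosen here for illustration:

import org.apache.spark.sql.SparkSession
import com.winhc.bigdata.spark.ng.credit_punishment.udf.DeletedMergeUDF

object DeletedMergeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("deleted_merge_sketch").getOrCreate()
    import spark.implicits._

    spark.udf.register("deleted_merge", new DeletedMergeUDF)

    Seq(("a", 0L), ("a", 1L), ("b", 1L), ("b", 1L), ("c", 0L))
      .toDF("id", "deleted")
      .createOrReplaceTempView("t")

    // expected: a -> 0 (has a live record), b -> 1 (all deleted), c -> 0
    spark.sql("SELECT id, deleted_merge(deleted) AS deleted FROM t GROUP BY id").show()

    spark.stop()
  }
}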

+ 87 - 17
src/main/scala/com/winhc/bigdata/spark/ng/utils/StartAndEndDsUtils.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.ng.utils
 
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -11,13 +12,13 @@ import scala.collection.mutable
  * @date: 2021/2/27 09:23
  */
 
-case class StartAndEndDsArgs(inc: Boolean
+case class StartAndEndDsArgs(inc: Boolean // true: write to, or read only from, the incremental table
                              , target_ds: String
                              , inc_tab_gt_ds: String
                             )
 
 case class StartAndEndDsUtils(s: SparkSession
-                             ) extends LoggingUtils {
+                             ) extends LoggingUtils with BaseFunc{
   @(transient@getter) val spark: SparkSession = s
 
 
@@ -31,15 +32,6 @@ case class StartAndEndDsUtils(s: SparkSession
    */
   def get_start_and_end_args(org_tab: String, inc_org_tab: String, target_tab: String, inc_target_tab: String): StartAndEndDsArgs = {
 
-    def all(): StartAndEndDsArgs = {
-      val target_ds = getLastPartitionsOrElse(inc_org_tab, getLastPartitionsOrElse(org_tab, null))
-      if (target_ds == null) {
-        throw new RuntimeException("全量来源表为空 !")
-      }
-      StartAndEndDsArgs(false, target_ds = target_ds, null)
-    }
-
-
     def inc(): StartAndEndDsArgs = {
       var gt_ds = ""
       var target_ds = ""
@@ -49,7 +41,7 @@ case class StartAndEndDsUtils(s: SparkSession
       val inc_target_tab_ds = getLastPartitionsOrElse(inc_target_tab, null)
       val target_tab_ds = getLastPartitionsOrElse(target_tab, null)
       if (inc_org_tab_ds == null) {
-        return all()
+        return all(org_tab, inc_org_tab)
       }
 
       target_ds = inc_org_tab_ds
@@ -71,13 +63,85 @@ case class StartAndEndDsUtils(s: SparkSession
       StartAndEndDsArgs(true, target_ds = target_ds, gt_ds)
     }
 
-
     val ads_last_ds = getLastPartitionsOrElse(target_tab, null)
     if (ads_last_ds == null)
-      all()
+      all(org_tab, inc_org_tab)
     else
       inc()
   }
+
+
+  private def all(org_tab: String, inc_org_tab: String): StartAndEndDsArgs = {
+    val target_ds = getLastPartitionsOrElse(inc_org_tab, getLastPartitionsOrElse(org_tab, null))
+    if (target_ds == null) {
+      throw new RuntimeException("全量来源表为空 !")
+    }
+    StartAndEndDsArgs(false, target_ds = target_ds, null)
+  }
+
+
+  /**
+   * Write two source tables (full and incremental) into a single target table
+   *
+   * @param org_tab
+   * @param inc_org_tab
+   * @param target_tab
+   */
+  def get_start_and_end_args(org_tab: String, inc_org_tab: String, target_tab: String): StartAndEndDsArgs = {
+
+    def inc(): StartAndEndDsArgs = {
+      var gt_ds = ""
+      var target_ds = ""
+
+
+      val inc_org_tab_ds = getLastPartitionsOrElse(inc_org_tab, null)
+      val target_tab_ds = getLastPartitionsOrElse(target_tab, null)
+      if (inc_org_tab_ds == null) {
+        return all(org_tab, inc_org_tab)
+      }
+
+      target_ds = inc_org_tab_ds
+      gt_ds = target_tab_ds
+
+      if (gt_ds.equals(target_ds)) {
+        val target_second_ds = getSecondLastPartitionOrElse(target_tab, null)
+        if (target_second_ds == null) {
+          return all(org_tab, inc_org_tab)
+        } else {
+          gt_ds = target_second_ds
+        }
+      }
+      StartAndEndDsArgs(true, target_ds = target_ds, gt_ds)
+    }
+
+    val targetLastDs = getLastPartitionsOrElse(target_tab, null)
+    if (targetLastDs == null) {
+      all(org_tab, inc_org_tab)
+    } else {
+      inc()
+    }
+  }
+
+
+  def get_gt_ds(org_tab: String, target_tab: String): StartAndEndDsArgs = {
+    val org_tab_ds = getLastPartitionsOrElse(org_tab, BaseUtil.getYesterday())
+
+    val target_ds = getLastPartitionsOrElse(target_tab, null)
+    if (target_ds == null) {
+      return StartAndEndDsArgs(inc = false, target_ds = org_tab_ds, inc_tab_gt_ds = null)
+    }
+
+    if (target_ds.equals(org_tab_ds)) {
+      val target_second_ds = getSecondLastPartitionOrElse(target_tab, null)
+      if (target_second_ds == null) {
+        StartAndEndDsArgs(inc = false, target_ds = org_tab_ds, inc_tab_gt_ds = null)
+      } else {
+        StartAndEndDsArgs(inc = true, target_ds = org_tab_ds, inc_tab_gt_ds = target_second_ds)
+      }
+    } else {
+      StartAndEndDsArgs(inc = true, target_ds = org_tab_ds, inc_tab_gt_ds = target_ds)
+    }
+  }
 }
 
 
@@ -100,10 +164,16 @@ object StartAndEndDsUtils {
 
 
     val a = StartAndEndDsUtils(spark)
-      .get_start_and_end_args(org_tab, inc_org_tab, target_tab, inc_target_tab)
-    println(a)
+
+    /* val b = a.get_start_and_end_args(org_tab, inc_org_tab, target_tab)
+     println(b)*/
+
+   /* val c = a.get_start_and_end_args("winhc_ng.ads_company_dishonest_info", "winhc_ng.inc_ads_company_dishonest_info", "winhc_ng.bds_credit_punishment_data_extraction")
+    println(c)*/
 
 
+    val d = a.get_gt_ds("winhc_ng.bds_credit_punishment_data_extraction", "winhc_ng.bds_credit_punishment_case_info")
+    println(d)
     spark.stop()
   }
 }
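
For reference, illustrative outcomes of get_gt_ds(org_tab, target_tab); the ds values below are made up:

// target_tab has no partition yet:
//   StartAndEndDsArgs(inc = false, target_ds = <org_tab last ds>, inc_tab_gt_ds = null)   // full build
//
// org_tab last ds = 20210512, target_tab last ds = 20210510:
//   StartAndEndDsArgs(inc = true, target_ds = "20210512", inc_tab_gt_ds = "20210510")     // normal increment
//
// org_tab last ds = 20210512, target_tab last ds = 20210512 (already written today):
//   inc_tab_gt_ds falls back to target_tab's second-to-last partition, or a full build if there is none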

+ 11 - 1
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -10,6 +10,7 @@ import org.apache.spark.sql.SparkSession
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
+import java.text.DecimalFormat
 import scala.annotation.meta.getter
 
 /**
@@ -53,7 +54,7 @@ trait BaseFunc extends LoggingUtils {
   }
 
 
-  def json_2_array_udf(): Unit ={
+  def json_2_array_udf(): Unit = {
     /**
      *
      * @param json_array
@@ -201,6 +202,15 @@ trait BaseFunc extends LoggingUtils {
   }
 
 
+  def double_2_str(pattern: String = "0.##"): Unit = {
+    val dof = new DecimalFormat(pattern)
+
+    def double2String(d: Double): String = dof.format(d)
+
+    spark.udf.register("double_2_str", double2String _)
+  }
+
+
   def area_code(): Unit = {
     spark.udf.register("get_province_code", (name: String) => {
       if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {