Browse Source

feat: 信用惩戒查询新需求

许家凯 3 years ago
parent
commit
5855a7b405

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishment.scala

@@ -26,7 +26,7 @@ object CreditPunishment {
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":step.01", config)
 
-//        val ds = "20210518"
+//        val ds = "20210519"
     val ds = BaseUtil.getYesterday()
 
     CreditPunishmentDataExtraction(spark).calc()

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentCaseAgg.scala

@@ -62,6 +62,7 @@ case class CreditPunishmentCaseAgg(s: SparkSession
          |    ,record_num bigint COMMENT '记录条数'
          |
          |    ,exec_info string COMMENT '执行情况'
+         |    ,amount_involved string COMMENT '涉案金额'
          |
          |    ,total_exec_amount STRING COMMENT '累计被执行总金额'
          |    ,total_no_exec_amount STRING COMMENT '疑似当前欠款总金额'
@@ -242,6 +243,7 @@ case class CreditPunishmentCaseAgg(s: SparkSession
        |        ,cast(mm['record_num'] as bigint) AS record_num
        |
        |        ,mm['exec_info'] AS exec_info
+       |        ,mm['amount_involved'] AS amount_involved
        |
        |        ,mm['total_exec_amount'] AS total_exec_amount
        |        ,mm['total_no_exec_amount'] AS total_no_exec_amount

+ 8 - 1
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentEntityAgg.scala

@@ -30,7 +30,7 @@ case class CreditPunishmentEntityAgg(s: SparkSession
     code2Name()
     double_2_str()
 
-    def array_distinct(arr: Seq[String]): String = if (arr == null || arr.isEmpty) null else arr.distinct.mkString(",")
+    def array_distinct(arr: Seq[String]): String = if (arr == null || arr.isEmpty) null else arr.filter(StringUtils.isNotEmpty).distinct.mkString(",")
 
     spark.udf.register("array_distinct", array_distinct _)
     spark.udf.register("deleted_merge", new DeletedMergeUDF)
@@ -76,6 +76,9 @@ case class CreditPunishmentEntityAgg(s: SparkSession
          |    ,county STRING COMMENT '区'
          |    ,other_info STRING COMMENT '其他信息-公司或人的其它信息,json格式'
          |    ,record_num bigint COMMENT '记录条数'
+         |
+         |    ,create_case_time_year string COMMENT '立案时间,年份,逗号分割'
+         |
          |    ,total_exec_amount STRING COMMENT '累计被执行总金额'
          |    ,total_no_exec_amount STRING COMMENT '疑似当前欠款总金额'
          |    ,zxr_total_exec_amount STRING COMMENT '被执行人当前被执行总金额'
@@ -187,6 +190,7 @@ case class CreditPunishmentEntityAgg(s: SparkSession
          |        ,array_distinct(split(CONCAT_WS(',',collect_set(label)),',')) AS label
          |        --- ,sum(record_num) as record_num
          |        ,count(1) as record_num
+         |        ,array_distinct(collect_set(split(case_create_time,'-')[0])) AS case_create_time_year
          |        ,double_2_str(sum(CAST(total_exec_amount AS DOUBLE) )) AS total_exec_amount
          |        ,double_2_str(sum(CAST(total_no_exec_amount AS DOUBLE) )) AS total_no_exec_amount
          |        ,double_2_str(sum(CAST(zxr_total_exec_amount AS DOUBLE) )) AS zxr_total_exec_amount
@@ -225,6 +229,7 @@ case class CreditPunishmentEntityAgg(s: SparkSession
          |        ,get_county_name(t2.area_code) as county
          |        ,company_other_info(legal_entity_id,legal_entity_name,legal_entity_type,reg_capital,reg_capital_amount,reg_capital_currency,to_millis_timestamp(t2.estiblish_time),logo) as other_info
          |        ,record_num
+         |        ,case_create_time_year
          |        ,t1.total_exec_amount
          |        ,t1.total_no_exec_amount
          |        ,t1.zxr_total_exec_amount
@@ -262,6 +267,7 @@ case class CreditPunishmentEntityAgg(s: SparkSession
          |        ,get_county_name(SUBSTRING(card_num,0,6)) AS county
          |        ,person_other_info(get_birth_year(card_num),get_gender(card_num)) as other_info
          |        ,record_num
+         |        ,case_create_time_year
          |        ,total_exec_amount
          |        ,total_no_exec_amount
          |        ,zxr_total_exec_amount
@@ -295,6 +301,7 @@ case class CreditPunishmentEntityAgg(s: SparkSession
          |        ,null AS county
          |        ,null as other_info
          |        ,record_num
+         |        ,case_create_time_year
          |        ,total_exec_amount
          |        ,total_no_exec_amount
          |        ,zxr_total_exec_amount

+ 23 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/udf/CreditPunishmentCaseAggUDF.scala

@@ -62,6 +62,9 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
 
     , StructField("final_case_exec_info", StringType) //11
     , StructField("dishonest_exec_info", StringType) //12
+
+    , StructField("zxr_exec_amount", DoubleType) //13  被执行人标的额
+    , StructField("final_case_exec_amount", DoubleType) //14 终本案件标的额
   ))
 
   override def dataType: DataType = DataTypes.createMapType(StringType, StringType)
@@ -83,6 +86,9 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
 
     buffer.update(11, null)
     buffer.update(12, null)
+
+    buffer.update(13, 0d)
+    buffer.update(14, 0d)
   }
 
   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
@@ -133,10 +139,14 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
       case "company_zxr" => {
         buffer(5) = buffer.getDouble(5) + getAmount(detail_data, "$.exec_money")
         buffer(7) = buffer.getDouble(7) + getAmount(detail_data, "$.exec_money")
+
+        buffer(13) = buffer.getDouble(13) + getAmount(detail_data, "$.exec_money")
       }
       case "company_zxr_final_case" => {
         buffer(8) = buffer.getDouble(8) + getAmount(detail_data, "$.exec_amount")
         buffer(9) = buffer.getDouble(9) + getAmount(detail_data, "$.no_exec_amount")
+
+        buffer(14) = buffer.getDouble(14) + getAmount(detail_data, "$.exec_amount")
       }
       case _ => {}
     }
@@ -162,6 +172,9 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     if (buffer1.getString(12) == null) {
       buffer1(12) = buffer2.getString(12)
     }
+
+    buffer1(13) = buffer1.getDouble(13) + buffer2.getDouble(13)
+    buffer1(14) = buffer1.getDouble(14) + buffer2.getDouble(14)
   }
 
   override def evaluate(buffer: Row): Any = {
@@ -198,6 +211,12 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     val e2 = buffer.getString(12)
 
     val exec_info = if (e1 == null) if (e2 == null) "未知" else e2 else e1
+
+    val a1 = getAmountInvolved(buffer.getDouble(13))
+    val a2 = getAmountInvolved(buffer.getDouble(14))
+
+    val amount_involved = if (a1 == null) if (a2 == null) "未知" else a2 else a1
+
     Map(
       "rowkey" -> rowkey.mkString(",")
       , "card_num" -> card_num.headOption.getOrElse(null)
@@ -207,6 +226,7 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
       , "record_num" -> s"${rowkey.length}"
 
       , "exec_info" -> exec_info
+      , "amount_involved" -> amount_involved
 
       , "total_exec_amount" -> double2String(total_exec_amount)
       , "total_no_exec_amount" -> double2String(total_no_exec_amount)
@@ -230,6 +250,9 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
 
   }
 
+  private def getAmountInvolved(d: Double): String = if (d == 0d) null else NumberUtil.decimalFormat("#.##万元", NumberUtil.div(d, 10000))
+
+
   private def getStr(json: String, jsonPath: String): String = {
     JSONPath.eval(JSON.parseObject(json), jsonPath).asInstanceOf[String]
   }