Browse Source

fix: 信用惩戒查询delete为9时udf不反回rowkey问题

许家凯 3 years ago
parent
commit
c76adcf9e2

+ 1 - 2
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentCaseAgg.scala

@@ -294,8 +294,7 @@ case class CreditPunishmentCaseAgg(s: SparkSession
     val all_record = getAllDataByTmpData("tmp_data1")
 
     //test
-/*
-    sql(
+   /* sql(
       s"""
          |CREATE TABLE IF NOT EXISTS winhc_ng.xjk_test_01 as
          |select * from

+ 30 - 13
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/udf/CreditPunishmentCaseAggUDF.scala

@@ -63,8 +63,9 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     , StructField("final_case_exec_info", StringType) //11
     , StructField("dishonest_exec_info", StringType) //12
 
-    , StructField("zxr_exec_amount", DoubleType) //13  被执行人标的额
-    , StructField("final_case_exec_amount", DoubleType) //14 终本案件标的额
+    , StructField("zxr_exec_amount", DoubleType) //13  被执行人标的额历史+非历史 和5相同
+    , StructField("final_case_exec_amount_", DoubleType) //14 终本案件标的额 历史+非历史
+    , StructField("deleted", ArrayType(LongType, containsNull = false)) //15 deleted状态
   ))
 
   override def dataType: DataType = DataTypes.createMapType(StringType, StringType)
@@ -89,6 +90,7 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
 
     buffer.update(13, 0d)
     buffer.update(14, 0d)
+    buffer.update(15, Seq.empty[Long])
   }
 
   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
@@ -100,14 +102,21 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     val deleted = input.getLong(5)
     val detail_data = input.getString(6)
 
-
+    buffer(10) = s"$tn$delimiter$deleted$delimiter$rowkey" +: buffer.getSeq[String](10)
+    buffer(2) = keyno +: buffer.getSeq[String](2)
+    buffer(1) = card_num +: buffer.getSeq[String](1)
+    buffer(15) = deleted +: buffer.getSeq[Long](15)
     deleted match {
       //不计历史信息:
       case 0 => {
         buffer(0) = s"$tn$delimiter$rowkey" +: buffer.getSeq[String](0)
         buffer(3) = tn +: buffer.getSeq[String](3)
         buffer(4) = getCaseCreateTime(buffer.getTimestamp(4), case_create_time)
-        buffer(6) = buffer.getDouble(6) + getAmount(detail_data, "$.no_exec_amount")
+
+
+        if (tn.equals("company_zxr")) {
+          buffer(6) = buffer.getDouble(6) + getAmount(detail_data, "$.exec_money")
+        }
 
         if (tn.equals("company_zxr_final_case")) {
           val exec_money = getAmount(detail_data, "$.exec_amount")
@@ -121,31 +130,33 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
             }
             buffer(11) = str
           }
+
+          buffer(6) = buffer.getDouble(6) + getAmount(detail_data, "$.no_exec_amount")
+
+          buffer(8) = buffer.getDouble(8) + getAmount(detail_data, "$.exec_amount")
+          buffer(9) = buffer.getDouble(9) + getAmount(detail_data, "$.no_exec_amount")
+
         }
         if (tn.equals("company_dishonest_info")) {
           buffer(12) = getStr(detail_data, "$.performance")
         }
+
+        if (tn.equals("company_zxr")) {
+          buffer(7) = buffer.getDouble(7) + getAmount(detail_data, "$.exec_money")
+        }
       }
       case 1 => {
       }
       case _ => return
     }
-    buffer(10) = s"$tn$delimiter$deleted$delimiter$rowkey" +: buffer.getSeq[String](10)
 
-    buffer(2) = keyno +: buffer.getSeq[String](2)
-    buffer(1) = card_num +: buffer.getSeq[String](1)
 
     tn match {
       case "company_zxr" => {
         buffer(5) = buffer.getDouble(5) + getAmount(detail_data, "$.exec_money")
-        buffer(7) = buffer.getDouble(7) + getAmount(detail_data, "$.exec_money")
-
         buffer(13) = buffer.getDouble(13) + getAmount(detail_data, "$.exec_money")
       }
       case "company_zxr_final_case" => {
-        buffer(8) = buffer.getDouble(8) + getAmount(detail_data, "$.exec_amount")
-        buffer(9) = buffer.getDouble(9) + getAmount(detail_data, "$.no_exec_amount")
-
         buffer(14) = buffer.getDouble(14) + getAmount(detail_data, "$.exec_amount")
       }
       case _ => {}
@@ -175,12 +186,14 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
 
     buffer1(13) = buffer1.getDouble(13) + buffer2.getDouble(13)
     buffer1(14) = buffer1.getDouble(14) + buffer2.getDouble(14)
+    buffer1(15) = buffer1.getSeq[Long](15) ++ buffer2.getSeq[Long](15)
   }
 
   override def evaluate(buffer: Row): Any = {
     val strings = buffer.getSeq[String](0).distinct.filter(StringUtils.isNotBlank)
     val all_rowkey = buffer.getSeq[String](10).distinct.filter(StringUtils.isNotBlank)
 
+
     val stringToInt: Map[String, Int] = all_rowkey.map(r => {
       val ss = r.split(delimiter)
       (ss(0), ss(1))
@@ -205,7 +218,11 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     val df = DateTimeFormatter.ofPattern("yyyy-MM-dd").withLocale(Locale.CHINA).withZone(ZoneId.systemDefault)
 
     val cct = if (case_create_time == null) null else df.format(case_create_time.toInstant)
-    val deleted = if (rowkey.isEmpty) "1" else "0"
+
+    val del = buffer.getSeq[Long](15).distinct
+    val deleted = if (del.contains(0)) "0" else if (del.contains("1")) "1" else "9"
+
+    //    val deleted = if (rowkey.isEmpty) "1" else "0"
 
     val e1 = buffer.getString(11)
     val e2 = buffer.getString(12)