Przeglądaj źródła

fix: 信用惩戒查询bug修复

许家凯 4 lat temu
rodzic
commit
b385cf0344

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishment.scala

@@ -26,6 +26,7 @@ object CreditPunishment {
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":step.01", config)
 
+//        val ds = "20210518"
     val ds = BaseUtil.getYesterday()
 
     CreditPunishmentDataExtraction(spark).calc()

+ 36 - 2
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentCaseAgg.scala

@@ -125,6 +125,21 @@ case class CreditPunishmentCaseAgg(s: SparkSession
            |                        SELECT  *
            |                        FROM    ${CreditPunishment.extractionTargetTab}
            |                        WHERE   ds > '$ds'
+           |                        UNION ALL
+           |                        SELECT  t2.*
+           |                        FROM    (
+           |                                    SELECT  *
+           |                                    FROM    winhc_ng.bds_credit_punishment_data_extraction
+           |                                    WHERE   ds > '$ds'
+           |                                ) AS t1
+           |                        JOIN    (
+           |                                    SELECT  *
+           |                                    FROM    winhc_ng.bds_credit_punishment_data_extraction
+           |                                    WHERE   ds <= '$ds'
+           |                                ) as t2
+           |                        ON      t1.case_no = t2.case_no
+           |                        AND     t1.name = t2.name
+           |                        AND     t1.court_name = t2.court_name
            |                    )
            |        )
            |WHERE   num = 1
@@ -143,10 +158,11 @@ case class CreditPunishmentCaseAgg(s: SparkSession
          |SELECT
          |        split(xjk_rowkey,'@@') [0 ] AS tn
          |        ,split(xjk_rowkey,'@@') [2 ] AS rowkey
+         |        ,name
          |FROM    (
          |        SELECT
          |                *
-         |                ,ROW_NUMBER() OVER(PARTITION BY xjk_rowkey ORDER BY xjk_rowkey DESC ) AS num
+         |                ,ROW_NUMBER() OVER(PARTITION BY xjk_rowkey,name ORDER BY xjk_rowkey DESC ) AS num
          |        FROM    (
          |                SELECT  *
          |                FROM    (
@@ -200,6 +216,7 @@ case class CreditPunishmentCaseAgg(s: SparkSession
          |                        JOIN    all_rowkey AS t2
          |                        ON      t1.rowkey = t2.rowkey
          |                        AND     t1.tn = t2.tn
+         |                        AND     t1.name = t2.name
          |                    )
          |        )
          |WHERE   num = 1
@@ -267,7 +284,6 @@ case class CreditPunishmentCaseAgg(s: SparkSession
 
     sql(
       s"""
-         |---INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
          |${get_sql(org_tab)}
          |""".stripMargin)
       .cache()
@@ -275,6 +291,24 @@ case class CreditPunishmentCaseAgg(s: SparkSession
 
     val all_record = getAllDataByTmpData("tmp_data1")
 
+    //test
+/*
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS winhc_ng.xjk_test_01 as
+         |select * from
+         |tmp_data1
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS winhc_ng.xjk_test_02 as
+         |select * from
+         |$all_record
+         |""".stripMargin)
+    return*/
+    //test end
+
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentEntityAgg.scala

@@ -4,7 +4,7 @@ import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 import com.winhc.bigdata.spark.ng.credit_punishment.udf.DeletedMergeUDF
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, DateUtils, LoggingUtils}
+import com.winhc.bigdata.spark.utils.{DateUtils, LoggingUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
@@ -185,7 +185,8 @@ case class CreditPunishmentEntityAgg(s: SparkSession
          |        ,collect_set(name)[0] AS name
          |        ,collect_set(card_num)[0] AS card_num
          |        ,array_distinct(split(CONCAT_WS(',',collect_set(label)),',')) AS label
-         |        ,sum(record_num) as record_num
+         |        --- ,sum(record_num) as record_num
+         |        ,count(1) as record_num
          |        ,double_2_str(sum(CAST(total_exec_amount AS DOUBLE) )) AS total_exec_amount
          |        ,double_2_str(sum(CAST(total_no_exec_amount AS DOUBLE) )) AS total_no_exec_amount
          |        ,double_2_str(sum(CAST(zxr_total_exec_amount AS DOUBLE) )) AS zxr_total_exec_amount
@@ -212,7 +213,7 @@ case class CreditPunishmentEntityAgg(s: SparkSession
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
          |SELECT  t1.credit_punishment_entity_id
          |        ,t1.keyno
          |        ,t1.name

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/udf/CreditPunishmentCaseAggUDF.scala

@@ -94,14 +94,11 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     val deleted = input.getLong(5)
     val detail_data = input.getString(6)
 
-    buffer(10) = s"$tn$delimiter$deleted$delimiter$rowkey" +: buffer.getSeq[String](0)
 
     deleted match {
       //不计历史信息:
       case 0 => {
         buffer(0) = s"$tn$delimiter$rowkey" +: buffer.getSeq[String](0)
-        buffer(1) = card_num +: buffer.getSeq[String](1)
-        buffer(2) = keyno +: buffer.getSeq[String](2)
         buffer(3) = tn +: buffer.getSeq[String](3)
         buffer(4) = getCaseCreateTime(buffer.getTimestamp(4), case_create_time)
         buffer(6) = buffer.getDouble(6) + getAmount(detail_data, "$.no_exec_amount")
@@ -127,6 +124,10 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
       }
       case _ => return
     }
+    buffer(10) = s"$tn$delimiter$deleted$delimiter$rowkey" +: buffer.getSeq[String](10)
+
+    buffer(2) = keyno +: buffer.getSeq[String](2)
+    buffer(1) = card_num +: buffer.getSeq[String](1)
 
     tn match {
       case "company_zxr" => {