ソースを参照

fix: 查失信限高修复数量问题

许家凯 3 年 前
コミット
c37a0d9951

+ 9 - 1
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishment.scala

@@ -16,6 +16,14 @@ object CreditPunishment {
   val entityTargetTab = "winhc_ng.bds_credit_punishment_entity_info"
   val entityRemoveTargetTab = "winhc_ng.bds_credit_punishment_entity_info_remove"
 
+  //---跑增量重写数据
+  """
+    |
+    |ALTER TABLE winhc_ng.bds_credit_punishment_case_info RENAME TO bds_credit_punishment_case_info_bak_20210906;
+    |ALTER TABLE winhc_ng.bds_credit_punishment_entity_info RENAME TO bds_credit_punishment_entity_info_bak_20210906;
+    |ALTER TABLE winhc_ng.bds_credit_punishment_entity_info_remove RENAME TO bds_credit_punishment_entity_info_remove_bak_20210906;
+    |""".stripMargin
+
 
   def main(args: Array[String]): Unit = {
     val project = "winhc_ng"
@@ -26,7 +34,7 @@ object CreditPunishment {
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":step.01", config)
 
-//        val ds = "20210519"
+    //        val ds = "20210519"
     val ds = BaseUtil.getYesterday()
 
     CreditPunishmentDataExtraction(spark).calc()

+ 26 - 9
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentEntityAgg.scala

@@ -47,12 +47,29 @@ case class CreditPunishmentEntityAgg(s: SparkSession
       } else {
         rs.split(",").filter(r => r.split("@@")(0).equals(prefix))
           .map(_.split("@@")(2))
-          .toSeq.mkStringOrNull()
+          .distinct
+          .toSeq
+          .mkStringOrNull()
       }
     }
 
     spark.udf.register("get_rowkey_by_prefix", get_rowkey_by_prefix _)
 
+
+    def get_rowkey_by_prefix_by_count(rs: String, prefix: String, flag: String): Int = {
+      if (StringUtils.isEmpty(rs)) {
+        0
+      } else {
+        rs.split(",").filter(r => r.split("@@")(0).equals(prefix) && r.split("@@")(1).equals(flag))
+          .map(_.split("@@")(2))
+          .distinct
+          .size
+      }
+    }
+
+    spark.udf.register("get_rowkey_by_prefix_count", get_rowkey_by_prefix_by_count _)
+
+
     def get_gender(card_num: String): Int = {
       if (StringUtils.isEmpty(card_num)) {
         return -1
@@ -234,15 +251,15 @@ case class CreditPunishmentEntityAgg(s: SparkSession
          |       ,final_case_exec_amount
          |       ,final_case_no_exec_amount
          |
-         |       ,company_dishonest_info_num_0
-         |       ,company_zxr_num_0
-         |       ,company_zxr_final_case_num_0
-         |       ,company_zxr_restrict_num_0
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_dishonest_info','0') as company_dishonest_info_num_0
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_zxr','0') as company_zxr_num_0
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_zxr_final_case','0') as company_zxr_final_case_num_0
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_zxr_restrict','0') as company_zxr_restrict_num_0
          |
-         |       ,company_dishonest_info_num_1
-         |       ,company_zxr_num_1
-         |       ,company_zxr_final_case_num_1
-         |       ,company_zxr_restrict_num_1
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_dishonest_info','1') as company_dishonest_info_num_1
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_zxr','1') as company_zxr_num_1
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_zxr_final_case','1') as company_zxr_final_case_num_1
+         |       ,get_rowkey_by_prefix_count(all_rowkey,'company_zxr_restrict','1') as company_zxr_restrict_num_1
          |
          |       ,deleted
          |       ,get_rowkey_by_prefix(all_rowkey,'company_dishonest_info') as company_dishonest_info_rowkey

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentEntityRemove.scala

@@ -19,6 +19,8 @@ case class CreditPunishmentEntityRemove(s: SparkSession
 
   val target_tab = CreditPunishment.entityRemoveTargetTab
 
+  init()
+
 
   def init(): Unit = {
     sql(

+ 5 - 1
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/udf/CreditPunishmentCaseAggUDF.scala

@@ -197,7 +197,11 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     val stringToInt: Map[String, Int] = all_rowkey.map(r => {
       val ss = r.split(delimiter)
       (ss(0), ss(1))
-    }).filter(r => r._2.equals("0") || r._2.equals("1")).map(r => s"${r._1}_${r._2}").groupBy(f => f).mapValues(_.size)
+    })
+      .filter(r => r._2.equals("0") || r._2.equals("1"))
+      .map(r => s"${r._1}_${r._2}")
+      .distinct
+      .groupBy(f => f).mapValues(_.size)
 
     val rowkey = strings
       .map(r => {