
fix: credit punishment incremental-update optimization

许家凯, 3 years ago
commit 6d43a97a5b

+ 146 - 6
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentCaseAgg.scala

@@ -67,6 +67,7 @@ case class CreditPunishmentCaseAgg(s: SparkSession
          |    ,company_zxr_final_case_num BIGINT COMMENT 'number of concluded-enforcement (终本) cases'
          |    ,company_zxr_restrict_num BIGINT COMMENT 'number of consumption-restriction (限高) records'
          |    ,deleted BIGINT COMMENT '0: not deleted, 1: deleted'
+         |    ,all_rowkey STRING COMMENT 'all rowkeys for the case, both historical and current'
          |)
          |COMMENT 'credit punishment case query table'
          |PARTITIONED BY
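
Note that CREATE TABLE IF NOT EXISTS only takes effect when the table is first created, so an already-deployed case table would not pick up the new all_rowkey column from the DDL above. A minimal sketch of the one-off migration that would be needed in that situation, assuming the same class context where target_tab names this table (the ALTER statement itself is an assumption, not part of the commit):

    // Sketch only: add the new column to an existing table; the CREATE ... IF NOT EXISTS
    // above does nothing once the table already exists.
    sql(
      s"""
         |ALTER TABLE $target_tab
         |ADD COLUMNS (all_rowkey STRING COMMENT 'all rowkeys for the case, historical and current')
         |""".stripMargin)
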
@@ -80,20 +81,40 @@ case class CreditPunishmentCaseAgg(s: SparkSession
 
   private def getCreditPunishmentDataExtraction(ds: String): String = {
     val tmp_view = "credit_punishment_data_extraction_calc"
+
+    // dedupe on tn + rowkey + name
+    // join key: (rowkey + tn) or (case_no + court_name)
+
     if (ds == null) {
       sql(
         s"""
            |SELECT  *
-           |FROM    ${CreditPunishment.extractionTargetTab}
-           |WHERE   ds > '0'
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY tn,rowkey,name ORDER BY ds DESC) AS num
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    ${CreditPunishment.extractionTargetTab}
+           |                        WHERE   ds > '0'
+           |                    )
+           |        )
+           |WHERE   num = 1
            |""".stripMargin)
         .createTempView(tmp_view)
     } else {
       sql(
         s"""
            |SELECT  *
-           |FROM    ${CreditPunishment.extractionTargetTab}
-           |WHERE   ds > '$ds'
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY tn,rowkey,name ORDER BY ds DESC) AS num
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    ${CreditPunishment.extractionTargetTab}
+           |                        WHERE   ds > '$ds'
+           |                    )
+           |        )
+           |WHERE   num = 1
            |""".stripMargin)
         .createTempView(tmp_view)
     }
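
Both branches above keep only the newest ds per (tn, rowkey, name); they differ only in the ds lower bound. For reference, a minimal standalone sketch of the same keep-latest dedupe written with the DataFrame API (a sketch assuming the same SparkSession and source table, not part of the commit):

    // Keep only the newest ds per (tn, rowkey, name), mirroring the ROW_NUMBER() filter above.
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    val w = Window.partitionBy("tn", "rowkey", "name").orderBy(col("ds").desc)
    spark.table(CreditPunishment.extractionTargetTab)
      .where(col("ds") > "0")
      .withColumn("num", row_number().over(w))
      .where(col("num") === 1)
      .drop("num")
      .createOrReplaceTempView("credit_punishment_data_extraction_calc")
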
@@ -101,13 +122,88 @@ case class CreditPunishmentCaseAgg(s: SparkSession
   }
 
 
-  def calc(ds:String): Unit = {
+  private def getAllDataByTmpData(tmpTab: String): String = {
+
+    val cols = getColumns(CreditPunishment.caseTargetTab)
+    sql(
+      s"""
+         |SELECT
+         |        split(xjk_rowkey,'@@')[0] AS tn
+         |        ,split(xjk_rowkey,'@@')[1] AS rowkey
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,ROW_NUMBER() OVER(PARTITION BY xjk_rowkey ORDER BY xjk_rowkey DESC) AS num
+         |        FROM    (
+         |                SELECT  *
+         |                FROM    (
+         |                        SELECT  t2.*
+         |                        FROM    (
+         |                                SELECT DISTINCT  credit_punishment_case_id
+         |                                FROM    ${tmpTab}
+         |                                ) AS t1
+         |                        JOIN (
+         |                                SELECT  *
+         |                                FROM    ${CreditPunishment.caseTargetTab}
+         |                                WHERE   ds > '0'
+         |                             ) AS t2
+         |                        ON      t1.credit_punishment_case_id = t2.credit_punishment_case_id
+         |                        UNION ALL
+         |                        SELECT  ${cols.diff(Seq("ds")).mkString(",")},'${BaseUtil.getYesterday()}' as ds
+         |                        FROM    ${tmpTab}
+         |                        UNION ALL
+         |                        SELECT  tt2.*
+         |                        FROM    (
+         |                                SELECT DISTINCT  credit_punishment_entity_id
+         |                                FROM    ${tmpTab}
+         |                                ) AS tt1
+         |                        JOIN (
+         |                                SELECT  *
+         |                                FROM    ${CreditPunishment.caseTargetTab}
+         |                                WHERE   ds > '0'
+         |                             ) AS tt2
+         |                        ON      tt1.credit_punishment_entity_id = tt2.credit_punishment_entity_id
+         |                        ) AS tab3
+         |                LATERAL VIEW explode(split(all_rowkey,',')) tt AS xjk_rowkey )
+         |        )
+         |WHERE   num = 1
+         |
+         |""".stripMargin)
+      .createTempView("all_rowkey")
+
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey,tn,name ORDER BY ds DESC) AS num
+         |            FROM    (
+         |                        SELECT  t1.*
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    winhc_ng.bds_credit_punishment_data_extraction
+         |                                    WHERE   ds > '0'
+         |                                ) AS t1
+         |                        JOIN    all_rowkey AS t2
+         |                        ON      t1.rowkey = t2.rowkey
+         |                        AND     t1.tn = t2.tn
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |""".stripMargin)
+
+      .createTempView("all_record")
+    "all_record"
+  }
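
The pivotal step in getAllDataByTmpData is fanning the comma-joined all_rowkey values of every affected case back out into individual (tn, rowkey) pairs, which are then re-joined against the raw extraction table. A minimal sketch of just that explode-and-split step in the DataFrame API (column and table names taken from the SQL above; the rest is an assumption):

    // Expand each comma-joined 'tn@@rowkey' value back into one (tn, rowkey) row.
    import org.apache.spark.sql.functions.{col, explode, split}

    val rowkeyPairs = spark.table(CreditPunishment.caseTargetTab)
      .where(col("ds") > "0")
      .select(explode(split(col("all_rowkey"), ",")).as("xjk_rowkey"))
      .select(
        split(col("xjk_rowkey"), "@@").getItem(0).as("tn"),
        split(col("xjk_rowkey"), "@@").getItem(1).as("rowkey"))
      .distinct()
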
+
+
+  def calc(ds: String): Unit = {
     val args = StartAndEndDsUtils(spark).get_gt_ds(CreditPunishment.extractionTargetTab, CreditPunishment.caseTargetTab)
 
     val org_tab = getCreditPunishmentDataExtraction(args.inc_tab_gt_ds)
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
+         |---INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
          |SELECT  md5(cleanup(CONCAT_WS('',name,court_name,case_no))) AS credit_punishment_case_id
          |        ,generate_entity_id(mm['keyno'],mm['card_num'],md5(CONCAT_WS('',mm['card_num'],name)),md5(cleanup(CONCAT_WS('',name,court_name,case_no)))) as credit_punishment_entity_id
          |        ,name
@@ -129,6 +225,7 @@ case class CreditPunishmentCaseAgg(s: SparkSession
          |        ,CAST(mm['company_zxr_final_case_num'] AS BIGINT) AS company_zxr_final_case_num
          |        ,CAST(mm['company_zxr_restrict_num'] AS BIGINT) AS company_zxr_restrict_num
          |        ,cast(mm['deleted'] as bigint) AS deleted
+         |        ,mm['all_rowkey'] AS all_rowkey
          |FROM    (
          |            SELECT  name
          |                    ,court_name
@@ -141,5 +238,48 @@ case class CreditPunishmentCaseAgg(s: SparkSession
          |        )
          |WHERE   name IS NOT NULL and trim(name) <> ''
          |""".stripMargin)
+      .cache()
+      .createTempView("tmp_data1")
+
+    val all_record = getAllDataByTmpData("tmp_data1")
+
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
+         |SELECT  md5(cleanup(CONCAT_WS('',name,court_name,case_no))) AS credit_punishment_case_id
+         |        ,generate_entity_id(mm['keyno'],mm['card_num'],md5(CONCAT_WS('',mm['card_num'],name)),md5(cleanup(CONCAT_WS('',name,court_name,case_no)))) as credit_punishment_entity_id
+         |        ,name
+         |        ,court_name
+         |        ,case_no
+         |        ,mm['rowkey'] AS rowkey
+         |        ,mm['card_num'] AS card_num
+         |        ,mm['keyno'] AS keyno
+         |        ,mm['label'] AS label
+         |        ,mm['case_create_time'] AS case_create_time
+         |        ,cast(mm['record_num'] as bigint) AS record_num
+         |        ,mm['total_exec_amount'] AS total_exec_amount
+         |        ,mm['total_no_exec_amount'] AS total_no_exec_amount
+         |        ,mm['zxr_total_exec_amount'] AS zxr_total_exec_amount
+         |        ,mm['final_case_exec_amount'] AS final_case_exec_amount
+         |        ,mm['final_case_no_exec_amount'] AS final_case_no_exec_amount
+         |        ,CAST(mm['company_dishonest_info_num'] AS BIGINT) AS company_dishonest_info_num
+         |        ,CAST(mm['company_zxr_num'] AS BIGINT) AS company_zxr_num
+         |        ,CAST(mm['company_zxr_final_case_num'] AS BIGINT) AS company_zxr_final_case_num
+         |        ,CAST(mm['company_zxr_restrict_num'] AS BIGINT) AS company_zxr_restrict_num
+         |        ,cast(mm['deleted'] as bigint) AS deleted
+         |        ,mm['all_rowkey'] AS all_rowkey
+         |FROM    (
+         |            SELECT  name
+         |                    ,court_name
+         |                    ,case_no
+         |                    ,credit_punishment_case_agg(rowkey,tn,keyno,card_num,case_create_time,deleted,detail_data) AS mm
+         |            FROM    $all_record
+         |            GROUP BY name
+         |                     ,court_name
+         |                     ,case_no
+         |        )
+         |WHERE   name IS NOT NULL and trim(name) <> ''
+         |""".stripMargin)
   }
 }

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/CreditPunishmentEntityRemove.scala

@@ -26,7 +26,7 @@ case class CreditPunishmentEntityRemove(s: SparkSession
          |CREATE TABLE IF NOT EXISTS $target_tab
          |(
          |    credit_punishment_case_id  STRING COMMENT 'credit punishment entity id; when no person id exists, the case id is used'
-         |    ,deleted BIGINT COMMENT '0: not deleted, 1: deleted'
+         |    ,deleted BIGINT COMMENT '0: not deleted'
          |)
          |COMMENT 'entity ids to be removed from the bds_credit_punishment_entity_info table'
          |PARTITIONED BY (ds STRING COMMENT 'partition')
@@ -39,7 +39,7 @@ case class CreditPunishmentEntityRemove(s: SparkSession
       s"""
          |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$ds')
          |SELECT  t1.credit_punishment_case_id
-         |        ,1 AS deleted
+         |        ,9 AS deleted
          |FROM    (
          |            SELECT  *
          |            FROM    ${CreditPunishment.caseTargetTab}
@@ -60,6 +60,8 @@ case class CreditPunishmentEntityRemove(s: SparkSession
          |        ) AS t2
          |ON      t1.credit_punishment_case_id = t2.credit_punishment_case_id
          |AND     t1.credit_punishment_entity_id <> t2.credit_punishment_entity_id
+         |AND     t1.credit_punishment_case_id = t1.credit_punishment_entity_id
+         |AND     t2.credit_punishment_case_id <> t2.credit_punishment_entity_id
          |""".stripMargin)
 
   }
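
Read literally, the two added predicates narrow the self-join so that only placeholder rows are flagged: t1 must be a case-level entity (its entity id fell back to the case id because no person id was available), while t2 must be a person-level entity of the same case. A minimal sketch of the combined condition as a plain predicate, with hypothetical parameter names (not part of the code):

    // Flag the case-level placeholder (t1) for removal only when a person-level
    // entity (t2) now exists for the same case.
    def shouldRemove(t1CaseId: String, t1EntityId: String,
                     t2CaseId: String, t2EntityId: String): Boolean =
      t1CaseId == t2CaseId &&        // joined on the same case
        t1EntityId != t2EntityId &&  // distinct entity rows
        t1CaseId == t1EntityId &&    // t1 is the placeholder entity
        t2CaseId != t2EntityId       // t2 is a real, person-level entity
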

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/ng/credit_punishment/udf/CreditPunishmentCaseAggUDF.scala

@@ -56,6 +56,7 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     , StructField("zxr_total_exec_amount", DoubleType) // total amount currently under enforcement against the debtor 7
     , StructField("final_case_exec_amount", DoubleType) // total enforcement subject amount of concluded (终本) cases 8
     , StructField("final_case_no_exec_amount", DoubleType) // total unfulfilled amount of concluded (终本) cases 9
+    , StructField("all_rowkey", ArrayType(StringType, containsNull = false)) // all rowkeys for the case, historical and current 10
   ))
 
   override def dataType: DataType = DataTypes.createMapType(StringType, StringType)
@@ -73,6 +74,7 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     buffer.update(7, 0d)
     buffer.update(8, 0d)
     buffer.update(9, 0d)
+    buffer.update(10, Seq.empty[String])
   }
 
   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
@@ -84,6 +86,8 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     val deleted = input.getLong(5)
     val detail_data = input.getString(6)
 
+    buffer(10) = s"$tn$delimiter$rowkey" +: buffer.getSeq[String](10)
+
     deleted match {
      // historical records are not counted here:
       case 0 => {
@@ -124,6 +128,7 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
     buffer1(7) = buffer1.getDouble(7) + buffer2.getDouble(7)
     buffer1(8) = buffer1.getDouble(8) + buffer2.getDouble(8)
     buffer1(9) = buffer1.getDouble(9) + buffer2.getDouble(9)
+    buffer1(10) = buffer1.getSeq[String](10) ++ buffer2.getSeq[String](10)
   }
 
   override def evaluate(buffer: Row): Any = {
@@ -170,6 +175,7 @@ case class CreditPunishmentCaseAggUDF() extends UserDefinedAggregateFunction {
       , "company_zxr_restrict_num" -> s"${tnDistribution.getOrElse("company_zxr_restrict", 0)}"
 
       , "deleted" -> deleted
+      , "all_rowkey" -> buffer.getSeq[String](10).distinct.filter(StringUtils.isNotBlank).mkString(",")
     )
 
   }
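
Taken together, the UDF changes add one more aggregation-buffer slot (index 10) that accumulates every tn@@rowkey pair and emits them deduplicated and comma-joined as all_rowkey. As a standalone illustration of that buffer-slot pattern, a minimal self-contained UDAF with invented names (a sketch, not the project's actual aggregate, which returns a map of many fields):

    // Sketch: accumulate a distinct set of strings in a single buffer slot and
    // emit them comma-joined, mirroring how the all_rowkey slot above is handled.
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    case class CollectKeysUDAF() extends UserDefinedAggregateFunction {
      override def inputSchema: StructType = StructType(StructField("key", StringType) :: Nil)

      override def bufferSchema: StructType =
        StructType(StructField("keys", ArrayType(StringType, containsNull = false)) :: Nil)

      override def dataType: DataType = StringType

      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit =
        buffer.update(0, Seq.empty[String])

      override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
        if (!input.isNullAt(0))
          buffer(0) = input.getString(0) +: buffer.getSeq[String](0)

      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)

      override def evaluate(buffer: Row): Any =
        buffer.getSeq[String](0).distinct.sorted.mkString(",")
    }
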