
feat: add dishonest persons (失信人) to the pre-processing step

许家凯 4 years ago
parent
commit
9945797758

+ 157 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JCR_pre3.scala

@@ -0,0 +1,157 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/9/27 11:41
+ * @Description: pre-processing of judicial-case data from several other dimensions
+ */
+case class JCR_pre3(s: SparkSession,
+                    project: String // name of the project that holds the tables
+                    ) extends LoggingUtils with Logging with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val target_table = "ads_judicial_case_relation_pre"
+
+
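+  /** Load dishonest-person records into the pre table: full backfill (all) or incremental merge (inc). */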
+  def company_dishonest_info(is_inc: Boolean = true): Unit = {
+    prepareFunctions(spark)
+
+    def all(): Unit = {
+      val ads_ds = getLastPartitionsOrElse(s"$project.ads_company_dishonest_info", "0")
+      val inc_last_ds = getLastPartitionsOrElse(s"$project.inc_ads_company_dishonest_info", "0")
+
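+      // keep only the columns shared by the full and incremental tables so the UNION ALL below lines up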
+      val intersectCols = getColumns(s"$project.ads_company_dishonest_info").toSet & getColumns(s"$project.inc_ads_company_dishonest_info").toSet
+
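+      // merge both tables and keep only the newest version of each rowkey (ROW_NUMBER ... ORDER BY ds DESC)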
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='company_dishonest_info')
+           |SELECT  md5(CONCAT(rowkey,'company_dishonest_info')) as judicase_id
+           |        ,3 as flag
+           |        ,CONCAT('关于',name,'的失信信息') as title
+           |        ,case_type(gist_dd) as case_type
+           |        ,null as case_reason
+           |        ,case_no
+           |        ,court as court_name
+           |        ,'执行' as case_stage
+           |        ,null as yg_name
+           |        ,name as bg_name
+           |        ,pub_date as date
+           |        ,rowkey as detail_id
+           |        ,null as case_amt
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${intersectCols.mkString(",")}
+           |                        FROM    $project.ads_company_dishonest_info
+           |                        WHERE   ds = '$ads_ds'
+           |                        UNION ALL
+           |                        SELECT  ${intersectCols.mkString(",")}
+           |                        FROM    $project.inc_ads_company_dishonest_info
+           |                        WHERE   ds > '$ads_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+    }
+
+    def inc(): Unit = {
+
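+      // newest ds partition of the target table that already contains company_dishonest_info rows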
+      val last_ds = sql(s"show partitions $project.$target_table")
+        .collect()
+        .map(r => r.getString(0))
+        .filter(str => str.contains("company_dishonest_info"))
+        .map(str => str.split("/")(0).split("=")(1))
+        .max
+
+      val inc_last_ds = getLastPartitionsOrElse(s"$project.inc_ads_company_dishonest_info", "0")
+
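+      // union the existing pre-table rows with new increments; xjk_f = 1 marks fresh rows, which win the dedup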
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='company_dishonest_info')
+           |SELECT  judicase_id
+           |        ,flag
+           |        ,title
+           |        ,case_type
+           |        ,case_reason
+           |        ,case_no
+           |        ,court_name
+           |        ,case_stage
+           |        ,yg_name
+           |        ,bg_name
+           |        ,date
+           |        ,detail_id
+           |        ,case_amt
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY judicase_id ORDER BY xjk_f DESC ) AS num
+           |            FROM    (
+           |                        SELECT  judicase_id
+           |                                ,flag
+           |                                ,title
+           |                                ,case_type
+           |                                ,case_reason
+           |                                ,case_no
+           |                                ,court_name
+           |                                ,case_stage
+           |                                ,yg_name
+           |                                ,bg_name
+           |                                ,date
+           |                                ,detail_id
+           |                                ,case_amt
+           |                                ,0 AS xjk_f
+           |                        FROM    $project.ads_judicial_case_relation_pre
+           |                        WHERE   ds = '$last_ds'
+           |                        AND     tn = 'company_dishonest_info'
+           |                        UNION ALL
+           |                        SELECT  md5(CONCAT(rowkey,'company_dishonest_info')) as judicase_id
+           |                                ,3 as flag
+           |                                ,CONCAT('关于',name,'的失信信息') as title
+           |                                ,case_type(gist_dd) as case_type
+           |                                ,null as case_reason
+           |                                ,case_no
+           |                                ,court as court_name
+           |                                ,'执行' as case_stage
+           |                                ,null as yg_name
+           |                                ,name as bg_name
+           |                                ,pub_date as date
+           |                                ,rowkey as detail_id
+           |                                ,null as case_amt
+           |                                ,1 AS xjk_f
+           |                        FROM    $project.inc_ads_company_dishonest_info
+           |                        WHERE   ds > '$last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+
+    }
+
+    if (is_inc)
+      inc()
+    else
+      all()
+  }
+}
+
+object JCR_pre3 {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    JCR_pre3(s = spark, project = "winhc_eci_dev").company_dishonest_info(is_inc = true)
+    spark.stop()
+  }
+}
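
The inc() branch finds its starting point by parsing the output of show partitions, whose rows look like ds=20200927/tn=company_dishonest_info. A minimal standalone sketch of that parsing step, assuming hypothetical sample specs (the helper name lastDs and the sample values are illustrative, not part of the commit):

object PartitionSpecDemo {
  // Extract the newest ds for a given tn from partition specs such as
  // "ds=20200927/tn=company_dishonest_info" (sample data below is made up).
  def lastDs(specs: Seq[String], tn: String): Option[String] =
    specs
      .filter(_.contains(s"tn=$tn"))
      .map(_.split("/")(0).split("=")(1)) // "ds=20200927/..." -> "20200927"
      .sorted
      .lastOption

  def main(args: Array[String]): Unit = {
    val specs = Seq(
      "ds=20200925/tn=company_dishonest_info",
      "ds=20200927/tn=company_dishonest_info",
      "ds=20200927/tn=wenshu_detail")
    println(lastDs(specs, "company_dishonest_info")) // prints Some(20200927)
  }
}

Unlike the job's .max, the sketch's lastOption returns None on an empty partition list instead of throwing.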

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala

@@ -16,8 +16,8 @@ import scala.collection.mutable.ArrayBuffer
  * @Date: 2020/8/28 16:52
  * @Description:
  */
-case class JudicialCaseRelation(s: SparkSession,
-                                project: String // name of the project that holds the tables
+case class JudicialCaseRelation_CaseAgg(s: SparkSession,
+                                        project: String // name of the project that holds the tables
                                ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
   private val table_id_map = Map("justicase" -> "case_id")
@@ -235,7 +235,7 @@ case class JudicialCaseRelation(s: SparkSession,
 
 }
 
-object JudicialCaseRelation {
+object JudicialCaseRelation_CaseAgg {
 
   def main(args: Array[String]): Unit = {
     val config = mutable.Map(
@@ -243,7 +243,7 @@ object JudicialCaseRelation {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    val jcr = JudicialCaseRelation(spark, project = "winhc_eci_dev")
+    val jcr = JudicialCaseRelation_CaseAgg(spark, project = "winhc_eci_dev")
     jcr.etl()
 //    jcr.relationByGroup()
     spark.stop()
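
Both all() and inc() in the new JCR_pre3 rely on the same last-write-wins merge: UNION ALL the old and new rows, then keep one row per key via ROW_NUMBER(). A minimal sketch of that pattern on toy data (the view name t and the sample rows are made up; it runs on a plain local SparkSession):

import org.apache.spark.sql.SparkSession

object DedupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("DedupDemo").getOrCreate()
    import spark.implicits._

    // two versions of r1; the query below keeps only the newer one
    Seq(("r1", "20200901"), ("r1", "20200927"), ("r2", "20200901"))
      .toDF("rowkey", "ds")
      .createOrReplaceTempView("t")

    spark.sql(
      """
        |SELECT rowkey, ds
        |FROM (
        |    SELECT *, ROW_NUMBER() OVER (PARTITION BY rowkey ORDER BY ds DESC) AS num
        |    FROM t
        |) t2
        |WHERE num = 1
        |""".stripMargin).show()

    spark.stop()
  }
}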