
feat: release judicial case pipeline to production

许家凯 4 years ago
parent commit c678f9faee

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre39.scala

@@ -16,13 +16,13 @@ import scala.collection.mutable
  */
 case class JudicialCaseRelationPre39(s: SparkSession,
                                     project: String // name of the project the tables live in
-                   ) extends LoggingUtils with Logging with BaseFunc with CompanyMapping {
+                                    ) extends LoggingUtils with Logging with BaseFunc with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
   private val target_table = "ads_judicial_case_relation_pre"
 
-  val flag_map  =Map("company_dishonest_info"->"3"
-    ,"company_dishonest_info_human"->"9")
+  val flag_map = Map("company_dishonest_info" -> "3"
+    , "company_dishonest_info_human" -> "9")
 
 
   def company_dishonest_info(is_inc: Boolean = true): Unit = {
@@ -162,6 +162,7 @@ object JudicialCaseRelationPre39 {
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
+    // recompute the full table every day; the result does not depend on the previous day's data
     JudicialCaseRelationPre39(s = spark, project = "winhc_eci_dev").company_dishonest_info(false)
     spark.stop()
   }
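
Note: passing is_inc = false makes the job rebuild the full company_dishonest_info result on every run, so a bad partition from the previous day cannot poison today's output. A minimal sketch of driving both modes from the command line, assuming the repo's SparkUtils.InitEnv and the case class above are on the classpath; the --inc flag and its parsing are illustrative, not part of this commit:

    import scala.collection.mutable

    object Pre39Runner {
      def main(args: Array[String]): Unit = {
        // Hypothetical switch: run incrementally only when "--inc" is passed;
        // the commit itself hard-codes the full recompute.
        val isInc = args.contains("--inc")
        val config = mutable.Map(
          "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
          "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
        )
        val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
        JudicialCaseRelationPre39(s = spark, project = "winhc_eci_dev")
          .company_dishonest_info(is_inc = isInc)
        spark.stop()
      }
    }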

+ 20 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -18,24 +18,23 @@ import scala.collection.mutable.ArrayBuffer
  */
 case class JudicialCaseRelation_CaseAgg(s: SparkSession,
                                         project: String // name of the project the tables live in
-                               ) extends LoggingUtils with Logging with BaseFunc {
+                                       ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
-  private val table_id_map = Map("justicase" -> "case_id")
+  private val table_id_map = Map("wenshu_detail" -> "case_id")
   private val pat = ".*\\d+.*".r
 
   import spark.implicits._
 
-  def etl(): Unit = {
-    val ds = "20200913"
+  def etl(ds: String): Unit = {
     etl_wenshu(ds)
     relationByGroup()
   }
 
   private def etl_wenshu(ds: String): Unit = {
-    def tableName = "justicase"
+    def tableName = "wenshu_detail"
 
     val table_id = table_id_map(tableName)
-    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name") ++ Seq(table_id,"ds","connect_case_no")
+    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name") ++ Seq(table_id, "ds", "connect_case_no")
 
     val ods_end_ds = getLastPartitionsOrElse(s"winhc_eci_dev.ods_$tableName", "0")
     val tmp_tab = s"all_${tableName}_tmp_$ods_end_ds"
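
Side note: table_id_map(tableName) throws a bare NoSuchElementException for any table not registered in the map. A hedged alternative lookup (a suggestion, not what this commit does):

    // Suggested variant: fail with a readable message when tableName is
    // missing from table_id_map, instead of a bare NoSuchElementException.
    val table_id = table_id_map.getOrElse(
      tableName,
      sys.error(s"no id column registered for table: $tableName"))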
@@ -49,11 +48,12 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |            FROM    (
          |                        SELECT  ${other_cols.mkString(",")}
          |                        FROM    winhc_eci_dev.ods_$tableName
-         |                        WHERE   ds = '$ods_end_ds'
-         |                        UNION ALL
-         |                        SELECT  ${other_cols.mkString(",")}
-         |                        FROM    winhc_eci_dev.inc_ods_$tableName
-         |                        WHERE   ds > $ods_end_ds
+         |                        WHERE   ds > 0
+         | ---                        WHERE   ds = '$ods_end_ds'
+         | ---                        UNION ALL
+         | ---                        SELECT  ${other_cols.mkString(",")}
+         | ---                        FROM    winhc_eci_dev.inc_ods_$tableName
+         | ---                        WHERE   ds > $ods_end_ds
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
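
The rewritten filter scans every partition (WHERE ds > 0) and leaves deduplication to the surrounding row_number window (t2.num = 1); the old ods/inc_ods union is kept commented out above. A standalone sketch of that scan-then-dedup pattern; the PARTITION BY / ORDER BY columns below are assumptions, since the real OVER clause sits outside this hunk:

    // Sketch: read all partitions, keep one row per case via row_number.
    // The window columns are assumed; the actual spec is defined earlier
    // in the query, outside the visible hunk.
    spark.sql(
      """
        |SELECT *
        |FROM (
        |    SELECT t1.*
        |           ,ROW_NUMBER() OVER (PARTITION BY case_no ORDER BY ds DESC) AS num
        |    FROM winhc_eci_dev.ods_wenshu_detail AS t1
        |    WHERE ds > 0
        |) AS t2
        |WHERE t2.num = 1
        |""".stripMargin)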
@@ -163,7 +163,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.xjk_ads_judicial_case_relation1
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation
          |SELECT  id_1
          |        ,id_2
          |        ,case_no_1
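
The insert now targets the production table winhc_eci_dev.ads_judicial_case_relation instead of the xjk_-prefixed scratch table; the isWindows toggle keeps local (Windows) runs appending while cluster runs overwrite. A tiny sketch of that toggle in isolation, where isWindows is a stand-in for the helper the class inherits:

    // Stand-in for the inherited helper: local dev boxes append (INTO),
    // the cluster overwrites the whole table (OVERWRITE) on each run.
    val isWindows = System.getProperty("os.name").toLowerCase.contains("windows")
    val insertMode = if (isWindows) "INTO" else "OVERWRITE"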
@@ -238,14 +238,20 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
 object JudicialCaseRelation_CaseAgg {
 
   def main(args: Array[String]): Unit = {
+
+    val Array(ds) = args
+
+    println(
+      s"""
+         |ds: $ds
+         |""".stripMargin)
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val jcr = JudicialCaseRelation_CaseAgg(spark, project = "winhc_eci_dev")
-    jcr.etl()
-//    jcr.relationByGroup()
+    jcr.etl(ds)
     spark.stop()
   }
 }
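
With etl(ds) now driven by the job arguments, launching without exactly one argument fails with a scala.MatchError from val Array(ds) = args. A defensive variant (a suggestion, not part of this commit):

    // Suggested guard: exit with a usage message instead of a MatchError
    // when the partition date argument is missing or malformed.
    val ds = args match {
      case Array(d) if d.matches("""\d{8}""") => d // e.g. 20200913
      case _ => sys.error("usage: JudicialCaseRelation_CaseAgg <ds>, e.g. 20200913")
    }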