Ver código fonte

Merge remote-tracking branch 'origin/master'

yandawei 4 anos atrás
pai
commit
192876f5f5

+ 46 - 25
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelationPre2.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, CourtRank}
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.collection.mutable
@@ -12,12 +13,25 @@ import scala.collection.mutable
  * @author π
  * @date 2020/9/17 14:45
  */
-object JudicialCaseRelationPre2 {
+object JudicialCaseRelationPreNew {
   def main(args: Array[String]): Unit = {
-    val project = "winhc_eci_dev"
+    var project = ""
+    var ds = ""
+    if (args.length == 2) {
+      val Array(p1, p2) = args
+      project = p1
+      ds = p2
+    } else if (args.length == 1) {
+      val Array(p1) = args
+      project = p1
+    } else {
+      println("please check project ds !")
+      sys.exit(-1)
+    }
     println(
       s"""
          |project: $project
+         |ds: $ds
          |""".stripMargin)
 
     val config = mutable.Map(
@@ -25,16 +39,15 @@ object JudicialCaseRelationPre2 {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-
-    val r = JudicialCaseRelationPre2(spark, project)
-    //r.precalc()
+    val r = JudicialCaseRelationPreNew(spark, project, ds)
+    r.precalc()
     r.calc()
     spark.stop()
   }
 }
 
-case class JudicialCaseRelationPre2(s: SparkSession, project: String
-                                   ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: String
+                                     ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
   override protected val spark: SparkSession = s
 
 
@@ -214,18 +227,26 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
     map_2_json()
     registerCourtRank()
     //预处理数据
-    //precalc()
     val cols = Seq("flag", "date", "detail_id")
 
     val t1 = s"$project.inc_ads_company_court_announcement"
     val t2 = s"$project.ads_judicial_case_relation_pre"
-    val t1_ds = BaseUtil.getPartion(t1, spark)
-    val t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
+    var t2_ds = ds
+    var t1_ds = ds
+    if (StringUtils.isBlank(ds)) {
+      t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
+      t1_ds = BaseUtil.getPartion(t1, spark)
+    }
+    //司法案件id交换表
+    val t3 = "tmp_xf_judicial_case_relation_replace"
+    val second_ds = getSecondLastPartitionOrElse(t3, "0")
+    val t4 = "tmp_xf_judicial_case_incr_mapping"
+    println(s"calc ds: $t2_ds, par ds : $t1_ds, second_ds : $second_ds")
 
     //替换司法案件id
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_replace_2 partition (ds = '$t1_ds')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t3 partition (ds = '$t1_ds')
          |SELECT  COALESCE(b.judicase_id,a.new_judicase_id) judicase_id
          |        ,a.flag
          |        ,a.title
@@ -277,33 +298,33 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
     //找出增量数据
     sql(
       s"""
-         |INSERT OVERWRITE TABLE $project.tmp_xf_judicial_case_incr_mapping
+         |INSERT OVERWRITE TABLE $project.$t4
          |SELECT  coalesce(a.judicase_id,b.judicase_id)judicase_id
          |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
          |FROM    (
          |            SELECT  judicase_id
          |                    ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r1
-         |            FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |            FROM    $project.$t3
          |            WHERE   ds = '$t1_ds'
          |            GROUP BY judicase_id
          |        ) a
          |FULL JOIN (
          |              SELECT  judicase_id
          |                      ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r2
-         |              FROM    $project.tmp_xf_judicial_case_relation_replace_2
-         |              WHERE   ds < '$t1_ds'
+         |              FROM    $project.$t3
+         |              WHERE   ds = '$second_ds'
          |              GROUP BY judicase_id
          |          ) b
          |ON  r1 = r2
          |WHERE   r1 IS NULL OR r2 IS NULL
          |""".stripMargin)
 
-    sql(
-      s"""
-         |SELECT  court_name,court_level(court_name) court_level
-         |FROM    $project.tmp_xf_judicial_case_relation_replace_2
-         |WHERE   ds = '$t1_ds'
-         |""".stripMargin).show(200, false)
+    //    sql(
+    //      s"""
+    //         |SELECT  court_name,court_level(court_name) court_level
+    //         |FROM    $project.$t3
+    //         |WHERE   ds = '$t1_ds'
+    //         |""".stripMargin).show(200, false)
 
     //司法案件主表
     sql(
@@ -331,12 +352,12 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |                ,b.deleted
          |        FROM    (
          |                   SELECT  *,court_level(court_name) court_level
-         |                   FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |                   FROM    $project.$t3
          |                   WHERE   ds = '$t1_ds'
          |                ) a JOIN
          |                (
          |                   select *
-         |                   from $project.tmp_xf_judicial_case_incr_mapping
+         |                   from $project.$t4
          |                ) b on a.judicase_id = b.judicase_id
          |        )
          |GROUP BY judicase_id
@@ -366,12 +387,12 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |                ,b.deleted
          |        FROM    (
          |                   SELECT  *
-         |                   FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |                   FROM    $project.$t3
          |                   WHERE   ds = '$t1_ds'
          |                )a JOIN
          |                (
          |                   select *
-         |                   from $project.tmp_xf_judicial_case_incr_mapping
+         |                   from $project.$t4
          |                )b on a.judicase_id = b.judicase_id
          |)
          |GROUP BY judicase_id