
Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
64eccb4f2f

+ 32 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre10.scala

@@ -57,22 +57,40 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |  ,case_amt
          |from (
          |      select
-         |      md5(cleanup(case_no)) as judicase_id
-         |      ,"7" as flag
-         |      ,concat_ws('',cname,'被执行人') as title
-         |      ,concat_ws('',case_type(case_no)) as case_type
-         |      ,null as case_reason
-         |      ,case_no
-         |      ,court as court_name
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
-         |      ,null as yg_name
-         |      ,cname as bg_name
-         |      ,case_create_time as date
-         |      ,rowkey as detail_id
-         |      ,exec_money as case_amt
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |         md5(cleanup(case_no)) as judicase_id
+         |         ,"7" as flag
+         |         ,concat_ws('',cname,'被执行人') as title
+         |         ,concat_ws('',case_type(case_no)) as case_type
+         |         ,null as case_reason
+         |         ,case_no
+         |         ,court as court_name
+         |         ,concat_ws('',case_stage(case_no)) as case_stage
+         |         ,null as yg_name
+         |         ,cname as bg_name
+         |         ,case_create_time as date
+         |         ,rowkey as detail_id
+         |         ,exec_money as case_amt
+         |         ,row_number() over(partition by rowkey order by update_time desc) num
          |      from $project.inc_ads_company_zxr
          |      where length(case_no) > 0 and ds > '0'
+         |      union all
+         |      select
+         |         md5(cleanup(case_no)) as judicase_id
+         |         ,"7" as flag
+         |         ,concat_ws('',cname,'被执行人') as title
+         |         ,concat_ws('',case_type(case_no)) as case_type
+         |         ,null as case_reason
+         |         ,case_no
+         |         ,court as court_name
+         |         ,concat_ws('',case_stage(case_no)) as case_stage
+         |         ,null as yg_name
+         |         ,cname as bg_name
+         |         ,case_create_time as date
+         |         ,rowkey as detail_id
+         |         ,exec_money as case_amt
+         |         ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.ads_company_zxr
+         |      where length(case_no) > 0 and ds > '0'
          |   )
          |where num = 1
          |""".stripMargin).show(10, false)

+ 32 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre12.scala

@@ -57,22 +57,40 @@ case class JudicialCaseRelationPre12(s: SparkSession, project: String
          |  ,case_amt
          |from (
          |      select
-         |      md5(cleanup(case_no)) as judicase_id
-         |      ,"6" as flag
-         |      ,concat_ws('',name,'终本案件') as title
-         |      ,concat_ws('',case_type(case_no)) as case_type
-         |      ,NULL as case_reason
-         |      ,case_no
-         |      ,court_name
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
-         |      ,NULL as yg_name
-         |      ,name as bg_name
-         |      ,case_create_time as date
-         |      ,rowkey as detail_id
-         |      ,exec_amount as case_amt
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |         md5(cleanup(case_no)) as judicase_id
+         |         ,"6" as flag
+         |         ,concat_ws('',name,'终本案件') as title
+         |         ,concat_ws('',case_type(case_no)) as case_type
+         |         ,NULL as case_reason
+         |         ,case_no
+         |         ,court_name
+         |         ,concat_ws('',case_stage(case_no)) as case_stage
+         |         ,NULL as yg_name
+         |         ,name as bg_name
+         |         ,case_create_time as date
+         |         ,rowkey as detail_id
+         |         ,exec_amount as case_amt
+         |         ,row_number() over(partition by rowkey order by update_time desc) num
          |      from $project.inc_ads_company_zxr_final_case
          |      where length(case_no) > 0 and ds > '0'
+         |      union all
+         |      select
+         |         md5(cleanup(case_no)) as judicase_id
+         |         ,"6" as flag
+         |         ,concat_ws('',name,'终本案件') as title
+         |         ,concat_ws('',case_type(case_no)) as case_type
+         |         ,NULL as case_reason
+         |         ,case_no
+         |         ,court_name
+         |         ,concat_ws('',case_stage(case_no)) as case_stage
+         |         ,NULL as yg_name
+         |         ,name as bg_name
+         |         ,case_create_time as date
+         |         ,rowkey as detail_id
+         |         ,exec_amount as case_amt
+         |         ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.ads_company_zxr_final_case
+         |      where length(case_no) > 0 and ds > '0'
          |   )
          |where num = 1
          |""".stripMargin).show(10, false)

+ 21 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -17,21 +17,22 @@ object JudicialCaseRelationPreNew {
   def main(args: Array[String]): Unit = {
     var project = ""
     var ds = ""
-    if (args.length == 2) {
-      val Array(p1, p2) = args
+    var c = ""
+    if (args.length == 3) {
+      val Array(p1, p2, p3) = args
       project = p1
       ds = p2
-    } else if (args.length == 1) {
-      val Array(p1) = args
-      project = p1
+      c = p3
     } else {
-      println("please check project ds !")
+      println("please check project ds c!")
       sys.exit(-1)
     }
+    if(ds.equals("all")) ds =""
     println(
       s"""
          |project: $project
          |ds: $ds
+         |c: $c
          |""".stripMargin)
 
     val config = mutable.Map(
@@ -40,12 +41,16 @@ object JudicialCaseRelationPreNew {
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val r = JudicialCaseRelationPreNew(spark, project, ds)
-    r.precalc()
-    r.calc()
+    c match {
+      case "calc" => r.calc()
+      case "precalc" => r.precalc()
+    }
+
     spark.stop()
   }
 }
 
+
 case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: String
                                      ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
   override protected val spark: SparkSession = s
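
With this change the job now requires three arguments: project, ds and c. Passing "all" as ds blanks the partition filter (the blank-ds fallback shown in the next hunk then resolves the latest partition for precalc), and c chooses which step to run. The match on c has no default branch, so any other value fails with a MatchError after the Spark session has already been created; a hedged sketch of the same dispatch with an explicit guard, where the catch-all case is an illustration and not part of the commit:

// The first two cases mirror the commit; the catch-all branch is added for illustration.
c match {
  case "calc"    => r.calc()
  case "precalc" => r.precalc()
  case other     =>
    println(s"unknown step '$other', expected calc or precalc")
    sys.exit(-1)
}

Invocation then takes the form: JudicialCaseRelationPreNew <project> <ds | all> <calc | precalc>.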
@@ -63,7 +68,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     prepareFunctions(spark)
     val t1 = s"$project.inc_ads_company_court_announcement"
     var t1_ds = ds
-    if(StringUtils.isBlank(ds)){
+    if (StringUtils.isBlank(ds)) {
       t1_ds = BaseUtil.getPartion(t1, spark)
     }
 
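
This hunk only adjusts spacing, but it is also where the new "all" option lands: with ds blanked out, precalc falls back to the latest partition of inc_ads_company_court_announcement via BaseUtil.getPartion. A minimal sketch of that fallback pulled out into a helper; the wrapper name is hypothetical, while BaseUtil.getPartion and its argument order are taken from the call in this hunk:

// Hypothetical wrapper: keep the caller-supplied ds when present, otherwise resolve
// the table's newest partition with the repo's BaseUtil.getPartion helper.
def resolveDs(ds: String, table: String, spark: SparkSession): String =
  if (ds == null || ds.trim.isEmpty) BaseUtil.getPartion(table, spark) else ds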
@@ -306,9 +311,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     }
    //judicial case id replacement table
     val t3 = "ads_judicial_case_relation_replace"
-    val second_ds = getSecondLastPartitionOrElse(t3, "0")
     val t4 = "ads_judicial_case_incr_mapping"
-    println(s"calc ds: $t2_ds, par ds : $t1_ds, second_ds : $second_ds")
 
    //replace the judicial case id
     sql(
@@ -377,8 +380,12 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |      and date is not null and length(date) = 19
          |""".stripMargin).show(10, false)
 
+    val second_ds = getSecondLastPartitionOrElse(t3, "0")
+    println(s"calc ds: $t2_ds, par ds : $t1_ds, second_ds : $second_ds")
+
+
    //find the incremental data
-     sql(
+    sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t4
          |SELECT  coalesce(a.judicase_id,b.judicase_id)judicase_id
@@ -454,11 +461,13 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(first_yg_name) yg_name
          |        ,max(first_bg_name) bg_name
+         |        ,max(last_date) last_date
          |        ,max(deleted) deleted
          |FROM    (
          |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
          |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
          |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
+         |                ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
          |                ,b.deleted
          |        FROM    (
          |                   SELECT  *