소스 검색

Merge remote-tracking branch 'origin/master'

许家凯 4 년 전
부모
커밋
4ad9601922

+ 23 - 25
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -310,7 +310,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     var t1_ds = ds
     if (StringUtils.isBlank(ds)) {
       t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
-      t1_ds = BaseUtil.getPartion(t1, spark)
+      //t1_ds = BaseUtil.getPartion(t1, spark)
+      t1_ds = t2_ds
     }
 
     val t3 = "ads_judicial_case_relation_replace" //司法案件id交换表
@@ -507,7 +508,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r1
          |SELECT
-         |    judicase_id ,
+         |    x.judicase_id ,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -521,42 +522,40 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    case_amt    ,
          |    date        ,
          |    court_level ,
-         |    deleted     ,
+         |    y.deleted     ,
          |    cids
          |FROM
          |(
          |SELECT  judicase_id
-         |        ,max(first_title) title
+         |        ,max(title) title
          |        ,concat_ws(',',collect_set(case_type)) case_type
          |        ,case_reason(case_reason,date,flag) case_reason
          |        ,concat_ws(',',collect_set(case_no)) case_no
          |        ,concat_ws(',',collect_set(court_name)) court_name
          |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
          |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
-         |        -- ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,null as detail
          |        ,max(case_amt) AS case_amt
          |        ,max(date) AS date
          |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
-         |        ,max(deleted) deleted
          |        ,concat_ws(',',collect_set(cids)) cids
          |        ,name_aggs(yg_name,bg_name,flag,date) name_aggs
          |FROM    (
          |        SELECT  a.*
-         |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
-         |                ,b.deleted
          |        FROM    (
-         |                   SELECT  *,court_level(court_name) court_level
+         |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,date,case_amt,cids
+         |                   ,court_level(court_name) court_level
          |                   FROM    $project.$t6
          |                   WHERE   ds >= '$second_ds'
-         |                ) a JOIN
-         |                (
-         |                   select *
-         |                   from $project.$t4
-         |                ) b on a.judicase_id = b.judicase_id
+         |                ) a
          |        )
          |GROUP BY judicase_id
-         |)
+         |)x
+         |JOIN
+         |(
+         |   select *
+         |   from $project.$t4
+         |) y on x.judicase_id = y.judicase_id
          |""".stripMargin).show(20, false)
 
     //明细表
@@ -565,7 +564,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r2
          |SELECT
          |    id    ,
-         |    judicase_id ,
+         |    x.judicase_id ,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -577,7 +576,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    name_aggs['yg_name'] yg_name,
          |    name_aggs['bg_name'] bg_name,
          |    last_date   ,
-         |    deleted
+         |    y.deleted
          |FROM
          |(
          |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no))) id
@@ -591,26 +590,25 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(last_date) last_date
-         |        ,max(deleted) deleted
          |        ,name_aggs(yg_name,bg_name,flag,date) name_aggs
          |FROM    (
          |        SELECT  a.*
          |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
          |                ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
-         |                ,b.deleted
          |        FROM    (
          |                   SELECT  *
          |                   FROM    $project.$t6
          |                   WHERE   ds >= '$second_ds' AND length(lable) > 0
-         |                )a JOIN
-         |                (
-         |                   select *
-         |                   from $project.$t4
-         |                )b on a.judicase_id = b.judicase_id
+         |                )a
          |)
          |GROUP BY judicase_id
          |         ,case_no
-         |)
+         |) x
+         |JOIN
+         |(
+         |   select *
+         |   from $project.$t4
+         |) y on x.judicase_id = y.judicase_id
          |""".stripMargin).show(10, false)
 
   }

+ 35 - 7
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala

@@ -12,13 +12,32 @@ import scala.collection.mutable
  */
 object CompanyIncrCombineUtils {
   def main(args: Array[String]): Unit = {
-    val Array(project, source, target) = args
+    var project = ""
+    var source = ""
+    var target = ""
+    var flag = "0"
+    if (args.length == 4) {
+      val Array(project1, source1, target1, flag1) = args
+      project = project1
+      source = source1
+      target = target1
+      flag = flag1
+    } else if (args.length == 3) {
+      val Array(project1, source1, target1) = args
+      project = project1
+      source = source1
+      target = target1
+    } else {
+      println("please set project, source, target, flag")
+      sys.exit(-1)
+    }
 
     println(
       s"""
          |project:$project
          |source:$source
          |target:$target
+         |flag:$flag
          |""".stripMargin)
 
     val config = mutable.Map(
@@ -26,17 +45,26 @@ object CompanyIncrCombineUtils {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    CompanyIncrCombineUtils(spark, source, target).calc()
+    CompanyIncrCombineUtils(spark, source, target, flag).calc()
     spark.stop()
   }
 }
 
-case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String) extends LoggingUtils {
+case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String, flag: String = "0" //0=>插入目标表 1=>插入源表分区
+                                  ) extends LoggingUtils {
   override protected val spark: SparkSession = s
 
   def calc(): Unit = {
 
-    val ds2 = BaseUtil.getPartion(s"$target", spark) //目标表数据
+    var ds2 = ""
+    if (flag.equals("1")) {
+      ds2 = BaseUtil.getPartion(s"$source", spark) //源表分区
+    } else {
+      ds2 = BaseUtil.getPartion(s"$target", spark) //目标表数据
+    }
+    if (StringUtils.isBlank(ds2)) {
+      ds2 = BaseUtil.getYesterday()
+    }
 
     val cols: Seq[String] = spark.table(target).schema.map(_.name).filter(s => {
       !s.equals("ds")
@@ -48,14 +76,14 @@ case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: Stri
          |select max(ds) max_ds from $target where id = -1 and ds > '0'
          |""".stripMargin).collect().toList.map(_.getAs[String]("max_ds"))
 
-    println(s"list: $list")
+    println(s"list: $list" + s", ds: $ds2")
 
     sql(
       s"""
-         |INSERT into table $target PARTITION(ds=$ds2)
+         |INSERT ${if (flag.equals("0")) "INTO" else "OVERWRITE"} table $target PARTITION(ds=$ds2)
          |SELECT ${cols.mkString(",")} from
          |$source
          |where ds > '${if (StringUtils.isNotBlank(list.head)) s"${list.head}" else s"0"}'
-         |""".stripMargin)
+         |""".stripMargin).show(100)
   }
 }