Jelajahi Sumber

Merge remote-tracking branch 'origin/master'

许家凯 4 tahun lalu
induk
melakukan
29bcae642f

+ 89 - 18
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -27,7 +27,7 @@ object JudicialCaseRelationPreNew {
       println("please check project ds c!")
       sys.exit(-1)
     }
-    if(ds.equals("all")) ds =""
+    if (ds.equals("all")) ds = ""
     println(
       s"""
          |project: $project
@@ -65,6 +65,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
   }
 
   def precalc(): Unit = {
+    case_no_trim_udf()
     prepareFunctions(spark)
     val t1 = s"$project.inc_ads_company_court_announcement"
     var t1_ds = ds
@@ -155,12 +156,12 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |            FROM $project.ads_judicial_case_relation_graph
          |        ) a
          |RIGHT JOIN (
-         |            SELECT *,md5(cleanup(case_no)) as new_judicase_id
+         |            SELECT *,md5(cleanup(case_no_trim(case_no))) as new_judicase_id
          |             FROM
          |            (
          |              SELECT  *,row_number() over(partition by docid order by judge_date desc) num
-         |              FROM    $project.ods_wenshu_detail
-         |              WHERE   ds > '0'
+         |              FROM    winhc_eci.ods_wenshu_detail
+         |              WHERE   ds > '0' AND case_no_trim(case_no) is not null
          |            )c
          |            where num = 1
          |        ) b
@@ -309,9 +310,13 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
       t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
       t1_ds = BaseUtil.getPartion(t1, spark)
     }
-    //司法案件id交换表
-    val t3 = "ads_judicial_case_relation_replace"
+
+    val t3 = "ads_judicial_case_relation_replace" //司法案件id交换表
     val t4 = "ads_judicial_case_incr_mapping"
+    val t5 = s"base_company_mapping" //公司name和cid映射
+    val t6 = s"ads_judicial_case_relation_replace_cids" // rows of the id-replacement table (t3) with the matched company cids column added
+
+    val t5_ds = BaseUtil.getPartion(t5, spark) //映射表分区
 
     //替换司法案件id
     sql(
@@ -347,14 +352,14 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |     ,date
          |     ,detail_id
          |     ,case_amt
-         |     ,md5(CLEANUP(case_no)) as new_judicase_id
-         |  from $project.ads_judicial_case_relation_pre
+         |     ,md5(CLEANUP(case_no_trim(case_no))) as new_judicase_id
+         |  from $project.$t2
          |  where ds= '$t2_ds' and tn <> 'wenshu' and case_no_trim(case_no) is not null
          |        and date is not null and length(date) = 19
          |) a
          |LEFT JOIN (
          |  select case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
-         |  from $project.ads_judicial_case_relation_pre
+         |  from $project.$t2
          |  where ds = '$t2_ds' and tn ='wenshu' and case_no_trim(case_no) is not null
          |  group by case_no
          |) b
@@ -375,14 +380,79 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,date
          |        ,detail_id
          |        ,case_amt
-         |from $project.ads_judicial_case_relation_pre
+         |from $project.$t2
          |where ds = '$t2_ds' and tn ='wenshu'  and case_no_trim(case_no) is not null
          |      and date is not null and length(date) = 19
          |""".stripMargin).show(10, false)
 
-    val second_ds = getSecondLastPartitionOrElse(t3, "0")
-    println(s"calc ds: $t2_ds, par ds : $t1_ds, second_ds : $second_ds")
+    //name 替换 cid
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t6 partition (ds = '$t1_ds')
+         |SELECT
+         |        a.judicase_id
+         |        ,flag
+         |        ,title
+         |        ,case_type
+         |        ,case_reason
+         |        ,case_no
+         |        ,court_name
+         |        ,case_stage
+         |        ,lable
+         |        ,detail
+         |        ,yg_name
+         |        ,bg_name
+         |        ,DATE
+         |        ,detail_id
+         |        ,case_amt
+         |        ,coalesce(b.cids,'') AS cids
+         |FROM    (
+         |        SELECT  *
+         |        FROM    $project.$t3
+         |        WHERE   ds = '$t1_ds'
+         |        ) a
+         |LEFT JOIN (
+         |        SELECT
+         |                judicase_id
+         |                ,sort(concat_ws(',',collect_set(cid)),',') cids
+         |        FROM    (
+         |                SELECT
+         |                        e.judicase_id
+         |                        ,f.new_cid cid
+         |                FROM    (
+         |                        SELECT  *
+         |                        FROM    (
+         |                                SELECT
+         |                                        yg_name AS names
+         |                                        ,judicase_id
+         |                                FROM    $project.$t3
+         |                                WHERE   ds = '$t1_ds' AND length(cleanup(yg_name)) >4
+         |                                UNION ALL
+         |                                SELECT
+         |                                        bg_name AS names
+         |                                        ,judicase_id
+         |                                FROM    $project.$t3
+         |                                WHERE   ds = '$t1_ds' AND length(cleanup(bg_name)) >4
+         |                                ) a
+         |                        LATERAL VIEW explode(split(names,',')) t AS name
+         |                        ) e
+         |                JOIN (
+         |                        SELECT
+         |                                cname
+         |                                ,max(new_cid) AS new_cid
+         |                        FROM    $project.$t5
+         |                        WHERE   ds = '$t5_ds' AND length(cleanup(cname)) >4
+         |                        GROUP BY cname
+         |                     ) f
+         |                ON      cleanup(e.name) = cleanup(f.cname)
+         |                )
+         |        GROUP BY judicase_id
+         |          ) b
+         |ON      a.judicase_id = b.judicase_id
+         |""".stripMargin)
 
+    val second_ds = getSecondLastPartitionOrElse(t6, "0")
+    println(s"calc ds: $t2_ds, par ds : $t1_ds, second_ds : $second_ds")
 
     //找出增量数据
     sql(
@@ -392,15 +462,15 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
          |FROM    (
          |            SELECT  judicase_id
-         |                    ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r1
-         |            FROM    $project.$t3
+         |                    ,md5(concat_ws('',judicase_id, sort(concat_ws(',',collect_set(case_no)),','), sort(concat_ws(',',collect_set(cids)),','))) r1
+         |            FROM    $project.$t6
          |            WHERE   ds = '$t1_ds'
          |            GROUP BY judicase_id
          |        ) a
          |FULL JOIN (
          |              SELECT  judicase_id
-         |                      ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r2
-         |              FROM    $project.$t3
+         |                      ,md5(concat_ws('',judicase_id, sort(concat_ws(',',collect_set(case_no)),','), sort(concat_ws(',',collect_set(cids)),','))) r2
+         |              FROM    $project.$t6
          |              WHERE   ds = '$second_ds'
          |              GROUP BY judicase_id
          |          ) b
@@ -427,6 +497,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,max(date) AS date
          |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
          |        ,max(deleted) deleted
+         |        ,concat_ws(',',collect_set(cids)) cids
          |FROM    (
          |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
          |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
@@ -434,7 +505,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |                ,b.deleted
          |        FROM    (
          |                   SELECT  *,court_level(court_name) court_level
-         |                   FROM    $project.$t3
+         |                   FROM    $project.$t6
          |                   WHERE   ds >= '$second_ds'
          |                ) a JOIN
          |                (
@@ -471,7 +542,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |                ,b.deleted
          |        FROM    (
          |                   SELECT  *
-         |                   FROM    $project.$t3
+         |                   FROM    $project.$t6
          |                   WHERE   ds >= '$second_ds'
          |                )a JOIN
          |                (

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/model/CompanyCourtAnnouncement.scala

@@ -34,11 +34,11 @@ object CompanyCourtAnnouncement {
 
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     //法院公告
-    CompanyCourtAnnouncement(spark, "company_court_announcement_list", "", "509", "update_time", "法律风险", "法院公告", "0", s"$namespace").calc()
+    CompanyCourtAnnouncement(spark, "company_court_announcement_list", "", "509", "publish_date", "法律风险", "法院公告", "0", s"$namespace").calc()
     //开庭公告
-    CompanyCourtAnnouncement(spark, "company_court_open_announcement_list", "", "507", "update_time", "法律风险", "开庭公告", "0", s"$namespace").calc()
+    CompanyCourtAnnouncement(spark, "company_court_open_announcement_list", "", "507", "start_date", "法律风险", "开庭公告", "0", s"$namespace").calc()
     //立案信息
-    CompanyCourtAnnouncement(spark, "company_court_register_list", "", "510", "update_time", "法律风险", "立案信息", "0", s"$namespace").calc()
+    CompanyCourtAnnouncement(spark, "company_court_register_list", "", "510", "filing_date", "法律风险", "立案信息", "0", s"$namespace").calc()
 
     spark.stop()
 
@@ -73,7 +73,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String, tableV
 
     //最近三个月内
     var sqlapp = ""
-    sqlapp = s"and $time >= '${BaseUtil.atMonthsBefore(3)}'"
+    sqlapp = s"and cast($time as string) >= '${BaseUtil.atMonthsBefore(3)}'"
 
     val company_mapping = s"$namespace.base_company_mapping"
     val mapping_ds = BaseUtil.getPartion(company_mapping, spark)

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -47,8 +47,8 @@ trait CompanyMapping {
       label(l)
     })
 
-    spark.udf.register("sort", (s: String) => {
-      sortString(s)
+    spark.udf.register("sort", (s: String, split: String) => {
+      sortString(s, split)
     })
 
     spark.udf.register("trim_black", (s: String) => {
@@ -58,6 +58,7 @@ trait CompanyMapping {
     spark.udf.register("title", (yg: String, bg: String, reason: String) => {
       title(yg, bg, reason)
     })
+
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 9 - 9
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -252,8 +252,8 @@ object BaseUtil {
         case "8" => "立案信息" //企业
         case "9" => "失信人" //人
         case "10" => "被执行人" //人
-        case "11" => "限高" //人
-        case "12" => "终本" //人
+        case "11" => "限消费" //人
+        case "12" => "终本案件" //人
         case _ => ""
       }
     }
@@ -300,10 +300,10 @@ object BaseUtil {
     } else null
   }
 
-  def sortString(s: String): String = {
+  def sortString(s: String, split: String = "\\001"): String = {
     var r = ""
     if (StringUtils.isNotBlank(s)) {
-      r = s.split("\\001").sorted.mkString("")
+      r = s.split(split).sorted.mkString(split)
     }
     r
   }
@@ -356,11 +356,11 @@ object BaseUtil {
   }
 
   // Manual smoke-test entry point. The earlier experiments (title, parseAddress,
   // case_no_trim, sortString) are kept commented out, so the only statement that runs
   // prints the result of is_id_card for a partially masked sample string.
   def main(args: Array[String]): Unit = {
-//    println(title("xx", null, "reason"))
-//    println(parseAddress("大石桥市人民法院"))
-//    println(case_no_trim("(2015)怀执字第03601号号"))
-//    val seq = Seq("1", "3", "2", "7").mkString("\001")
-//    println(sortString(seq))
+    //    println(title("xx", null, "reason"))
+    //    println(parseAddress("大石桥市人民法院"))
+    //    println(case_no_trim("(2015)怀执字第03601号号"))
+    //    val seq = Seq("1", "3", "2", "7").mkString("\001")
+    //    println(sortString(seq))
     println(is_id_card("4111111999****062x"))
   }