Browse Source

Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
c9b2b73bc4

+ 336 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationDebtorRelation.scala

@@ -0,0 +1,336 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf._
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.{Row, SparkSession}
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+ * 债权表生成
+ */
+object JudicialCaseRelationDebtorRelation {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val ds = ""
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelationDebtorRelation(spark, project, ds).calc()
+    spark.stop()
+  }
+}
+
+case class JudicialCaseRelationDebtorRelation(s: SparkSession, project: String, ds: String
+                                             ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+  override protected val spark: SparkSession = s
+
+  def get_seq_by_index(area_code: Broadcast[Map[String, Seq[String]]], code: String, index: Int): String = {
+    val c = area_code.value.getOrElse(code, null)
+    if (c == null) "" else if (c(index) != null) c(index).trim else ""
+  }
+
+  private def get_area_code(code: String, area_code: Broadcast[Map[String, Seq[String]]]) = {
+    val listbuffer = ListBuffer[String]()
+    if (StringUtils.isNotBlank(code) && code.trim.length == 6) {
+      val c = code.trim
+      listbuffer.append(
+        c.substring(0, 2), get_seq_by_index(area_code, c, 0),
+        c.substring(2, 4), get_seq_by_index(area_code, c, 1),
+        c.substring(4, 6), get_seq_by_index(area_code, c, 2))
+    } else {
+      listbuffer.append("", "", "", "", "", "")
+    }
+    listbuffer.toSeq
+  }
+
+
+  private def get_category_code(code: String, category_code: Broadcast[Map[String, Seq[String]]]) = {
+    val listbuffer = ListBuffer[String]()
+    if (StringUtils.isNotBlank(code)) {
+      val c = code.trim
+      (c, get_seq_by_index(category_code, c, 0), get_seq_by_index(category_code, c, 1), get_seq_by_index(category_code, c, 2))
+      listbuffer.append(
+        get_seq_by_index(category_code, c, 0),
+        get_seq_by_index(category_code, c, 1),
+        get_seq_by_index(category_code, c, 2)
+      )
+    } else {
+      listbuffer.append("", "", "")
+    }
+    listbuffer.toSeq
+  }
+
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    map_2_json()
+    case_no_trim_udf()
+    registerCourtRank()
+    val (m1, m2) = code2Name()
+
+    spark.udf.register("category_code", (code: String) => {
+      get_category_code(code, m1)
+    })
+    spark.udf.register("area_code", (code: String) => {
+      get_area_code(code, m2)
+    })
+
+    spark.udf.register("case_reason", new CaseReasonAggs(1000))
+    //预处理数据
+    val t2 = s"ads_judicial_case_relation_pre"
+    var t2_ds = ds
+    var t1_ds = ds
+    if (StringUtils.isBlank(ds)) {
+      t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
+      t1_ds = t2_ds
+    }
+
+    val t6 = s"ads_judicial_case_relation_replace_cids" //公司name和cid映射
+    val eci_debtor_relation = "ads_eci_debtor_relation_v2"
+    val deadbeat_company = "ads_deadbeat_company"
+
+    val mapping_ds = BaseUtil.getPartion("winhc_eci_dev.base_company_mapping", spark) //映射表分区
+
+    val columns: Seq[String] = spark.table(s"winhc_eci.$eci_debtor_relation").schema.map(_.name).filter(s => {
+      !s.equals("ds")
+    })
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_r1
+         |SELECT
+         |    concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    detail      ,
+         |    yg_name,
+         |    bg_name,
+         |    case_amt    ,
+         |    date        ,
+         |    court_level ,
+         |    0 as deleted     ,
+         |    cids
+         |FROM
+         |(
+         |SELECT  judicase_id
+         |        ,max(title) title
+         |        ,concat_ws(',',collect_set(case_type)) case_type
+         |        ,case_reason(case_reason,date,flag) case_reason
+         |        ,concat_ws(',',collect_set(case_no)) case_no
+         |        ,concat_ws(',',collect_set(court_name)) court_name
+         |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
+         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+         |        ,null as detail
+         |        ,max(case_amt) AS case_amt
+         |        ,max(date) AS date
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,concat_ws(',',collect_set(cids)) cids
+         |        ,concat_ws(',',collect_set(yg_name)) yg_name
+         |        ,concat_ws(',',collect_set(bg_name)) bg_name
+         |FROM    (
+         |        SELECT  *
+         |        FROM    (
+         |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,date,case_amt,cids,detail_id
+         |                   ,court_level(court_name) court_level
+         |                   FROM    $project.$t6
+         |                   WHERE   ds = '$t1_ds'
+         |                )
+         |        )
+         |GROUP BY judicase_id
+         |)x
+         |""".stripMargin).show(20, false)
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_yg_bg_name
+         |SELECT md5(concat_ws('',yg_name,bg_name)) id,yg_name,bg_name
+         |FROM (
+         |SELECT
+         |yg_name_x as yg_name
+         |,bg_name_x as bg_name
+         |FROM winhc_eci_dev.ads_judicial_case_relation_r1
+         |LATERAL VIEW explode(split(yg_name,',')) a AS yg_name_x
+         |LATERAL VIEW explode(split(bg_name,',')) b AS bg_name_x
+         |WHERE compare_name(yg_name,bg_name)
+         |AND (lable like '%被执行人%' or lable like '%限制高消费%'  or lable like '%失信人%')
+         |AND  LENGTH(cleanup(yg_name_x)) > 4
+         |AND  LENGTH(cleanup(bg_name_x)) > 4
+         |AND size(split(yg_name,',')) < 500
+         |AND size(split(bg_name,',')) < 500
+         |)
+         |GROUP BY yg_name,bg_name
+         |""".stripMargin)
+
+    //公司基本信息全量表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $project.tmp_xf_base_company_mapping_new
+         |select
+         |cid,new_cid,cname,reg_status,area_code,
+         |a1[0]  province_code,a1[1]  province_name,
+         |a1[2]  city_code,a1[3]  city_name,
+         |a1[4]  county_code,a1[5]  county_name,
+         |reg_location,estiblish_time,category_code,
+         |a2[0]  category_first,a2[1]  category_second,a2[2]  category_third,
+         |reg_capital,phones,emails,company_type
+         |from
+         |(
+         |SELECT
+         | area_code(area_code) a1
+         | ,category_code(category_code) a2
+         |,a.cid,a.cname,a.new_cid,id,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+         |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,b.company_type,
+         |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+         |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,b.update_time,b.deleted
+         |from (
+         |SELECT * from $project.base_company_mapping where ds = '$mapping_ds'
+         |)a
+         |JOIN
+         |(
+         |SELECT
+         |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+         |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+         |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+         |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+         | from
+         |(
+         |SELECT
+         |*,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC) AS num
+         |from (
+         |SELECT
+         |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+         |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+         |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+         |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+         |from $project.ads_company where ds ='20200604'
+         |UNION ALL
+         |SELECT
+         |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+         |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+         |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+         |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+         |from $project.inc_ads_company where ds >'20200604'
+         |)
+         |)
+         |where num = 1
+         |)b on a.cid =b.cid
+         |)
+         |""".stripMargin).show(100, false)
+
+    //生成债权表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table winhc_eci.$eci_debtor_relation PARTITION (ds='$t2_ds')
+         |SELECT ${columns.mkString(",")}
+         |FROM (
+         |SELECT e.*
+         |,COUNT(*) over(partition by bg_cid) cnt
+         |FROM (
+         |SELECT
+         |ROW_NUMBER() OVER (PARTITION by c.new_cid,d.new_cid order by c.id desc) num,
+         |c.id,c.yg_name,d.bg_name,c.new_cid yg_cid,d.new_cid bg_cid,
+         |c.reg_status yg_reg_status,
+         |c.province_code yg_province_code,
+         |c.province_name yg_province_name,
+         |c.city_code yg_city_code,
+         |c.city_name yg_city_name,
+         |c.county_code yg_county_code,
+         |c.county_name yg_county_name,
+         |c.reg_location yg_reg_location,
+         |c.estiblish_time yg_estiblish_time,
+         |c.category_code yg_category_code,
+         |c.category_first yg_category_first,
+         |c.category_second yg_category_second,
+         |c.category_third yg_category_third,
+         |c.reg_capital yg_reg_capital,
+         |c.phones yg_phones,
+         |c.emails yg_emails,
+         |d.reg_status bg_reg_status,
+         |d.province_code bg_province_code,
+         |d.province_name bg_province_name,
+         |d.city_code bg_city_code,
+         |d.city_name bg_city_name,
+         |d.county_code bg_county_code,
+         |d.county_name bg_county_name,
+         |d.reg_location bg_reg_location,
+         |d.estiblish_time bg_estiblish_time,
+         |d.category_code bg_category_code,
+         |d.category_first bg_category_first,
+         |d.category_second bg_category_second,
+         |d.category_third bg_category_third,
+         |d.reg_capital bg_reg_capital,
+         |d.phones bg_phones,
+         |d.emails bg_emails,
+         |0 as deleted
+         |FROM    (
+         |            SELECT  a.id
+         |                    ,a.yg_name
+         |                    ,b.*
+         |            FROM    (
+         |                        SELECT  id
+         |                                ,yg_name
+         |                        FROM    $project.tmp_xf_yg_bg_name
+         |                    ) a
+         |            JOIN    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+         |                        FROM    $project.tmp_xf_base_company_mapping_new
+         |                        WHERE   length(cleanup(cname)) > 4
+         |                        AND     company_type  IN ('1')
+         |                    ) b
+         |            ON      cleanup(a.yg_name) = cleanup(b.cname)
+         |        )c
+         |JOIN    (
+         |            SELECT  a.id
+         |                    ,a.bg_name
+         |                    ,b.*
+         |            FROM    (
+         |                        SELECT  id
+         |                                ,bg_name
+         |                        FROM    $project.tmp_xf_yg_bg_name
+         |                    ) a
+         |            JOIN    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+         |                        FROM    $project.tmp_xf_base_company_mapping_new
+         |                        WHERE   length(cleanup(cname)) > 4
+         |                        AND     company_type  IN ('1')
+         |                    ) b
+         |            ON      cleanup(a.bg_name) = cleanup(b.cname)
+         |        )d
+         |ON      c.id = d.id
+         |)e
+         |JOIN
+         |(
+         |  SELECT  cid
+         |  FROM    (
+         |              SELECT  cid,rowkey,
+         |                      max(deleted) f
+         |              FROM    $project.$deadbeat_company
+         |              WHERE   ds > '0'
+         |              AND     tn <> 'company_zxr_final_case'
+         |              GROUP by cid,rowkey
+         |          )
+         |  WHERE   f = '0'
+         |  GROUP by cid
+         |)f
+         |on e.bg_cid = f.cid
+         |where e.num = 1
+         |)
+         |where cnt > 1
+         |""".stripMargin)
+
+  }
+
+}