|
@@ -0,0 +1,362 @@
|
|
|
+package com.winhc.bigdata.spark.jobs.judicial
|
|
|
+
|
|
|
+import java.util.NoSuchElementException
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.udf._
|
|
|
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
|
+import com.winhc.bigdata.spark.utils.{BaseUtil, EsRestUtils, LoggingUtils, SparkUtils}
|
|
|
+import org.apache.commons.lang3.StringUtils
|
|
|
+import org.apache.spark.broadcast.Broadcast
|
|
|
+import org.apache.spark.sql.types.StructType
|
|
|
+import org.apache.spark.sql.{Row, SparkSession}
|
|
|
+
|
|
|
+import scala.collection.mutable
|
|
|
+import scala.collection.mutable.ListBuffer
|
|
|
+
|
|
|
+/**
+ * Command-line entry point for the [[JudicialCaseRelationPreNewTmp2]] job.
+ */
+object JudicialCaseRelationPreNewTmp2 {
+
+  /**
+   * Builds a Spark session for the `winhc_eci_dev` ODPS project, runs the job once and
+   * always stops the session afterwards.
+   *
+   * @param args command-line arguments; not read by this job
+   */
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    // Left blank on purpose: calc() then resolves the partition through BaseUtil.getPartion.
+    val ds = ""
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    try {
+      JudicialCaseRelationPreNewTmp2(spark, project, ds).calc()
+    } finally {
+      // Release the session even when calc() throws, so a failed run does not leak it.
+      spark.stop()
+    }
+  }
+}
|
|
|
+
|
|
|
+/**
+ * Spark job that registers the `category_code`, `area_code`, `name_aggs` and `case_reason`
+ * UDFs, rebuilds `tmp_xf_base_company_mapping_new` from `tmp_xf_base_company_mapping`, and
+ * then overwrites one partition of `winhc_eci_dev.ads_eci_debtor_relation_xf`.
+ *
+ * @param s       Spark session exposed to the mixed-in traits through `spark`
+ * @param project project name interpolated into the table references of the first insert
+ * @param ds      partition value; when blank, `calc()` resolves one via `BaseUtil.getPartion`
+ */
+case class JudicialCaseRelationPreNewTmp2(s: SparkSession, project: String, ds: String
+                                         ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+  override protected val spark: SparkSession = s
+
+  /**
+   * Returns the trimmed element at position `index` of the list stored under `code`.
+   *
+   * @param area_code broadcast lookup table from a code to its list of names
+   * @param code      key to look up
+   * @param index     position inside the list stored for `code`
+   * @return the trimmed element, or "" when the code is unknown, its list is null, the
+   *         index is outside the list, or the element itself is null
+   */
+  def get_seq_by_index(area_code: Broadcast[Map[String, Seq[String]]], code: String, index: Int): String = {
+    val found = for {
+      entry <- area_code.value.get(code)
+      names <- Option(entry) // the map may hold a null list
+      raw   <- names.lift(index) // bounds-checked: a short list yields "" instead of throwing
+      name  <- Option(raw)
+    } yield name.trim
+    found.getOrElse("")
+  }
+
+  /**
+   * Splits a 6-character code into three 2-character parts, each followed by the name
+   * found at the matching position of the lookup table.
+   *
+   * @param code      raw code; surrounding whitespace is ignored
+   * @param area_code broadcast lookup table keyed by the trimmed code
+   * @return six strings: part 0-2, name 0, part 2-4, name 1, part 4-6, name 2; six empty
+   *         strings when `code` is blank or its trimmed length is not 6
+   */
+  private def get_area_code(code: String, area_code: Broadcast[Map[String, Seq[String]]]): Seq[String] = {
+    if (StringUtils.isNotBlank(code) && code.trim.length == 6) {
+      val c = code.trim
+      Seq(
+        c.substring(0, 2), get_seq_by_index(area_code, c, 0),
+        c.substring(2, 4), get_seq_by_index(area_code, c, 1),
+        c.substring(4, 6), get_seq_by_index(area_code, c, 2))
+    } else {
+      Seq.fill(6)("")
+    }
+  }
+
+  /**
+   * Resolves the three names stored at positions 0, 1 and 2 for a category code.
+   *
+   * @param code          raw category code; surrounding whitespace is ignored
+   * @param category_code broadcast lookup table keyed by the trimmed code
+   * @return three strings, each "" when it cannot be resolved; three empty strings when
+   *         `code` is blank
+   */
+  private def get_category_code(code: String, category_code: Broadcast[Map[String, Seq[String]]]): Seq[String] = {
+    if (StringUtils.isNotBlank(code)) {
+      val c = code.trim
+      (0 to 2).map(i => get_seq_by_index(category_code, c, i))
+    } else {
+      Seq.fill(3)("")
+    }
+  }
+
+  /**
+   * Runs the job: registers the helper UDFs, rebuilds the enriched company mapping table
+   * and then overwrites the debtor relation partition.
+   */
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    map_2_json()
+    case_no_trim_udf()
+    registerCourtRank()
+    val (m1, m2) = code2Name()
+
+    spark.udf.register("category_code", (code: String) => {
+      get_category_code(code, m1)
+    })
+    spark.udf.register("area_code", (code: String) => {
+      get_area_code(code, m2)
+    })
+
+    spark.udf.register("name_aggs", new NameAggs(1000))
+    spark.udf.register("case_reason", new CaseReasonAggs(1000))
+    // Pre-processed data
+    val t2 = s"ads_judicial_case_relation_pre"
+    // Only the commented-out statements below read t1_ds; the lookup is kept so that
+    // re-enabling them needs no further change.
+    val t1_ds = if (StringUtils.isBlank(ds)) BaseUtil.getPartion(t2, "wenshu", spark) else ds
+
+    val t5 = s"base_company_mapping" // company name -> cid mapping
+    val t6 = s"ads_judicial_case_relation_replace_cids" // only referenced by the commented-out SQL below
+    val eci_debtor_relation = "ads_eci_debtor_relation_xf"
+    val deadbeat_company = "ads_deadbeat_company"
+    // Partition of the debtor relation table that the final insert overwrites.
+    // NOTE(review): fixed value rather than derived from `ds` — confirm this is intended.
+    val eci_debtor_relation_ds = "20201127"
+
+    //println(schema)
+
+    // Partition of the mapping table.
+    // NOTE(review): the value is not used below; the lookup is kept as in the original.
+    val t5_ds = BaseUtil.getPartion(t5, spark)
+
+    // sql(
+    //   """
+    //     |insert overwrite table winhc_eci_dev.tmp_xf_deadbeat_company_deleted
+    //     |SELECT rowkey,cid,name,card_num,publish_date,deleted,tn,flag from (
+    //     |SELECT *,case when tn ='company_zxr' then 7 when tn ='company_dishonest_info' then 3 when tn ='company_zxr_restrict' then 5 else '' end as flag
+    //     |,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY publish_date DESC) AS num
+    //     |from ads_deadbeat_company
+    //     |where ds > '0' and deleted = 1 and tn <> 'company_zxr_final_case'
+    //     |)
+    //     |where num = 1;
+    //     |""".stripMargin)
+
+    // sql(
+    //   s"""
+    //      |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r1
+    //      |SELECT
+    //      | concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
+    //      | title ,
+    //      | case_type ,
+    //      | case_reason ,
+    //      | case_no ,
+    //      | court_name ,
+    //      | case_stage ,
+    //      | lable ,
+    //      | detail ,
+    //      | -- name_aggs['yg_name'] yg_name,
+    //      | -- name_aggs['bg_name'] bg_name,
+    //      | yg_name,
+    //      | bg_name,
+    //      | case_amt ,
+    //      | date ,
+    //      | court_level ,
+    //      | 0 as deleted ,
+    //      | cids
+    //      |FROM
+    //      |(
+    //      |SELECT judicase_id
+    //      | ,max(title) title
+    //      | ,concat_ws(',',collect_set(case_type)) case_type
+    //      | ,case_reason(case_reason,date,flag) case_reason
+    //      | ,concat_ws(',',collect_set(case_no)) case_no
+    //      | ,concat_ws(',',collect_set(court_name)) court_name
+    //      | ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
+    //      | ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+    //      | ,null as detail
+    //      | ,max(case_amt) AS case_amt
+    //      | ,max(date) AS date
+    //      | ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+    //      | ,concat_ws(',',collect_set(cids)) cids
+    //      | -- ,name_aggs(yg_name,bg_name,flag,date) name_aggs
+    //      | ,concat_ws(',',collect_set(yg_name)) yg_name
+    //      | ,concat_ws(',',collect_set(bg_name)) bg_name
+    //      |FROM (
+    //      | SELECT a.*
+    //      | FROM (
+    //      | SELECT judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,date,case_amt,cids,detail_id
+    //      | ,court_level(court_name) court_level
+    //      | FROM $project.$t6
+    //      | WHERE ds = '$t1_ds'
+    //      | ) a
+    //      | left join
+    //      | (
+    //      | select rowkey,flag from winhc_eci_dev.tmp_xf_deadbeat_company_deleted
+    //      | )b on a.detail_id = b.rowkey and a.flag = b.flag
+    //      | where b.rowkey is null
+    //      | )
+    //      |GROUP BY judicase_id
+    //      |)x
+    //      |""".stripMargin).show(20, false)
+
+    //ads_eci_debtor_relation_xf
+
+    // Build the debtor relation name pairs
+    // val df = sql(
+    //   s"""
+    //      |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_yg_bg_name
+    //      |SELECT md5(concat_ws('',yg_name,bg_name)) id,yg_name,bg_name
+    //      |FROM (
+    //      |SELECT
+    //      |yg_name_x as yg_name
+    //      |,bg_name_x as bg_name
+    //      |FROM winhc_eci_dev.ads_judicial_case_relation_r1
+    //      |LATERAL VIEW explode(split(yg_name,',')) a AS yg_name_x
+    //      |LATERAL VIEW explode(split(bg_name,',')) b AS bg_name_x
+    //      |WHERE compare_name(yg_name,bg_name)
+    //      |AND (lable like '%被执行人%' or lable like '%限制高消费%' or lable like '%失信人%')
+    //      |AND LENGTH(cleanup(yg_name_x)) > 4
+    //      |AND LENGTH(cleanup(bg_name_x)) > 4
+    //      |AND size(split(yg_name,',')) < 500
+    //      |AND size(split(bg_name,',')) < 500
+    //      |)
+    //      |GROUP BY yg_name,bg_name
+    //      |""".stripMargin)
+
+    // sql(
+    //   """
+    //     |INSERT OVERWRITE TABLE winhc_eci_dev.tmp_xf_base_company_mapping
+    //     |SELECT
+    //     |a.cid,a.cname,a.new_cid,id,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+    //     |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,b.company_type,
+    //     |credit_code,score,category_code,lat2,lng2,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+    //     |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,b.update_time,b.deleted
+    //     |from (
+    //     |SELECT * from winhc_eci_dev.base_company_mapping where ds = '20201125'
+    //     |)a
+    //     |JOIN
+    //     |(
+    //     | SELECT
+    //     |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+    //     |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+    //     |credit_code,score,category_code,lat2,lng2,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+    //     |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+    //     | from
+    //     |(
+    //     |SELECT
+    //     |*,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC) AS num,split(verify,',')[0] lng2,split(verify,',')[1] lat2
+    //     |from (
+    //     |SELECT
+    //     |verify(lng,lat) verify,
+    //     |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+    //     |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+    //     |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+    //     |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+    //     |from winhc_eci_dev.ads_company where ds ='20200604'
+    //     |UNION ALL
+    //     |SELECT
+    //     |verify(lng,lat) verify,
+    //     |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+    //     |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+    //     |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+    //     |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+    //     |from winhc_eci_dev.inc_ads_company where ds >'20200604'
+    //     |)
+    //     |)
+    //     |where num = 1
+    //     |)b on a.cid =b.cid
+    //     |""".stripMargin)
+
+    // Full company basic-info table: expands area_code / category_code into separate
+    // code and name columns through the UDFs registered above.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_base_company_mapping_new
+         |select
+         |cid,new_cid,cname,reg_status,area_code,
+         |a1[0] province_code,a1[1] province_name,
+         |a1[2] city_code,a1[3] city_name,
+         |a1[4] county_code,a1[5] county_name,
+         |reg_location,estiblish_time,category_code,
+         |a2[0] category_first,a2[1] category_second,a2[2] category_third,
+         |reg_capital,phones,emails,company_type
+         |from
+         |(
+         |select
+         | area_code(area_code) a1
+         | ,category_code(category_code) a2
+         | ,*
+         |from ${project}.tmp_xf_base_company_mapping
+         |)
+         |""".stripMargin).show(100,false)
+
+    // Build the debtor relation table: joins every name pair of tmp_xf_yg_bg_name to the
+    // company mapping on cleaned names, then keeps pairs whose bg company has a
+    // non-deleted row in ads_deadbeat_company.
+    // NOTE(review): both `b` sub-queries compute ROW_NUMBER() AS num but nothing filters
+    // on num = 1, so a cleaned name matching several companies yields several rows —
+    // confirm whether that is intended.
+    // NOTE(review): the target project is hard-coded here, unlike the insert above which
+    // uses $project.
+    sql(
+      s"""
+         |insert overwrite table winhc_eci_dev.$eci_debtor_relation PARTITION (ds='$eci_debtor_relation_ds')
+         |select e.* from (
+         |SELECT
+         |c.id,c.yg_name,d.bg_name,c.new_cid yg_cid,d.new_cid bg_cid,
+         |c.reg_status yg_reg_status,
+         |c.province_code yg_province_code,
+         |c.province_name yg_province_name,
+         |c.city_code yg_city_code,
+         |c.city_name yg_city_name,
+         |c.county_code yg_county_code,
+         |c.county_name yg_county_name,
+         |c.reg_location yg_reg_location,
+         |c.estiblish_time yg_estiblish_time,
+         |c.category_code yg_category_code,
+         |c.category_first yg_category_first,
+         |c.category_second yg_category_second,
+         |c.category_third yg_category_third,
+         |c.reg_capital yg_reg_capital,
+         |c.phones yg_phones,
+         |c.emails yg_emails,
+         |d.reg_status bg_reg_status,
+         |d.province_code bg_province_code,
+         |d.province_name bg_province_name,
+         |d.city_code bg_city_code,
+         |d.city_name bg_city_name,
+         |d.county_code bg_county_code,
+         |d.county_name bg_county_name,
+         |d.reg_location bg_reg_location,
+         |d.estiblish_time bg_estiblish_time,
+         |d.category_code bg_category_code,
+         |d.category_first bg_category_first,
+         |d.category_second bg_category_second,
+         |d.category_third bg_category_third,
+         |d.reg_capital bg_reg_capital,
+         |d.phones bg_phones,
+         |d.emails bg_emails,
+         |0 as deleted
+         |FROM (
+         | SELECT a.id
+         | ,a.yg_name
+         | ,b.*
+         | FROM (
+         | SELECT id
+         | ,yg_name
+         | FROM tmp_xf_yg_bg_name
+         | ) a
+         | JOIN (
+         | SELECT *
+         | ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+         | FROM tmp_xf_base_company_mapping_new
+         | WHERE length(cleanup(cname)) > 4
+         | AND company_type NOT IN ('2','8')
+         | ) b
+         | ON cleanup(a.yg_name) = cleanup(b.cname)
+         | )c
+         |JOIN (
+         | SELECT a.id
+         | ,a.bg_name
+         | ,b.*
+         | FROM (
+         | SELECT id
+         | ,bg_name
+         | FROM tmp_xf_yg_bg_name
+         | ) a
+         | JOIN (
+         | SELECT *
+         | ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+         | FROM tmp_xf_base_company_mapping_new
+         | WHERE length(cleanup(cname)) > 4
+         | AND company_type NOT IN ('2','8')
+         | ) b
+         | ON cleanup(a.bg_name) = cleanup(b.cname)
+         | )d
+         |ON c.id = d.id
+         |)e
+         |JOIN
+         |(
+         |SELECT * from (
+         |SELECT cid
+         |,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY publish_date DESC) AS num
+         |from $deadbeat_company
+         |where ds > '0' and deleted = 0 and tn <> 'company_zxr_final_case'
+         |)
+         |where num = 1
+         |)f
+         |on e.bg_cid = f.cid
+         |""".stripMargin)
+
+  }
+
+}
|