Sfoglia il codice sorgente

剔除一对一债权,原告被告cid去重

xufei 4 anni fa
parent
commit
aef690fafe

+ 104 - 94
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationDebtorRelation.scala

@@ -28,7 +28,7 @@ object JudicialCaseRelationDebtorRelation {
 }
 
 case class JudicialCaseRelationDebtorRelation(s: SparkSession, project: String, ds: String
-                                         ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+                                             ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
   override protected val spark: SparkSession = s
 
   def get_seq_by_index(area_code: Broadcast[Map[String, Seq[String]]], code: String, index: Int): String = {
@@ -81,7 +81,6 @@ case class JudicialCaseRelationDebtorRelation(s: SparkSession, project: String,
       get_area_code(code, m2)
     })
 
-    spark.udf.register("name_aggs", new NameAggs(1000))
     spark.udf.register("case_reason", new CaseReasonAggs(1000))
     //预处理数据
     val t2 = s"ads_judicial_case_relation_pre"
@@ -92,13 +91,16 @@ case class JudicialCaseRelationDebtorRelation(s: SparkSession, project: String,
       t1_ds = t2_ds
     }
 
-    val t5 = s"base_company_mapping" //公司name和cid映射
     val t6 = s"ads_judicial_case_relation_replace_cids" //公司name和cid映射
     val eci_debtor_relation = "ads_eci_debtor_relation_v2"
     val deadbeat_company = "ads_deadbeat_company"
 
     val mapping_ds = BaseUtil.getPartion("winhc_eci_dev.base_company_mapping", spark) //映射表分区
 
+    val columns: Seq[String] = spark.table(s"winhc_eci.$eci_debtor_relation").schema.map(_.name).filter(s => {
+      !s.equals("ds")
+    })
+
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_r1
@@ -229,97 +231,105 @@ case class JudicialCaseRelationDebtorRelation(s: SparkSession, project: String,
     //生成债权表
     sql(
       s"""
-        |insert ${if (isWindows) "INTO" else "OVERWRITE"} table winhc_eci.$eci_debtor_relation PARTITION (ds='$t2_ds')
-        |select e.* from (
-        |SELECT
-        |c.id,c.yg_name,d.bg_name,c.new_cid yg_cid,d.new_cid bg_cid,
-        |c.reg_status yg_reg_status,
-        |c.province_code yg_province_code,
-        |c.province_name yg_province_name,
-        |c.city_code yg_city_code,
-        |c.city_name yg_city_name,
-        |c.county_code yg_county_code,
-        |c.county_name yg_county_name,
-        |c.reg_location yg_reg_location,
-        |c.estiblish_time yg_estiblish_time,
-        |c.category_code yg_category_code,
-        |c.category_first yg_category_first,
-        |c.category_second yg_category_second,
-        |c.category_third yg_category_third,
-        |c.reg_capital yg_reg_capital,
-        |c.phones yg_phones,
-        |c.emails yg_emails,
-        |d.reg_status bg_reg_status,
-        |d.province_code bg_province_code,
-        |d.province_name bg_province_name,
-        |d.city_code bg_city_code,
-        |d.city_name bg_city_name,
-        |d.county_code bg_county_code,
-        |d.county_name bg_county_name,
-        |d.reg_location bg_reg_location,
-        |d.estiblish_time bg_estiblish_time,
-        |d.category_code bg_category_code,
-        |d.category_first bg_category_first,
-        |d.category_second bg_category_second,
-        |d.category_third bg_category_third,
-        |d.reg_capital bg_reg_capital,
-        |d.phones bg_phones,
-        |d.emails bg_emails,
-        |0 as deleted
-        |FROM    (
-        |            SELECT  a.id
-        |                    ,a.yg_name
-        |                    ,b.*
-        |            FROM    (
-        |                        SELECT  id
-        |                                ,yg_name
-        |                        FROM    $project.tmp_xf_yg_bg_name
-        |                    ) a
-        |            JOIN    (
-        |                        SELECT  *
-        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
-        |                        FROM    $project.tmp_xf_base_company_mapping_new
-        |                        WHERE   length(cleanup(cname)) > 4
-        |                        AND     company_type NOT IN ('2','8')
-        |                    ) b
-        |            ON      cleanup(a.yg_name) = cleanup(b.cname)
-        |        )c
-        |JOIN    (
-        |            SELECT  a.id
-        |                    ,a.bg_name
-        |                    ,b.*
-        |            FROM    (
-        |                        SELECT  id
-        |                                ,bg_name
-        |                        FROM    $project.tmp_xf_yg_bg_name
-        |                    ) a
-        |            JOIN    (
-        |                        SELECT  *
-        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
-        |                        FROM    $project.tmp_xf_base_company_mapping_new
-        |                        WHERE   length(cleanup(cname)) > 4
-        |                        AND     company_type NOT IN ('2','8')
-        |                    ) b
-        |            ON      cleanup(a.bg_name) = cleanup(b.cname)
-        |        )d
-        |ON      c.id = d.id
-        |)e
-        |JOIN
-        |(
-        |  SELECT  cid
-        |  FROM    (
-        |              SELECT  cid,rowkey,
-        |                      max(deleted) f
-        |              FROM    $project.$deadbeat_company
-        |              WHERE   ds > '0'
-        |              AND     tn <> 'company_zxr_final_case'
-        |              GROUP by cid,rowkey
-        |          )
-        |  WHERE   f = '0'
-        |  GROUP by cid
-        |)f
-        |on e.bg_cid = f.cid
-        |""".stripMargin)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table winhc_eci.$eci_debtor_relation PARTITION (ds='$t2_ds')
+         |SELECT ${columns.mkString(",")}
+         |FROM (
+         |SELECT e.*
+         |,COUNT(*) over(partition by bg_cid) cnt
+         |FROM (
+         |SELECT
+         |ROW_NUMBER() OVER (PARTITION by c.new_cid,d.new_cid order by c.id desc) num,
+         |c.id,c.yg_name,d.bg_name,c.new_cid yg_cid,d.new_cid bg_cid,
+         |c.reg_status yg_reg_status,
+         |c.province_code yg_province_code,
+         |c.province_name yg_province_name,
+         |c.city_code yg_city_code,
+         |c.city_name yg_city_name,
+         |c.county_code yg_county_code,
+         |c.county_name yg_county_name,
+         |c.reg_location yg_reg_location,
+         |c.estiblish_time yg_estiblish_time,
+         |c.category_code yg_category_code,
+         |c.category_first yg_category_first,
+         |c.category_second yg_category_second,
+         |c.category_third yg_category_third,
+         |c.reg_capital yg_reg_capital,
+         |c.phones yg_phones,
+         |c.emails yg_emails,
+         |d.reg_status bg_reg_status,
+         |d.province_code bg_province_code,
+         |d.province_name bg_province_name,
+         |d.city_code bg_city_code,
+         |d.city_name bg_city_name,
+         |d.county_code bg_county_code,
+         |d.county_name bg_county_name,
+         |d.reg_location bg_reg_location,
+         |d.estiblish_time bg_estiblish_time,
+         |d.category_code bg_category_code,
+         |d.category_first bg_category_first,
+         |d.category_second bg_category_second,
+         |d.category_third bg_category_third,
+         |d.reg_capital bg_reg_capital,
+         |d.phones bg_phones,
+         |d.emails bg_emails,
+         |0 as deleted
+         |FROM    (
+         |            SELECT  a.id
+         |                    ,a.yg_name
+         |                    ,b.*
+         |            FROM    (
+         |                        SELECT  id
+         |                                ,yg_name
+         |                        FROM    $project.tmp_xf_yg_bg_name
+         |                    ) a
+         |            JOIN    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+         |                        FROM    $project.tmp_xf_base_company_mapping_new
+         |                        WHERE   length(cleanup(cname)) > 4
+         |                        AND     company_type  IN ('1')
+         |                    ) b
+         |            ON      cleanup(a.yg_name) = cleanup(b.cname)
+         |        )c
+         |JOIN    (
+         |            SELECT  a.id
+         |                    ,a.bg_name
+         |                    ,b.*
+         |            FROM    (
+         |                        SELECT  id
+         |                                ,bg_name
+         |                        FROM    $project.tmp_xf_yg_bg_name
+         |                    ) a
+         |            JOIN    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+         |                        FROM    $project.tmp_xf_base_company_mapping_new
+         |                        WHERE   length(cleanup(cname)) > 4
+         |                        AND     company_type  IN ('1')
+         |                    ) b
+         |            ON      cleanup(a.bg_name) = cleanup(b.cname)
+         |        )d
+         |ON      c.id = d.id
+         |)e
+         |JOIN
+         |(
+         |  SELECT  cid
+         |  FROM    (
+         |              SELECT  cid,rowkey,
+         |                      max(deleted) f
+         |              FROM    $project.$deadbeat_company
+         |              WHERE   ds > '0'
+         |              AND     tn <> 'company_zxr_final_case'
+         |              GROUP by cid,rowkey
+         |          )
+         |  WHERE   f = '0'
+         |  GROUP by cid
+         |)f
+         |on e.bg_cid = f.cid
+         |where e.num = 1
+         |)
+         |where cnt > 1
+         |""".stripMargin)
 
   }