Browse Source

执行人和终本,人的增量,以及司法案件预处理

lyb 4 years ago
parent
commit
86bb74b9bb

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/etl/PersonIncrSync.scala

@@ -106,7 +106,7 @@ case class PersonIncrSync(s: SparkSession,
     var runDs = ""
     //table字段
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
-      !s.equals("ds") && !s.equals(s"new_$cidCol") && !s.equals("rowkey")
+      !s.equals("ds") && !s.equals(s"new_$cidCol")  && !s.equals("rowkey") && !s.equals("cids")
     })
     //第一次run
     if (StringUtils.isBlank(lastDsIncAds)) {
@@ -174,7 +174,7 @@ case class PersonIncrSync(s: SparkSession,
 
     MaxComputer2Phoenix(
       spark,
-      colsTotal.filter(!_.equals("cid")).filter(!_.equals("new_cid")),
+      colsTotal.filter(!_.equals("cid")).filter(!_.equals("new_cid")).filter(!_.equals("cids")).filter(!_.equals("new_cids")),
       inc_ads_company_tb,
       tableName+"_PERSON",
       lastDsIncOds,

+ 26 - 91
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre10.scala

@@ -81,89 +81,10 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
     //被执行人预处理(个人)
     val columns: Seq[String] = spark.table(s"$project.inc_ads_company_zxr_person").schema.map(_.name).filter(_!="flag")
     lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_person", spark)
+    //身份证补全表最新分区
+    val personIdDs = BaseUtil.getPartion("ads_person_idcard_cloze", spark)
+    is_id_card()
     spark.sparkContext.setJobDescription(s"处理zxr_person($lastDsIncAds)")
-    //1、先从被执行人中用name和case_no关联补全身份证号码
-    sql(
-      s"""
-         |--先从company_zxr_person存量、增量表获取
-         |SELECT rowkey
-         |    ,0 AS flag
-         |    ,new_cid
-         |    ,cid
-         |    ,id
-         |    ,cname
-         |    ,sex
-         |    ,card
-         |    ,court
-         |    ,case_create_time
-         |    ,A.case_no AS case_no
-         |    ,gist_id
-         |    ,case_state
-         |    ,exec_money
-         |    ,type
-         |    ,source
-         |    ,status
-         |    ,appro_time
-         |    ,A.create_time AS create_time
-         |    ,A.update_time AS update_time
-         |    ,A.deleted AS deleted
-         |FROM(
-         |    SELECT *
-         |    FROM(
-         |        SELECT *
-         |        FROM(
-         |            SELECT *
-         |            ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',cname,case_no)) ORDER BY update_time DESC ) num
-         |            FROM(
-         |                SELECT ${columns.mkString(",")}
-         |                FROM $project.ads_company_zxr_person
-         |                WHERE ds>'0'
-         |                UNION ALL
-         |                SELECT ${columns.mkString(",")}
-         |                FROM $project.inc_ads_company_zxr_person
-         |                WHERE ds>'0'
-         |            )
-         |        )
-         |        WHERE num=1
-         |    )
-         | ) A
-         |
-         |""".stripMargin
-    ).createOrReplaceTempView("tmp_person_cloze_1")
-
-    //3、再从自身表中根据name和company_name去补全身份证号码
-//    sql(
-//      s"""
-//         |--先从前面结果表获取
-//         |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_zxr_person_cloze PARTITION(ds=$lastDsIncAds)
-//         |SELECT A.rowkey
-//         |    ,IF(A.card IS NULL AND D.card IS NOT NULL,3,0) AS flag
-//         |    ,A.new_cid
-//         |    ,A.cid
-//         |    ,A.id
-//         |    ,A.cname
-//         |    ,A.sex
-//         |    ,COALESCE(A.card,D.card) AS card
-//         |    ,A.court
-//         |    ,A.case_create_time
-//         |    ,A.case_no
-//         |    ,A.gist_id
-//         |    ,A.case_state
-//         |    ,A.exec_money
-//         |    ,A.type
-//         |    ,A.source
-//         |    ,A.status
-//         |    ,A.appro_time
-//         |    ,A.create_time
-//         |    ,A.update_time
-//         |    ,A.deleted
-//         |FROM tmp_person_cloze_1 A
-//         |LEFT JOIN (tmp_person_cloze_1 D)
-//         |
-//         |ON A.cname=D.cname and d.card is not null  --根据实际数据情况
-//         |""".stripMargin
-//    )//.createOrReplaceTempView("tmp_person_cloze_3")
-
     sql(
       s"""
          |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_person')
@@ -183,22 +104,36 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |  ,case_amt
          |from (
          |      select
-         |      md5(cleanup(case_no)) as judicase_id
+         |      md5(cleanup(A.case_no)) as judicase_id
          |      ,"10" as flag
-         |      ,concat_ws('',cname,'被执行人') AS title
-         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,concat_ws('',A.cname,'被执行人') AS title
+         |      ,concat_ws('',case_type(A.case_no)) as case_type
          |      ,NULL AS case_reason
-         |      ,case_no
+         |      ,A.case_no
          |      ,court
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,concat_ws('',case_stage(A.case_no)) as case_stage
          |      ,NULL as yg_name
-         |      ,cname as bg_name
-         |      ,case_create_time as date
+         |      ,A.cname as bg_name
+         |      ,case_create_time as date--目前天眼查数据没有执行日期,先以此代替
          |      ,rowkey as detail_id
          |      ,exec_money as case_amt
          |      ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from tmp_person_cloze_1
-         |      where length(case_no) > 0
+         |      from (
+         |        select case_no,court,cname,case_create_time,rowkey,update_time,card
+         |        from $project.ads_company_zxr_person
+         |        where length(case_no) > 0 and ds>'0'
+         |        union all
+         |        select case_no,court,cname,case_create_time,rowkey,update_time,card
+         |        from $project.inc_ads_company_zxr_person
+         |        where length(case_no) > 0 and ds>'0'
+         |      ) A
+         |      left join(
+         |        select name, identity_num, case_no
+         |        from $project.ads_person_idcard_cloze
+         |        where ds=${personIdDs}
+         |      ) B
+         |      on A.cname=B.name AND A.case_no=B.case_no
+         |      where is_id_card(A.card) OR is_id_card(B.identity_num)
          |   )
          |where num = 1
          |""".stripMargin).show(10, false)

+ 25 - 109
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre12.scala

@@ -81,108 +81,10 @@ case class JudicialCaseRelationPre12(s: SparkSession, project: String
     //终本案件预处理(个人)
     val columns: Seq[String] = spark.table(s"$project.inc_ads_company_zxr_final_case_person").schema.map(_.name).filter(_!="flag")
     lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_final_case_person", spark)
+    //身份证补全表最新分区
+    val personIdDs = BaseUtil.getPartion("ads_person_idcard_cloze", spark)
+    is_id_card()
     spark.sparkContext.setJobDescription(s"处理zxr_final_case_person($lastDsIncAds)")
-    //1、先从被执行人中用name和case_no关联补全身份证号码
-    sql(
-      s"""
-         |--先从company_zxr_final_case_person存量、增量表获取
-         |SELECT rowkey
-         |    ,IF(A.identity_num IS NULL AND B.card IS NOT NULL,1,0) AS flag
-         |    ,new_cid
-         |    ,cid
-         |    ,id
-         |    ,name
-         |    ,sex
-         |    ,COALESCE(A.identity_num,B.card) AS identity_num
-         |    ,court_name
-         |    ,case_final_time
-         |    ,case_create_time
-         |    ,A.case_no AS case_no
-         |    ,description
-         |    ,no_exec_amount
-         |    ,exec_amount
-         |    ,source
-         |    ,status
-         |    ,appro_time
-         |    ,A.create_time AS create_time
-         |    ,A.update_time AS update_time
-         |    ,A.deleted AS deleted
-         |FROM(
-         |    SELECT *
-         |    FROM(
-         |        SELECT *
-         |        FROM(
-         |            SELECT *
-         |            ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',name,case_no)) ORDER BY update_time DESC ) num
-         |            FROM(
-         |                SELECT ${columns.mkString(",")}
-         |                FROM $project.ads_company_zxr_final_case_person
-         |                WHERE ds>'0'
-         |                UNION ALL
-         |                SELECT ${columns.mkString(",")}
-         |                FROM $project.inc_ads_company_zxr_final_case_person
-         |                WHERE ds>'0'
-         |            )
-         |        )
-         |        WHERE num=1
-         |    )
-         | ) A
-         |LEFT JOIN
-         |(
-         |    SELECT *
-         |    FROM(
-         |        SELECT *
-         |        ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',cname,case_no)) ORDER BY update_time DESC ) num
-         |        FROM (
-         |            SELECT cname,card,case_no,type,update_time
-         |            FROM $project.ods_company_zxr
-         |            WHERE ds>'0'
-         |            UNION ALL
-         |            SELECT cname,card,case_no,type,update_time
-         |            FROM $project.inc_ods_company_zxr
-         |            WHERE ds>'0'
-         |        )
-         |    )
-         |    WHERE num=1 AND type='1'
-         |) B
-         |ON A.name=B.cname AND A.case_no=B.case_no
-         |""".stripMargin
-    ).createOrReplaceTempView("tmp_person_cloze_1")
-    //2、再从失信人中用name和case_no关联补全身份证号码
-
-    //3、再从自身表中根据name和company_name去补全身份证号码
-//    sql(
-//      s"""
-//         |--先从前面结果表获取
-//         |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_zxr_final_case_person_cloze PARTITION(ds=$lastDsIncAds)
-//         |SELECT A.rowkey
-//         |    ,IF(A.identity_num IS NULL AND D.identity_num IS NOT NULL,3,0) AS flag
-//         |    ,A.new_cid
-//         |    ,A.cid
-//         |    ,A.id
-//         |    ,A.name
-//         |    ,A.sex
-//         |    ,COALESCE(A.identity_num,D.identity_num) AS identity_num
-//         |    ,A.court_name
-//         |    ,A.case_final_time
-//         |    ,A.case_create_time
-//         |    ,A.case_no
-//         |    ,A.description
-//         |    ,A.no_exec_amount
-//         |    ,A.exec_amount
-//         |    ,A.source
-//         |    ,A.status
-//         |    ,A.appro_time
-//         |    ,A.create_time
-//         |    ,A.update_time
-//         |    ,A.deleted
-//         |FROM tmp_person_cloze_1 A
-//         |LEFT JOIN
-//         |tmp_person_cloze_1 D
-//         |ON A.name=D.name --根据实际数据情况
-//         |""".stripMargin
-//    )//.createOrReplaceTempView("tmp_person_cloze_3")
-
     sql(
       s"""
          |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_final_case_person')
@@ -202,22 +104,36 @@ case class JudicialCaseRelationPre12(s: SparkSession, project: String
          |  ,case_amt
          |from (
          |      select
-         |      md5(cleanup(case_no)) as judicase_id
+         |      md5(cleanup(A.case_no)) as judicase_id
          |      ,"12" as flag
-         |      ,concat_ws('',name,'终本案件') AS title
-         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,concat_ws('',A.name,'终本案件') AS title
+         |      ,concat_ws('',case_type(A.case_no)) as case_type
          |      ,NULL AS case_reason
-         |      ,case_no
+         |      ,A.case_no
          |      ,court_name
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,concat_ws('',case_stage(A.case_no)) as case_stage
          |      ,NULL as yg_name
-         |      ,name as bg_name
+         |      ,A.name as bg_name
          |      ,case_create_time as date--目前天眼查数据没有执行日期,先以此代替
          |      ,rowkey as detail_id
          |      ,exec_amount as case_amt
          |      ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from tmp_person_cloze_1
-         |      where length(case_no) > 0
+         |      from (
+         |        select case_no,court_name,name,case_create_time,rowkey,update_time,identity_num
+         |        from $project.ads_company_zxr_final_case_person
+         |        where length(case_no) > 0 and ds>'0'
+         |        union all
+         |        select case_no,court_name,name,case_create_time,rowkey,update_time,identity_num
+         |        from $project.inc_ads_company_zxr_final_case_person
+         |        where length(case_no) > 0 and ds>'0'
+         |      ) A
+         |      left join(
+         |        select name, identity_num, case_no
+         |        from $project.ads_person_idcard_cloze
+         |        where ds=${personIdDs}
+         |      ) B
+         |      on A.name=B.name AND A.case_no=B.case_no
+         |      where is_id_card(A.identity_num) OR is_id_card(B.identity_num)
          |   )
          |where num = 1
          |""".stripMargin).show(10, false)