Browse Source

feat:通用自然人身份证号码及企业cid补全

晏永年 4 years ago
parent
commit
d1b1c6ff23

+ 136 - 0
src/main/scala/com/winhc/bigdata/spark/utils/Company_Completion_Utils.scala

@@ -0,0 +1,136 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author yyn
+ * @Date 2020/10/14
+ * @Description 企业CID补全
+ */
+case class Company_Completion_Utils(s: SparkSession,
+                                    project: String, //表所在工程名
+                                    isNeedVerify: Boolean //是否需要验证,如所有表增量表的分区是否对齐(相等)
+                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
+
+    //参与补全的表
+    var mapTables = new mutable.HashMap[String, (String, String, String, String)]()
+    mapTables("company_zxr") = ("cids", "cname", "card", "1")
+    mapTables("company_dishonest_info") = ("cid", "name", "card_num", "2")
+    mapTables("company_zxr_final_case") = ("cid", "name", "identity_num", "3")
+    mapTables("company_zxr_restrict") = ("cid", "coalesce(company_name,company_info)", "identity_num", "4")
+    var lastDsIncOds: String = ""
+    var minDs: String = ""
+    var maxDs: String = ""
+    mapTables.foreach(m => {
+      if (lastDsIncOds == "") {
+        lastDsIncOds = BaseUtil.getPartion("inc_ods_" + m._1, spark)
+        minDs = lastDsIncOds
+        maxDs = minDs
+      }
+      else {
+        if (Integer.parseInt(lastDsIncOds) < Integer.parseInt(BaseUtil.getPartion("inc_ods_" + m._1, spark))) {
+          minDs = lastDsIncOds
+          maxDs = BaseUtil.getPartion("inc_ods_" + m._1, spark)
+        }
+        else {
+          minDs = BaseUtil.getPartion("inc_ods_" + m._1, spark)
+          maxDs = lastDsIncOds
+        }
+      }
+    })
+    //增量数据表没有准备好(无分区)时错误退出
+    if (StringUtils.isBlank(minDs)) {
+      println("all tables have not inc's partition !!!")
+      sys.exit(-1)
+    }
+    //要求验证时发现各个表的分区未对齐(不相等)时错误退出
+    if (isNeedVerify == true && minDs != maxDs) {
+      println("not all tables have the same partition of newest !!!")
+      sys.exit(-1)
+    }
+    id_card_trimOrRaw_udf()
+    lastDsIncOds = minDs
+    spark.sparkContext.setJobDescription(s"补全身份证号码:zxr_restrict_person($lastDsIncOds)")
+    sql(mapTables.map(m => {
+      s"""
+         |SELECT SPLIT(${m._2._1},';')[0] AS cid, ${m._2._2} AS company_name, ${m._2._3} AS organization_code, ${m._2._4} AS source, ${m._2._4} AS flag
+         |FROM ods_${m._1}
+         |WHERE ds>'0' AND ${m._2._2} IS NOT NULL AND LENGTH(${m._2._2})>=5
+         |UNION ALL
+         |SELECT SPLIT(${m._2._1},';')[0] AS cid, ${m._2._2} AS company_name, ${m._2._3} AS organization_code, ${m._2._4} AS source, ${m._2._4} AS flag
+         |FROM inc_ods_${m._1}
+         |WHERE ds=$lastDsIncOds AND ${m._2._2} IS NOT NULL AND LENGTH(${m._2._2})>=5
+         |""".stripMargin
+    }).toArray.mkString(" UNION ALL ")
+    ).createOrReplaceTempView("tmp_company_cloze_1")
+
+    //1、根据姓名和案号补全身份证号码,未去重是为了后续根据姓名和公司来补全
+    sql(
+      s"""
+         |SELECT cid, company_name, organization_code, source, flag
+         |FROM(
+         |  SELECT cid, company_name, organization_code, source, flag
+         |        ,ROW_NUMBER() OVER (PARTITION BY company_name ORDER BY cid DESC) num
+         |  FROM tmp_company_cloze_1
+         |)
+         |WHERE num=1
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_company_cloze_2")
+    //2、根据姓名和公司来补全身份证号码
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_company_cloze partition(ds='$lastDsIncOds')
+         |SELECT cid, company_name, organization_code, source, flag
+         |FROM(
+         |  SELECT cid, company_name, organization_code, source, flag
+         |         ,ROW_NUMBER() OVER (PARTITION BY cid,company_name ORDER BY cid DESC) num
+         |  FROM tmp_company_cloze_2
+         |)
+         |WHERE num=1
+         |""".stripMargin
+    )
+    //    CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}
+object Company_Completion_Utils {
+
+  def main(args: Array[String]): Unit = {
+
+    val Array(project, flag) = args
+    println(
+      s"""
+         |project: $project
+         |flag: $flag
+         |""".stripMargin)
+    if (args.length < 2) {
+      println("请输入 project:项目, tableName:表名, flag:标识!!!")
+      sys.exit(-1)
+    }
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    Company_Completion_Utils(spark, project, flag.toBoolean).calc()
+    spark.stop()
+  }
+}
+
+
+

+ 151 - 0
src/main/scala/com/winhc/bigdata/spark/utils/IDCard_Completion_Utils.scala

@@ -0,0 +1,151 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author yyn
+ * @Date 2020/10/13
+ * @Description 身份证补全
+ */
+case class IDCard_Completion_Utils(s: SparkSession,
+                                   project: String, //表所在工程名
+                                   isNeedVerify: Boolean //是否需要验证,如所有表增量表的分区是否对齐(相等)
+                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
+
+    //参与补全的表
+    var mapTables = new mutable.HashMap[String, (String, String, String, String, String, String, String)]()
+    mapTables("company_zxr") = ("cids", "cname", "card", "null", "case_no", "court", "1")
+    mapTables("company_dishonest_info") = ("cid", "name", "card_num", "null", "case_no", "court", "2")
+    mapTables("company_zxr_final_case") = ("cid", "name", "identity_num", "null", "case_no", "court_name", "3")
+    mapTables("company_zxr_restrict") = ("cid", "name", "identity_num", "coalesce(company_name,company_info)", "court_name", "case_no", "4")
+    var lastDsIncOds: String = ""
+    var minDs: String = ""
+    var maxDs: String = ""
+    mapTables.foreach(m => {
+      if (lastDsIncOds == "") {
+        lastDsIncOds = BaseUtil.getPartion("inc_ods_" + m._1, spark)
+        minDs = lastDsIncOds
+        maxDs = minDs
+      }
+      else {
+        if (Integer.parseInt(lastDsIncOds) < Integer.parseInt(BaseUtil.getPartion("inc_ods_" + m._1, spark))) {
+          minDs = lastDsIncOds
+          maxDs = BaseUtil.getPartion("inc_ods_" + m._1, spark)
+        }
+        else {
+          minDs = BaseUtil.getPartion("inc_ods_" + m._1, spark)
+          maxDs = lastDsIncOds
+        }
+      }
+    })
+    //增量数据表没有准备好(无分区)时错误退出
+    if (StringUtils.isBlank(minDs)) {
+      println("all tables have not inc's partition !!!")
+      sys.exit(-1)
+    }
+    //要求验证时发现各个表的分区未对齐(不相等)时错误退出
+    if (isNeedVerify == true && minDs != maxDs) {
+      println("not all tables have the same partition of newest !!!")
+      sys.exit(-1)
+    }
+    id_card_trimOrRaw_udf()
+    lastDsIncOds = minDs
+    spark.sparkContext.setJobDescription(s"补全身份证号码:zxr_restrict_person($lastDsIncOds)")
+    sql(mapTables.map(m => {
+      s"""
+         |SELECT ${m._2._2} AS name, ${m._2._3} AS identity_num, ${m._2._4} AS company_name, ${m._2._5} AS case_no, ${m._2._6} AS court_name, ${m._2._7} AS source, ${m._2._7} AS flag
+         |FROM ods_${m._1}
+         |WHERE ds>'0' AND ${m._2._1} IS NULL
+         |UNION ALL
+         |SELECT ${m._2._1} AS name, ${m._2._2} AS identity_num, ${m._2._3} AS company_name, ${m._2._4} AS case_no, ${m._2._5} AS court_name, ${m._2._6} AS source, ${m._2._6} AS flag
+         |FROM inc_ods_${m._1}
+         |WHERE ds=$lastDsIncOds AND ${m._2._1} IS NULL
+         |""".stripMargin
+    }).toArray.mkString(" UNION ALL ")
+    ).where("name IS NOT NULL AND case_no IS NOT NULL AND LENGTH(name)>0 AND LENGTH(case_no)>0")
+      .createOrReplaceTempView("tmp_person_idcard_cloze_1")
+
+    //1、根据姓名和案号补全身份证号码,未去重是为了后续根据姓名和公司来补全
+    sql(
+      s"""
+         |SELECT name, identity_num, company_name, case_no, court_name, source, flag
+         |FROM(
+         |  SELECT name, identity_num, company_name, case_no, court_name, source, flag
+         |        ,ROW_NUMBER() OVER (PARTITION BY name,case_no,source ORDER BY identity_num DESC) num
+         |  FROM tmp_person_idcard_cloze_1
+         |)
+         |WHERE num=1
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_person_idcard_cloze_2")
+    //2、根据姓名和公司来补全身份证号码
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_person_idcard_cloze partition(ds='$lastDsIncOds')
+         |SELECT name, id_card_trimOrRaw(identity_num), company_name, case_no, court_name, source, flag
+         |FROM(
+         |  SELECT name, identity_num, company_name, case_no, court_name, source, flag
+         |         ,ROW_NUMBER() OVER (PARTITION BY name,case_no ORDER BY identity_num DESC) num
+         |  FROM(
+         |    SELECT A.name
+         |          ,IF(LENGTH(A.identity_num)<2 AND LENGTH(B.identity_num)>10,B.identity_num,A.identity_num) AS identity_num
+         |          ,A.company_name
+         |          ,A.case_no
+         |          ,A.court_name
+         |          ,A.source
+         |          ,IF(LENGTH(A.identity_num)<2 AND LENGTH(B.identity_num)>10,B.flag,A.flag) AS flag
+         |    FROM tmp_person_idcard_cloze_2 A
+         |    LEFT JOIN
+         |    (SELECT name, identity_num, company_name, case_no, court_name, source, flag
+         |    FROM tmp_person_idcard_cloze_2
+         |    WHERE company_name IS NOT NULL AND LENGTH(company_name)>1
+         |    ) B
+         |    ON A.name=B.name AND A.company_name=B.company_name
+         |  )
+         |)
+         |WHERE num=1
+         |""".stripMargin
+    )
+    //    CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}
+
+object IDCard_Completion_Utils {
+
+  def main(args: Array[String]): Unit = {
+
+    val Array(project, flag) = args
+    println(
+      s"""
+         |project: $project
+         |flag: $flag
+         |""".stripMargin)
+    if (args.length < 2) {
+      println("请输入 project:项目, tableName:表名, flag:标识!!!")
+      sys.exit(-1)
+    }
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    IDCard_Completion_Utils(spark, project, flag.toBoolean).calc()
+    spark.stop()
+  }
+}
+