晏永年 пре 4 година
родитељ
комит
0457931c60

+ 5 - 5
src/main/scala/com/winhc/bigdata/spark/etl/PersonIncrSync.scala

@@ -88,12 +88,12 @@ case class PersonIncrSync(s: SparkSession,
          |        ,cid
          |        ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
          |FROM    (
-         |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols(0)})),MD5(cleanup(${cols_md5.drop(1).mkString("")}))) AS rowkey
+         |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})),MD5(cleanup(${cols_md5.drop(1).map(s => s"a.$s").mkString("")}))) AS rowkey
          |                    ,a.flag
-         |                    ,MD5(cleanup(${dupliCols(0)})) AS new_cid
+         |                    ,MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})) AS new_cid
          |                    ,null AS cid
-         |                    ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",").replace("${idCardCol}","IF(a.${idCardCol} IS NULL AND c.identity_num IS NOT NULL,c.identity_num,a.${idCardCol}) AS ${idCardCol}")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
+         |                    ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).map(s => s"a.$s").mkString(",").replace("${idCardCol}","IF(a.${idCardCol} IS NULL AND c.identity_num IS NOT NULL,c.identity_num,a.${idCardCol}) AS ${idCardCol}")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(s => s"a.$s").mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
          |            FROM    (
          |                        SELECT  "1" AS flag
          |                                ,${columns.mkString(",")}
@@ -105,7 +105,7 @@ case class PersonIncrSync(s: SparkSession,
          |             LEFT JOIN
          |                    (
          |                        SELECT *
-         |                        FROM ads_person_idcard_cloze
+         |                        FROM ${project}.ads_person_idcard_cloze
          |                        WHERE ds=${personIdDs}
          |                    ) i
          |             ON a.name=i.name AND a.case_no=i.case_no

+ 13 - 7
src/main/scala/com/winhc/bigdata/spark/utils/Company_Completion_Utils.scala

@@ -64,16 +64,16 @@ case class Company_Completion_Utils(s: SparkSession,
     }
     id_card_trimOrRaw_udf()
     lastDsIncOds = minDs
-    spark.sparkContext.setJobDescription(s"补全身份证号码:zxr_restrict_person($lastDsIncOds)")
+    spark.sparkContext.setJobDescription(s"补全企业cid:${mapTables.size}个表+企业映射表聚合($lastDsIncOds)")
     sql(mapTables.map(m => {
       s"""
          |SELECT SPLIT(${m._2._1},';')[0] AS cid, ${m._2._2} AS company_name, ${m._2._3} AS organization_code, ${m._2._4} AS source, ${m._2._4} AS flag
-         |FROM ods_${m._1}
+         |FROM $project.ods_${m._1}
          |WHERE ds>'0' AND ${m._2._2} IS NOT NULL AND LENGTH(${m._2._2})>=5
          |UNION ALL
          |SELECT SPLIT(${m._2._1},';')[0] AS cid, ${m._2._2} AS company_name, ${m._2._3} AS organization_code, ${m._2._4} AS source, ${m._2._4} AS flag
-         |FROM inc_ods_${m._1}
-         |WHERE ds=$lastDsIncOds AND ${m._2._2} IS NOT NULL AND LENGTH(${m._2._2})>=5
+         |FROM $project.inc_ods_${m._1}
+         |WHERE ds>'0' AND ${m._2._2} IS NOT NULL AND LENGTH(${m._2._2})>=5
          |""".stripMargin
     }).toArray.mkString(" UNION ALL ")
     ).createOrReplaceTempView("tmp_company_cloze_1")
@@ -96,9 +96,15 @@ case class Company_Completion_Utils(s: SparkSession,
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_company_cloze partition(ds='$lastDsIncOds')
          |SELECT cid, company_name, organization_code, source, flag
          |FROM(
-         |  SELECT cid, company_name, organization_code, source, flag
-         |         ,ROW_NUMBER() OVER (PARTITION BY cid,company_name ORDER BY cid DESC) num
-         |  FROM tmp_company_cloze_2
+         |  SELECT IF(A.cid IS NULL AND B.new_cid IS NOT NULL,B.new_cid,A.cid) AS cid
+         |        ,A.company_name
+         |        ,A.organization_code
+         |        ,A.source
+         |        ,IF(A.cid IS NULL AND B.new_cid IS NOT NULL,-1,A.flag) AS flag
+         |        ,ROW_NUMBER() OVER (PARTITION BY A.cid,A.company_name ORDER BY A.cid DESC) num
+         |  FROM tmp_company_cloze_2 A
+         |  LEFT JOIN $project.base_company_mapping B
+         |  ON A.company_name=B.cname
          |)
          |WHERE num=1
          |""".stripMargin

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/utils/IDCard_Completion_Utils.scala

@@ -64,16 +64,16 @@ case class IDCard_Completion_Utils(s: SparkSession,
     }
     id_card_trimOrRaw_udf()
     lastDsIncOds = minDs
-    spark.sparkContext.setJobDescription(s"补全身份证号码:zxr_restrict_person($lastDsIncOds)")
+    spark.sparkContext.setJobDescription(s"补全身份证号码:${mapTables.size}个表聚合($lastDsIncOds)")
     sql(mapTables.map(m => {
       s"""
          |SELECT ${m._2._2} AS name, ${m._2._3} AS identity_num, ${m._2._4} AS company_name, ${m._2._5} AS case_no, ${m._2._6} AS court_name, ${m._2._7} AS source, ${m._2._7} AS flag
-         |FROM ods_${m._1}
+         |FROM $project.ods_${m._1}
          |WHERE ds>'0' AND ${m._2._1} IS NULL
          |UNION ALL
          |SELECT ${m._2._1} AS name, ${m._2._2} AS identity_num, ${m._2._3} AS company_name, ${m._2._4} AS case_no, ${m._2._5} AS court_name, ${m._2._6} AS source, ${m._2._6} AS flag
-         |FROM inc_ods_${m._1}
-         |WHERE ds=$lastDsIncOds AND ${m._2._1} IS NULL
+         |FROM $project.inc_ods_${m._1}
+         |WHERE ds>'0' AND ${m._2._1} IS NULL
          |""".stripMargin
     }).toArray.mkString(" UNION ALL ")
     ).where("name IS NOT NULL AND case_no IS NOT NULL AND LENGTH(name)>0 AND LENGTH(case_no)>0")