|
@@ -1,12 +1,11 @@
|
|
package com.winhc.bigdata.spark.utils
|
|
package com.winhc.bigdata.spark.utils
|
|
|
|
|
|
-import java.util.Date
|
|
|
|
-
|
|
|
|
import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
|
|
import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
|
|
import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
import org.apache.commons.lang3.StringUtils
|
|
import org.apache.commons.lang3.StringUtils
|
|
import org.apache.spark.sql.SparkSession
|
|
import org.apache.spark.sql.SparkSession
|
|
|
|
|
|
|
|
+import java.util.Date
|
|
import scala.annotation.meta.getter
|
|
import scala.annotation.meta.getter
|
|
import scala.collection.mutable
|
|
import scala.collection.mutable
|
|
|
|
|
|
@@ -78,6 +77,8 @@ case class Company_Completion_Utils(s: SparkSession,
|
|
}).toArray.mkString(" UNION ALL ")
|
|
}).toArray.mkString(" UNION ALL ")
|
|
).createOrReplaceTempView("tmp_company_cloze_1")
|
|
).createOrReplaceTempView("tmp_company_cloze_1")
|
|
|
|
|
|
|
|
+ val base_mapping_ds = getLastPartitionsOrElse(s"$project.base_company_mapping", "0")
|
|
|
|
+
|
|
//1、根据姓名和案号补全身份证号码,未去重是为了后续根据姓名和公司来补全
|
|
//1、根据姓名和案号补全身份证号码,未去重是为了后续根据姓名和公司来补全
|
|
sql(
|
|
sql(
|
|
s"""
|
|
s"""
|
|
@@ -103,7 +104,7 @@ case class Company_Completion_Utils(s: SparkSession,
|
|
| ,IF(A.cid IS NULL AND B.new_cid IS NOT NULL,-1,A.flag) AS flag
|
|
| ,IF(A.cid IS NULL AND B.new_cid IS NOT NULL,-1,A.flag) AS flag
|
|
| ,ROW_NUMBER() OVER (PARTITION BY A.cid,A.company_name ORDER BY A.cid DESC) num
|
|
| ,ROW_NUMBER() OVER (PARTITION BY A.cid,A.company_name ORDER BY A.cid DESC) num
|
|
| FROM tmp_company_cloze_2 A
|
|
| FROM tmp_company_cloze_2 A
|
|
- | LEFT JOIN $project.base_company_mapping B
|
|
|
|
|
|
+ | LEFT JOIN (select * from $project.base_company_mapping where ds = '$base_mapping_ds') B
|
|
| ON A.company_name=B.cname
|
|
| ON A.company_name=B.cname
|
|
|)
|
|
|)
|
|
|WHERE num=1
|
|
|WHERE num=1
|