|
@@ -18,18 +18,34 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
mainTableName: String, //主表名(不加前辍)
|
|
|
sublistTableName: String, //子表(不加前辍)
|
|
|
dupliCols: Seq[String] // 去重列
|
|
|
- ) extends LoggingUtils with CompanyMapping{
|
|
|
+ ) extends LoggingUtils with CompanyMapping {
|
|
|
@(transient@getter) val spark: SparkSession = s
|
|
|
|
|
|
val tabMapping =
|
|
|
Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
|
|
|
)
|
|
|
|
|
|
+ val funMap =
|
|
|
+ Map("company_court_announcement.case_no" -> "case_no(content)", //court announcement
|
|
|
+ "company_court_announcement.plaintiff" -> "replace_char(plaintiff)",
|
|
|
+ "company_court_announcement.litigant" -> "replace_char(litigant)"
|
|
|
+ )
|
|
|
+
|
|
|
+ //transform field name via funMap (apply cleanup/replace function if mapped)
|
|
|
+ def trans(s: String): String = {
|
|
|
+ val key = mainTableName + "." + s
|
|
|
+ var res = s
|
|
|
+ if (funMap.contains(key)) {
|
|
|
+ res = funMap(key)
|
|
|
+ }
|
|
|
+ res
|
|
|
+ }
|
|
|
+
|
|
|
def calc(): Unit = {
|
|
|
println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
|
|
|
|
|
|
prepareFunctions(spark)
|
|
|
- val (split_cols,delimiter) = tabMapping.getOrElse(mainTableName,("cids",";"))
|
|
|
+ val (split_cols, delimiter) = tabMapping.getOrElse(mainTableName, ("cids", ";"))
|
|
|
|
|
|
val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
|
|
|
val ads_company_tb = s"${project}.ads_$mainTableName" //存量ads主表数据
|
|
@@ -39,17 +55,17 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
val inc_ads_company_tb_list = s"${project}.inc_ads_$sublistTableName" //增量数据ads 子表
|
|
|
|
|
|
val sublistTableFieldName = spark.table(ads_company_tb_list).columns.filter(s => {
|
|
|
- !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")&& !s.equals("cid")
|
|
|
+ !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids") && !s.equals("cid")
|
|
|
}).seq
|
|
|
|
|
|
val sublistRes = spark.table(inc_ads_company_tb_list).columns.filter(s => {
|
|
|
!s.equals("ds")
|
|
|
- }).seq
|
|
|
+ }).map(trans).seq
|
|
|
|
|
|
|
|
|
val subRes = spark.table(inc_ads_company_tb).columns.filter(s => {
|
|
|
!s.equals("ds")
|
|
|
- }).seq
|
|
|
+ }).map(trans).seq
|
|
|
|
|
|
//存量表ads最新分区
|
|
|
val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
|
|
@@ -71,7 +87,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
|
|
|
}
|
|
|
|
|
|
- val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
|
|
|
+ val cols_md5 = dupliCols.filter(!_.equals("new_cid")).map(trans)
|
|
|
|
|
|
//增量ods和增量ads最后一个分区相等,跳出
|
|
|
if (lastDsIncOds.equals(lastDsIncAds)) {
|
|
@@ -96,8 +112,8 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
})
|
|
|
|
|
|
//判断字段是否有重复字段
|
|
|
- var f= "flag"
|
|
|
- if(sublistTableFieldName.contains(f)){
|
|
|
+ var f = "flag"
|
|
|
+ if (sublistTableFieldName.contains(f)) {
|
|
|
f = "update_flag"
|
|
|
}
|
|
|
|
|
@@ -136,7 +152,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
| ,new_cid
|
|
|
| ,cid
|
|
|
| ,${sublistTableFieldName.mkString(",")}
|
|
|
- | ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
|
|
|
+ | ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(trans).mkString(",")})) ORDER BY update_time DESC ) num
|
|
|
| FROM (
|
|
|
| SELECT "0" AS $f
|
|
|
| ,CAST(new_cid AS STRING) AS new_cid
|
|
@@ -189,7 +205,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
|SELECT ${subRes.mkString(",")}
|
|
|
|FROM (
|
|
|
| SELECT $split_cols,${columns.mkString(",")}
|
|
|
- | ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
|
|
|
+ | ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) ORDER BY update_time DESC ) num
|
|
|
| ,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey
|
|
|
| FROM ${inc_ods_company_tb}
|
|
|
| WHERE ds >= ${runDs}
|