|
@@ -23,7 +23,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
|
|
|
val tabMapping =
|
|
|
Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
|
|
|
- ,"company_send_announcement" -> ("litigant_cids",",")//送达公告
|
|
|
+ , "company_send_announcement" -> ("litigant_cids", ",") //送达公告
|
|
|
)
|
|
|
|
|
|
val funMap =
|
|
@@ -32,6 +32,12 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
"company_court_announcement.litigant" -> "replace_char(litigant)"
|
|
|
)
|
|
|
|
|
|
+ val rowkey_mapping: Map[String, String] =
|
|
|
+ Map(
|
|
|
+ "company_patent.pub_number" -> s"rowkey_trans(pub_number,'$mainTableName')",//专利
|
|
|
+ "company_patent.app_number" -> s"rowkey_trans(app_number,'$mainTableName')"
|
|
|
+ )
|
|
|
+
|
|
|
//转换字段
|
|
|
def trans(s: String): String = {
|
|
|
val key = mainTableName + "." + s
|
|
@@ -42,6 +48,15 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
res
|
|
|
}
|
|
|
|
|
|
+ def trans2(s: String) = {
|
|
|
+ val key = mainTableName + "." + s
|
|
|
+ var res = s
|
|
|
+ if (rowkey_mapping.contains(key)) {
|
|
|
+ res = rowkey_mapping(key)
|
|
|
+ }
|
|
|
+ res
|
|
|
+ }
|
|
|
+
|
|
|
def calc(): Unit = {
|
|
|
println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
|
|
|
|
|
@@ -88,7 +103,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
|
|
|
}
|
|
|
|
|
|
- val cols_md5 = dupliCols.filter(!_.equals("new_cid")).map(trans)
|
|
|
+ val cols_md5 = dupliCols.filter(!_.equals("new_cid")).map(trans).map(trans2)
|
|
|
|
|
|
//增量ods和增量ads最后一个分区相等,跳出
|
|
|
if (lastDsIncOds.equals(lastDsIncAds)) {
|
|
@@ -97,7 +112,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
val l1 = sql(s"show partitions $inc_ads_company_tb").collect.toList.map(_.getString(0).split("=")(1)).sorted
|
|
|
if (l1.size > 1) {
|
|
|
runDs = BaseUtil.atDaysAfter(1, l1(l1.size - 2))
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
runDs = firstDsIncOds
|
|
|
}
|
|
|
//sys.exit(-1)
|
|
@@ -159,7 +174,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
| ,new_cid
|
|
|
| ,cid
|
|
|
| ,${sublistTableFieldName.mkString(",")}
|
|
|
- | ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(trans).mkString(",")})) ORDER BY update_time DESC ) num
|
|
|
+ | ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(trans).map(trans2)mkString(",")})) ORDER BY update_time DESC ) num
|
|
|
| FROM (
|
|
|
| SELECT "0" AS $f
|
|
|
| ,CAST(new_cid AS STRING) AS new_cid
|