瀏覽代碼

兼容字段转换

xufei 4 年之前
父節點
當前提交
54e1fe0530
共有 1 個文件被更改,包括 26 次插入和 10 次删除
  1. 26 10
      src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

+ 26 - 10
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -18,18 +18,34 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
                                    mainTableName: String, //主表名(不加前辍)
                                    sublistTableName: String, //子表(不加前辍)
                                    dupliCols: Seq[String] // 去重列
-                                  ) extends LoggingUtils  with CompanyMapping{
+                                  ) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
   val tabMapping =
     Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
     )
 
+  val funMap =
+    Map("company_court_announcement.case_no" -> "case_no(content)", //法院公告
+      "company_court_announcement.plaintiff" -> "replace_char(plaintiff)",
+      "company_court_announcement.litigant" -> "replace_char(litigant)"
+    )
+
+  //转换字段
+  def trans(s: String): String = {
+    val key = mainTableName + "." + s
+    var res = s
+    if (funMap.contains(key)) {
+      res = funMap(key)
+    }
+    res
+  }
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
     prepareFunctions(spark)
-    val (split_cols,delimiter) = tabMapping.getOrElse(mainTableName,("cids",";"))
+    val (split_cols, delimiter) = tabMapping.getOrElse(mainTableName, ("cids", ";"))
 
     val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
     val ads_company_tb = s"${project}.ads_$mainTableName" //存量ads主表数据
@@ -39,17 +55,17 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     val inc_ads_company_tb_list = s"${project}.inc_ads_$sublistTableName" //增量数据ads 子表
 
     val sublistTableFieldName = spark.table(ads_company_tb_list).columns.filter(s => {
-      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")&& !s.equals("cid")
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids") && !s.equals("cid")
     }).seq
 
     val sublistRes = spark.table(inc_ads_company_tb_list).columns.filter(s => {
       !s.equals("ds")
-    }).seq
+    }).map(trans).seq
 
 
     val subRes = spark.table(inc_ads_company_tb).columns.filter(s => {
       !s.equals("ds")
-    }).seq
+    }).map(trans).seq
 
     //存量表ads最新分区
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
@@ -71,7 +87,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
     }
 
-    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid")).map(trans)
 
     //增量ods和增量ads最后一个分区相等,跳出
     if (lastDsIncOds.equals(lastDsIncAds)) {
@@ -96,8 +112,8 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     })
 
     //判断字段是否有重复字段
-    var f= "flag"
-    if(sublistTableFieldName.contains(f)){
+    var f = "flag"
+    if (sublistTableFieldName.contains(f)) {
       f = "update_flag"
     }
 
@@ -136,7 +152,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                    ,new_cid
          |                    ,cid
          |                    ,${sublistTableFieldName.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(trans).mkString(",")})) ORDER BY update_time DESC ) num
          |            FROM    (
          |                        SELECT  "0" AS $f
          |                                ,CAST(new_cid AS STRING) AS new_cid
@@ -189,7 +205,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |SELECT  ${subRes.mkString(",")}
          |FROM    (
          |            SELECT  $split_cols,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) ORDER BY update_time DESC ) num
          |                    ,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey
          |            FROM    ${inc_ods_company_tb}
          |            WHERE   ds >= ${runDs}