Explorar o código

Merge remote-tracking branch 'origin/master'

许家凯 %!s(int64=4) %!d(string=hai) anos
pai
achega
b0a8903811

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -35,12 +35,12 @@ case class CompanyIncSummary(s: SparkSession,
 
 
     val ads_table_cols = spark.table(ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid")
     }).toList.sorted
 
 
     val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid")
     }).toList.sorted
 
 

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -114,7 +114,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |                                    SELECT  new_cid AS cid
          |                                            ,${columns.mkString(",")}
          |                                    FROM    ${inc_ads_company_tb}
-         |                                    WHERE   ds > ${remainDs}
+         |                                    WHERE   ds > ${remainDs} AND  ds <  ${runDs}
          |                                ) b
          |                        ON      a.cid = b.cid
          |                        UNION ALL

+ 19 - 8
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -21,10 +21,15 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
                                   ) extends LoggingUtils  with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
+  val tabMapping =
+    Map("company_court_register" -> ("litigant_cids", ";") //立案信息
+    )
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
     prepareFunctions(spark)
+    val (split_cols,delimiter) = tabMapping.getOrElse(mainTableName,("cids",";"))
 
     val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
     val ads_company_tb = s"${project}.ads_$mainTableName" //存量ads主表数据
@@ -37,6 +42,10 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")&& !s.equals("cid")
     }).seq
 
+    val sublistRes = spark.table(inc_ads_company_tb_list).columns.filter(s => {
+      !s.equals("ds")
+    }).seq
+
     //存量表ads最新分区
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -97,10 +106,10 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       s"""
          |SELECT  *
          |FROM    ${inc_ods_company_tb} a
-         |LATERAL VIEW explode(split(cids,';')) b AS cid
+         |LATERAL VIEW explode(split($split_cols,'$delimiter')) b AS cid
          |WHERE   ds >= ${runDs}
-         |AND     cids IS NOT NULL
-         |AND     trim(cids) <> ''
+         |AND     $split_cols IS NOT NULL
+         |AND     trim($split_cols) <> ''
          |""".stripMargin).createOrReplaceTempView("incr_tb")
 
 
@@ -108,19 +117,19 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     val df1 = sql(
       s"""
          |INSERT OVERWRITE TABLE $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
-         |SELECT  rowkey
-         |        ,flag
-         |        ,new_cid
-         |        ,${sublistTableFieldName.mkString(",")}
+         |SELECT
+         |        ${sublistRes.mkString(",")}
          |FROM    (
          |            SELECT  CONCAT_WS( '_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
          |                    ,flag
          |                    ,new_cid
+         |                    ,cid
          |                    ,${sublistTableFieldName.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
          |            FROM    (
          |                        SELECT  "0" AS flag
          |                                ,CAST(new_cid AS STRING) AS new_cid
+         |                                ,CAST(cid AS STRING) AS cid
          |                                ,${sublistTableFieldName.mkString(",")}
          |                        FROM    (
          |                                    SELECT  *
@@ -135,16 +144,18 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                        UNION ALL
          |                        SELECT  "1" AS flag
          |                                ,CAST(new_cid AS STRING) AS new_cid
+         |                                ,CAST(cid AS STRING) AS cid
          |                                ,${sublistTableFieldName.mkString(",")}
          |                        FROM    (
          |                                    SELECT  a.new_cid
+         |                                            ,a.cid
          |                                            ,${sublistTableFieldName.mkString(",")}
          |                                    FROM    mapping a
          |                                    JOIN    (
          |                                                SELECT  new_cid AS cid
          |                                                        ,${sublistTableFieldName.mkString(",")}
          |                                                FROM    ${inc_ads_company_tb_list}
-         |                                                WHERE   ds > ${remainDs}
+         |                                                WHERE   ds > ${remainDs} AND  ds <  ${runDs}
          |                                                UNION ALL
          |                                                SELECT  new_cid AS cid
          |                                                        ,${sublistTableFieldName.mkString(",")}