Procházet zdrojové kódy

存量cid,cids模板修改

xufei před 4 roky
rodič
revize
bfd9a06e3a

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -30,6 +30,8 @@ object CalcIncrTotal {
   //winhc_eci_dev company_copyright_works new_cid,reg_num cids
   //winhc_eci_dev company_patent new_cid,pub_number,app_number cids
 
+  //winhc_eci_dev company_court_open_announcement new_cid,case_no,plaintiff,defendant cids
+
 
   def main(args: Array[String]): Unit = {
 

+ 2 - 5
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidsUtils.scala

@@ -44,7 +44,7 @@ case class CompanyForCidsUtils(s: SparkSession, space: String, sourceTable: Stri
     //拆平新表
     sql(
       s"""
-         |--insert overwrite table ${adsListTable} partition (ds=${ds})
+         |insert overwrite table ${adsListTable} partition (ds=${ds})
          |SELECT
          |        ${list_columns.mkString(",")}
          |FROM    (
@@ -75,7 +75,7 @@ case class CompanyForCidsUtils(s: SparkSession, space: String, sourceTable: Stri
       .createOrReplaceTempView(s"t2")
 
     //聚合新cids
-    val df1 = sql(
+    sql(
       s"""
          |insert overwrite table ${adsTable} partition (ds=${ds})
          |SELECT
@@ -96,9 +96,6 @@ case class CompanyForCidsUtils(s: SparkSession, space: String, sourceTable: Stri
          |AND     trim($split_cols) <> ''
          |""".stripMargin).createOrReplaceTempView("t3")
 
-    //写表
-//    sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
-    sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select new_cids,${columns.mkString(",")} from t3")
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -35,12 +35,12 @@ case class CompanyIncSummary(s: SparkSession,
 
 
     val ads_table_cols = spark.table(ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid") && !l.equals("new_litigant_cids")
     }).toList.sorted
 
 
     val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid") && !l.equals("new_litigant_cids")
     }).toList.sorted
 
 

+ 22 - 11
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -46,6 +46,11 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       !s.equals("ds")
     }).seq
 
+
+    val subRes = spark.table(inc_ads_company_tb).columns.filter(s => {
+      !s.equals("ds")
+    }).seq
+
     //存量表ads最新分区
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -87,9 +92,15 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
     //table字段
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
-      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
+      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids") && !s.equals("new_litigant_cids") && !s.equals("litigant_cids")
     })
 
+    //判断字段是否有重复字段
+    var f= "flag"
+    if(sublistTableFieldName.contains(f)){
+      f = "update_flag"
+    }
+
     //mapping 映射关系
     sql(
       s"""
@@ -114,20 +125,20 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
 
     //替换cid,去重,复制老数据
-    val df1 = sql(
+    sql(
       s"""
          |INSERT OVERWRITE TABLE $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
          |SELECT
          |        ${sublistRes.mkString(",")}
          |FROM    (
          |            SELECT  CONCAT_WS( '_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
-         |                    ,flag
+         |                    ,$f
          |                    ,new_cid
          |                    ,cid
          |                    ,${sublistTableFieldName.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
          |            FROM    (
-         |                        SELECT  "0" AS flag
+         |                        SELECT  "0" AS $f
          |                                ,CAST(new_cid AS STRING) AS new_cid
          |                                ,CAST(cid AS STRING) AS cid
          |                                ,${sublistTableFieldName.mkString(",")}
@@ -142,7 +153,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                                            ) e
          |                                ) f
          |                        UNION ALL
-         |                        SELECT  "1" AS flag
+         |                        SELECT  "1" AS $f
          |                                ,CAST(new_cid AS STRING) AS new_cid
          |                                ,CAST(cid AS STRING) AS cid
          |                                ,${sublistTableFieldName.mkString(",")}
@@ -175,15 +186,15 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     sql(
       s"""
          |INSERT OVERWRITE TABLE  $inc_ads_company_tb PARTITION(ds='$lastDsIncOds')
-         |SELECT  md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey,
-         |        cids,${columns.mkString(",")}
+         |SELECT  ${subRes.mkString(",")}
          |FROM    (
-         |            SELECT  cids,${columns.mkString(",")}
+         |            SELECT  $split_cols,${sublistTableFieldName.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
+         |                    ,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey
          |            FROM    ${inc_ods_company_tb}
          |            WHERE   ds >= ${runDs}
-         |            AND     cids IS NOT NULL
-         |            AND     trim(cids) <> ''
+         |            AND     $split_cols IS NOT NULL
+         |            AND     trim($split_cols) <> ''
          |        ) a
          |WHERE   num = 1
          |""".stripMargin)
@@ -200,7 +211,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     ).syn()
 
     //同步增量主表数据
-    val cols = columns ++ Seq("cids")
+    val cols = columns ++ Seq(s"$split_cols")
     MaxComputer2Phoenix(
       spark,
       cols,