
Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
d832c12dfd

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -30,6 +30,8 @@ object CalcIncrTotal {
   //winhc_eci_dev company_copyright_works new_cid,reg_num cids
   //winhc_eci_dev company_patent new_cid,pub_number,app_number cids
 
+  //winhc_eci_dev company_court_open_announcement new_cid,case_no,plaintiff,defendant cids
+
 
   def main(args: Array[String]): Unit = {
 
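The comment lines above double as ready-made argument sets for this job. A hypothetical invocation for the newly documented table, assuming the four space-separated tokens map one-to-one onto main's args as the other comment lines suggest:

    // hypothetical local run; token layout inferred from the comment line above
    CalcIncrTotal.main(Array(
      "winhc_eci_dev",                       // project
      "company_court_open_announcement",     // source table
      "new_cid,case_no,plaintiff,defendant", // dedup columns
      "cids"                                 // column holding the cid list
    ))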

+ 15 - 24
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCids.scala

@@ -3,6 +3,8 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.utils.{CompanyForCidsUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 /**
  * Software copyright | works copyright | patent
  * π
@@ -15,32 +17,21 @@ object CompanyForCids {
       "ods_company_patent" -> Seq("pub_number", "title") //作品著作权
     )
 
+  //  winhc_eci_dev company_court_open_announcement case_no,plaintiff,defendant,new_cid
+  //  winhc_eci_dev company_copyright_reg reg_num,new_cid
+
   def main(args: Array[String]): Unit = {
-    val (sourceTable, cols) = valid(args)
-    //    var config = mutable.Map.empty[String, String]
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, null)
-    CompanyForCidsUtils(spark, sourceTable, cols).calc()
+    val Array(space, sourceTable, cols) = args
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyForCidsUtils(spark, space, sourceTable, cols.split(",")).calc()
     spark.stop()
   }
 
-  def valid(args: Array[String]): (String, Seq[String]) = {
-    println(args.toSeq.mkString(" "))
-    if (args.length == 1) {
-
-    } else if (args.length == 2) {
-      val Array(sourceTable, cols) = args
-      return (sourceTable, cols.split(";").toSeq)
-    } else {
-      println("请输入要计算的table!!!! ")
-      sys.exit(-1)
-    }
-    val Array(sourceTable) = args
-
-    val cols: Seq[String] = tabMapping.getOrElse("ods_" + sourceTable, Seq())
-    if (cols.isEmpty) {
-      println("输入表不存在,请配置计算规则!!!   ")
-      sys.exit(-1)
-    }
-    (sourceTable, cols)
-  }
+
 }
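With valid() gone, main now expects exactly three positional arguments. A minimal sketch of a run, reusing the values from the comment lines above (how the job is submitted is an assumption):

    // space, sourceTable, comma-separated dedup columns
    CompanyForCids.main(Array(
      "winhc_eci_dev",
      "company_court_open_announcement",
      "case_no,plaintiff,defendant,new_cid"
    ))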

+ 50 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -57,14 +57,18 @@ object CompanyDynamic {
 
     // Table name (without prefix or suffix)
     def calc(tableName: String
-             , bName: Boolean = false // whether to backfill the cname field
+             , bName: Int = 0 // cname backfill mode: 0 = none, 1 = fill cname by cid, 2 = also fill cname into data
             ): Unit = {
       val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
 
       val types = handle.org_type()
+      val colsExclusiveSome = spark.table(s"${project}.ads_change_extract").columns.filter(s => {
+        !s.equals("cid") && !s.equals("data") && !s.equals("old_data") && !s.equals("ds") && !s.equals("tn")
+      }).seq
       val rdd = sql(
         bName match {
-          case false =>
+          // default: no cname backfill needed
+          case 0 =>
             s"""
                |SELECT  *,null AS cname
                |FROM    ${project}.ads_change_extract
@@ -72,7 +76,8 @@ object CompanyDynamic {
                |AND     tn = '$tableName'
                |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
                |""".stripMargin
-          case true =>
+          // backfill cname from cid via the company mapping table
+          case 1 =>
             s"""
                |SELECT A.*,B.cname AS cname
                |FROM(
@@ -88,6 +93,42 @@ object CompanyDynamic {
                |) AS B
                |ON A.cid = B.cid
                |""".stripMargin
+          // backfill cname from cid, and additionally inject the matching cname into the data map for the cid it carries
+          case 2 =>
+            s"""
+               |SELECT cid, ${colsExclusiveSome.mkString(",")},old_data, cname, STR_TO_MAP(regexp_replace(concat_ws(',',data),'[{"}]',''),',',':') AS data
+               |FROM(
+               |  SELECT new_cid AS cid, ${colsExclusiveSome.mkString(",")},to_json(old_data) AS old_data, COLLECT_SET(cname)[0] AS cname, COLLECT_SET(to_json(data)) AS data
+               |  FROM(
+               |    SELECT new_cid, ${colsExclusiveSome.map("A." + _).mkString(",")}, A.cid, IF(A.data IS NULL, STR_TO_MAP(CONCAT('cname:',IF(cname IS NULL, '',cname)),',',':'), A.data) AS data, old_data, IF(A.new_cid=A.cid,B.cname,null) AS cname
+               |    FROM(
+               |      SELECT cid AS new_cid, cid, ${colsExclusiveSome.mkString(",")},data, old_data
+               |      FROM(
+               |        SELECT  *
+               |        FROM    ${project}.ads_change_extract
+               |        WHERE   ds = '$ds'
+               |        AND     tn = '$tableName'
+               |        AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |        )
+               |      UNION ALL
+               |      SELECT cid AS new_cid, data['cid'] AS cid, ${colsExclusiveSome.mkString(",")},null AS data, old_data
+               |      FROM(
+               |        SELECT  *
+               |        FROM    ${project}.ads_change_extract
+               |        WHERE   ds = '$ds'
+               |        AND     tn = '$tableName'
+               |        AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |        )
+               |    ) AS A
+               |    LEFT JOIN (
+               |        SELECT cid,cname FROM  $project.base_company_mapping
+               |        WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
+               |    ) AS B
+               |    ON A.cid = B.cid
+               |  )
+               |  GROUP BY new_cid, ${colsExclusiveSome.mkString(",")},to_json(old_data)
+               |)
+               |""".stripMargin
         })
         .rdd.flatMap(r => {
         val rowkey = r.getAs[String]("rowkey")
@@ -145,7 +186,7 @@ object CompanyDynamic {
       sys.exit(-99)
     }
 
-    val Array(project, ds, tableName, bName_table) = if (args.length == 4) args else args :+ ""
+    val Array(project, tableName, ds, bName_table) = if (args.length == 4) args else args :+ "0"
 
     println(
       s"""
@@ -162,15 +203,17 @@ object CompanyDynamic {
     val cd = CompanyDynamicUtil(spark, project, ds)
 
     cd.init()
+/*
 
     for (e <- tableName.split(",")) {
       if (e.length > 2) {
-        cd.calc(e)
+        cd.calc(e, bName_table.toInt)
       }
     }
 
-    for (e <- bName_table.split(",")) {
-      cd.calc(e, bName = true)
+*/
+    for (e <- tableName.split(",")) {
+      cd.calc(e, bName_table.toInt)
     }
     spark.stop()
   }
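For reference, the three bName modes the new match statement covers, sketched against the cd instance built in main (the table name is illustrative):

    cd.calc("company_bid_list", bName = 0) // default: cname stays null
    cd.calc("company_bid_list", bName = 1) // fill cname by joining base_company_mapping on cid
    cd.calc("company_bid_list", bName = 2) // also fold cname into the data map keyed by its cid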

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -117,7 +117,6 @@ trait CompanyDynamicHandle {
     ))
   }
 
-
   /**
    * Change type in the source table: insert or update
    *

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -63,7 +63,8 @@ case class company_bid_list() extends CompanyDynamicHandle {
    */
   override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String, suggestion: String): Seq[(String, String, String, String, String, String, String, String, String, String)] = {
     val proxyName = new_map("proxy")
-    if (proxyName != null && !proxyName.isEmpty && proxyName.equals(cname)) {
+    val oldCName = new_map("cname")
+    if (proxyName != null && !proxyName.isEmpty && proxyName.equals(oldCName)) {
       null
     }
     else {
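One caveat in the guard above: new_map("proxy") and new_map("cname") throw NoSuchElementException when the key is absent. A defensive variant of the same check (a sketch, not the committed code):

    val proxyName = new_map.getOrElse("proxy", "")
    val oldCName  = new_map.getOrElse("cname", "")
    if (proxyName.nonEmpty && proxyName == oldCName) null
    else { /* build the dynamic record as before */ }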

+ 41 - 31
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidsUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import com.winhc.bigdata.spark.udf.CompanyMapping
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -11,40 +12,47 @@ import scala.annotation.meta.getter
  * Flatten cids into per-cid rows and write the result tables
  */
 
-case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
+case class CompanyForCidsUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
   import spark.implicits._
   import spark._
   import org.apache.spark.sql.functions._
 
+  val tabMapping =
+    Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
+    )
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
-    val odsTable = s"ods_$sourceTable"
-    val adsListTable = s"ads_${sourceTable}_list"
-    val adsTable = s"ads_$sourceTable"
-    val companyMapping = "company_name_mapping_pro"
+    val odsTable = s"${space}.ods_$sourceTable"
+    val adsListTable = s"${space}.ads_${sourceTable}_list"
+    val adsTable = s"${space}.ads_$sourceTable"
+    val companyMapping = s"${space}.company_name_mapping_pro_v2"
+    prepareFunctions(spark)
+
     val ds = BaseUtil.getPartion(odsTable, spark)
+
+    val (split_cols, delimiter) = tabMapping.getOrElse(sourceTable, ("cids", ";"))
+
     // table columns
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
+    val list_columns: Seq[String] = spark.table(adsListTable).schema.map(_.name).filter(!_.equals("ds"))
     val disCol = cols
-
-    sql(s"select * from $odsTable where ds = $ds and cids is not null and trim(cids) <> '' ")
-      .dropDuplicates(disCol)
-      .createOrReplaceTempView("t1")
-
-    sql(s"CACHE TABLE t1")
+    val cols_md5 = disCol.filter(!_.equals("new_cid"))
 
     // flatten into the new list table
     sql(
       s"""
+         |insert overwrite table ${adsListTable} partition (ds=${ds})
          |SELECT
-         |        rowkey,new_cid,${columns.mkString(",")}
+         |        ${list_columns.mkString(",")}
          |FROM    (
          |        SELECT
          |                *
-         |                ,ROW_NUMBER() OVER (PARTITION BY id,new_cid ORDER BY - ABS(CAST(new_cid AS BIGINT )- CAST(cid AS BIGINT )) DESC ) num
-         |                ,CONCAT_WS('_',new_cid,id) AS rowkey
+         |                ,ROW_NUMBER() OVER (PARTITION BY ${disCol.mkString(",")} ORDER BY - ABS(CAST(new_cid AS BIGINT )- CAST(cid AS BIGINT )) DESC ) num
+         |                ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                ,cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) AS cols
          |        FROM    (
          |                SELECT
          |                        c.*
@@ -52,40 +60,42 @@ case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[S
          |                FROM    (
          |                        SELECT
          |                                *
-         |                        FROM    t1 a
-         |                        LATERAL VIEW explode(split(cids,';')) b AS cid
+         |                        FROM    $odsTable a
+         |                        LATERAL VIEW explode(split($split_cols,'$delimiter')) b AS cid
+         |                        WHERE   a.ds = $ds
+         |                        AND $split_cols is not null
+         |                        AND trim($split_cols) <> ''
          |                        ) c
          |                LEFT JOIN $companyMapping d
          |                ON      c.cid = d.cid
          |                ) e
          |        ) f
-         |WHERE   num =1
+         |WHERE   num = 1 AND cols is not null AND trim(cols) <> ''
          |""".stripMargin)
       .createOrReplaceTempView(s"t2")
 
     // aggregate new cids
-    val df1 = sql(
+    sql(
       s"""
+         |insert overwrite table ${adsTable} partition (ds=${ds})
          |SELECT
-         |x.new_cids,${columns.mkString(",")}
-         |FROM    t1
+         |md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey
+         |,x.new_cids
+         |,${columns.mkString(",")}
+         |FROM    $odsTable y
          |LEFT JOIN (
          |              SELECT  id as new_id
          |                      ,concat_ws(';',collect_set(new_cid)) new_cids
-         |              FROM    t2
+         |              FROM    $adsListTable t
+         |              WHERE   ds = $ds
          |              GROUP BY id
          |          ) x
-         |ON      t1.id = x.new_id
-         |""".stripMargin)
-
-    df1.createOrReplaceTempView("t3")
-
-    sql(s"select rowkey,new_cid,${columns.mkString(",")} from t2").show(10)
-    sql(s"select new_cids,${columns.mkString(",")} from t3").show(10)
+         |ON      y.id = x.new_id
+         |WHERE   y.ds = $ds
+         |AND     $split_cols IS NOT NULL
+         |AND     trim($split_cols) <> ''
+         |""".stripMargin).createOrReplaceTempView("t3")
 
-    // write the tables
-    sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
-    sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select new_cids,${columns.mkString(",")} from t3")
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 }
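The tabMapping lookup decides which column is exploded and on which delimiter; unmapped tables keep the original cids/';' behavior. For example:

    val (split_cols, delimiter) =
      tabMapping.getOrElse("company_court_open_announcement", ("cids", ";"))
    // split_cols == "litigant_cids", delimiter == ";"
    // any table without an entry falls back to ("cids", ";")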

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -35,12 +35,12 @@ case class CompanyIncSummary(s: SparkSession,
 
 
     val ads_table_cols = spark.table(ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid") && !l.equals("new_litigant_cids")
     }).toList.sorted
 
 
     val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid") && !l.equals("new_litigant_cids")
     }).toList.sorted
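The two filter chains now differ only in the table they read; a possible consolidation of the shared exclusion list (a sketch, not part of this commit):

    val excluded = Set("ds", "rowkey", "flag", "new_cids", "cids", "cid", "new_litigant_cids")
    def summaryCols(table: String): List[String] =
      spark.table(table).columns.filterNot(excluded.contains).toList.sorted

    val ads_table_cols     = summaryCols(ads_table)
    val inc_ads_table_cols = summaryCols(inc_ads_table)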
 
 

+ 23 - 12
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -22,7 +22,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
   @(transient@getter) val spark: SparkSession = s
 
   val tabMapping =
-    Map("company_court_register" -> ("litigant_cids", ";") //立案信息
+    Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
     )
 
   def calc(): Unit = {
@@ -46,6 +46,11 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       !s.equals("ds")
     }).seq
 
+
+    val subRes = spark.table(inc_ads_company_tb).columns.filter(s => {
+      !s.equals("ds")
+    }).seq
+
     // latest partition of the existing ads table
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -87,9 +92,15 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
     // table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
-      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
+      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids") && !s.equals("new_litigant_cids") && !s.equals("litigant_cids")
     })
 
+    // rename the synthetic flag column if the source table already has a "flag" field
+    var f = "flag"
+    if (sublistTableFieldName.contains(f)) {
+      f = "update_flag"
+    }
+
     // cid mapping relations
     sql(
       s"""
@@ -114,20 +125,20 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
 
     // replace cid, dedupe, and copy forward old data
-    val df1 = sql(
+    sql(
       s"""
          |INSERT OVERWRITE TABLE $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
          |SELECT
          |        ${sublistRes.mkString(",")}
          |FROM    (
          |            SELECT  CONCAT_WS( '_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
-         |                    ,flag
+         |                    ,$f
          |                    ,new_cid
          |                    ,cid
          |                    ,${sublistTableFieldName.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
          |            FROM    (
-         |                        SELECT  "0" AS flag
+         |                        SELECT  "0" AS $f
          |                                ,CAST(new_cid AS STRING) AS new_cid
          |                                ,CAST(cid AS STRING) AS cid
          |                                ,${sublistTableFieldName.mkString(",")}
@@ -142,7 +153,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                                            ) e
          |                                ) f
          |                        UNION ALL
-         |                        SELECT  "1" AS flag
+         |                        SELECT  "1" AS $f
          |                                ,CAST(new_cid AS STRING) AS new_cid
          |                                ,CAST(cid AS STRING) AS cid
          |                                ,${sublistTableFieldName.mkString(",")}
@@ -175,15 +186,15 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     sql(
       s"""
          |INSERT OVERWRITE TABLE  $inc_ads_company_tb PARTITION(ds='$lastDsIncOds')
-         |SELECT  md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey,
-         |        cids,${columns.mkString(",")}
+         |SELECT  ${subRes.mkString(",")}
          |FROM    (
-         |            SELECT  cids,${columns.mkString(",")}
+         |            SELECT  $split_cols,${sublistTableFieldName.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
+         |                    ,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey
          |            FROM    ${inc_ods_company_tb}
          |            WHERE   ds >= ${runDs}
-         |            AND     cids IS NOT NULL
-         |            AND     trim(cids) <> ''
+         |            AND     $split_cols IS NOT NULL
+         |            AND     trim($split_cols) <> ''
          |        ) a
          |WHERE   num = 1
          |""".stripMargin)
@@ -200,7 +211,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     ).syn()
 
     // sync incremental main-table data
-    val cols = columns ++ Seq("cids")
+    val cols = columns ++ Seq(split_cols)
     MaxComputer2Phoenix(
       spark,
       cols,
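The flag-collision guard above can also be collapsed into a single expression; an equivalent sketch:

    // use "update_flag" when the source table already defines a "flag" column
    val f = if (sublistTableFieldName.contains("flag")) "update_flag" else "flag"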