浏览代码

存量cid,cids模板修改

xufei 4 年之前
父节点
当前提交
e2b0454117

+ 15 - 24
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCids.scala

@@ -3,6 +3,8 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.utils.{CompanyForCidsUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 /**
  * 软件著作权 | 作品著作权 | 专利
  * π
@@ -15,32 +17,21 @@ object CompanyForCids {
       "ods_company_patent" -> Seq("pub_number", "title") //作品著作权
     )
 
+  //  winhc_eci_dev company_court_open_announcement case_no,plaintiff,defendant,new_cid
+  //  winhc_eci_dev company_copyright_reg reg_num,new_cid
+
   def main(args: Array[String]): Unit = {
-    val (sourceTable, cols) = valid(args)
-    //    var config = mutable.Map.empty[String, String]
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, null)
-    CompanyForCidsUtils(spark, sourceTable, cols).calc()
+    val Array(space, sourceTable, cols) = args
+
+    var config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyForCidsUtils(spark, space ,sourceTable, cols.split(",")).calc()
     spark.stop()
   }
 
-  def valid(args: Array[String]): (String, Seq[String]) = {
-    println(args.toSeq.mkString(" "))
-    if (args.length == 1) {
-
-    } else if (args.length == 2) {
-      val Array(sourceTable, cols) = args
-      return (sourceTable, cols.split(";").toSeq)
-    } else {
-      println("请输入要计算的table!!!! ")
-      sys.exit(-1)
-    }
-    val Array(sourceTable) = args
-
-    val cols: Seq[String] = tabMapping.getOrElse("ods_" + sourceTable, Seq())
-    if (cols.isEmpty) {
-      println("输入表不存在,请配置计算规则!!!   ")
-      sys.exit(-1)
-    }
-    (sourceTable, cols)
-  }
+
 }

+ 41 - 28
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidsUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import com.winhc.bigdata.spark.udf.CompanyMapping
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -11,40 +12,47 @@ import scala.annotation.meta.getter
  * 拆平cids,落表
  */
 
-case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
+case class CompanyForCidsUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils  with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
   import spark.implicits._
   import spark._
   import org.apache.spark.sql.functions._
 
+  val tabMapping =
+    Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
+    )
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
-    val odsTable = s"ods_$sourceTable"
-    val adsListTable = s"ads_${sourceTable}_list"
-    val adsTable = s"ads_$sourceTable"
-    val companyMapping = "company_name_mapping_pro"
+    val odsTable = s"${space}.ods_$sourceTable"
+    val adsListTable = s"${space}.ads_${sourceTable}_list"
+    val adsTable = s"${space}.ads_$sourceTable"
+    val companyMapping = s"${space}.company_name_mapping_pro_v2"
+    prepareFunctions(spark)
+
     val ds = BaseUtil.getPartion(odsTable, spark)
+
+    val (split_cols,delimiter) = tabMapping.getOrElse(sourceTable,("cids",";"))
+
     //table字段
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
+    val list_columns: Seq[String] = spark.table(adsListTable).schema.map(_.name).filter(!_.equals("ds"))
     val disCol = cols
-
-    sql(s"select * from $odsTable where ds = $ds and cids is not null and trim(cids) <> '' ")
-      .dropDuplicates(disCol)
-      .createOrReplaceTempView("t1")
-
-    sql(s"CACHE TABLE t1")
+    val cols_md5 = disCol.filter(!_.equals("new_cid"))
 
     //拆平新表
     sql(
       s"""
+         |--insert overwrite table ${adsListTable} partition (ds=${ds})
          |SELECT
-         |        rowkey,new_cid,${columns.mkString(",")}
+         |        ${list_columns.mkString(",")}
          |FROM    (
          |        SELECT
          |                *
-         |                ,ROW_NUMBER() OVER (PARTITION BY id,new_cid ORDER BY - ABS(CAST(new_cid AS BIGINT )- CAST(cid AS BIGINT )) DESC ) num
-         |                ,CONCAT_WS('_',new_cid,id) AS rowkey
+         |                ,ROW_NUMBER() OVER (PARTITION BY ${disCol.mkString(",")} ORDER BY - ABS(CAST(new_cid AS BIGINT )- CAST(cid AS BIGINT )) DESC ) num
+         |                ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                ,cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) AS cols
          |        FROM    (
          |                SELECT
          |                        c.*
@@ -52,39 +60,44 @@ case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[S
          |                FROM    (
          |                        SELECT
          |                                *
-         |                        FROM    t1 a
-         |                        LATERAL VIEW explode(split(cids,';')) b AS cid
+         |                        FROM    $odsTable a
+         |                        LATERAL VIEW explode(split($split_cols,'$delimiter')) b AS cid
+         |                        WHERE   a.ds = $ds
+         |                        AND $split_cols is not null
+         |                        AND trim($split_cols) <> ''
          |                        ) c
          |                LEFT JOIN $companyMapping d
          |                ON      c.cid = d.cid
          |                ) e
          |        ) f
-         |WHERE   num =1
+         |WHERE   num =1  AND cols is not null AND trim(cols) <> ''
          |""".stripMargin)
       .createOrReplaceTempView(s"t2")
 
     //聚合新cids
     val df1 = sql(
       s"""
+         |insert overwrite table ${adsTable} partition (ds=${ds})
          |SELECT
-         |x.new_cids,${columns.mkString(",")}
-         |FROM    t1
+         |md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey
+         |,x.new_cids
+         |,${columns.mkString(",")}
+         |FROM    $odsTable y
          |LEFT JOIN (
          |              SELECT  id as new_id
          |                      ,concat_ws(';',collect_set(new_cid)) new_cids
-         |              FROM    t2
+         |              FROM    $adsListTable t
+         |              WHERE   ds = $ds
          |              GROUP BY id
          |          ) x
-         |ON      t1.id = x.new_id
-         |""".stripMargin)
-
-    df1.createOrReplaceTempView("t3")
-
-    sql(s"select rowkey,new_cid,${columns.mkString(",")} from t2").show(10)
-    sql(s"select new_cids,${columns.mkString(",")} from t3").show(10)
+         |ON      y.id = x.new_id
+         |WHERE   y.ds = 20200604
+         |AND     $split_cols IS NOT NULL
+         |AND     trim($split_cols) <> ''
+         |""".stripMargin).createOrReplaceTempView("t3")
 
     //写表
-    sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
+//    sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
     sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select new_cids,${columns.mkString(",")} from t3")
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -22,7 +22,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
   @(transient@getter) val spark: SparkSession = s
 
   val tabMapping =
-    Map("company_court_register" -> ("litigant_cids", ";") //立案信息
+    Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
     )
 
   def calc(): Unit = {