xufei 4 years ago
parent
commit
721c97919c

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -31,8 +31,18 @@ object CalcIncrTotal {
   //winhc_eci_dev company_patent new_cid,pub_number,app_number cids
 
   //winhc_eci_dev company_court_open_announcement new_cid,case_no,plaintiff,defendant cids
+  //winhc_eci_dev company_court_register new_cid,case_no,plaintiff,defendant cids
+  //winhc_eci_dev company_court_announcement new_cid,plaintiff,litigant,publish_date,case_no cids
 
 
+  //  winhc_eci_dev company_check_info new_cid,check_org,check_date cid
+  //  winhc_eci_dev company_tax_contravention new_cid,taxpayer_number,case_info cid
+
+  //  winhc_eci_dev company_double_random_check_info check_task_num,new_cid cid
+  //  winhc_eci_dev company_double_random_check_result_info main_id,check_item,new_cid cid
+
+  //  winhc_eci_dev company_judicial_sale_combine main_id,new_cid cids
+
   def main(args: Array[String]): Unit = {
 
     val Array(project, tableName, dupliCols, flag) = args
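For reference, each comment row above supplies main's four arguments in order: project, tableName, dupliCols, flag (the trailing cid/cids token, as the rows suggest, selects the table family). A hypothetical direct call for one of the newly listed tables, with argument values copied from the comments; invoking main like this is purely illustrative:

    // illustrative run of CalcIncrTotal for the new company_court_register entry
    CalcIncrTotal.main(Array(
      "winhc_eci_dev",                       // project
      "company_court_register",              // tableName
      "new_cid,case_no,plaintiff,defendant", // dupliCols (dedup columns)
      "cids"                                 // flag (cid vs cids table family)
    ))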

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -136,6 +136,8 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, // project the table lives in
          |            WHERE   ds = $adsListDs
          |            AND     announcement_type = '起诉状副本及开庭传票'
          |            AND     LENGTH(plaintiff_name) > 4
+         |            AND     plaintiff_name not like '%银行%'
+         |            AND     plaintiff_name not like '%保险%'
          |        ) x
          |WHERE   num = 1 AND publish_date >= '${atMonthsBefore(3)}'
          |""".stripMargin).cache().createOrReplaceTempView("announcement")

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -20,6 +20,10 @@ object CompanyForCid {
       "ods_company_mortgage_info" -> Seq("reg_date","reg_num","amount", "new_cid") //产品信息
     )
 //  winhc_eci_dev ods_company_own_tax tax_balance,tax_category,tax_num,new_cid
+//  winhc_eci_dev company_check_info check_org,check_date,new_cid
+//  winhc_eci_dev company_tax_contravention taxpayer_number,case_info,new_cid
+//  winhc_eci_dev company_double_random_check_info check_task_num,new_cid
+//  winhc_eci_dev company_double_random_check_result_info main_id,check_item,new_cid
 
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args
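These rows line up with main's space, sourceTable, cols arguments; note the new entries drop the ods_ prefix, matching the CompanyForCidUtils change further down, which now prepends ods_ itself. A hypothetical direct call for one of the added tables (illustrative only):

    // illustrative run of CompanyForCid for the new company_check_info entry
    CompanyForCid.main(Array(
      "winhc_eci_dev",               // space (project)
      "company_check_info",          // sourceTable (without the ods_ prefix)
      "check_org,check_date,new_cid" // cols (dedup columns)
    ))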

+ 5 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCids.scala

@@ -18,7 +18,12 @@ object CompanyForCids {
     )
 
   //  winhc_eci_dev company_court_open_announcement case_no,plaintiff,defendant,new_cid
+  //  winhc_eci_dev company_court_register case_no,plaintiff,defendant,new_cid
   //  winhc_eci_dev company_copyright_reg reg_num,new_cid
+  //  winhc_eci_dev company_court_announcement new_cid,plaintiff,litigant,publish_date,case_no
+  //  winhc_eci_dev company_judicial_sale new_cid,title
+
+  //winhc_eci_dev company_judicial_sale_combine new_cid,main_id
 
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args

+ 107 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyJudicialSaleCombine.scala

@@ -0,0 +1,107 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: merges the two judicial-sale tables
+ * @author π
+ * @date 2020/8/11 15:35
+ */
+object CompanyJudicialSaleCombine {
+  def main(args: Array[String]): Unit = {
+    //winhc_eci_dev company_judicial_sale
+    // validate the argument count before destructuring; otherwise a wrong count throws a MatchError first
+    if (args.length != 2) {
+      println("please supply: project (project name), tableName (table name) !!!")
+      sys.exit(-1)
+    }
+    val Array(project, tableName) = args
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |""".stripMargin)
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyJudicialSaleCombine(spark, project, tableName).calc
+    spark.stop()
+  }
+
+}
+case class CompanyJudicialSaleCombine(s: SparkSession,
+                                      project: String, // project the table lives in
+                                      tableName: String // table name (without prefix/suffix)
+                                     ) extends LoggingUtils with CompanyMapping{
+  override protected val spark: SparkSession = s
+
+  def calc = {
+
+    val inc_ods_v0 = s"${project}.inc_ods_${tableName}_item" // ODS source table (first partition read below)
+    val inc_ods_v1 = s"${project}.inc_ods_${tableName}_item" // ODS source table (latest partition read below)
+    val inc_ods_v2 = s"${project}.inc_ods_${tableName}_combine" // ODS combined table
+    // latest incremental ODS partitions
+    val v0 = BaseUtil.getFirstPartion(inc_ods_v0, spark)
+    val v1 = BaseUtil.getPartion(inc_ods_v1, spark)
+    var v2 = BaseUtil.getPartion(inc_ods_v2, spark)
+    if(StringUtils.isBlank(v2)){
+      v2 = BaseUtil.atDaysAfter(-1, v0)
+    }
+
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE winhc_eci_dev.inc_ods_company_judicial_sale_combine PARTITION(ds='$v1')
+        |SELECT  a.id
+        |        ,a.main_id
+        |        ,a.cids
+        |        ,a.title AS name
+        |        ,a.initial_price
+        |        ,a.current_price
+        |        ,a.consult_price
+        |        ,a.start_time
+        |        ,a.end_time
+        |        ,a.pic_source_url
+        |        ,a.pic_oss_url
+        |        ,a.create_time
+        |        ,a.update_time
+        |        ,a.deleted
+        |        ,b.title
+        |        ,b.introduction
+        |        ,b.court
+        |        ,b.pub_time
+        |        ,b.end_date
+        |        ,b.content
+        |        ,b.source_tag
+        |        ,b.source_id
+        |FROM    (
+        |            SELECT  *
+        |            FROM    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER(PARTITION BY cids,main_id ORDER BY update_time DESC) num1
+        |                        FROM    winhc_eci_dev.inc_ods_company_judicial_sale_item
+        |                        WHERE   ds > $v2 AND ds <= $v1
+        |                        AND     cids IS NOT NULL
+        |                    ) c
+        |            WHERE   num1 = 1
+        |        ) a
+        |LEFT JOIN (
+        |              SELECT  *
+        |              FROM    (
+        |                          SELECT  *
+        |                                  ,ROW_NUMBER() OVER(PARTITION BY cids,title ORDER BY update_time DESC) num2
+        |                          FROM    winhc_eci_dev.inc_ods_company_judicial_sale
+        |                          WHERE   ds > $v2 AND ds <= $v1
+        |                      ) d
+        |              WHERE   num2 = 1
+        |          ) b
+        |ON      a.main_id = b.id
+        |WHERE   b.id IS NOT NULL
+        |""".stripMargin)
+  }
+}
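Structurally, the new query dedupes each side with ROW_NUMBER() ... ORDER BY update_time DESC, then attaches each item to its parent sale via main_id = id; since the LEFT JOIN is immediately filtered with b.id IS NOT NULL, it behaves as an inner join. A minimal DataFrame sketch of the same dedupe-then-join shape (illustrative only; the job itself runs the SQL above):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // assumes an active session, e.g. the one SparkUtils.InitEnv returns
    val spark: SparkSession = SparkSession.builder.getOrCreate()

    // keep the latest row per business key, mirroring the ROW_NUMBER() subqueries
    def latest(df: DataFrame, keys: Seq[String]): DataFrame = {
      val w = Window.partitionBy(keys.map(col): _*).orderBy(col("update_time").desc)
      df.withColumn("rn", row_number().over(w)).filter(col("rn") === 1).drop("rn")
    }

    val items = latest(spark.table("winhc_eci_dev.inc_ods_company_judicial_sale_item"), Seq("cids", "main_id"))
    val sales = latest(spark.table("winhc_eci_dev.inc_ods_company_judicial_sale"), Seq("cids", "title"))
    // inner join plays the role of LEFT JOIN ... WHERE b.id IS NOT NULL
    val combined = items.join(sales, items("main_id") === sales("id"))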

+ 11 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -15,12 +15,16 @@ import scala.annotation.meta.getter
 case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils  with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
+  val rowKeyMapping =
+    Map("company_double_random_check_result_info" -> "new_cid,main_id" // double random check: published results
+    )
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
     prepareFunctions(spark)
-    val odsTable = s"${space}.$sourceTable"
-    val adsTable = s"${space}.ads_${sourceTable.substring(4)}"
-    val companyMapping = s"${space}.company_map"
+    val odsTable = s"${space}.ods_$sourceTable"
+    val adsTable = s"${space}.ads_${sourceTable}"
+    val companyMapping = s"${space}.company_name_mapping_pro_v2"
     val ds = BaseUtil.getPartion(odsTable, spark)
    // table fields
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
@@ -28,6 +32,9 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
 
     val cols_md5 = disCol.filter(!_.equals("new_cid"))
 
+    // rowkey prefix lookup
+    val rowKeyPre = rowKeyMapping.getOrElse(sourceTable,"new_cid")
+
    val ddl =  spark.table(odsTable).schema.filter(s=>{!"ds".equals(s.name)}).map(s=>{
 
       val name = s.name
@@ -57,7 +64,7 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |        SELECT
          |                *
          |                ,ROW_NUMBER() OVER (PARTITION BY ${disCol.mkString(",")} ORDER BY id DESC ) num
-         |                ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                ,CONCAT_WS('_',$rowKeyPre,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
          |                ,cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) AS cols
          |        FROM    (
          |                SELECT
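With the new lookup, a table listed in rowKeyMapping gets a compound rowkey prefix, while every other table keeps the original single new_cid prefix. A small sketch of the SQL fragment the lookup produces (rowkeyExpr is an illustrative helper, not part of the commit):

    // builds the rowkey SQL fragment for a given source table
    val rowKeyMapping = Map("company_double_random_check_result_info" -> "new_cid,main_id")
    def rowkeyExpr(table: String, md5Cols: Seq[String]): String = {
      val pre = rowKeyMapping.getOrElse(table, "new_cid")
      s"CONCAT_WS('_',$pre,md5(cleanup(CONCAT_WS('',${md5Cols.mkString(",")}))))"
    }
    // rowkeyExpr("company_double_random_check_result_info", Seq("check_item"))
    //   returns CONCAT_WS('_',new_cid,main_id,md5(cleanup(CONCAT_WS('',check_item))))
    // giving rowkeys shaped like "<new_cid>_<main_id>_<md5>" instead of "<new_cid>_<md5>"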

+ 9 - 2
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -18,6 +18,10 @@ case class CompanyIncrForCidUtils(s: SparkSession,
                                  dupliCols: Seq[String] // dedup columns
                                  ) extends LoggingUtils with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
+  // primary-key fields
+  val rowKeyMapping =
+    Map("company_double_random_check_result_info" -> "new_cid,main_id" // double random check: published results
+    )
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
@@ -73,6 +77,9 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey")
     })
 
+    // rowkey prefix lookup
+    val rowKeyPre = rowKeyMapping.getOrElse(tableName,"new_cid")
+
     sql(
       s"""
          |SELECT  cid,current_cid as new_cid
@@ -93,7 +100,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |        ,cid
          |        ,${columns.mkString(",")}
          |FROM    (
-         |            SELECT  CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |            SELECT  CONCAT_WS('_',$rowKeyPre,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
          |                    ,flag
          |                    ,new_cid
          |                    ,cid
@@ -142,7 +149,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       inc_ads_company_tb,
       tableName,
       lastDsIncOds,
-      s"CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))))"
+      s"CONCAT_WS('_',$rowKeyPre,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))))"
     ).syn()
 
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)