Jelajahi Sumber

添加增量同步不需md5的功能;动产抵押维度

晏永年 4 tahun lalu
induk
melakukan
bbb4f04dd1

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -299,6 +299,7 @@ object ChangeExtract {
     , Args(tableName = "company_punishment_info_creditchina", primaryFields = "punish_number")
     , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no", isCopy=false) //破产重整
     , Args(tableName = "company_public_announcement2_list", primaryFields = "applicant_cid,owner_cid,drawer_cid,gather_name_cid,bill_num")//公示催告
+    , Args(tableName = "company_mortgage_info", primaryFields = "reg_num")//动产抵押
 
     , Args(tableName = "company_certificate", primaryFields = "type")
     , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_mortgage_info.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: Yan Yongnian
+ * @Date: 2020/8/10
+ * @Description:
+ */
+
+
+//动产抵押
+
+case class company_mortgage_info(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
+    val str = ChangeExtractUtils.getTags(newMap, "动产抵押", Array("reg_num", "reg_date", "publish_date", "amount", "overview_amount"))
+    str
+  }
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_date")
+
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("reg_num"), s"${newMap("reg_num")}动产抵押发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("reg_num"), s"新增${newMap("reg_num")}动产抵押")
+}

+ 49 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala

@@ -0,0 +1,49 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author yyn
+ * @Date 2020/8/4
+ * @Description TODO
+ */
+//动产抵押
+case class company_mortgage_info() extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("reg_num")
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+    s"""被担保债权数额:$new_map("amount")\n
+       |债务人履行债务期限:$new_map("term")\n""".stripMargin
+  }
+
+  /**
+   * 变更时间
+   *
+   * @param new_map
+   * @return
+   */
+//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+}

+ 179 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidWithoutMD5Utils.scala

@@ -0,0 +1,179 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author yyn
+ * @Date 2020/8/10
+ * @Description 增量ods到ads的同步,直接将dupliCols字段用"_"串起来,而不用md5转换
+ */
+case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
+                                            project: String, //表所在工程名
+                                            tableName: String, //表名(不加前后辍)
+                                            dupliCols: Seq[String] // 去重列
+                                 ) extends LoggingUtils with CompanyMapping{
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
+
+    val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
+    val ads_company_tb = s"${project}.ads_${tableName}" //存量ads表
+    val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //增量ods表
+    val inc_ads_company_tb = s"${project}.inc_ads_$tableName" //增量ads表
+
+    //存量表ads最新分区
+    val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
+
+    //增量ads最后一个分区
+    val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company_tb").collect.toList.map(_.getString(0).split("=")(1))
+    //增量ods第一个分区
+    val firstDsIncOds = list.head
+    //增量ods最后一个分区//落表分区
+    val lastDsIncOds = list.last
+    //执行分区
+    var runDs = ""
+    //第一次run
+    if (StringUtils.isBlank(lastDsIncAds)) {
+      runDs = firstDsIncOds
+    } else { //非第一次分区时间 + 1天
+      runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+    }
+
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+
+    //增量ods和增量ads最后一个分区相等,跳出
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
+      runDs = lastDsIncOds
+      //sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |cols_md5:$cols_md5
+         |remainDs:$remainDs
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
+    //table字段
+    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
+      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey")
+    })
+
+    sql(
+      s"""
+         |SELECT  cid,current_cid as new_cid
+         |FROM    ${inc_ods_company}
+         |WHERE   ds >= ${runDs}
+         |AND     cid IS NOT NULL
+         |AND     current_cid IS NOT NULL
+         |GROUP BY cid,current_cid
+         |""".stripMargin).createOrReplaceTempView("mapping")
+
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE ${inc_ads_company_tb} PARTITION(ds=$lastDsIncOds)
+         |SELECT  rowkey
+         |        ,flag
+         |        ,new_cid
+         |        ,cid
+         |        ,${columns.mkString(",")}
+         |FROM    (
+         |            SELECT  CONCAT_WS('_',new_cid,${cols_md5.mkString(",")}) AS rowkey
+         |                    ,flag
+         |                    ,new_cid
+         |                    ,cid
+         |                    ,${columns.mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |            FROM    (
+         |                        SELECT  "0" AS flag
+         |                                ,a.new_cid
+         |                                ,b.cid
+         |                                ,${columns.mkString(",")}
+         |                        FROM    mapping a
+         |                        JOIN    (
+         |                                    SELECT  new_cid AS cid
+         |                                            ,${columns.mkString(",")}
+         |                                    FROM    ${ads_company_tb}
+         |                                    WHERE   ds >= ${remainDs}
+         |                                    UNION ALL
+         |                                    SELECT  new_cid AS cid
+         |                                            ,${columns.mkString(",")}
+         |                                    FROM    ${inc_ads_company_tb}
+         |                                    WHERE   ds > ${remainDs} AND  ds <  ${runDs}
+         |                                ) b
+         |                        ON      a.cid = b.cid
+         |                        UNION ALL
+         |                        SELECT  "1" AS flag
+         |                                ,coalesce(b.new_cid,a.cid) new_cid
+         |                                ,CAST(a.cid AS STRING) AS cid
+         |                                ,${columns.mkString(",")}
+         |                        FROM    ${inc_ods_company_tb} a
+         |                        LEFT JOIN mapping b
+         |                        ON      a.cid = b.cid
+         |                        WHERE   a.ds >= ${runDs}
+         |                        AND     a.cid IS NOT NULL
+         |                    ) d
+         |        ) e
+         |WHERE   num = 1
+         |AND     CONCAT_WS('',${cols_md5.mkString(",")}) IS NOT NULL
+         |AND     trim(CONCAT_WS('',${cols_md5.mkString(",")})) <> ''
+         |""".stripMargin)
+
+    val colsTotal = columns ++ Seq("new_cid")
+
+    MaxComputer2Phoenix(
+      spark,
+      colsTotal,
+      inc_ads_company_tb,
+      tableName,
+      lastDsIncOds,
+      s"CONCAT_WS('_',new_cid,${cols_md5.mkString(",")})"
+    ).syn()
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}
+object CompanyIncrForCidWithoutMD5Utils {
+
+  def main(args: Array[String]): Unit = {
+
+    val Array(project, tableName, dupliCols, flag) = args
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |dupliCols: $dupliCols
+         |flag: $flag
+         |""".stripMargin)
+    if (args.length != 4) {
+      println("请输入 project:项目, tableName:表名, dupliCols:去重字段, flag:标识 !!!")
+      sys.exit(-1)
+    }
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    flag match {
+      case "cid" => CompanyIncrForCidWithoutMD5Utils(spark, project, tableName, (dupliCols.split(",").toSeq)).calc()
+    }
+    spark.stop()
+  }
+}