فهرست منبع

fix: 增量维度同步更换 new_cid 后，与老数据比较时被误判为 insert 数据

晏永年 4 سال پیش
والد
کامیت
09181e597e

+ 36 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -63,6 +63,35 @@ object ChangeExtract {
       val handle = getHandleClazz(tableName, cols)
 
       val update_time = BaseUtil.nowDate()
+
+      val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
+      val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //增量ods表
+      //增量ads最后一个分区
+      val lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_$tableName", spark)
+
+      val list = sql(s"show partitions $inc_ods_company_tb").collect.toList.map(_.getString(0).split("=")(1))
+      //增量ods第一个分区
+      val firstDsIncOds = list.head
+      //增量ods最后一个分区//落表分区
+      val lastDsIncOds = list.last
+      //执行分区
+      var runDs = ""
+      //第一次run
+      if (StringUtils.isBlank(lastDsIncAds)) {
+        runDs = firstDsIncOds
+      } else { //非第一次分区时间 + 1天
+        runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+      }
+      sql(
+        s"""
+           |SELECT  cid,current_cid as new_cid
+           |FROM    ${inc_ods_company}
+           |WHERE   ds >= ${runDs}
+           |AND     cid IS NOT NULL
+           |AND     current_cid IS NOT NULL
+           |GROUP BY cid,current_cid
+           |""".stripMargin).createOrReplaceTempView("mapping")
+
       val rdd = sql(
         s"""
            |SELECT  $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
@@ -76,7 +105,11 @@ object ChangeExtract {
            |            WHERE   ds = $ds
            |        ) AS t1
            |JOIN    (
-           |             SELECT  tmp.*
+           |             SELECT  concat_ws('_',coalesce(mm.new_cid,tmp.cid),split(rowkey, '_')[1]) AS rowkey
+           |                     ,${intersectCols.filter(s => {!s.equals("rowkey") && !s.equals("cid") && !s.equals("new_cid")}).mkString(",")}
+           |                     ,coalesce(mm.new_cid,tmp.cid) AS new_cid
+           |                     ,tmp.cid
+           |                     ,c
            |             FROM    (
            |                         SELECT  a.*
            |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
@@ -90,6 +123,8 @@ object ChangeExtract {
            |                                     WHERE   ds > $lastDs_ads_all and ds < $ds
            |                                 ) AS a
            |                     ) AS tmp
+           |             LEFT JOIN mapping mm
+           |             ON tmp.cid = mm.cid
            |             WHERE   tmp.c = 1
            |        ) AS t2
            |ON      t1.${primaryKey} = t2.${primaryKey}

+ 46 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * Dynamic handler for the `company_bid_list` table: overrides the description,
+ * change-content and risk-level hooks declared by CompanyDynamicHandle.
+ * NOTE(review): how and when the base class invokes these hooks is not visible
+ * in this file — confirm in CompanyDynamicHandle.
+ *
+ * @Author yyn
+ * @Date 2020/7/28
+ */
+case class company_bid_list() extends CompanyDynamicHandle{
+  /**
+   * Information description: the value stored under the key "title" in `new_map`.
+   *
+   * @param old_map not read by this override (presumably the record before the change — TODO confirm)
+   * @param new_map must contain the key "title" (presumably the record after the change — TODO confirm)
+   * @return `new_map("title")`; for a plain Map without a default, a missing key
+   *         raises NoSuchElementException
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("title")
+
+  /**
+   * Change content: the value stored under the key "abs" in `new_map`.
+   *
+   * @param old_map not read by this override
+   * @param new_map must contain the key "abs"
+   * @return `new_map("abs")`; for a plain Map without a default, a missing key
+   *         raises NoSuchElementException
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("abs")
+
+  /**
+   * Change time.
+   *
+   * The override below is commented out, so this class does not define
+   * get_change_time itself.
+   * NOTE(review): presumably the implementation inherited from
+   * CompanyDynamicHandle applies instead — confirm there.
+   */
+  //  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+
+  /**
+   * Risk level: always the constant label "提示" (roughly "notice"), regardless of input.
+   *
+   * @param old_map not read by this override
+   * @param new_map not read by this override
+   * @return the fixed string literal "提示"
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示"
+
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -42,5 +42,5 @@ case class company_env_punishment()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示"
 }