Procházet zdrojové kódy

feat: 企业变更加入存量增量标识位

许家凯 před 3 roky
rodič
revize
e9320da251
27 změnil soubory, kde provedl 61 přidání a 26 odebrání
  1. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala
  2. 4 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyChangeHandle.scala
  3. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/auction_tracking.scala
  4. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company.scala
  5. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_abnormal_info.scala
  6. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_check_info.scala
  7. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_court_announcement.scala
  8. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_court_open_announcement.scala
  9. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_court_register.scala
  10. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_double_random_check_info.scala
  11. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_env_punishment.scala
  12. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_finance.scala
  13. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_holder.scala
  14. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_icp.scala
  15. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_illegal_info.scala
  16. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license.scala
  17. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license_creditchina.scala
  18. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license_entpub.scala
  19. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_own_tax.scala
  20. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_punishment_info.scala
  21. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_punishment_info_creditchina.scala
  22. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_send_announcement.scala
  23. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_staff.scala
  24. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_tax_contravention.scala
  25. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_tm.scala
  26. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/zxr_evaluate_results.scala
  27. 32 0
      src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -87,7 +87,7 @@ case class NgChangeExtract(s: SparkSession,
 
     val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
 
-    val handle = ReflectUtils.getClazz[NgCompanyChangeHandle](s"com.winhc.bigdata.spark.ng.change.table.$tableName1", cols)
+    val handle = ReflectUtils.getClazz[NgCompanyChangeHandle](s"com.winhc.bigdata.spark.ng.change.table.$tableName1", cols,true)
 
 
     val df = sql(

+ 4 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyChangeHandle.scala

@@ -16,6 +16,9 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
   @getter
   @setter
   protected val equCols: Seq[String]
+  @getter
+  @setter
+  protected val is_inc: Boolean //false 为存量
 
   /**
    * 主入口函数
@@ -56,7 +59,7 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
 
         case "00" => dynamic_type = NgCompanyUpdateType.Update
 
-        case "01"|"91" => dynamic_type = NgCompanyUpdateType.Deleted
+        case "01" | "91" => dynamic_type = NgCompanyUpdateType.Deleted
 
         case "09" | "19" => dynamic_type = NgCompanyUpdateType.Remove
 

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/auction_tracking.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class auction_tracking(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class auction_tracking(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company.scala

@@ -9,7 +9,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
  * @Description:公司基本信息
  */
 
-case class company(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
+case class company(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle with Serializable {
   /**
    * 获取公司company_id,默认为rowkey前半段
    *

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_abnormal_info.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_abnormal_info(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_abnormal_info(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_check_info.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_check_info(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_check_info(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("check_date"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_court_announcement.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_court_announcement(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_court_announcement(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_court_open_announcement.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_court_open_announcement(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_court_open_announcement(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_court_register.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_court_register(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_court_register(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_double_random_check_info.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_double_random_check_info(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_double_random_check_info(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("check_date"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_env_punishment.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_env_punishment(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_env_punishment(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_finance.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_finance(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_finance(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("finance_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_holder.scala

@@ -9,7 +9,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
  * @Description:股东
  */
 
-case class company_holder(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
+case class company_holder(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle with Serializable {
 
   /**
    * 获取变更的业务时间

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_icp.scala

@@ -10,7 +10,7 @@ import com.winhc.bigdata.spark.utils.ChangeExtractUtils
  * @Description:
  */
 //网站
-case class company_icp(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_icp(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
   /**
    * 获取变更的业务时间
    *

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_illegal_info.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_illegal_info(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_illegal_info(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_license(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_license(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("issue_date"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license_creditchina.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_license_creditchina(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_license_creditchina(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("decision_date"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license_entpub.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_license_entpub(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_license_entpub(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("start_date"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_own_tax.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_own_tax(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_own_tax(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_punishment_info.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_punishment_info(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_punishment_info(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_punishment_info_creditchina.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_punishment_info_creditchina(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_punishment_info_creditchina(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_send_announcement.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_send_announcement(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_send_announcement(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_staff.scala

@@ -9,7 +9,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
  * @Description:主要成员
  */
 
-case class company_staff(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
+case class company_staff(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle with Serializable {
 
   /**
    * 获取变更的业务时间

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_tax_contravention.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class company_tax_contravention(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_tax_contravention(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_tm.scala

@@ -10,7 +10,7 @@ import com.winhc.bigdata.spark.utils.DateUtils
  * @Description:
  */
 //商标
-case class company_tm(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class company_tm(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
   /**
    * 获取变更的业务时间

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/zxr_evaluate_results.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 import com.winhc.bigdata.spark.utils.DateUtils
 
 
-case class zxr_evaluate_results(equCols: Seq[String]) extends NgCompanyChangeHandle {
+case class zxr_evaluate_results(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))

+ 32 - 0
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -189,4 +189,36 @@ trait LoggingUtils extends Logging {
       projectName.substring(0, projectName.length - 4)
     }
   }
+
+
+  def generateAllTabSql(tableName: String, project: String): (String, Seq[String],String) = {
+    val inc_ads_tab = s"$project.inc_ads_$tableName"
+    val ads_tab = s"$project.ads_$tableName"
+    val cols = getColumns(inc_ads_tab).intersect(getColumns(ads_tab))
+
+    val ads_last_ds = getLastPartitionsOrElse(ads_tab, null)
+    if (ads_last_ds == null) {
+      throw new RuntimeException(s"$ads_tab ds is null !")
+    }
+    val p_b = if (cols.contains("rowkey")) "rowkey" else if (cols.contains("company_id")) "company_id" else throw new RuntimeException(s"$ads_tab partition key is null !")
+
+    (
+      s"""
+         |SELECT  ${cols.mkString(",")}
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $p_b ORDER BY ds DESC ) AS xjk_num
+         |            FROM    (
+         |                        SELECT  ${cols.mkString(",")}
+         |                        FROM    $ads_tab
+         |                        WHERE   ds = $ads_last_ds
+         |                        UNION ALL
+         |                        SELECT  ${cols.mkString(",")}
+         |                        FROM    $inc_ads_tab
+         |                        WHERE   ds > $ads_last_ds
+         |                    ) AS all_t1
+         |        ) AS all_t2
+         |WHERE   all_t2.xjk_num = 1
+         |""".stripMargin, cols,p_b)
+  }
 }