Pārlūkot izejas kodu

限制高消费维度

晏永年 4 gadi atpakaļ
vecāks
revīzija
f483c3bb4d

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -311,6 +311,8 @@ object ChangeExtract {
     , Args(tableName = "company_tax_contravention", primaryFields = "case_type")//税收违法
     , Args(tableName = "company_send_announcement_list", primaryFields = "title")//送达公告
     , Args(tableName = "company_annual_report_out_guarantee", primaryFields = "id")//年报-对外担保
+    , Args(tableName = "company_zxr_restrict", primaryFields = "status")//限制消费令,发现最新状态
+
     , Args(tableName = "company_zxr_final_case", primaryFields = "identity_num")//终本案件
     , Args(tableName = "company_license_creditchina", primaryFields = "licence_content")//行政许可-信用中国
     , Args(tableName = "company_license_entpub", primaryFields = "license_name")//行政许可-企业公示

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_zxr_restrict.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: Yan Yongnian
+ * @Date: 2020/8/14
+ * @Description:
+ */
+
+
+//限制消费令
+
+// Change-extract handle for consumption-restriction orders (限制消费令).
+// equCols: columns used for equality comparison by the change-extract framework.
+case class company_zxr_restrict(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
+
+  // Builds the tag/label string for a new record from the listed fields.
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
+    val str = ChangeExtractUtils.getTags(newMap, "被限制高消费", Array("name", "identity_num", "court_name", "case_create_time", "case_no"))
+    str
+  }
+
+  // Business time of the record is the case filing date.
+  override def getBizTime(newMap: Map[String, String]): String = newMap("case_create_time")
+
+  // NOTE(review): newMap("title") — no "title" field is referenced anywhere else for this
+  // table (getLabel uses name/identity_num/court_name/case_create_time/case_no); if the
+  // source table has no "title" column this throws NoSuchElementException. Presumably
+  // "name" was intended — TODO confirm against the table schema.
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("title")}限制消费令发生变更")
+
+  // NOTE(review): same "title" concern as getUpdateTitle above — verify the key exists.
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("title")}限制消费令")
+}

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -199,6 +199,8 @@ object CompanyDynamic {
     , Args(tableName = "company_dishonest_info", bName = 1)
     , Args(tableName = "company_send_announcement_list", bName = 1)
     , Args(tableName = "company_annual_report_out_guarantee", bName = 1)
+    , Args(tableName = "company_zxr_restrict", bName = 1)
+
     , Args(tableName = "company_zxr_final_case", bName = 1)//终本案件
     , Args(tableName = "company_license_creditchina", bName = 1)//行政许可-信用中国
     , Args(tableName = "company_license_entpub", bName = 1)//行政许可-企业公示

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -46,6 +46,7 @@ trait CompanyDynamicHandle {
     , "company_stock_announcement" -> "company_stock_announcement" //企业公告
     , "company_send_announcement_list" -> "company_send_announcement_list" //送达公告
     , "company_annual_report_out_guarantee" -> "company_annual_report_out_guarantee" //年报-对外担保
+    , "company_zxr_restrict" -> "company_zxr_restrict" //限制消费令
 
     , "company_staff" -> "company_staff" //主要成员
     , "company_finance" -> "company_finance" //融资动态
@@ -101,7 +102,7 @@ trait CompanyDynamicHandle {
     , "company_license_creditchina" -> "34-2" // 行政许可-信用中国
     , "company_license_entpub" -> "34-3" // 行政许可-企业公示
     , "company_double_random_check_info" -> "35" // 双随机抽查
-    , "" -> "36" // 限制高消费
+    , "company_zxr_restrict" -> "36" // 限制高消费
     , "" -> "37" // 被执行人
     , "company_send_announcement_list" -> "38" // 送达报告
     , "bankruptcy_open_case" -> "39" // 破产重整

+ 51 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala

@@ -0,0 +1,51 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author yyn
+ * @Date 2020/8/14
+ * @Description Company-dynamic handle for consumption-restriction orders (限制消费令):
+ *              renders the description, change content and risk level of a record.
+ */
+//限制消费令
+case class company_zxr_restrict() extends CompanyDynamicHandle {
+  /**
+   * 信息描述 — the case number of the restriction order.
+   *
+   * @param old_map previous record (unused)
+   * @param new_map current record; must contain "case_no"
+   * @return the case number
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("case_no")
+
+  /**
+   * 变更内容 — human-readable summary of the order.
+   *
+   * Interpolated values must be wrapped in ${...}: a bare $new_map("case_no")
+   * would render the whole Map's toString followed by the literal text
+   * ("case_no"), which was the original bug here.
+   *
+   * @param old_map previous record (unused)
+   * @param new_map current record; reads case_no, name, case_create_time, update_time
+   * @param cname   company name (unused, defaults to null)
+   * @return multi-line description text
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+    s"""案号:${new_map("case_no")}\n
+       |限消令对象:${new_map("name")}\n
+       |立案日期:${new_map("case_create_time")}\n
+       |发布日期:${new_map("update_time")}\n""".stripMargin
+  }
+
+  /**
+   * 变更时间 — left to the trait's default implementation.
+   *
+   * @param new_map
+   * @return
+   */
+//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+
+  /**
+   * 风险等级 — a consumption-restriction order is always high risk.
+   *
+   * @param old_map previous record (unused)
+   * @param new_map current record (unused)
+   * @return the fixed level "高风险"
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险"
+}

+ 12 - 6
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidWithoutMD5Utils.scala

@@ -17,7 +17,8 @@ import scala.collection.mutable
 case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
                                             project: String, //表所在工程名
                                             tableName: String, //表名(不加前后辍)
-                                            dupliCols: Seq[String] // 去重列
+                                            dupliCols: Seq[String], // 去重列
+                                            updateCol: String = "update_time" //ROW_NUMBER窗口函数的ORDER BY字段,默认(可以不传参数)为update_time
                                  ) extends LoggingUtils with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
@@ -100,7 +101,7 @@ case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
          |                    ,new_cid
          |                    ,cid
          |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
          |            FROM    (
          |                        SELECT  "0" AS flag
          |                                ,a.new_cid
@@ -154,7 +155,7 @@ object CompanyIncrForCidWithoutMD5Utils {
 
   def main(args: Array[String]): Unit = {
 
-    val Array(project, tableName, dupliCols, flag) = args
+    val Array(project, tableName, dupliCols, flag, _*) = args
     println(
       s"""
          |project: $project
@@ -162,17 +163,22 @@ object CompanyIncrForCidWithoutMD5Utils {
          |dupliCols: $dupliCols
          |flag: $flag
          |""".stripMargin)
-    if (args.length != 4) {
-      println("请输入 project:项目, tableName:表名, dupliCols:去重字段, flag:标识 !!!")
+    if (args.length < 4) {
+      println("请输入 project:项目, tableName:表名, dupliCols:去重字段, flag:标识, [updateCol:排序列]!!!")
       sys.exit(-1)
     }
+    //ROW_NUMBER窗口函数的ORDER BY字段,默认(可以不传参数)为update_time
+    var updateCol: String = "update_time"
+    if(args.length == 5){
+      updateCol = args(4)
+    }
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.spark.local.partition.amt" -> "1"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     flag match {
-      case "cid" => CompanyIncrForCidWithoutMD5Utils(spark, project, tableName, (dupliCols.split(",").toSeq)).calc()
+      case "cid" => CompanyIncrForCidWithoutMD5Utils(spark, project, tableName, (dupliCols.split(",").toSeq), updateCol).calc()
     }
     spark.stop()
   }