ソースを参照

feat: 工商信息变更及动态

许家凯 4 年 前
コミット
e5b3a417d4

+ 32 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/BaseHelper.scala

@@ -0,0 +1,32 @@
+package com.winhc.bigdata.spark.implicits
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/14 08:45
+ * @Description:
+ */
+object BaseHelper {
+
+  implicit class StringEnhancer(str: String) extends Serializable {
+    private val pattern = "^([0-9]*\\.?[0-9]+)[^0-9\\.]*".r
+
+    import com.winhc.bigdata.spark.implicits.RegexUtils._
+
+
+    def isRegCapital(): Boolean = pattern matches str
+
+
+    def getOrNull(): String = if (str == null) "null" else "\"" + str + "\""
+
+
+    def toRegCapital(): Double = str match {
+      case pattern(v) => {
+        v.toDouble
+      }
+      case _ => {
+        -1
+      }
+    }
+  }
+
+}

+ 16 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/RegexUtils.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.spark.implicits
+
+import scala.util.matching.Regex
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/6 18:29
+ * @Description:
+ */
+object RegexUtils {
+
+  implicit class RichRegex(val underlying: Regex) extends AnyVal {
+    def matches(s: String) = if (s == null) false else underlying.pattern.matcher(s).matches
+  }
+
+}

+ 85 - 41
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -70,51 +70,94 @@ object ChangeExtract {
           sql(
             s"""
                |SELECT  cid,current_cid as new_cid
-               |FROM    ${project}.inc_ods_company
-               |WHERE   ds > $lastDs_ads_all and ds < $ds
+               |FROM    ${project}.inc_ads_company
+               |WHERE   ds = $ds
                |AND     cid IS NOT NULL
                |AND     current_cid IS NOT NULL
                |GROUP BY cid,current_cid
                |""".stripMargin).createOrReplaceTempView("mapping")
 
           val cid = getColumns(s"$project.ads_$tableName").filter(f => f.equals("cid") || f.equals("new_cid")).max
-          df = sql(
-            s"""
-               |SELECT  $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
-               |FROM    $project.inc_ads_$tableName
-               |WHERE   ds = $ds
-               |UNION ALL
-               |SELECT  t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
-               |FROM    (
-               |            SELECT  DISTINCT ${primaryKey}
-               |            FROM    $project.inc_ads_$tableName
-               |            WHERE   ds = $ds
-               |        ) AS t1
-               |JOIN    (
-               |             SELECT  concat_ws('_',coalesce(mm.new_cid,tmp.$cid),split(rowkey, '_')[1]) AS rowkey
-               |                     ,${intersectCols.diff(Set("rowkey", "cid", "new_cid")).mkString(",")}
-               |                     ,coalesce(mm.new_cid,tmp.$cid) AS new_cid
-               |                     ,tmp.$cid as cid
-               |                     ,c
-               |             FROM    (
-               |                         SELECT  a.*
-               |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
-               |                         FROM    (
-               |                                     SELECT  ${intersectCols.mkString(",")}
-               |                                     FROM    $project.ads_$tableName
-               |                                     WHERE   ds = $lastDs_ads_all
-               |                                     UNION ALL
-               |                                     SELECT  ${intersectCols.mkString(",")}
-               |                                     FROM    $project.inc_ads_$tableName
-               |                                     WHERE   ds > $lastDs_ads_all and ds < $ds
-               |                                 ) AS a
-               |                     ) AS tmp
-               |             LEFT JOIN mapping mm
-               |             ON tmp.$cid = mm.cid
-               |             WHERE   tmp.c = 1
-               |        ) AS t2
-               |ON      t1.${primaryKey} = t2.${primaryKey}
-               |""".stripMargin)
+
+          primaryKey.equals("rowkey") match {
+            case true => {
+              df = sql(
+                s"""
+                   |SELECT  $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
+                   |FROM    $project.inc_ads_$tableName
+                   |WHERE   ds = $ds
+                   |UNION ALL
+                   |SELECT  t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
+                   |FROM    (
+                   |            SELECT  DISTINCT ${primaryKey}
+                   |            FROM    $project.inc_ads_$tableName
+                   |            WHERE   ds = $ds
+                   |        ) AS t1
+                   |JOIN    (
+                   |             SELECT  concat_ws('_',coalesce(mm.new_cid,tmp.$cid),split(rowkey, '_')[1]) AS rowkey
+                   |                     ,${intersectCols.diff(Set("rowkey", "cid", "new_cid")).mkString(",")}
+                   |                     ,coalesce(mm.new_cid,tmp.$cid) AS new_cid
+                   |                     ,tmp.$cid as cid
+                   |                     ,c
+                   |             FROM    (
+                   |                         SELECT  a.*
+                   |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
+                   |                         FROM    (
+                   |                                     SELECT  ${intersectCols.mkString(",")}
+                   |                                     FROM    $project.ads_$tableName
+                   |                                     WHERE   ds = $lastDs_ads_all
+                   |                                     UNION ALL
+                   |                                     SELECT  ${intersectCols.mkString(",")}
+                   |                                     FROM    $project.inc_ads_$tableName
+                   |                                     WHERE   ds > $lastDs_ads_all and ds < $ds
+                   |                                 ) AS a
+                   |                     ) AS tmp
+                   |             LEFT JOIN mapping mm
+                   |             ON tmp.$cid = mm.cid
+                   |             WHERE   tmp.c = 1
+                   |        ) AS t2
+                   |ON      t1.${primaryKey} = t2.${primaryKey}
+                   |""".stripMargin)
+            }
+            case false => {
+              df = sql(
+                s"""
+                   |SELECT  $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
+                   |FROM    $project.inc_ads_$tableName
+                   |WHERE   ds = $ds
+                   |UNION ALL
+                   |SELECT  t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
+                   |FROM    (
+                   |            SELECT  DISTINCT ${primaryKey}
+                   |            FROM    $project.inc_ads_$tableName
+                   |            WHERE   ds = $ds
+                   |        ) AS t1
+                   |JOIN    (
+                   |             SELECT  ${intersectCols.diff(Set("rowkey", cid)).mkString(",")}
+                   |                     ,coalesce(mm.new_cid,tmp.$cid) AS $cid
+                   |             FROM    (
+                   |                         SELECT  a.*
+                   |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
+                   |                         FROM    (
+                   |                                     SELECT  ${intersectCols.mkString(",")}
+                   |                                     FROM    $project.ads_$tableName
+                   |                                     WHERE   ds = $lastDs_ads_all
+                   |                                     UNION ALL
+                   |                                     SELECT  ${intersectCols.mkString(",")}
+                   |                                     FROM    $project.inc_ads_$tableName
+                   |                                     WHERE   ds > $lastDs_ads_all and ds < $ds
+                   |                                 ) AS a
+                   |                     ) AS tmp
+                   |             LEFT JOIN mapping mm
+                   |             ON tmp.$cid = mm.cid
+                   |             WHERE   tmp.c = 1
+                   |        ) AS t2
+                   |ON      t1.${primaryKey} = t2.${primaryKey}
+                   |""".stripMargin)
+            }
+          }
+
+
         }
         case false => {
           df = sql(
@@ -254,7 +297,7 @@ object ChangeExtract {
     , Args(tableName = "company_icp", primaryFields = "domain")
     , Args(tableName = "company_punishment_info", primaryFields = "punish_number")
     , Args(tableName = "company_punishment_info_creditchina", primaryFields = "punish_number")
-    , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no")//破产重整
+    , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no") //破产重整
     , Args(tableName = "company_public_announcement2_list", primaryFields = "applicant_cid,owner_cid,drawer_cid,gather_name_cid,bill_num")//公示催告
 
     , Args(tableName = "company_certificate", primaryFields = "type")
@@ -264,7 +307,8 @@ object ChangeExtract {
 
     , Args(tableName = "company_equity_info", primaryKey = "id", primaryFields = "reg_number", isCopy = false)
     , Args(tableName = "company_staff", primaryFields = "staff_type")
-
+    //公司名称,法人ID:人标识或公司标识,公司类型,注册地址,营业期限终止日期,经营范围,登记机关,企业状态                 ,注册资本,实收资本金额(单位:分),注销日期,注销原因
+    , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,actual_capital_amount,cancel_date,cancel_reason")
   )
 
   private case class Args(project: String = "winhc_eci_dev"

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company.scala

@@ -15,7 +15,7 @@ case class company(equCols: Seq[String]) extends CompanyChangeHandle with Serial
 
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = "1"
 
-  override def getBizTime(newMap: Map[String, String]): String = "业务时间"
+  override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
 
   override def getUpdateTitle(newMap: Map[String, String]): String = "更新一家公司"
 

+ 1 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_abnormal_info.scala

@@ -7,10 +7,8 @@ import com.winhc.bigdata.spark.utils.ChangeExtractUtils
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/9 16:44
- * @Description:
+ * @Description: 经营异常
  */
-
-
 case class company_abnormal_info(equCols: Seq[String]) extends CompanyChangeHandle {
   override def getUpdateTitle(newMap: Map[String, String]): String = "经营异常发生变更"
 

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -179,6 +179,7 @@ object CompanyDynamic {
     Args(tableName = "company_abnormal_info", bName = 0)
     , Args(tableName = "company_equity_info")
     , Args(tableName = "company_staff", bName = 0)
+    , Args(tableName = "company", bName = 0)
   )
 
   private case class Args(project: String = "winhc_eci_dev"

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -9,7 +9,7 @@ trait CompanyDynamicHandle {
 
   private val table_2_sub_info_type_map = Map(
     "CompanyDynamicHandleTest" -> "MyTest"
-    , "" -> "eci_detail" //工商信息
+    , "company" -> "eci_detail" //工商信息
     , "company_land_publicity" -> "land_publicity" //土地公示
     , "company_land_announcement" -> "land_announcement" //土地购买
     , "company_land_transfer" -> "land_transfer" //土地转让
@@ -49,7 +49,7 @@ trait CompanyDynamicHandle {
 
   private val table_2_info_type = Map(
     "CompanyDynamicHandleTest" -> "0"
-    , "" -> "1" //工商信息
+    , "company" -> "1" //工商信息
     , "" -> "2" // 企业失信被执
     , "" -> "3" // 企业股东失信被执
     , "company_abnormal_info" -> "4" // 经营异常

+ 112 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company.scala

@@ -0,0 +1,112 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.BaseHelper._
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/6 17:07
+ * @Description:
+ */
+case class company() extends CompanyDynamicHandle {
+
+  private val change_type_map = Map(
+    "name" -> "公司名称"
+    , "legal_entity_id" -> "法人"
+    , "company_org_type" -> "公司类型"
+    , "reg_location" -> "注册地址"
+    , "to_time" -> "营业期限"
+    , "business_scope" -> "经营范围"
+    , "reg_institute" -> "登记机关"
+    , "reg_status" -> "企业状态"
+    , "reg_capital" -> "注册资本"
+    , "actual_capital_amount" -> "实收资本"
+  )
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("update")
+
+  private def makeRiskLevelHandle(field: String, old_map: Map[String, String], new_map: Map[String, String]) = field match {
+    case "actual_capital_amount" | "reg_capital" => {
+
+      val old_val = old_map.getOrElse(field, "")
+      val new_val = new_map.getOrElse(field, "")
+      if (old_val.isRegCapital && new_val.isRegCapital) {
+        old_val.toRegCapital().compareTo(new_val.toRegCapital()) match {
+          case 1 => {
+            "3"
+          }
+          case -1 => {
+            "1"
+          }
+          case 0 => {
+            null
+          }
+        }
+      } else "2"
+    }
+    case _ => "2"
+
+  }
+
+  /**
+   *
+   * @param rowkey
+   * @param cid
+   * @param change_fields
+   * @param old_map
+   * @param new_map
+   * @return cid
+   *         cname
+   *         info_type
+   *         rta_desc
+   *         change_content
+   *         change_time
+   *         biz_id
+   *         sub_info_type
+   *         info_risk_level
+   *         winhc_suggest
+   */
+  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String, suggestion: String): Seq[(String, String, String, String, String, String, String, String, String, String)] = {
+    change_fields.filter(e => change_type_map.contains(e)).map(e => {
+      val v = makeRiskLevelHandle(e, old_map, new_map)
+      if (v == null) {
+        return null
+      }
+      (
+        rowkey
+        , new_map("name")
+        , super.get_info_type()
+        , s"${change_type_map(e)}发生变更"
+        , s"""{"变更后内容": ${new_map(e).getOrNull()},"变更事项": ${change_type_map(e).getOrNull()},"变更日期": "$bizDate","变更前内容": ${old_map(e).getOrNull()}}"""
+        , bizDate
+        , rowkey
+        , super.get_sub_info_type()
+        , v
+        , "建议"
+      )
+    })
+  }
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = null
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = null
+}