Ver código fonte

Merge remote-tracking branch 'origin/master'

xufei 4 anos atrás
pai
commit
e263f83026

+ 66 - 30
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -3,7 +3,6 @@ package com.winhc.bigdata.spark.jobs.chance
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, ReflectUtils, SparkUtils}
-import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
@@ -239,17 +238,52 @@ object ChangeExtract {
 
 
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
+
+
+  private val startArgs = Seq(
+    Args(tableName = "company_tm", primaryFields = "status_new")
+    , Args(tableName = "company_patent_list", primaryFields = "lprs")
+    , Args(tableName = "company_copyright_works_list", primaryFields = "type")
+    , Args(tableName = "company_copyright_reg_list", primaryFields = "version")
+    , Args(tableName = "company_land_publicity", primaryFields = "title,location,use_for")
+    , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
+    , Args(tableName = "company_bid_list", primaryFields = "title")
+    , Args(tableName = "company_land_transfer", primaryFields = "num,location")
+    , Args(tableName = "company_employment", primaryFields = "source")
+    , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
+    , Args(tableName = "company_icp", primaryFields = "domain")
+    , Args(tableName = "company_punishment_info", primaryFields = "punish_number")
+    , Args(tableName = "company_punishment_info_creditchina", primaryFields = "punish_number")
+    , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no")//破产重整
+
+    , Args(tableName = "company_certificate", primaryFields = "type")
+    , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")
+
+    , Args(tableName = "company_own_tax", primaryFields = "tax_balance,tax_category,tax_num")
+
+    , Args(tableName = "company_equity_info", primaryKey = "id", primaryFields = "reg_number", isCopy = false)
+    //    , Args(tableName = "company_staff", primaryFields = "staff_type")
+
+  )
+
+  private case class Args(project: String = "winhc_eci_dev"
+                          , tableName: String
+                          , primaryKey: String = "rowkey"
+                          , primaryFields: String
+                          , isCopy: Boolean = true)
+
   def main(args: Array[String]): Unit = {
-    if (args.length >= 5 && args.length <= 6) {
-      val Array(project, tableName, rowkey, inc_ds, pf, isCopy) = if (args.length == 6) args else args :+ "true"
+    if (args.length == 2) {
+      val Array(tableName, inc_ds) = args
+
+      val e = startArgs.find(_.tableName == tableName).getOrElse(sys.error(s"unsupported tableName: $tableName"))
       val config = EsConfig.getEsConfigMap ++ mutable.Map(
-        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.project.name" -> e.project,
         "spark.hadoop.odps.spark.local.partition.amt" -> "10"
       )
       val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
-
-      ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
+      ChangeExtractHandle(spark, e.project, tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
       spark.stop()
     } else {
       val ds = args(0)
@@ -259,30 +293,32 @@ object ChangeExtract {
         "spark.hadoop.odps.spark.local.partition.amt" -> "10"
       )
       val spark = SparkUtils.InitEnv("ChangeExtract", config)
-      val rows =
-        """winhc_eci_dev company_tm rowkey 20200717 status_new
-          |winhc_eci_dev company_patent_list rowkey 20200717 lprs
-          |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
-          |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
-          |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
-          |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
-          |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
-          |winhc_eci_dev company_land_mortgage rowkey 20200717 land_num,land_aministrative_area
-          |winhc_eci_dev company_bid_list rowkey 20200717 title
-          |winhc_eci_dev company_punishment_info rowkey 20200717 punish_number,reg_number
-          |winhc_eci_dev company_punishment_info_creditchina rowkey 20200717 punish_number
-          |winhc_eci_dev company_employment rowkey 20200717 source
-          |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
-          |winhc_eci_dev bankruptcy_open_case id 20200717 case_no
-          |winhc_eci_dev company_icp rowkey 20200717 domain
-          |""".stripMargin.replace("20200717", ds)
-      for (r <- rows.split("\r\n")) {
-        if (StringUtils.isNotEmpty(r)) {
-          val as = r.split(" ")
-          val Array(tmp, tableName, rowkey, inc_ds, pf, isCopy) = if (as.length == 6) as else as :+ "true"
-          ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
-        }
-      }
+
+      startArgs.foreach(e => {
+        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
+      })
+
+
+      /* val rows =
+         """winhc_eci_dev company_tm rowkey 20200717 status_new
+           |winhc_eci_dev company_patent_list rowkey 20200717 lprs
+           |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+           |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
+           |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+           |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
+           |winhc_eci_dev company_bid_list rowkey 20200717 title
+           |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+           |winhc_eci_dev company_employment rowkey 20200717 source
+           |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
+           |winhc_eci_dev company_icp rowkey 20200717 domain
+           |""".stripMargin.replace("20200717", ds)
+       for (r <- rows.split("\r\n")) {
+         if (StringUtils.isNotEmpty(r)) {
+           val as = r.split(" ")
+           val Array(tmp, tableName, rowkey, inc_ds, pf, isCopy) = if (as.length == 6) as else as :+ "true"
+           ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
+         }
+       }*/
       spark.stop()
     }
   }

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/bankruptcy_open_case.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: Yan Yongnian
+ * @Date: 2020/8/5
+ * @Description:
+ */
+
+
+//破产公告
+
+case class bankruptcy_open_case(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
+    val str = ChangeExtractUtils.getTags(newMap, "破产重整", Array("case_no", "case_type", "agency_court", "applicant", "respondent", "public_date"))
+    str
+  }
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("public_date")
+
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("case_no")}破产重整发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("case_no")}破产重整")
+}

+ 23 - 18
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -67,7 +67,7 @@ object CompanyDynamic {
       }).seq
       val rdd = sql(
         bName match {
-//默认:无需补全cname字段
+          //默认:无需补全cname字段
           case 0 =>
             s"""
                |SELECT  *,null AS cname
@@ -76,7 +76,7 @@ object CompanyDynamic {
                |AND     tn = '$tableName'
                |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
                |""".stripMargin
-//需根据cid补全cname字段数据
+          //需根据cid补全cname字段数据
           case 1 =>
             s"""
                |SELECT A.*,B.cname AS cname
@@ -93,7 +93,7 @@ object CompanyDynamic {
                |) AS B
                |ON A.cid = B.cid
                |""".stripMargin
-//既需根据cid补全cname字段数据;还需在data字段中根据其中的cid添加对应cname数据
+          //既需根据cid补全cname字段数据;还需在data字段中根据其中的cid添加对应cname数据
           case 2 =>
             s"""
                |SELECT cid, ${colsExclusiveSome.mkString(",")},old_data, cname, STR_TO_MAP(regexp_replace(concat_ws(',',data),'[{"}]',''),',',':') AS data
@@ -175,23 +175,33 @@ object CompanyDynamic {
     }
   }
 
+  private val startArgs = Seq(
+    Args(tableName = "company_abnormal_info", bName = 0)
+    , Args(tableName = "company_equity_info")
+  )
+
+  private case class Args(project: String = "winhc_eci_dev"
+                          , tableName: String
+                          , bName: Int = 1)
 
   def main(args: Array[String]): Unit = {
-    if (args.length < 3 || args.length > 4) {
+
+
+    if (args.length != 3) {
       println(
         s"""
            |Please enter the legal parameters !
-           |<project> <ds> <tableNames> [cname_tableNames]
+           |<project> <ds> <tableNames>
            |""".stripMargin)
       sys.exit(-99)
     }
 
-    val Array(project, tableName, ds, bName_table) = if (args.length == 4) args else args :+ "0"
+    val Array(project, tableNames, ds) = args
 
     println(
       s"""
          |project: $project
-         |tableNames: $tableName
+         |tableNames: $tableNames
          |ds: $ds
          |""".stripMargin)
 
@@ -201,20 +211,15 @@ object CompanyDynamic {
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
     val cd = CompanyDynamicUtil(spark, project, ds)
-
     cd.init()
-/*
 
-    for (e <- tableName.split(",")) {
-      if (e.length > 2) {
-        cd.calc(e, bName_table.toInt)
-      }
-    }
+    val ts = tableNames.split(",").toSet
 
-*/
-    for (e <- tableName.split(",")) {
-      cd.calc(e, bName_table.toInt)
-    }
+    startArgs.filter(e => {
+      ts.contains(e.tableName)
+    }).foreach(e => {
+      cd.calc(e.tableName, e.bName)
+    })
     spark.stop()
   }
 }

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -42,6 +42,7 @@ trait CompanyDynamicHandle {
     , "" -> "investor_equity_change" //大股东变更
     , "" -> "actual_controller_change" //实际控制人变更
     , "" -> "court_notice" //开庭公告
+    , "bankruptcy_open_case" -> "bankruptcy_open_case" //破产重整
   )
 
   private val table_2_info_type = Map(
@@ -88,6 +89,7 @@ trait CompanyDynamicHandle {
     , "" -> "36" // 限制高消费
     , "" -> "37" // 被执行人
     , "" -> "38" // 送达报告
+    , "bankruptcy_open_case" -> "39" // 破产重整
   )
 
   /**

+ 51 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/bankruptcy_open_case.scala

@@ -0,0 +1,51 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author yyn
+ * @Date 2020/8/5
+ * @Description TODO
+ */
+//破产公告
+case class bankruptcy_open_case() extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("case_no")
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+    s"""案号:${new_map("case_no")}\n
+       |被申请人:${new_map("respondent")}\n
+       |申请人:${new_map("applicant")}\n
+       |公开日期:${new_map("public_date")}\n""".stripMargin
+  }
+
+  /**
+   * 变更时间
+   *
+   * @param new_map
+   * @return
+   */
+//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险"
+}