浏览代码

添加企业动态

许家凯 4 年之前
父节点
当前提交
5d1246ec18

+ 34 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/MapHelper.scala

@@ -0,0 +1,34 @@
+package com.winhc.bigdata.spark.implicits
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/13 20:03
+ * @Description:
+ */
+object MapHelper {
+
+  implicit class MapEnhancer(map: Map[String, String]) extends Serializable {
+    def toJson(fields: Seq[String]): String = {
+      val content =  fields.map(item => {
+        if (item.contains("->")) {
+          val Array(key, keyAlias) = item.split("->")
+          s"${getValueOrNull(keyAlias)}:${getValueOrNull(map(key))}"
+        } else {
+          s"${getValueOrNull(item)}:${getValueOrNull(map(item))}"
+        }
+      }).mkString(",")
+      s"{$content}"
+    }
+  }
+
+  private def getValueOrNull(value: String): String = {
+    if (StringUtils.isNotBlank(value)) {
+      "\"" + value + "\""
+    } else {
+      "\"\""
+    }
+  }
+
+}

+ 55 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -1,9 +1,11 @@
 package com.winhc.bigdata.spark.jobs.dynamic
 
+import java.util.Date
+
 import com.winhc.bigdata.spark.config.EsConfig
-import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SparkSession}
@@ -25,14 +27,35 @@ object CompanyDynamic {
                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
 
-    val targetTab = "target_tab"
+    private val env = "env"
+    var cleanFlag = false
+    val targetTab = "xjk_tmp_company_dynamic"
+
+    def init(): Unit = {
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
+           |(
+           |    cid  STRING COMMENT '公司id'
+           |    ,info_type STRING COMMENT '变更分类,大类'
+           |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
+           |    ,change_content STRING COMMENT '变更内容'
+           |    ,change_time STRING COMMENT '变更时间'
+           |    ,biz_id STRING COMMENT '业务id,数据行id'
+           |    ,sub_info_type STRING COMMENT '变更小类,表名'
+           |    ,info_risk_level STRING COMMENT '变更风险等级'
+           |    ,winhc_suggest STRING COMMENT '提示信息'
+           |    ,create_time STRING COMMENT '创建时间'
+           |)
+           |COMMENT '企业动态输出表'
+           |PARTITIONED BY (ds STRING COMMENT '分区')
+           |LIFECYCLE 30
+           |""".stripMargin)
+    }
 
 
     def calc(): Unit = {
-      val handle = getClazz[ {
-        def org_type(): Seq[String]
-        def handle(rowkey: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): (String, String, String, String, String, String, String, String)
-      }](s"com.winhc.bigdata.spark.jobs.dynamic.$tableName")
+      val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.$tableName")
 
       val types = handle.org_type()
       val rdd = sql(
@@ -48,10 +71,10 @@ object CompanyDynamic {
         val cid = r.getAs[String]("cid")
         val new_data = r.getAs[Map[String, String]]("data")
         val old_data = r.getAs[Map[String, String]]("old_data")
-        val biz_data = r.getAs[Map[String, String]]("biz_data")
+        val biz_time = r.getAs[String]("biz_time")
         val fields = r.getAs[String]("fields")
-        val res = handle.handle(rowkey, cid, fields.split(","), old_data, new_data)
-        Row(cid, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8)
+        val res = handle.handle(rowkey, biz_time, cid, fields.split(","), old_data, new_data)
+        Row(cid, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))
       })
 
       val schema = getSchema(Map(
@@ -66,9 +89,26 @@ object CompanyDynamic {
         , "winhc_suggest" -> StringType
         , "create_time" -> StringType
       ))
-      spark.createDataFrame(rdd, schema).write
-        .mode(if (isWindows) "append" else "overwrite")
-        .insertInto(targetTab)
+      spark.createDataFrame(rdd, schema)
+        .createOrReplaceTempView("company_dynamic_tmp")
+
+      if (!cleanFlag) {
+        sql(
+          s"""
+             |alter table ${getEnvProjectName(env, project)}.$targetTab drop if exists partition(ds='$ds')
+             |""".stripMargin)
+        cleanFlag = true
+      }
+
+      val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds"))
+
+      sql(
+        s"""
+           |INSERT INTO TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds')
+           |SELECT ${cols.mkString(",")}
+           |FROM
+           |    company_dynamic_tmp
+           |""".stripMargin)
 
     }
 
@@ -83,8 +123,9 @@ object CompanyDynamic {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-    CompanyDynamicUtil(spark, "winhc_eci_dev", "table_name", "ds").calc()
+    val cd = CompanyDynamicUtil(spark, "winhc_eci_dev", "table_name", "ds")
+    cd.init()
+    cd.calc()
     spark.stop()
-
   }
 }

+ 92 - 34
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -7,9 +7,83 @@ package com.winhc.bigdata.spark.jobs.dynamic
  */
 trait CompanyDynamicHandle {
 
-  private val table_2_sub_info_type_map = Map("CompanyDynamicHandleTest" -> "MyTest")
-
-  private val table_2_info_type = Map("CompanyDynamicHandleTest" -> "0")
+  private val table_2_sub_info_type_map = Map(
+    "CompanyDynamicHandleTest" -> "MyTest"
+    , "" -> "eci_detail" //工商信息
+    , "" -> "land_notice" //土地公示
+    , "" -> "land_purchase" //土地购买
+    , "" -> "land_transfer" //土地转让
+    , "" -> "land_mortgage" //土地抵押
+    , "" -> "tender_es" //中标信息ES
+    , "" -> "enterprise_shixin" //失信
+    , "" -> "enterprise_zhixing" //被执
+    , "" -> "shareholder_shixin" //股东失信
+    , "" -> "shareholder_zhixing" //股东被执
+    , "" -> "tender_qichacha" //中标信息企查查
+    , "" -> "eci_exception" //经营异常
+    , "" -> "eci_zscq" //知识产权
+    , "" -> "eci_wenshu" //裁判文书
+    , "" -> "court_announcement" //法院公告
+    , "" -> "" //对外投资
+    , "" -> "eci_administrativepenalty" //行政处罚
+    , "" -> "eci_chattel" //动产抵押
+    , "" -> "env_punishment" //环保处罚
+    , "" -> "judicial_assistance" //股权冻结
+    , "" -> "publish_notice" //公示催告
+    , "" -> "serious_violation" //严重违法
+    , "" -> "simple_cancellation" //简易注销
+    , "" -> "stock_pledge" //股权出质
+    , "" -> "tax_illegal" //税收违法
+    , "" -> "tax_owenotice" //欠税公告
+    , "" -> "judicial" //司法拍卖
+    , "" -> "recruit" //招聘信息
+    , "" -> "liquidation_information" //清算信息
+    , "" -> "investor_equity_change" //大股东变更
+    , "" -> "actual_controller_change" //实际控制人变更
+    , "" -> "court_notice" //开庭公告
+  )
+
+  private val table_2_info_type = Map(
+    "CompanyDynamicHandleTest" -> "0"
+    , "" -> "1" //工商信息
+    , "" -> "2" // 企业失信被执
+    , "" -> "3" // 企业股东失信被执
+    , "" -> "4" // 经营异常
+    , "" -> "5" // 知识产权
+    , "" -> "6" // 裁判文书
+    , "" -> "7" // 法院公告
+    , "" -> "8" // 对外投资
+    , "" -> "9" // 动产抵押
+    , "" -> "10" // 司法拍卖
+    , "" -> "11" // 土地信息
+    , "" -> "12" // 中标信息
+    , "" -> "13" // 招聘信息
+    , "" -> "14" // 行政处罚
+    , "" -> "15" // 公示催告
+    , "" -> "16" // 环保处罚
+    , "" -> "17" // 股权出质
+    , "" -> "18" // 严重违法
+    , "" -> "19" // 简易注销
+    , "" -> "20" // 欠税公告
+    , "" -> "21" // 税收违法
+    , "" -> "22" // 股权冻结
+    , "" -> "23" // 清算信息
+    , "" -> "24" // 大股东变更
+    , "" -> "25" // 实际控制人变更
+    , "" -> "26" // 开庭公告
+    , "" -> "27" // 新闻信息
+    , "" -> "28" // 股东信息
+    , "" -> "29" // 最终受益人
+    , "" -> "30" // 主要成员
+    , "" -> "31" // 融资动态
+    , "" -> "32" // 企业公告
+    , "" -> "33" // 抽查检查
+    , "" -> "34" // 行政许可
+    , "" -> "35" // 双随机抽查
+    , "" -> "36" // 限制高消费
+    , "" -> "37" // 被执行人
+    , "" -> "38" // 送达报告
+  )
 
   /**
    *
@@ -27,11 +101,11 @@ trait CompanyDynamicHandle {
    *         info_risk_level
    *         winhc_suggest
    */
-  def handle(rowkey: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): (String, String, String, String, String, String, String, String) = {
+  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): (String, String, String, String, String, String, String, String) = {
     (get_info_type()
       , get_rta_desc(old_map, new_map)
       , get_change_content(old_map, new_map)
-      , get_change_time(new_map)
+      , get_change_time(bizDate, new_map)
       , get_biz_id(rowkey)
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
@@ -56,14 +130,14 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String
+  protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String
 
   /**
    * 信息类型,大类
    *
    * @return
    */
-  def get_info_type(): String = table_2_info_type(getClass.getSimpleName)
+  protected def get_info_type(): String = table_2_info_type(getClass.getSimpleName)
 
   /**
    * 变更内容
@@ -72,7 +146,7 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String
+  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String
 
   /**
    * 变更时间
@@ -80,7 +154,7 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  def get_change_time(new_map: Map[String, String]): String
+  protected def get_change_time(bizDate: String, new_map: Map[String, String]): String = bizDate
 
   /**
    * 业务id
@@ -88,14 +162,14 @@ trait CompanyDynamicHandle {
    * @param rowkey
    * @return
    */
-  def get_biz_id(rowkey: String): String
+  protected def get_biz_id(rowkey: String): String = rowkey
 
   /**
    * 子信息类型,小类
    *
    * @return
    */
-  def get_sub_info_type(): String = table_2_sub_info_type_map(getClass.getSimpleName)
+  protected def get_sub_info_type(): String = table_2_sub_info_type_map(getClass.getSimpleName)
 
   /**
    * 风险等级
@@ -104,11 +178,12 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String
+  protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String
 
 }
 
-case class CompanyDynamicHandleTest() extends CompanyDynamicHandle {
+//经营异常
+case class company_abnormal_info() extends CompanyDynamicHandle {
   /**
    * 信息描述
    *
@@ -116,8 +191,7 @@ case class CompanyDynamicHandleTest() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("put_reason", null)
 
   /**
    * 变更内容
@@ -126,23 +200,7 @@ case class CompanyDynamicHandleTest() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-
-  /**
-   * 变更时间
-   *
-   * @param new_map
-   * @return
-   */
-  override def get_change_time(new_map: Map[String, String]): String = ???
-
-  /**
-   * 业务id
-   *
-   * @param rowkey
-   * @return
-   */
-  override def get_biz_id(rowkey: String): String = ???
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = ???
 
 
   /**
@@ -152,5 +210,5 @@ case class CompanyDynamicHandleTest() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-}
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = ???
+}

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.utils
 
 import org.apache.commons.lang3.StringUtils
-
+import com.winhc.bigdata.spark.implicits.MapHelper._
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/7 13:59
@@ -31,6 +31,7 @@ object ChangeExtractUtils {
   }
 
 
+
   private def getValueOrNull(value: String): String = {
     if (StringUtils.isNotBlank(value)) {
       "\"" + value + "\""
@@ -41,7 +42,7 @@ object ChangeExtractUtils {
 
 
   def main(args: Array[String]): Unit = {
-    val name = get_ip_tags("a", null, "b", null)
-    println(name)
+    val map = Map("a"->"b","b"->"c")
+    println(map.toJson(Seq("a->你","b")))
   }
 }

+ 9 - 0
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -143,7 +143,16 @@ trait LoggingUtils {
   def getColumns(t: String): Seq[String] = {
     spark.table(t).columns.seq
   }
+
   def getSchema(map: Map[String, DataType]): StructType = {
     StructType(map.map(e => StructField(e._1, e._2)).toArray)
   }
+
+  def getEnvProjectName(env: String, projectName: String): String = {
+    if (env.equals("dev")) {
+      env
+    } else {
+      projectName.substring(0, projectName.length - 4)
+    }
+  }
 }