Explorar el Código

Merge remote-tracking branch 'origin/master'

xufei hace 4 años
padre
commit
f12788b8b5

+ 34 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/MapHelper.scala

@@ -0,0 +1,34 @@
+package com.winhc.bigdata.spark.implicits
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/13 20:03
+ * @Description:
+ */
object MapHelper {

  /**
   * Enhances Map[String, String] with a lightweight JSON renderer.
   */
  implicit class MapEnhancer(map: Map[String, String]) extends Serializable {
    /**
     * Renders the selected fields of the map as a JSON object string.
     *
     * Each element of `fields` is either a plain key, or "key->alias", in
     * which case the alias becomes the JSON key while the value is looked up
     * under the original key.
     *
     * Fix: keys absent from the map now render as "" (via getValueOrNull on
     * null) instead of throwing NoSuchElementException — callers pass field
     * lists that the upstream data map is not guaranteed to contain.
     *
     * @param fields keys (or "key->alias" pairs) to emit, in order
     * @return a JSON object string, e.g. {"a":"1","b":""}
     */
    def toJson(fields: Seq[String]): String = {
      val content = fields.map(item => {
        if (item.contains("->")) {
          val Array(key, keyAlias) = item.split("->")
          s"${getValueOrNull(keyAlias)}:${getValueOrNull(map.getOrElse(key, null))}"
        } else {
          s"${getValueOrNull(item)}:${getValueOrNull(map.getOrElse(item, null))}"
        }
      }).mkString(",")
      s"{$content}"
    }
  }

  // Quotes a value for JSON output; null or blank values become an empty
  // quoted string ("") rather than the literal null.
  private def getValueOrNull(value: String): String = {
    if (StringUtils.isNotBlank(value)) {
      "\"" + value + "\""
    } else {
      "\"\""
    }
  }

}

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -171,6 +171,7 @@ object ChangeExtract {
 
   // winhc_eci_dev company_bid_list rowkey 20200717 title
   // winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+  // winhc_eci_dev company_abnormal_info rowkey 20200717 remove_reason
 
 
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones

+ 12 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -199,8 +199,9 @@ case class company_land_transfer(equCols: Seq[String]) extends CompanyChangeHand
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("merchandise_time")
 }
+
 //环保处罚
-case class company_env_punishment(equCols:Seq[String])extends CompanyChangeHandle {
+case class company_env_punishment(equCols: Seq[String]) extends CompanyChangeHandle {
   override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("punish_number"), s"${newMap("title")}环保处罚信息发生变更")
 
   override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("punish_number"), s"新增${newMap("punish_number")}环保处罚信息")
@@ -209,3 +210,13 @@ case class company_env_punishment(equCols:Seq[String])extends CompanyChangeHandl
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("publish_time")
 }
+
// Change handler for 经营异常 (abnormal-operation) records.
case class company_abnormal_info(equCols: Seq[String]) extends CompanyChangeHandle {

  // Fields emitted into the tag JSON for this dimension.
  private val tagFields = Array("put_department", "remove_department", "put_reason", "put_date", "remove_date", "remove_reason")

  override def getUpdateTitle(newMap: Map[String, String]): String = "经营异常发生变更"

  override def getInsertTitle(newMap: Map[String, String]): String = "新增一条经营异常"

  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String =
    ChangeExtractUtils.getTags(newMap, "经营异常", tagFields)

  // Business time comes from the listing date of the abnormal record.
  override def getBizTime(newMap: Map[String, String]): String = newMap("put_date")
}

+ 141 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -0,0 +1,141 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.time.DateFormatUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/27 16:52
+ * @Description: 企业动态
+ */
object CompanyDynamic {

  /**
   * 企业动态 — builds the company-dynamic output table from change-extract records.
   *
   * @param s       active SparkSession
   * @param project ODPS project that owns the tables
   * @param ds      partition (date) read from ads_change_extract and written to the target table
   */
  case class CompanyDynamicUtil(s: SparkSession,
                                project: String, //project that owns the tables

                                ds: String //partition to process
                               ) extends LoggingUtils with Logging {
    @(transient@getter) val spark: SparkSession = s

    private val env = "dev"
    // Set after the first calc() drops the target partition, so subsequent
    // calc() calls in the same run append instead of re-dropping.
    var cleanFlag = false
    val targetTab = "xjk_tmp_company_dynamic"

    /** Creates the output table if it does not exist yet. */
    def init(): Unit = {
      sql(
        s"""
           |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
           |(
           |    cid  STRING COMMENT '公司id'
           |    ,info_type STRING COMMENT '变更分类,大类'
           |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
           |    ,change_content STRING COMMENT '变更内容'
           |    ,change_time STRING COMMENT '变更时间'
           |    ,biz_id STRING COMMENT '业务id,数据行id'
           |    ,sub_info_type STRING COMMENT '变更小类,表名'
           |    ,info_risk_level STRING COMMENT '变更风险等级'
           |    ,winhc_suggest STRING COMMENT '提示信息'
           |    ,create_time STRING COMMENT '创建时间'
           |)
           |COMMENT '企业动态输出表'
           |PARTITIONED BY (ds STRING COMMENT '分区')
           |LIFECYCLE 30
           |""".stripMargin)
    }

    /**
     * Processes one source table (name without prefix/suffix): reads its
     * change records for partition `ds`, maps them through the matching
     * dynamic handler (resolved by reflection from the tables package), and
     * appends the result to the target partition.
     */
    def calc(tableName: String): Unit = {
      val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")

      val types = handle.org_type()
      val rdd = sql(
        s"""
           |SELECT  *
           |FROM    winhc_eci_dev.ads_change_extract
           |WHERE   ds = '$ds'
           |AND     tn = '$tableName'
           |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
           |""".stripMargin)
        .rdd.map(r => {
        val rowkey = r.getAs[String]("rowkey")
        val cid = r.getAs[String]("cid")
        val new_data = r.getAs[Map[String, String]]("data")
        val old_data = r.getAs[Map[String, String]]("old_data")
        val biz_date = r.getAs[String]("biz_date")
        val fields = r.getAs[String]("fields")
        val res = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data)
        Row(cid, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))
      })

      val schema = getSchema(ListMap(
        "cid" -> StringType
        , "info_type" -> StringType
        , "rta_desc" -> StringType
        , "change_content" -> StringType
        , "change_time" -> StringType
        , "biz_id" -> StringType
        , "sub_info_type" -> StringType
        , "info_risk_level" -> StringType
        , "winhc_suggest" -> StringType
        , "create_time" -> StringType
      ))
      spark.createDataFrame(rdd, schema)
        .createOrReplaceTempView("company_dynamic_tmp")

      // Drop the target partition once per run so re-running a day is idempotent.
      if (!cleanFlag) {
        sql(
          s"""
             |alter table ${getEnvProjectName(env, project)}.$targetTab drop if exists partition(ds='$ds')
             |""".stripMargin)
        cleanFlag = true
      }

      // Fix: resolve the column list against the same env-resolved table that
      // init() created and the INSERT below targets. The original read
      // "$project.$targetTab", which diverges whenever env != "dev".
      val cols = getColumns(s"${getEnvProjectName(env, project)}.$targetTab").filter(!_.equals("ds"))

      sql(
        s"""
           |INSERT INTO TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds')
           |SELECT ${cols.mkString(",")}
           |FROM
           |    company_dynamic_tmp
           |""".stripMargin)
    }
  }


  /** Entry point. args = [project, comma-separated table names, ds]. */
  def main(args: Array[String]): Unit = {
    val Array(project, tableName, ds) = args

    println(
      s"""
         |project: $project
         |tableNames: $tableName
         |ds: $ds
         |""".stripMargin)

    val config = EsConfig.getEsConfigMap ++ mutable.Map(
      "spark.hadoop.odps.project.name" -> project,
      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
    )
    val spark = SparkUtils.InitEnv("CompanyDynamic", config)
    val cd = CompanyDynamicUtil(spark, project, ds)

    cd.init()

    for (e <- tableName.split(",")) {
      cd.calc(e)
    }
    spark.stop()
  }
}

+ 184 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -0,0 +1,184 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/27 17:05
+ * @Description:
+ */
trait CompanyDynamicHandle {

  // Handler class name (getClass.getSimpleName) -> sub_info_type code.
  // Fix: the original literal contained ~30 placeholder entries all keyed by
  // the empty string; a Scala Map literal keeps only the LAST duplicate key,
  // so every placeholder but one silently vanished. They are preserved below
  // as a TODO list instead of dead map entries.
  private val table_2_sub_info_type_map = Map(
    "CompanyDynamicHandleTest" -> "MyTest"
    , "company_abnormal_info" -> "eci_exception" //经营异常
    , "company_env_punishment" -> "env_punishment" //环保处罚
    // TODO map remaining tables as handlers are implemented:
    //   eci_detail(工商信息), land_notice(土地公示), land_purchase(土地购买),
    //   land_transfer(土地转让), land_mortgage(土地抵押), tender_es(中标信息ES),
    //   enterprise_shixin(失信), enterprise_zhixing(被执), shareholder_shixin(股东失信),
    //   shareholder_zhixing(股东被执), tender_qichacha(中标信息企查查), eci_zscq(知识产权),
    //   eci_wenshu(裁判文书), court_announcement(法院公告), (对外投资),
    //   eci_administrativepenalty(行政处罚), eci_chattel(动产抵押),
    //   judicial_assistance(股权冻结), publish_notice(公示催告),
    //   serious_violation(严重违法), simple_cancellation(简易注销),
    //   stock_pledge(股权出质), tax_illegal(税收违法), tax_owenotice(欠税公告),
    //   judicial(司法拍卖), recruit(招聘信息), liquidation_information(清算信息),
    //   investor_equity_change(大股东变更), actual_controller_change(实际控制人变更),
    //   court_notice(开庭公告)
  )

  // Handler class name -> info_type (large-category) code.
  // Same duplicate-empty-key cleanup as above.
  private val table_2_info_type = Map(
    "CompanyDynamicHandleTest" -> "0"
    , "company_abnormal_info" -> "4" // 经营异常
    , "company_env_punishment" -> "16" // 环保处罚
    // TODO codes reserved in the original:
    //   1 工商信息, 2 企业失信被执, 3 企业股东失信被执, 5 知识产权, 6 裁判文书,
    //   7 法院公告, 8 对外投资, 9 动产抵押, 10 司法拍卖, 11 土地信息, 12 中标信息,
    //   13 招聘信息, 14 行政处罚, 15 公示催告, 17 股权出质, 18 严重违法, 19 简易注销,
    //   20 欠税公告, 21 税收违法, 22 股权冻结, 23 清算信息, 24 大股东变更,
    //   25 实际控制人变更, 26 开庭公告, 27 新闻信息, 28 股东信息, 29 最终受益人,
    //   30 主要成员, 31 融资动态, 32 企业公告, 33 抽查检查, 34 行政许可,
    //   35 双随机抽查, 36 限制高消费, 37 被执行人, 38 送达报告
  )

  /**
   * Assembles one output row for the company-dynamic table.
   *
   * @param rowkey        source row key (becomes biz_id)
   * @param bizDate       business date of the change (default change_time)
   * @param cid           company id (carried separately by the caller)
   * @param change_fields names of changed fields, or null for inserts
   * @param old_map       previous values
   * @param new_map       current values
   * @return info_type
   *         rta_desc
   *         change_content
   *         change_time
   *         biz_id
   *         sub_info_type
   *         info_risk_level
   *         winhc_suggest
   */
  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): (String, String, String, String, String, String, String, String) = {
    (get_info_type()
      , get_rta_desc(old_map, new_map)
      , get_change_content(old_map, new_map)
      , get_change_time(bizDate, new_map)
      , get_biz_id(rowkey)
      , get_sub_info_type()
      , get_info_risk_level(old_map, new_map)
      // NOTE(review): winhc_suggest is hard-coded to the same text for every
      // dynamic type — confirm whether it should be handler-specific.
      , "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。"
    )

  }


  /**
   * Source change types this handler consumes: "insert" and/or "update".
   *
   * @return defaults to inserts only
   */
  def org_type(): Seq[String] = Seq("insert")


  /**
   * Human-readable description of the change (rta_desc).
   *
   * @param old_map previous values
   * @param new_map current values
   * @return description string
   */
  protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String

  /**
   * Large-category code looked up by handler class name.
   * Throws NoSuchElementException for unmapped handlers (fail fast).
   */
  protected def get_info_type(): String = table_2_info_type(getClass.getSimpleName)

  /**
   * Rendered change content.
   *
   * @param old_map previous values
   * @param new_map current values
   * @return content string (typically JSON)
   */
  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String

  /**
   * Change time; defaults to the business date supplied by the caller.
   */
  protected def get_change_time(bizDate: String, new_map: Map[String, String]): String = bizDate

  /**
   * Business id; defaults to the source rowkey.
   */
  protected def get_biz_id(rowkey: String): String = rowkey

  /**
   * Sub-category code looked up by handler class name.
   * Throws NoSuchElementException for unmapped handlers (fail fast).
   */
  protected def get_sub_info_type(): String = table_2_sub_info_type_map(getClass.getSimpleName)

  /**
   * Risk level of the change.
   *
   * @param old_map previous values
   * @param new_map current values
   * @return risk-level string
   */
  protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String

}
+

+ 34 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_abnormal_info.scala

@@ -0,0 +1,34 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
// Dynamic handler for 经营异常 (abnormal-operation) records.
case class company_abnormal_info() extends CompanyDynamicHandle {

  /**
   * Description: the reason the company was listed as abnormal; null when absent.
   */
  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String =
    new_map.getOrElse("put_reason", null)

  /**
   * Change content rendered as JSON with Chinese field aliases
   * (key->alias pairs are resolved by MapHelper.toJson).
   */
  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = {
    val renderedFields = Seq(
      "put_department->做出决定机关",
      "remove_department->移出决定机关",
      "put_reason->列入经营异常目录原因",
      "put_date->列入日期",
      "remove_date->移出日期",
      "remove_reason->移出经营异常目录原因")
    new_map.toJson(renderedFields)
  }

  /**
   * Fixed risk level for this category.
   */
  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
}

+ 46 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
/**
 * @Author yyn
 * @Date 2020/7/27
 * @Description Dynamic handler for environment-protection punishments (环保处罚).
 */
case class company_env_punishment() extends CompanyDynamicHandle {

  /** Description shown for the dynamic: the punishment document number. */
  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String =
    new_map("punish_number")

  /** Change content: the raw punishment content field. */
  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String =
    new_map("content")

  // Change time falls back to the trait default (bizDate). An override that
  // read new_map("biz_date") was left commented out in the original:
  //   override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")

  /**
   * Risk level.
   * NOTE(review): returns the label "警示信息" while the sibling handler
   * company_abnormal_info returns a numeric code ("3") — confirm which
   * encoding info_risk_level expects.
   */
  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String =
    "警示信息"
}

+ 10 - 14
src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.utils
 
 import org.apache.commons.lang3.StringUtils
+import com.winhc.bigdata.spark.implicits.MapHelper._
 
 /**
  * @Author: XuJiakai
@@ -16,18 +17,12 @@ object ChangeExtractUtils {
 
   //获取指定字段集的标签Json
   def getTags(fldMap: Map[String, String], type_val: String, fields: Array[String]): String = {
-    val json: StringBuilder = new StringBuilder(s"""{"type":${getValueOrNull(type_val)},""")
-    fields.foreach(item =>
-      if (item.contains("->")) {
-        val Array(key, keyAlias) = item.split("->")
-        json.append(s"${getValueOrNull(keyAlias)}")
-        json.append(s":${getValueOrNull(fldMap(key))},")
-      } else {
-        json.append(s"${getValueOrNull(item)}")
-        json.append(s":${getValueOrNull(fldMap(item))},")
-      }
-    )
-    json.deleteCharAt(json.lastIndexOf(",")).append("}").toString.trim
+    val json = fldMap.toJson(fields).substring(1)
+    if (json.length == 1) {
+      s"""{"type":${getValueOrNull(type_val)}""" + json
+    } else {
+      s"""{"type":${getValueOrNull(type_val)},""" + json
+    }
   }
 
 
@@ -41,7 +36,8 @@ object ChangeExtractUtils {
 
 
   def main(args: Array[String]): Unit = {
-    val name = get_ip_tags("a", null, "b", null)
-    println(name)
+    val map = Map("a" -> "b", "b" -> "c")
+    println(map.toJson(Seq()))
+    println(getTags(map, "a", Array[String]()))
   }
 }

+ 17 - 5
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -1,13 +1,12 @@
 package com.winhc.bigdata.spark.utils
 
-import java.io.PrintWriter
-
-import com.winhc.bigdata.spark.utils.BaseUtil.getPartitions
 import org.apache.commons.lang3.StringUtils
 import org.apache.log4j.Logger
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
 
 /**
  * π
@@ -132,7 +131,8 @@ trait LoggingUtils {
       default
     }
   }
- def getHeadPartitionsOrElse(t: String, default: String): String = {
+
+  def getHeadPartitionsOrElse(t: String, default: String): String = {
     val ps = getPartitions(t)
     if (ps.nonEmpty) {
       ps.head
@@ -141,7 +141,19 @@ trait LoggingUtils {
     }
   }
 
-  def getColumns(t:String): Seq[String] ={
+  def getColumns(t: String): Seq[String] = {
     spark.table(t).columns.seq
   }
+
+  def getSchema(map: ListMap[String, DataType]): StructType = {
+    StructType(map.map(e => StructField(e._1, e._2)).toArray)
+  }
+
+  def getEnvProjectName(env: String, projectName: String): String = {
+    if (env.equals("dev")) {
+      projectName
+    } else {
+      projectName.substring(0, projectName.length - 4)
+    }
+  }
 }

+ 14 - 0
src/main/scala/com/winhc/bigdata/spark/utils/ReflectUtils.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.spark.utils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/27 17:01
+ * @Description:
+ */
object ReflectUtils {
  /**
   * Reflectively instantiates `clazzName` with the given constructor arguments.
   *
   * Fix: the original used `getConstructors.head`, but the JVM does not
   * guarantee any ordering of the returned constructors, so on classes with
   * several public constructors it could pick one with the wrong arity.
   * We now select a constructor whose parameter count matches the arguments
   * and, among those, prefer one whose parameter types accept them.
   *
   * @param clazzName fully-qualified class name
   * @param initargs  constructor arguments (may be empty)
   * @throws IllegalArgumentException when no constructor takes `initargs.length` arguments
   */
  def getClazz[T](clazzName: String, initargs: Any*): T = {
    val candidates = Class.forName(clazzName).getConstructors
      .filter(_.getParameterCount == initargs.length)
    val ctor = candidates
      .find { c =>
        c.getParameterTypes.zip(initargs).forall { case (pt, arg) =>
          arg == null || pt.isPrimitive || pt.isInstance(arg)
        }
      }
      .orElse(candidates.headOption)
      .getOrElse(throw new IllegalArgumentException(
        s"no public constructor of $clazzName accepts ${initargs.length} argument(s)"))
    ctor.newInstance(initargs.asInstanceOf[Seq[Object]]: _*).asInstanceOf[T]
  }
}