@@ -0,0 +1,255 @@
+package com.winhc.bigdata.spark.jobs.monitor
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandleUtils
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
+import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.time.DateFormatUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
+import scala.collection.mutable
+
+/**
+ * @Author: π
+ * @Date: 2020/12/8
+ * @Description: company asset monitoring
+ */
+object CompanyMonitor {
+  val env = "dev"
+  val targetTab = "ads_company_monitor"
+
+  case class CompanyMonitorUtil(s: SparkSession,
+                                project: String, // project that owns the tables
+                                ds: String // partition to compute (primary key of this dimension)
+                               ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+    def init(): Unit = {
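+      // LIFECYCLE in the DDL below is MaxCompute (ODPS) syntax: partitions
+      // older than 30 days are recycled automatically.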
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
+           |(
+           |    id STRING COMMENT 'unique id',
+           |    cid STRING COMMENT 'company id',
+           |    cname STRING COMMENT 'company name',
+           |    info_type STRING COMMENT 'change category (top-level class)',
+           |    flow_type STRING COMMENT 'asset flow direction',
+           |    rta_desc STRING COMMENT 'change description / headline',
+           |    change_time STRING COMMENT 'change time',
+           |    biz_id STRING COMMENT 'business id (source row id)',
+           |    info_risk_level STRING COMMENT 'risk level of the change',
+           |    create_time STRING COMMENT 'creation time'
+           |)
+           |COMMENT 'company asset monitoring output table'
+           |PARTITIONED BY
+           |(
+           |    ds STRING COMMENT 'partition',
+           |    tn STRING COMMENT 'table name'
+           |)
+           |LIFECYCLE 30
+           |""".stripMargin)
+    }
+
+    val tabMapping =
+      Map("wenshu_detail_combine_v2" -> "wenshu_detail_combine" // won lawsuits
+        , "company_land_mortgage_v2" -> "company_land_mortgage" // land mortgagee
+      )
+
+    // map a versioned table name back to its canonical name
+    def trans(s: String): String = tabMapping.getOrElse(s, s)
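+    // e.g. trans("wenshu_detail_combine_v2") == "wenshu_detail_combine",
+    //      trans("company_finance") == "company_finance"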
+
+    // tableName: table name (without prefix or suffix)
+    def calc(tableName: String
+             , bName: Int = 0 // whether to backfill the cname field (0 = no, 1 = yes)
+            ): Unit = {
+      val handle = getClazz[CompanyMonitorHandle](s"com.winhc.bigdata.spark.jobs.monitor.tables.$tableName")
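+      // getClazz reflectively instantiates the per-table handler: e.g. for
+      // tableName = "company_tm" it loads
+      // com.winhc.bigdata.spark.jobs.monitor.tables.company_tm, which is
+      // expected to implement CompanyMonitorHandle.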
+
+      val types = handle.org_type()
+      val conditional = handle.get_conditional_filter()
+      val tn = trans(tableName)
+
+      val rdd = sql(
+        bName match {
+          // default: the cname field needs no backfill
+          case 0 =>
+            s"""
+               |SELECT *,null AS cname
+               |FROM ${project}.ads_change_extract
+               |WHERE ds = '$ds'
+               |AND tn = '$tn'
+               |AND TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |$conditional
+               |""".stripMargin
+          // backfill cname by cid from base_company_mapping
+          case 1 =>
+            s"""
+               |SELECT A.*,B.cname AS cname
+               |FROM(
+               |    SELECT *
+               |    FROM ${project}.ads_change_extract
+               |    WHERE ds = '$ds'
+               |    AND tn = '$tn'
+               |    AND TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |    $conditional
+               |) AS A
+               |LEFT JOIN (
+               |    SELECT cid,cname FROM $project.base_company_mapping
+               |    WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
+               |) AS B
+               |ON A.cid = B.cid
+               |""".stripMargin
+        })
+        .rdd.flatMap(r => {
+          val rowkey = r.getAs[String]("rowkey")
+          val cid = r.getAs[String]("cid")
+          val new_data = r.getAs[Map[String, String]]("data")
+          val old_data = r.getAs[Map[String, String]]("old_data")
+          val biz_date = r.getAs[String]("biz_date")
+          val fields = r.getAs[String]("fields")
+          val cname = r.getAs[String]("cname")
+          if (biz_date == null) {
+            None
+          } else {
+            val result = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data, cname)
+            if (result == null) {
+              None
+            } else {
+              result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(res._1, res._5, res._7, res._6),
+                res._1, res._2, res._3, res._4,
+                res._5.replaceAll("null", ""), res._6, res._7, res._8, res._9,
+                DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+            }
+          }
+        })
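+      // Each handler tuple presumably maps positionally onto the 11-column
+      // schema below: a generated dynamic id, then res._1.._9 (cid .. type),
+      // then a create_time stamped at write time.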
+
+      val schema = getSchema(ListMap(
+        "id" -> StringType
+        , "cid" -> StringType
+        , "cname" -> StringType
+        , "table_type" -> StringType
+        , "flow_type" -> StringType
+        , "rta_desc" -> StringType
+        , "change_time" -> StringType
+        , "biz_id" -> StringType
+        , "info_risk_level" -> StringType
+        , "type" -> StringType
+        , "create_time" -> StringType
+      ))
+      spark.createDataFrame(rdd, schema)
+        .createOrReplaceTempView(s"company_monitor_tmp_$tableName")
+
+      unescapeHtml4()
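+      // unescapeHtml4() presumably registers the UDF of the same name from
+      // BaseFunc; it is used below to decode HTML entities left in rta_desc.
+      // Note: the SELECT exposes table_type where the DDL declares info_type;
+      // INSERT ... SELECT matches columns by position, so the write lines up.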
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds',tn='$tableName')
+           |SELECT id
+           |     , cid
+           |     , cname
+           |     , table_type
+           |     , flow_type
+           |     , unescapeHtml4(rta_desc) rta_desc
+           |     , change_time
+           |     , biz_id
+           |     , info_risk_level
+           |     , type
+           |     , create_time
+           |FROM
+           |    company_monitor_tmp_$tableName
+           |WHERE id IS NOT NULL
+           |AND to_timestamp(change_time) <= now()
+           |""".stripMargin)
+    }
+  }
+
+  private val startArgs = Seq(
+    Args(tableName = "wenshu_detail_combine_v2", bName = 1)
+    , Args(tableName = "company_dishonest_info", bName = 1)
+    , Args(tableName = "company_zxr_list", bName = 1)
+    , Args(tableName = "company_zxr_restrict", bName = 1)
+    , Args(tableName = "company_court_open_announcement_list", bName = 1)
+    , Args(tableName = "wenshu_detail_combine", bName = 1)
+    , Args(tableName = "company_equity_info_list", bName = 1)
+    , Args(tableName = "company_land_mortgage", bName = 1)
+    , Args(tableName = "company_land_announcement", bName = 1)
+    , Args(tableName = "company_finance", bName = 1)
+    , Args(tableName = "bankruptcy_open_case", bName = 1)
+    , Args(tableName = "company_bid_list", bName = 1)
+    , Args(tableName = "company_mortgage_info", bName = 1)
+    , Args(tableName = "company_tm", bName = 1)
+    , Args(tableName = "company_patent_list", bName = 1)
+    , Args(tableName = "company_copyright_reg_list", bName = 1)
+    , Args(tableName = "company_copyright_works_list", bName = 1)
+    , Args(tableName = "company_holder_v2", bName = 1)
+    //, Args(tableName = "company", bName = 1)
+    , Args(tableName = "company_land_mortgage_v2", bName = 1)
+    , Args(tableName = "auction_tracking_list", bName = 1)
+  )
+
+  private case class Args(project: String = "winhc_eci_dev"
+                          , tableName: String
+                          , bName: Int = 1
+                          , aggs: Int = 0)
+
+  def main(args: Array[String]): Unit = {
+    if (args.length != 3) {
+      println(
+        s"""
+           |Please enter valid parameters!
+           |<project> <tableNames> <ds>
+           |""".stripMargin)
+      sys.exit(-99)
+    }
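+    // Example invocation (hypothetical values):
+    //   CompanyMonitor winhc_eci_dev all 20201208
+    //   CompanyMonitor winhc_eci_dev company_tm,company_patent_list 20201208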
+
+    val Array(project, tableNames, ds) = args
+
+    println(
+      s"""
+         |project: $project
+         |tableNames: $tableNames
+         |ds: $ds
+         |""".stripMargin)
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv("CompanyMonitor", config)
+    val cd = CompanyMonitorUtil(spark, project, ds)
+    //cd.init()
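+    // init() is a one-off: uncomment to (re)create the output table; the DDL
+    // uses CREATE TABLE IF NOT EXISTS, so rerunning it is safe.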
+
+    var start = startArgs
+    if (!tableNames.equals("all")) {
+      val set = tableNames.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+
+    val a = start.map(e => (e.tableName, () => {
+      e.aggs match {
+        case _ => cd.calc(e.tableName, e.bName) // generic handling
+      }
+      true
+    }))
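+    // Each entry pairs a table name with a thunk returning a success flag;
+    // AsyncExtract.startAndWait presumably runs these tasks concurrently and
+    // blocks until all of them complete.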
+
+    AsyncExtract.startAndWait(spark, a)
+    spark.stop()
+  }
+}