|
@@ -0,0 +1,368 @@
|
|
|
+package com.winhc.bigdata.spark.ng.chance_v9
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.config.EsConfig
|
|
|
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
|
|
|
+import com.winhc.bigdata.spark.utils.{AsyncExtract, BaseUtil, LoggingUtils, SparkUtils}
|
|
|
+import org.apache.commons.lang3.StringUtils
|
|
|
+import org.apache.spark.internal.Logging
|
|
|
+import org.apache.spark.sql.SparkSession
|
|
|
+import org.apache.spark.sql.types.StringType
|
|
|
+import com.winhc.bigdata.spark.udf.{BaseFunc, CaseChanceFunc}
|
|
|
+
|
|
|
+import scala.annotation.meta.getter
|
|
|
+import scala.collection.immutable.ListMap
|
|
|
+import scala.collection.mutable
|
|
|
+
|
|
|
/**
 * Good-news case-chance job, v9.
 *
 * Reads change records from `winhc_ng.bds_change_extract_v9`, passes them through a
 * per-table `CompanyChanceHandleV9` handler that is loaded by name, and appends the
 * results to `winhc_ng.ads_case_chance_good_news_v9` and
 * `winhc_ng.ads_case_chance_element_good_news_v9`.
 *
 * @Author: π
 * @Date: 2021/9/9
 * @Description: good news (利好消息)
 */
object GoodNewsV9 {

  /**
   * Runs the extraction for one logical table per `calc` call.
   *
   * @param s       Spark session used for every query
   * @param project name of the project that owns the tables (not read inside this class)
   * @param incr    true for an incremental run, false for a full run
   */
  case class CompanyMonitorUtil(s: SparkSession,
                                project: String,
                                incr: Boolean
                               ) extends LoggingUtils with Logging with BaseFunc with CaseChanceFunc {
    @(transient@getter) val spark: SparkSession = s

    val inc_ads_eci_debtor_relation: String = "winhc_ng.inc_ads_eci_debtor_relation_v9"
    val target_ads_case_chance: String = "winhc_ng.ads_case_chance_good_news_v9"
    val target_ads_case_chance_element: String = "winhc_ng.ads_case_chance_element_good_news_v9"

    //private lazy val org_tab = if (incr) "winhc_ng.bds_change_extract" else "winhc_ng.bds_change_extract_all_v2"
    private lazy val org_tab = "winhc_ng.bds_change_extract_v9"

    // Lower bound of the source partitions to read and, at the same time, the target
    // partition written by calc: yesterday for an incremental run, "0" for a full run.
    // The previous code first looked up the last partition of org_tab and then
    // unconditionally overwrote the result, so that lookup is no longer performed.
    private val ds: String = if (incr) BaseUtil.getYesterday() else "0"

    // All columns of the debtor-relation table except the partition column.
    val relation_cols: Seq[String] = getColumns(inc_ads_eci_debtor_relation).filterNot(_.equals("ds"))
    val eci_debtor_rel_ds: String = getLastPartitionsOrElse(inc_ads_eci_debtor_relation, "0")

    // Maps a logical table name (which also selects the handler class) to the `tn`
    // value stored in the source table.
    val tabMapping: Map[String, String] = Map(
      "company_court_announcement_v1" -> "company_court_announcement", // court session announcement - defendant
      "wenshu_detail_v2_bg_yishen" -> "wenshu_detail_v2", // judgment document
      "wenshu_detail_v2_yg_yishen" -> "wenshu_detail_v2", // judgment document
      "wenshu_detail_v2_yg_zhongben" -> "wenshu_detail_v2" // judgment document
    )

    /**
     * Translates a logical table name into the source `tn` value.
     *
     * @param s logical table name
     * @return the mapped name from `tabMapping`, or `s` itself when there is no mapping
     */
    def trans(s: String): String = tabMapping.getOrElse(s, s)

    /**
     * Extracts the good-news records of one table and appends them to both target tables.
     *
     * @param tableName1 logical table name; selects the handler class
     *                   `com.winhc.bigdata.spark.ng.chance_v9.table.<tableName1>` and is
     *                   written as the `tn` partition of the targets
     * @param mode       "0" joins the records with the debtor-relation table,
     *                   "1" joins them with `winhc_ng.tmp_xf_company_all`
     * @throws IllegalArgumentException when `mode` is neither "0" nor "1"
     */
    def calc(tableName1: String, mode: String): Unit = {
      // Inherited helpers; presumably they register the SQL functions used by the
      // statements below (cleanup, get_json_kv, json_add_str, trans_number,
      // get_chance_dynamic_type, ...) -- TODO confirm against BaseFunc / CaseChanceFunc.
      cleanup()
      json_utils()
      json_add_kv()
      map_2_json()
      trans_number()
      chance_dynamic_type()

      val baseView = s"good_news_debtor_relation_view$tableName1"
      val joinedView = s"${baseView}v2"

      registerGoodNewsView(tableName1, baseView)

      mode match {
        case "0" => insertDebtorRelationChance(tableName1, baseView, joinedView)
        case "1" => insertCompanyChance(tableName1, baseView, joinedView)
        case other =>
          throw new IllegalArgumentException(
            s"unsupported mode '$other' for table $tableName1, expected \"0\" or \"1\"")
      }
    }

    /** Reads the change records of `tableName1`, applies its handler and registers the result as temp view `viewName`. */
    private def registerGoodNewsView(tableName1: String, viewName: String): Unit = {
      val clazz = getClazz[CompanyChanceHandleV9](s"com.winhc.bigdata.spark.ng.chance_v9.table.$tableName1", incr)
      val conditional = clazz.get_conditional_filter()
      // Pulled into locals so that the RDD closures below reference them directly.
      val filter = clazz.filter
      val flat_map = clazz.flat_map
      val tn = trans(tableName1)

      val rdd = sql(
        s"""
           |SELECT *
           |FROM $org_tab
           |WHERE ds >= '$ds'
           |AND tn = '$tn'
           |$conditional
           |""".stripMargin)
        .rdd.map { r =>
          val rawFields = r.getAs[String]("change_fields")
          val change_fields: Seq[String] = if (StringUtils.isEmpty(rawFields)) Seq.empty else rawFields.split(",")
          // todo: to be optimised
          // The company id is the part of the rowkey before the first "_". The type
          // argument is spelled out so the element type is not left to inference.
          val company_id = r.getAs[Any]("rowkey").toString.split("_")
          val new_data = r.getAs[Map[String, String]]("new_data")
          val biz_date = clazz.getBizDate(new_data)

          ChangeExtract(rowkey = r.getAs("rowkey")
            , company_id = company_id(0)
            , company_name = null
            , tn = r.getAs("tn")
            , update_type = r.getAs("update_type")
            , old_data = r.getAs("old_data")
            , new_data = r.getAs("new_data")
            , change_fields = change_fields
            , biz_date = biz_date
            , update_time = BaseUtil.nowDate()
          )
        }
        // A handler without a filter keeps every record.
        .filter(r => filter == null || filter(r.update_type, r.biz_date, r.change_fields, r.old_data, r.new_data))
        .flatMap(flat_map)
        .map(_.format())
        .filter(_ != null)

      val schema = getSchema(ListMap(
        "id" -> StringType
        , "rowkey" -> StringType
        , "company_id" -> StringType
        , "title" -> StringType
        , "label" -> StringType
        , "change_time" -> StringType
        , "update_time" -> StringType
        , "update_type" -> StringType
        , "deleted" -> StringType
        , "new_data" -> StringType
        , "plaintiff" -> StringType
        , "defendant" -> StringType
      ))

      spark.createDataFrame(rdd.map(_.to_row()), schema)
        .createOrReplaceTempView(viewName)
    }

    /**
     * Mode "0": joins the records with the latest debtor-relation partition on
     * `bg_company_id`, drops names containing 银行 (bank) or 保险 (insurance), and
     * appends to the element table and the case-chance table.
     */
    private def insertDebtorRelationChance(tableName1: String, baseView: String, joinedView: String): Unit = {
      val relationSelect = relation_cols.map(n => s"$n as $n").mkString(",")

      sql(
        s"""
           |SELECT *
           |FROM (
           | SELECT $relationSelect
           | FROM $inc_ads_eci_debtor_relation
           | WHERE ds = '$eci_debtor_rel_ds'
           | and deleted = 0
           | ) AS t1
           |JOIN (
           | SELECT
           | *
           | FROM $baseView
           | ) AS t2
           |ON t1.bg_company_id = t2.company_id
           |WHERE t1.bg_name not like concat('%','银行','%') AND t1.bg_name not like concat('%','保险','%')
           |""".stripMargin).createOrReplaceTempView(joinedView)

      // Element table: one TYPE 0 row with the yg_* region codes and one TYPE 1 row with
      // the bg_* region codes per record, de-duplicated per (id, type, province, city).
      sql(
        s"""
           |INSERT INTO TABLE $target_ads_case_chance_element PARTITION(ds='$ds', tn='$tableName1')
           |SELECT md5(cleanup(CONCAT_WS('',case_chance_id,case_chance_type,type,province,city,dynamic_time))) AS id
           | ,CASE_CHANCE_ID
           | ,TYPE
           | ,province
           | ,city
           | ,county
           | ,dynamic_time
           | ,public_date
           | ,CASE_CHANCE_TYPE
           |FROM (
           | SELECT *
           | ,ROW_NUMBER() OVER(PARTITION BY CASE_CHANCE_ID,TYPE,PROVINCE,city ORDER BY CASE_CHANCE_ID) AS num
           | FROM (
           | SELECT rowkey AS CASE_CHANCE_ID
           | ,0 AS TYPE
           | ,get_table_type('$tableName1') AS CASE_CHANCE_TYPE
           | ,yg_province_code AS PROVINCE
           | ,yg_city_code AS city
           | ,yg_county_code AS county
           | ,change_time AS dynamic_time
           | ,update_time AS public_date
           | FROM $joinedView
           | UNION ALL
           | SELECT rowkey AS CASE_CHANCE_ID
           | ,1 AS TYPE
           | ,get_table_type('$tableName1') AS CASE_CHANCE_TYPE
           | ,bg_province_code AS PROVINCE
           | ,bg_city_code AS city
           | ,bg_county_code AS county
           | ,change_time AS dynamic_time
           | ,update_time AS public_date
           | FROM $joinedView
           | )
           | ) AS t
           |WHERE t.num = 1
           |""".stripMargin)

      // Case-chance table: one row per rowkey.
      sql(
        s"""
           |
           |INSERT INTO TABLE $target_ads_case_chance PARTITION(ds='$ds', tn='$tableName1')
           |SELECT rowkey AS case_chance_id
           | -- ,title AS title
           | ,bg_name as title
           | ,plaintiff
           | ,defendant
           | ,bg_name AS company_name
           | ,company_id
           | ,json_add_str(label,CONCAT_WS(',',get_json_kv('reg_capital',trans_number(bg_reg_capital)),get_json_kv('province_code',bg_province_code),get_json_kv('city_code',bg_city_code),get_json_kv('county_code',bg_county_code),get_json_kv('estiblish_time',bg_estiblish_time) ,get_json_kv('province_name',bg_province_name),get_json_kv('city_name',bg_city_name),get_json_kv('county_name',bg_county_name),get_json_kv('category_first',bg_category_first_name) )) AS tags
           | ,rowkey AS biz_id
           | ,get_table_type('$tableName1') AS type
           | ,get_chance_dynamic_type('$tableName1') AS dynamic_type
           | ,new_data AS dynamic_content
           | ,change_time AS dynamic_time
           | ,update_time AS public_date
           |FROM (
           | SELECT *
           | ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY rowkey) AS num
           | FROM $joinedView
           | ) AS t
           |WHERE t.num = 1
           |""".stripMargin)
    }

    /**
     * Mode "1": joins the records with `winhc_ng.tmp_xf_company_all` on `company_id`,
     * drops names containing 银行 (bank) or 保险 (insurance), and appends to the
     * element table and the case-chance table.
     */
    private def insertCompanyChance(tableName1: String, baseView: String, joinedView: String): Unit = {
      sql(
        s"""
           |SELECT t2.*,
           | name,
           | province_code,
           | city_code,
           | county_code,
           | reg_capital,
           | estiblish_time,
           | phones,
           | cate_first_code,
           | cate_second_code,
           | cate_third_code
           |FROM (
           | SELECT * FROM winhc_ng.tmp_xf_company_all
           | ) AS t1
           |JOIN (
           | SELECT
           | *
           | FROM $baseView
           | ) AS t2
           |ON t1.company_id = t2.company_id
           |WHERE t1.name not like concat('%','银行','%') AND t1.name not like concat('%','保险','%')
           |""".stripMargin).createOrReplaceTempView(joinedView)

      // Debtor element table. The ds partition value is quoted here exactly as in the
      // mode "0" statements, so both modes address the partition the same way.
      sql(
        s"""
           |INSERT INTO TABLE $target_ads_case_chance_element PARTITION (ds='$ds', tn='$tableName1')
           |SELECT md5(cleanup(CONCAT_WS('',case_chance_id,case_chance_type,type,province_code,city_code,dynamic_time))) AS id
           | ,case_chance_id
           | ,type
           | ,province_code
           | ,city_code
           | ,county_code
           | ,dynamic_time
           | ,public_date
           | ,case_chance_type
           |FROM
           |(
           |select
           | rowkey AS case_chance_id
           | ,0 AS type
           | ,province_code
           | ,city_code
           | ,county_code
           | ,change_time AS dynamic_time
           | ,update_time AS public_date
           | ,get_table_type('$tableName1') AS case_chance_type
           |from $joinedView
           |)
           |WHERE trim(province_code) <> ''
           |""".stripMargin)

      // Case-chance table.
      sql(
        s"""
           |INSERT INTO TABLE $target_ads_case_chance PARTITION (ds='$ds', tn='$tableName1')
           |SELECT
           | rowkey as case_chance_id
           | ,title
           | ,plaintiff
           | ,defendant
           | ,name as company_name
           | ,company_id
           | ,label as tags
           | ,rowkey as biz_id
           | ,get_table_type('$tableName1') AS type
           | ,get_chance_dynamic_type('$tableName1') AS dynamic_type
           | ,new_data AS dynamic_content
           | ,change_time AS dynamic_time
           | ,update_time AS public_date
           |FROM $joinedView
           |""".stripMargin)
    }
  }

  // Every table this job knows how to process, with the mode used for each one.
  private val startArgs = Seq(
    Args(tableName = "company_equity_info")
    , Args(tableName = "company_tm")
    , Args(tableName = "company_patent")
    , Args(tableName = "company_copyright_works")
    , Args(tableName = "company_copyright_reg")
    , Args(tableName = "company_land_publicity")
    , Args(tableName = "company_land_announcement")
    //, Args(tableName = "company_bid")
    , Args(tableName = "company_land_transfer")
    , Args(tableName = "company_employment")
    , Args(tableName = "company_certificate")
    , Args(tableName = "company_zxr_restrict")
    , Args(tableName = "company_zxr")
    , Args(tableName = "company_dishonest_info")
    , Args(tableName = "company_court_announcement")
    , Args(tableName = "bankruptcy_open_case")
    , Args(tableName = "wenshu_detail_v2_bg_yishen")
    , Args(tableName = "company_court_announcement_v1", mode = "1")
    , Args(tableName = "wenshu_detail_v2_yg_yishen", mode = "1")
    , Args(tableName = "wenshu_detail_v2_yg_zhongben", mode = "1")
  )

  /**
   * One unit of work for `CompanyMonitorUtil.calc`.
   *
   * @param project   project name, defaults to "winhc_ng"
   * @param tableName logical table name passed to `calc`
   * @param mode      processing mode passed to `calc`, defaults to "0"
   */
  private case class Args(project: String = "winhc_ng"
                          , tableName: String
                          , mode: String = "0")

  /**
   * Entry point.
   *
   * @param args exactly three values: `<project> <tableNames> <incr>`.
   *             `tableNames` is either "all" or a comma-separated list of names from
   *             `startArgs`; the run is incremental only when `incr` equals "incr".
   */
  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      println(
        s"""
           |Please enter the legal parameters !
           |<project> <tableNames> <incr>
           |""".stripMargin)
      sys.exit(-99)
    }

    val Array(project, tableNames, incr) = args

    println(
      s"""
         |project: $project
         |tableNames: $tableNames
         |incr: $incr
         |""".stripMargin)

    val config = EsConfig.getEsConfigMap ++ mutable.Map(
      "spark.hadoop.odps.project.name" -> project,
      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
    )
    val spark = SparkUtils.InitEnv("CompanyMonitorV2", config)

    // The session is stopped even when building the worker or one of the tasks fails.
    try {
      val isIncr = "incr".equals(incr)
      val cd = CompanyMonitorUtil(spark, project, isIncr)

      val selected =
        if (tableNames.equals("all")) {
          startArgs
        } else {
          val requested = tableNames.split(",").toSet
          // Names that match no configured table were previously dropped without a trace.
          val unknown = requested -- startArgs.map(_.tableName)
          if (unknown.nonEmpty) {
            println(s"Ignoring unknown table names: ${unknown.mkString(",")}")
          }
          startArgs.filter(a => requested.contains(a.tableName))
        }

      val tasks = selected.map(e => (e.tableName, () => {
        cd.calc(e.tableName, e.mode) // generic processing
        true
      }))

      AsyncExtract.startAndWait(spark, tasks)
    } finally {
      spark.stop()
    }
  }
}
|