@@ -0,0 +1,373 @@
+package com.winhc.bigdata.spark.jobs.monitor.alldata.change
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/7 11:25
+ * @Description: Extract the concrete field-level changes in the data
+ */
+object ChangeExtractAll {
+
+  // Split the rows grouped under one rowkey into (new, old) snapshots keyed by
+  // change_flag ("0" = new data, "1" = old data); a missing side comes back as null.
+  def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
+    val map = iterable.map(m => (m("change_flag"), m)).toMap
+    (map.getOrElse("0", null), map.getOrElse("1", null))
+  }
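+
+  // Illustrative sketch with made-up data: for one rowkey the grouped input could be
+  //   Iterable(Map("change_flag" -> "0", "title" -> "new"), Map("change_flag" -> "1", "title" -> "old"))
+  // and getDoubleDataMap would return (newMap, oldMap) with the "0" row first.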
+
+  // Reflectively instantiate the per-table handler class and expose it through a
+  // structural type, so each table class only has to provide a matching `handle` method.
+  def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String])} = {
+    val clazz = s"com.winhc.bigdata.spark.jobs.monitor.alldata.change.tables.$tableName"
+    val foo = Class.forName(clazz)
+      .getConstructors.head.newInstance(equCols)
+      .asInstanceOf[ {
+        def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String])
+      }]
+    foo
+  }
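+
+  // A minimal sketch, with assumed field names, of what a handler class under
+  // ...alldata.change.tables might look like; the real classes (and the
+  // CompanyChangeHandle1 trait used in calc below) may differ:
+  //
+  //   class company_bid_list(equCols: Seq[String]) {
+  //     def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String])
+  //     : (String, String, String, Map[String, String], String, String, String, String, Map[String, String]) = {
+  //       val changedFields = equCols.filter(c => oldMap == null || oldMap.getOrElse(c, null) != newMap.getOrElse(c, null))
+  //       if (oldMap != null && changedFields.isEmpty) null // no monitored field changed
+  //       else (rowkey, newMap("new_cid"), if (oldMap == null) "insert" else "update",
+  //         newMap, changedFields.mkString(","), newMap.getOrElse("title", ""), "1",
+  //         newMap.getOrElse("update_time", ""), oldMap)
+  //     }
+  //   }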
+
+  case class ChangeExtractHandle(s: SparkSession,
+                                 project: String, // project the tables live in
+                                 tableName1: String, // table name (without prefix/suffix)
+                                 primaryKey: String, // primary key of this dimension
+                                 inc_ds: String, // partition to compute
+                                 primaryFields: Seq[String] // key fields; a difference in any one of them counts as a change
+                                ) extends LoggingUtils with Logging {
+    @(transient@getter) val spark: SparkSession = s
+
+    val target_eci_change_extract = "ads_change_extract_all"
+
+    // per-table column used to order rows by recency (defaults to update_time)
+    val updateTimeMapping = Map(
+      "wenshu_detail_combine" -> "update_date", // sort time for judgment documents
+      "company_equity_info_list" -> "reg_date" // sort time for equity records (registration date)
+    )
+    // logical table names mapped onto their physical table
+    val tabMapping =
+      Map("company_holder_v2" -> "company_holder" // company_holder_v2 is computed from the company_holder table
+      )
+
+    // resolve the physical table name
+    def trans(s: String): String = tabMapping.getOrElse(s, s)
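+    // e.g. trans("company_holder_v2") == "company_holder"; any unmapped name resolves to itself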
+
+    def calc(isCopy: Boolean = true): Unit = {
+      val tableName = trans(tableName1)
+      val cols = primaryFields.filter(!_.equals(primaryKey)).seq
+
+      val ds = inc_ds.replace("-", "")
+
+      // columns present in both the full table and its incremental counterpart
+      val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
+
+      val otherAllCols = intersectCols.filter(!primaryKey.equals(_)).toSeq
+      val all_cols = primaryKey +: otherAllCols :+ "change_flag"
+
+      val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
+
+      val handle = ReflectUtils.getClazz[CompanyChangeHandle1](s"com.winhc.bigdata.spark.jobs.monitor.alldata.change.tables.$tableName1", cols)
+      //      val handle = getHandleClazz(tableName, cols)
+
+      val condition = handle.getCondition()
+
+      val update_time = BaseUtil.nowDate()
+
+      var df: DataFrame = null
+      isCopy match {
+        case true => {
+          // cid -> current_cid mapping, used to rewrite rows of companies whose id changed
+          sql(
+            s"""
+               |SELECT cid,current_cid as new_cid
+               |FROM ${project}.inc_ads_company
+               |WHERE ds = '${getLastPartitionsOrElse(s"$project.inc_ads_company", "0")}'
+               |AND cid IS NOT NULL
+               |AND current_cid IS NOT NULL
+               |GROUP BY cid,current_cid
+               |""".stripMargin).createOrReplaceTempView("mapping")
+
+          val cid = getColumns(s"$project.ads_$tableName").filter(f => f.equals("cid") || f.equals("new_cid")).max
+
+          primaryKey.equals("rowkey") match {
+            case true => {
+              df = sql(
+                s"""
+                   |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'0' as change_flag
+                   |FROM (
+                   |    SELECT concat_ws('_',coalesce(mm.new_cid,tmp.$cid),split(rowkey, '_')[1]) AS rowkey
+                   |           ,${intersectCols.diff(Set("rowkey", "cid", "new_cid")).mkString(",")}
+                   |           ,coalesce(mm.new_cid,tmp.$cid) AS new_cid
+                   |           ,tmp.$cid as cid
+                   |           ,c
+                   |    FROM (
+                   |        SELECT a.*
+                   |               ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
+                   |        FROM (
+                   |            SELECT ${intersectCols.mkString(",")}
+                   |            FROM $project.ads_$tableName
+                   |            WHERE ds = $lastDs_ads_all ${condition}
+                   |            UNION ALL
+                   |            SELECT ${intersectCols.mkString(",")}
+                   |            FROM $project.inc_ads_$tableName
+                   |            WHERE ds > $lastDs_ads_all ${condition}
+                   |        ) AS a
+                   |    ) AS tmp
+                   |    LEFT JOIN mapping mm
+                   |    ON tmp.$cid = mm.cid
+                   |    WHERE tmp.c = 1
+                   |) AS t2
+                   |""".stripMargin)
+            }
+            case false => {
+              df = sql(
+                s"""
+                   |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'0' as change_flag
+                   |FROM (
+                   |    SELECT ${intersectCols.diff(Set("rowkey", cid)).mkString(",")}
+                   |           ,coalesce(mm.new_cid,tmp.$cid) AS $cid
+                   |    FROM (
+                   |        SELECT a.*
+                   |               ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
+                   |        FROM (
+                   |            SELECT ${intersectCols.mkString(",")}
+                   |            FROM $project.ads_$tableName
+                   |            WHERE ds = $lastDs_ads_all ${condition}
+                   |            UNION ALL
+                   |            SELECT ${intersectCols.mkString(",")}
+                   |            FROM $project.inc_ads_$tableName
+                   |            WHERE ds > $lastDs_ads_all ${condition}
+                   |        ) AS a
+                   |    ) AS tmp
+                   |    LEFT JOIN mapping mm
+                   |    ON tmp.$cid = mm.cid
+                   |    WHERE tmp.c = 1
+                   |) AS t2
+                   |""".stripMargin)
+            }
+          }
+        }
+        case false => {
+          df = sql(
+            s"""
+               |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'0' as change_flag
+               |FROM (
+               |    SELECT tmp.*
+               |    FROM (
+               |        SELECT a.*
+               |               ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
+               |        FROM (
+               |            SELECT ${intersectCols.mkString(",")}
+               |            FROM $project.ads_$tableName
+               |            WHERE ds = $lastDs_ads_all ${condition}
+               |            UNION ALL
+               |            SELECT ${intersectCols.mkString(",")}
+               |            FROM $project.inc_ads_$tableName
+               |            WHERE ds > $lastDs_ads_all ${condition}
+               |        ) AS a
+               |    ) AS tmp
+               |    WHERE tmp.c = 1
+               |) AS t2
+               |""".stripMargin)
+        }
+      }
+
+      val rdd =
+        df.select(all_cols.map(column => col(column).cast("string")): _*)
+          .rdd.map(r => {
+            (r.getAs[String](primaryKey), all_cols.map(f => (f, r.getAs[String](f))).toMap)
+          }).groupByKey()
+          .map(x => {
+            val rowkey = x._1
+            val map_list = x._2
+            //        try {
+            //          if (map_list.size == 1) {
+            //            val res = handle.handle(rowkey, null, map_list.head)
+            //            Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
+            //          } else {
+            //            if (map_list.size > 2) {
+            //              logInfo("list.size > 2! rowkey:" + rowkey)
+            //            }
+            val m = getDoubleDataMap(map_list)
+
+            val new_map = m._1
+            val old_map = m._2
+            if (new_map == null && old_map == null) {
+              null
+            } else if (old_map == null) {
+              // only a new snapshot exists: treat the record as an insert
+              val res = handle.handle(rowkey, null, map_list.head)
+              if (res == null) {
+                null
+              } else {
+                Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
+              }
+            } else if (new_map == null) {
+              null
+            } else {
+              // both snapshots exist: let the handler diff old vs. new
+              val res = handle.handle(rowkey, old_map, new_map)
+              if (res == null) {
+                null
+              } else {
+                Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
+              }
+            }
+            //          }
+            /*        } catch {
+                      case e: Exception => {
+                        logError(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
+                        logError(e.getMessage, e)
+                        println(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
+                      }
+                      null
+                    }*/
+          }).filter(_ != null)
+
+      val schema = StructType(Array(
+        StructField("rowkey", StringType), // primary key of the source row
+        StructField("cid", StringType), // company id
+        StructField("table_name", StringType), // source table name
+        StructField("type", StringType), // change type: insert or update
+        StructField("data", MapType(StringType, StringType)), // data after the change
+        StructField("fields", StringType), // for updates, the fields that changed
+        StructField("title", StringType), // display title, e.g. "new land publicity"
+        StructField("label", StringType), // 1 = ordinary change, 2 = risk change
+        StructField("biz_time", StringType), // business time
+        StructField("update_time", StringType), // processing time
+        StructField("old_data", MapType(StringType, StringType)) // data before the change
+      ))
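+
+      // Illustrative only, with made-up values: a produced row could look like
+      //   Row("12345_0001", "12345", "company_bid_list", "update", Map("title" -> "new"),
+      //       "title", "...", "1", "2020-07-07", update_time, Map("title" -> "old"))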
+
+      spark.createDataFrame(rdd, schema)
+        .createOrReplaceTempView(s"tmp_change_all_view$tableName1")
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_eci_change_extract PARTITION(ds='$ds',tn='$tableName1')
+           |SELECT *
+           |FROM
+           |    tmp_change_all_view$tableName1
+           |""".stripMargin)
+    }
+  }
+
+
+  // winhc_eci_dev company_tm rowkey 20200717 status_new
+  // winhc_eci_dev company_patent_list rowkey 20200717 lprs
+  // winhc_eci_dev company_certificate rowkey 20200707 type
+  // winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+  // winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
+  // winhc_eci_dev company_employment rowkey 20200630 source
+
+  // winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+  // winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
+
+  // winhc_eci_dev company_bid_list rowkey 20200717 title
+  // winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+  // winhc_eci_dev company_abnormal_info rowkey 20200717 remove_reason
+
+  // winhc_eci_dev company_own_tax rowkey 20200729 tax_balance,tax_category,tax_num
+
+  // winhc_eci_dev company_equity_info id 20200730 reg_number false
+
+  // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
+
+  private val startArgs = Seq(
+    //    Args(tableName = "company_tm", primaryFields = "status_new")
+    //    , Args(tableName = "company_patent_list", primaryFields = "lprs")
+    Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
+    , Args(tableName = "company_bid_list", primaryFields = "title")
+    , Args(tableName = "company_zxr_list", primaryFields = "status")
+    //    , Args(tableName = "company_copyright_works_list", primaryFields = "type")
+    //    , Args(tableName = "company_copyright_reg_list", primaryFields = "version")
+    , Args(tableName = "company_land_mortgage", primaryFields = "land_num,source_url")
+    , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no", isCopy = false) // bankruptcy reorganization
+    , Args(tableName = "company_mortgage_info", primaryFields = "reg_num") // chattel mortgage
+    , Args(tableName = "company_court_open_announcement_list", primaryFields = "case_reason") // court session announcements
+    , Args(tableName = "company_zxr_restrict", primaryFields = "status") // consumption restriction orders; pick up the latest status
+
+    , Args(tableName = "wenshu_detail_combine", primaryFields = "cname") // judgment documents
+
+    , Args(tableName = "company_equity_info_list", primaryFields = "reg_number")
+    // company name, legal person id (person or company), company type, registered address,
+    // business term end date, business scope, registration authority, company status,
+    // registered capital, deregistration date, deregistration reason
+    , Args(tableName = "company_finance", primaryFields = "round")
+    , Args(tableName = "company_dishonest_info", primaryFields = "status")
+    //    , Args(tableName = "company_holder", primaryFields = "amount")
+    //    , Args(tableName = "company_holder_v2", primaryFields = "deleted")
+    , Args(tableName = "increase_registered_capital_info", primaryFields = "change_time")
+    , Args(tableName = "auction_tracking_list", primaryFields = "auction_items_id")
+
+    , Args(tableName = "zxr_evaluate", primaryFields = "name,case_no,asset_name")
+    , Args(tableName = "zxr_evaluate_results", primaryFields = "name,case_no,asset_name")
+  )
+
+  private case class Args(project: String = "winhc_eci_dev"
+                          , tableName: String
+                          , primaryKey: String = "rowkey"
+                          , primaryFields: String
+                          , isCopy: Boolean = true)
+
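+  // Hypothetical invocation sketch (the actual submit command depends on the deployment):
+  //   spark-submit --class com.winhc.bigdata.spark.jobs.monitor.alldata.change.ChangeExtractAll \
+  //     <jar> company_bid_list,company_zxr_list 2020-08-01
+  // main expects two args: a comma-separated table list (or "all") and the partition date.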
+  def main(args: Array[String]): Unit = {
+    val Array(tableName, inc_ds) = args
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("MonitorChangeAll", config)
+
+    // keep only the requested tables unless "all" was asked for
+    var start = startArgs
+    if (!tableName.equals("all")) {
+      val set = tableName.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+
+    // run the per-table extractions concurrently
+    val a = start.map(e => (e.tableName, () => {
+      ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
+      true
+    }))
+
+    AsyncExtract.startAndWait(spark, a)
+
+    /*    if (tableName.equals("all")) {
+          startArgs.foreach(e => {
+            ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
+          })
+        } else {
+          val set = tableName.split(",").toSet
+          startArgs.filter(a => set.contains(a.tableName)).foreach(e => {
+            ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
+          })
+        }*/
+
+    spark.stop()
+  }
+}