|
@@ -0,0 +1,445 @@
|
|
|
+package com.winhc.bigdata.spark.jobs.deadbeat
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.udf.BaseFunc
|
|
|
+import com.winhc.bigdata.spark.utils.{DateUtils, LoggingUtils, SparkUtils}
|
|
|
+import org.apache.commons.lang3.StringUtils
|
|
|
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
|
|
|
+import org.apache.spark.sql.types._
|
|
|
+import org.apache.spark.sql.{Row, SparkSession}
|
|
|
+
|
|
|
+import scala.annotation.meta.getter
|
|
|
+import scala.collection.mutable
|
|
|
+
|
|
|
/**
 * @Author: XuJiakai
 * @Date: 2020/10/12 18:43
 * @Description: Downstream processing for the "dishonest person / deadbeat" lookup:
 *               aggregates per-person and per-company dishonesty records into the
 *               `*_out` serving tables.
 */
case class deadbeat_info(s: SparkSession,
                         project: String // ODPS project (workspace) that owns the source tables
                        ) extends LoggingUtils with BaseFunc {
  // transient so the SparkSession is never captured into serialized closures
  @(transient@getter) val spark: SparkSession = s

  private val env = "dev"

  // Table-name prefixes whose records count toward the "dishonest" label set.
  private val filter_ele = Seq(
    "company_dishonest_info"
    , "company_dishonest_info_human"
  )

  // True iff `s` (a "tn@@rowkey" string) starts with one of the tracked table names.
  private def is_con(s: String): Boolean = {
    for (e <- filter_ele)
      if (s.startsWith(e))
        return true
    false
  }

  // Table name -> human-readable label (both map to the same "dishonest person" label).
  private val m = Map("company_dishonest_info" -> "失信人"
    , "company_dishonest_info_human" -> "失信人"
  )

  // Table name -> numeric source id used as the prefix of each detail-page id.
  private val ids_m = Map("company_dishonest_info" -> "1"
    , "company_dishonest_info_human" -> "2")

  /**
   * UDAF that folds (rowkey, tn, deleted, publish_date) rows of one grouped entity
   * into a Map(ids, deleted, labels, publish_date).  Only rows with deleted == "0"
   * are accumulated; if none survive, the group is emitted as deleted = "1".
   */
  class person_agg_label extends UserDefinedAggregateFunction {

    // Null/empty-safe lexicographic max; timestamps are compared as strings,
    // which is order-correct for the fixed-width "yyyy-MM-dd HH:mm:ss" form.
    private def getMax(str1: String, str2: String): String = {
      if (StringUtils.isEmpty(str1)) {
        return str2
      }
      if (StringUtils.isEmpty(str2)) {
        return str1
      }
      Seq(str1, str2).max
    }

    override def inputSchema: StructType = StructType(Array[StructField](
      StructField("rowkey", DataTypes.StringType)
      , StructField("tn", DataTypes.StringType)
      , StructField("deleted", DataTypes.StringType)
      , StructField("publish_date", DataTypes.StringType)
    ))

    // buffer(0): accumulated "tn@@rowkey" strings; buffer(1): max publish_date seen.
    override def bufferSchema: StructType = StructType(Array(
      StructField("ids", ArrayType(StringType, containsNull = false))
      , StructField("pub_date", StringType)
    ))

    override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)

    // Output element order in "ids"/"labels" depends on row arrival order,
    // so the aggregate is declared non-deterministic.
    override def deterministic: Boolean = false

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, Seq.empty[String])
      buffer.update(1, null)
    }

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val rowkey = input.getString(0)
      val tn = input.getString(1)
      val deleted = input.getString(2)
      val publish_date = DateUtils.toMillisTimestamp(input.getString(3))
      // Only non-deleted records contribute ids and advance the publish date.
      if (deleted.equals("0")) {
        buffer(0) = s"$tn@@$rowkey" +: buffer.getSeq[String](0)
        buffer(1) = getMax(publish_date, buffer.getString(1))
      }
    }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
      buffer1(1) = getMax(buffer1.getString(1), buffer2.getString(1))
    }

    override def evaluate(buffer: Row): Any = {
      val li = buffer.getSeq[String](0)
      val publish_date = buffer.getString(1)
      // "tn@@rowkey" -> "<source id>@@rowkey" for tracked tables only.
      val ids = li
        .filter(is_con)
        .map(s => {
          val strings = s.split("@@")
          s"${ids_m(strings(0))}@@${strings(1)}"
        }).mkString(",")

      // Distinct display labels; unknown table names become the literal "null" via getOrElse.
      val labels = li.map(s => s.split("@@")(0)).map(s => m.getOrElse(s, null)).toSet.mkString(",")
      if (StringUtils.isEmpty(ids)) {
        // No live record remained for this entity: mark it removed.
        Map(
          "ids" -> null
          , "deleted" -> "1"
          , "labels" -> null
          , "publish_date" -> null
        )
      } else {
        Map(
          "ids" -> ids
          , "deleted" -> "0"
          , "labels" -> labels
          , "publish_date" -> publish_date
        )
      }
    }
  }

  // Single-record fallback (used when no aggregation happens, e.g. rows without
  // card_num): builds the same Map shape that person_agg_label.evaluate returns.
  private def get_empty_map(rowkey: String, tn: String, deleted: String, publish_date: String): Map[String, String] = Map(
    "ids" -> s"${ids_m(tn)}@@$rowkey"
    , "deleted" -> deleted
    , "labels" -> s"${m(tn)}"
    , "publish_date" -> s"${DateUtils.toMillisTimestamp(publish_date)}"
  )

  // Gender from the 17th digit of the ID number: odd = 1 (male), even = 0 (female),
  // -1 when absent.  Presumably an 18-digit Chinese resident ID — NOTE(review):
  // throws StringIndexOutOfBounds/NumberFormatException for shorter or
  // non-numeric values; confirm upstream guarantees the format.
  private def get_gender(card_num: String): Int = {
    if (StringUtils.isEmpty(card_num)) {
      return -1
    }

    card_num.substring(16, 17).toInt % 2
  }

  // Birth year = chars 6..9 of the ID number (yyyy).  NOTE(review): unlike
  // get_gender this has no null/length guard and will throw on short input.
  private def get_birth_year(card_num: String) = card_num.substring(6, 10)

  // Prints (does NOT execute) the DDL for both output tables, for manual setup.
  def init(): Unit = {
    println(
      s"""
         |CREATE TABLE IF NOT EXISTS winhc_eci_dev.ads_deadbeat_person_out
         |(
         | id STRING COMMENT 'es id'
         | ,name STRING COMMENT '姓名'
         | ,card_num STRING COMMENT '身份证号'
         | ,birth_year STRING COMMENT '出生年'
         | ,gender STRING COMMENT '性别 -1未知 1男 0女'
         | ,province STRING COMMENT '省'
         | ,city STRING COMMENT '市'
         | ,district STRING COMMENT '区县'
         | ,ids STRING COMMENT '祥情页id'
         | ,label STRING COMMENT '标签'
         | ,deleted STRING COMMENT '是否移除'
         |)
         |COMMENT 'TABLE COMMENT'
         |PARTITIONED BY (ds STRING COMMENT '分区')
         |""".stripMargin)

    println(
      s"""
         |
         |CREATE TABLE IF NOT EXISTS winhc_eci_dev.ads_deadbeat_company_out
         |(
         | id STRING COMMENT 'es 索引id'
         | ,cid STRING COMMENT '公司cid'
         | ,name STRING COMMENT '公司名称'
         | ,card_num STRING COMMENT '组织机构代码'
         | ,legal_entity_id STRING COMMENT '法人id'
         | ,legal_entity_name STRING COMMENT '法人名字'
         | ,legal_entity_type STRING COMMENT '法人类型 公司或人'
         | ,reg_capital STRING COMMENT '注册资本,可视化字段'
         | ,reg_capital_amount STRING COMMENT '注册资本,整数到小数点后两位'
         | ,reg_capital_currency STRING COMMENT '注册资本单位'
         | ,estiblish_time STRING COMMENT '注册时间'
         | ,logo STRING COMMENT 'log url'
         | ,province STRING COMMENT '省'
         | ,city STRING COMMENT '市'
         | ,district STRING COMMENT '区'
         | ,publish_date STRING COMMENT '最后一次公布时间'
         | ,ids STRING COMMENT '祥情页id'
         | ,LABEL STRING COMMENT '标签'
         | ,deleted STRING COMMENT '是否移除'
         |)
         |COMMENT '查失信 查被执,企业表'
         |PARTITIONED BY
         |(
         | ds STRING COMMENT '分区'
         |)
         |""".stripMargin)
  }

  // Registers every UDF/UDAF the SQL below relies on.  cleanup() and
  // code2Name() come from BaseFunc — presumably they register the cleanup/
  // area-code-name UDFs used in the queries; confirm in BaseFunc.
  def reg_udf(): Unit = {
    cleanup()
    code2Name()
    spark.udf.register("get_gender", get_gender _)
    spark.udf.register("get_birth_year", get_birth_year _)
    spark.udf.register("agg_label", new person_agg_label)
    spark.udf.register("get_empty_map", get_empty_map _)

    def toTime(str: String): String = DateUtils.toMillisTimestamp(str, pattern = "yyyy-MM-dd HH:mm:ss")

    spark.udf.register("to_millis_timestamp", toTime _)
  }

  // Builds the person-level output: rows with a card_num are grouped by
  // (name, card_num) through the UDAF; rows without one pass through
  // get_empty_map individually.  Reads ds > last target partition, writes the
  // source table's latest partition.  NOTE(review): the FROM clauses hard-code
  // winhc_eci_dev.ads_deadbeat_person instead of $org_tab — confirm intended.
  def person(): Unit = {
    val target_tab = s"${getEnvProjectName(env, project)}.ads_deadbeat_person_out"
    val org_tab = s"$project.ads_deadbeat_person"
    val org_last_ds = getLastPartitionsOrElse(org_tab, "0")
    val target_last_ds = getLastPartitionsOrElse(target_tab, "0")

    sql(
      s"""
         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$org_last_ds')
         |SELECT id
         | ,name
         | ,card_num
         | ,birth_year
         | ,gender
         | ,province
         | ,city
         | ,district
         | ,labels['ids'] as ids
         | ,labels['labels'] as label
         | ,labels['publish_date'] as publish_date
         | ,labels['deleted'] as deleted
         |FROM (
         | SELECT md5(cleanup(CONCAT_WS('',card_num,name))) AS id
         | ,name
         | ,card_num
         | ,get_birth_year(card_num) AS birth_year
         | ,get_gender(card_num) AS gender
         | ,get_province_name_pro(SUBSTRING(card_num,0,2)) AS province
         | ,get_city_name(SUBSTRING(card_num,0,6)) AS city
         | ,get_county_name(SUBSTRING(card_num,0,6)) AS district
         | ,agg_label(rowkey,tn,deleted,publish_date) AS labels
         | FROM winhc_eci_dev.ads_deadbeat_person
         | WHERE ds > $target_last_ds
         | AND card_num IS NOT NULL
         | GROUP BY name
         | ,card_num
         | UNION ALL
         | SELECT md5(cleanup(CONCAT_WS('',rowkey,name))) AS id
         | ,name
         | ,card_num
         | ,NULL AS birth_year
         | ,NULL AS gender
         | ,NULL AS province
         | ,NULL AS city
         | ,NULL AS district
         | ,get_empty_map(rowkey,tn,deleted,publish_date) AS labels
         | FROM winhc_eci_dev.ads_deadbeat_person
         | WHERE ds > $target_last_ds
         | AND card_num IS NULL
         | )
         |""".stripMargin)
    // .show(10000)

  }

  // Builds the company-level output in three steps: (1) snapshot the latest
  // row per cid from ads_company + inc_ads_company, (2) aggregate dishonesty
  // records per cid and join company attributes, (3) resolve the legal-entity
  // name — from the human-relation tables when legal_entity_type = 1, from the
  // company snapshot when legal_entity_type = 2 — and write the target partition.
  def company(): Unit = {
    val target_tab = s"${getEnvProjectName(env, project)}.ads_deadbeat_company_out"
    val org_tab = s"$project.ads_deadbeat_company"
    val org_last_ds = getLastPartitionsOrElse(org_tab, "0")
    val target_last_ds = getLastPartitionsOrElse(target_tab, "0")

    val company_last_ds = getLastPartitionsOrElse(s"$project.ads_company", "0")
    // Only columns present in both the full and incremental table can be unioned.
    val intersect_company_cols = getColumns(s"$project.ads_company").intersect(getColumns(s"$project.inc_ads_company"))

    // Step 1: latest version of every company (full partition + newer increments).
    sql(
      s"""
         |SELECT *
         |FROM (
         | SELECT *
         | ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY ds DESC ) AS num
         | FROM (
         | SELECT ${intersect_company_cols.mkString(",")}
         | FROM winhc_eci_dev.ads_company
         | WHERE ds = '$company_last_ds'
         | UNION ALL
         | SELECT ${intersect_company_cols.mkString(",")}
         | FROM winhc_eci_dev.inc_ads_company
         | WHERE ds > '$company_last_ds'
         | ) AS t1
         | ) AS t2
         |WHERE t2.num = 1
         |""".stripMargin)
      .createOrReplaceTempView("tmp_company_all")

    // Step 2: aggregate labels per cid, join company attributes.
    sql(
      s"""
         |SELECT t2.cid as id
         | ,t2.cid
         | ,t3.name
         | ,t3.org_number AS card_num
         | ,t3.legal_entity_id
         | ,null as legal_entity_name
         | ,t3.legal_entity_type
         | ,t3.reg_capital
         | ,t3.reg_capital_amount
         | ,t3.reg_capital_currency
         | ,to_millis_timestamp(t3.estiblish_time) as estiblish_time
         | ,t3.logo
         | ,get_province_name(t3.area_code) as province
         | ,get_city_name(t3.area_code) as city
         | ,get_county_name(t3.area_code) as district
         | ,t2.publish_date
         | ,t2.ids
         | ,t2.label
         | ,t2.deleted
         |FROM (
         | SELECT cid
         | ,labels['deleted'] AS deleted
         | ,labels['ids'] AS ids
         | ,labels['labels'] AS label
         | ,labels['publish_date'] AS publish_date
         | FROM (
         | SELECT cid
         | ,agg_label(rowkey,tn,deleted,publish_date) AS labels
         | FROM winhc_eci_dev.ads_deadbeat_company
         | WHERE ds > '$target_last_ds'
         | AND cid IS NOT NULL
         | GROUP BY cid
         | ) AS t1
         | ) AS t2
         |JOIN tmp_company_all AS t3
         |ON t2.cid = t3.cid
         |""".stripMargin)
      .createOrReplaceTempView("ads_deadbeat_company_out_tmp")


    val human_last_ds = getLastPartitionsOrElse("winhc_eci_dev.ads_company_human_relation", "0")

    // Latest human-relation row per rowkey (full partition + newer increments).
    sql(
      s"""
         |SELECT rowkey
         | ,human_name
         |FROM (
         | SELECT *
         | ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
         | FROM (
         | SELECT rowkey
         | ,human_name
         | ,ds
         | FROM winhc_eci_dev.ads_company_human_relation
         | WHERE ds = '$human_last_ds'
         | UNION ALL
         | SELECT rowkey
         | ,human_name
         | ,ds
         | FROM winhc_eci_dev.inc_ads_company_human_relation
         | WHERE ds > '$human_last_ds'
         | ) AS t1
         | ) AS t2
         |WHERE t2.num = 1
         |""".stripMargin)
      .createOrReplaceTempView("human_all_tmp")

    // Step 3: fill legal_entity_name per entity type and write the output partition.
    sql(
      s"""
         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$org_last_ds')
         |SELECT t1.id
         | ,t1.cid
         | ,t1.name
         | ,t1.card_num
         | ,t1.legal_entity_id
         | ,t2.human_name AS legal_entity_name
         | ,t1.legal_entity_type
         | ,t1.reg_capital
         | ,t1.reg_capital_amount
         | ,t1.reg_capital_currency
         | ,t1.estiblish_time
         | ,t1.logo
         | ,t1.province
         | ,t1.city
         | ,t1.district
         | ,t1.publish_date
         | ,t1.ids
         | ,t1.LABEL
         | ,t1.deleted
         |FROM (
         | SELECT *
         | FROM ads_deadbeat_company_out_tmp
         | WHERE legal_entity_type = 1
         | ) AS t1
         |LEFT JOIN (
         | SELECT *
         | FROM human_all_tmp
         | ) AS t2
         |ON CONCAT_WS('_',cid,legal_entity_id) = t2.rowkey
         |UNION ALL
         |SELECT t1.id
         | ,t1.cid
         | ,t1.name
         | ,t1.card_num
         | ,t1.legal_entity_id
         | ,t2.cname AS legal_entity_name
         | ,t1.legal_entity_type
         | ,t1.reg_capital
         | ,t1.reg_capital_amount
         | ,t1.reg_capital_currency
         | ,t1.estiblish_time
         | ,t1.logo
         | ,t1.province
         | ,t1.city
         | ,t1.district
         | ,t1.publish_date
         | ,t1.ids
         | ,t1.LABEL
         | ,t1.deleted
         |FROM (
         | SELECT *
         | FROM ads_deadbeat_company_out_tmp
         | WHERE legal_entity_type = 2
         | ) AS t1
         |LEFT JOIN (
         | SELECT cid,name as cname
         | FROM tmp_company_all
         | ) AS t2
         |ON legal_entity_id = t2.cid
         |""".stripMargin)
    // .show(1000)

  }
}
|
|
|
+
|
|
|
object deadbeat_info {
  /** Entry point: builds both the person-level and company-level output tables. */
  def main(args: Array[String]): Unit = {
    // ODPS/Spark session settings for this job.
    val sessionConfig = mutable.Map(
      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
    )
    val session: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, sessionConfig)

    val job = deadbeat_info(session, "winhc_eci_dev")
    // Register all UDFs/UDAFs first; the SQL in person()/company() depends on them.
    job.reg_udf()
    job.person()
    job.company()

    session.stop()
  }
}
|