Procházet zdrojové kódy

feat: 添加天眼查数据对比

许家凯 před 2 roky
rodič
revize
10209a8aab

+ 349 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/V7DataCompare.scala

@@ -0,0 +1,349 @@
+package com.winhc.bigdata.spark.ng.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.implicits.Bool._
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyIndexFunc}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import java.security.MessageDigest
+import java.text.DecimalFormat
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/12/30 15:44
+ */
+case class V7DataCompare(s: SparkSession
+                        ) extends LoggingUtils with BaseFunc with CompanyIndexFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val out_tab = "winhc_eci.out_v7_tyc_compare_data"
+
+
+  /**
+   * Registers the SQL UDFs used by the comparison queries on the session.
+   *
+   *  - `custom_md5(array<string>)`: joins the values with `\001` (a null element counts as
+   *    the empty string) and returns the lower-case hex MD5 digest of the UTF-8 bytes.
+   *  - `amount_format(string)`: formats a numeric string with the `00.00` pattern; a blank
+   *    input yields null and a non-numeric input is returned unchanged.
+   */
+  def init(): Unit = {
+    def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
+
+    def md5(s: String): String = toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")))
+
+    // The digest is what makes this a fingerprint: without it the "md5" column would carry the
+    // raw concatenated field values into the join key and into the output table.
+    def customMd5(cols: Seq[String]): String =
+      md5(cols.map(c => if (c == null) "" else c).mkString("\001"))
+
+    spark.udf.register("custom_md5", customMd5 _)
+
+    val amountPattern = new DecimalFormat("00.00")
+
+    def amount_format(amount: String): String =
+      if (StringUtils.isBlank(amount)) null
+      else {
+        try amountPattern.format(amount.toDouble)
+        catch {
+          // Not a number: keep the original text so it is still compared verbatim.
+          case _: NumberFormatException => amount
+        }
+      }
+
+    spark.udf.register("amount_format", amount_format _)
+  }
+
+  init()
+
+  /**
+   * Creates the two helper mapping tables when they are missing and (re)writes yesterday's
+   * partition of each one unless `getLastPartitionsOrElse` already reports yesterday for it.
+   *
+   *  - `winhc_ng.tmp_cid_company_id_mapping`: (company_id, cid, name) rows obtained by joining
+   *    the `winhc_ng` company SQL with the `winhc_eci_dev` company SQL on `reg_number` and,
+   *    separately, on `credit_code`; one row is kept per (company_id, cid).
+   *  - `winhc_ng.tmp_hid_name_mapping`: id/name rows taken from the `winhc_eci_dev` company SQL
+   *    (entity_type 2) and from the `company_human_relation` SQL (entity_type 1).
+   */
+  def pre(): Unit = {
+    // Resolve the partition once so that the freshness checks and the INSERTs always agree,
+    // even if the job happens to run across midnight.
+    val yesterday = BaseUtil.getYesterday()
+    val v7CompanySql = generateAllTabSql(tableName = "company", project = "winhc_eci_dev", partitionBy = "cid")._1
+    val v7HumanSql = generateAllTabSql(tableName = "company_human_relation", project = "winhc_eci_dev", partitionBy = "hid")._1
+    val ngCompanySql = generateAllTabSql(tableName = "company", project = "winhc_ng")._1
+
+    // One branch of the cid <-> company_id mapping: both company sources joined on `key`,
+    // skipping rows whose key is null or blank. The margin bars are deliberately left in
+    // place: the text is spliced into the INSERT below, which strips them in a single pass.
+    def mappingJoinedOn(key: String): String =
+      s"""SELECT  t1.company_id
+         |        ,t2.cid
+         |        ,t1.name
+         |FROM    (
+         |         SELECT  company_id
+         |                 ,$key
+         |                 ,name
+         |         FROM    (
+         |                    $ngCompanySql
+         |                 )
+         |         where $key is not null and $key <> '' and trim($key)<>''
+         |        ) AS t1
+         |join (
+         |         SELECT  cid
+         |                 ,$key
+         |         FROM    (
+         |                 $v7CompanySql
+         |                 )
+         |         where $key is not null and $key <> '' and trim($key)<>''
+         |     ) AS t2
+         |ON      t1.$key = t2.$key"""
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS winhc_ng.tmp_cid_company_id_mapping
+         |(
+         |    company_id  STRING COMMENT 'FIELD'
+         |    ,cid STRING COMMENT 'FIELD'
+         |    ,name STRING COMMENT 'FIELD'
+         |
+         |)
+         |COMMENT 'TABLE COMMENT'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |LIFECYCLE 7
+         |""".stripMargin)
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS winhc_ng.tmp_hid_name_mapping
+         |(
+         |    id STRING COMMENT 'cid or id'
+         |    ,name STRING COMMENT 'FIELD'
+         |    ,entity_type bigint comment '2:公司 1:人'
+         |)
+         |COMMENT 'TABLE COMMENT'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |LIFECYCLE 7
+         |""".stripMargin)
+
+    if (!yesterday.equals(getLastPartitionsOrElse("winhc_ng.tmp_cid_company_id_mapping", null))) {
+      val byRegNumber = mappingJoinedOn("reg_number")
+      val byCreditCode = mappingJoinedOn("credit_code")
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE winhc_ng.tmp_cid_company_id_mapping PARTITION (ds = '$yesterday')
+           |select  company_id
+           |        ,cid
+           |        ,name
+           |from (
+           |select *
+           |       ,ROW_NUMBER() OVER(PARTITION BY company_id,cid ORDER BY company_id DESC) AS xjk_num
+           |from (
+           |$byRegNumber
+           |UNION ALL
+           |$byCreditCode
+           |)
+           |)
+           |where xjk_num = 1
+           |""".stripMargin)
+    }
+
+    if (!yesterday.equals(getLastPartitionsOrElse("winhc_ng.tmp_hid_name_mapping", null))) {
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE winhc_ng.tmp_hid_name_mapping PARTITION (ds = '$yesterday')
+           |select cid as id
+           |       ,name
+           |       ,2 as entity_type
+           |from   (
+           |       $v7CompanySql
+           |)
+           |UNION ALL
+           |select hid as id
+           |       ,human_name as name
+           |       ,1 as entity_type
+           |from   (
+           |       $v7HumanSql
+           |)
+           |""".stripMargin)
+    }
+  }
+
+
+  /**
+   * Builds the SELECT that reads yesterday's partition of `winhc_eci_dev.inc_ods_<tn>`.
+   *
+   * For `company`, `company_holder` and `company_staff` the rows are left-joined with
+   * yesterday's partition of `winhc_ng.tmp_hid_name_mapping`, and the mapped name is exposed
+   * as `legal_entity_name`, `holder_name` or `staff_name` respectively. Any other table is
+   * read as is.
+   *
+   * @param tn table name without the `inc_ods_` prefix
+   * @return the SQL text; nothing is executed here
+   */
+  def get_inc_ods(tn: String): String = {
+    // Evaluate the date once: the ods partition and the mapping partition must be the same day.
+    val yesterday = BaseUtil.getYesterday()
+    val odsSql = s"select * from winhc_eci_dev.inc_ods_$tn where ds = '$yesterday'"
+
+    // Left-joins the ods rows (gt1) with the name mapping (gt2) and exposes gt2.name as `nameAlias`.
+    def withName(nameAlias: String, joinOn: String, mappingFilter: String = ""): String =
+      s"""
+         |select gt1.*,gt2.name as $nameAlias from (
+         |       $odsSql
+         |) as gt1
+         |left join (
+         |       select * from winhc_ng.tmp_hid_name_mapping where ds = '$yesterday'$mappingFilter
+         |) as gt2
+         |on $joinOn
+         |""".stripMargin
+
+    tn match {
+      case "company" =>
+        withName("legal_entity_name", "gt1.legal_entity_id = gt2.id and gt1.legal_entity_type = gt2.entity_type")
+      case "company_holder" =>
+        withName("holder_name", "gt1.holder_id = gt2.id and gt1.holder_type = gt2.entity_type")
+      case "company_staff" =>
+        // Only the entity_type = 1 rows of the mapping are joined, and on the id alone.
+        withName("staff_name", "gt1.hid = gt2.id", " and entity_type = 1")
+      case _ =>
+        odsSql
+    }
+  }
+
+  /**
+   * Returns the rows of yesterday's `inc_ods_<tn>` data that have no row with the same
+   * `company_id` and the same fingerprint in the `winhc_ng` SQL generated for `tn`.
+   *
+   * The fingerprint is `concat('<tn>-_', custom_md5(array(<columns>)))`, evaluated with the
+   * same column list on both sides. The ods rows get their `company_id` by joining
+   * yesterday's partition of `winhc_ng.tmp_cid_company_id_mapping` on `cid`.
+   *
+   * @param tn           table name shared by both sources
+   * @param compare_cols column names or SQL expressions that make up the fingerprint
+   * @return a DataFrame with the columns `company_id`, `cid` and `cid_cols_md5`
+   */
+  def calc(tn: String, compare_cols: Seq[String]): DataFrame = {
+    val ngTableSql = generateAllTabSql(tableName = tn, project = "winhc_ng")._1
+
+    // Anything ending with ")" is taken verbatim; every other entry is cast to string.
+    val fingerprintArgs = compare_cols
+      .map {
+        case expr if expr.endsWith(")") => expr
+        case col => "cast(" + col + " as string)"
+      }
+      .mkString(",")
+    val fingerprint = s"concat('$tn-_',custom_md5(array($fingerprintArgs)))"
+
+    val odsSql = get_inc_ods(tn)
+    val partition = BaseUtil.getYesterday()
+
+    val compareSql =
+      s"""
+         |select t1.company_id
+         |       ,t1.cid
+         |       ,t1.cols_md5 as cid_cols_md5
+         |from (
+         |    select company_id,cid,$fingerprint as cols_md5 from (
+         |         select tt1.*,tt2.company_id from (
+         |                 $odsSql
+         |         ) as tt1
+         |         join (
+         |              select * from winhc_ng.tmp_cid_company_id_mapping where ds = '$partition'
+         |         ) as tt2
+         |         on tt1.cid = tt2.cid
+         |    )
+         |) as t1
+         |left anti join (
+         |    select company_id,$fingerprint as cols_md5 from (
+         |        $ngTableSql
+         |    )
+         |) as t2
+         |on t1.company_id = t2.company_id and t1.cols_md5 = t2.cols_md5
+         |""".stripMargin
+    sql(compareSql)
+  }
+
+
+  /**
+   * Selects the distinct `cid` values of yesterday's partition of `winhc_eci_dev.inc_ods_<tn>`.
+   *
+   * @param tn table name without the `inc_ods_` prefix
+   * @return a single-column (`cid`) DataFrame
+   */
+  def get_ods_cid(tn: String): DataFrame = {
+    val partition = BaseUtil.getYesterday()
+    val distinctCidSql =
+      s"""
+         |select DISTINCT cid from winhc_eci_dev.inc_ods_$tn where ds = '$partition'
+         |""".stripMargin
+    sql(distinctCidSql)
+  }
+
+
+  /**
+   * Runs the comparison for every configured table and overwrites yesterday's partition of
+   * the output table with:
+   *
+   *  - one row per (company_id, cid) returned by [[calc]] for any of the tables, and
+   *  - one row with a null `company_id` and null `cid_cols_md5` for every cid seen in
+   *    yesterday's ods data that is absent from `winhc_ng.tmp_cid_company_id_mapping`.
+   *
+   * @param stringToSeq table name -> columns/expressions to compare. An empty list means:
+   *                    every column present in both `winhc_ng.ads_<tn>` and
+   *                    `winhc_eci_dev.ods_<tn>`, minus the excluded columns listed below.
+   * @throws IllegalArgumentException if no table is configured
+   */
+  def run(stringToSeq: Map[String, Seq[String]]): Unit = {
+    // Fail early with a clear message instead of a NullPointerException further down.
+    require(stringToSeq.nonEmpty, "at least one table must be configured for comparison")
+    pre()
+    val yesterday = BaseUtil.getYesterday()
+    val excludedCols = Set("crawled_time", "create_time", "update_time", "ds", "parent_company_id")
+
+    val (diffFrames, cidFrames) = stringToSeq.toSeq.map { case (tn, configuredCols) =>
+      val compareCols: Seq[String] =
+        if (configuredCols.isEmpty)
+          ((getColumns("winhc_ng.ads_" + tn).toSet & getColumns("winhc_eci_dev.ods_" + tn).toSet) -- excludedCols).toSeq
+        else configuredCols
+      (calc(tn, compareCols), get_ods_cid(tn))
+    }.unzip
+
+    // createOrReplace keeps a second run() in the same session from failing on an existing view.
+    cidFrames.reduce(_ union _).createOrReplaceTempView("tmp_out_cid_df")
+    diffFrames.reduce(_ union _).createOrReplaceTempView("tmp_out_tab")
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $out_tab
+         |(
+         |    company_id  STRING COMMENT 'FIELD'
+         |    ,cid STRING COMMENT 'FIELD'
+         |    ,cid_cols_md5 STRING COMMENT 'FIELD'
+         |)
+         |COMMENT 'TABLE COMMENT'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |LIFECYCLE 7
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $out_tab PARTITION (ds = '$yesterday')
+         |select company_id
+         |       ,cid
+         |       ,cid_cols_md5
+         |from   (
+         |       select *
+         |              ,ROW_NUMBER() OVER(PARTITION BY company_id,cid ORDER BY company_id DESC) AS xjk_num
+         |       from tmp_out_tab
+         |)
+         |where xjk_num = 1
+         |UNION ALL
+         |select null as company_id
+         |       ,t1.cid
+         |       ,null as cid_cols_md5
+         |from (select DISTINCT cid from tmp_out_cid_df) as t1
+         |left anti join (
+         |       select * from winhc_ng.tmp_cid_company_id_mapping where ds = '$yesterday'
+         |) as t2
+         |on t1.cid = t2.cid
+         |""".stripMargin)
+  }
+
+
+}
+
+/**
+ * Entry point of the comparison job together with its default table/column configuration.
+ */
+object V7DataCompare {
+
+  /** Tables to compare, each with the columns or SQL expressions that form its fingerprint. */
+  val stringToSeq: Map[String, Seq[String]] = Map(
+    "company" -> Seq("name", "legal_entity_name", "reg_location", "split(cast(approved_time as string),' ')[0]", "amount_format(reg_capital_amount)", "reg_status", "business_scope")
+    , "company_holder" -> Seq("holder_name", "amount_format(amount)")
+    , "company_staff" -> Seq("staff_name", "staff_type")
+    , "company_change" -> Seq("split(cast(change_time as string),' ')[0]")
+  )
+
+
+  /**
+   * Builds the Spark session, runs the comparison with [[stringToSeq]] and stops the session.
+   *
+   * @param args command-line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    // Stop the session even when the comparison throws; the exception still propagates.
+    try V7DataCompare(spark).run(stringToSeq)
+    finally spark.stop()
+  }
+}