
add: mapping table from cid to new_cid
fix: null pointer from the logger used inside an RDD
make the generic company-change job compatible with both cid and new_cid

许家凯 · 4 years ago
commit 64cc4c48b8
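Context for the log fix: a log4j Logger val is not serializable, so when a handle object is captured by an RDD closure the field is either rejected by task serialization or, if marked @transient, deserialized as null on the executors and throws an NPE on first use. A minimal sketch of the failure mode and of the fix this commit applies (class names are illustrative, not from the repo):

    import org.apache.log4j.Logger
    import org.apache.spark.internal.Logging
    import org.apache.spark.rdd.RDD

    class BrokenHandle extends Serializable {
      // @transient drops the field during closure serialization,
      // so executors deserialize it as null.
      @transient private val logger: Logger = Logger.getLogger(this.getClass)

      def run(rdd: RDD[Int]): Unit =
        rdd.foreach(i => logger.info(s"row $i")) // NPE on executors
    }

    class FixedHandle extends Serializable with Logging {
      // Spark's Logging trait keeps its logger in a @transient field that is
      // re-created lazily on every JVM, so it is safe inside RDD closures.
      def run(rdd: RDD[Int]): Unit =
        rdd.foreach(i => logInfo(s"row $i"))
    }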

+ 15 - 20
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -69,29 +69,19 @@ object ChangeExtract {
      // last partition of the incremental ads table
       val lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_$tableName", spark)
 
-      val list = sql(s"show partitions $inc_ods_company_tb").collect.toList.map(_.getString(0).split("=")(1))
-      // first partition of the incremental ods table
-      val firstDsIncOds = list.head
-      // last partition of the incremental ods table // output partition
-      val lastDsIncOds = list.last
-      // partition to run
-      var runDs = ""
-      // first run
-      if (StringUtils.isBlank(lastDsIncAds)) {
-        runDs = firstDsIncOds
-      } else { // not the first run: last ads partition + 1 day
-        runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
-      }
       sql(
         s"""
            |SELECT  cid,current_cid as new_cid
            |FROM    ${inc_ods_company}
-           |WHERE   ds >= ${runDs}
+           |WHERE   ds > $lastDs_ads_all and ds < $ds
            |AND     cid IS NOT NULL
            |AND     current_cid IS NOT NULL
            |GROUP BY cid,current_cid
            |""".stripMargin).createOrReplaceTempView("mapping")
 
+
+      val cid = getColumns(s"$project.ads_$tableName").filter(f => f.equals("cid") || f.equals("new_cid")).max
+
       val rdd = sql(
         s"""
            |SELECT  $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
@@ -105,10 +95,14 @@ object ChangeExtract {
            |            WHERE   ds = $ds
            |        ) AS t1
            |JOIN    (
-           |             SELECT  concat_ws('_',coalesce(mm.new_cid,tmp.cid),split(rowkey, '_')[1]) AS rowkey
-           |                     ,${intersectCols.filter(s => {!s.equals("rowkey") && !s.equals("cid") && !s.equals("new_cid")}).mkString(",")}
-           |                     ,coalesce(mm.new_cid,tmp.cid) AS new_cid
-           |                     ,tmp.cid
+           |             SELECT  concat_ws('_',coalesce(mm.new_cid,tmp.$cid),split(rowkey, '_')[1]) AS rowkey
+           |                     ,${
+          intersectCols.filter(s => {
+            !s.equals("rowkey") && !s.equals("cid") && !s.equals("new_cid")
+          }).mkString(",")
+        }
+           |                     ,coalesce(mm.new_cid,tmp.$cid) AS new_cid
+           |                     ,tmp.$cid as cid
            |                     ,c
            |             FROM    (
            |                         SELECT  a.*
@@ -124,7 +118,7 @@ object ChangeExtract {
            |                                 ) AS a
            |                     ) AS tmp
            |             LEFT JOIN mapping mm
-           |             ON tmp.cid = mm.cid
+           |             ON tmp.$cid = mm.cid
            |             WHERE   tmp.c = 1
            |        ) AS t2
            |ON      t1.${primaryKey} = t2.${primaryKey}
@@ -142,7 +136,7 @@ object ChangeExtract {
             Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
           } else {
             if (map_list.size > 2) {
-              logger.error("list.size greater than 2! rowkey:" + rowkey)
+              logInfo("list.size greater than 2! rowkey:" + rowkey)
             }
             val m = getDoubleDataMap(map_list)
 
@@ -241,6 +235,7 @@ object ChangeExtract {
           |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
           |winhc_eci_dev company_employment rowkey 20200717 source
           |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
+          |winhc_eci_dev company_icp rowkey 20200717 domain
           |""".stripMargin.replace("20200717", ds)
       for (r <- rows.split("\r\n")) {
         if (StringUtils.isNotEmpty(r)) {
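
A note on the new $cid resolution in this file: getColumns lists the physical columns of ads_$tableName, and filtering to cid/new_cid then taking .max lexicographically prefers "new_cid" over "cid" (since "n" > "c"), so tables that already carry new_cid join the mapping on that column while older tables fall back to cid. A hedged sketch of the trick, with getColumns stubbed:

    // getColumns is stubbed here; the real one reads the warehouse schema.
    def getColumns(table: String): Seq[String] =
      if (table.endsWith("_icp")) Seq("rowkey", "new_cid", "domain")
      else Seq("rowkey", "cid", "domain")

    def cidColumn(table: String): String =
      getColumns(table).filter(f => f == "cid" || f == "new_cid").max
      // Seq("new_cid").max == "new_cid"; Seq("cid").max == "cid";
      // with both present, "new_cid" wins lexicographically.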

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -157,11 +157,11 @@ case class company_copyright_reg_list(equCols: Seq[String]) extends CompanyChang
 
 // website
 case class company_icp(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"${newMap("domain")}网站发生变更")
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"${newMap("domain")}网站备案发生变更")
 
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"新增${newMap("domain")}网站")
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"新增${newMap("domain")}网站备案")
 
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("网站", newMap("domain"), newMap("examine_date"), newMap("liscense"))
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("网站备案", newMap("domain"), newMap("examine_date"), newMap("liscense"))
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("examine_date")
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/Inc_eci_debtor_relation.scala

@@ -705,7 +705,7 @@ object Inc_eci_debtor_relation {
             list.head.toRow
           } else {
             if (list.size > 2) {
-              logger.warn("list.size >2 ,id:" + id)
+              logWarning("list.size >2 ,id:" + id)
             }
             val all_map = list.map(e => (e.ds, e)).toMap
             val today = all_map(ds)

+ 43 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -22,7 +22,7 @@ object inc_company_equity_info {
   case class IncCompanyEquityInfoUtils(s: SparkSession,
                                       project: String, // project the table belongs to
                                       ds: String // primary key of this dimension
-                                      ) extends LoggingUtils with Logging with BaseFunc{
+                                      ) extends LoggingUtils with Logging with BaseFunc {
     @(transient@getter) val spark: SparkSession = s
 
 
@@ -30,8 +30,7 @@ object inc_company_equity_info {
       cleanup()
       val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
 
-//      val startPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
-      val startPart = "20200725"
+      val startPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
       val endPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ods_company_equity_info", BaseUtil.getYesterday())
 
       if (startPart.equals(endPart)) {
@@ -41,7 +40,7 @@ object inc_company_equity_info {
 
       sql(
         s"""
-           |SELECT  tmp.id
+           |SELECT  md5(cleanup(CONCAT_WS('',tmp.reg_number,tmp.reg_date,tmp.equity_amount))) as id
            |        ,tmp.cid
            |        ,tmp.base
            |        ,tmp.reg_number
@@ -79,11 +78,46 @@ object inc_company_equity_info {
       sql(
         s"""
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.inc_ads_company_equity_info PARTITION(ds='$endPart')
-           |SELECT  md5(cleanup(CONCAT_WS('',reg_number,cid,pledgor,pledgee))) as id
+           |SELECT  id
            |        ,${cols.mkString(",")}
            |FROM    tmp_company_equity_info_all
            |""".stripMargin)
 
+
+      sql(
+        s"""
+           |SELECT  cid,current_cid as new_cid
+           |FROM    winhc_eci_dev.inc_ods_company
+           |WHERE   ds >= ${startPart}
+           |AND     cid IS NOT NULL
+           |AND     current_cid IS NOT NULL
+           |GROUP BY cid,current_cid
+           |""".stripMargin).createOrReplaceTempView("mapping")
+
+      sql(
+        s"""
+           |SELECT   CONCAT_WS('_',t2.new_cid,split(t1.rowkey,'_')[1]) as rowkey,t2.new_cid as cid,${getColumns("winhc_eci_dev.ads_company_equity_info_list").diff(Seq("ds", "cid", "rowkey")).map("t1." + _).mkString(",")}
+           |FROM    (
+           |            SELECT  ${getColumns("winhc_eci_dev.ads_company_equity_info_list").diff(Seq("ds")).map("tmp." + _).mkString(",")}
+           |            FROM    (
+           |                        SELECT  t.*
+           |                                ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY rowkey) AS c
+           |                        FROM    (
+           |                                    SELECT  *
+           |                                    FROM    winhc_eci_dev.ads_company_equity_info_list
+           |                                    WHERE   ds = ${getLastPartitionsOrElse("winhc_eci_dev.ads_company_equity_info_list", "0")}
+           |                                    UNION ALL
+           |                                    SELECT  *
+           |                                    FROM    winhc_eci_dev.inc_ads_company_equity_info_list
+           |                                    WHERE   ds > ${getLastPartitionsOrElse("winhc_eci_dev.ads_company_equity_info_list", "0")}
+           |                                ) AS t
+           |                    ) AS tmp
+           |            WHERE   tmp.c = 1
+           |        ) AS t1
+           |JOIN    mapping as t2
+           |ON      t1.cid = t2.cid
+           |""".stripMargin).createOrReplaceTempView("replace_cid_tab")
+
       sql(
         s"""
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.inc_ads_company_equity_info_list PARTITION(ds='$endPart')
@@ -122,6 +156,10 @@ object inc_company_equity_info {
            |        ,deleted
            |FROM    tmp_company_equity_info_all
            |WHERE   cid IS NOT NULL
+           |
+           |UNION ALL
+           |
+           |select * from replace_cid_tab
            |""".stripMargin)
 
       val writeCols = getColumns("winhc_eci_dev.inc_ads_company_equity_info_list").diff(Seq("ds", "rowkey"))
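
The dedup-then-remap pattern added above appears twice in this commit (here and in ChangeExtract): union the full-history ads table with its increments, keep one row per rowkey via ROW_NUMBER, join the cid-to-new_cid mapping, and rebuild the rowkey with the new prefix. Condensed into one hedged sketch (table names abbreviated, other columns elided; rowkey assumed to be "<cid>_<suffix>"):

    // Condensed sketch; ads_tab/inc_ads_tab/mapping stand in for the real tables.
    sql(
      s"""
         |SELECT  CONCAT_WS('_', m.new_cid, split(t.rowkey, '_')[1]) AS rowkey
         |        ,m.new_cid AS cid
         |FROM    (
         |            SELECT  *
         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY rowkey) AS c
         |            FROM    (
         |                        SELECT * FROM ads_tab
         |                        UNION ALL
         |                        SELECT * FROM inc_ads_tab
         |                    )
         |        ) AS t
         |JOIN    mapping AS m
         |ON      t.cid = m.cid -- only rows whose cid was remapped survive the join
         |WHERE   t.c = 1
         |""".stripMargin)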

+ 132 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_mapping.scala

@@ -0,0 +1,132 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/30 10:34
+ * @Description: builds and incrementally maintains winhc_eci_dev.base_company_mapping (cid -> latest new_cid)
+ */
+object inc_company_mapping {
+
+  case class IncCompanyMappingUtil(s: SparkSession,
+                                   project: String
+                                  ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+
+    def init(): Unit = {
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS winhc_eci_dev.base_company_mapping
+           |(
+           |    cid STRING COMMENT 'cid'
+           |    ,cname STRING COMMENT 'cname'
+           |    ,new_cid STRING COMMENT 'new_cid'
+           |    ,update_time TIMESTAMP COMMENT 'update time'
+           |)
+           |COMMENT 'full-volume mapping from company cid to latest new_cid, 20200730'
+           |PARTITIONED BY
+           |(
+           |    ds STRING COMMENT 'partition'
+           |)
+           |LIFECYCLE 15
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.base_company_mapping PARTITION(ds='20200604')
+           |SELECT  cid
+           |        ,name AS cname
+           |        ,coalesce(current_cid,cid) AS new_cid
+           |        ,to_date('2020-06-04 00:00:00') as update_time
+           |FROM    winhc_eci_dev.ads_company
+           |WHERE   ds = '20200604'
+           |""".stripMargin)
+    }
+
+    def inc(ds: String): Unit = {
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.base_company_mapping PARTITION(ds='$ds')
+           |SELECT  t1.cid AS cid
+           |        ,t1.cname AS cname
+           |        ,COALESCE(t2.new_cid,t1.new_cid) AS new_cid
+           |        ,COALESCE(t2.update_time,t1.update_time) AS update_time
+           |FROM    (
+           |            SELECT  all_mapping.cid
+           |                    ,all_mapping.cname
+           |                    ,all_mapping.new_cid
+           |                    ,all_mapping.update_time
+           |            FROM    (
+           |                        SELECT  *
+           |                                ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC) AS c
+           |                        FROM    (
+           |                                    SELECT  cid,cname,new_cid,update_time
+           |                                    FROM    winhc_eci_dev.base_company_mapping
+           |                                    WHERE   ds = '${BaseUtil.atDaysAfter(-1, ds)}'
+           |                                    UNION ALL
+           |                                    SELECT  cid
+           |                                            ,name AS cname
+           |                                            ,COALESCE(current_cid,cid) AS new_cid
+           |                                            ,now() AS update_time
+           |                                    FROM    winhc_eci_dev.inc_ods_company
+           |                                    WHERE   ds = '$ds'
+           |                                    AND     cid IS NOT NULL
+           |                                )
+           |                    ) AS all_mapping
+           |            WHERE   all_mapping.c = 1
+           |        ) AS t1
+           |LEFT JOIN (
+           |              SELECT  cid
+           |                      ,current_cid AS new_cid
+           |                      ,now() AS update_time
+           |              FROM    winhc_eci_dev.inc_ods_company
+           |              WHERE   ds = '$ds'
+           |              AND     cid IS NOT NULL
+           |              AND     current_cid IS NOT NULL
+           |              group by cid,current_cid
+           |          ) AS t2
+           |ON      t1.new_cid = t2.cid
+           |""".stripMargin)
+    }
+
+    def inc(): Unit = {
+
+      val lastDs = getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping", "20200604")
+      val dss = getPartitions("winhc_eci_dev.inc_ods_company").filter(_ > lastDs)
+
+      println("计算分区:" + dss.mkString(","))
+
+      for (ds <- dss) {
+        inc(ds)
+      }
+    }
+  }
+
+
+  def main(args: Array[String]): Unit = {
+
+    val project = "winhc_eci_dev"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("inc_company_mapping", config)
+
+    val incCompanyMappingUtil = IncCompanyMappingUtil(spark, project)
+    //    incCompanyMappingUtil.init()
+    incCompanyMappingUtil.inc()
+    spark.stop()
+  }
+
+}
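
The new base_company_mapping table gives downstream jobs one place to resolve a company's latest cid; note how inc(ds) also follows one extra hop (t1.new_cid = t2.cid), so a chain like a -> b yesterday and b -> c today collapses to a -> c. A hedged usage sketch for a downstream consumer (the fact table name is hypothetical):

    // Hypothetical downstream lookup against the mapping table created above.
    val mappingDs = "20200730" // latest partition of base_company_mapping
    sql(
      s"""
         |SELECT  f.*
         |        ,COALESCE(m.new_cid, f.cid) AS latest_cid
         |FROM    some_fact_table AS f -- hypothetical
         |LEFT JOIN winhc_eci_dev.base_company_mapping AS m
         |ON      f.cid = m.cid
         |AND     m.ds = '$mappingDs'
         |""".stripMargin)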

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/model/CompanyCourtAnnouncement.scala

@@ -88,7 +88,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
       "score", "total", "extraScore")
       .createOrReplaceTempView(s"${sourceTable}_tmp_view")
 
-    logger.info(
+    logInfo(
       s"""
          |- - - - - - - - - - - - - - - - - - - - - - - - -
          |${showString(sql(s"select * from ${sourceTable}_tmp_view"))}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -155,7 +155,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String, table
       "score", "total", "extraScore")
       .createOrReplaceTempView(s"t1_view")
 
-    //    logger.info(
+    //    logInfo(
     //      s"""
     //         |- - - - - - - - - - - - - - - - - - - - - - - - -
     //         |${showString(sql(s"select * from t1_view"))}

+ 3 - 5
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.utils
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.log4j.Logger
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
@@ -11,15 +11,13 @@ import scala.collection.immutable.ListMap
 /**
  * π
  */
-trait LoggingUtils {
+trait LoggingUtils extends Logging {
   protected var sqlNo = 1
 
-  @transient protected[this] val logger: Logger = Logger.getLogger(this.getClass)
-
   @(transient@getter) protected val spark: SparkSession
 
   def sql(sqlString: String): DataFrame = {
-    logger.info(
+    logInfo(
       s"""
          |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
          |Job[${this.getClass.getSimpleName}].SQL[No$sqlNo.]
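
This trait-level swap is what makes the "logger inside an RDD" fix stick everywhere: each utility mixing in LoggingUtils now inherits Spark's Logging, whose underlying logger lives in a @transient field that is re-initialized lazily on each executor JVM, instead of a log4j Logger val that deserializes to null inside task closures.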