feat: annual report and annual-report sub-tables, full + incremental load

Xu Jiakai, 4 years ago
parent commit 62945d19bb

+ 385 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala

@@ -0,0 +1,385 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{DataTypeUtils, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/13 14:00
+ * @Description: Annual report main table and sub-tables, full + incremental load
+ */
+object CompanyAnnualReport {
+
+  case class CompanyAnnualReportUtils(s: SparkSession,
+                                      project: String // project that holds the tables
+                                     ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+
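+    /**
+     * Builds the CREATE TABLE DDL for the target table: the explicitly supplied
+     * columns come first, followed by every column of the ODS source table except
+     * the partition column `ds` and any name already supplied in otherCols.
+     */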
+    def getDDL(targetTablePre: String = "ads_", tableName: String, otherCols: Seq[(String, String)]): String = {
+      val colsSet = otherCols.map(_._1).toSet
+
+      val cols = otherCols.map(f => s"${f._1} ${f._2} \n") ++
+        spark.table(s"$project.ods_$tableName").schema.fields
+          .filter(f => !f.name.equals("ds"))
+          .filterNot(f => colsSet.contains(f.name))
+          .map(f => s"${f.name} ${DataTypeUtils.getDataType(f.dataType)} COMMENT '${f.getComment().getOrElse("")}'\n")
+
+      s"""
+         |CREATE TABLE IF NOT EXISTS $project.$targetTablePre$tableName
+         |(
+         |    ${cols.mkString(",")}
+         |)
+         |COMMENT 'TABLE COMMENT'
+         |PARTITIONED BY (ds STRING COMMENT 'partition')
+         |""".stripMargin
+    }
+
+
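+    /**
+     * Registers a temp view with the latest rowkey -> main_id mapping of the
+     * annual-report main table, merging the last full (ads) partition with all
+     * newer incremental (inc_ads) partitions and keeping one row per rowkey.
+     */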
+    def getMainTmpTab(tempView: String): Unit = {
+      val ds = getLastPartitionsOrElse(s"$project.ads_company_annual_report", "0")
+      sql(
+        s"""
+           |SELECT  rowkey
+           |        ,main_id
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  rowkey
+           |                                ,id AS main_id
+           |                                ,ds
+           |                        FROM    $project.ads_company_annual_report
+           |                        WHERE   ds = $ds
+           |                        UNION ALL
+           |                        SELECT  rowkey
+           |                                ,id AS main_id
+           |                                ,ds
+           |                        FROM    $project.inc_ads_company_annual_report
+           |                        WHERE   ds > $ds
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createOrReplaceTempView(tempView)
+    }
+  }
+
+
+  case class CompanyAnnualReportHandle(s: SparkSession,
+                                       project: String // project that holds the tables
+                                      ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+
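+    /**
+     * Full load of the main table: remaps cid to new_cid via base_company_mapping,
+     * builds rowkey = new_cid_report_year and keeps the row with the largest id
+     * per (new_cid, report_year).
+     */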
+    def main_table_all(): Unit = {
+      // generate DDL; if the table is missing, print it for manual creation
+      val utils = CompanyAnnualReportUtils(spark, project)
+      val ddl = utils.getDDL(tableName = "company_annual_report", otherCols = Seq(("rowkey", "string"), ("new_cid", "string"), ("cid", "string")))
+      if (!spark.catalog.tableExists(s"$project.ads_company_annual_report")) {
+        println(ddl)
+        println("\nPlease create the table manually")
+        return
+      }
+
+      val ds = getLastPartitionsOrElse(s"$project.ods_company_annual_report", "0")
+
+      val columns = getColumns(s"$project.ads_company_annual_report").diff(Seq("ds", "rowkey", "new_cid", "cid"))
+
+      // write into the ads main table
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_company_annual_report PARTITION(ds=${ds})
+           |SELECT  rowkey,new_cid,cid,${columns.mkString(",")}
+           |FROM    (
+           |        SELECT
+           |                *
+           |                ,ROW_NUMBER() OVER (PARTITION BY new_cid,report_year ORDER BY id DESC ) num
+           |                ,CONCAT_WS('_',new_cid,report_year) AS rowkey
+           |        FROM    (
+           |                SELECT
+           |                        a.*
+           |                        ,coalesce(b.new_cid,a.cid) AS new_cid
+           |                FROM    (
+           |                        SELECT  *
+           |                        FROM    $project.ods_company_annual_report
+           |                        WHERE   ds = $ds
+           |                        AND     cid IS NOT NULL
+           |                ) a
+           |                LEFT JOIN (
+           |                        SELECT  *
+           |                        FROM    $project.base_company_mapping
+           |                        WHERE   ds = $ds
+           |                 ) b
+           |                ON      a.cid = b.cid
+           |            ) c
+           |        ) d
+           |WHERE   num = 1 AND report_year IS NOT NULL AND report_year <> ''
+           |""".stripMargin)
+    }
+
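+    /**
+     * Incremental load of the main table: new inc_ods rows are remapped to new_cid,
+     * historical rows of companies whose cid changed are re-emitted under the new
+     * cid, and the union is deduplicated per (new_cid, report_year).
+     */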
+    def main_table_inc(): Unit = {
+      val utils = CompanyAnnualReportUtils(spark, project)
+      val ddl = utils.getDDL(targetTablePre = "inc_ads_", tableName = "company_annual_report", otherCols = Seq(("rowkey", "string"), ("new_cid", "string"), ("cid", "string")))
+      if (!spark.catalog.tableExists(s"$project.inc_ads_company_annual_report")) {
+        println(ddl)
+        println("\nPlease create the table manually")
+        return
+      }
+
+      val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_company_annual_report", "0")
+      val ads_end_ds = getLastPartitionsOrElse(s"$project.ads_company_annual_report", "0")
+
+      val columns = getColumns(s"$project.ads_company_annual_report").diff(Seq("ds", "rowkey", "new_cid", "cid"))
+
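+      // companies whose cid changed since the last full partition: old cid -> current cid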
+      sql(
+        s"""
+           |SELECT  cid, current_cid AS new_cid
+           |FROM    $project.inc_ods_company
+           |WHERE   ds > ${ads_end_ds}
+           |AND     cid IS NOT NULL
+           |AND     current_cid IS NOT NULL
+           |GROUP BY cid,current_cid
+           |""".stripMargin).createOrReplaceTempView("mapping")
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.inc_ads_company_annual_report PARTITION(ds=${inc_ods_end_ds})
+           |SELECT
+           |        rowkey
+           |        ,new_cid
+           |        ,cid
+           |        ,${columns.mkString(",")}
+           |FROM    (
+           |        SELECT
+           |                *
+           |                ,ROW_NUMBER() OVER(PARTITION BY new_cid,report_year ORDER BY id DESC ) num
+           |                ,CONCAT_WS('_',new_cid,report_year) AS rowkey
+           |        FROM    (
+           |                SELECT
+           |                        coalesce(b.new_cid,a.cid) AS new_cid
+           |                        ,a.cid
+           |                        ,${columns.map(s => s"a.$s").mkString(",")}
+           |                FROM    (
+           |                        SELECT  *
+           |                        FROM    $project.inc_ods_company_annual_report
+           |                        WHERE   ds > $ads_end_ds AND cid IS NOT NULL
+           |                        ) a
+           |                LEFT JOIN (
+           |                        SELECT  *
+           |                        FROM    $project.base_company_mapping
+           |                        WHERE   ds = ${getLastPartitionsOrElse(s"$project.base_company_mapping", "0")}
+           |                ) b
+           |                ON      a.cid = b.cid
+           |                UNION ALL
+           |                SELECT
+           |                        a.new_cid
+           |                        ,b.cid
+           |                        ,${columns.mkString(",")}
+           |                FROM    mapping a
+           |                JOIN (
+           |                        SELECT
+           |                                new_cid AS cid
+           |                                ,${columns.mkString(",")}
+           |                        FROM    $project.ads_company_annual_report
+           |                        WHERE   ds = ${ads_end_ds}
+           |                        UNION ALL
+           |                        SELECT
+           |                                new_cid AS cid
+           |                                ,${columns.mkString(",")}
+           |                        FROM    $project.inc_ads_company_annual_report
+           |                        WHERE   ds > ${ads_end_ds}
+           |                     ) b
+           |                ON      a.cid = b.cid
+           |                ) c
+           |        ) d
+           |WHERE   num = 1 AND report_year IS NOT NULL AND report_year <> ''
+           |""".stripMargin)
+
+      // TODO: write to HBase only
+
+    }
+
+
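+    /**
+     * Full load of one sub-table: joins the ODS rows to the main table on main_id,
+     * builds rowkey = <main rowkey>_md5(cleanup(primary fields)) (or the main
+     * rowkey alone when no primary fields are given) and keeps one row per rowkey.
+     */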
+    def sublist_all(tableName: String, primaryFields: Seq[String]): Unit = {
+      cleanup()
+      val utils = CompanyAnnualReportUtils(spark, project)
+
+      // generate DDL; if the table is missing, print it for manual creation
+      val ddl = utils.getDDL(tableName = tableName, otherCols = Seq(("rowkey", "string"), ("new_cid", "string")))
+      if (!spark.catalog.tableExists(s"$project.ads_$tableName")) {
+        println(ddl)
+        println("\nPlease create the table manually")
+        return
+      }
+
+      utils.getMainTmpTab("main_table_tmp")
+
+      val ods_ds = getLastPartitionsOrElse(s"$project.ods_$tableName", "0")
+
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $project.ads_$tableName PARTITION(ds='$ods_ds')
+           |SELECT  ${getColumns(s"$project.ads_$tableName").diff(Seq("ds")).mkString(",")}
+           |FROM    (
+           |            SELECT  t3.*
+           |                    ,split(rowkey,'_')[0] AS new_cid
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY id DESC) AS num
+           |            FROM    (
+           |                        SELECT  ${
+          if (primaryFields.exists(s => StringUtils.isNotBlank(s))) {
+            s"""
+               |concat_ws(
+               |  '_'
+               |  ,t2.rowkey
+               | ,md5(
+               |    cleanup(concat_ws('',${primaryFields.mkString(",")}))
+               |      )
+               | )
+               |""".stripMargin
+          } else "t2.rowkey"
+        } AS rowkey
+           |                                ,t1.*
+           |                        FROM    (
+           |                                    SELECT  *
+           |                                    FROM    $project.ods_$tableName
+           |                                    WHERE   ds = $ods_ds
+           |                                ) AS t1
+           |                        JOIN main_table_tmp AS t2
+           |                        ON      t1.main_id = t2.main_id
+           |                    ) AS t3
+           |        ) AS t4
+           |WHERE   t4.num = 1
+           |""".stripMargin)
+    }
+
+
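+    /**
+     * Incremental load of one sub-table: same rowkey construction as sublist_all,
+     * but reads inc_ods partitions newer than the last inc_ads partition and keeps
+     * the newest ds per rowkey.
+     */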
+    def sublist_inc(tableName: String, primaryFields: Seq[String]): Unit = {
+      cleanup()
+      val utils = CompanyAnnualReportUtils(spark, project)
+
+      val ddl = utils.getDDL(targetTablePre = "inc_ads_", tableName = tableName, otherCols = Seq(("rowkey", "string"), ("new_cid", "string")))
+      if (!spark.catalog.tableExists(s"$project.inc_ads_$tableName")) {
+        println(ddl)
+        println("\nPlease create the table manually")
+        return
+      }
+
+      utils.getMainTmpTab("main_table_tmp")
+
+
+      val inc_ads_end_ds = getLastPartitionsOrElse(s"$project.inc_ads_$tableName", "0")
+      val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_$tableName", "0")
+
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $project.inc_ads_$tableName PARTITION(ds='$inc_ods_end_ds')
+           |SELECT  ${getColumns(s"$project.inc_ads_$tableName").diff(Seq("ds")).mkString(",")}
+           |FROM    (
+           |            SELECT  t3.*
+           |                    ,split(rowkey,'_')[0] AS new_cid
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+           |            FROM    (
+           |                        SELECT  ${
+          if (primaryFields.exists(s => StringUtils.isNotBlank(s))) {
+            s"""
+               |concat_ws(
+               |  '_'
+               |  ,t2.rowkey
+               | ,md5(
+               |    cleanup(concat_ws('',${primaryFields.mkString(",")}))
+               |      )
+               | )
+               |""".stripMargin
+          } else "t2.rowkey"
+        } AS rowkey
+           |                                ,t1.*
+           |                        FROM    (
+           |                                    SELECT  *
+           |                                    FROM    $project.inc_ods_$tableName
+           |                                    WHERE   ds > $inc_ads_end_ds
+           |                                ) AS t1
+           |                        JOIN main_table_tmp AS t2
+           |                        ON      t1.main_id = t2.main_id
+           |                    ) AS t3
+           |        ) AS t4
+           |WHERE   t4.num = 1
+           |""".stripMargin)
+
+      // TODO: write to HBase only
+
+    }
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    val Array(project, ds) = args
+
+    println(
+      s"""
+         |project: $project
+         |ds: $ds
+         |""".stripMargin)
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv("company_annual_report", config)
+
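+    // sub-table -> comma-separated fields whose md5 forms the rowkey suffix (empty: no dedup, one-to-one with main_id)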
+    val sublist_map = Map(
+      "company_annual_report_out_investment" -> "out_investment_name" // annual report: outbound investment
+      , "company_annual_report_change" -> "change_item,change_time" // annual report: changes
+      , "company_annual_report_equity_change" -> "investor_name,change_time" // annual report: equity changes
+      , "company_annual_report_holder" -> "investor_name" // annual report: shareholders
+      , "company_annual_report_out_guarantee" -> "id" // annual report: external guarantees
+      , "company_annual_report_webinfo" -> "website" // annual report: website info
+      , "company_annual_report_social_security" -> "" // annual report: social security (no dedup; one-to-one with main_id)
+    )
+
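+    // true: full (historical) run; false: incremental run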
+    val all_flag = true
+
+    if (all_flag) {
+      // full (historical) load
+      CompanyAnnualReportHandle(spark, project).main_table_all()
+      for (elem <- sublist_map) {
+        CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
+      }
+    } else {
+      // incremental load
+      CompanyAnnualReportHandle(spark, project).main_table_inc()
+      for (e <- sublist_map) {
+        CompanyAnnualReportHandle(spark, project).sublist_inc(e._1, e._2.split(","))
+      }
+    }
+    spark.stop()
+  }
+}