Browse Source

fix: 年报对外投资输出摘要

许家凯 4 years ago
parent
commit
17184217b2
1 changed files with 112 additions and 6 deletions
  1. 112 6
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala

+ 112 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala

@@ -18,6 +18,103 @@ import scala.collection.mutable
  */
 object CompanyAnnualReport {
 
+  /**
+   * Per company id (first rowkey segment), counts the rows whose second rowkey segment ranks highest
+   * (apparently the latest report year) after keeping only the newest `ds` version of each rowkey.
+   * @param s         Spark session used to run the queries
+   * @param project   project (database) that holds the `ads_` / `inc_ads_` tables
+   * @param tableName table name without the `ads_` / `inc_ads_` prefix
+   */
+  case class sublist_summary(s: SparkSession,
+                             project: String,
+                             tableName: String
+                            ) extends LoggingUtils with Logging {
+    @(transient@getter) val spark: SparkSession = s
+
+    /** Prints the full-rebuild statement for the summary table; the statement is not executed here. */
+    def all(): Unit = {
+      val inc_ads_table_name = s"inc_ads_$tableName"
+      val ads_table_name = s"ads_$tableName"
+      // NOTE(review): both lookups use unqualified names; presumably resolved in the session's current database.
+      val tableExists = spark.catalog.tableExists(inc_ads_table_name)
+      val last_ads_ds = getLastPartitionsOrElse(ads_table_name, "0")
+      val inc_union =
+        if (tableExists)
+          s"""
+             |UNION ALL
+             |SELECT  *
+             |FROM    $project.$inc_ads_table_name
+             |WHERE   ds > '$last_ads_ds'
+             |""".stripMargin
+        else ""
+      println(
+        s"""
+           |CREATE TABLE IF NOT EXISTS winhc_eci_dev.xjk_company_annual_report_out_investment_summary as 
+           |SELECT  split(rowkey,'_')[0] AS cid
+           |        ,COUNT(1) AS company_annual_report_out_investment
+           |FROM    (
+           |            SELECT  *
+           |                    ,DENSE_RANK() OVER(PARTITION BY split(rowkey,'_')[0] ORDER BY split(rowkey,'_')[1] DESC ) AS year_num
+           |            FROM    (
+           |                        SELECT  *
+           |                                ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |                        FROM    (
+           |                                    SELECT  *
+           |                                    FROM    $project.$ads_table_name
+           |                                    WHERE   ds = '$last_ads_ds'
+           |                                    $inc_union
+           |                                ) AS t1
+           |                    ) AS t2
+           |            WHERE   t2.num = 1
+           |        ) AS t3
+           |WHERE   t3.year_num = 1
+           |GROUP BY split(rowkey,'_')[0]
+           |""".stripMargin)
+    }
+
+    /** Recomputes the count for every company id found in inc partition `ds` and hands it to save2HBase. */
+    def inc(ds: String): Unit = {
+      val inc_ads_table_name = s"inc_ads_$tableName"
+      val ads_table_name = s"ads_$tableName"
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      sql(
+        s"""
+           |SELECT  split(rowkey,'_')[0] AS cid
+           |        ,COUNT(1) AS company_annual_report_out_investment
+           |FROM    (
+           |            SELECT  *
+           |                    ,DENSE_RANK() OVER(PARTITION BY split(rowkey,'_')[0] ORDER BY split(rowkey,'_')[1] DESC ) AS year_num
+           |            FROM    (
+           |                        SELECT  *
+           |                                ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |                        FROM    (
+           |                                    SELECT  t1.*
+           |                                    FROM    (
+           |                                                SELECT  *
+           |                                                FROM    $project.$ads_table_name
+           |                                                WHERE   ds = '${getLastPartitionsOrElse(ads_table_name, "0")}'
+           |                                            ) AS t1
+           |                                    JOIN    (
+           |                                                SELECT  DISTINCT split(rowkey,'_')[0] AS cid
+           |                                                FROM    $project.$inc_ads_table_name
+           |                                                WHERE   ds = '$ds'
+           |                                            ) AS t2
+           |                                    ON      t2.cid = split(t1.rowkey,'_')[0]
+           |                                    UNION ALL
+           |                                    SELECT  *
+           |                                    FROM    $project.$inc_ads_table_name
+           |                                    WHERE   ds = '$ds'
+           |                                ) AS t3
+           |                    ) AS t4
+           |            WHERE   t4.num = 1
+           |        ) AS t5
+           |WHERE   t5.year_num = 1
+           |GROUP BY split(rowkey,'_')[0]
+           |""".stripMargin)
+        .save2HBase("COMPANY_SUMMARY", "cid", Seq("company_annual_report_out_investment"))
+  }
+
+
   case class CompanyAnnualReportUtils(s: SparkSession,
                                       project: String //表所在工程名
                                      ) extends LoggingUtils with Logging with BaseFunc {
@@ -238,7 +335,8 @@ object CompanyAnnualReport {
           , "rowkey"
           , "cid" +: writCols)
 
-      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("new_cid","report_year")).calc
+      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("new_cid", "report_year")).calc
+
 
     }
 
@@ -373,6 +471,13 @@ object CompanyAnnualReport {
         .save2HBase(tableName.toUpperCase
           , "rowkey"
           , "cid" +: writeCols)
+
+
+      //todo 年报对外投资需要输出摘要
+
+      if (tableName.equals("company_annual_report_out_investment")) {
+        sublist_summary(s = spark, project = "winhc_eci_dev", tableName = tableName).inc(inc_ods_end_ds)
+      }
     }
   }
 
@@ -405,13 +510,14 @@ object CompanyAnnualReport {
 
     val all_flag = false
 
+
     if (all_flag) {
       //存量
-     /* CompanyAnnualReportHandle(spark, project).main_table_all()
-      for (elem <- sublist_map) {
-        println("xjk:" + elem._1)
-        CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
-      }*/
+      /* CompanyAnnualReportHandle(spark, project).main_table_all()
+       for (elem <- sublist_map) {
+         println("xjk:" + elem._1)
+         CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
+       }*/
     } else {
       //增量
       CompanyAnnualReportHandle(spark, project).main_table_inc()