Forráskód Böngészése

feat: 减资记录和增资记录

许家凯 4 éve
szülő
commit
453dda2192

+ 72 - 58
src/main/scala/com/winhc/bigdata/spark/jobs/reduction_registered_capital_info.scala

@@ -17,54 +17,54 @@ case class reduction_registered_capital_info(s: SparkSession,
                                              project: String //表所在工程名
                                             ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
-  private val target_tab = s"$project.ads_reduction_registered_capital_info"
-  private val inc_target_tab = s"$project.inc_ads_reduction_registered_capital_info"
 
-  def init(): Unit = {
-    if (!spark.catalog.tableExists(target_tab)) {
-      sql(
-        s"""
-           |CREATE TABLE IF NOT EXISTS $target_tab (
-           |  `rowkey` STRING COMMENT 'FIELD',
-           |  `new_cid` STRING COMMENT 'FIELD',
-           |  `category` STRING,
-           |  `change_item` STRING,
-           |  `content_before` STRING,
-           |  `content_after` STRING,
-           |  `change_time` DATETIME,
-           |  `create_time` DATETIME,
-           |  `update_time` DATETIME,
-           |  `deleted` BIGINT)
-           | COMMENT 'TABLE COMMENT'
-           |PARTITIONED BY (
-           |  `ds` STRING COMMENT '分区')
-           |""".stripMargin)
-    }
-
-    if (!spark.catalog.tableExists(inc_target_tab)) {
-      sql(
-        s"""
-           |CREATE TABLE IF NOT EXISTS $inc_target_tab (
-           |  `rowkey` STRING COMMENT 'FIELD',
-           |  `new_cid` STRING COMMENT 'FIELD',
-           |  `category` STRING,
-           |  `change_item` STRING,
-           |  `content_before` STRING,
-           |  `content_after` STRING,
-           |  `change_time` DATETIME,
-           |  `create_time` DATETIME,
-           |  `update_time` DATETIME,
-           |  `deleted` BIGINT)
-           | COMMENT 'TABLE COMMENT'
-           |PARTITIONED BY (
-           |  `ds` STRING COMMENT '分区')
-           |""".stripMargin)
+  def calc(tn: String, symbol: String): Unit = {
+    val target_tab = s"$project.ads_$tn"
+    val inc_target_tab = s"$project.inc_ads_$tn"
+    val target_cols = getColumns(target_tab)
+
+    def init(): Unit = {
+      if (!spark.catalog.tableExists(target_tab)) {
+        sql(
+          s"""
+             |CREATE TABLE IF NOT EXISTS $target_tab (
+             |  `rowkey` STRING COMMENT 'FIELD',
+             |  `new_cid` STRING COMMENT 'FIELD',
+             |  `category` STRING,
+             |  `change_item` STRING,
+             |  `content_before` STRING,
+             |  `content_after` STRING,
+             |  `change_time` DATETIME,
+             |  `create_time` DATETIME,
+             |  `update_time` DATETIME,
+             |  `deleted` BIGINT)
+             | COMMENT 'TABLE COMMENT'
+             |PARTITIONED BY (
+             |  `ds` STRING COMMENT '分区')
+             |""".stripMargin)
+      }
+
+      if (!spark.catalog.tableExists(inc_target_tab)) {
+        sql(
+          s"""
+             |CREATE TABLE IF NOT EXISTS $inc_target_tab (
+             |  `rowkey` STRING COMMENT 'FIELD',
+             |  `new_cid` STRING COMMENT 'FIELD',
+             |  `category` STRING,
+             |  `change_item` STRING,
+             |  `content_before` STRING,
+             |  `content_after` STRING,
+             |  `change_time` DATETIME,
+             |  `create_time` DATETIME,
+             |  `update_time` DATETIME,
+             |  `deleted` BIGINT)
+             | COMMENT 'TABLE COMMENT'
+             |PARTITIONED BY (
+             |  `ds` STRING COMMENT '分区')
+             |""".stripMargin)
+      }
     }
-  }
 
-  private val target_cols = getColumns(target_tab)
-
-  def calc(): Unit = {
     spark.udf.register("registered_capital_trim", RegisteredCapitalUtil.registered_capital_trim _)
     val ads_ds = getLastPartitionsOrElse(target_tab, null)
 
@@ -80,28 +80,32 @@ case class reduction_registered_capital_info(s: SparkSession,
            |AND     category LIKE '%注册资本%'
            |AND     registered_capital_trim(content_before) IS NOT NULL
            |AND     registered_capital_trim(content_after) IS NOT NULL
-           |AND     CAST(registered_capital_trim(content_before) AS DOUBLE ) > CAST( registered_capital_trim(content_after) AS DOUBLE )
+           |AND     CAST(registered_capital_trim(content_before) AS DOUBLE ) $symbol CAST( registered_capital_trim(content_after) AS DOUBLE )
            |""".stripMargin)
     }
 
     def inc(): Unit = {
-      val inc_ads_tab = s"$project.inc_ads_company_change"
-      val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_tab, "0")
-
+      val inc_org_tab = s"$project.inc_ads_company_change"
+      val inc_org_last_ds = getLastPartitionsOrElse(inc_org_tab, "0")
+      var inc_target_tab_ds = getLastPartitionsOrElse(inc_target_tab, "0")
+      if (inc_target_tab_ds.equals(inc_org_last_ds)) {
+        println("rerun...")
+        inc_target_tab_ds = getSecondLastPartitionOrElse(inc_target_tab, "0")
+      }
       sql(
         s"""
-           |INSERT OVERWRITE TABLE $inc_target_tab PARTITION(ds='$inc_ads_last_ds')
+           |INSERT OVERWRITE TABLE $inc_target_tab PARTITION(ds='$inc_org_last_ds')
            |SELECT  ${target_cols.diff(Seq("ds")).mkString(",")}
            |FROM    (
            |        SELECT  * ,ROW_NUMBER()OVER (PARTITION BY rowkey ORDER BY ds DESC ) AS num
            |        FROM    (
            |                SELECT  *
-           |                FROM    $inc_ads_tab
-           |                WHERE   ds > $ads_ds
+           |                FROM    $inc_org_tab
+           |                WHERE   ds > $inc_target_tab_ds
            |                AND     category LIKE '%注册资本%'
            |                AND     registered_capital_trim(content_before) IS NOT NULL
            |                AND     registered_capital_trim(content_after) IS NOT NULL
-           |                AND     CAST (registered_capital_trim(content_before) AS DOUBLE )> CAST (registered_capital_trim(content_after) AS DOUBLE )
+           |                AND     CAST (registered_capital_trim(content_before) AS DOUBLE ) $symbol CAST (registered_capital_trim(content_after) AS DOUBLE )
            |                ) AS t1
            |        ) AS t2
            |WHERE   t2.num =1
@@ -110,14 +114,14 @@ case class reduction_registered_capital_info(s: SparkSession,
 
       MaxComputer2Phoenix(spark
         , target_cols.diff(Seq("ds"))
-        , inc_ads_tab
-        , "REDUCTION_REGISTERED_CAPITAL_INFO"
-        , inc_ads_last_ds
+        , inc_target_tab
+        , tn.toUpperCase
+        , inc_org_last_ds
         , "rowkey").syn()
 
       CompanySummaryPro(s = spark
         , project = "winhc_eci_dev"
-        , tableName = "reduction_registered_capital_info"
+        , tableName = tn
         , cidField = "split(rowkey,'_')[0]"
       )
         .calc()
@@ -132,6 +136,15 @@ case class reduction_registered_capital_info(s: SparkSession,
       inc()
     }
   }
+
+  def additionalShareAndReduction(): Unit = {
+    for (t <- Seq(
+      ("reduction_registered_capital_info", ">")
+      , ("increase_registered_capital_info", "<")
+    )) {
+      calc(t._1, t._2)
+    }
+  }
 }
 
 object reduction_registered_capital_info {
@@ -144,7 +157,8 @@ object reduction_registered_capital_info {
     )
 
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    reduction_registered_capital_info(s = spark, project = "winhc_eci_dev").calc()
+    reduction_registered_capital_info(s = spark, project = "winhc_eci_dev")
+      .additionalShareAndReduction()
 
     spark.stop()
   }