
feat: capital reduction records (减资记录)

许家凯, 4 years ago
parent
commit f3205ced7e

+ 151 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/reduction_registered_capital_info.scala

@@ -0,0 +1,151 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils._
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/11/3 11:23
+ *        Capital reduction records (减资记录)
+ */
+
+case class reduction_registered_capital_info(s: SparkSession,
+                                             project: String // project that the target tables live in
+                                            ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  private val target_tab = s"$project.ads_reduction_registered_capital_info"
+  private val inc_target_tab = s"$project.inc_ads_reduction_registered_capital_info"
+
+  def init(): Unit = {
+    if (!spark.catalog.tableExists(target_tab)) {
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS $target_tab (
+           |  `rowkey` STRING COMMENT 'FIELD',
+           |  `new_cid` STRING COMMENT 'FIELD',
+           |  `category` STRING,
+           |  `change_item` STRING,
+           |  `content_before` STRING,
+           |  `content_after` STRING,
+           |  `change_time` DATETIME,
+           |  `create_time` DATETIME,
+           |  `update_time` DATETIME,
+           |  `deleted` BIGINT)
+           | COMMENT 'TABLE COMMENT'
+           |PARTITIONED BY (
+           |  `ds` STRING COMMENT 'partition')
+           |""".stripMargin)
+    }
+
+    if (!spark.catalog.tableExists(inc_target_tab)) {
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS $inc_target_tab (
+           |  `rowkey` STRING COMMENT 'FIELD',
+           |  `new_cid` STRING COMMENT 'FIELD',
+           |  `category` STRING,
+           |  `change_item` STRING,
+           |  `content_before` STRING,
+           |  `content_after` STRING,
+           |  `change_time` DATETIME,
+           |  `create_time` DATETIME,
+           |  `update_time` DATETIME,
+           |  `deleted` BIGINT)
+           | COMMENT 'TABLE COMMENT'
+           |PARTITIONED BY (
+           |  `ds` STRING COMMENT 'partition')
+           |""".stripMargin)
+    }
+  }
+
+  // resolved lazily so that init() can create the target table on a first run before the schema is read
+  private lazy val target_cols = getColumns(target_tab)
+
+  def calc(): Unit = {
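+    // UDF: registered_capital_trim turns a raw capital string such as "500万元" into a plain numeric string, or null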
+    spark.udf.register("registered_capital_trim", RegisteredCapitalUtil.registered_capital_trim _)
+    val ads_ds = getLastPartitionsOrElse(target_tab, null)
+
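+    // full build: project capital-reduction rows out of the latest ads_company_change partition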
+    def all(): Unit = {
+      val ads_tab = s"$project.ads_company_change"
+      val ads_ds = getLastPartitionsOrElse(ads_tab, "0")
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$ads_ds')
+           |SELECT  ${target_cols.diff(Seq("ds")).mkString(",")}
+           |FROM    $ads_tab
+           |WHERE   ds = $ads_ds
+           |AND     category LIKE '%注册资本%'
+           |AND     registered_capital_trim(content_before) IS NOT NULL
+           |AND     registered_capital_trim(content_after) IS NOT NULL
+           |AND     CAST(registered_capital_trim(content_before) AS DOUBLE ) > CAST( registered_capital_trim(content_after) AS DOUBLE )
+           |""".stripMargin)
+    }
+
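+    // incremental build: pick up change rows newer than the last full partition and keep one row per rowkey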
+    def inc(): Unit = {
+      val inc_ads_tab = s"$project.inc_ads_company_change"
+      val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_tab, "0")
+
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $inc_target_tab PARTITION(ds='$inc_ads_last_ds')
+           |SELECT  ${target_cols.diff(Seq("ds")).mkString(",")}
+           |FROM    (
+           |        SELECT  *, ROW_NUMBER() OVER (PARTITION BY rowkey ORDER BY ds DESC) AS num
+           |        FROM    (
+           |                SELECT  *
+           |                FROM    $inc_ads_tab
+           |                WHERE   ds > $ads_ds
+           |                AND     category LIKE '%注册资本%'
+           |                AND     registered_capital_trim(content_before) IS NOT NULL
+           |                AND     registered_capital_trim(content_after) IS NOT NULL
+           |                AND     CAST (registered_capital_trim(content_before) AS DOUBLE )> CAST (registered_capital_trim(content_after) AS DOUBLE )
+           |                ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+
+
+      // sync the freshly written reduction records (not the raw change table) to Phoenix
+      MaxComputer2Phoenix(spark
+        , target_cols.diff(Seq("ds"))
+        , inc_target_tab
+        , "REDUCTION_REGISTERED_CAPITAL_INFO"
+        , inc_ads_last_ds
+        , "rowkey").syn()
+
+      CompanySummaryPro(s = spark
+        , project = project
+        , tableName = "reduction_registered_capital_info"
+        , cidField = "split(rowkey,'_')[0]"
+      )
+        .calc()
+
+    }
+
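+    // no partition in the target table yet -> full build; otherwise run the incremental branch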
+    if (ads_ds == null) {
+      println("all.... ")
+      all()
+    } else {
+      println("inc.... ")
+      inc()
+    }
+  }
+}
+
+object reduction_registered_capital_info {
+
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    val job = reduction_registered_capital_info(s = spark, project = "winhc_eci_dev")
+    job.init() // make sure the target tables exist before the first run
+    job.calc()
+
+    spark.stop()
+  }
+}
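
For reference, the WHERE clause used in both branches boils down to a single predicate: the category must mention 注册资本 (registered capital) and both amounts must trim to numbers, with the "before" value strictly greater than the "after" value. A minimal sketch of that rule as plain Scala, reusing RegisteredCapitalUtil from this commit; the sample rows below are hypothetical and only illustrate which change records would be kept:

    import com.winhc.bigdata.spark.utils.RegisteredCapitalUtil.registered_capital_trim

    // a change record counts as a capital reduction when its category mentions 注册资本
    // and both amounts parse to numbers with before > after
    def isCapitalReduction(category: String, before: String, after: String): Boolean = {
      val b = registered_capital_trim(before)
      val a = registered_capital_trim(after)
      category != null && category.contains("注册资本") &&
        b != null && a != null && b.toDouble > a.toDouble
    }

    isCapitalReduction("注册资本变更", "1000万元人民币", "500万元人民币") // true  -> kept
    isCapitalReduction("注册资本变更", "500万元", "800万元")              // false -> capital increase
    isCapitalReduction("法定代表人变更", "张三", "李四")                  // false -> not a capital change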

+ 37 - 0
src/main/scala/com/winhc/bigdata/spark/utils/RegisteredCapitalUtil.scala

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.regex.Pattern
+
+import com.aliyun.odps.utils.StringUtils
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/11/3 11:18
+ */
+object RegisteredCapitalUtil {
+
+  private def isDouble(v: String) = try {
+    v.toDouble
+    true
+  } catch {
+    case _: Exception =>
+      false
+  }
+
+  private val pattern = Pattern.compile("[^0-9.]")
+
+  /**
+   * Strips currency words and units from a registered-capital string and returns
+   * the leading numeric token (e.g. "500万元人民币" -> "500"), or null when
+   * nothing numeric can be extracted.
+   */
+  def registered_capital_trim(v: String): String = {
+    if (StringUtils.isEmpty(v)) return null
+    if (v.contains("%")) return null // percentage values are not capital amounts
+    val matcher = pattern.matcher(v)
+    try {
+      // blank out every non-numeric character, then keep the first numeric token;
+      // trim first so values such as "人民币500万" do not yield an empty leading token
+      val a = matcher.replaceAll(" ").trim.split("\\s+")(0)
+      if (StringUtils.isNotBlank(a) && isDouble(a)) a else null
+    } catch {
+      case _: ArrayIndexOutOfBoundsException => null
+    }
+  }
+}
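
A few illustrative calls against the parser above (assuming the .trim added in this version); the sample strings are hypothetical, and the expected results follow from the digit/dot regex plus the percent and empty-string guards:

    RegisteredCapitalUtil.registered_capital_trim("500万元人民币")   // "500"  - leading numeric token kept
    RegisteredCapitalUtil.registered_capital_trim("人民币1000万元")  // "1000" - currency prefix stripped
    RegisteredCapitalUtil.registered_capital_trim("51%")            // null   - percentages are rejected
    RegisteredCapitalUtil.registered_capital_trim("")               // null   - empty input
    RegisteredCapitalUtil.registered_capital_trim("未公示")          // null   - no digits at all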