Browse Source

feat: 增减资记录代码

许家凯 3 years ago
parent
commit
c51892930c

+ 126 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/RegisteredCapitalInfo.scala

@@ -0,0 +1,126 @@
+package com.winhc.bigdata.spark.ng.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.ng.utils.StartAndEndDsUtils
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyIndexFunc}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, RegisteredCapitalUtil, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Registered-capital increase / decrease calculation.
+ *
+ * Reads company change records from `$project.ads_company_change` and
+ * `$project.inc_ads_company_change`, keeps the newest row (by `ds`) for each
+ * `rowkey`, and retains only rows whose `category` mentions registered capital
+ * and whose trimmed before/after amounts satisfy `before <symbol> after`.
+ * The result is written to `$project.ads_$target_tn` (full run) or
+ * `$project.inc_ads_$target_tn` (incremental run).
+ *
+ * @param s         Spark session used for every query issued by this job
+ * @param project   project name used as the prefix of every table name
+ * @param target_tn target table name without the `ads_` / `inc_ads_` prefix
+ * @param symbol    SQL comparison operator placed between the numeric
+ *                  `content_before` and `content_after` values; it is
+ *                  interpolated into the SQL text verbatim, so it must come
+ *                  from trusted code only
+ * @author: XuJiakai
+ * @date: 2021/9/15 14:10
+ */
+case class RegisteredCapitalInfo(s: SparkSession,
+                                 project: String,
+                                 target_tn: String,
+                                 symbol: String
+                                ) extends LoggingUtils with BaseFunc with CompanyIndexFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  val target_tab = s"$project.ads_$target_tn"
+  val inc_target_tab = s"$project.inc_ads_$target_tn"
+  // Must run after the table-name vals above have been initialised.
+  init()
+
+  /**
+   * Creates the full and incremental target tables when they do not exist yet
+   * (both copy the layout of `$project.ads_company_change`) and registers the
+   * `registered_capital_trim` UDF used by [[calc]].
+   */
+  def init(): Unit = {
+    Seq(target_tab, inc_target_tab)
+      .filterNot(tab => spark.catalog.tableExists(tab))
+      .foreach { tab =>
+        sql(
+          s"""
+             |CREATE TABLE $tab like $project.ads_company_change
+             |""".stripMargin)
+      }
+
+    spark.udf.register("registered_capital_trim", RegisteredCapitalUtil.registered_capital_trim _)
+  }
+
+
+  /**
+   * Selects the matching registered-capital change rows and writes them into
+   * the `ds` partition reported by `StartAndEndDsUtils`.
+   *
+   * An incremental run reads only the incremental source table; a full run
+   * additionally unions in one partition of the full source table.
+   */
+  def calc(): Unit = {
+    val org_tab = s"$project.ads_company_change"
+    val inc_org_tab = s"$project.inc_ads_company_change"
+    val dsArgs = StartAndEndDsUtils(spark).get_start_and_end_args(org_tab, inc_org_tab, target_tab)
+    println(dsArgs.toJson())
+
+    // The two run modes differ only in which source rows feed the de-duplication,
+    // so the source sub-query is built once and the rest of the SQL is shared.
+    val incRows = s"select * from $inc_org_tab where ds > ${dsArgs.inc_tab_gt_ds}"
+    val sourceRows =
+      if (dsArgs.inc) incRows
+      else s"select * from $org_tab where ds = ${dsArgs.inc_tab_gt_ds} UNION ALL $incRows"
+
+    val changeView = s"all_change_tab_$target_tn"
+
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                     $sourceRows
+         |                     ) AS all_t1
+         |        ) AS all_t2
+         |WHERE   all_t2.num = 1
+         |AND     category LIKE '%注册资本%'
+         |AND     registered_capital_trim(content_before) IS NOT NULL
+         |AND     registered_capital_trim(content_after) IS NOT NULL
+         |AND     CAST(registered_capital_trim(content_before) AS DOUBLE ) $symbol CAST( registered_capital_trim(content_after) AS DOUBLE )
+         |""".stripMargin)
+      // createTempView throws when the view already exists; replacing it keeps
+      // calc() re-runnable for the same target_tn within one Spark session.
+      .createOrReplaceTempView(changeView)
+
+    val target_cols = getColumns(target_tab)
+
+    val out_target_tab = if (dsArgs.inc) inc_target_tab else target_tab
+
+    // `ds` is supplied by the PARTITION clause, so it is dropped from the select list.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $out_target_tab PARTITION(ds='${dsArgs.target_ds}')
+         |SELECT  ${target_cols.diff(Seq("ds")).mkString(",")}
+         |FROM    $changeView
+         |""".stripMargin)
+  }
+}
+
+/**
+ * Job entry point: runs the registered-capital calculation once per target
+ * table inside the `winhc_ng` project.
+ */
+object RegisteredCapitalInfo {
+  /**
+   * Builds the Spark session and runs both jobs in order:
+   * `reduction_registered_capital_info` (before > after) and
+   * `increase_registered_capital_info` (before < after).
+   *
+   * @param args command-line arguments (unused)
+   */
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_ng"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    // (target table name, comparison operator applied as "before <op> after")
+    val jobs = Seq(
+      "reduction_registered_capital_info" -> ">",
+      "increase_registered_capital_info" -> "<"
+    )
+
+    try {
+      jobs.foreach { case (targetTn, op) =>
+        RegisteredCapitalInfo(spark, project = project, target_tn = targetTn, symbol = op).calc()
+      }
+    } finally {
+      // Stop the session even when one of the jobs throws.
+      spark.stop()
+    }
+  }
+}

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/ng/utils/StartAndEndDsUtils.scala

@@ -18,7 +18,7 @@ case class StartAndEndDsArgs(inc: Boolean // true 写出到增量表或只读增
                             )
 
 case class StartAndEndDsUtils(s: SparkSession
-                             ) extends LoggingUtils with BaseFunc{
+                             ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
 
 
@@ -76,7 +76,7 @@ case class StartAndEndDsUtils(s: SparkSession
     if (target_ds == null) {
       throw new RuntimeException("全量来源表为空 !")
     }
-    StartAndEndDsArgs(false, target_ds = target_ds, null)
+    StartAndEndDsArgs(false, target_ds = target_ds, getLastPartitionsOrElse(org_tab, null))
   }
 
 
@@ -168,8 +168,8 @@ object StartAndEndDsUtils {
     /* val b = a.get_start_and_end_args(org_tab, inc_org_tab, target_tab)
      println(b)*/
 
-   /* val c = a.get_start_and_end_args("winhc_ng.ads_company_dishonest_info", "winhc_ng.inc_ads_company_dishonest_info", "winhc_ng.bds_credit_punishment_data_extraction")
-    println(c)*/
+    /* val c = a.get_start_and_end_args("winhc_ng.ads_company_dishonest_info", "winhc_ng.inc_ads_company_dishonest_info", "winhc_ng.bds_credit_punishment_data_extraction")
+     println(c)*/
 
 
     val d = a.get_gt_ds("winhc_ng.bds_credit_punishment_data_extraction", "winhc_ng.bds_credit_punishment_case_info")