Explorar el Código

添加环保处罚简单维度的动态处理

晏永年 hace 4 años
padre
commit
98d9bf9a39

+ 39 - 8
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -4,6 +4,7 @@ import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SparkSession}
@@ -77,14 +78,44 @@ object CompanyDynamic {
 
 
   def main(args: Array[String]): Unit = {
-    val Array(project, tableName) = args
-    val config = EsConfig.getEsConfigMap ++ mutable.Map(
-      "spark.hadoop.odps.project.name" -> project,
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-    )
-    val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-    CompanyDynamicUtil(spark, "winhc_eci_dev", "table_name", "ds").calc()
-    spark.stop()
+    if (args.length == 5) {
+      val Array(project, tableName, rowkey, inc_ds, pf) = args
+      val config = EsConfig.getEsConfigMap ++ mutable.Map(
+        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      )
+      val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
+
+      CompanyDynamicUtil(spark, project, tableName, inc_ds).calc
+      spark.stop()
+    } else {
+      val ds = args(0)
+      val project = "winhc_eci_dev"
+      val config = EsConfig.getEsConfigMap ++ mutable.Map(
+        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      )
+      val spark = SparkUtils.InitEnv("CompanyDynamic", config)
+      val rows =
+        """winhc_eci_dev company_tm rowkey 20200717 status_new
+          |winhc_eci_dev company_patent_list rowkey 20200717 lprs
+          |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+          |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
+          |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+          |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
+          |winhc_eci_dev company_bid_list rowkey 20200717 title
+          |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+          |winhc_eci_dev company_employment rowkey 20200717 source
+          |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
+          |""".stripMargin.replace("20200717", ds)
+      for (r <- rows.split("\r\n")) {
+        if (StringUtils.isNotEmpty(r)) {
+          val Array(tmp, tableName, inc_ds) = r.split(" ")
+          CompanyDynamicUtil(spark, project, tableName, inc_ds).calc()
+        }
+      }
+      spark.stop()
+    }
   }
 }

+ 1 - 48
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -88,7 +88,7 @@ trait CompanyDynamicHandle {
    * @param rowkey
    * @return
    */
-  def get_biz_id(rowkey: String): String
+  def get_biz_id(rowkey: String): String = rowkey.split("_")(1)
 
   /**
    * 子信息类型,小类
@@ -107,50 +107,3 @@ trait CompanyDynamicHandle {
   def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String
 
 }
-
-case class CompanyDynamicHandleTest() extends CompanyDynamicHandle {
-  /**
-   * 信息描述
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-
-  /**
-   * 变更时间
-   *
-   * @param new_map
-   * @return
-   */
-  override def get_change_time(new_map: Map[String, String]): String = ???
-
-  /**
-   * 业务id
-   *
-   * @param rowkey
-   * @return
-   */
-  override def get_biz_id(rowkey: String): String = ???
-
-
-  /**
-   * 风险等级
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-}

+ 46 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author yyn
+ * @Date 2020/7/27
+ * @Description TODO
+ */
+//环保处罚
+case class company_env_punishment(equCols:Seq[String])extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("title")
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("label")
+
+  /**
+   * 变更时间
+   *
+   * @param new_map
+   * @return
+   */
+  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+}