许家凯 4 года назад
Родитель
Commit
1a0b844d32

+ 11 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -27,7 +27,7 @@ object CompanyDynamic {
                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
 
-    private val env = "env"
+    private val env = "dev"
     var cleanFlag = false
     val targetTab = "xjk_tmp_company_dynamic"
 
@@ -55,7 +55,7 @@ object CompanyDynamic {
 
     //表名(不加前后辍)
     def calc(tableName: String): Unit = {
-      val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.$tableName")
+      val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
 
       val types = handle.org_type()
       val rdd = sql(
@@ -95,7 +95,7 @@ object CompanyDynamic {
       if (!cleanFlag) {
         sql(
           s"""
-             |alter table ${getEnvProjectName(env, project)}.$targetTab drop if exists partition(ds='$ds')
+             |alter table ${getEnvProjectName(env, project)}.$targetTab drop if exists partition(ds='$ds')
              |""".stripMargin)
         cleanFlag = true
       }
@@ -118,6 +118,14 @@ object CompanyDynamic {
 
   def main(args: Array[String]): Unit = {
     val Array(project, tableName, ds) = args
+
+    println(
+      s"""
+         |project: $project
+         |tableNames: $tableName
+         |ds: $ds
+         |""".stripMargin)
+
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"

+ 0 - 30
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -182,33 +182,3 @@ trait CompanyDynamicHandle {
 
 }
 
-//经营异常
-case class company_abnormal_info() extends CompanyDynamicHandle {
-  /**
-   * 信息描述
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("put_reason", null)
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-
-
-  /**
-   * 风险等级
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = ???
-}

+ 34 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_abnormal_info.scala

@@ -0,0 +1,34 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+//经营异常
+case class company_abnormal_info() extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("put_reason", null)
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.toJson(Seq("put_department->做出决定机关","remove_department->移出决定机关","put_reason->列入经营异常目录原因","put_date->列入日期","remove_date->移出日期","remove_reason->移出经营异常目录原因"))
+
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -33,7 +33,7 @@ case class company_env_punishment(equCols:Seq[String])extends CompanyDynamicHand
    * @param new_map
    * @return
    */
-  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
 
   /**
    * 风险等级

+ 10 - 15
src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import org.apache.commons.lang3.StringUtils
 import com.winhc.bigdata.spark.implicits.MapHelper._
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/7 13:59
@@ -16,22 +17,15 @@ object ChangeExtractUtils {
 
   //获取指定字段集的标签Json
   def getTags(fldMap: Map[String, String], type_val: String, fields: Array[String]): String = {
-    val json: StringBuilder = new StringBuilder(s"""{"type":${getValueOrNull(type_val)},""")
-    fields.foreach(item =>
-      if (item.contains("->")) {
-        val Array(key, keyAlias) = item.split("->")
-        json.append(s"${getValueOrNull(keyAlias)}")
-        json.append(s":${getValueOrNull(fldMap(key))},")
-      } else {
-        json.append(s"${getValueOrNull(item)}")
-        json.append(s":${getValueOrNull(fldMap(item))},")
-      }
-    )
-    json.deleteCharAt(json.lastIndexOf(",")).append("}").toString.trim
+    val json = fldMap.toJson(fields).substring(1)
+    if (json.length == 1) {
+      s"""{"type":${getValueOrNull(type_val)}""" + json
+    } else {
+      s"""{"type":${getValueOrNull(type_val)},""" + json
+    }
   }
 
 
-
   private def getValueOrNull(value: String): String = {
     if (StringUtils.isNotBlank(value)) {
       "\"" + value + "\""
@@ -42,7 +36,8 @@ object ChangeExtractUtils {
 
 
   def main(args: Array[String]): Unit = {
-    val map = Map("a"->"b","b"->"c")
-    println(map.toJson(Seq("a->你","b")))
+    val map = Map("a" -> "b", "b" -> "c")
+    println(map.toJson(Seq()))
+    println(getTags(map, "a", Array[String]()))
   }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -150,7 +150,7 @@ trait LoggingUtils {
 
   def getEnvProjectName(env: String, projectName: String): String = {
     if (env.equals("dev")) {
-      env
+      projectName
     } else {
       projectName.substring(0, projectName.length - 4)
     }