晏永年 4 年之前
父節點
當前提交
b2d2ead733

+ 8 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -11,6 +11,7 @@ import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
 import scala.collection.mutable
 
 /**
@@ -29,7 +30,7 @@ object CompanyDynamic {
 
     private val env = "dev"
     var cleanFlag = false
-    val targetTab = "xjk_tmp_company_dynamic"
+    val targetTab = "ads_radar_rta_result"
 
     def init(): Unit = {
       sql(
@@ -71,13 +72,13 @@ object CompanyDynamic {
         val cid = r.getAs[String]("cid")
         val new_data = r.getAs[Map[String, String]]("data")
         val old_data = r.getAs[Map[String, String]]("old_data")
-        val biz_time = r.getAs[String]("biz_time")
+        val biz_date = r.getAs[String]("biz_date")
         val fields = r.getAs[String]("fields")
-        val res = handle.handle(rowkey, biz_time, cid, fields.split(","), old_data, new_data)
+        val res = handle.handle(rowkey, biz_date, cid, if (fields!=null) fields.split(",") else null, old_data, new_data)
         Row(cid, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))
       })
 
-      val schema = getSchema(Map(
+      val schema = getSchema(ListMap(
         "cid" -> StringType
         , "info_type" -> StringType
         , "rta_desc" -> StringType
@@ -102,13 +103,13 @@ object CompanyDynamic {
 
       val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds"))
 
-      sql(
+      println(sql(
         s"""
            |INSERT INTO TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds')
            |SELECT ${cols.mkString(",")}
            |FROM
            |    company_dynamic_tmp
-           |""".stripMargin)
+           |""".stripMargin))
 
     }
 
@@ -133,7 +134,7 @@ object CompanyDynamic {
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
     val cd = CompanyDynamicUtil(spark, project, ds)
 
-    cd.init()
+//    cd.init()
 
     for (e <- tableName.split(",")) {
       cd.calc(e)

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -27,7 +27,7 @@ trait CompanyDynamicHandle {
     , "" -> "" //对外投资
     , "" -> "eci_administrativepenalty" //行政处罚
     , "" -> "eci_chattel" //动产抵押
-    , "" -> "env_punishment" //环保处罚
+    , "company_env_punishment" -> "env_punishment" //环保处罚
     , "" -> "judicial_assistance" //股权冻结
     , "" -> "publish_notice" //公示催告
     , "" -> "serious_violation" //严重违法
@@ -60,7 +60,7 @@ trait CompanyDynamicHandle {
     , "" -> "13" // 招聘信息
     , "" -> "14" // 行政处罚
     , "" -> "15" // 公示催告
-    , "" -> "16" // 环保处罚
+    , "company_env_punishment" -> "16" // 环保处罚
     , "" -> "17" // 股权出质
     , "" -> "18" // 严重违法
     , "" -> "19" // 简易注销

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -8,7 +8,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
  * @Description TODO
  */
 //环保处罚
-case class company_env_punishment(equCols:Seq[String])extends CompanyDynamicHandle {
+case class company_env_punishment()extends CompanyDynamicHandle {
   /**
    * 信息描述
    *
@@ -16,7 +16,7 @@ case class company_env_punishment(equCols:Seq[String])extends CompanyDynamicHand
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("title")
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("punish_number")
 
   /**
    * 变更内容
@@ -25,7 +25,7 @@ case class company_env_punishment(equCols:Seq[String])extends CompanyDynamicHand
    * @param new_map
    * @return
    */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("label")
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("content")
 
   /**
    * 变更时间