
fix: introduce an aggregation parameter for company dynamics

- aggregate company dynamics
- spark: distinguish temporary tables
许家凯 4 years ago
parent
commit
32bd810223

+ 8 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -184,7 +184,7 @@ object CompanyDynamic {
     , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 1)
     , Args(tableName = "company_land_publicity", bName = 1)
-//    , Args(tableName = "company_employment", bName = 1)
+    , Args(tableName = "company_employment", bName = 1, isAgg = true)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_bid_list", bName = 2)
     , Args(tableName = "company_land_transfer", bName = 1)
@@ -222,7 +222,8 @@ object CompanyDynamic {
 
   private case class Args(project: String = "winhc_eci_dev"
                           , tableName: String
-                          , bName: Int = 1)
+                          , bName: Int = 1
+                          , isAgg: Boolean = false)
 
   def main(args: Array[String]): Unit = {
 
@@ -260,7 +261,11 @@ object CompanyDynamic {
     }
 
     val a = start.map(e => (e.tableName, () => {
-      cd.calc(e.tableName, e.bName)
+      if (e.isAgg) {
+        CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName)
+      } else {
+        cd.calc(e.tableName, e.bName)
+      }
       true
     }))
 
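For reference, a minimal, self-contained sketch of the dispatch pattern the first file introduces. Every name below (DispatchSketch, Calculator, PlainCalc, AggCalc) is a hypothetical stand-in for cd.calc and CompanyDynamicForDayCount(spark, project, ds).calc; the point is that defaulting isAgg to false keeps all existing Args call sites compiling unchanged while routing only flagged tables to the aggregating job.

object DispatchSketch {
  // Mirrors the patched Args: the default means old call sites need no edit.
  case class Args(project: String = "winhc_eci_dev",
                  tableName: String,
                  bName: Int = 1,
                  isAgg: Boolean = false)

  // Hypothetical stand-ins for the two calc implementations in the patch.
  trait Calculator { def calc(tableName: String, bName: Int): Unit }
  object PlainCalc extends Calculator {
    def calc(tableName: String, bName: Int): Unit = println(s"plain: $tableName / $bName")
  }
  object AggCalc extends Calculator {
    def calc(tableName: String, bName: Int): Unit = println(s"agg:   $tableName / $bName")
  }

  def main(args: Array[String]): Unit = {
    val start = Seq(
      Args(tableName = "company_land_publicity"),          // isAgg defaults to false
      Args(tableName = "company_employment", isAgg = true) // routed to the day-count job
    )
    // Same shape as the patched main(): a (tableName, thunk) pair per table.
    val jobs = start.map(e => (e.tableName, () => {
      if (e.isAgg) AggCalc.calc(e.tableName, e.bName)
      else PlainCalc.calc(e.tableName, e.bName)
      true
    }))
    jobs.foreach { case (_, run) => run() }
  }
}

This is why the long Args list in the hunk above needs no change beyond the single table that gains isAgg = true.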

+ 5 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicForDayCount.scala

@@ -1,6 +1,5 @@
 package com.winhc.bigdata.spark.jobs.dynamic
 
-import java.util
 import java.util.Date
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamic.{env, targetTab}
@@ -14,7 +13,6 @@ import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.immutable.ListMap
-import scala.collection.mutable
 
 case class CompanyDynamicForDayCount(s: SparkSession,
                                     project: String, //name of the project that owns the table
@@ -90,12 +88,12 @@ case class CompanyDynamicForDayCount(s: SparkSession,
       })
       .rdd.flatMap(r => {
       val cid = r.getAs[String]("cid")
-      val biz_date = ds.substring(0,4)+"-"+ds.substring(4,6)+"-"+ds.substring(6) +" 00:00:00"
+      val biz_date = ds.substring(0, 4) + "-" + ds.substring(4, 6) + "-" + ds.substring(6) + " 00:00:00"
       val cnt = r.getAs[Long]("cnt")
       val cname = r.getAs[String]("cname")
-      val new_map = Map("cnt" -> (cnt+""))
+      val new_map = Map("cnt" -> (cnt + ""))
 
-      val result = handle.handle(cid+biz_date, biz_date, cid, null, null, new_map, cname)
+      val result = handle.handle(cid + biz_date, biz_date, cid, null, null, new_map, cname)
       if (result == null) {
         None
       }
@@ -119,7 +117,7 @@ case class CompanyDynamicForDayCount(s: SparkSession,
       , "create_time" -> StringType
     ))
     spark.createDataFrame(rdd, schema)
-      .createOrReplaceTempView("company_dynamic_tmp")
+      .createOrReplaceTempView("company_dynamic_tmp" + tableName)
 
     val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
 
@@ -128,7 +126,7 @@ case class CompanyDynamicForDayCount(s: SparkSession,
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds',tn='$tableName')
          |SELECT ${cols.mkString(",")}
          |FROM
-         |    company_dynamic_tmp
+         |    company_dynamic_tmp$tableName
          |""".stripMargin)
   }
 }
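
The suffix appended to company_dynamic_tmp in the second file matters because createOrReplaceTempView silently overwrites any existing view of the same name, so per-table runs sharing one SparkSession would otherwise read each other's rows. A sketch of the failure mode and the fix, under the assumption of a local session with toy data (only createOrReplaceTempView and spark.sql are real Spark APIs here; the object and column values are illustrative):

import org.apache.spark.sql.SparkSession

object TempViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("tmp-views").getOrCreate()
    import spark.implicits._

    val employment = Seq(("cid-1", 3L)).toDF("cid", "cnt")
    val bids       = Seq(("cid-2", 5L)).toDF("cid", "cnt")

    // Before the fix: both jobs register the same view name,
    // so the second call silently replaces the first view.
    employment.createOrReplaceTempView("company_dynamic_tmp")
    bids.createOrReplaceTempView("company_dynamic_tmp")
    spark.sql("SELECT * FROM company_dynamic_tmp").show() // shows bids, not employment

    // After the fix: one view per source table, as in the patch
    // ("company_dynamic_tmp" + tableName), so each INSERT reads its own data.
    employment.createOrReplaceTempView("company_dynamic_tmp" + "company_employment")
    bids.createOrReplaceTempView("company_dynamic_tmp" + "company_bid_list")
    spark.sql("SELECT * FROM company_dynamic_tmpcompany_employment").show()

    spark.stop()
  }
}

Note the concatenation yields names like company_dynamic_tmpcompany_employment; a separator such as an underscore would read better, but the patch's plain concatenation is already a valid Spark SQL identifier and is unique per table, which is all the INSERT ... SELECT needs.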