Explorar el Código

fix: 企业变更及动态调整

- 企业变更mapping表sql分区调整
- 企业动态兼容多种类型表同时运行
许家凯 hace 4 años
padre
commit
4b1f265d00

+ 2 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -64,22 +64,16 @@ object ChangeExtract {
 
       val update_time = BaseUtil.nowDate()
 
-      val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
-      val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //增量ods表
-      //增量ads最后一个分区
-      val lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_$tableName", spark)
-
       sql(
         s"""
            |SELECT  cid,current_cid as new_cid
-           |FROM    ${inc_ods_company}
+           |FROM    ${project}.inc_ods_company
            |WHERE   ds > $lastDs_ads_all and ds < $ds
            |AND     cid IS NOT NULL
            |AND     current_cid IS NOT NULL
            |GROUP BY cid,current_cid
            |""".stripMargin).createOrReplaceTempView("mapping")
 
-
       val cid = getColumns(s"$project.ads_$tableName").filter(f => f.equals("cid") || f.equals("new_cid")).max
 
       val rdd = sql(
@@ -96,11 +90,7 @@ object ChangeExtract {
            |        ) AS t1
            |JOIN    (
            |             SELECT  concat_ws('_',coalesce(mm.new_cid,tmp.$cid),split(rowkey, '_')[1]) AS rowkey
-           |                     ,${
-          intersectCols.filter(s => {
-            !s.equals("rowkey") && !s.equals("cid") && !s.equals("new_cid")
-          }).mkString(",")
-        }
+           |                     ,${intersectCols.diff(Seq("rowkey", "cid", "new_cid")).mkString(",")}
            |                     ,coalesce(mm.new_cid,tmp.$cid) AS new_cid
            |                     ,tmp.$cid as cid
            |                     ,c

+ 24 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -24,8 +24,8 @@ object CompanyDynamic {
 
   case class CompanyDynamicUtil(s: SparkSession,
                                 project: String, //表所在工程名
-                                ds: String, //此维度主键
-                                bName: Boolean = false //是否补充cname字段
+                                ds: String //此维度主键
+
                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
 
@@ -56,7 +56,9 @@ object CompanyDynamic {
     }
 
     //表名(不加前后辍)
-    def calc(tableName: String): Unit = {
+    def calc(tableName: String
+             , bName: Boolean = false //是否补充cname字段
+            ): Unit = {
       val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
 
       val types = handle.org_type()
@@ -65,7 +67,7 @@ object CompanyDynamic {
           case false =>
             s"""
                |SELECT  *,null AS cname
-               |FROM    ${getEnvProjectName(env, project)}.ads_change_extract
+               |FROM    ${project}.ads_change_extract
                |WHERE   ds = '$ds'
                |AND     tn = '$tableName'
                |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
@@ -75,14 +77,14 @@ object CompanyDynamic {
                |SELECT A.*,B.cname AS cname
                |FROM(
                |  SELECT  *
-               |  FROM    ${getEnvProjectName(env, project)}.ads_change_extract
+               |  FROM    ${project}.ads_change_extract
                |  WHERE   ds = '$ds'
                |  AND     tn = '$tableName'
                |  AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
                |) AS A
                |LEFT JOIN (
-               |    SELECT cid,cname FROM ${getEnvProjectName(env, project)}.base_company_mapping
-               |    WHERE ds=(SELECT MAX(ds) FROM ${getEnvProjectName(env, project)}.base_company_mapping WHERE ds>'0')
+               |    SELECT cid,cname FROM  $project.base_company_mapping
+               |    WHERE ds = '${getLastPartitionsOrElse(project + "base_company_mapping", "0")}'
                |) AS B
                |ON A.cid = B.cid
                |""".stripMargin
@@ -134,7 +136,16 @@ object CompanyDynamic {
 
 
   def main(args: Array[String]): Unit = {
-    val Array(project, tableName, ds, bName) = if (args.length >= 4) args else args :+ "false"
+    if (args.length < 3 || args.length > 4) {
+      println(
+        s"""
+           |Please enter the legal parameters !
+           |<project> <ds> <tableNames> [cname_tableNames]
+           |""".stripMargin)
+      sys.exit(-99)
+    }
+
+    val Array(project, ds, tableName, bName_table) = if (args.length == 4) args else args :+ ""
 
     println(
       s"""
@@ -148,13 +159,17 @@ object CompanyDynamic {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-    val cd = CompanyDynamicUtil(spark, project, ds, bName.toBoolean)
+    val cd = CompanyDynamicUtil(spark, project, ds)
 
     cd.init()
 
     for (e <- tableName.split(",")) {
       cd.calc(e)
     }
+
+    for (e <- bName_table.split(",")) {
+      cd.calc(e, bName = true)
+    }
     spark.stop()
   }
 }