Переглянути джерело

feat: 变更和动态主函数入口参数调整

许家凯 4 роки тому
батько
коміт
1dfbe8e709

+ 63 - 26
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -3,7 +3,6 @@ package com.winhc.bigdata.spark.jobs.chance
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, ReflectUtils, SparkUtils}
-import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
@@ -239,17 +238,49 @@ object ChangeExtract {
 
 
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
+
+
+  private val startArgs = Seq(
+    Args(tableName = "company_tm", primaryFields = "status_new")
+    , Args(tableName = "company_patent_list", primaryFields = "lprs")
+    , Args(tableName = "company_copyright_works_list", primaryFields = "type")
+    , Args(tableName = "company_copyright_reg_list", primaryFields = "version")
+    , Args(tableName = "company_land_publicity", primaryFields = "title,location,use_for")
+    , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
+    , Args(tableName = "company_bid_list", primaryFields = "title")
+    , Args(tableName = "company_land_transfer", primaryFields = "num,location")
+    , Args(tableName = "company_employment", primaryFields = "source")
+    , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
+    , Args(tableName = "company_icp", primaryFields = "domain")
+
+    , Args(tableName = "company_certificate", primaryFields = "type")
+    , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")
+
+    , Args(tableName = "company_own_tax", primaryFields = "tax_balance,tax_category,tax_num")
+
+    , Args(tableName = "company_equity_info", primaryKey = "id", primaryFields = "reg_number", isCopy = false)
+    //    , Args(tableName = "company_staff", primaryFields = "staff_type")
+
+  )
+
+  private case class Args(project: String = "winhc_eci_dev"
+                          , tableName: String
+                          , primaryKey: String = "rowkey"
+                          , primaryFields: String
+                          , isCopy: Boolean = true)
+
   def main(args: Array[String]): Unit = {
-    if (args.length >= 5 && args.length <= 6) {
-      val Array(project, tableName, rowkey, inc_ds, pf, isCopy) = if (args.length == 6) args else args :+ "true"
+    if (args.length == 2) {
+      val Array(tableName, inc_ds) = args
+
+      val e = startArgs.filter(_.tableName.equals(tableName)).head
       val config = EsConfig.getEsConfigMap ++ mutable.Map(
-        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.project.name" -> e.project,
         "spark.hadoop.odps.spark.local.partition.amt" -> "10"
       )
       val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
-
-      ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
+      ChangeExtractHandle(spark, e.project, tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
       spark.stop()
     } else {
       val ds = args(0)
@@ -259,26 +290,32 @@ object ChangeExtract {
         "spark.hadoop.odps.spark.local.partition.amt" -> "10"
       )
       val spark = SparkUtils.InitEnv("ChangeExtract", config)
-      val rows =
-        """winhc_eci_dev company_tm rowkey 20200717 status_new
-          |winhc_eci_dev company_patent_list rowkey 20200717 lprs
-          |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
-          |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
-          |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
-          |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
-          |winhc_eci_dev company_bid_list rowkey 20200717 title
-          |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
-          |winhc_eci_dev company_employment rowkey 20200717 source
-          |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
-          |winhc_eci_dev company_icp rowkey 20200717 domain
-          |""".stripMargin.replace("20200717", ds)
-      for (r <- rows.split("\r\n")) {
-        if (StringUtils.isNotEmpty(r)) {
-          val as = r.split(" ")
-          val Array(tmp, tableName, rowkey, inc_ds, pf, isCopy) = if (as.length == 6) as else as :+ "true"
-          ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
-        }
-      }
+
+      startArgs.foreach(e => {
+        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
+      })
+
+
+      /* val rows =
+         """winhc_eci_dev company_tm rowkey 20200717 status_new
+           |winhc_eci_dev company_patent_list rowkey 20200717 lprs
+           |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+           |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
+           |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+           |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
+           |winhc_eci_dev company_bid_list rowkey 20200717 title
+           |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+           |winhc_eci_dev company_employment rowkey 20200717 source
+           |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
+           |winhc_eci_dev company_icp rowkey 20200717 domain
+           |""".stripMargin.replace("20200717", ds)
+       for (r <- rows.split("\r\n")) {
+         if (StringUtils.isNotEmpty(r)) {
+           val as = r.split(" ")
+           val Array(tmp, tableName, rowkey, inc_ds, pf, isCopy) = if (as.length == 6) as else as :+ "true"
+           ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
+         }
+       }*/
       spark.stop()
     }
   }

+ 23 - 18
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -67,7 +67,7 @@ object CompanyDynamic {
       }).seq
       val rdd = sql(
         bName match {
-//默认:无需补全cname字段
+          //默认:无需补全cname字段
           case 0 =>
             s"""
                |SELECT  *,null AS cname
@@ -76,7 +76,7 @@ object CompanyDynamic {
                |AND     tn = '$tableName'
                |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
                |""".stripMargin
-//需根据cid补全cname字段数据
+          //需根据cid补全cname字段数据
           case 1 =>
             s"""
                |SELECT A.*,B.cname AS cname
@@ -93,7 +93,7 @@ object CompanyDynamic {
                |) AS B
                |ON A.cid = B.cid
                |""".stripMargin
-//既需根据cid补全cname字段数据;还需在data字段中根据其中的cid添加对应cname数据
+          //既需根据cid补全cname字段数据;还需在data字段中根据其中的cid添加对应cname数据
           case 2 =>
             s"""
                |SELECT cid, ${colsExclusiveSome.mkString(",")},old_data, cname, STR_TO_MAP(regexp_replace(concat_ws(',',data),'[{"}]',''),',',':') AS data
@@ -175,23 +175,33 @@ object CompanyDynamic {
     }
   }
 
+  private val startArgs = Seq(
+    Args(tableName = "company_abnormal_info", bName = 0)
+    , Args(tableName = "company_equity_info")
+  )
+
+  private case class Args(project: String = "winhc_eci_dev"
+                          , tableName: String
+                          , bName: Int = 1)
 
   def main(args: Array[String]): Unit = {
-    if (args.length < 3 || args.length > 4) {
+
+
+    if (args.length != 3) {
       println(
         s"""
            |Please enter the legal parameters !
-           |<project> <ds> <tableNames> [cname_tableNames]
+           |<project> <ds> <tableNames>
            |""".stripMargin)
       sys.exit(-99)
     }
 
-    val Array(project, tableName, ds, bName_table) = if (args.length == 4) args else args :+ "0"
+    val Array(project, tableNames, ds) = args
 
     println(
       s"""
          |project: $project
-         |tableNames: $tableName
+         |tableNames: $tableNames
          |ds: $ds
          |""".stripMargin)
 
@@ -201,20 +211,15 @@ object CompanyDynamic {
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
     val cd = CompanyDynamicUtil(spark, project, ds)
-
     cd.init()
-/*
 
-    for (e <- tableName.split(",")) {
-      if (e.length > 2) {
-        cd.calc(e, bName_table.toInt)
-      }
-    }
+    val ts = tableNames.split(",").toSet
 
-*/
-    for (e <- tableName.split(",")) {
-      cd.calc(e, bName_table.toInt)
-    }
+    startArgs.filter(e => {
+      ts.contains(e.tableName)
+    }).foreach(e => {
+      cd.calc(e.tableName, e.bName)
+    })
     spark.stop()
   }
 }