소스 검색

添加企业动态启动参数

许家凯 4 년 전
부모
커밋
d36d594d22
1개의 변경된 파일에 10개의 추가작업 그리고 46개의 삭제작업
  1. 10 46
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

+ 10 - 46
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -6,7 +6,6 @@ import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.time.DateFormatUtils
-import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SparkSession}
@@ -23,7 +22,7 @@ object CompanyDynamic {
 
   case class CompanyDynamicUtil(s: SparkSession,
                                 project: String, //表所在工程名
-                                tableName: String, //表名(不加前后辍)
+
                                 ds: String //此维度主键
                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
@@ -54,8 +53,8 @@ object CompanyDynamic {
            |""".stripMargin)
     }
 
-
-    def calc(): Unit = {
+    //表名(不加前后辍)
+    def calc(tableName: String): Unit = {
       val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.$tableName")
 
       val types = handle.org_type()
@@ -118,54 +117,19 @@ object CompanyDynamic {
 
 
   def main(args: Array[String]): Unit = {
-    val Array(project, tableName) = args
+    val Array(project, tableName, ds) = args
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-    val cd = CompanyDynamicUtil(spark, "winhc_eci_dev", "table_name", "ds")
+    val cd = CompanyDynamicUtil(spark, project, ds)
+
     cd.init()
-    cd.calc()
-    spark.stop()
-    if (args.length == 5) {
-      val Array(project, tableName, rowkey, inc_ds, pf) = args
-      val config = EsConfig.getEsConfigMap ++ mutable.Map(
-        "spark.hadoop.odps.project.name" -> project,
-        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-      )
-      val spark = SparkUtils.InitEnv("ChangeExtract", config)
-
-
-      CompanyDynamicUtil(spark, project, tableName, inc_ds).calc
-      spark.stop()
-    } else {
-      val ds = args(0)
-      val project = "winhc_eci_dev"
-      val config = EsConfig.getEsConfigMap ++ mutable.Map(
-        "spark.hadoop.odps.project.name" -> project,
-        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-      )
-      val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-      val rows =
-        """winhc_eci_dev company_tm rowkey 20200717 status_new
-          |winhc_eci_dev company_patent_list rowkey 20200717 lprs
-          |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
-          |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
-          |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
-          |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
-          |winhc_eci_dev company_bid_list rowkey 20200717 title
-          |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
-          |winhc_eci_dev company_employment rowkey 20200717 source
-          |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
-          |""".stripMargin.replace("20200717", ds)
-      for (r <- rows.split("\r\n")) {
-        if (StringUtils.isNotEmpty(r)) {
-          val Array(tmp, tableName, inc_ds) = r.split(" ")
-          CompanyDynamicUtil(spark, project, tableName, inc_ds).calc()
-        }
-      }
-      spark.stop()
+
+    for (e <- tableName.split(",")) {
+      cd.calc(e)
     }
+    spark.stop()
   }
 }