
Update the job handling approach

xufei 3 years ago
parent
commit
2e914d969d

+ 24 - 16
src/main/scala/com/winhc/bigdata/spark/ng/relation/lookup_tab_pid.scala

@@ -4,7 +4,7 @@ import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
 import com.winhc.bigdata.spark.utils.BaseUtil.{isWindows, is_json_str}
-import com.winhc.bigdata.spark.utils.{BaseUtil, CompanyRelationUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{AsyncExtract, BaseUtil, CompanyRelationUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{col, struct, to_json}
@@ -65,10 +65,10 @@ object args_job {
       , explode = "LATERAL VIEW explode( split(concat_ws('\u0001', cg_assignee_id, cg_executed_person_id, gn_executed_person_id, lf_executed_person_id, fz_executed_person_id, executed_person_id),'\u0001') ) key AS key_no"
       , keyno = "key_no")
 
-//    , args_job(tableName = "company_lawsuit"
-//      , explode = "LATERAL VIEW explode(jsonall_2_array('$.litigant_id', concat_ws('\u0001', defendant_info, plaintiff_info, litigant_info)) ) key AS key_no"
-//      , keyno = "key_no"
-//    )
+    //    , args_job(tableName = "company_lawsuit"
+    //      , explode = "LATERAL VIEW explode(jsonall_2_array('$.litigant_id', concat_ws('\u0001', defendant_info, plaintiff_info, litigant_info)) ) key AS key_no"
+    //      , keyno = "key_no"
+    //    )
 
     , args_job(tableName = "company"
       , rowkey = "company_id"
@@ -198,7 +198,7 @@ case class lookup_tab_pid(s: SparkSession
          |FROM    $tab_back_deleted
          |WHERE   ds > $ds
          |GROUP BY person_id
-         |""".stripMargin).createOrReplaceTempView("mapping")
+         |""".stripMargin).createOrReplaceTempView(s"mapping$tn")
 
     sql(
       s"""
@@ -217,18 +217,17 @@ case class lookup_tab_pid(s: SparkSession
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
-         |""".stripMargin).createOrReplaceTempView("tab_tmp")
+         |""".stripMargin).createOrReplaceTempView(s"tab_tmp$tn")
 
     sql(
       s"""
-         |INSERT OVERWRITE TABLE $tar_tab PARTITION(ds='$lastDs',tn='$tn')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $tar_tab PARTITION(ds='$lastDs',tn='$tn')
          |SELECT  ${tar_cols.mkString(",")},
-         |        -- get_table_message(${tar_cols.mkString(",")}, '$tn') message
          |        ${to_json(tar_cols ++ Seq("tn"))}
-         |FROM    mapping a
+         |FROM    mapping$tn a
          |JOIN    (
          |            SELECT  *,'$tn' as tn
-         |            FROM  tab_tmp
+         |            FROM  tab_tmp$tn
          |            ${args_job.explode}
          |        ) b
          |ON      a.person_id = b.$keyno
@@ -241,11 +240,11 @@ case class lookup_tab_pid(s: SparkSession
 
   }
 
-  private def to_json(seq:Seq[String]): String = {
+  private def to_json(seq: Seq[String]): String = {
     val r1 = seq.map(x => {
       s"'$x',$x"
     }).mkString(",")
-   s"to_json(map($r1)) message"
+    s"to_json(map($r1)) message"
   }
 
   private def get_partition_order_by(): String = {
@@ -265,15 +264,24 @@ object lookup_tab_pid {
       println("please set project tn.")
       sys.exit(-1)
     }
-    val Array(project, tn) = args
+    val Array(project, tableName) = args
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.debug.maxToStringFields" -> "200",
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    val re = lookup_tab_pid(s = spark, project = project, args_job.get_args_company_job(tn))
-    re.calc()
+    var start = args_job.tab_args
+    if (!tableName.equals("all")) {
+      val set = tableName.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+    val a = start.map(e => (e.tableName, () => {
+      val re = lookup_tab_pid(s = spark, project = project, args_job.get_args_company_job(e.tableName))
+      re.calc()
+      true
+    }))
+    AsyncExtract.startAndWait(spark, a)
     spark.stop()
   }
 }
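
The new main above selects jobs by the comma-separated table list passed as the second argument ("all" keeps every table), wraps each selected job as a named (String, () => Boolean) task, and hands the batch to AsyncExtract so the per-table runs share one SparkSession; the mapping$tn / tab_tmp$tn temp-view suffixes keep those concurrent runs from overwriting each other's views. A minimal, self-contained sketch of the selection-and-wrapping pattern (JobDef and selectJobs are illustrative stand-ins, not the project's API):

object JobSelectionSketch {
  // Hypothetical stand-in for args_job: a table name plus the job body to run.
  case class JobDef(tableName: String, run: () => Boolean)

  // "all" keeps every job; otherwise keep only the tables named in the
  // comma-separated argument, mirroring the filter in the new main.
  def selectJobs(all: Seq[JobDef], tableNameArg: String): Seq[(String, () => Boolean)] = {
    val picked =
      if (tableNameArg == "all") all
      else {
        val wanted = tableNameArg.split(",").toSet
        all.filter(j => wanted.contains(j.tableName))
      }
    picked.map(j => (j.tableName, j.run))
  }

  def main(args: Array[String]): Unit = {
    val jobs = Seq(
      JobDef("company", () => { println("calc company"); true }),
      JobDef("company_zxr", () => { println("calc company_zxr"); true })
    )
    // Run the selected tasks; the real code passes these pairs to AsyncExtract instead.
    selectJobs(jobs, "company").foreach { case (name, task) =>
      println(s"$name finished: ${task()}")
    }
  }
}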

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

@@ -35,7 +35,7 @@ object AsyncExtract {
       .filter(_ != null)
       .map(i => i.status()).exists(i => JobExecutionStatus.FAILED.equals(i))
     if (failed) {
-      println("There are failed jobs !!!")
+      println("There are failed jobs -999 !!!")
       sys.exit(-999)
     }
   }
@@ -54,6 +54,7 @@ case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
           println(s"---${e._1}---------$res-------------")
         } catch {
           case ex: Exception => {
+            println(s"---error-job: ${e._1}---------${ex.getMessage}-------------")
             error_count.incrementAndGet()
             ex.printStackTrace()
           }
@@ -64,7 +65,8 @@ case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
     })
     latch.await()
     if (error_count.get() != 0) {
-      println("There are failed jobs !!!")
+      println("error_count: "+error_count.get())
+      println("There are failed jobs -998 !!!")
       sys.exit(-998)
     }
   }
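
For context, a minimal sketch of the fan-out-and-wait pattern AsyncExtract appears to implement (simplified here to plain threads, without the Spark job-status check), including the per-job error log and the error_count summary touched by this diff. Names other than the printed messages are illustrative:

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

object AsyncRunSketch {
  def startAndWait(tasks: Seq[(String, () => Boolean)]): Unit = {
    val latch = new CountDownLatch(tasks.size)
    val error_count = new AtomicInteger(0)
    tasks.foreach { case (name, task) =>
      new Thread(() => {
        try {
          val res = task()
          println(s"---$name---------$res-------------")
        } catch {
          case ex: Exception =>
            // Log which job failed, then count the error.
            println(s"---error-job: $name---------${ex.getMessage}-------------")
            error_count.incrementAndGet()
            ex.printStackTrace()
        } finally {
          latch.countDown()
        }
      }).start()
    }
    latch.await()
    if (error_count.get() != 0) {
      println("error_count: " + error_count.get())
      println("There are failed jobs -998 !!!")
      sys.exit(-998)
    }
  }

  def main(args: Array[String]): Unit = {
    startAndWait(Seq(
      ("job_a", () => true),
      ("job_b", () => true)
    ))
  }
}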