Переглянути джерело

fix: 并发提job异常退出

许家凯 4 роки тому
батько
коміт
5a92bd8d70

+ 8 - 1
src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

@@ -1,12 +1,13 @@
 package com.winhc.bigdata.spark.utils
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-
 import org.apache.commons.lang3.time.StopWatch
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
+import java.util.concurrent.atomic.AtomicInteger
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/8/21 11:36
@@ -44,6 +45,7 @@ case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
 
   def start(): Unit = {
     val latch = new CountDownLatch(seq.length)
+    val error_count = new AtomicInteger(0)
     seq.foreach(e => {
       asyncWatch(e._1, () => {
         try {
@@ -52,6 +54,7 @@ case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
           println(s"---${e._1}---------$res-------------")
         } catch {
           case ex: Exception => {
+            error_count.incrementAndGet()
             ex.printStackTrace()
           }
         } finally {
@@ -60,6 +63,10 @@ case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
       })
     })
     latch.await()
+    if (error_count.get() != 0) {
+      println("There are failed jobs !!!")
+      sys.exit(-998)
+    }
   }
 
   private def asyncWatch[T](name: String, f: () => Unit): Unit = {