Browse Source

fix: 并发提交job优化

许家凯 4 years ago
parent
commit
206ffe523a
1 changed files with 13 additions and 17 deletions
  1. 13 17
      src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

+ 13 - 17
src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

@@ -25,18 +25,6 @@ object AsyncExtract {
 
   def wait(spark: SparkSession, jobSize: Int = 0): Unit = {
     val tracker = spark.sparkContext.statusTracker
-    var i = 0
-    while (!tracker.getActiveJobIds().nonEmpty && i < 100) {
-      i = i + 1
-      println("await job...")
-      Thread.sleep(10000)
-    }
-    println(tracker.getActiveJobIds().mkString(","))
-    while (tracker.getActiveJobIds().nonEmpty || tracker.getJobIdsForGroup(null).length < jobSize) {
-      println(tracker.getActiveJobIds().mkString(","))
-      println("spark is not stop ! ")
-      Thread.sleep(10000)
-    }
 
     val ints = tracker.getJobIdsForGroup(null)
     println(ints.mkString(","))
@@ -46,6 +34,7 @@ object AsyncExtract {
       .filter(_ != null)
       .map(i => i.status()).exists(i => JobExecutionStatus.FAILED.equals(i))
     if (failed) {
+      println("There are failed jobs !!!")
       sys.exit(-999)
     }
   }
@@ -57,13 +46,20 @@ case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
     val latch = new CountDownLatch(seq.length)
     seq.foreach(e => {
       asyncWatch(e._1, () => {
-        println("______________________________" + e._1 + "___________________________________")
-        val res = e._2()
-        println(s"---${e._1}---------$res-------------")
-        latch.countDown()
+        try {
+          println("______________________________" + e._1 + "___________________________________")
+          val res = e._2()
+          println(s"---${e._1}---------$res-------------")
+        } catch {
+          case ex: Exception => {
+            ex.printStackTrace()
+          }
+        } finally {
+          latch.countDown()
+        }
       })
     })
-    //    latch.await()
+    latch.await()
   }
 
   private def asyncWatch[T](name: String, f: () => Unit): Unit = {