Pārlūkot izejas kodu

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

晏永年 4 gadi atpakaļ
vecāks
revīzija
db6d1f5dce

+ 25 - 34
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -1,9 +1,6 @@
 package com.winhc.bigdata.spark.jobs.chance
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-
 import com.winhc.bigdata.spark.config.EsConfig
-import com.winhc.bigdata.spark.test.TestChangeExtract.seq
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils._
 import org.apache.spark.internal.Logging
@@ -357,35 +354,13 @@ object ChangeExtract {
   )
 
 
-  case class ChangeExtract(s: SparkSession,
-                           ds: String //表名(不加前后辍)
-                          ) extends Watching {
-
-    override protected val spark: SparkSession = s
-
-    def calc(): Unit = {
-      val latch = new CountDownLatch(seq.length)
-      startArgs
-        //        .filter(s => seq.contains(s.tableName))
-        .foreach(e => {
-          asyncWatch(e.tableName, () => {
-            println("______________________________" + e.tableName + "___________________________________")
-            ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
-            latch.countDown()
-          })
-        })
-      latch.await(60, TimeUnit.MINUTES)
-    }
-
-  }
-
-
   private case class Args(project: String = "winhc_eci_dev"
                           , tableName: String
                           , primaryKey: String = "rowkey"
                           , primaryFields: String
                           , isCopy: Boolean = true)
 
+
   def main(args: Array[String]): Unit = {
     val Array(tableName, inc_ds) = args
 
@@ -395,16 +370,32 @@ object ChangeExtract {
     )
     val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
-    if (tableName.equals("all")) {
-      startArgs.foreach(e => {
-        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
-      })
-    } else {
+
+    var start = startArgs
+    if (!tableName.equals("all")) {
       val set = tableName.split(",").toSet
-      startArgs.filter(a => set.contains(a.tableName)).foreach(e => {
-        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
-      })
+      start = start.filter(a => set.contains(a.tableName))
     }
+
+    val a = start.map(e => (e.tableName, () => {
+      ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
+      true
+    }))
+
+    AsyncExtract.startAndWait(spark, a)
+
+    /* if (tableName.equals("all")) {
+       startArgs.foreach(e => {
+         ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
+       })
+     } else {
+       val set = tableName.split(",").toSet
+       startArgs.filter(a => set.contains(a.tableName)).foreach(e => {
+         ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
+       })
+     }*/
+
     spark.stop()
   }
+
 }

+ 125 - 0
src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

@@ -0,0 +1,125 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.apache.commons.lang3.time.StopWatch
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Entry points for running a batch of named tasks concurrently and then blocking
+ * until the Spark jobs visible through the status tracker have drained.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/8/21 11:36
+ */
+object AsyncExtract {
+
+  /** Pause between two polls of the Spark status tracker, in milliseconds. */
+  private val PollIntervalMs = 10000L
+
+  /** Maximum number of polls spent waiting for the first active job to appear. */
+  private val MaxStartupPolls = 100
+
+  /** Status passed to `sys.exit` when at least one inspected job has FAILED. */
+  private val FailedExitCode = -999
+
+  /**
+   * Starts every task in `seq` on its own thread, then blocks in [[wait]].
+   *
+   * @param spark session whose status tracker is polled
+   * @param seq   pairs of (task name, task body)
+   */
+  def startAndWait(spark: SparkSession, seq: Seq[(String, () => Boolean)]): Unit = {
+    start(seq)
+    wait(spark)
+  }
+
+  /**
+   * Launches every task in `seq` on its own thread and returns without waiting for them.
+   *
+   * @param seq pairs of (task name, task body)
+   */
+  def start(seq: Seq[(String, () => Boolean)]): Unit = {
+    AsyncExtract(seq).start()
+  }
+
+  /**
+   * Blocks until the status tracker reports no active jobs, then terminates the JVM
+   * if any inspected job failed.
+   *
+   * Phase 1 polls until at least one active job shows up, giving up after
+   * `MaxStartupPolls` polls. Phase 2 polls until the active job list is empty.
+   * Finally the jobs returned by `getJobIdsForGroup(null)` are inspected and the
+   * process exits with `FailedExitCode` if one of them has status FAILED.
+   *
+   * NOTE(review): only active job ids are observed here, not the worker threads, so a
+   * task that is between two Spark jobs looks idle to this loop -- confirm callers
+   * tolerate this method returning while such a task is still running.
+   *
+   * @param spark session whose status tracker is polled
+   */
+  def wait(spark: SparkSession): Unit = {
+    val tracker = spark.sparkContext.statusTracker
+
+    // Phase 1: give the freshly started threads time to submit their first job.
+    var polls = 0
+    while (tracker.getActiveJobIds().isEmpty && polls < MaxStartupPolls) {
+      polls += 1
+      println("await job 。。。")
+      Thread.sleep(PollIntervalMs)
+    }
+    println(tracker.getActiveJobIds().mkString(","))
+
+    // Phase 2: wait for the active job list to drain.
+    while (tracker.getActiveJobIds().nonEmpty) {
+      println(tracker.getActiveJobIds().mkString(","))
+      println("spark is not stop ! ")
+      Thread.sleep(PollIntervalMs)
+    }
+
+    // presumably `null` selects jobs that were submitted without a job group -- verify against the Spark docs
+    val jobIds = tracker.getJobIdsForGroup(null)
+    println(jobIds.mkString(","))
+
+    // Job info may be unavailable for an id (None); such ids are ignored.
+    val failed = jobIds.exists { id =>
+      tracker.getJobInfo(id).exists(info => JobExecutionStatus.FAILED == info.status())
+    }
+    if (failed) {
+      sys.exit(FailedExitCode)
+    }
+  }
+}
+
+/**
+ * Runs each (name, task) pair on its own thread, printing and logging a begin/end
+ * banner with the elapsed time around every task.
+ *
+ * @param seq pairs of (task name, task body); the name is also used as the thread name
+ */
+case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
+
+  /**
+   * Starts one thread per task and returns immediately; it does not wait for completion.
+   */
+  def start(): Unit = {
+    val latch = new CountDownLatch(seq.length)
+    seq.foreach(e => {
+      asyncWatch(e._1, () => {
+        println("______________________________" + e._1 + "___________________________________")
+        try {
+          val res = e._2()
+          println(s"---${e._1}---------$res-------------")
+        } finally {
+          // Count down even when the task throws, so the latch stays accurate
+          // if it is ever awaited again.
+          latch.countDown()
+        }
+      })
+    })
+    //    latch.await()
+  }
+
+  /**
+   * Runs `f` under [[watch]] on a new thread named `name`.
+   *
+   * @param name thread name and banner label
+   * @param f    work to execute
+   */
+  private def asyncWatch(name: String, f: () => Unit): Unit = {
+    val worker = new Thread(new Runnable {
+      override def run(): Unit = watch(name, f)
+    }, name)
+    worker.start()
+  }
+
+  /**
+   * Executes `fun`, emitting a begin banner before it and an end banner with the
+   * elapsed time after it. The end banner is emitted even when `fun` throws; the
+   * exception is then propagated unchanged.
+   *
+   * @param name label used in the banners
+   * @param fun  work to execute
+   * @return whatever `fun` returns
+   */
+  private def watch[T](name: String, fun: () => T): T = {
+    val stopWatch = new StopWatch
+    stopWatch.start()
+    println(
+      s"""
+         |
+         |----------------------- submit [action=$name] begin... -----------------------
+         |""".stripMargin)
+    logInfo(
+      s"""
+         |
+         |----------------------- submit [action=$name] begin... -----------------------
+       """.stripMargin
+    )
+
+    try {
+      fun()
+    } finally {
+      stopWatch.stop()
+      val totalSeconds = stopWatch.getTime(TimeUnit.SECONDS)
+
+      println(
+        s"""
+           |-------------- submit [action=$name] end, cost ${toTimeStr(totalSeconds)} -------------
+           |
+           |""".stripMargin)
+      logInfo(
+        s"""
+           |-------------- submit [action=$name] end, cost ${toTimeStr(totalSeconds)} -------------
+           |
+       """.stripMargin
+      )
+    }
+  }
+
+  /**
+   * Formats a duration in seconds as `XhYmZs`, omitting leading units that are zero.
+   *
+   * @param totalSeconds duration in whole seconds
+   * @return e.g. `1h1m1s`, `1m15s` or `42s`
+   */
+  private def toTimeStr(totalSeconds: Long): String = {
+    val hours = TimeUnit.SECONDS.toHours(totalSeconds)
+    val minutes = TimeUnit.SECONDS.toMinutes(totalSeconds - hours * 3600)
+    val seconds = totalSeconds - hours * 3600 - minutes * 60
+    if (hours > 0) {
+      s"${hours}h${minutes}m${seconds}s"
+    } else if (minutes > 0) {
+      s"${minutes}m${seconds}s"
+    } else {
+      s"${seconds}s"
+    }
+  }
+}