Переглянути джерело

feat: 查老赖全异步处理

许家凯 4 роки тому
батько
коміт
305a098a8a

+ 21 - 10
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/deadbeat_info.scala

@@ -214,7 +214,7 @@ case class deadbeat_info(s: SparkSession,
     spark.udf.register("get_birth_year", get_birth_year _)
     spark.udf.register("agg_label", new person_agg_label)
     spark.udf.register("get_empty_map", get_empty_map _)
-    is_id_card()
+    is_id_card_udf()
     id_card_trim_udf()
 
     def toTime(str: String): String = DateUtils.toMillisTimestamp(str, pattern = "yyyy-MM-dd HH:mm:ss")
@@ -229,10 +229,16 @@ case class deadbeat_info(s: SparkSession,
     mapTables("company_dishonest_info") = ("rowkey", "name", "card_num", "pub_date", "status")
     mapTables("company_zxr_final_case") = ("rowkey", "name", "identity_num", "case_create_time", "deleted")
     mapTables("company_zxr_restrict") = ("rowkey", "name", "identity_num", "case_create_time", "status")
-    mapTables.foreach(m => {
-      spark.sparkContext.setJobDescription(s"查老赖(个人)数据预处理:${m._1}")
-      deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = null, name = m._2._2, card_num = m._2._3, publish_date = m._2._4, deleted = m._2._5).calc()
-    })
+    val ts = mapTables.map(m => {
+      val name = s"查老赖(个人)数据预处理:${m._1}"
+      val func = () => {
+        spark.sparkContext.setJobDescription(name)
+        deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = null, name = m._2._2, card_num = m._2._3, publish_date = m._2._4, deleted = m._2._5).calc()
+        return true
+      }
+      (name, func)
+    }).toSeq
+    AsyncExtract.startAndWait(spark, ts)
   }
 
   def person(): Unit = {
@@ -311,11 +317,16 @@ case class deadbeat_info(s: SparkSession,
     mapTables("company_dishonest_info") = ("rowkey", "cid", "name", "card_num", "pub_date", "status")
     mapTables("company_zxr_final_case") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted")
     mapTables("company_zxr_restrict") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "status")
-    mapTables.foreach(m => {
-      println(m)
-      spark.sparkContext.setJobDescription(s"查老赖(企业)数据预处理:${m._1}")
-      deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = m._2._2, name = m._2._3, card_num = m._2._4, publish_date = m._2._5, deleted = m._2._6).calc()
-    })
+    val ts = mapTables.map(m => {
+      val name = s"查老赖(企业)数据预处理:${m._1}"
+      val func = () => {
+        spark.sparkContext.setJobDescription(name)
+        deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = m._2._2, name = m._2._3, card_num = m._2._4, publish_date = m._2._5, deleted = m._2._6).calc()
+        return true
+      }
+      (name, func)
+    }).toSeq
+    AsyncExtract.startAndWait(spark, ts)
   }
 
   def company(): Unit = {