瀏覽代碼

异步执行测试

xufei 4 年之前
父節點
當前提交
c1870b74e0

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -251,14 +251,14 @@ object ChangeExtract {
       ))
 
       spark.createDataFrame(rdd, schema)
-        .createOrReplaceTempView("tmp_change_extract_view") //
+        .createOrReplaceTempView(s"tmp_change_extract_view$tableName") //
 
       sql(
         s"""
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_eci_change_extract PARTITION(ds='$ds',tn='$tableName')
            |SELECT *
            |FROM
-           |    tmp_change_extract_view
+           |    tmp_change_extract_view$tableName
            |""".stripMargin)
     }
   }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -162,7 +162,7 @@ object CompanyDynamic {
         , "create_time" -> StringType
       ))
       spark.createDataFrame(rdd, schema)
-        .createOrReplaceTempView("company_dynamic_tmp")
+        .createOrReplaceTempView(s"company_dynamic_tmp$tableName")
 
       val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
 
@@ -171,7 +171,7 @@ object CompanyDynamic {
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds',tn='$tableName')
            |SELECT ${cols.mkString(",")}
            |FROM
-           |    company_dynamic_tmp
+           |    company_dynamic_tmp$tableName
            |""".stripMargin)
     }
   }

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -23,7 +23,7 @@ trait CompanyDynamicHandle {
     , "company_abnormal_info" -> "eci_exception" //经营异常
     , "" -> "eci_zscq" //知识产权
     , "wenshu_detail_combine" -> "eci_wenshu" //裁判文书
-    , "" -> "court_announcement" //法院公告
+    , "company_court_announcement_list" -> "court_announcement" //法院公告
     , "" -> "" //对外投资
     , "company_punishment_info" -> "punishment_info" //行政处罚
     , "company_punishment_info_creditchina" -> "punishment_info_creditchina" //行政处罚-信用中国
@@ -52,7 +52,7 @@ trait CompanyDynamicHandle {
     , "company_finance" -> "company_finance" //融资动态
     , "company_check_info" -> "spot_check" //抽查检查
     , "company_double_random_check_info" -> "company_double_random_check_info" //双随机抽查
-    , "company_court_register" -> "company_court_register" //立案信息
+    , "company_court_register_list" -> "company_court_register" //立案信息
     , "company_zxr_final_case" -> "company_zxr_final_case" //终本案件
     , "company_license_creditchina" -> "company_license_creditchina" //行政许可-信用中国
     , "company_license_entpub" -> "company_license_entpub" //行政许可-企业公示
@@ -106,7 +106,7 @@ trait CompanyDynamicHandle {
     , "" -> "37" // 被执行人
     , "company_send_announcement_list" -> "38" // 送达报告
     , "bankruptcy_open_case" -> "39" // 破产重整
-    , "company_court_register" -> "40" // 立案信息
+    , "company_court_register_list" -> "40" // 立案信息
     , "company_annual_report_out_guarantee" -> "41" // 年报-对外担保
     , "company_zxr_final_case" -> "41" // 终本案件
   )

+ 136 - 0
src/main/scala/com/winhc/bigdata/spark/test/TestChangeExtract.scala

@@ -0,0 +1,136 @@
+package com.winhc.bigdata.spark.test
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.jobs.chance.ChangeExtract.ChangeExtractHandle
+import com.winhc.bigdata.spark.utils.{SparkUtils, Watching}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: Test driver that runs ChangeExtractHandle for the tables enabled in `seq`.
+ * @author π
+ * @date 2020/8/19 14:38
+ */
+object TestChangeExtract {
+
+  /**
+   * Arguments describing one table to run through ChangeExtractHandle.
+   *
+   * @param project       project name handed to ChangeExtractHandle
+   * @param tableName     name of the table to process
+   * @param primaryKey    key column name handed to ChangeExtractHandle
+   * @param primaryFields comma-separated field names; split on "," before use
+   * @param isCopy        flag forwarded to ChangeExtractHandle.calc
+   */
+  private case class Args(
+    project: String = "winhc_eci_dev",
+    tableName: String,
+    primaryKey: String = "rowkey",
+    primaryFields: String,
+    isCopy: Boolean = true
+  )
+
+  // Catalogue of per-table arguments. Each entry names a table, its key column
+  // (default "rowkey") and the comma-separated fields handed to ChangeExtractHandle.
+  // Only entries whose tableName is also listed in `seq` are actually run.
+  private val startArgs = Seq(
+    Args(tableName = "company_tm", primaryFields = "status_new")
+    , Args(tableName = "company_patent_list", primaryFields = "lprs")
+    , Args(tableName = "company_copyright_works_list", primaryFields = "type")
+    , Args(tableName = "company_copyright_reg_list", primaryFields = "version")
+    , Args(tableName = "company_land_publicity", primaryFields = "title,location,use_for")
+    , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
+    , Args(tableName = "company_bid_list", primaryFields = "title")
+    , Args(tableName = "company_land_transfer", primaryFields = "num,location")
+    , Args(tableName = "company_employment", primaryFields = "title,cid,url_path")
+    , Args(tableName = "company_brief_cancel_announcement_result", primaryFields = "cid,main_id")
+    , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
+    , Args(tableName = "company_icp", primaryFields = "domain")
+    , Args(tableName = "company_punishment_info", primaryFields = "punish_number")
+    , Args(tableName = "company_punishment_info_creditchina", primaryFields = "punish_number")
+    , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no", isCopy = false) // bankruptcy reorganization
+    , Args(tableName = "company_public_announcement2_list", primaryFields = "applicant_cid,owner_cid,drawer_cid,gather_name_cid,bill_num") // public summons notices
+    , Args(tableName = "company_mortgage_info", primaryFields = "reg_num") // chattel mortgages
+    , Args(tableName = "company_stock_announcement", primaryFields = "title") // company announcements
+    , Args(tableName = "company_check_info", primaryFields = "check_result") // spot checks
+    , Args(tableName = "company_court_announcement_list", primaryFields = "content") // court announcements
+    , Args(tableName = "company_court_open_announcement_list", primaryFields = "case_reason") // court hearing announcements
+    , Args(tableName = "company_court_register_list", primaryFields = "area") // case filing information
+    , Args(tableName = "company_double_random_check_info", primaryFields = "check_plan_name") // double-random spot checks
+    , Args(tableName = "company_judicial_sale_combine_list", primaryFields = "title") // judicial auctions
+    , Args(tableName = "company_tax_contravention", primaryFields = "case_type") // tax violations
+    , Args(tableName = "company_send_announcement_list", primaryFields = "title") // service-of-process announcements
+    , Args(tableName = "company_annual_report_out_guarantee", primaryFields = "id") // annual report: external guarantees
+    , Args(tableName = "company_zxr_restrict", primaryFields = "status") // consumption restriction orders; detect the latest status
+
+    , Args(tableName = "company_zxr_final_case", primaryFields = "identity_num") // enforcement cases closed as final
+    , Args(tableName = "company_license_creditchina", primaryFields = "licence_content") // administrative licenses (Credit China)
+    , Args(tableName = "company_license_entpub", primaryFields = "license_name") // administrative licenses (enterprise disclosure)
+    , Args(tableName = "company_license", primaryFields = "license_name") // administrative licenses
+    , Args(tableName = "wenshu_detail_combine", primaryFields = "cname") // judgment documents
+
+    , Args(tableName = "company_certificate", primaryFields = "type")
+    , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")
+
+    , Args(tableName = "company_own_tax", primaryFields = "tax_balance,tax_category,tax_num")
+
+    , Args(tableName = "company_equity_info", primaryKey = "id", primaryFields = "reg_number", isCopy = false)
+    , Args(tableName = "company_staff", primaryFields = "staff_type")
+    // company name, legal-representative ID (person or company identifier), company type, registered address, end date of business term, business scope, registration authority, company status, registered capital, paid-in capital amount (unit: fen), cancellation date, cancellation reason
+    , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,actual_capital_amount,cancel_date,cancel_reason")
+    , Args(tableName = "company_illegal_info", primaryFields = "remove_reason")
+    , Args(tableName = "company_finance", primaryFields = "round")
+    , Args(tableName = "company_dishonest_info", primaryFields = "case_no")
+  )
+
+  /**
+   * Entry point: builds the Spark session and runs the asynchronous extraction.
+   *
+   * @param args args(0) is the `ds` value handed to every table job
+   * @throws IllegalArgumentException when no `ds` argument is supplied
+   */
+  def main(args: Array[String]): Unit = {
+    // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException.
+    require(args.nonEmpty, "usage: TestChangeExtract <ds>")
+
+    val ds = args(0)
+    val project = "winhc_eci_dev"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv("ChangeExtract", config)
+    try {
+      TestChangeExtract(spark, ds).calc()
+    } finally {
+      // Always release the session, even when the run fails.
+      spark.stop()
+    }
+  }
+
+  // Tables enabled for this run: calc() and calc2() only process the startArgs
+  // entries whose tableName appears in this list. The commented-out names are
+  // toggles kept for switching other tables on.
+  val seq = List(
+//    "company_mortgage_info"
+//    , "company_check_info"
+//    , "company_court_announcement_list"
+//    , "company_court_open_announcement_list"
+//    , "company_court_register_list"
+//    , "company_double_random_check_info"
+//    , "company_judicial_sale_combine_list"
+//    , "company_tax_contravention"
+//    , "company_zxr_final_case"
+//    , "company_license_creditchina"
+//
+//    ,
+//    "company_license_entpub"
+//    ,
+//    "company_license"
+//    ,
+    "wenshu_detail_combine"
+  )
+
+  /**
+   * Runs ChangeExtractHandle for every startArgs entry enabled in `seq`.
+   *
+   * @param s  Spark session shared by all table jobs
+   * @param ds value forwarded to ChangeExtractHandle as its `ds` argument
+   *           (presumably the partition date; confirm against ChangeExtractHandle)
+   */
+  case class TestChangeExtract(s: SparkSession,
+                               ds: String // forwarded to ChangeExtractHandle as `ds`
+                              ) extends Watching {
+
+    override protected val spark: SparkSession = s
+
+
+    /** Sequential variant: processes the enabled tables one after another on the calling thread. */
+    def calc2(): Unit = {
+      startArgs.filter(a => seq.contains(a.tableName)).foreach { e =>
+        println("______________________________" + e.tableName + "___________________________________")
+        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
+      }
+    }
+
+    /**
+     * Asynchronous variant: starts one thread per enabled table through asyncWatch
+     * and waits up to 60 minutes for all of them to finish.
+     */
+    def calc(): Unit = {
+      val selected = startArgs.filter(a => seq.contains(a.tableName))
+      // Size the latch from the jobs actually started; seq.length can differ when a
+      // name in seq has no matching (or more than one matching) startArgs entry.
+      val latch = new CountDownLatch(selected.size)
+      selected.foreach { e =>
+        asyncWatch(e.tableName, () => {
+          try {
+            println("______________________________" + e.tableName + "___________________________________")
+            ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
+          } finally {
+            // Count down on failure too, otherwise one failing job stalls await for the full timeout.
+            latch.countDown()
+          }
+        })
+      }
+      if (!latch.await(60, TimeUnit.MINUTES)) {
+        println(s"TestChangeExtract.calc timed out after 60 minutes, unfinished jobs: ${latch.getCount}")
+      }
+    }
+
+  }
+
+}

+ 120 - 0
src/main/scala/com/winhc/bigdata/spark/test/TestCompanyDynamic.scala

@@ -0,0 +1,120 @@
+package com.winhc.bigdata.spark.test
+
+import java.util.concurrent.CountDownLatch
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamic.{CompanyDynamicUtil}
+import com.winhc.bigdata.spark.utils.{SparkUtils, Watching}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: Test driver that runs CompanyDynamicUtil for the tables enabled in `seq`.
+ * @author π
+ * @date 2020/8/19 18:08
+ */
+object TestCompanyDynamic {
+
+  /**
+   * Arguments describing one table to run through CompanyDynamicUtil.
+   *
+   * @param project   project name (defaults to "winhc_eci_dev")
+   * @param tableName name of the table to process
+   * @param bName     integer flag passed as the second argument of CompanyDynamicUtil.calc
+   */
+  private case class Args(
+    project: String = "winhc_eci_dev",
+    tableName: String,
+    bName: Int = 1
+  )
+
+  // Catalogue of per-table arguments; only entries whose tableName is also listed
+  // in `seq` are run. Each table appears exactly once: a repeated entry would be
+  // processed twice, and under the asynchronous calc() both jobs would use the
+  // same table name concurrently.
+  private val startArgs = Seq(
+    Args(tableName = "company_abnormal_info", bName = 0)
+    , Args(tableName = "company_equity_info")
+    , Args(tableName = "company_staff", bName = 0)
+    , Args(tableName = "company", bName = 0)
+    , Args(tableName = "bankruptcy_open_case", bName = 1)
+    , Args(tableName = "company_illegal_info", bName = 0)
+    , Args(tableName = "company_land_publicity", bName = 1)
+    , Args(tableName = "company_employment", bName = 1)
+    , Args(tableName = "company_land_announcement", bName = 1)
+    , Args(tableName = "company_bid_list", bName = 1)
+    , Args(tableName = "company_land_transfer", bName = 1)
+    , Args(tableName = "company_env_punishment", bName = 1)
+    , Args(tableName = "company_punishment_info", bName = 1)
+    , Args(tableName = "company_punishment_info_creditchina", bName = 1)
+    , Args(tableName = "company_public_announcement2_list", bName = 1)
+    , Args(tableName = "company_mortgage_info", bName = 1)
+    , Args(tableName = "company_stock_announcement", bName = 1)
+    , Args(tableName = "company_finance", bName = 1)
+    , Args(tableName = "company_dishonest_info", bName = 1)
+    , Args(tableName = "company_send_announcement_list", bName = 1)
+    , Args(tableName = "company_annual_report_out_guarantee", bName = 1)
+    , Args(tableName = "company_zxr_restrict", bName = 1)
+
+    , Args(tableName = "company_zxr_final_case", bName = 1) // enforcement cases closed as final
+    , Args(tableName = "company_license_creditchina", bName = 1) // administrative licenses (Credit China)
+    , Args(tableName = "company_license_entpub", bName = 1) // administrative licenses (enterprise disclosure)
+    , Args(tableName = "company_license", bName = 1) // administrative licenses
+    , Args(tableName = "company_check_info", bName = 1) // spot checks
+    , Args(tableName = "company_court_announcement_list", bName = 1) // court announcements
+    , Args(tableName = "company_court_open_announcement_list", bName = 1) // court hearing announcements
+    , Args(tableName = "company_court_register_list", bName = 1) // case filing information
+    , Args(tableName = "company_double_random_check_info", bName = 1) // double-random spot checks
+    , Args(tableName = "company_judicial_sale_combine_list", bName = 1) // judicial auctions
+    , Args(tableName = "company_tax_contravention", bName = 1) // tax violations
+    , Args(tableName = "wenshu_detail_combine", bName = 1) // judgment documents
+  )
+  // Tables enabled for this run: calc() and calc2() only process the startArgs
+  // entries whose tableName appears in this list. The commented-out names are
+  // toggles; the first active entry must not start with a comma, otherwise the
+  // list literal does not parse.
+  val seq = List(
+//        "company_mortgage_info"
+//        , "company_check_info"
+        "company_court_announcement_list"
+//        , "company_court_open_announcement_list"
+        , "company_court_register_list"
+//        , "company_double_random_check_info"
+        , "company_judicial_sale_combine_list"
+//        , "company_tax_contravention"
+//        , "company_zxr_final_case"
+//        , "company_license_creditchina"
+//        ,
+//    "company_license_entpub"
+//        , "company_license"
+//        ,"wenshu_detail_combine"
+  )
+
+  /**
+   * Entry point: builds the Spark session and runs the asynchronous calculation.
+   *
+   * @param args optional; args(0) overrides the `ds` value, which otherwise stays
+   *             at the previously hard-coded "20200817"
+   */
+  def main(args: Array[String]): Unit = {
+
+    val project = "winhc_eci_dev"
+    // Backward compatible: running without arguments behaves exactly as before.
+    val ds = args.headOption.getOrElse("20200817")
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv("CompanyDynamic", config)
+    try {
+      TestCompanyDynamic(spark, project, ds).calc()
+    } finally {
+      // Always release the session, even when the run fails.
+      spark.stop()
+    }
+  }
+
+
+  /**
+   * Runs CompanyDynamicUtil for every startArgs entry enabled in `seq`.
+   *
+   * @param s       Spark session shared by all table jobs
+   * @param project project name forwarded to CompanyDynamicUtil
+   * @param ds      value forwarded to CompanyDynamicUtil as its `ds` argument
+   *                (presumably the partition date; confirm against CompanyDynamicUtil)
+   */
+  case class TestCompanyDynamic(s: SparkSession,
+                                project: String, // project name forwarded to CompanyDynamicUtil
+                                ds: String // forwarded to CompanyDynamicUtil as `ds`
+                               ) extends Watching {
+
+    override protected val spark: SparkSession = s
+
+    /** Sequential variant: processes the enabled tables one after another on the calling thread. */
+    def calc2(): Unit = {
+      startArgs.filter(a => seq.contains(a.tableName)).foreach { e =>
+        println("______________________________" + e.tableName + "___________________________________")
+        CompanyDynamicUtil(spark, project, ds).calc(e.tableName, e.bName)
+      }
+    }
+
+    /**
+     * Asynchronous variant: starts one thread per enabled table through asyncWatch
+     * and blocks until every job has finished, successfully or not.
+     */
+    def calc(): Unit = {
+      val selected = startArgs.filter(a => seq.contains(a.tableName))
+      // Size the latch from the jobs actually started; seq.length can differ when a
+      // name in seq has no matching (or more than one matching) startArgs entry.
+      val latch = new CountDownLatch(selected.size)
+      selected.foreach { e =>
+        asyncWatch(e.tableName, () => {
+          try {
+            println("______________________________" + e.tableName + "___________________________________")
+            CompanyDynamicUtil(spark, project, ds).calc(e.tableName, e.bName)
+          } finally {
+            // Count down on failure too; await() has no timeout, so a missed
+            // countDown would block the driver forever.
+            latch.countDown()
+          }
+        })
+      }
+      latch.await()
+    }
+
+  }
+
+}

+ 68 - 0
src/main/scala/com/winhc/bigdata/spark/utils/Watching.scala

@@ -0,0 +1,68 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.commons.lang3.time.StopWatch
+
+/**
+ * @author π
+ */
+trait Watching extends LoggingUtils {
+
+  /**
+   * Runs `f` under [[watch]] on a newly started thread named `name`.
+   * Returns immediately; the caller is not told when `f` finishes.
+   *
+   * @param name action name, used both as the thread name and in the log banners
+   * @param f    the work to execute
+   */
+  def asyncWatch[T](name: String, f: () => Unit): Unit = {
+    val task = new Runnable {
+      override def run(): Unit = watch(name, f)
+    }
+    // The two-argument constructor assigns the thread name at creation.
+    val worker = new Thread(task, name)
+    worker.start()
+  }
+
+  /**
+   * Executes `fun`, printing and logging a begin banner before it and an end
+   * banner with the elapsed time after it.
+   *
+   * If `fun` throws a non-fatal exception, a "failed" line with the elapsed time
+   * is printed and logged, and the same exception is rethrown.
+   *
+   * @param name action name shown in the banners
+   * @param fun  the work to time
+   * @return whatever `fun` returns
+   */
+  def watch[T](name: String, fun: () => T): T = {
+    val stopWatch = new StopWatch
+    stopWatch.start()
+    println(
+      s"""
+        |
+        |----------------------- submit [action=$name] begin... -----------------------
+        |""".stripMargin)
+    logInfo(
+      s"""
+         |
+         |----------------------- submit [action=$name] begin... -----------------------
+       """.stripMargin
+    )
+
+    val r = try {
+      fun()
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        // Record how long the action ran before failing, then propagate the error unchanged.
+        stopWatch.stop()
+        val failedMsg = s"-------------- submit [action=$name] failed after ${toTimeStr(stopWatch.getTime(TimeUnit.SECONDS))} -------------"
+        println(failedMsg)
+        logInfo(failedMsg)
+        throw e
+    }
+    stopWatch.stop()
+    val totalSeconds = stopWatch.getTime(TimeUnit.SECONDS)
+
+    println(
+      s"""
+        |-------------- submit [action=$name] end, cost ${toTimeStr(totalSeconds)} -------------
+        |
+        |""".stripMargin)
+    logInfo(
+      s"""
+         |-------------- submit [action=$name] end, cost ${toTimeStr(totalSeconds)} -------------
+         |
+       """.stripMargin
+    )
+    r
+  }
+
+  /**
+   * Formats a duration in whole seconds as "XhYmZs", omitting the leading
+   * units that are zero ("YmZs" under an hour, "Zs" under a minute).
+   *
+   * @param totalSeconds duration in seconds
+   * @return the formatted duration
+   */
+  private def toTimeStr(totalSeconds: Long): String = {
+    val h = totalSeconds / 3600
+    val afterHours = totalSeconds - h * 3600
+    val m = afterHours / 60
+    val s = afterHours - m * 60
+    (h, m) match {
+      case _ if h > 0 => s"${h}h${m}m${s}s"
+      case _ if m > 0 => s"${m}m${s}s"
+      case _ => s"${s}s"
+    }
+  }
+}