
fix: address several issues

- Fix premature exit when jobs are submitted from multiple threads
- Adjust the two info_type fields in the dynamic output
- Business registration info: paid-in capital no longer counts as a change
许家凯 · 4 years ago · commit beec40e317

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -344,8 +344,8 @@ object ChangeExtract {
 
     , Args(tableName = "company_equity_info", primaryKey = "id", primaryFields = "reg_number", isCopy = false)
     , Args(tableName = "company_staff", primaryFields = "staff_type")
-    // Company name, legal entity ID (person or company marker), company type, registered address, business term end date, business scope, registration authority, registration status, registered capital, paid-in capital amount (unit: fen), cancellation date, cancellation reason
-    , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,actual_capital_amount,cancel_date,cancel_reason")
+    // Company name, legal entity ID (person or company marker), company type, registered address, business term end date, business scope, registration authority, registration status, registered capital, cancellation date, cancellation reason
+    , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,cancel_date,cancel_reason")
     , Args(tableName = "company_illegal_info", primaryFields = "remove_reason")
     , Args(tableName = "company_finance", primaryFields = "round")
     , Args(tableName = "company_dishonest_info", primaryFields = "case_no")

+ 5 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -7,6 +7,7 @@ package com.winhc.bigdata.spark.jobs.dynamic
  */
 trait CompanyDynamicHandle {
 
+  // Deprecated
   private val table_2_sub_info_type_map = Map(
     "CompanyDynamicHandleTest" -> "MyTest"
     , "company" -> "eci_detail" //工商信息
@@ -60,6 +61,7 @@ trait CompanyDynamicHandle {
     , "company_holder" -> "company_holder" //行政许可
   )
 
+  // Deprecated
   private val table_2_info_type = Map(
     "CompanyDynamicHandleTest" -> "0"
     , "company" -> "1" //工商信息
@@ -230,7 +232,7 @@ trait CompanyDynamicHandle {
    *
    * @return
    */
-  protected def get_info_type(): String = table_2_info_type(getClass.getSimpleName)
+  protected def get_info_type(): String = getClass.getSimpleName //table_2_info_type(getClass.getSimpleName)
 
   /**
   * Change content
@@ -239,7 +241,7 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String
+  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = null
 
   /**
   * Change time
@@ -268,7 +270,7 @@ trait CompanyDynamicHandle {
    *
    * @return
    */
-  protected def get_sub_info_type(): String = table_2_sub_info_type_map(getClass.getSimpleName)
+  protected def get_sub_info_type(): String = null //table_2_sub_info_type_map(getClass.getSimpleName)
 
   /**
   * Risk level
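
To make the two info_type adjustments concrete, here is a standalone sketch of the new behaviour. It re-declares a stripped-down version of the trait (not the repo's full version), and company_finance / InfoTypeSketch are hypothetical names: info_type now falls back to the handler's class name instead of the deprecated table_2_info_type lookup, and sub_info_type is simply null.

```scala
trait CompanyDynamicHandle {
  // Replaces the deprecated table_2_info_type map lookup.
  protected def get_info_type(): String = getClass.getSimpleName
  // Replaces the deprecated table_2_sub_info_type_map lookup.
  protected def get_sub_info_type(): String = null

  def describe(): String =
    s"info_type=${get_info_type()}, sub_info_type=${get_sub_info_type()}"
}

// Hypothetical handler named after its table, as the repo's handlers are.
case class company_finance() extends CompanyDynamicHandle

object InfoTypeSketch {
  def main(args: Array[String]): Unit =
    println(company_finance().describe()) // info_type=company_finance, sub_info_type=null
}
```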

+ 1 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company.scala

@@ -87,7 +87,7 @@ case class company() extends CompanyDynamicHandle {
         , rowkey
         , super.get_sub_info_type()
         , v
-        , "建议"
+        , null
       )
     })
   }
@@ -101,12 +101,4 @@ case class company() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = null
 
-  /**
-   * Change content
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = null
 }
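
With get_change_content now given a default implementation of null in CompanyDynamicHandle (previous hunk), the no-op override in company.scala is redundant and is deleted. The literal "建议" ("suggestion") passed for the dynamic's suggestion field is likewise replaced with null.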

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

@@ -16,23 +16,23 @@ object AsyncExtract {
 
   def startAndWait(spark: SparkSession, seq: Seq[(String, () => Boolean)]): Unit = {
     start(seq)
-    wait(spark)
+    wait(spark, jobSize = seq.length)
   }
 
   def start(seq: Seq[(String, () => Boolean)]): Unit = {
     AsyncExtract(seq).start()
   }
 
-  def wait(spark: SparkSession): Unit = {
+  def wait(spark: SparkSession, jobSize: Int = 0): Unit = {
     val tracker = spark.sparkContext.statusTracker
     var i = 0
     while (!tracker.getActiveJobIds().nonEmpty && i < 100) {
       i = i + 1
-      println("await job 。。。")
+      println("await job...")
       Thread.sleep(10000)
     }
     println(tracker.getActiveJobIds().mkString(","))
-    while (tracker.getActiveJobIds().nonEmpty) {
+    while (tracker.getActiveJobIds().nonEmpty || tracker.getJobIdsForGroup(null).length < jobSize) {
       println(tracker.getActiveJobIds().mkString(","))
       println("spark is not stop ! ")
       Thread.sleep(10000)
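
For context, a usage sketch of the fixed wait logic (the setup is assumed; AsyncExtract's signatures are taken from this diff). Passing jobSize = seq.length means wait() no longer returns just because no job is active yet: it also requires that at least seq.length jobs have been registered in the default job group, which closes the premature-exit window when worker threads are slow to submit their first job.

```scala
import org.apache.spark.sql.SparkSession
import com.winhc.bigdata.spark.utils.AsyncExtract

object AsyncExtractUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Two named thunks, each submitting Spark jobs from its own thread.
    val jobs: Seq[(String, () => Boolean)] = Seq(
      ("count_a", () => { spark.range(1000L).count(); true }),
      ("count_b", () => { spark.range(2000L).count(); true })
    )
    // Blocks until both threads have submitted jobs AND all jobs have finished.
    AsyncExtract.startAndWait(spark, jobs)
    spark.stop()
  }
}
```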