Browse Source

Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
3be3264de2

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_check_info.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_check_info(equCols: Seq[String]) extends CompanyChangeHandle
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "抽查检查", Array("check_org", "check_date"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("check_date"))){
-      newMap("update_time")
-    }else{
-      newMap("check_date")
-    }
+    DateUtils.getBizDate(newMap("check_date"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_announcement_list.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_court_announcement_list(equCols: Seq[String]) extends Company
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "法院公告", Array("bltn_no", "publish_date", "case_no"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("publish_date"))){
-      newMap("update_time")
-    }else{
-      newMap("publish_date")
-    }
+    DateUtils.getBizDate(newMap("publish_date"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_open_announcement_list.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_court_open_announcement_list(equCols: Seq[String]) extends Co
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "开庭公告", Array("case_no", "start_date"))
 
   override def getBizTime(newMap: Map[String, String]): String ={
-    if(StringUtils.isBlank(newMap("start_date"))){
-      newMap("update_time")
-    }else{
-      newMap("start_date")
-    }
+    DateUtils.getBizDate(newMap("start_date"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_register_list.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_court_register_list(equCols: Seq[String]) extends CompanyChan
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "立案信息", Array("case_no", "filing_date"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("filing_date"))){
-      newMap("update_time")
-    }else{
-      newMap("filing_date")
-    }
+    DateUtils.getBizDate(newMap("filing_date"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_double_random_check_info.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_double_random_check_info(equCols: Seq[String]) extends Compan
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "双随机抽查", Array("check_task_num", "check_date"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("check_date"))){
-      newMap("update_time")
-    }else{
-      newMap("check_date")
-    }
+    DateUtils.getBizDate(newMap("check_date"), newMap("update_time"))
   }
 }

+ 3 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_judicial_sale_combine_list.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -11,7 +11,7 @@ import org.apache.commons.lang3.StringUtils
  * @Description:司法拍卖
  */
 
-case class company_judicial_sale_combine_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+case class company_judicial_sale_combine_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
   override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}司法拍卖发生变更")
 
   override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}司法拍卖")
@@ -19,10 +19,6 @@ case class company_judicial_sale_combine_list(equCols: Seq[String]) extends Comp
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "司法拍卖", Array("title", "pub_time"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("pub_time"))){
-      newMap("update_time")
-    }else{
-      newMap("pub_time")
-    }
+    DateUtils.getBizDate(newMap("pub_time"), newMap("update_time"))
   }
 }

+ 1 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_license.scala

@@ -19,16 +19,6 @@ case class company_license(equCols: Seq[String]) extends CompanyChangeHandle wit
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "行政许可", Array("license_name","start_date"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("start_date"))){
-      newMap("update_time")
-    }else{
-      val s = newMap("start_date").replaceAll("年","-").replaceAll("月","-")
-        .replaceAll("日","").replaceAll("/","-")
-      if(s.size >=10 && DateUtils.isLegalDate(s.substring(0,10))){
-        s.substring(0,10)
-      }else{
-        newMap("update_time")
-      }
-    }
+    DateUtils.getBizDate(newMap("start_date"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_license_creditchina.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_license_creditchina(equCols: Seq[String]) extends CompanyChan
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "行政许可-信用中国", Array("licence_content","decision_date"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("decision_date"))){
-      newMap("update_time")
-    }else{
-      newMap("decision_date")
-    }
+    DateUtils.getBizDate(newMap("decision_date"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_license_entpub.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_license_entpub(equCols: Seq[String]) extends CompanyChangeHan
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "行政许可-企业公示", Array("license_name","start_date"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("start_date"))){
-      newMap("update_time")
-    }else{
-      newMap("start_date").replaceAll("年","-").replaceAll("月","-").replaceAll("日","")
-    }
+    DateUtils.getBizDate(newMap("start_date"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_tax_contravention.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_tax_contravention(equCols: Seq[String]) extends CompanyChange
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "税收违法", Array("taxpayer_number", "publish_time"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("publish_time"))){
-      newMap("update_time")
-    }else{
-      newMap("publish_time")
-    }
+    DateUtils.getBizDate(newMap("publish_time"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_zxr_final_case.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class company_zxr_final_case(equCols: Seq[String]) extends CompanyChangeHan
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "终本案件", Array("identity_num","case_create_time"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("case_create_time"))){
-      newMap("update_time")
-    }else{
-      newMap("case_create_time")
-    }
+    DateUtils.getBizDate(newMap("case_create_time"), newMap("update_time"))
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/wenshu_detail_combine.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -19,10 +19,6 @@ case class wenshu_detail_combine(equCols: Seq[String]) extends CompanyChangeHand
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "裁判文书", Array("case_no", "cname"))
 
   override def getBizTime(newMap: Map[String, String]): String = {
-    if(StringUtils.isBlank(newMap("judge_date"))){
-      newMap("update_date")
-    }else{
-      newMap("judge_date")
-    }
+    DateUtils.getBizDate(newMap("judge_date"), newMap("update_date"))
   }
 }

+ 15 - 19
src/main/scala/com/winhc/bigdata/spark/test/TestChangeExtract.scala

@@ -86,23 +86,19 @@ object TestChangeExtract {
   }
 
   val seq = List(
-//    "company_mortgage_info"
-//    , "company_check_info"
-//    , "company_court_announcement_list"
-//    , "company_court_open_announcement_list"
-//    , "company_court_register_list"
-//    , "company_double_random_check_info"
-//    , "company_judicial_sale_combine_list"
-//    , "company_tax_contravention"
-//    , "company_zxr_final_case"
-//    , "company_license_creditchina"
-//
-//    ,
-//    "company_license_entpub"
-//    ,
-//    "company_license"
-//    ,
-    "wenshu_detail_combine"
+    "company_mortgage_info"
+    , "company_check_info"
+    , "company_court_announcement_list"
+    , "company_court_open_announcement_list"
+    , "company_court_register_list"
+    , "company_double_random_check_info"
+    , "company_judicial_sale_combine_list"
+    , "company_tax_contravention"
+    , "company_zxr_final_case"
+    , "company_license_creditchina"
+    , "company_license_entpub"
+    , "company_license"
+    , "wenshu_detail_combine"
   )
 
   case class TestChangeExtract(s: SparkSession,
@@ -114,8 +110,8 @@ object TestChangeExtract {
 
     def calc2(): Unit = {
       startArgs.filter(s => seq.contains(s.tableName)).foreach(e => {
-          println("______________________________" + e.tableName + "___________________________________")
-          ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
+        println("______________________________" + e.tableName + "___________________________________")
+        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
       })
     }
 

+ 15 - 16
src/main/scala/com/winhc/bigdata/spark/test/TestCompanyDynamic.scala

@@ -59,20 +59,19 @@ object TestCompanyDynamic {
     , Args(tableName = "wenshu_detail_combine", bName = 1) //裁判文书
   )
   val seq = List(
-//        "company_mortgage_info"
-//        , "company_check_info"
-        , "company_court_announcement_list"
-//        , "company_court_open_announcement_list"
-        , "company_court_register_list"
-//        , "company_double_random_check_info"
-        , "company_judicial_sale_combine_list"
-//        , "company_tax_contravention"
-//        , "company_zxr_final_case"
-//        , "company_license_creditchina"
-//        ,
-//    "company_license_entpub"
-//        , "company_license"
-//        ,"wenshu_detail_combine"
+    "company_mortgage_info"
+    , "company_check_info"
+    , "company_court_announcement_list"
+    , "company_court_open_announcement_list"
+    , "company_court_register_list"
+    , "company_double_random_check_info"
+    , "company_judicial_sale_combine_list"
+    , "company_tax_contravention"
+    , "company_zxr_final_case"
+    , "company_license_creditchina"
+    , "company_license_entpub"
+    , "company_license"
+    , "wenshu_detail_combine"
   )
 
   def main(args: Array[String]): Unit = {
@@ -98,8 +97,8 @@ object TestCompanyDynamic {
 
     def calc2(): Unit = {
       startArgs.filter(s => seq.contains(s.tableName)).foreach(e => {
-          println("______________________________" + e.tableName + "___________________________________")
-          CompanyDynamicUtil(spark, project, ds).calc(e.tableName, e.bName)
+        println("______________________________" + e.tableName + "___________________________________")
+        CompanyDynamicUtil(spark, project, ds).calc(e.tableName, e.bName)
       })
     }
 

+ 32 - 9
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -2,6 +2,8 @@ package com.winhc.bigdata.spark.utils
 
 import java.text.SimpleDateFormat
 
+import org.apache.commons.lang3.StringUtils
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/8/10 11:19
@@ -10,7 +12,7 @@ import java.text.SimpleDateFormat
 object DateUtils {
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
-    var p ="yyyy-MM-dd HH:mm:ss"
+    var p = "yyyy-MM-dd HH:mm:ss"
     if (date.length == 10) {
       p = "yyyy-MM-dd"
     }
@@ -79,21 +81,42 @@ object DateUtils {
 
   def isLegalDate(date: String): Boolean = {
     try {
-    var p ="yyyy-MM-dd HH:mm:ss"
-    if (date.length == 10) {
-      p = "yyyy-MM-dd"
-    }
-    val fm = new SimpleDateFormat(p)
+      var p = "yyyy-MM-dd HH:mm:ss"
+      if (date.length == 10) {
+        p = "yyyy-MM-dd"
+      }
+      val fm = new SimpleDateFormat(p)
       val date1 = fm.parse(date)
       date.equals(fm.format(date1))
-    }catch {
-      case e:Exception => false
+    } catch {
+      case e: Exception => false
+    }
+  }
+
+  /**
+   * 判断bizdate是否合法,否则用update_time替代
+   *
+   * @param date1
+   * @param date2
+   * @return
+   */
+  def getBizDate(date1: String, date2: String): String = {
+    if (StringUtils.isBlank(date1)) {
+      return date2
+    }
+    val s = date1.replaceAll("年", "-").replaceAll("月", "-")
+      .replaceAll("日", "").replaceAll("/", "-")
+
+    if (s.length >= 10 && isLegalDate(s.substring(0, 10))) {
+      s.substring(0, 10)
+    } else {
+      date2
     }
   }
 
   def main(args: Array[String]): Unit = {
     println(isLegalDate("2003-10-12 10:00:00"))
-//    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
+    //    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }
 
 }