Commit d7d4dede45 — author: xufei, 4 years ago (has parent commit)

+ 8 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -47,6 +47,10 @@ object ChangeExtract {
 
     val target_eci_change_extract = "ads_change_extract"
 
+    val updateTimeMapping = Map(
+      "wenshu_detail_combine" -> "update_date" //文书排序时间
+    )
+
     def calc(isCopy: Boolean = true): Unit = {
       val cols = primaryFields.filter(!_.equals(primaryKey)).seq
 
@@ -101,7 +105,7 @@ object ChangeExtract {
                    |                     ,c
                    |             FROM    (
                    |                         SELECT  a.*
-                   |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
+                   |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
                    |                         FROM    (
                    |                                     SELECT  ${intersectCols.mkString(",")}
                    |                                     FROM    $project.ads_$tableName
@@ -137,7 +141,7 @@ object ChangeExtract {
                    |                     ,coalesce(mm.new_cid,tmp.$cid) AS $cid
                    |             FROM    (
                    |                         SELECT  a.*
-                   |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
+                   |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
                    |                         FROM    (
                    |                                     SELECT  ${intersectCols.mkString(",")}
                    |                                     FROM    $project.ads_$tableName
@@ -176,7 +180,7 @@ object ChangeExtract {
                |             SELECT  tmp.*
                |             FROM    (
                |                         SELECT  a.*
-               |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
+               |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
                |                         FROM    (
                |                                     SELECT  ${intersectCols.mkString(",")}
                |                                     FROM    $project.ads_$tableName
@@ -319,7 +323,7 @@ object ChangeExtract {
     , Args(tableName = "company_license_creditchina", primaryFields = "licence_content")//行政许可-信用中国
     , Args(tableName = "company_license_entpub", primaryFields = "license_name")//行政许可-企业公示
     , Args(tableName = "company_license", primaryFields = "license_name")//行政许可
-    , Args(tableName = "wenshu_detail_combine", primaryFields = "license_name")//文书
+    , Args(tableName = "wenshu_detail_combine", primaryFields = "cname")//文书
 
     , Args(tableName = "company_certificate", primaryFields = "type")
     , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")

+ 8 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_license.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -22,7 +22,13 @@ case class company_license(equCols: Seq[String]) extends CompanyChangeHandle wit
     if(StringUtils.isBlank(newMap("start_date"))){
       newMap("update_time")
     }else{
-      newMap("start_date").replaceAll("年","-").replaceAll("月","-").replaceAll("日","-")
+      val s = newMap("start_date").replaceAll("年","-").replaceAll("月","-")
+        .replaceAll("日","").replaceAll("/","-")
+      if(s.size >=10 && DateUtils.isLegalDate(s.substring(0,10))){
+        s.substring(0,10)
+      }else{
+        newMap("update_time")
+      }
     }
   }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_license_entpub.scala

@@ -22,7 +22,7 @@ case class company_license_entpub(equCols: Seq[String]) extends CompanyChangeHan
     if(StringUtils.isBlank(newMap("start_date"))){
       newMap("update_time")
     }else{
-      newMap("start_date").replaceAll("年","-").replaceAll("月","-").replaceAll("日","-")
+      newMap("start_date").replaceAll("年","-").replaceAll("月","-").replaceAll("日","")
     }
   }
 }

+ 22 - 2
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -10,12 +10,17 @@ import java.text.SimpleDateFormat
 object DateUtils {
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
-    val fm = new SimpleDateFormat(pattern)
+    var p ="yyyy-MM-dd HH:mm:ss"
+    if (date.length == 10) {
+      p = "yyyy-MM-dd"
+    }
+    val fm = new SimpleDateFormat(p)
     fm.parse(date).getTime / 1000
   }
 
   /**
    * 获取第一个不为空的字符串
+   *
    * @param date
    * @return
    */
@@ -72,8 +77,23 @@ object DateUtils {
     }
   }
 
+  def isLegalDate(date: String): Boolean = {
+    try {
+    var p ="yyyy-MM-dd HH:mm:ss"
+    if (date.length == 10) {
+      p = "yyyy-MM-dd"
+    }
+    val fm = new SimpleDateFormat(p)
+      val date1 = fm.parse(date)
+      date.equals(fm.format(date1))
+    }catch {
+      case e:Exception => false
+    }
+  }
+
   def main(args: Array[String]): Unit = {
-    println(getNotNullStr(null,"2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
+    println(isLegalDate("2003-10-12 10:00:00"))
+//    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }
 
 }