
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/test/Justicase.scala
晏永年 4 years ago
Parent
Current commit
38d7d1bd62

+ 8 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs.dynamic
 import java.util.Date
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.jobs.message.IntellectualMessage
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
 import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils}
@@ -184,7 +185,7 @@ object CompanyDynamic {
     , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 1)
     , Args(tableName = "company_land_publicity", bName = 1)
-    , Args(tableName = "company_employment", bName = 1, isAgg = true)
+    , Args(tableName = "company_employment", bName = 1, aggs = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_bid_list", bName = 2)
     , Args(tableName = "company_land_transfer", bName = 1)
@@ -218,12 +219,13 @@ object CompanyDynamic {
     , Args(tableName = "company_holder", bName = 1) // shareholders
     , Args(tableName = "company_annual_report_out_investment", bName = 1) // annual report: outbound investment
     , Args(tableName = "company_own_tax", bName = 1) // tax arrears announcements
+    , Args(tableName = "intellectual", bName = 1, aggs = 2) // intellectual property
   )
 
   private case class Args(project: String = "winhc_eci_dev"
                           , tableName: String
                           , bName: Int = 1
-                          , isAgg: Boolean = false)
+                          , aggs: Int = 0)
 
   def main(args: Array[String]): Unit = {
 
@@ -261,10 +263,10 @@ object CompanyDynamic {
     }
 
     val a = start.map(e => (e.tableName, () => {
-      if (e.isAgg) {
-        CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName)
-      } else {
-        cd.calc(e.tableName, e.bName)
+      e.aggs match {
+        case 1 => CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName) // recruitment day-count
+        case 2 => IntellectualMessage(spark, project).calc() // intellectual property
+        case _ => cd.calc(e.tableName, e.bName) // generic handling
       }
       true
     }))
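
A minimal, self-contained sketch of the routing that the new aggs flag introduces in place of the old boolean isAgg: 1 sends a table to the day-count job, 2 to the intellectual-property message job, and anything else to the generic handler. The stubs below are hypothetical stand-ins for the real Spark jobs and exist only to illustrate the dispatch.

// Sketch only: the real jobs (CompanyDynamicForDayCount, IntellectualMessage, cd.calc)
// are replaced by strings so the routing can be shown in isolation.
object AggsRoutingSketch {
  case class Args(tableName: String, bName: Int = 1, aggs: Int = 0)

  def route(e: Args): String = e.aggs match {
    case 1 => s"day-count job for ${e.tableName}"            // recruitment (company_employment)
    case 2 => "IntellectualMessage job"                       // intellectual property (intellectual)
    case _ => s"generic CompanyDynamic calc for ${e.tableName}"
  }

  def main(args: Array[String]): Unit = {
    println(route(Args("company_employment", aggs = 1)))
    println(route(Args("intellectual", aggs = 2)))
    println(route(Args("company_holder")))                    // default aggs = 0 keeps the old behaviour
  }
}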

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_publicity.scala

@@ -25,7 +25,7 @@ case class company_land_publicity() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("location") + new_map("user_for")
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("location") + new_map("use_for")
 
   /**
    * 变更时间

+ 7 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -5,7 +5,7 @@ import java.util.Date
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandleUtils
 import com.winhc.bigdata.spark.udf.MapAggs
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, DateUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
 import org.json4s.DefaultFormats
@@ -68,6 +68,7 @@ case class IntellectualMessage(s: SparkSession, project: String,
   def calc(): Unit = {
     println("start calc" + new Date())
     spark.udf.register("col2Map", col2Map _)
+    spark.udf.register("form_date", DateUtils.formatterDate _)
     spark.udf.register("MapAggs", new MapAggs())
 
     sql(
@@ -133,7 +134,7 @@ case class IntellectualMessage(s: SparkSession, project: String,
          |(
          |select *
          |from ${res_tb}
-         |where ds = $ds
+         |where ds = $ds and date is not null
          |)a
          |left join
          |(
@@ -146,8 +147,8 @@ case class IntellectualMessage(s: SparkSession, project: String,
       val cid = r.getAs[String]("cid")
       val cname = r.getAs[String]("cname")
       val info_type = tn
-      val change_time = r.getAs[String]("date")
-      val biz_id = s"${cid}_$change_time"
+      val change_time = r.getAs[String]("date").concat(" 00:00:00")
+      val biz_id = cid
       val sub_info_type = ""
       val info_risk_level = "2"
       val winhc_suggest = ""
@@ -215,7 +216,7 @@ case class IntellectualMessage(s: SparkSession, project: String,
       s"""
          |(
          |SELECT  new_cid
-         |        ,SUBSTR(CAST($date AS STRING),1,10) AS DATE
+         |        ,form_date(SUBSTR(CAST($date AS STRING),1,10)) AS DATE
          |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) inc_cnt
          |        ,0 del_cnt
          |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) total_cnt
@@ -249,7 +250,7 @@ case class IntellectualMessage(s: SparkSession, project: String,
          |select * from
          |(
          |SELECT  new_cid
-         |        ,SUBSTR(CAST(date AS STRING),1,10) AS DATE
+         |        ,form_date(SUBSTR(CAST(date AS STRING),1,10)) AS DATE
          |        ,count(rowkey) OVER(PARTITION BY new_cid,date) inc_cnt
          |        ,0 del_cnt
          |        ,count(rowkey) OVER(PARTITION BY new_cid) total_cnt
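
A minimal sketch, assuming only a local SparkSession and the DateUtils object from this commit, of how the newly registered form_date UDF is applied in the queries above; the sample rows are illustrative and not taken from the job's input tables.

import org.apache.spark.sql.SparkSession
import com.winhc.bigdata.spark.utils.DateUtils

object FormDateUdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("form_date sketch").getOrCreate()
    import spark.implicits._

    // Same registration as in IntellectualMessage.calc().
    spark.udf.register("form_date", DateUtils.formatterDate _)

    // Illustrative rows only; the real job reads from the project's ODS/ADS tables.
    Seq("2017/2/6 0", "2019年10月1日").toDF("date").createOrReplaceTempView("t")

    // Mirrors the pattern used in the diff: truncate, then normalize to yyyy-MM-dd.
    spark.sql("SELECT form_date(SUBSTR(CAST(date AS STRING),1,10)) AS d FROM t").show(false)

    spark.stop()
  }
}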

+ 25 - 1
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -10,6 +10,30 @@ import org.apache.commons.lang3.StringUtils
  * @Description:
  */
 object DateUtils {
+  private def addZero(str: String): String = {
+    if (str.length == 2) {
+      str
+    } else {
+      "0" + str
+    }
+  }
+
+  def formatterDate(date: String): String = {
+    try {
+      var d = date.replaceAll("[年月日号/]", "-")
+      if (d.contains(" ")) {
+        d = d.split(" ")(0)
+      }
+      val ymd = d.split("-")
+      s"${ymd(0)}-${addZero(ymd(1))}-${addZero(ymd(2))}"
+    } catch {
+      case _: Exception =>
+        // Fall back to the original string if it cannot be parsed as a date.
+        println(date)
+        date
+    }
+  }
+
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
     var p = "yyyy-MM-dd HH:mm:ss"
@@ -115,7 +139,7 @@ object DateUtils {
   }
 
   def main(args: Array[String]): Unit = {
-    println(isLegalDate("2003-10-12 10:00:00"))
+    println(formatterDate("2017/2/6 0"))
     //    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }