
feat: update company index to v7

许家凯 4 years ago
parent commit aa766f172c

+ 25 - 23
src/main/scala/com/winhc/bigdata/spark/implicits/CompanyIndexSave2EsHelper.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.implicits
 import java.time.LocalDate
 import java.time.format.DateTimeFormatter
 
+import com.winhc.bigdata.spark.utils.DateUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.DataFrame
@@ -34,6 +35,10 @@ object CompanyIndexSave2EsHelper {
     , "reg_capital_amount" // 注册资本,数值类型
     , "phones" //电话
     , "emails" //邮箱
+
+    , "legal_entity_id" //法人id
+    , "legal_entity_type" //法人类型,1 人 2 公司
+    , "logo" //公司logo
   )
 
   implicit class DataFrameEnhancer(df: DataFrame) {
@@ -45,21 +50,21 @@ object CompanyIndexSave2EsHelper {
         }).toMap
         getEsDoc(map, category_code, area_code)
       })
-        .saveToEsWithMeta("winhc-company-v6/company")
+        .saveToEsWithMeta("winhc-company-v7/company")
     }
 
-   /* def test2Es(spark:SparkSession,category_code: Broadcast[Map[String, Seq[String]]], area_code: Broadcast[Map[String, Seq[String]]]): Unit = {
-      val rdd = df.select(companyIndexFields.map(column => col(column).cast("string")): _*)
-        .rdd.map(r => {
-        val map = companyIndexFields.map(f => {
-          (f, r.getAs[String](f))
-        }).toMap
-        getEsDoc(map, category_code, area_code)
-      }).map(r=>Row(r._1,r._2.estiblish_time))
-      spark.createDataFrame(rdd,StructType(Array(StructField("cid",StringType),StructField("time",StringType))))
-       .write.mode("overwrite")
-       .insertInto("winhc_eci_dev.xjk_test_save_2_es_0721")
-    }*/
+    /* def test2Es(spark:SparkSession,category_code: Broadcast[Map[String, Seq[String]]], area_code: Broadcast[Map[String, Seq[String]]]): Unit = {
+       val rdd = df.select(companyIndexFields.map(column => col(column).cast("string")): _*)
+         .rdd.map(r => {
+         val map = companyIndexFields.map(f => {
+           (f, r.getAs[String](f))
+         }).toMap
+         getEsDoc(map, category_code, area_code)
+       }).map(r=>Row(r._1,r._2.estiblish_time))
+       spark.createDataFrame(rdd,StructType(Array(StructField("cid",StringType),StructField("time",StringType))))
+        .write.mode("overwrite")
+        .insertInto("winhc_eci_dev.xjk_test_save_2_es_0721")
+     }*/
   }
 
   case class Geo(lat: String, lon: String)
@@ -90,6 +95,9 @@ object CompanyIndexSave2EsHelper {
                          , reg_capital_amount: String
                          , phones: Seq[String]
                          , emails: Seq[String]
+                         , legal_entity_id: String
+                         , legal_entity_type: String
+                         , logo: String
                        )
 
   val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
@@ -118,17 +126,8 @@ object CompanyIndexSave2EsHelper {
     val category_third = category._4
 
     val et = map("estiblish_time")
-    var time = if (StringUtils.isNotBlank(et)) {
-      if (et.contains(" ")) {
-        et.split(" ")(0)
-      } else {
-        et
-      }
-    } else null
 
-    if(!validateDf(time)){
-      time = null
-    }
+    val time: String = DateUtils.toMillisTimestamp(date = et)
 
     val doc = CompanyDoc(
       cname = getCompanyName(map("name"))
@@ -154,6 +153,9 @@ object CompanyIndexSave2EsHelper {
       , reg_capital_amount = map("reg_capital_amount")
       , phones = getSplit(map("phones"))
       , emails = getSplit(map("emails"))
+      , legal_entity_id = map("legal_entity_id")
+      , legal_entity_type = map("legal_entity_type")
+      , logo = map("logo")
     )
     (map("cid"), doc)
   }
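
For context, getEsDoc returns (cid, doc) tuples, and elasticsearch-hadoop uses the key as the document _id when writing to winhc-company-v7/company. A minimal, self-contained sketch of the same save pattern, assuming the standard org.elasticsearch.spark implicits and a hypothetical local setup (the job's real ES configuration is not part of this diff):

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._ // pair-RDD saveToEsWithMeta implicit

object SaveWithMetaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("es-save-sketch")
      .master("local[*]")
      .config("es.nodes", "localhost") // assumption: local ES node, for illustration only
      .getOrCreate()

    // (id, doc): the key becomes the ES document _id, mirroring (map("cid"), doc) above
    val docs = spark.sparkContext.parallelize(Seq(
      ("cid-1", Map("cname" -> "demo company", "logo" -> "http://example.com/logo.png"))
    ))
    docs.saveToEsWithMeta("winhc-company-v7/company")
  }
}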

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -25,11 +25,11 @@ object CompanyIndexSave2Es {
       val all_company_max_ds = getLastPartitionsOrElse(s"${project}.ads_company", "0")
       val code = code2Name()
 
-      sql(
+      println(
         s"""
            |DROP TABLE IF EXISTS winhc_eci_dev.$tmp_table
            |""".stripMargin)
-      sql(
+      println(
         s"""
            |CREATE TABLE IF NOT EXISTS winhc_eci_dev.$tmp_table AS
            |SELECT  ${companyIndexFields.map(f => if (f.eq("estiblish_time")) "date_format(tmp.estiblish_time,'yyyy-MM-dd') estiblish_time" else "tmp." + f).mkString(",")}

+ 9 - 2
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.udf
 
 import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
 import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.utils.BaseUtil._
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
@@ -9,7 +10,6 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
 import scala.annotation.meta.getter
-import com.winhc.bigdata.spark.utils.BaseUtil._
 
 /**
  * @Author: XuJiakai
@@ -21,6 +21,12 @@ trait BaseFunc {
   private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r
 
 
+ /* def to_epoch_millis_timestamp(): Unit = {
+    spark.udf.register("to_epoch_millis_timestamp", (et: String) => {
+      DateUtils.toUnixTimestamp(date = et) * 1000 + 28800000L
+    })
+  }*/
+
   def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
     val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
       s"""
@@ -141,7 +147,8 @@ trait BaseFunc {
       }
     })
   }
-  def justicase_ops() : Unit = {
+
+  def justicase_ops(): Unit = {
     spark.udf.register("get_justicase_id", (case_nos: String) => {
       BKDRHash(case_nos.split(",").sorted.mkString(","))
     })
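
The get_justicase_id UDF above delegates to BaseUtil.BKDRHash, which this diff does not show. A minimal sketch assuming the classic BKDR string hash (the real BaseUtil implementation may differ in seed or masking):

// Classic BKDR hash: hash = hash * seed + char, with a small prime-ish seed
def BKDRHash(s: String): Long = {
  val seed = 131L // common choices: 31, 131, 1313, 13131, ...
  var hash = 0L
  for (c <- s) {
    hash = hash * seed + c
  }
  hash & 0x7fffffffffffffffL // clear the sign bit so the id stays non-negative
}

// Usage mirroring the UDF: sorting first makes the id independent of case-number order
// BKDRHash("case-b,case-a".split(",").sorted.mkString(","))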

+ 14 - 1
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -34,6 +34,18 @@ object DateUtils {
     }
   }
 
+  def toMillisTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): String = {
+    if (StringUtils.isEmpty(date)) {
+      return null
+    }
+
+    // a bare date ("yyyy-MM-dd") is 10 characters long; otherwise use the supplied pattern
+    val p = if (date.length == 10) "yyyy-MM-dd" else pattern
+    val fm = new SimpleDateFormat(p)
+    // 28800000L = 8 hours in millis, shifting the epoch value to UTC+8
+    fm.parse(date).getTime + 28800000L + ""
+  }
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
     var p = "yyyy-MM-dd HH:mm:ss"
@@ -139,7 +151,8 @@ object DateUtils {
   }
 
   def main(args: Array[String]): Unit = {
-    println(formatterDate("2017/2/6 0"))
+    println(DateUtils.toMillisTimestamp(date = "2020-09-17 18:02:02"))
+    println(System.currentTimeMillis())
     //    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }