许家凯 4 年之前
父節點
當前提交
d7bf2abd8d
共有 1 個文件被更改,包括 74 次插入和 18 次删除
  1. 74 18
      src/main/scala/com/winhc/bigdata/spark/implicits/CompanyIndexSave2EsHelper.scala

+ 74 - 18
src/main/scala/com/winhc/bigdata/spark/implicits/CompanyIndexSave2EsHelper.scala

@@ -1,6 +1,10 @@
 package com.winhc.bigdata.spark.implicits
 
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.col
 import org.elasticsearch.spark._
@@ -33,17 +37,29 @@ object CompanyIndexSave2EsHelper {
   )
 
   implicit class DataFrameEnhancer(df: DataFrame) {
-    def companyIndexSave2Es(): Unit = {
-      import df.sparkSession.implicits._
+    def companyIndexSave2Es(category_code: Broadcast[Map[String, Seq[String]]], area_code: Broadcast[Map[String, Seq[String]]]): Unit = {
       df.select(companyIndexFields.map(column => col(column).cast("string")): _*)
         .rdd.map(r => {
         val map = companyIndexFields.map(f => {
           (f, r.getAs[String](f))
         }).toMap
-        getEsDoc(map)
+        getEsDoc(map, category_code, area_code)
       })
-        .saveToEsWithMeta("winhc-company-v5/company")
+        .saveToEsWithMeta("winhc-company-v6/company")
     }
+
+   /* def test2Es(spark:SparkSession,category_code: Broadcast[Map[String, Seq[String]]], area_code: Broadcast[Map[String, Seq[String]]]): Unit = {
+      val rdd = df.select(companyIndexFields.map(column => col(column).cast("string")): _*)
+        .rdd.map(r => {
+        val map = companyIndexFields.map(f => {
+          (f, r.getAs[String](f))
+        }).toMap
+        getEsDoc(map, category_code, area_code)
+      }).map(r=>Row(r._1,r._2.estiblish_time))
+      spark.createDataFrame(rdd,StructType(Array(StructField("cid",StringType),StructField("time",StringType))))
+       .write.mode("overwrite")
+       .insertInto("winhc_eci_dev.xjk_test_save_2_es_0721")
+    }*/
   }
 
   case class Geo(lat: String, lon: String)
@@ -59,11 +75,17 @@ object CompanyIndexSave2EsHelper {
                          , reg_status: String
                          //                         , geo: Geo
                          , province_code: String
+                         , province_name: String
                          , city_code: String
+                         , city_name: String
                          , county_code: String
+                         , county_name: String
                          , reg_location: String
                          , estiblish_time: String
                          , category_code: String
+                         , category_first: String
+                         , category_second: String
+                         , category_third: String
                          , reg_capital: String
                          , reg_capital_amount: String
                          , phones: Seq[String]
@@ -71,8 +93,9 @@ object CompanyIndexSave2EsHelper {
                        )
 
   val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
+  //  val time_pattern = ""
 
-  def getEsDoc(map: Map[String, String]): (String, CompanyDoc) = {
+  def getEsDoc(map: Map[String, String], category_map: Broadcast[Map[String, Seq[String]]], area_map: Broadcast[Map[String, Seq[String]]]): (String, CompanyDoc) = {
     val lat = map("lat")
     val lng = map("lng")
     var geo: String = null
@@ -80,19 +103,32 @@ object CompanyIndexSave2EsHelper {
       geo = lat + "," + lng
     }
 
-    val c = get_area_code(map("area_code"))
+    val c = get_area_code(map("area_code"), area_map)
     val province_code = c._1
-    val city_code = c._2
-    val county_code = c._3
+    val province_name = c._2
+    val city_code = c._3
+    val city_name = c._4
+    val county_code = c._5
+    val county_name = c._6
+
+    val category = get_category_code(map("category_code"), category_map)
+    val category_code = category._1
+    val category_first = category._2
+    val category_second = category._3
+    val category_third = category._4
+
     val et = map("estiblish_time")
-    var time = if (StringUtils.isNotBlank(et)){
+    var time = if (StringUtils.isNotBlank(et)) {
       if (et.contains(" ")) {
         et.split(" ")(0)
-      }else{
+      } else {
         et
       }
-    }  else null
+    } else null
 
+    if(!validateDf(time)){
+      time = null
+    }
 
     val doc = CompanyDoc(
       cname = getCompanyName(map("name"))
@@ -103,11 +139,17 @@ object CompanyIndexSave2EsHelper {
       , reg_status = map("reg_status")
       //      , geo = Geo(lat = lng, lon = lat)
       , province_code = province_code
+      , province_name = province_name
       , city_code = city_code
+      , city_name = city_name
       , county_code = county_code
+      , county_name = county_name
       , reg_location = map("reg_location")
       , estiblish_time = time
-      , category_code = map("category_code")
+      , category_code = category_code
+      , category_first = category_first
+      , category_second = category_second
+      , category_third = category_third
       , reg_capital = map("reg_capital")
       , reg_capital_amount = map("reg_capital_amount")
       , phones = getSplit(map("phones"))
@@ -141,15 +183,29 @@ object CompanyIndexSave2EsHelper {
     }
   }
 
-  private def get_area_code(code: String): (String, String, String) = {
+  private def get_area_code(code: String, area_code: Broadcast[Map[String, Seq[String]]]): (String, String, String, String, String, String) = {
     if (StringUtils.isNotBlank(code) && code.trim.length == 6) {
       val c = code.trim
-      (c.substring(0, 2), c.substring(2, 4), c.substring(4, 6))
+      (c.substring(0, 2), get_seq_by_index(area_code, c, 0), c.substring(2, 4), get_seq_by_index(area_code, c, 1), c.substring(4, 6), get_seq_by_index(area_code, c, 2))
+    } else {
+      (null, null, null, null, null, null)
+    }
+  }
+
+  private def get_category_code(code: String, category_code: Broadcast[Map[String, Seq[String]]]): (String, String, String, String) = {
+    if (StringUtils.isNotBlank(code)) {
+      val c = code.trim
+      (c, get_seq_by_index(category_code, c, 0), get_seq_by_index(category_code, c, 1), get_seq_by_index(category_code, c, 2))
     } else {
-      (null, null, null)
+      (null, null, null, null)
     }
   }
 
+  def get_seq_by_index(area_code: Broadcast[Map[String, Seq[String]]], code: String, index: Int): String = {
+    // Trimmed entry at `index` of the row mapped to `code`; null when the code is unmapped, the row is null or shorter than index + 1, or the entry itself is null.
+    area_code.value.get(code).flatMap(Option(_)).flatMap(_.lift(index)).flatMap(Option(_)).map(_.trim).orNull
+  }
+
   private def getSplit(str: String): Seq[String] = {
     if (StringUtils.isNotBlank(str)) {
       str.split("\t;\t").filter(StringUtils.isNotBlank).toSet.toList
@@ -158,11 +214,11 @@ object CompanyIndexSave2EsHelper {
     }
   }
 
-  private val DATE_TIME_FORMAT = "yyyy-MM-dd"
+  private val df = DateTimeFormatter.ofPattern("yyyy-MM-dd")
 
   private def validateDf(str: String): Boolean = try {
     if (StringUtils.isNotBlank(str)) {
-      java.time.LocalDateTime.parse(str, java.time.format.DateTimeFormatter.ofPattern(DATE_TIME_FORMAT))
+      LocalDate.parse(str, df)
       true
     } else {
       false
@@ -174,6 +230,6 @@ object CompanyIndexSave2EsHelper {
   }
 
   def main(args: Array[String]): Unit = {
-    println(validateDf("2010-03-03 00:00:00"))
+    println(validateDf("2010-03-03"))
   }
 }