Merge remote-tracking branch 'origin/master'

xufei 4 years ago
parent
commit
552ee1d9fd

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/utils/PhoenixHelper.scala

@@ -1,6 +1,7 @@
-package com.winhc.bigdata.spark.utils
+package com.winhc.bigdata.spark.implicits
 
 import com.winhc.bigdata.spark.const.BaseConst
+import com.winhc.bigdata.spark.utils.PhoenixUtil
 import org.apache.spark.sql.DataFrame
 
 /**

+ 67 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -0,0 +1,67 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{EsUtils, SparkUtils}
+import org.datanucleus.util.StringUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/5 14:28
+ * @Description:
+ */
+object CompanyIndexSave2Es {
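+  // Matches any character that is not CJK (\u4e00-\u9fa5), a digit, or an ASCII letter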
+  val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
+
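+  // ES document shape: display name, the id this record currently maps to (null if unchanged), and former names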
+  case class CompanyName(show: String, value: String) extends Serializable
+
+  case class CompanyDoc(cname: CompanyName, current_id: String = null, history_name: Seq[CompanyName] = null) extends Serializable
+
+  def getEsDoc(cid: String, cname: String, other_id_name: scala.collection.Map[String, String], new_cid: String): (String, CompanyDoc) = {
+    // Collect historical names; when the current id is known, drop its own entry first
+    val history_name: Seq[CompanyName] =
+      if (other_id_name == null) null
+      else if (new_cid != null) other_id_name.filterKeys(!new_cid.equals(_)).values.map(getCompanyName).toSeq
+      else other_id_name.values.map(getCompanyName).toSeq
+    // current_id is only populated when this record maps to a different company id
+    if (cid.equals(new_cid)) {
+      (cid, CompanyDoc(getCompanyName(cname), null, history_name))
+    } else {
+      (cid, CompanyDoc(getCompanyName(cname), new_cid, history_name))
+    }
+  }
+
+  private def getCompanyName(name: String): CompanyName = {
+    if (StringUtils.isEmpty(name)) {
+      null
+    } else {
+      // Keep only CJK characters, digits and ASCII letters in the searchable value
+      CompanyName(name, pattern.replaceAllIn(name, ""))
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val map = EsUtils.getEsConfigMap
+
+    val spark = SparkUtils.InitEnv("CompanyIndexSave2Es", map)
+    import org.elasticsearch.spark._
+    import spark.implicits._
+    val df = spark.sql("select cid,cname,other_id_name,new_cid from company_name_mapping_pro")
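+    // Emit (cid, doc) pairs; saveToEsWithMeta uses the key of each pair as the document _id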
+    df.map(r => {
+      val cid = r.getString(0)
+      val cname = r.getString(1)
+      val other_id_name = r.getMap[String, String](2)
+      val new_cid = r.getString(3)
+      getEsDoc(cid, cname, other_id_name, new_cid)
+    }).rdd.saveToEsWithMeta("winhc-company/company")
+    spark.stop()
+  }
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/test/TestOps2Phoenix.scala

@@ -28,7 +28,7 @@ object TestOps2Phoenix {
       (r: Record, schema: TableSchema) => (r.getBigint(0), r.getString(1), r.getString(2), r.getString(3), r.getString(4))
     )
     rdd_2.foreach(println(_))
-    import com.winhc.bigdata.spark.utils.PhoenixHelper._
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
 
     rdd_2.toDF("\"id\"", "\"category_code\"", "\"category_str\"", "\"category_str_middle\"", "\"category_str_big\"")
       .save2Phoenix("CONST_COMPANY_CATEGORY_CODE")

+ 27 - 0
src/main/scala/com/winhc/bigdata/spark/utils/EsUtils.scala

@@ -0,0 +1,27 @@
+package com.winhc.bigdata.spark.utils
+
+import com.winhc.bigdata.spark.utils.BaseUtil._
+
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/5 09:53
+ * @Description:
+ */
+object EsUtils {
+
+  def getEsConfigMap: mutable.Map[String, String] = {
+    val map = mutable.Map(
+      "es.nodes.wan.only" -> "true",
+      "es.internal.es.version" -> "5.5.3",
+      "es.nodes" -> (if (isWindows) "es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com" else "es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com"),
+      "es.port" -> "9200",
+      "es.index.auto.create" -> "true",
+      "es.net.http.auth.user" -> "elastic",
+      "es.net.http.auth.pass" -> "elastic_168"
+    )
+    map
+  }
+
+}