Explorar o código

加入工具类,优化写出es

许家凯 hai 5 anos
pai
achega
51612201b5

+ 13 - 27
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -11,42 +11,28 @@ import org.datanucleus.util.StringUtils
 object CompanyIndexSave2Es {
   val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
 
-  case class CompanyName(show: String, value: String) extends Serializable
+  case class CompanyName(show: String, value: String)
 
-  case class CompanyDoc(cname: CompanyName, current_id: String = null, history_name: Seq[CompanyName] = null) extends Serializable
+  case class CompanyDoc(cname: CompanyName, current_id: String = null, history_name: Seq[CompanyName] = null)
 
   def getEsDoc(cid: String, cname: String, other_id_name: scala.collection.Map[String, String], new_cid: String): (String, CompanyDoc) = {
     var history_name: Seq[CompanyName] = null
-    if (other_id_name != null)
-      if (new_cid != null) {
-        history_name = other_id_name
-          .filterKeys(!new_cid.equals(_))
-          .values
-          .map(getCompanyName)
-          .toSeq
-      } else {
-        history_name = other_id_name
-          .values
-          .map(getCompanyName)
-          .toSeq
+    if (other_id_name != null) {
+      history_name = other_id_name
+        .filterKeys(!_.equals(new_cid))
+        .filterKeys(!_.equals(cid))
+        .values
+        .map(getCompanyName)
+        .toSeq
+      if (history_name.isEmpty) {
+        history_name = null
       }
-    if (cid.equals(new_cid)) {
-      (cid, CompanyDoc(getCompanyName(cname), null, history_name))
-    } else {
-      (cid, CompanyDoc(getCompanyName(cname), new_cid, history_name))
     }
+    (cid, CompanyDoc(getCompanyName(cname), if (cid.equals(new_cid)) null else new_cid, history_name))
   }
 
 
-  private def getCompanyName(name: String): CompanyName = {
-    if (StringUtils.isEmpty(name)) {
-      null
-    } else {
-      val value = pattern replaceAllIn(name, "")
-      Map("show" -> name, "value" -> value)
-      CompanyName(name, value)
-    }
-  }
+  private def getCompanyName(name: String): CompanyName = if (StringUtils.isEmpty(name)) null else CompanyName(name, pattern replaceAllIn(name, ""))
 
   def main(args: Array[String]): Unit = {
     val map = EsUtils.getEsConfigMap

+ 39 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,5 +1,7 @@
 package com.winhc.bigdata.spark.utils
 
+import scala.collection.mutable
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/3 18:49
@@ -7,4 +9,41 @@ package com.winhc.bigdata.spark.utils
  */
 object BaseUtil {
   def isWindows: Boolean = System.getProperty("os.name").contains("Windows")
+
+  def getExecutorConfigOrExit(args: Array[String]): mutable.Map[String, String] = {
+    if (args.length != 3) {
+      println("请配置计算资源: instances, cores, memory .")
+      sys.exit(-1)
+    }
+    val Array(instances, cores, memory) = args;
+    getExecutorConfig(instances, cores, memory)
+  }
+
+  def getExecutorConfigOrDefault(args: Array[String]): mutable.Map[String, String] = {
+    var instances, cores, memory: String = null
+    if (args.length != 3) {
+      println("使用默认的计算资源: 2 instances, 2 cores, 10g memory.")
+      instances = "2"
+      cores = "2"
+      memory = "10g"
+    } else {
+      instances = args(0)
+      cores = args(1)
+      memory = args(2)
+    }
+    getExecutorConfig(instances, cores, memory)
+  }
+
+  private def getExecutorConfig(instances: String, cores: String, memory: String): mutable.Map[String, String] = {
+    println(
+      s"""
+         |instances : $instances,
+         |cores : $cores,
+         |memory : $memory
+         |""".stripMargin)
+    mutable.Map("spark.executor.instances" -> instances,
+      "spark.executor.cores" -> cores,
+      "spark.executor.memory" -> memory
+    )
+  }
 }