Browse Source

加入es client

许家凯 4 năm trước cách đây
mục cha
commit
eeca257202
2 tập tin đã thay đổi với 94 bổ sung0 xóa
  1. 6 0
      pom.xml
  2. 88 0
      src/main/scala/com/winhc/bigdata/spark/utils/EsRestUtils.scala

+ 6 - 0
pom.xml

@@ -295,6 +295,12 @@
             <artifactId>snakeyaml</artifactId>
             <version>1.17</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>rest</artifactId>
+            <version>5.5.3</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 88 - 0
src/main/scala/com/winhc/bigdata/spark/utils/EsRestUtils.scala

@@ -0,0 +1,88 @@
+package com.winhc.bigdata.spark.utils
+
+
+import java.util.Collections
+
+import com.winhc.bigdata.spark.const.EnvConst
+import org.apache.http.HttpHost
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.entity.ContentType
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.elasticsearch.client.{RestClient, RestClientBuilder}
+
+import scala.util.parsing.json.JSON;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/10 10:10
+ * @Description:
+ */
+object EsRestUtils {
+  def regJson(json: Option[Any]) = json match {
+    case Some(map: Map[String, Any]) => map
+    //      case None => "erro"
+    //      case other => "Unknow data structure : " + other
+  }
+
+  def getIndexResult(json: String): List[Map[String, Any]] = {
+    regJson(JSON.parseFull(json))("hits").asInstanceOf[Map[String, Any]]("hits").asInstanceOf[List[Map[String, Any]]]
+  }
+
+  def getRestClient(): RestClient = {
+    val credentialsProvider = new BasicCredentialsProvider();
+    credentialsProvider.setCredentials(AuthScope.ANY,
+      new UsernamePasswordCredentials("elastic", "elastic_168"));
+    // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为HOST。
+    val restClient = RestClient.builder(new HttpHost(EnvConst.getEnv().getValue("es.nodes"), 9200))
+      .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
+        override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+      }).build();
+    restClient
+  }
+
+  def getCidByCompanyName(restClient: RestClient, companyName: String): String = {
+    val query =
+      s"""
+         |{
+         |  "_source": {
+         |     "includes": [ "_id" ]
+         |   },
+         |  "query": {
+         |    "term": {
+         |      "cname.value.keyword": "${BaseUtil.cleanup(companyName)}"
+         |    }
+         |  }
+         |}
+         |""".stripMargin
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+
+
+    val indexResponse = restClient.performRequest(
+      "GET",
+      "/winhc-company/company/_search",
+      Collections.emptyMap[String, String](),
+      //      Collections.singletonMap("pretty", "true"),
+      entity);
+
+    val en = indexResponse.getEntity
+    val res = EntityUtils.toString(en)
+    val list = getIndexResult(res)
+    if (list.nonEmpty) {
+      list.head("_id").asInstanceOf[String]
+    } else {
+      ""
+    }
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    val restClient = getRestClient()
+    val id = getCidByCompanyName(restClient, "珠海格力电器股份有限公司")
+    println(id)
+    restClient.close()
+  }
+
+}