|
@@ -0,0 +1,56 @@
|
|
|
+package com.winhc.bigdata.spark.test
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.utils.SparkUtils
|
|
|
+import org.apache.spark.sql.SparkSession
|
|
|
+
|
|
|
+import scala.collection.mutable
|
|
|
+
|
|
|
object TestSparkSql4ES {

  /** Smoke-test entry point: opens a Spark session configured for an
    * Aliyun Elasticsearch cluster, loads the `test/user` index as a
    * DataFrame via the elasticsearch-spark connector, prints it, and stops.
    *
    * Expects exactly three arguments: executor instances, cores, memory.
    * Exits with status -1 when the argument count is wrong.
    */
  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      println("请配置计算资源: instances, cores, memory .")
      System.exit(-1)
    }

    val Array(instances, cores, memory) = args

    println(
      s"""
         |instances : $instances,
         |cores : $cores,
         |memory : $memory
         |""".stripMargin)

    // Executor sizing plus elasticsearch-hadoop connector settings.
    // `val` instead of the original `var` + reassignment: the map is built once.
    // NOTE(review): ES credentials are hard-coded here — move them to a
    // secure configuration source before this leaves test code.
    val config: mutable.Map[String, String] = mutable.Map(
      "spark.executor.instances" -> instances,
      "spark.executor.cores" -> cores,
      "spark.executor.memory" -> memory,
      "es.nodes.wan.only" -> "true",
      "es.internal.es.version" -> "5.5.3",
      // "es.nodes" -> "es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com",
      "es.nodes" -> "es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com",
      "es.port" -> "9200",
      "es.index.auto.create" -> "true",
      "es.net.http.auth.user" -> "elastic",
      "es.net.http.auth.pass" -> "elastic_168"
    )

    // Log where the driver runs — handy when debugging ES network reachability.
    val localhost: java.net.InetAddress = java.net.InetAddress.getLocalHost
    println("hostName:" + localhost.getHostName)
    println("hostAddress:" + localhost.getHostAddress)

    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)

    // esDF comes from the elasticsearch-spark implicit enrichment of SparkSession.
    // (The original also built a match-query JSON string that was never passed
    // to esDF; that unused local has been removed.)
    import org.elasticsearch.spark.sql._
    val tableName = "test/user"
    val esDf = spark.esDF(tableName)
    esDf.show()
    spark.stop()
  }
}
|