
Add ES access test; modify POM.XML

yongnian 4 years ago
Parent
Current commit
efe3bafbe1

+ 62 - 2
pom.xml

@@ -25,6 +25,16 @@
             <groupId>com.aliyun.odps</groupId>
             <artifactId>odps-sdk-core</artifactId>
             <version>${sdk.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.aliyun.odps</groupId>
@@ -104,6 +114,48 @@
             <groupId>org.apache.hbase.connectors.spark</groupId>
             <artifactId>hbase-spark</artifactId>
             <version>1.0.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-core_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-launcher_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-kvstore_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-network-common_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-network-shuffle_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-unsafe_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-tags_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-sql_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-sketch_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-catalyst_2.11</artifactId>
+                    <artifactId>spark-catalyst_2.11</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
@@ -116,6 +168,13 @@
             <artifactId>dns-cache-manipulator</artifactId>
             <version>1.5.1</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 -->
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch-spark-20_2.11</artifactId>
+            <version>6.0.0</version>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -135,10 +194,11 @@
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <artifactSet>
                                 <includes>
-                                    <include>cn.hutool:*</include>
-                                    <include>com.aliyun.odps:*</include>
+                                    <include>*:*</include>
+                                    <!--<include>com.aliyun.odps:*</include>
                                     <include>org.mongodb.*:*</include>
                                     <include>org.apache.hbase:*</include>
+                                    <include>org.elasticsearch:*</include>-->
                                 </includes>
                             </artifactSet>
                             <filters>
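
Note on the shade change above: switching the includes to *:* bundles every compile-scope dependency into the fat jar, which is why the hbase-spark exclusions matter: the Spark 2.11 artifacts the cluster already provides would otherwise be shaded in as well. A minimal runtime sketch (not part of this commit) for checking where a bundled class is loaded from; StrUtil is picked only because cn.hutool appeared in the original include list:

object ShadeCheck {
  def main(args: Array[String]): Unit = {
    // Prints the jar this class was loaded from; after shading with *:*
    // it should point at the fat jar, not a cluster-provided artifact.
    val loc = classOf[cn.hutool.core.util.StrUtil]
      .getProtectionDomain.getCodeSource.getLocation
    println("cn.hutool classes load from: " + loc)
  }
}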

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/etl/dwd_company_bid_ods.scala

@@ -44,7 +44,8 @@ object dwd_company_bid_ods {
     import org.apache.spark.sql.functions._
     df.dropDuplicates("link").withColumn("cids",explode(split(col("cids"), ";")))
       .select("id","cids","title","link","intro","abs","publish_time","purchaser","proxy","province","base","type","items","create_time","update_time","deleted")
-      .write.mode("Append")
+      .withColumnRenamed("cids","cid")
+      .write.mode("overwrite")
       .insertInto(resultTable)
 
     println("ETL end!   ")

+ 56 - 0
src/main/scala/com/winhc/bigdata/spark/test/TestSparkSql4ES.scala

@@ -0,0 +1,56 @@
+package com.winhc.bigdata.spark.test
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+object TestSparkSql4ES {
+  def main(args: Array[String]): Unit = {
+
+    if (args.length != 3) {
+      println("请配置计算资源: instances, cores, memory .")
+      System.exit(-1)
+    }
+
+    var config = mutable.Map.empty[String, String]
+    val Array(instances, cores, memory) = args
+
+    println(
+      s"""
+         |instances : $instances,
+         |cores : $cores,
+         |memory : $memory
+         |""".stripMargin)
+
+    config = mutable.Map("spark.executor.instances" -> instances,
+      "spark.executor.cores" -> cores,
+      "spark.executor.memory" -> memory,
+      "es.nodes.wan.only" -> "true",
+      "es.internal.es.version" -> "5.5.3",
+      //      "es.nodes" -> "es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com",
+            "es.nodes"->"es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com",
+      "es.port" -> "9200",
+      "es.index.auto.create"->"true",
+      "es.net.http.auth.user"->"elastic",
+      "es.net.http.auth.pass"->"elastic_168"
+    )
+    val localhost: java.net.InetAddress = java.net.InetAddress.getLocalHost
+    println("hostName: " + localhost.getHostName)
+    println("hostAddress: " + localhost.getHostAddress)
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    /*val df = spark
+      .read
+      .format("es")
+      .options(config)
+      .load("test/user")
+    df.show()*/
+    import org.elasticsearch.spark.sql._
+    val query = "{\"query\":{\"match\":{\"name\":\"zhaoliu\"}}}"
+    val tableName = "test/user"
+    // Pass the match query so it is actually applied; it was defined but unused.
+    val esDf = spark.esDF(tableName, query)
+    esDf.show()
+    spark.stop()
+  }
+}
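
A companion write-path sketch (not part of this commit) that would seed the test/user index read above, using saveToEs from the same elasticsearch-spark connector added in pom.xml. The object name and sample row are hypothetical; the connection settings mirror the config map in the test:

package com.winhc.bigdata.spark.test

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object TestSparkSqlWrite4ES {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestSparkSqlWrite4ES")
      .config("es.nodes", "es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com")
      .config("es.port", "9200")
      .config("es.nodes.wan.only", "true")
      .config("es.index.auto.create", "true")
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "elastic_168")
      .getOrCreate()
    import spark.implicits._

    // A single document matching the name queried in TestSparkSql4ES.
    val users = Seq(("zhaoliu", 30)).toDF("name", "age")
    users.saveToEs("test/user") // same index/type resource as the read test

    spark.stop()
  }
}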