
add phoenix query-server url helper; enable hive support in InitEnv

许家凯 4 years ago
parent commit 58bf807021

+ 9 - 21
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2AliPhoenix.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.test
 
 import com.winhc.bigdata.spark.utils.SparkUtils
-import org.apache.spark.sql.SparkSession
 
 import scala.collection.mutable
 
@@ -12,28 +11,17 @@ import scala.collection.mutable
  */
 object TestSpark2AliPhoenix {
   def main(args: Array[String]): Unit = {
-   /* val sparkSession = SparkSession
-      .builder()
-      .enableHiveSupport() //With hive-metastore enabled, tables created in code can be viewed through beeline.
-      .appName("scala spark on Phoenix5.x test")
-      .getOrCreate()
-*/
-   val map = mutable.Map[String, String](
-//     "odps.bearer.token" -> "uNJOBskzcDqHq1TYG3m2rebR4c1009"
-//     ,"spark.hadoop.odps.cupid.bearer.token.enable" -> "false"
-   )
-    SparkUtils.PhoenixOptions("\"company_abnormal_info\"")("zkUrl")
-    val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test",map)
-
-    val phoenixTableName = "TEST_P"
+    val map = mutable.Map[String, String](
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
+    val phoenixTableName = "COMPANY_BID_LIST"
     val sparkTableName = "test_spark"
-    val queryServerAddress = "http://hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
-//    val queryServerAddress = "http://hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
 
     val driver = "org.apache.phoenix.queryserver.client.Driver"
-    val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
+    val url = SparkUtils.PhoenixUrl
 
-   sparkSession.sql(s"drop table if exists $sparkTableName")
+    sparkSession.sql(s"drop table if exists $sparkTableName")
     val createCmd = "CREATE TABLE " +
       sparkTableName +
       " USING org.apache.spark.sql.jdbc\n" +
@@ -45,7 +33,7 @@ object TestSpark2AliPhoenix {
       ")"
     println(" createCmd: \n" + createCmd)
     sparkSession.sql(createCmd)
-    sparkSession.sql(s"select * from $sparkTableName").show
-    sparkSession.stop()
+    val querySql = "select * from " + sparkTableName + " limit 100"
+    sparkSession.sql(querySql).show
   }
 }
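
Side note: the patched test reads Phoenix through a Spark SQL JDBC table (CREATE TABLE ... USING org.apache.spark.sql.jdbc). The same read can be expressed with the DataFrame JDBC reader; a minimal sketch, hypothetical and not part of this commit, assuming the SparkUtils.InitEnv and SparkUtils.PhoenixUrl helpers shown in the diff below:

    import scala.collection.mutable
    import com.winhc.bigdata.spark.utils.SparkUtils

    // Hypothetical sketch: read the same Phoenix table via the DataFrame JDBC
    // reader instead of a CREATE TABLE ... USING statement. Driver class and
    // table name are taken from the diff above.
    object PhoenixThinReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkUtils.InitEnv("phoenix thin read sketch", mutable.Map[String, String]())
        val df = spark.read
          .format("jdbc")
          .option("driver", "org.apache.phoenix.queryserver.client.Driver")
          .option("url", SparkUtils.PhoenixUrl) // thin-client URL helper added in this commit
          .option("dbtable", "COMPANY_BID_LIST")
          .load()
        df.show(100, truncate = false)
      }
    }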

+ 12 - 0
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -9,6 +9,17 @@ import scala.collection.mutable
 
 object SparkUtils {
 
+  def PhoenixUrl: String = {
+    var queryServerAddress: String = null
+    if (System.getProperty("os.name").contains("Windows")) {
+      queryServerAddress = "http://hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
+    } else {
+      queryServerAddress = "http://hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
+    }
+    val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
+    url
+  }
+
   def PhoenixOptions(tableName: String): Map[String, String] = {
     if (System.getProperty("os.name").contains("Windows")) {
       import com.alibaba.dcm.DnsCacheManipulator
@@ -45,6 +56,7 @@ object SparkUtils {
     val spark = SparkSession
       .builder()
       .appName(appName)
+      .enableHiveSupport()
       .config("spark.sql.broadcastTimeout", 20 * 60)
       .config("spark.sql.crossJoin.enabled", true)
       .config("spark.hadoop.odps.cupid.smartnat.enable", true)