
Another way for Spark to read Phoenix, working around the Alibaba Cloud bug

yongnian 4 years ago
parent
commit
076e6ef15d

+ 21 - 0
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2AliPhoenix.scala

@@ -1,5 +1,8 @@
 package com.winhc.bigdata.spark.test
 
+import java.util.Properties
+
+import com.winhc.bigdata.spark.test.newPhoenixTest.{DB_PHOENIX_DRIVER, DB_PHOENIX_FETCHSIZE, DB_PHOENIX_PASS, DB_PHOENIX_URL, DB_PHOENIX_USER, SQL_QUERY}
 import com.winhc.bigdata.spark.utils.SparkUtils
 
 import scala.collection.mutable
@@ -10,17 +13,25 @@ import scala.collection.mutable
  * @Description:
  */
 object TestSpark2AliPhoenix {
+  private val DB_PHOENIX_DRIVER = "org.apache.phoenix.queryserver.client.Driver"
+  private val DB_PHOENIX_URL = "jdbc:phoenix:thin:url=http://hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765;serialization=PROTOBUF"
+  private val DB_PHOENIX_USER = ""
+  private val DB_PHOENIX_PASS = ""
+  private val DB_PHOENIX_FETCHSIZE = "10000"
+
   def main(args: Array[String]): Unit = {
     val map = mutable.Map[String, String](
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
+
     val phoenixTableName = "COMPANY_BID_LIST"
     val sparkTableName = "test_spark"
 
     val driver = "org.apache.phoenix.queryserver.client.Driver"
     val url = SparkUtils.PhoenixUrl
 
+/*
     sparkSession.sql(s"drop table if exists $sparkTableName")
     val createCmd = "CREATE TABLE " +
       sparkTableName +
@@ -35,5 +46,15 @@ object TestSpark2AliPhoenix {
     sparkSession.sql(createCmd)
     val querySql = "select * from " + sparkTableName + " limit 100"
     sparkSession.sql(querySql).show
+*/
+    // JDBC connection properties
+    val SQL_QUERY = " ( SELECT ID,NCID,CID,TITLE FROM COMPANY_BID_LIST limit 10 )  events  "
+    val connProp = new Properties
+    connProp.put("driver", DB_PHOENIX_DRIVER)
+    connProp.put("user", DB_PHOENIX_USER)
+    connProp.put("password", DB_PHOENIX_PASS)
+    connProp.put("fetchsize", DB_PHOENIX_FETCHSIZE)
+    sparkSession.read.jdbc(DB_PHOENIX_URL, SQL_QUERY, connProp).show
+
   }
 }
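
For reference, below is a minimal self-contained sketch of the same workaround: reading from Phoenix through the Phoenix Query Server thin JDBC driver via spark.read.jdbc, instead of the Spark-on-Phoenix table mapping that is commented out in the diff. The driver class, thin-client URL, table, and columns are copied from the diff; the object name PhoenixThinJdbcSketch is hypothetical, and SparkUtils.InitEnv is assumed to return a SparkSession as it does in TestSpark2AliPhoenix.

    package com.winhc.bigdata.spark.test

    import java.util.Properties

    import com.winhc.bigdata.spark.utils.SparkUtils

    import scala.collection.mutable

    // Sketch of the JDBC-based workaround from the commit above (names here are illustrative).
    object PhoenixThinJdbcSketch {
      // Phoenix Query Server (Avatica) thin-client driver, as used in the diff
      private val DB_PHOENIX_DRIVER = "org.apache.phoenix.queryserver.client.Driver"
      // Thin-client URL copied from the diff; PROTOBUF serialization is required by the query server
      private val DB_PHOENIX_URL =
        "jdbc:phoenix:thin:url=http://hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765;serialization=PROTOBUF"

      def main(args: Array[String]): Unit = {
        val map = mutable.Map[String, String](
          "spark.hadoop.odps.spark.local.partition.amt" -> "100"
        )
        // Assumed to return a SparkSession, as in the repository's other tests
        val spark = SparkUtils.InitEnv("phoenix thin jdbc sketch", map)

        // Wrap the query in a derived table ("events" is only an alias) so it can be passed
        // where spark.read.jdbc expects a table name
        val query = "( SELECT ID,NCID,CID,TITLE FROM COMPANY_BID_LIST LIMIT 10 ) events"

        val props = new Properties()
        props.put("driver", DB_PHOENIX_DRIVER)
        props.put("user", "")       // credentials are empty in the diff
        props.put("password", "")
        props.put("fetchsize", "10000")

        // Plain JDBC read through the query server; this path avoids the Spark-Phoenix
        // table mapping that hits the Alibaba Cloud bug mentioned in the commit message
        spark.read.jdbc(DB_PHOENIX_URL, query, props).show()

        spark.stop()
      }
    }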