
add phoenix test

许家凯 4 years ago
commit 663dabba20

+ 51 - 0
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2AliPhoenix.scala

@@ -0,0 +1,51 @@
+package com.winhc.bigdata.spark.test
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/2 16:56
+ * @Description:
+ */
+object TestSpark2AliPhoenix {
+  def main(args: Array[String]): Unit = {
+    /* Direct SparkSession construction (unused; SparkUtils.InitEnv is used below):
+    val sparkSession = SparkSession
+      .builder()
+      .enableHiveSupport() // With hive-metastore enabled, tables created here are visible through beeline.
+      .appName("scala spark on Phoenix5.x test")
+      .getOrCreate()
+    */
+    val map = mutable.Map[String, String](
+      // "odps.bearer.token" -> "uNJOBskzcDqHq1TYG3m2rebR4c1009"
+      // , "spark.hadoop.odps.cupid.bearer.token.enable" -> "false"
+    )
+    // Called for its side effect only: on Windows it pins the HBase host in the JVM DNS cache
+    // (see SparkUtils.PhoenixOptions); the returned zkUrl value is discarded here.
+    SparkUtils.PhoenixOptions("\"company_abnormal_info\"")("zkUrl")
+    val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
+
+    val phoenixTableName = "TEST_P"
+    val sparkTableName = "test_spark"
+    val queryServerAddress = "http://hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
+//    val queryServerAddress = "http://hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
+
+    val driver = "org.apache.phoenix.queryserver.client.Driver"
+    val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
+
+    sparkSession.sql(s"drop table if exists $sparkTableName")
+    val createCmd =
+      s"""CREATE TABLE $sparkTableName
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (
+         |  'driver' '$driver',
+         |  'url' '$url',
+         |  'dbtable' '$phoenixTableName',
+         |  'fetchsize' '100'
+         |)""".stripMargin
+    println(s"createCmd:\n$createCmd")
+    sparkSession.sql(createCmd)
+    sparkSession.sql(s"select * from $sparkTableName").show
+    sparkSession.stop()
+  }
+}
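The registration above can equally be expressed through the DataFrame JDBC reader instead of a SQL-registered table. A minimal sketch under the same assumptions (thin-client driver and query server address from TestSpark2AliPhoenix); this is illustrative only and not part of this commit:

    import org.apache.spark.sql.SparkSession

    object TestPhoenixJdbcReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("phoenix jdbc read sketch").getOrCreate()
        // Same thin-client endpoint and PROTOBUF serialization as above.
        val url = "jdbc:phoenix:thin:url=http://hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765;serialization=PROTOBUF"
        val df = spark.read
          .format("jdbc")
          .option("driver", "org.apache.phoenix.queryserver.client.Driver")
          .option("url", url)
          .option("dbtable", "TEST_P")
          .option("fetchsize", "100")
          .load()
        df.show()
        spark.stop()
      }
    }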

+ 23 - 0
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2Phoenix.scala

@@ -0,0 +1,23 @@
+package com.winhc.bigdata.spark.test
+
+// Provides the phoenixTableAsDataFrame / saveToPhoenix implicits used in the commented body below
+// (requires the phoenix-spark connector on the classpath):
+//import org.apache.phoenix.spark._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/2 13:49
+ * @Description:
+ */
+object TestSpark2Phoenix {
+  def main(args: Array[String]): Unit = {
+    /*
+    val spark = SparkUtils.InitEnv("testSpark2Phoenix")
+
+    val df1 = spark.sqlContext.phoenixTableAsDataFrame("\"company_abnormal_info\"", Seq("rowkey", "ncid"),
+      zkUrl = Some(SparkUtils.PhoenixOptions("\"company_abnormal_info\"")("zkUrl")))
+    df1.show()
+
+    val df = spark.createDataFrame(Seq(("xjkxjk", "ncid_value", "ncname_value")))
+      .toDF("\"rowkey\"", "\"ncid\"", "\"ncname\"")
+
+    df.saveToPhoenix(SparkUtils.PhoenixOptions("\"company_abnormal_info\""))
+
+    spark.stop()
+    */
+  }
+}
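The commented body above uses the phoenix-spark implicits. The same read can also be written against the connector's DataSource API; a minimal sketch, assuming the phoenix-spark module is on the classpath and reusing the options returned by SparkUtils.PhoenixOptions (illustrative only, not part of this commit):

    import com.winhc.bigdata.spark.utils.SparkUtils
    import org.apache.spark.sql.SparkSession

    object TestPhoenixConnectorReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("phoenix connector read sketch").getOrCreate()
        // PhoenixOptions supplies the "table" and "zkUrl" keys the connector expects.
        val options = SparkUtils.PhoenixOptions("\"company_abnormal_info\"")
        val df = spark.read
          .format("org.apache.phoenix.spark")
          .options(options)
          .load()
        df.select("rowkey", "ncid").show()
        spark.stop()
      }
    }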

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -9,6 +9,16 @@ import scala.collection.mutable
 
 object SparkUtils {
 
+  /** Connector options ("table", "zkUrl") for a Phoenix table; see the usage sketch at the
+   * end of this section. On Windows dev machines, the internal HBase hostname is pinned to
+   * its public IP in the JVM DNS cache and the public proxy endpoint is returned; on
+   * servers, the internal endpoint is returned directly. */
+  def PhoenixOptions(tableName: String): Map[String, String] = {
+    if (System.getProperty("os.name").contains("Windows")) {
+      import com.alibaba.dcm.DnsCacheManipulator
+      DnsCacheManipulator.setDnsCache("hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com", "47.101.251.157")
+      Map("table" -> tableName, "zkUrl" -> "hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:2181")
+    } else {
+      Map("table" -> tableName, "zkUrl" -> "hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:2181")
+    }
+  }
+
   def HBaseOutputJobConf(outputTable: String): JobConf = {
     val config = HBaseConfiguration.create()
     var zkAddress: String = null
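
For reference, a usage sketch of the new PhoenixOptions helper above (hypothetical standalone call, not part of this commit):

    // Returns Map("table" -> tableName, "zkUrl" -> endpoint). On Windows it first pins the
    // internal HBase hostname to its public IP in the JVM DNS cache and returns the public
    // proxy endpoint; on servers it returns the internal endpoint.
    val opts = SparkUtils.PhoenixOptions("\"company_abnormal_info\"")
    println(opts("zkUrl"))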