
fix: jdbc phoenix

许家凯 · 5 years ago · parent commit 044c2fef7a

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/const/BaseConst.scala

@@ -6,5 +6,6 @@ package com.winhc.bigdata.spark.const
  * @Description:
  */
 object BaseConst {
-  val PHOENIX_TABLE_NAME_FLAG = "\001"
+//  val PHOENIX_TABLE_NAME_FLAG = "\001"
+  val DB_PHOENIX_DRIVER = "org.apache.phoenix.queryserver.client.Driver"
 }
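
For context: Spark's JDBC source reads the driver class from the "driver" key of the connection Properties (surfaced as options.driverClass in the JdbcUtils patch below). A minimal sketch of how getPhoenixProperties presumably assembles them; everything beyond the "driver" key is an assumption:

    import java.util.Properties
    import com.winhc.bigdata.spark.const.BaseConst

    // Sketch (assumed shape): Spark loads the thin Phoenix driver on the
    // executors from the "driver" key; "fetchsize" mirrors
    // DB_PHOENIX_FETCHSIZE in PhoenixUtil below.
    val connProp = new Properties()
    connProp.put("driver", BaseConst.DB_PHOENIX_DRIVER)
    connProp.put("fetchsize", "10000")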

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/implicits/PhoenixHelper.scala

@@ -22,7 +22,7 @@ object PhoenixHelper {
       df.sparkSession.sql(PhoenixUtil.getPhoenixTempView(tmpTable, tableName))
       df.write
         .mode("append")
-        .insertInto(tmpTable + BaseConst.PHOENIX_TABLE_NAME_FLAG)
+        .insertInto(tmpTable)
     }
 
     /**
@@ -33,7 +33,8 @@ object PhoenixHelper {
     def save2PhoenixByJDBC(tableName: String): Unit = {
       df.write
         .mode("append")
-        .jdbc(PhoenixUtil.getPhoenixJDBCUrl, tableName + BaseConst.PHOENIX_TABLE_NAME_FLAG, PhoenixUtil.getPhoenixProperties)
+        .jdbc(PhoenixUtil.getPhoenixJDBCUrl, tableName, PhoenixUtil.getPhoenixProperties)
+      //        .jdbc(PhoenixUtil.getPhoenixJDBCUrl, tableName + BaseConst.PHOENIX_TABLE_NAME_FLAG, PhoenixUtil.getPhoenixProperties)
     }
   }
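
A minimal usage sketch of the implicit above, mirroring the test below; df is assumed to be a DataFrame whose columns line up with the target Phoenix table:

    import com.winhc.bigdata.spark.implicits.PhoenixHelper._

    // Appends df through Spark's JDBC path; the patched JdbcUtils below
    // rewrites the generated INSERT into a Phoenix UPSERT.
    df.save2PhoenixByJDBC("PHX_TEST")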
 

+ 10 - 23
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2PhoenixJDBC.scala

@@ -1,11 +1,10 @@
 package com.winhc.bigdata.spark.test
 
 import java.util
-import java.util.Properties
 
-import com.winhc.bigdata.spark.utils.{PhoenixUtil, SparkUtils}
+import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType, UserDefinedType, VarcharType}
+import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType, VarcharType}
 
 import scala.collection.mutable
 
@@ -20,34 +19,22 @@ object TestSpark2PhoenixJDBC {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
-    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixJDBCUrl
 
-    // JDBC connection properties
-    val SQL_QUERY = " ( SELECT ID,NCID,CID,TITLE FROM COMPANY_BID_LIST limit 10 )  events  "
-    val connProp = PhoenixUtil.getPhoenixProperties
-    val pDf = sparkSession.read.jdbc(DB_PHOENIX_URL, SQL_QUERY, connProp)
-    val sc = pDf.schema
-    println(sc)
-    pDf.printSchema()
-    pDf.show()
-    import sparkSession.implicits._
     import sparkSession._
 
-    var dt:DataType = VarcharType(255)
-//    dt = StringType
+    val dt: DataType = StringType
     val schema = StructType(Array(
-      StructField("k", dt, nullable = false),
-      StructField("s", dt, nullable = true),
-      StructField("time", dt, nullable = true)
+      StructField("ROWKEY", dt, nullable = false),
+      StructField("ID", dt, nullable = false),
+      StructField("NAME", dt, nullable = true),
+      StructField("ADDR", dt, nullable = true)
     )
     )
     val dataList = new util.ArrayList[Row]()
-    dataList.add(Row("1", "2", "null"))
+    dataList.add(Row("adsfa", "1", "2", "null"))
     val df = createDataFrame(dataList, schema)
-
-    df.write
-      .mode("append")
-      .jdbc(DB_PHOENIX_URL, "TEST_P", connProp)
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+    df.save2PhoenixByJDBC("PHX_TEST")
 
     sparkSession.stop()
   }
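
The test writes into PHX_TEST, which must already exist server-side. A hypothetical DDL for it, derived from the StringType schema above; the all-VARCHAR types and the ROWKEY primary key are assumptions:

    // Hypothetical: column names come from the test schema, types are assumed.
    val createPhxTest =
      """CREATE TABLE IF NOT EXISTS PHX_TEST (
        |  ROWKEY VARCHAR NOT NULL PRIMARY KEY,
        |  ID VARCHAR,
        |  NAME VARCHAR,
        |  ADDR VARCHAR
        |)""".stripMargin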

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/utils/PhoenixUtil.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Properties
 
+import com.winhc.bigdata.spark.const.BaseConst
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 
 /**
@@ -10,7 +11,7 @@ import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
  * @Description:
  */
 object PhoenixUtil {
-  private val DB_PHOENIX_DRIVER = "org.apache.phoenix.queryserver.client.Driver"
+  private val DB_PHOENIX_DRIVER = BaseConst.DB_PHOENIX_DRIVER
   private val DB_PHOENIX_USER = ""
   private val DB_PHOENIX_PASS = ""
   private val DB_PHOENIX_FETCHSIZE = "10000"
@@ -28,9 +29,9 @@ object PhoenixUtil {
   def getPhoenixJDBCUrl: String = {
     var queryServerAddress: String = null
     if (isWindows) {
-      queryServerAddress = "http://hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
+      queryServerAddress = "http://hb-uf6m8e1nu4ivp06m5-proxy-phoenix-pub.hbase.rds.aliyuncs.com:8765"
     } else {
-      queryServerAddress = "http://hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
+      queryServerAddress = "http://hb-uf6m8e1nu4ivp06m5-proxy-phoenix.hbase.rds.aliyuncs.com:8765"
     }
     val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
     url
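
The URL this builds has the thin-client shape jdbc:phoenix:thin:url=http://<query-server-host>:8765;serialization=PROTOBUF. A read-back sketch outside Spark, purely illustrative (table and columns come from the test above):

    import java.sql.DriverManager
    import com.winhc.bigdata.spark.const.BaseConst
    import com.winhc.bigdata.spark.utils.PhoenixUtil

    // Ensure the thin driver is registered, then query through the query server.
    Class.forName(BaseConst.DB_PHOENIX_DRIVER)
    val conn = DriverManager.getConnection(PhoenixUtil.getPhoenixJDBCUrl)
    try {
      val rs = conn.createStatement()
        .executeQuery("SELECT ROWKEY, NAME FROM PHX_TEST LIMIT 10")
      while (rs.next()) println(rs.getString("ROWKEY") + " -> " + rs.getString("NAME"))
    } finally conn.close()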

+ 7 - 5
src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

@@ -121,7 +121,9 @@ object JdbcUtils extends Logging {
                           rddSchema: StructType,
                           tableSchema: Option[StructType],
                           isCaseSensitive: Boolean,
-                          dialect: JdbcDialect): String = {
+                          dialect: JdbcDialect,
+                          isUpsert: Boolean
+                        ): String = {
     val columns = if (tableSchema.isEmpty) {
       rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
     } else {
@@ -143,8 +145,9 @@ object JdbcUtils extends Logging {
       }.mkString(",")
     }
     val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
-    if (table.contains(BaseConst.PHOENIX_TABLE_NAME_FLAG)) {
-      s"UPSERT INTO ${table.replace(BaseConst.PHOENIX_TABLE_NAME_FLAG, "")} ($columns) VALUES ($placeholders)"
+
+    if (isUpsert) {
+      s"UPSERT INTO $table ($columns) VALUES ($placeholders)"
     } else {
       s"INSERT INTO $table ($columns) VALUES ($placeholders)"
     }
@@ -822,8 +825,7 @@ object JdbcUtils extends Logging {
     val getConnection: () => Connection = createConnectionFactory(options)
     val batchSize = options.batchSize
     val isolationLevel = options.isolationLevel
-
-    val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
+    val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect, BaseConst.DB_PHOENIX_DRIVER.equals(options.driverClass))
     val repartitionedDF = options.numPartitions match {
       case Some(n) if n <= 0 => throw new IllegalArgumentException(
         s"Invalid value `$n` for parameter `${JDBCOptions.JDBC_NUM_PARTITIONS}` in table writing " +