
Add implicit conversions; implement the two ways of writing to Phoenix: JDBC and in-memory.

许家凯 · 5 years ago
Commit acc7d7770a

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/const/BaseConst.scala

@@ -0,0 +1,10 @@
+package com.winhc.bigdata.spark.const
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/3 19:28
+ * @Description:
+ */
+object BaseConst {
+  val PHOENIX_TABLE_NAME_FLAG = "\001"
+}
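The flag value "\001" is the ASCII SOH control character, which cannot occur in a real table name, so appending it marks a write as a Phoenix target; the patched JdbcUtils further down detects the marker, switches INSERT to UPSERT, and strips it before building the statement. A minimal sketch of that convention (table name taken from the test below):

val tagged = "CONST_COMPANY_CATEGORY_CODE" + BaseConst.PHOENIX_TABLE_NAME_FLAG
tagged.contains(BaseConst.PHOENIX_TABLE_NAME_FLAG)    // true -> Phoenix target, generate UPSERT
tagged.replace(BaseConst.PHOENIX_TABLE_NAME_FLAG, "") // "CONST_COMPANY_CATEGORY_CODE" -> name used in the SQL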

+ 4 - 5
src/main/scala/com/winhc/bigdata/spark/test/TestOps2Phoenix.scala

@@ -2,7 +2,7 @@ package com.winhc.bigdata.spark.test
 
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import com.winhc.bigdata.spark.utils.{PhoenixUtil, SparkUtils}
+import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.spark.odps.OdpsOps
 
 import scala.collection.mutable
@@ -21,7 +21,6 @@ object TestOps2Phoenix {
     val spark = SparkUtils.InitEnv("test ops to phoenix", map)
     val odpsOps = new OdpsOps(spark.sparkContext)
     import spark.implicits._
-    import spark._
 
     val rdd_2 = odpsOps.readTable(
       "winhc_test_dev",
@@ -29,10 +28,10 @@ object TestOps2Phoenix {
       (r: Record, schema: TableSchema) => (r.getBigint(0), r.getString(1), r.getString(2), r.getString(3), r.getString(4))
     )
     rdd_2.foreach(println(_))
-    sql(PhoenixUtil.getPhoenixTempView("tmp_table", "CONST_COMPANY_CATEGORY_CODE"))
+    import com.winhc.bigdata.spark.utils.PhoenixHelper._
+
     rdd_2.toDF("\"id\"", "\"category_code\"", "\"category_str\"", "\"category_str_middle\"", "\"category_str_big\"")
-      .write.mode("append")
-      .insertInto("tmp_table")
+      .save2Phoenix("CONST_COMPANY_CATEGORY_CODE")
 
     spark.stop()
   }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2AliPhoenix.scala

@@ -19,7 +19,7 @@ object TestSpark2AliPhoenix {
     val sparkTableName = "test_spark"
 
     val driver = "org.apache.phoenix.queryserver.client.Driver"
-    val url = PhoenixUtil.getPhoenixUrl
+    val url = PhoenixUtil.getPhoenixJDBCUrl
 
     //    sparkSession.sql("select * from const_company_category_code").show()
 

+ 2 - 12
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2PhoenixJDBC.scala

@@ -15,26 +15,16 @@ import scala.collection.mutable
  * @Description:
  */
 object TestSpark2PhoenixJDBC {
-  private val DB_PHOENIX_DRIVER = "org.apache.phoenix.queryserver.client.Driver"
-  private val DB_PHOENIX_USER = ""
-  private val DB_PHOENIX_PASS = ""
-  private val DB_PHOENIX_FETCHSIZE = "10000"
-
-
   def main(args: Array[String]): Unit = {
     val map = mutable.Map[String, String](
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
-    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixUrl
+    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixJDBCUrl
 
     // JDBC connection properties
     val SQL_QUERY = " ( SELECT ID,NCID,CID,TITLE FROM COMPANY_BID_LIST limit 10 )  events  "
-    val connProp = new Properties
-    connProp.put("driver", DB_PHOENIX_DRIVER)
-    connProp.put("user", DB_PHOENIX_USER)
-    connProp.put("password", DB_PHOENIX_PASS)
-    connProp.put("fetchsize", DB_PHOENIX_FETCHSIZE)
+    val connProp = PhoenixUtil.getPhoenixProperties
     val pDf = sparkSession.read.jdbc(DB_PHOENIX_URL, SQL_QUERY, connProp)
     val sc = pDf.schema
     println(sc)

+ 29 - 0
src/main/scala/com/winhc/bigdata/spark/utils/PhoenixHelper.scala

@@ -0,0 +1,29 @@
+package com.winhc.bigdata.spark.utils
+
+import com.winhc.bigdata.spark.const.BaseConst
+import org.apache.spark.sql.DataFrame
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/4 08:40
+ * @Description:
+ */
+object PhoenixHelper {
+
+  implicit class DataFrameEnhancer(df: DataFrame) {
+    def save2Phoenix(tableName: String): Unit = {
+      val tmpTable = "tmp_" + tableName
+      df.sparkSession.sql(PhoenixUtil.getPhoenixTempView(tmpTable, tableName))
+      df.write
+        .mode("append")
+        .insertInto(tmpTable + BaseConst.PHOENIX_TABLE_NAME_FLAG)
+    }
+
+    def save2PhoenixByJDBC(tableName: String): Unit = {
+      df.write
+        .mode("append")
+        .jdbc(PhoenixUtil.getPhoenixJDBCUrl, tableName + BaseConst.PHOENIX_TABLE_NAME_FLAG, PhoenixUtil.getPhoenixProperties)
+    }
+  }
+
+}
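With the implicit class imported, any DataFrame picks up both write paths named in the commit message. A minimal usage sketch, assuming df is an existing DataFrame whose columns match the Phoenix table (table name taken from TestOps2Phoenix):

import com.winhc.bigdata.spark.utils.PhoenixHelper._

// In-memory path: registers a JDBC-backed temp table via getPhoenixTempView, then appends through it.
df.save2Phoenix("CONST_COMPANY_CATEGORY_CODE")

// JDBC path: writes directly through the Phoenix query server with DataFrameWriter.jdbc.
df.save2PhoenixByJDBC("CONST_COMPANY_CATEGORY_CODE")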

+ 20 - 3
src/main/scala/com/winhc/bigdata/spark/utils/PhoenixUtil.scala

@@ -1,5 +1,7 @@
 package com.winhc.bigdata.spark.utils
 
+import java.util.Properties
+
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 
 /**
@@ -8,7 +10,22 @@ import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
  * @Description:
  */
 object PhoenixUtil {
-  def getPhoenixUrl: String = {
+  private val DB_PHOENIX_DRIVER = "org.apache.phoenix.queryserver.client.Driver"
+  private val DB_PHOENIX_USER = ""
+  private val DB_PHOENIX_PASS = ""
+  private val DB_PHOENIX_FETCHSIZE = "10000"
+
+  def getPhoenixProperties: Properties = {
+    val connProp = new Properties
+    connProp.put("driver", DB_PHOENIX_DRIVER)
+    connProp.put("user", DB_PHOENIX_USER)
+    connProp.put("password", DB_PHOENIX_PASS)
+    connProp.put("fetchsize", DB_PHOENIX_FETCHSIZE)
+    connProp
+  }
+
+
+  def getPhoenixJDBCUrl: String = {
     var queryServerAddress: String = null
     if (isWindows) {
       queryServerAddress = "http://hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:8765"
@@ -19,12 +36,12 @@ object PhoenixUtil {
     url
   }
 
-  def getPhoenixTempView(tempViewTableName: String, phoenixTableName: String, fetchsize: Int = 100): String =
+  def getPhoenixTempView(tempViewTableName: String, phoenixTableName: String, fetchsize: Int = 1000): String =
     s"""
        |CREATE TABLE $tempViewTableName USING org.apache.spark.sql.jdbc
        |OPTIONS (
        |  'driver' 'org.apache.phoenix.queryserver.client.Driver',
-       |  'url' '${getPhoenixUrl}',
+       |  'url' '${getPhoenixJDBCUrl}',
        |  'dbtable' '$phoenixTableName',
        |  'fetchsize' '$fetchsize'
        |)
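Together these helpers back both paths: getPhoenixTempView yields the DDL for a JDBC-backed Spark table (in-memory path), while getPhoenixJDBCUrl and getPhoenixProperties feed the plain JDBC reader/writer. A usage sketch assembled from the test classes in this commit, assuming spark is an existing SparkSession:

// In-memory path: create the temp table once, then write into it (this is what save2Phoenix does).
spark.sql(PhoenixUtil.getPhoenixTempView("tmp_table", "CONST_COMPANY_CATEGORY_CODE"))

// JDBC path: read (or write) directly through the query server.
val pDf = spark.read.jdbc(
  PhoenixUtil.getPhoenixJDBCUrl,
  " ( SELECT ID,NCID,CID,TITLE FROM COMPANY_BID_LIST limit 10 )  events  ",
  PhoenixUtil.getPhoenixProperties)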

+ 8 - 3
src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

@@ -20,10 +20,11 @@ package org.apache.spark.sql.execution.datasources.jdbc
 import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.util.Locale
 
+import com.winhc.bigdata.spark.const.BaseConst
+
 import scala.collection.JavaConverters._
 import scala.util.Try
 import scala.util.control.NonFatal
-
 import org.apache.spark.TaskContext
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
@@ -142,7 +143,11 @@ object JdbcUtils extends Logging {
       }.mkString(",")
     }
     val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
-    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
+    if (table.contains(BaseConst.PHOENIX_TABLE_NAME_FLAG)) {
+      s"UPSERT INTO ${table.replace("\001", "")} ($columns) VALUES ($placeholders)"
+    } else {
+      s"INSERT INTO $table ($columns) VALUES ($placeholders)"
+    }
   }
 
   /**
@@ -638,7 +643,7 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = conn.prepareStatement(insertStmt.replace("INSERT", "UPSERT"))
+      val stmt = conn.prepareStatement(insertStmt)
       val setters = rddSchema.fields.map(f => makeSetter(conn, dialect, f.dataType))
       val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)
       val numFields = rddSchema.fields.length
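Net effect of the two JdbcUtils changes: the UPSERT rewrite no longer happens blindly at prepareStatement time; the statement builder now emits UPSERT only when the table name carries the Phoenix flag (and strips the flag), so ordinary JDBC writes keep a plain INSERT. A small sketch of the resulting branching, with insertOrUpsert as a hypothetical stand-in for the patched builder:

def insertOrUpsert(table: String, columns: String, placeholders: String): String =
  if (table.contains(BaseConst.PHOENIX_TABLE_NAME_FLAG))
    s"UPSERT INTO ${table.replace(BaseConst.PHOENIX_TABLE_NAME_FLAG, "")} ($columns) VALUES ($placeholders)"
  else
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"

insertOrUpsert("tmp_CONST_COMPANY_CATEGORY_CODE" + BaseConst.PHOENIX_TABLE_NAME_FLAG, "\"id\",\"category_code\"", "?,?")
// -> UPSERT INTO tmp_CONST_COMPANY_CATEGORY_CODE ("id","category_code") VALUES (?,?)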