Ver código fonte

fix: phoenix加入超时配制

许家凯 4 anos atrás
pai
commit
f49bd3920e

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/config/PhoenixConfig.scala

@@ -29,7 +29,7 @@ object PhoenixConfig {
 
   def getPhoenixJDBCUrl: String = {
     val queryServerAddress: String = EnvConst.getEnv().getValue("phoenix.address")
-    val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
+    val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF;phoenix.thin.driver.socket.timeout=60000"
     url
   }
 

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/PhoenixHelper.scala

@@ -35,6 +35,12 @@ object PhoenixHelper {
         .mode("append")
         .jdbc(PhoenixConfig.getPhoenixJDBCUrl, tableName, PhoenixConfig.getPhoenixProperties)
     }
+
+    def save2PhoenixByJDBC(tableName: String, cols: Seq[String]): Unit = {
+      import org.apache.spark.sql.functions.col
+      val stringDf = df.select(cols.map(column => col(column).cast("string")): _*)
+      stringDf.save2PhoenixByJDBC(tableName)
+    }
   }
 
 }