Procházet zdrojové kódy

Merge remote-tracking branch 'origin/master'

xufei před 4 roky
rodič
revize
1cb7989763

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/utils/PhoenixHelper.scala

@@ -11,6 +11,11 @@ import org.apache.spark.sql.DataFrame
 object PhoenixHelper {
 
   implicit class DataFrameEnhancer(df: DataFrame) {
+    /**
+     * 只有 catalog为in-memory方式可用
+     *
+     * @param tableName
+     */
     def save2Phoenix(tableName: String): Unit = {
       val tmpTable = "tmp_" + tableName
       df.sparkSession.sql(PhoenixUtil.getPhoenixTempView(tmpTable, tableName))
@@ -19,6 +24,11 @@ object PhoenixHelper {
         .insertInto(tmpTable + BaseConst.PHOENIX_TABLE_NAME_FLAG)
     }
 
+    /**
+     * catalog不限
+     *
+     * @param tableName
+     */
     def save2PhoenixByJDBC(tableName: String): Unit = {
       df.write
         .mode("append")

+ 1 - 1
src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

@@ -144,7 +144,7 @@ object JdbcUtils extends Logging {
     }
     val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
     if (table.contains(BaseConst.PHOENIX_TABLE_NAME_FLAG)) {
-      s"UPSERT INTO ${table.replace("\001", "")} ($columns) VALUES ($placeholders)"
+      s"UPSERT INTO ${table.replace(BaseConst.PHOENIX_TABLE_NAME_FLAG, "")} ($columns) VALUES ($placeholders)"
     } else {
       s"INSERT INTO $table ($columns) VALUES ($placeholders)"
     }