许家凯 2 vuotta sitten
vanhempi
commit
dfb9d81c80

+ 4 - 1
src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala

@@ -19,6 +19,9 @@ case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (Str
   @transient private var connection: Connection = null
 
 
+
+
+
   override def open(parameters: Configuration): Unit = {
     import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
     val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
@@ -33,7 +36,7 @@ case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (Str
       val table = connection.getTable(TableName.valueOf(tn.toUpperCase())).asInstanceOf[HTable]
       val get = new Get(rowkey.getBytes)
       val result = table.get(get)
-      val string = result.toJsonString
+      val string = result.toJson
       resultFuture.complete(Seq((tn, rowkey, string)))
     } catch {
       case ex: Exception => {

+ 7 - 3
src/main/scala/com/winhc/bigdata/flink/implicits/CaseClass2JsonHelper.scala

@@ -1,9 +1,8 @@
 package com.winhc.bigdata.flink.implicits
 
+import org.apache.hadoop.hbase.client.Result
 import org.json4s.jackson.Serialization
 import org.json4s.{Formats, NoTypeHints}
-
-
 /**
  * @author: XuJiakai
  * @date: 2020/11/23 10:51
@@ -13,6 +12,11 @@ case class CaseClass2JsonHelper[A <: AnyRef](that: A) {
     if (that == null) {
       return null
     }
-    Serialization.write(that)
+    that match {
+      case result: Result =>
+        result.toJsonString()
+      case _ =>
+        Serialization.write(that)
+    }
   }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/implicits/HbaseResultHelper.scala

@@ -25,7 +25,7 @@ case class HbaseResultHelper(result: Result) {
         val v = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
         map +=  key-> v
       }
-      map += "rowkey" -> rowkey
+      map += "ROWKEY" -> rowkey
       map.toJson
     }
   }

+ 1 - 0
src/main/scala/com/winhc/bigdata/flink/implicits/package.scala

@@ -4,5 +4,6 @@ import org.apache.hadoop.hbase.client.Result
 
 package object implicits {
   implicit def caseClass2JsonEnhancer(result: Result) = HbaseResultHelper(result)
+
   implicit def caseClass2JsonEnhancer[A <: AnyRef](that: A): CaseClass2JsonHelper[A] = CaseClass2JsonHelper(that)
 }