HbaseAsyncFunction.scala 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package com.winhc.bigdata.flink.func
  2. import com.alibaba.ververica.connector.cloudhbase.util.HBaseConfigurationUtil
  3. import com.winhc.bigdata.flink.config.HbaseConfig
  4. import com.winhc.bigdata.flink.implicits._
  5. import org.apache.flink.configuration.Configuration
  6. import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
  7. import org.apache.hadoop.hbase.TableName
  8. import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
  9. import org.slf4j.LoggerFactory
  10. /**
  11. * @author: XuJiakai
  12. * @date: 2021/8/31 17:07
  13. */
  14. case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (String, String, String)] {
  15. private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])
  16. @transient private var connection: Connection = null
  17. override def open(parameters: Configuration): Unit = {
  18. import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
  19. val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
  20. val config: Configuration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
  21. connection = ConnectionFactory.createConnection(config)
  22. }
  23. override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
  24. val tn = input._1
  25. val rowkey = input._2
  26. try {
  27. val table = connection.getTable(TableName.valueOf(tn.toUpperCase())).asInstanceOf[HTable]
  28. val get = new Get(rowkey.getBytes)
  29. val result = table.get(get)
  30. val string = result.toJsonString
  31. resultFuture.complete(Seq((tn, rowkey, string)))
  32. } catch {
  33. case ex: Exception => {
  34. resultFuture.completeExceptionally(ex)
  35. }
  36. }
  37. }
  38. // override def timeout(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
  39. //
  40. // }
  41. override def close(): Unit = {
  42. if (connection != null) {
  43. connection.close()
  44. }
  45. }
  46. }