package com.winhc.bigdata.flink.func

import com.alibaba.ververica.connector.cloudhbase.util.HBaseConfigurationUtil
import com.winhc.bigdata.flink.config.HbaseConfig
import com.winhc.bigdata.flink.implicits._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
import org.slf4j.LoggerFactory

/**
 * Flink async-I/O function that fetches a single HBase row.
 *
 * Input: `(tableName, rowkey)`; output: `(tableName, rowkey, rowJson)` where
 * `rowJson` is the JSON rendering of the fetched `Result` (via the project's
 * `toJsonString` implicit). Table names are upper-cased before lookup.
 *
 * NOTE(review): the HBase `get` here is a synchronous, blocking call executed
 * on the async operator's thread — true async throughput would require a
 * dedicated executor or HBase's async client. Left as-is to preserve behavior.
 *
 * The inherited default `timeout` implementation fails the record with a
 * TimeoutException; override it if timed-out lookups should be skipped instead.
 *
 * @author: XuJiakai
 * @date: 2021/8/31 17:07
 */
case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (String, String, String)] {
  private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])

  // Created in open(), one shared connection per task instance; not serialized.
  @transient private var connection: Connection = null

  override def open(parameters: Configuration): Unit = {
    // Shaded Hadoop Configuration, aliased so it does not silently shadow
    // Flink's org.apache.flink.configuration.Configuration in this scope.
    import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
    val configuration: HadoopConfiguration = HbaseConfig.getHbaseConfiguration()
    // Round-trip through serialization mirrors how the connector ships config
    // to task managers at runtime.
    val config: HadoopConfiguration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
    connection = ConnectionFactory.createConnection(config)
  }

  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
    import java.nio.charset.StandardCharsets
    import scala.util.control.NonFatal
    val (tableName, rowkey) = input
    try {
      val table = connection.getTable(TableName.valueOf(tableName.toUpperCase())).asInstanceOf[HTable]
      try {
        // Explicit charset: bare getBytes uses the platform default, which
        // may differ between environments.
        val result = table.get(new Get(rowkey.getBytes(StandardCharsets.UTF_8)))
        resultFuture.complete(Seq((tableName, rowkey, result.toJsonString)))
      } finally {
        // Table instances must be released after use; the Connection stays open.
        table.close()
      }
    } catch {
      // NonFatal: let VM errors / interrupts propagate instead of swallowing them.
      case NonFatal(ex) => resultFuture.completeExceptionally(ex)
    }
  }

  override def close(): Unit = {
    if (connection != null) {
      connection.close()
      // Guard against double-close if the function is closed more than once.
      connection = null
    }
  }
}