1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
package com.winhc.bigdata.flink.func

import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import com.alibaba.ververica.connector.cloudhbase.util.HBaseConfigurationUtil
import com.winhc.bigdata.flink.config.HbaseConfig
import com.winhc.bigdata.flink.implicits._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
import org.slf4j.LoggerFactory

import scala.util.control.NonFatal
/**
 * Flink async I/O function that fetches one HBase row per input element.
 *
 * Input is a `(tableName, rowkey)` pair. Output is `(tableName, rowkey, json)`, where `json` is
 * the result of calling `toJsonString` on the fetched row (presumably an extension method
 * supplied by `com.winhc.bigdata.flink.implicits._` -- confirm). The table name is upper-cased
 * before the lookup; the emitted tuple carries the table name exactly as it was received.
 *
 * `Table.get` is a blocking call, so every lookup is handed to a private thread pool instead of
 * running on the thread that calls [[asyncInvoke]]; otherwise the operator would stall on each
 * request and the async operator would provide no concurrency.
 *
 * @author: XuJiakai
 * @date: 2021/8/31 17:07
 */
case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (String, String, String)] {

  // Resolved lazily on the task side so the logger is never part of the serialized function.
  @transient private lazy val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])

  // Runtime-only resources: created in open(), released in close().
  @transient private var connection: Connection = _
  @transient private var executor: ExecutorService = _

  /** Upper bound on concurrent blocking HBase reads issued by one parallel instance. */
  private def ioThreads: Int = 10

  /** How long close() waits for in-flight lookups before interrupting them. */
  private def shutdownWaitSeconds: Long = 10L

  /**
   * Builds the HBase connection and the worker pool used for the blocking reads.
   *
   * @param parameters Flink configuration passed by the runtime (not read here)
   */
  override def open(parameters: Configuration): Unit = {
    // The connector ships a shaded Hadoop Configuration; alias it to keep it apart from Flink's.
    import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.{Configuration => HadoopConfiguration}

    val baseConf: HadoopConfiguration = HbaseConfig.getHbaseConfiguration()
    val runtimeConf: HadoopConfiguration = HBaseConfigurationUtil.prepareRuntimeConfiguration(
      HBaseConfigurationUtil.serializeConfiguration(baseConf),
      LOG
    )
    connection = ConnectionFactory.createConnection(runtimeConf)
    // Created after the connection so a failed connect does not leave idle threads behind.
    executor = Executors.newFixedThreadPool(ioThreads)
  }

  /**
   * Schedules the lookup of `input` on the worker pool and returns immediately.
   *
   * The future is completed with a single-element result on success, or exceptionally when the
   * lookup fails or the task cannot be scheduled.
   *
   * @param input        `(tableName, rowkey)`
   * @param resultFuture completed from a worker thread with `(tableName, rowkey, json)`
   */
  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
    val (tn, rowkey) = input

    val lookup = new Runnable {
      override def run(): Unit =
        try {
          // Locale.ROOT keeps the table name stable regardless of the JVM's default locale.
          val table = connection.getTable(TableName.valueOf(tn.toUpperCase(Locale.ROOT)))
          val json =
            try {
              // Encode the row key as UTF-8 explicitly instead of relying on the platform charset.
              table.get(new Get(rowkey.getBytes(StandardCharsets.UTF_8))).toJsonString
            } finally {
              closeQuietly(table)
            }
          resultFuture.complete(Seq((tn, rowkey, json)))
        } catch {
          case NonFatal(ex) => resultFuture.completeExceptionally(ex)
        }
    }

    try {
      executor.execute(lookup)
    } catch {
      // E.g. the pool was already shut down: fail the element instead of leaving it pending.
      case NonFatal(ex) => resultFuture.completeExceptionally(ex)
    }
  }

  // override def timeout(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
  //
  // }

  /** Stops the worker pool, then closes the HBase connection; safe to call when open() failed. */
  override def close(): Unit = {
    try {
      if (executor != null) {
        executor.shutdown()
        if (!executor.awaitTermination(shutdownWaitSeconds, TimeUnit.SECONDS)) {
          executor.shutdownNow()
        }
      }
    } catch {
      case _: InterruptedException =>
        executor.shutdownNow()
        // Preserve the interrupt for the caller.
        Thread.currentThread().interrupt()
    } finally {
      executor = null
      if (connection != null) {
        connection.close()
        connection = null
      }
    }
  }

  /** Closes a per-request table handle; a failure here is logged and does not fail the lookup. */
  private def closeQuietly(table: AutoCloseable): Unit =
    try table.close()
    catch {
      case NonFatal(ex) => LOG.warn("Failed to close HBase table handle", ex)
    }
}
|