package com.winhc.bigdata.flink.func

import java.nio.charset.StandardCharsets

import com.winhc.bigdata.flink.config.HbaseConfig
import com.winhc.bigdata.flink.event.UpdateEntity
import com.winhc.bigdata.flink.implicits._
import com.winhc.bigdata.flink.utils.BaseUtils
import org.apache.flink.configuration.{ConfigOptions, Configuration}
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
import org.json4s.JsonAST.JObject
import org.json4s.jackson.JsonMethods._
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.{Failure, Success}

/**
 * Flink async function that looks up the currently stored HBase rows for an
 * [[UpdateEntity]] and attaches them to it: the company row is written to
 * `oldCompany`, and the rows of every entry in `dims` are written to that
 * entry's `old` map.
 *
 * @author: XuJiakai
 * @date: 2021/8/31 17:07
 */
case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEntity] {
  private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])

  // Created in open() and released in close(); never serialized with the function.
  @transient private var connection: Connection = _

  /**
   * Builds the HBase configuration from the job parameters (falling back to the
   * defaults declared below) and opens the shared HBase connection.
   *
   * @param parameters Flink configuration handed to this function instance
   */
  override def open(parameters: Configuration): Unit = {
    /*
    import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
    val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
    val config: Configuration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
    connection = ConnectionFactory.createConnection(config)
    */
    val quorum = ConfigOptions
      .key("winhc.hbase.config.hbase.zookeeper.quorum")
      .stringType()
      .defaultValue("hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181")
    val period = ConfigOptions
      .key("winhc.hbase.config.hbase.client.scanner.timeout.period")
      .stringType()
      .defaultValue("120000")
    val number = ConfigOptions
      .key("winhc.hbase.config.hbase.client.retries.number")
      .stringType()
      .defaultValue("5")
    val pause = ConfigOptions
      .key("winhc.hbase.config.hbase.client.pause")
      .stringType()
      .defaultValue("1000")
    val perserver_tasks = ConfigOptions
      .key("winhc.hbase.config.hbase.client.max.perserver.tasks")
      .stringType()
      .defaultValue("10")
    val perregion_tasks = ConfigOptions
      .key("winhc.hbase.config.hbase.client.max.perregion.tasks")
      .stringType()
      .defaultValue("10")
    val maxsize = ConfigOptions
      .key("winhc.hbase.config.hbase.client.keyvalue.maxsize")
      .stringType()
      .defaultValue("524288000")
    // NOTE(review): every other key continues with "hbase." or "zookeeper." after the
    // "winhc.hbase.config." prefix; "base.client.ipc.pool.size" looks like a typo for
    // "hbase.client.ipc.pool.size". Left unchanged here - confirm against HbaseConfig.
    val size = ConfigOptions
      .key("winhc.hbase.config.base.client.ipc.pool.size")
      .stringType()
      .defaultValue("5")
    val retry = ConfigOptions
      .key("winhc.hbase.config.zookeeper.recovery.retry")
      .stringType()
      .defaultValue("5")

    val configuration = HbaseConfig.getHbaseConfiguration(parameters, Seq(
      quorum,
      period,
      number,
      pause,
      perserver_tasks,
      perregion_tasks,
      maxsize,
      size,
      retry
    ))
    connection = ConnectionFactory.createConnection(configuration)
  }

  /**
   * Opens the named table on the shared connection, runs `body` against it and
   * always closes the table handle afterwards, even when `body` throws.
   */
  private def withTable[T](name: String)(body: HTable => T): T = {
    val table = connection.getTable(TableName.valueOf(name)).asInstanceOf[HTable]
    try body(table)
    finally table.close()
  }

  /**
   * Fetches the stored rows for `input` on a background thread and completes
   * `resultFuture` with the enriched `input`, or exceptionally if any lookup fails.
   *
   * The body must run asynchronously; running it synchronously is known to be buggy.
   *
   * @param input        entity carrying the company id and the per-table row keys to look up
   * @param resultFuture future completed with the single enriched entity
   */
  override def asyncInvoke(input: UpdateEntity, resultFuture: ResultFuture[UpdateEntity]): Unit = {
    Future {
      val company = input.company
      val companyId = input.companyId()
      if (company != null && companyId != null) {
        val oldCompany = withTable("NG_COMPANY") { table =>
          val companyResult = table.get(new Get(companyId.getBytes(StandardCharsets.UTF_8)))
          parse(companyResult.toJson()).asInstanceOf[JObject]
        }
        input.oldCompany = oldCompany
      }

      input.dims.foreach { case (k, v) =>
        val gets = v.data.keys
          .map(key => new Get(key.getBytes(StandardCharsets.UTF_8)))
          .toList
          .asJava
        val oldRows = withTable(BaseUtils.hbaseTableName(k)) { table =>
          // A batch get yields an empty Result (getRow == null) for every row key that
          // does not exist; drop those so a missing row cannot fail the whole element.
          val found = table.get(gets).filterNot(_.isEmpty)
          val rowKeys = found.map(r => new String(r.getRow, StandardCharsets.UTF_8))
          val rows = found.map(_.toJson).map(parse(_).asInstanceOf[JObject])
          rowKeys.zip(rows).toMap
        }
        v.old = oldRows
      }
    }.onComplete {
      case Success(_) => resultFuture.complete(Seq(input))
      case Failure(ex) => resultFuture.completeExceptionally(ex)
    }
  }

  /** Closes the shared HBase connection if it was opened. */
  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
  }
}