123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package com.winhc.bigdata.flink.func
- import com.winhc.bigdata.flink.config.HbaseConfig
- import com.winhc.bigdata.flink.event.UpdateEntity
- import com.winhc.bigdata.flink.implicits._
- import com.winhc.bigdata.flink.utils.BaseUtils
- import org.apache.flink.api.java.utils.ParameterTool
- import org.apache.flink.configuration.{ConfigOptions, Configuration}
- import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
- import org.apache.hadoop.hbase.TableName
- import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
- import org.json4s.JsonAST.JObject
- import org.json4s.jackson.JsonMethods._
- import org.slf4j.LoggerFactory
- import scala.collection.JavaConverters._
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.Future
- import scala.language.postfixOps
- import scala.util.{Failure, Success}
/**
 * Flink async function that enriches an [[UpdateEntity]] with the current
 * HBase rows for the company and each dimension table.
 *
 * @author XuJiakai
 * @since 2021/8/31 17:07
 */
case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEntity] {
  private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])

  // HBase connections are not serializable; one is created per task in open().
  @transient private var connection: Connection = _

  /**
   * Builds the HBase connection from the job-wide ParameterTool, forwarding
   * the whitelisted `winhc.hbase.config.*` keys into the HBase configuration.
   */
  override def open(parameters: Configuration): Unit = {
    val config = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    val configuration = HbaseConfig.getHbaseConfiguration(config, Seq(
      "winhc.hbase.config.hbase.zookeeper.quorum",
      "winhc.hbase.config.hbase.client.scanner.timeout.period",
      "winhc.hbase.config.hbase.client.retries.number",
      "winhc.hbase.config.hbase.client.pause",
      "winhc.hbase.config.hbase.client.max.perserver.tasks",
      "winhc.hbase.config.hbase.client.max.perregion.tasks",
      "winhc.hbase.config.hbase.client.keyvalue.maxsize",
      "winhc.hbase.config.hbase.client.ipc.pool.size",
      "winhc.hbase.config.zookeeper.recovery.retry"
    ))
    connection = ConnectionFactory.createConnection(configuration)
  }

  /**
   * Asynchronously loads the existing HBase state for the entity:
   * the old company row (NG_COMPANY) plus, per dimension, the old rows
   * keyed by rowkey. The lookup must run inside a Future — the blocking
   * HBase client would otherwise stall the async operator (内部函数必须是异步).
   *
   * @param input        tn, rowkey carrier to enrich
   * @param resultFuture completed with the enriched entity, or exceptionally on error
   */
  override def asyncInvoke(input: UpdateEntity, resultFuture: ResultFuture[UpdateEntity]): Unit = {
    Future {
      val company = input.company
      val companyId = input.companyId()
      if (company != null && companyId != null) {
        val companyTable = connection.getTable(TableName.valueOf("NG_COMPANY")).asInstanceOf[HTable]
        try {
          val companyResult = companyTable.get(new Get(companyId.getBytes))
          // Table.get never returns null, but the Result is empty when the
          // row does not exist — parsing an empty result would store garbage.
          if (companyResult != null && !companyResult.isEmpty) {
            input.oldCompany = parse(companyResult.toJson()).asInstanceOf[JObject]
          }
        } finally {
          companyTable.close() // release table even if the get/parse throws
        }
      }
      input.dims.foreach { case (k, v) =>
        val gets = v.data.keys.map(key => new Get(key.getBytes)).toList.asJava
        val ht = connection.getTable(TableName.valueOf(BaseUtils.hbaseTableName(k))).asInstanceOf[HTable]
        try {
          val results = ht.get(gets)
          // Skip empty Results: getRow is null for rows that were not found,
          // which previously caused an NPE in `new String(r.getRow)`.
          v.old = results.iterator
            .filter(r => r != null && !r.isEmpty)
            .map(r => new String(r.getRow) -> parse(r.toJson).asInstanceOf[JObject])
            .toMap
        } finally {
          ht.close() // was leaked in the original loop
        }
      }
      input
    } onComplete {
      case Success(enriched) => resultFuture.complete(Seq(enriched))
      case Failure(ex) =>
        LOG.error("HBase async lookup failed", ex)
        resultFuture.completeExceptionally(ex)
    }
  }

  /** Releases the shared HBase connection when the task shuts down. */
  override def close(): Unit = {
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}
|