// HbaseAsyncFunction.scala (4.5 KB)
  1. package com.winhc.bigdata.flink.func
  2. import com.winhc.bigdata.flink.config.HbaseConfig
  3. import com.winhc.bigdata.flink.event.UpdateEntity
  4. import com.winhc.bigdata.flink.implicits._
  5. import com.winhc.bigdata.flink.utils.BaseUtils
  6. import org.apache.flink.configuration.{ConfigOptions, Configuration}
  7. import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
  8. import org.apache.hadoop.hbase.TableName
  9. import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
  10. import org.json4s.JsonAST.JObject
  11. import org.json4s.jackson.JsonMethods._
  12. import org.slf4j.LoggerFactory
  13. import scala.collection.JavaConverters._
  14. import scala.concurrent.ExecutionContext.Implicits.global
  15. import scala.concurrent.Future
  16. import scala.language.postfixOps
  17. import scala.util.{Failure, Success}
  /**
   * Flink async I/O function that attaches the rows currently stored in HBase to an
   * [[UpdateEntity]] before the entity is emitted downstream.
   *
   * For each input element it
   *  - reads the row keyed by `companyId` from table `NG_COMPANY` into `input.oldCompany`
   *    (only when both `company` and `companyId` are non-null), and
   *  - for every entry of `input.dims`, multi-gets the rows named by the keys of its `data`
   *    from the table returned by `BaseUtils.hbaseTableName` and stores them, keyed by row
   *    key, in its `old` field.
   *
   * @author: XuJiakai
   * @date: 2021/8/31 17:07
   */
  case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEntity] {
    private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])

    // Created in open() on the task side; never serialized with the function.
    @transient private var connection: Connection = _

    // Charset used for row keys. Deliberately a `def`, not a `val`: java.nio.charset.Charset is
    // not Serializable and this function instance is serialized by Flink.
    private def rowKeyCharset: java.nio.charset.Charset = java.nio.charset.StandardCharsets.UTF_8

    /**
     * Builds the HBase configuration from `parameters` and opens the shared connection.
     *
     * Every option below is a string option under the `winhc.hbase.config.` prefix with a
     * built-in default; they are handed to `HbaseConfig.getHbaseConfiguration` together with
     * `parameters` (presumably that helper resolves each option against `parameters` and copies
     * it into the HBase configuration -- confirm in HbaseConfig).
     *
     * @param parameters Flink configuration passed to this function
     */
    override def open(parameters: Configuration): Unit = {
      /* Earlier approach, kept for reference:
         import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
         val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
         val config: Configuration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
         connection = ConnectionFactory.createConnection(config)
       */
      def stringOption(key: String, default: String) =
        ConfigOptions.key(key).stringType().defaultValue(default)

      val options = Seq(
        stringOption(
          "winhc.hbase.config.hbase.zookeeper.quorum",
          "hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181"
        ),
        stringOption("winhc.hbase.config.hbase.client.scanner.timeout.period", "120000"),
        stringOption("winhc.hbase.config.hbase.client.retries.number", "5"),
        stringOption("winhc.hbase.config.hbase.client.pause", "1000"),
        stringOption("winhc.hbase.config.hbase.client.max.perserver.tasks", "10"),
        stringOption("winhc.hbase.config.hbase.client.max.perregion.tasks", "10"),
        stringOption("winhc.hbase.config.hbase.client.keyvalue.maxsize", "524288000"),
        // NOTE(review): this key reads "...config.base.client..." while every sibling reads
        // "...config.hbase.client..." -- looks like a typo for hbase.client.ipc.pool.size.
        // Left unchanged because deployed job configurations may already use this spelling.
        stringOption("winhc.hbase.config.base.client.ipc.pool.size", "5"),
        stringOption("winhc.hbase.config.zookeeper.recovery.retry", "5")
      )

      val configuration = HbaseConfig.getHbaseConfiguration(parameters, options)
      connection = ConnectionFactory.createConnection(configuration)
    }

    /**
     * Looks up the previously stored HBase rows for `input` and completes `resultFuture` with
     * the enriched `input`.
     *
     * Original author's note: the work inside must run asynchronously; a non-asynchronous
     * implementation has a bug.
     *
     * The lookups run in a `Future` on the implicit execution context. Because the HBase calls
     * block, they are wrapped in `blocking` so the pool can compensate with extra threads.
     *
     * @param input        entity to enrich; its `oldCompany` and each dim's `old` are assigned
     * @param resultFuture completed with `Seq(input)` on success, or exceptionally with the
     *                     error raised by any lookup
     */
    override def asyncInvoke(input: UpdateEntity, resultFuture: ResultFuture[UpdateEntity]): Unit = {
      Future {
        scala.concurrent.blocking {
          loadOldCompany(input)
          loadOldDims(input)
        }
      }.onComplete {
        case Success(_) => resultFuture.complete(Seq(input))
        case Failure(ex) => resultFuture.completeExceptionally(ex)
      }
    }

    /** Closes the HBase connection if `open` managed to create one. */
    override def close(): Unit = {
      if (connection != null) {
        connection.close()
      }
    }

    /** Runs `use` against the named table and always closes the table afterwards. */
    private def withTable[T](name: TableName)(use: org.apache.hadoop.hbase.client.Table => T): T = {
      val table = connection.getTable(name)
      try use(table) finally table.close()
    }

    /** Reads the stored `NG_COMPANY` row for `input.companyId()` into `input.oldCompany`. */
    private def loadOldCompany(input: UpdateEntity): Unit = {
      val company = input.company
      val companyId = input.companyId()
      if (company != null && companyId != null) {
        input.oldCompany = withTable(TableName.valueOf("NG_COMPANY")) { table =>
          val stored = table.get(new Get(companyId.getBytes(rowKeyCharset)))
          parse(stored.toJson()).asInstanceOf[JObject]
        }
      }
    }

    /** For every dim of `input`, multi-gets the stored rows of its data keys into `old`. */
    private def loadOldDims(input: UpdateEntity): Unit = {
      input.dims.foreach { case (dimName, dim) =>
        val gets = dim.data.keys.map(key => new Get(key.getBytes(rowKeyCharset))).toList.asJava
        val stored = withTable(TableName.valueOf(BaseUtils.hbaseTableName(dimName)))(_.get(gets))
        // A key with no row in HBase comes back as an empty Result whose getRow is null; skip it
        // instead of failing the whole element with a NullPointerException.
        dim.old = stored
          .filter(result => result != null && !result.isEmpty)
          .map(result => new String(result.getRow, rowKeyCharset) -> parse(result.toJson()).asInstanceOf[JObject])
          .toMap
      }
    }
  }