HbaseAsyncFunction.scala 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package com.winhc.bigdata.flink.func
  2. import com.winhc.bigdata.flink.config.HbaseConfig
  3. import com.winhc.bigdata.flink.event.UpdateEntity
  4. import com.winhc.bigdata.flink.implicits._
  5. import com.winhc.bigdata.flink.utils.BaseUtils
  6. import org.apache.flink.api.java.utils.ParameterTool
  7. import org.apache.flink.configuration.{ConfigOptions, Configuration}
  8. import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
  9. import org.apache.hadoop.hbase.TableName
  10. import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
  11. import org.json4s.JsonAST.JObject
  12. import org.json4s.jackson.JsonMethods._
  13. import org.slf4j.LoggerFactory
  14. import scala.collection.JavaConverters._
  15. import scala.concurrent.ExecutionContext.Implicits.global
  16. import scala.concurrent.Future
  17. import scala.language.postfixOps
  18. import scala.util.{Failure, Success}
  /**
   * Flink async function that enriches an [[UpdateEntity]] with the rows currently stored in
   * HBase: the company row from table `NG_COMPANY` is written to `input.oldCompany`, and for every
   * entry of `input.dims` the rows addressed by that entry's `data` keys are written to its `old`
   * field. The same (mutated) input instance is emitted as the single result element.
   *
   * @author XuJiakai
   * @date 2021/8/31 17:07
   */
  case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEntity] {
    // Local imports keep this block self-contained without touching the file-level import list.
    import java.nio.charset.StandardCharsets.UTF_8
    import org.apache.hadoop.hbase.client.Table

    private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])

    // Created in open() and released in close(); @transient so it is not serialized with the function.
    @transient private var connection: Connection = _

    /**
     * Builds the HBase configuration from the job's global parameters and opens the connection.
     *
     * @param parameters Flink configuration passed by the runtime (not read here)
     */
    override def open(parameters: Configuration): Unit = {
      // The global job parameters are expected to be a ParameterTool; the cast fails otherwise.
      val config = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
      val configuration = HbaseConfig.getHbaseConfiguration(config, Seq(
        "winhc.hbase.config.hbase.zookeeper.quorum",
        "winhc.hbase.config.hbase.client.scanner.timeout.period",
        "winhc.hbase.config.hbase.client.retries.number",
        "winhc.hbase.config.hbase.client.pause",
        "winhc.hbase.config.hbase.client.max.perserver.tasks",
        "winhc.hbase.config.hbase.client.max.perregion.tasks",
        "winhc.hbase.config.hbase.client.keyvalue.maxsize",
        "winhc.hbase.config.hbase.client.ipc.pool.size",
        "winhc.hbase.config.zookeeper.recovery.retry"
      ))
      connection = ConnectionFactory.createConnection(configuration)
    }

    /**
     * Looks up the previously stored rows for `input` and completes `resultFuture` with it.
     *
     * The lookups must run asynchronously (the original author notes that a non-async
     * implementation is buggy), so they are executed inside a [[scala.concurrent.Future]].
     *
     * @param input        entity carrying the company id and the per-dimension rowkeys to look up
     * @param resultFuture completed with `Seq(input)` on success, or exceptionally with the
     *                     failure raised by any lookup
     */
    override def asyncInvoke(input: UpdateEntity, resultFuture: ResultFuture[UpdateEntity]): Unit = {
      Future {
        loadOldCompany(input)
        loadOldDims(input)
      } onComplete {
        case Success(_) => resultFuture.complete(Seq(input))
        case Failure(ex) => resultFuture.completeExceptionally(ex)
      }
    }

    /** Closes the HBase connection if open() managed to create one. */
    override def close(): Unit = {
      if (connection != null) {
        connection.close()
      }
    }

    /** Reads the company row from `NG_COMPANY` into `input.oldCompany`; no-op without company or id. */
    private def loadOldCompany(input: UpdateEntity): Unit = {
      val company = input.company
      val companyId = input.companyId()
      if (company != null && companyId != null) {
        withTable(TableName.valueOf("NG_COMPANY")) { table =>
          val companyResult = table.get(new Get(companyId.getBytes(UTF_8)))
          // NOTE(review): HBase returns an empty Result (not null) for a missing row; the guard is
          // kept as-is because how `toJson` treats an empty Result is not visible here - confirm.
          if (companyResult != null) {
            input.oldCompany = parse(companyResult.toJson()).asInstanceOf[JObject]
          }
        }
      }
    }

    /** For every dimension, batch-reads the rows named by its `data` keys and stores them in `old`. */
    private def loadOldDims(input: UpdateEntity): Unit = {
      input.dims.foreach { case (dimName, dim) =>
        val gets = dim.data.keys.map(key => new Get(key.getBytes(UTF_8))).toList.asJava
        val results = withTable(TableName.valueOf(BaseUtils.hbaseTableName(dimName)))(_.get(gets))
        // A rowkey that does not exist yields an empty Result whose getRow is null; skip those so a
        // missing row no longer fails the whole request with a NullPointerException.
        val found = results.filter(r => r != null && !r.isEmpty)
        dim.old = found.map { r =>
          val rowkey = new String(r.getRow, UTF_8)
          val row = parse(r.toJson).asInstanceOf[JObject]
          (rowkey, row)
        }.toMap
      }
    }

    /** Runs `use` against the named table and always closes the table handle afterwards. */
    private def withTable[A](tableName: TableName)(use: Table => A): A = {
      val table = connection.getTable(tableName)
      try use(table) finally table.close()
    }
  }