HoloSinkBuilder.scala 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package com.winhc.bigdata.flink.sink
  2. import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction
  3. import com.alibaba.ververica.connectors.hologres.api.{AbstractHologresWriter, HologresTableSchema}
  4. import com.alibaba.ververica.connectors.hologres.config.{HologresConfigs, HologresConnectionParam}
  5. import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCWriter
  6. import com.alibaba.ververica.connectors.hologres.rpc.HologresRpcWriter
  7. import com.alibaba.ververica.connectors.hologres.sink.HologresSinkFunction
  8. import com.alibaba.ververica.connectors.hologres.utils.HologresUtils
  9. import com.winhc.bigdata.filnk.java.constant.EnvConst
  10. import org.apache.flink.configuration.Configuration
  11. import org.apache.flink.table.api.{DataTypes, TableSchema}
  12. import org.apache.flink.table.data.RowData
  13. import org.apache.flink.table.types.DataType
  14. import org.postgresql.model
  15. import org.postgresql.model.Column
  /**
   * Builds Flink sink functions that write `RowData` records into Hologres tables.
   *
   * Connection settings are read through `EnvConst.getValue` under the `winhc.holo.*` keys.
   *
   * @author XuJiakai
   * @date 2021/9/3 16:54
   */
  object HoloSinkBuilder {

    /**
     * Creates a sink function for the given Hologres table.
     *
     * @param tableName name of the target table, stored under `HologresConfigs.TABLE`
     * @return a `HologresSinkFunction` built from the connection parameters and the
     *         writer selected by `buildHoloWriter`
     */
    def builderSinkFunction(tableName: String): OutputFormatSinkFunction[RowData] = {
      val config = new Configuration()
      config.setString(HologresConfigs.TABLE, tableName)
      config.setString(HologresConfigs.ENDPOINT, EnvConst.getValue("winhc.holo.endpoint"))
      config.setString(HologresConfigs.OPTIONAL_WRITE_BATCH_SIZE.key(), EnvConst.getValue("winhc.holo.batch-size"))
      config.setString(HologresConfigs.USERNAME, EnvConst.getValue("winhc.holo.username"))
      config.setString(HologresConfigs.PASSWORD, EnvConst.getValue("winhc.holo.password"))
      config.setString(HologresConfigs.DATABASE, EnvConst.getValue("winhc.holo.database"))
      config.setString(HologresConfigs.RPC_RETRIES.key(), EnvConst.getValue("winhc.holo.rpc-retries"))
      config.setString(HologresConfigs.USE_RPC_MODE.key(), EnvConst.getValue("winhc.holo.user-rpc-mode"))
      config.setString(HologresConfigs.MUTATE_TYPE, EnvConst.getValue("winhc.holo.mutate-type"))
      config.setString(HologresConfigs.OPTIONAL_SINK_IGNORE_DELETE.key(), EnvConst.getValue("winhc.holo.ignore-delete"))

      val connectionParam = new HologresConnectionParam(config)
      val writer = buildHoloWriter(config, connectionParam)
      new HologresSinkFunction(connectionParam, writer)
    }

    /**
     * Converts the Hologres table schema into a Flink `TableSchema`, adding one field per
     * column in the order returned by `getColumnSchema`.
     *
     * @param tableSchema schema wrapper obtained from `HologresTableSchema.get`
     * @return the Flink schema whose field types come from `switchHoldSchema`
     */
    private def getHoloSchema(tableSchema: HologresTableSchema): TableSchema = {
      val schema: model.TableSchema = tableSchema.get()
      val builder = TableSchema.builder()
      // Iterate the columns directly instead of copying them into a List first.
      schema.getColumnSchema.foreach { column =>
        builder.field(column.getName, switchHoldSchema(column))
      }
      builder.build()
    }

    /**
     * Selects the writer implementation: the RPC writer when `HologresUtils.shouldUseRpc`
     * says so for this configuration, otherwise the JDBC writer.
     *
     * @param config                  configuration consulted for the RPC/JDBC decision
     * @param hologresConnectionParam connection parameters used to load the table schema
     *                                and to create the writer
     * @return the writer for the table described by `hologresConnectionParam`
     */
    private def buildHoloWriter(config: Configuration, hologresConnectionParam: HologresConnectionParam): AbstractHologresWriter[RowData] = {
      val tableSchema = HologresTableSchema.get(hologresConnectionParam)
      val schema = getHoloSchema(tableSchema)
      if (HologresUtils.shouldUseRpc(config)) {
        HologresRpcWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
      } else {
        HologresJDBCWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
      }
    }

    /**
     * Maps a column's type name (compared case-insensitively) to a Flink `DataType`.
     * Unrecognised type names are reported on stdout and mapped to `STRING`.
     *
     * @param col column whose `getTypeName` is inspected
     * @return the matching Flink data type, or `DataTypes.STRING()` as the fallback
     */
    private def switchHoldSchema(col: Column): DataType =
      // Locale.ROOT keeps the lowercasing independent of the JVM default locale
      // (e.g. under a Turkish locale "INT" would otherwise not lowercase to "int").
      // NOTE(review): confirm these literals match the type names Hologres actually
      // reports for its columns; every other name ends up as STRING.
      col.getTypeName.toLowerCase(java.util.Locale.ROOT) match {
        case "string" => DataTypes.STRING()
        case "int" => DataTypes.INT()
        case "double" => DataTypes.DOUBLE()
        case "bigint" => DataTypes.BIGINT()
        case "date" => DataTypes.DATE()
        case other =>
          println(s"not fount type: $other")
          DataTypes.STRING()
      }
  }