123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- package com.winhc.bigdata.flink.sink
- import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction
- import com.alibaba.ververica.connectors.hologres.api.{AbstractHologresWriter, HologresTableSchema}
- import com.alibaba.ververica.connectors.hologres.config.{HologresConfigs, HologresConnectionParam}
- import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCWriter
- import com.alibaba.ververica.connectors.hologres.rpc.HologresRpcWriter
- import com.alibaba.ververica.connectors.hologres.sink.HologresSinkFunction
- import com.alibaba.ververica.connectors.hologres.utils.HologresUtils
- import com.winhc.bigdata.filnk.java.constant.EnvConst
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.table.api.{DataTypes, TableSchema}
- import org.apache.flink.table.data.RowData
- import org.apache.flink.table.types.DataType
- import org.postgresql.model
- import org.postgresql.model.Column
- /**
- * @author: XuJiakai
- * @date: 2021/9/3 16:54
- */
- object HoloSinkBuilder {
- def builderSinkFunction(tableName: String): OutputFormatSinkFunction[RowData] = {
- val config = new Configuration()
- config.setString(HologresConfigs.TABLE, tableName)
- config.setString(HologresConfigs.ENDPOINT, EnvConst.getValue("winhc.holo.endpoint"))
- config.setString(HologresConfigs.OPTIONAL_WRITE_BATCH_SIZE.key(), EnvConst.getValue("winhc.holo.batch-size"))
- config.setString(HologresConfigs.USERNAME, EnvConst.getValue("winhc.holo.username"))
- config.setString(HologresConfigs.PASSWORD, EnvConst.getValue("winhc.holo.password"))
- config.setString(HologresConfigs.DATABASE, EnvConst.getValue("winhc.holo.database"))
- config.setString(HologresConfigs.RPC_RETRIES.key(), EnvConst.getValue("winhc.holo.rpc-retries"))
- config.setString(HologresConfigs.USE_RPC_MODE.key(), EnvConst.getValue("winhc.holo.user-rpc-mode"))
- config.setString(HologresConfigs.MUTATE_TYPE, EnvConst.getValue("winhc.holo.mutate-type"))
- config.setString(HologresConfigs.OPTIONAL_SINK_IGNORE_DELETE.key(), EnvConst.getValue("winhc.holo.ignore-delete"))
- val connectionParam = new HologresConnectionParam(config)
- val writer = buildHoloWriter(config, connectionParam)
- new HologresSinkFunction(connectionParam, writer)
- }
- private def getHoloSchema(tableSchema: HologresTableSchema): TableSchema = {
- val schema: model.TableSchema = tableSchema.get()
- val builder = TableSchema.builder()
- for (elem: Column <- schema.getColumnSchema.toList) {
- builder.field(elem.getName, switchHoldSchema(elem))
- }
- builder.build()
- }
- private def buildHoloWriter(config: Configuration, hologresConnectionParam: HologresConnectionParam): AbstractHologresWriter[RowData] = {
- val tableSchema = HologresTableSchema.get(hologresConnectionParam)
- val schema = getHoloSchema(tableSchema)
- var writer: AbstractHologresWriter[RowData] = null
- if (HologresUtils.shouldUseRpc(config)) writer = HologresRpcWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
- else writer = HologresJDBCWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
- writer
- }
- private def switchHoldSchema(col: Column): DataType = {
- val t = col.getTypeName.toLowerCase
- t match {
- case "string" => DataTypes.STRING()
- case "int" => DataTypes.INT()
- case "double" => DataTypes.DOUBLE()
- case "bigint" => DataTypes.BIGINT()
- case "date" => DataTypes.DATE()
- case _ => {
- println("not fount type: " + t)
- DataTypes.STRING()
- }
- }
- }
- }
|