package com.winhc.bigdata.flink.sink

import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction
import com.alibaba.ververica.connectors.hologres.api.{AbstractHologresWriter, HologresTableSchema}
import com.alibaba.ververica.connectors.hologres.config.{HologresConfigs, HologresConnectionParam}
import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCWriter
import com.alibaba.ververica.connectors.hologres.rpc.HologresRpcWriter
import com.alibaba.ververica.connectors.hologres.sink.HologresSinkFunction
import com.alibaba.ververica.connectors.hologres.utils.HologresUtils
import com.winhc.bigdata.filnk.java.constant.EnvConst
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.types.DataType
import org.postgresql.model
import org.postgresql.model.Column

/**
 * Factory for Flink sink functions that write [[RowData]] records into a Hologres table.
 *
 * Every connection and write option is read through `EnvConst.getValue` from the
 * `winhc.holo.*` keys; only the target table name is supplied by the caller.
 *
 * @author: XuJiakai
 * @date: 2021/9/3 16:54
 */
object HoloSinkBuilder {

  /**
   * Builds a sink function targeting the given Hologres table.
   *
   * The Flink [[Configuration]] is populated from the `winhc.holo.*` settings, wrapped in a
   * [[HologresConnectionParam]], and handed to a writer chosen by [[buildHoloWriter]].
   *
   * @param tableName name of the Hologres table to write to
   * @return a [[HologresSinkFunction]] bound to the table and its writer
   */
  def builderSinkFunction(tableName: String): OutputFormatSinkFunction[RowData] = {
    val config = new Configuration()
    config.setString(HologresConfigs.TABLE, tableName)
    config.setString(HologresConfigs.ENDPOINT, EnvConst.getValue("winhc.holo.endpoint"))
    config.setString(
      HologresConfigs.OPTIONAL_WRITE_BATCH_SIZE.key(),
      EnvConst.getValue("winhc.holo.batch-size"))
    config.setString(HologresConfigs.USERNAME, EnvConst.getValue("winhc.holo.username"))
    config.setString(HologresConfigs.PASSWORD, EnvConst.getValue("winhc.holo.password"))
    config.setString(HologresConfigs.DATABASE, EnvConst.getValue("winhc.holo.database"))
    config.setString(
      HologresConfigs.RPC_RETRIES.key(),
      EnvConst.getValue("winhc.holo.rpc-retries"))
    // NOTE(review): the key is spelled "user-rpc-mode" (not "use-rpc-mode"); it is kept as-is
    // because it must match whatever the environment configuration defines — confirm.
    config.setString(
      HologresConfigs.USE_RPC_MODE.key(),
      EnvConst.getValue("winhc.holo.user-rpc-mode"))
    config.setString(HologresConfigs.MUTATE_TYPE, EnvConst.getValue("winhc.holo.mutate-type"))
    config.setString(
      HologresConfigs.OPTIONAL_SINK_IGNORE_DELETE.key(),
      EnvConst.getValue("winhc.holo.ignore-delete"))

    val connectionParam = new HologresConnectionParam(config)
    val writer = buildHoloWriter(config, connectionParam)
    new HologresSinkFunction(connectionParam, writer)
  }

  /**
   * Converts the Hologres table schema into a Flink [[TableSchema]].
   *
   * Each column becomes one field carrying the column's name and the Flink type chosen by
   * [[switchHoldSchema]]; fields are added in the order the columns are returned.
   *
   * @param tableSchema schema holder obtained from the Hologres connector
   * @return the equivalent Flink table schema
   */
  private def getHoloSchema(tableSchema: HologresTableSchema): TableSchema = {
    val holoSchema: model.TableSchema = tableSchema.get()
    holoSchema.getColumnSchema
      .foldLeft(TableSchema.builder()) { (builder, column) =>
        builder.field(column.getName, switchHoldSchema(column))
      }
      .build()
  }

  /**
   * Creates the writer that performs the actual writes to Hologres.
   *
   * The table schema is fetched via `HologresTableSchema.get`, converted with
   * [[getHoloSchema]], and passed to either the RPC or the JDBC writer factory depending on
   * what `HologresUtils.shouldUseRpc` reports for the configuration.
   *
   * @param config                  Flink configuration holding the Hologres options
   * @param hologresConnectionParam connection parameters derived from `config`
   * @return an RPC writer when RPC mode is selected, otherwise a JDBC writer
   */
  private def buildHoloWriter(
      config: Configuration,
      hologresConnectionParam: HologresConnectionParam): AbstractHologresWriter[RowData] = {
    val tableSchema = HologresTableSchema.get(hologresConnectionParam)
    val schema = getHoloSchema(tableSchema)
    if (HologresUtils.shouldUseRpc(config)) {
      HologresRpcWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
    } else {
      HologresJDBCWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
    }
  }

  /**
   * Maps a Hologres column's type name to a Flink [[DataType]].
   *
   * Matching is case-insensitive. Unrecognised type names are reported on stdout and
   * treated as `STRING`.
   *
   * NOTE(review): only the names "string", "int", "double", "bigint" and "date" are
   * recognised. If the column reports PostgreSQL-style names (e.g. "text", "int4", "int8",
   * "float8") those columns take the STRING fallback — confirm this is intended.
   *
   * @param col column whose type name is inspected
   * @return the Flink type used for the column
   */
  private def switchHoldSchema(col: Column): DataType =
    // Locale.ROOT keeps the lower-casing independent of the JVM default locale; with e.g. a
    // Turkish locale "INT" would otherwise become "ınt" and miss its case below.
    col.getTypeName.toLowerCase(java.util.Locale.ROOT) match {
      case "string" => DataTypes.STRING()
      case "int"    => DataTypes.INT()
      case "double" => DataTypes.DOUBLE()
      case "bigint" => DataTypes.BIGINT()
      case "date"   => DataTypes.DATE()
      case other =>
        println(s"not fount type: $other")
        DataTypes.STRING()
    }
}