package com.winhc.bigdata.flink.test import com.winhc.bigdata.flink.java.sink.OdpsSinkBuilder4J import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.types.Row /** * @author: XuJiakai * @date: 2021/9/1 10:15 */ object MyTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val odpsSink: SinkFunction[Tuple2[Boolean, Row]] = OdpsSinkBuilder4J.buildSinkFunction("xjk_test_test",null).asInstanceOf[SinkFunction[Tuple2[Boolean,Row]]] val odpsSink2: SinkFunction[Tuple2[Boolean, Row]] = OdpsSinkBuilder4J.buildSinkFunction("xjk_test_test_2",null).asInstanceOf[SinkFunction[Tuple2[Boolean,Row]]] import org.apache.flink.api.scala._ val value: DataStream[Tuple2[Boolean, Row]] = env.fromCollection(Seq( Row.of(Long.box(123), Double.box(123.1d)), Row.of(Long.box(456), Double.box(123.2d)) )) .map(new MapFunction[Row, Tuple2[Boolean, Row]]() { @throws[Exception] override def map(row: Row): Tuple2[Boolean, Row] = new Tuple2(true, row) }) value.addSink(odpsSink) value.addSink(odpsSink2) env.execute("test job") } }