1234567891011121314151617181920212223242526272829303132333435363738 |
- package com.winhc.bigdata.flink.test
- import com.winhc.bigdata.flink.java.sink.OdpsSinkBuilder4J
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.api.java.tuple.Tuple2
- import org.apache.flink.streaming.api.functions.sink.SinkFunction
- import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
- import org.apache.flink.types.Row
/**
 * Manual test job: feeds two hard-coded rows through a Flink stream into two
 * sinks produced by `OdpsSinkBuilder4J`.
 *
 * @author: XuJiakai
 * @date: 2021/9/1 10:15
 */
object MyTest {

  /**
   * Obtains a sink from `OdpsSinkBuilder4J` and views it as a sink of
   * `Tuple2[Boolean, Row]`.
   *
   * @param name first argument handed to `buildSinkFunction`
   *             (presumably the target ODPS table name — confirm against the builder)
   * @return the built sink, cast to `SinkFunction[Tuple2[Boolean, Row]]`
   */
  private def sinkFor(name: String): SinkFunction[Tuple2[Boolean, Row]] =
    OdpsSinkBuilder4J
      // NOTE(review): the meaning of the null second argument is not visible here.
      .buildSinkFunction(name, null)
      // Unchecked cast: the builder's declared return type is not visible in this file.
      .asInstanceOf[SinkFunction[Tuple2[Boolean, Row]]]

  /**
   * Entry point. Builds the two sinks, creates a two-element in-memory stream of
   * rows, pairs every row with `true`, attaches both sinks to the resulting
   * stream and executes the job under the name "test job".
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Supplies the implicit TypeInformation instances required by fromCollection / map.
    import org.apache.flink.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val firstSink = sinkFor("xjk_test_test")
    val secondSink = sinkFor("xjk_test_test_2")

    // Fixed sample input: each row holds one boxed Long and one boxed Double.
    val sampleRows = Seq(
      Row.of(Long.box(123), Double.box(123.1d)),
      Row.of(Long.box(456), Double.box(123.2d))
    )

    // Pairs each incoming row with the constant flag `true`.
    val pairWithTrue: MapFunction[Row, Tuple2[Boolean, Row]] =
      new MapFunction[Row, Tuple2[Boolean, Row]]() {
        @throws[Exception]
        override def map(row: Row): Tuple2[Boolean, Row] = new Tuple2(true, row)
      }

    val flagged: DataStream[Tuple2[Boolean, Row]] =
      env.fromCollection(sampleRows).map(pairWithTrue)

    // The same stream is fanned out to both sinks.
    flagged.addSink(firstSink)
    flagged.addSink(secondSink)

    env.execute("test job")
  }
}
|