MyTest.scala 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. package com.winhc.bigdata.flink.test
  2. import com.winhc.bigdata.flink.java.sink.OdpsSinkBuilder4J
  3. import org.apache.flink.api.common.functions.MapFunction
  4. import org.apache.flink.api.java.tuple.Tuple2
  5. import org.apache.flink.streaming.api.functions.sink.SinkFunction
  6. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  7. import org.apache.flink.types.Row
  8. /**
  9. * @author: XuJiakai
  10. * @date: 2021/9/1 10:15
  11. */
  12. object MyTest {
  13. def main(args: Array[String]): Unit = {
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. val odpsSink: SinkFunction[Tuple2[Boolean, Row]] = OdpsSinkBuilder4J.buildSinkFunction("xjk_test_test",null).asInstanceOf[SinkFunction[Tuple2[Boolean,Row]]]
  16. val odpsSink2: SinkFunction[Tuple2[Boolean, Row]] = OdpsSinkBuilder4J.buildSinkFunction("xjk_test_test_2",null).asInstanceOf[SinkFunction[Tuple2[Boolean,Row]]]
  17. import org.apache.flink.api.scala._
  18. val value: DataStream[Tuple2[Boolean, Row]] = env.fromCollection(Seq(
  19. Row.of(Long.box(123), Double.box(123.1d)),
  20. Row.of(Long.box(456), Double.box(123.2d))
  21. ))
  22. .map(new MapFunction[Row, Tuple2[Boolean, Row]]() {
  23. @throws[Exception]
  24. override def map(row: Row): Tuple2[Boolean, Row] = new Tuple2(true, row)
  25. })
  26. value.addSink(odpsSink)
  27. value.addSink(odpsSink2)
  28. env.execute("test job")
  29. }
  30. }