许家凯 3 years ago
parent
commit
428c810d93

+ 8 - 1
pom.xml

@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <groupId>com.winhc.bigdata</groupId>
-    <artifactId>flink-winhc_flink</artifactId>
+    <artifactId>winhc_flink</artifactId>
     <packaging>jar</packaging>
     <version>1.0</version>
 
@@ -152,6 +152,13 @@
 
 
         <!-- add other dependencies -->
+        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.10.2</version>
+        </dependency>
+
 
         <dependency>
             <groupId>org.yaml</groupId>
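
For context on the newly added dependency: Avro is a schema-based serialization library, and 1.10.2 is the version pinned above. A minimal round-trip sketch using Avro's generic API; the Company schema and its field are illustrative, not taken from this repository:

    import java.io.ByteArrayOutputStream

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}

    object AvroRoundTrip {
      def main(args: Array[String]): Unit = {
        // Illustrative schema, not one used by this project
        val schema = new Schema.Parser().parse(
          """{"type":"record","name":"Company","fields":[{"name":"id","type":"string"}]}""")

        val record: GenericRecord = new GenericData.Record(schema)
        record.put("id", "winhc_ng.xjk_test_test")

        // Serialize to Avro binary
        val out = new ByteArrayOutputStream()
        val encoder = EncoderFactory.get().binaryEncoder(out, null)
        new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
        encoder.flush()

        // Deserialize and read the field back
        val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
        val copy = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
        println(copy.get("id")) // winhc_ng.xjk_test_test
      }
    }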

+ 1 - 1
src/main/java/com/winhc/bigdata/filnk/java/sink/OdpsSinkBuilder4J.java

@@ -39,7 +39,7 @@ public class OdpsSinkBuilder4J {
         String project = EnvConst.getValue("odps.default-project");
         String tn = tableName;
         if (tableName.contains(".")) {
-            String[] strings = tableName.split(".");
+            String[] strings = tableName.split("\\.");
             project = strings[0];
             tn = strings[1];
         }
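
The escaped dot is the real fix in this hunk: String.split takes a regular expression, so an unescaped "." matches every character, every token comes out empty, trailing empty tokens are dropped, and the result is a zero-length array; the following strings[0] access then fails. A minimal sketch of the difference (the behavior is identical in Java and Scala, which is why the same fix reappears in OdpsSourceBuilder.scala below):

    object SplitDemo {
      def main(args: Array[String]): Unit = {
        val tableName = "winhc_ng.xjk_test_flink_ads_company"

        // Unescaped "." is a regex wildcard: zero-length result
        println(tableName.split(".").length) // 0

        // Escaped dot splits on the literal character
        val parts = tableName.split("\\.")
        println(parts(0)) // winhc_ng
        println(parts(1)) // xjk_test_flink_ads_company
      }
    }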

+ 0 - 2
src/main/java/com/winhc/bigdata/filnk/java/source/OdpsSourceBuilder4J.java

@@ -24,7 +24,6 @@ import java.util.stream.Collectors;
 public class OdpsSourceBuilder4J {
     private Configuration conf;
     private Odps odps;
-
     private String tableName;
     private List<String> ds;
     private String accessId;
@@ -35,7 +34,6 @@ public class OdpsSourceBuilder4J {
 
     public OdpsSourceBuilder4J(Configuration conf) {
         this.conf = conf;
-
         this.tableName = conf.getString(OdpsOptions.TABLE_NAME);
         this.ds = Arrays.stream(conf.getString(OdpsOptions.PARTITION, "").split(","))
                 .filter(StringUtils::isNotBlank).collect(Collectors.toList());
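
One subtlety in this constructor survives the cleanup: when PARTITION is unset it defaults to "", and "".split(",") does not return an empty array but a single empty token, so the isNotBlank filter is what actually yields an empty partition list. A quick illustration, with plain Scala standing in for the StringUtils check:

    object PartitionParseDemo {
      def main(args: Array[String]): Unit = {
        // "".split(",") is not empty: it yields one empty token
        println("".split(",").length) // 1

        // the blank filter is what produces the empty partition list
        val ds = "".split(",").filter(_.trim.nonEmpty).toList
        println(ds) // List()
      }
    }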

+ 16 - 4
src/main/scala/com/winhc/bigdata/flink/TestJob.scala

@@ -15,18 +15,30 @@ import java.util.concurrent.TimeUnit
  * @date: 2021/8/30 14:32
  */
 object TestJob {
-  def main(args: Array[String]): Unit = {
+
+  def test1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val odpsSource = OdpsSourceBuilder.buildSourceFunction(tableName = "xjk_test_flink_ads_company", partition = "ds=20210803")
+    val odpsSource = OdpsSourceBuilder.buildSourceFunction("winhc_ng.xjk_test_test")
+    val source = env.addSource[RowData](odpsSource)
+    source.addSink(new PrintSinkFunction[RowData])
+    env.execute("test job")
+  }
+
+  def test2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val odpsSource = OdpsSourceBuilder.buildSourceFunction(tableName = "winhc_ng.xjk_test_flink_ads_company", partition = "ds=20210803")
     val source = env.addSource[RowData](odpsSource)
     val value1: DataStream[(String, String)] = source.map(r => ("ng_company", r.getString(0).toString))
     val function: AsyncFunction[(String, String), (String, String, String)] = HbaseAsyncFunction()
     val value: DataStream[(String, String, String)] = AsyncDataStream.unorderedWait(value1, function, 100, TimeUnit.SECONDS, 10)
 
-
     value.addSink(new PrintSinkFunction[(String, String, String)])
-
     env.execute("test job")
   }
+
+
+  def main(args: Array[String]): Unit = {
+    test2()
+  }
 }
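
A note on the unorderedWait arguments in test2: results are emitted as soon as each HBase lookup completes (input order is not preserved), a lookup may wait up to 100 seconds before the record times out, and at most 10 requests are in flight per parallel subtask. If downstream logic needed the input order preserved, the ordered variant has the same shape (a sketch with the same arguments as above):

    val ordered: DataStream[(String, String, String)] =
      AsyncDataStream.orderedWait(value1, function, 100, TimeUnit.SECONDS, 10)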

+ 1 - 0
src/main/scala/com/winhc/bigdata/flink/config/HbaseConfig.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.flink.config
 
+
 import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
 import com.winhc.bigdata.filnk.java.constant.EnvConst
 import org.apache.hadoop.hbase.HBaseConfiguration

+ 12 - 5
src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala

@@ -1,29 +1,36 @@
 package com.winhc.bigdata.flink.func
 
+import com.alibaba.ververica.connector.cloudhbase.util.HBaseConfigurationUtil
 import com.winhc.bigdata.flink.config.HbaseConfig
 import com.winhc.bigdata.flink.implicits._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
 import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get}
+import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
+import org.slf4j.LoggerFactory
 
 /**
  * @author: XuJiakai
  * @date: 2021/8/31 17:07
  */
 case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (String, String, String)] {
-  var connection: Connection = null
+  private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])
+
+  @transient private var connection: Connection = null
+
 
   override def open(parameters: Configuration): Unit = {
-    val configuration = HbaseConfig.getHbaseConfiguration()
-    connection = ConnectionFactory.createConnection(configuration)
+    import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
+    val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
+    val config: Configuration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
+    connection = ConnectionFactory.createConnection(config)
   }
 
   override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
     val tn = input._1
     val rowkey = input._2
     try {
-      val table = connection.getTable(TableName.valueOf(tn.toUpperCase()))
+      val table = connection.getTable(TableName.valueOf(tn.toUpperCase())).asInstanceOf[HTable]
       val get = new Get(rowkey.getBytes)
       val result = table.get(get)
       val string = result.toJsonString
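
Two changes are doing the real work in this hunk. First, connection is now @transient: Flink serializes the function instance when shipping it to task managers, an HBase Connection is not serializable, and the field is therefore rebuilt in open(). Second, the configuration is round-tripped through HBaseConfigurationUtil.serializeConfiguration/prepareRuntimeConfiguration so it survives that same shipping step. One caveat the hunk leaves open: table.get(get) inside asyncInvoke is a blocking call on the operator thread, which defeats the purpose of an async operator. A sketch of a non-blocking completion, assuming the same (table, rowkey) input shape; the class name, executor choice, and the value() stand-in for the project's toJsonString implicit are all illustrative:

    import java.util.concurrent.CompletableFuture

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{Connection, Get}

    class NonBlockingHbaseLookup extends RichAsyncFunction[(String, String), (String, String, String)] {
      // Not serializable, so never shipped with the function: rebuilt in open()
      @transient private var connection: Connection = _

      override def open(parameters: Configuration): Unit = {
        // same HBaseConfigurationUtil round-trip as in the hunk above (elided)
      }

      override def asyncInvoke(input: (String, String),
                               resultFuture: ResultFuture[(String, String, String)]): Unit = {
        val (tn, rowkey) = input
        CompletableFuture
          .supplyAsync(() => lookup(tn, rowkey)) // off the operator thread
          .whenComplete((json: String, err: Throwable) =>
            if (err != null) resultFuture.completeExceptionally(err)
            else resultFuture.complete(Iterable((tn, rowkey, json))))
      }

      // Blocking HBase Get, wrapped so it runs on the future's pool
      private def lookup(tn: String, rowkey: String): String = {
        val table = connection.getTable(TableName.valueOf(tn.toUpperCase()))
        try new String(table.get(new Get(rowkey.getBytes)).value())
        finally table.close()
      }
    }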

+ 3 - 2
src/main/scala/com/winhc/bigdata/flink/source/OdpsSourceBuilder.scala

@@ -14,7 +14,7 @@ object OdpsSourceBuilder {
     var project = EnvConst.getValue("odps.default-project")
     var tn = tableName
     if (tableName.contains(".")) {
-      val strings = tableName.split(".")
+      val strings = tableName.split("\\.")
       project = strings(0)
       tn = strings(1)
     }
@@ -25,7 +25,8 @@ object OdpsSourceBuilder {
     conf.setString(OdpsOptions.TABLE_NAME.key, tn)
     conf.setString(OdpsOptions.ACCESS_ID.key(), EnvConst.getValue("odps.prod-account.access-id"))
     conf.setString(OdpsOptions.ACCESS_KEY.key(), EnvConst.getValue("odps.prod-account.access-key"))
-    conf.setString(OdpsOptions.PARTITION.key, partition)
+    if (partition != null)
+      conf.setString(OdpsOptions.PARTITION.key, partition)
     val odpsSource = new OdpsSourceBuilder4J(conf).buildSourceFunction
     odpsSource
   }
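
The new null guard matters because Flink's Configuration does not accept null values (setString would throw a NullPointerException), and the no-partition path is now actually exercised: test1 above passes only a table name. The two calling conventions this enables, both taken from TestJob in this commit (the default value of partition is assumed to be null):

    // Whole-table read: PARTITION stays unset and later defaults to ""
    val fullScan = OdpsSourceBuilder.buildSourceFunction("winhc_ng.xjk_test_test")

    // Single-partition read
    val oneDay = OdpsSourceBuilder.buildSourceFunction(
      tableName = "winhc_ng.xjk_test_flink_ads_company",
      partition = "ds=20210803")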

+ 1 - 3
src/main/scala/com/winhc/bigdata/flink/test/MyTest.scala

@@ -30,9 +30,7 @@ object MyTest {
 
 
 
-    /*  val odpsSource = OdpsSource.buildSourceFunction("xjk_test_test")
-      val source = env.addSource[RowData](odpsSource)
-      source.addSink(new PrintSinkFunction[RowData])*/
+
 
     env.execute("test job")
   }