xufei 3 years ago
parent
commit
4585fd3827

+ 166 - 167
pom.xml

@@ -19,8 +19,8 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <connector.version>1.13-vvr-4.0.7</connector.version>
-        <scope>compile</scope>
-        <!--        <scope>provided</scope>-->
+        <!-- <scope>provided</scope>-->
+        <scope>provided</scope>
     </properties>
 
     <repositories>
@@ -51,19 +51,19 @@
     <dependencies>
         <!-- Apache Flink dependencies -->
         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.version}</version>
-            <scope>${scope}</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>${scope}</scope>
-        </dependency>
+        <!--        <dependency>-->
+        <!--            <groupId>org.apache.flink</groupId>-->
+        <!--            <artifactId>flink-avro</artifactId>-->
+        <!--            <version>${flink.version}</version>-->
+        <!--            <scope>${scope}</scope>-->
+        <!--        </dependency>-->
+
+        <!--        <dependency>-->
+        <!--            <groupId>org.apache.flink</groupId>-->
+        <!--            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>-->
+        <!--            <version>${flink.version}</version>-->
+        <!--            <scope>${scope}</scope>-->
+        <!--        </dependency>-->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-scala_${scala.binary.version}</artifactId>
@@ -219,7 +219,6 @@
         </dependency>
 
 
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -236,158 +235,158 @@
         </dependency>
     </dependencies>
 
-    <build>
-        <finalName>winhc_flink-${project.version}</finalName>
-        <plugins>
-            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
-            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>3.1.1</version>
-                <executions>
-                    <!-- Run shade goal on package phase -->
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <artifactSet>
-                                <excludes>
-                                    <exclude>org.apache.flink:force-shading</exclude>
-                                    <exclude>com.google.code.findbugs:jsr305</exclude>
-                                    <exclude>org.slf4j:*</exclude>
-                                    <exclude>org.apache.logging.log4j:*</exclude>
-                                </excludes>
-                            </artifactSet>
-                            <filters>
-                                <filter>
-                                    <!-- Do not copy the signatures in the META-INF folder.
-                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
-                                    <artifact>*:*</artifact>
+        <build>
+            <finalName>winhc_flink-${project.version}</finalName>
+            <plugins>
+                <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+                <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-shade-plugin</artifactId>
+                    <version>3.1.1</version>
+                    <executions>
+                        <!-- Run shade goal on package phase -->
+                        <execution>
+                            <phase>package</phase>
+                            <goals>
+                                <goal>shade</goal>
+                            </goals>
+                            <configuration>
+                                <artifactSet>
                                     <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>org.apache.flink:force-shading</exclude>
+                                        <exclude>com.google.code.findbugs:jsr305</exclude>
+                                        <exclude>org.slf4j:*</exclude>
+                                        <exclude>org.apache.logging.log4j:*</exclude>
                                     </excludes>
-                                </filter>
-                            </filters>
-                            <transformers>
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>org.myorg.quickstart.StreamingJob</mainClass>
-                                </transformer>
-
-                                <!-- The service transformer is needed to merge META-INF/services files -->
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
-                                    <projectName>Apache Flink</projectName>
-                                    <encoding>UTF-8</encoding>
-                                </transformer>
-
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <!-- Java Compiler -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-
-            <!-- Scala Compiler -->
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <version>3.2.2</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
-                    <args>
-                        <arg>-nobootcp</arg>
-                    </args>
-                </configuration>
-            </plugin>
-
-            <!-- Eclipse Scala Integration -->
-            <!--            <plugin>-->
-            <!--                <groupId>org.apache.maven.plugins</groupId>-->
-            <!--                <artifactId>maven-eclipse-plugin</artifactId>-->
-            <!--                <version>2.8</version>-->
-            <!--                <configuration>-->
-            <!--                    <downloadSources>true</downloadSources>-->
-            <!--                    <projectnatures>-->
-            <!--                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>-->
-            <!--                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>-->
-            <!--                    </projectnatures>-->
-            <!--                    <buildcommands>-->
-            <!--                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>-->
-            <!--                    </buildcommands>-->
-            <!--                    <classpathContainers>-->
-            <!--                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>-->
-            <!--                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>-->
-            <!--                    </classpathContainers>-->
-            <!--                    <excludes>-->
-            <!--                        <exclude>org.scala-lang:scala-library</exclude>-->
-            <!--                        <exclude>org.scala-lang:scala-compiler</exclude>-->
-            <!--                    </excludes>-->
-            <!--                    <sourceIncludes>-->
-            <!--                        <sourceInclude>**/*.scala</sourceInclude>-->
-            <!--                        <sourceInclude>**/*.java</sourceInclude>-->
-            <!--                    </sourceIncludes>-->
-            <!--                </configuration>-->
-            <!--            </plugin>-->
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.7</version>
-                <executions>
-                    <!-- Add src/main/scala to eclipse build path -->
-                    <execution>
-                        <id>add-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>src/main/scala</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                    <!-- Add src/test/scala to eclipse build path -->
-                    <execution>
-                        <id>add-test-source</id>
-                        <phase>generate-test-sources</phase>
-                        <goals>
-                            <goal>add-test-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>src/test/scala</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
+                                </artifactSet>
+                                <filters>
+                                    <filter>
+                                        <!-- Do not copy the signatures in the META-INF folder.
+                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                        <artifact>*:*</artifact>
+                                        <excludes>
+                                            <exclude>META-INF/*.SF</exclude>
+                                            <exclude>META-INF/*.DSA</exclude>
+                                            <exclude>META-INF/*.RSA</exclude>
+                                        </excludes>
+                                    </filter>
+                                </filters>
+                                <transformers>
+                                    <transformer
+                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                        <mainClass>org.myorg.quickstart.StreamingJob</mainClass>
+                                    </transformer>
+
+                                    <!-- The service transformer is needed to merge META-INF/services files -->
+                                    <transformer
+                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                    <transformer
+                                            implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                        <projectName>Apache Flink</projectName>
+                                        <encoding>UTF-8</encoding>
+                                    </transformer>
+
+                                </transformers>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+
+                <!-- Java Compiler -->
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>3.1</version>
+                    <configuration>
+                        <source>1.8</source>
+                        <target>1.8</target>
+                    </configuration>
+                </plugin>
+
+                <!-- Scala Compiler -->
+                <plugin>
+                    <groupId>net.alchim31.maven</groupId>
+                    <artifactId>scala-maven-plugin</artifactId>
+                    <version>3.2.2</version>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>compile</goal>
+                                <goal>testCompile</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                    <configuration>
+                        <addScalacArgs>-target:jvm-1.8</addScalacArgs>
+                        <args>
+                            <arg>-nobootcp</arg>
+                        </args>
+                    </configuration>
+                </plugin>
+
+                <!-- Eclipse Scala Integration -->
+                <!--            <plugin>-->
+                <!--                <groupId>org.apache.maven.plugins</groupId>-->
+                <!--                <artifactId>maven-eclipse-plugin</artifactId>-->
+                <!--                <version>2.8</version>-->
+                <!--                <configuration>-->
+                <!--                    <downloadSources>true</downloadSources>-->
+                <!--                    <projectnatures>-->
+                <!--                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>-->
+                <!--                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>-->
+                <!--                    </projectnatures>-->
+                <!--                    <buildcommands>-->
+                <!--                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>-->
+                <!--                    </buildcommands>-->
+                <!--                    <classpathContainers>-->
+                <!--                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>-->
+                <!--                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>-->
+                <!--                    </classpathContainers>-->
+                <!--                    <excludes>-->
+                <!--                        <exclude>org.scala-lang:scala-library</exclude>-->
+                <!--                        <exclude>org.scala-lang:scala-compiler</exclude>-->
+                <!--                    </excludes>-->
+                <!--                    <sourceIncludes>-->
+                <!--                        <sourceInclude>**/*.scala</sourceInclude>-->
+                <!--                        <sourceInclude>**/*.java</sourceInclude>-->
+                <!--                    </sourceIncludes>-->
+                <!--                </configuration>-->
+                <!--            </plugin>-->
+                <plugin>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>build-helper-maven-plugin</artifactId>
+                    <version>1.7</version>
+                    <executions>
+                        <!-- Add src/main/scala to eclipse build path -->
+                        <execution>
+                            <id>add-source</id>
+                            <phase>generate-sources</phase>
+                            <goals>
+                                <goal>add-source</goal>
+                            </goals>
+                            <configuration>
+                                <sources>
+                                    <source>src/main/scala</source>
+                                </sources>
+                            </configuration>
+                        </execution>
+                        <!-- Add src/test/scala to eclipse build path -->
+                        <execution>
+                            <id>add-test-source</id>
+                            <phase>generate-test-sources</phase>
+                            <goals>
+                                <goal>add-test-source</goal>
+                            </goals>
+                            <configuration>
+                                <sources>
+                                    <source>src/test/scala</source>
+                                </sources>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </build>
 
 </project>

+ 1 - 1
src/main/resources/env.yml

@@ -1,5 +1,5 @@
 profile:
-  activate: dev
+  activate: prod
 
 odps:
   default-project: winhc_bigdata_learn_dev

+ 8 - 4
src/main/scala/com/winhc/bigdata/flink/config/HbaseConfig.scala

@@ -5,8 +5,11 @@ import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Con
 import com.winhc.bigdata.flink.java.constant.EnvConst
 import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
 import org.apache.hadoop.hbase.HBaseConfiguration
-
 import java.util
+
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters
+import org.apache.flink.api.java.utils.ParameterTool
+
 import scala.collection.JavaConverters._
 
 /**
@@ -30,11 +33,12 @@ object HbaseConfig {
    * @param configs Flink ConfigOption
    * @return HBaseConfiguration
    */
-  def getHbaseConfiguration(parameters: org.apache.flink.configuration.Configuration, configs: Seq[ConfigOption[String]]): Configuration = {
+  def getHbaseConfiguration(parameters: ParameterTool, configs: Seq[String]): Configuration = {
     val configuration: Configuration = HBaseConfiguration.create
     for (config <- configs) {
-      val value = parameters.getString(config)
-      val key = config.key()
+
+      val value = parameters.get(config)
+      val key = config
       configuration.set(key.replace("winhc.hbase.config.", ""), value)
     }
     configuration

+ 18 - 21
src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala

@@ -4,6 +4,7 @@ import com.winhc.bigdata.flink.config.HbaseConfig
 import com.winhc.bigdata.flink.event.UpdateEntity
 import com.winhc.bigdata.flink.implicits._
 import com.winhc.bigdata.flink.utils.BaseUtils
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.{ConfigOptions, Configuration}
 import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
 import org.apache.hadoop.hbase.TableName
@@ -33,25 +34,18 @@ case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEn
         val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
         val config: Configuration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
        connection = ConnectionFactory.createConnection(config)
-       */ val quorum = ConfigOptions.key("winhc.hbase.config.hbase.zookeeper.quorum").stringType().defaultValue("hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181")
-    val period = ConfigOptions.key("winhc.hbase.config.hbase.client.scanner.timeout.period").stringType().defaultValue("120000")
-    val number = ConfigOptions.key("winhc.hbase.config.hbase.client.retries.number").stringType().defaultValue("5")
-    val pause = ConfigOptions.key("winhc.hbase.config.hbase.client.pause").stringType().defaultValue("1000")
-    val perserver_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perserver.tasks").stringType().defaultValue("10")
-    val perregion_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perregion.tasks").stringType().defaultValue("10")
-    val maxsize = ConfigOptions.key("winhc.hbase.config.hbase.client.keyvalue.maxsize").stringType().defaultValue("524288000")
-    val size = ConfigOptions.key("winhc.hbase.config.base.client.ipc.pool.size").stringType().defaultValue("5")
-    val retry = ConfigOptions.key("winhc.hbase.config.zookeeper.recovery.retry").stringType().defaultValue("5")
-    val configuration = HbaseConfig.getHbaseConfiguration(parameters, Seq(
-      quorum,
-      period,
-      number,
-      pause,
-      perserver_tasks,
-      perregion_tasks,
-      maxsize,
-      size,
-      retry
+       */     val config=getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
+
+    val configuration = HbaseConfig.getHbaseConfiguration(config, Seq(
+      "winhc.hbase.config.hbase.zookeeper.quorum",
+      "winhc.hbase.config.hbase.client.scanner.timeout.period",
+      "winhc.hbase.config.hbase.client.retries.number",
+      "winhc.hbase.config.hbase.client.pause",
+      "winhc.hbase.config.hbase.client.max.perserver.tasks",
+      "winhc.hbase.config.hbase.client.max.perregion.tasks",
+      "winhc.hbase.config.hbase.client.keyvalue.maxsize",
+      "winhc.hbase.config.hbase.client.ipc.pool.size",
+      "winhc.hbase.config.zookeeper.recovery.retry"
     ))
     connection = ConnectionFactory.createConnection(configuration)
 
@@ -72,8 +66,11 @@ case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEn
         val companyTable = connection.getTable(TableName.valueOf("NG_COMPANY")).asInstanceOf[HTable]
         val get = new Get(companyId.getBytes)
         val companyResult = companyTable.get(get)
-        val oldCompany = parse(companyResult.toJson()).asInstanceOf[JObject]
-        input.oldCompany = oldCompany
+        if(companyResult!=null){
+          val oldCompany = parse(companyResult.toJson()).asInstanceOf[JObject]
+          input.oldCompany = oldCompany
+        }
+
         companyTable.close()
       }
       input.dims.foreach {

+ 15 - 20
src/main/scala/com/winhc/bigdata/flink/func/HbaseSinkFunction.scala

@@ -9,8 +9,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
 import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, RetriesExhaustedWithDetailsException}
-
 import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.flink.api.java.utils.ParameterTool
+
 import scala.collection.JavaConverters.asScalaSetConverter
 
 /**
@@ -22,25 +24,18 @@ class HbaseSinkFunction() extends RichSinkFunction[PutCollection] with Checkpoin
   private val failureThrowable: AtomicReference[Throwable] = new AtomicReference[Throwable]
 
   override def open(parameters: Configuration): Unit = {
-    val quorum = ConfigOptions.key("winhc.hbase.config.hbase.zookeeper.quorum").stringType().defaultValue("hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181")
-    val period = ConfigOptions.key("winhc.hbase.config.hbase.client.scanner.timeout.period").stringType().defaultValue("120000")
-    val number = ConfigOptions.key("winhc.hbase.config.hbase.client.retries.number").stringType().defaultValue("5")
-    val pause = ConfigOptions.key("winhc.hbase.config.hbase.client.pause").stringType().defaultValue("1000")
-    val perserver_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perserver.tasks").stringType().defaultValue("10")
-    val perregion_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perregion.tasks").stringType().defaultValue("10")
-    val maxsize = ConfigOptions.key("winhc.hbase.config.hbase.client.keyvalue.maxsize").stringType().defaultValue("524288000")
-    val size = ConfigOptions.key("winhc.hbase.config.base.client.ipc.pool.size").stringType().defaultValue("5")
-    val retry = ConfigOptions.key("winhc.hbase.config.zookeeper.recovery.retry").stringType().defaultValue("5")
-    val configuration = HbaseConfig.getHbaseConfiguration(parameters, Seq(
-      quorum,
-      period,
-      number,
-      pause,
-      perserver_tasks,
-      perregion_tasks,
-      maxsize,
-      size,
-      retry
+    val config=getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
+
+    val configuration = HbaseConfig.getHbaseConfiguration(config, Seq(
+      "winhc.hbase.config.hbase.zookeeper.quorum",
+      "winhc.hbase.config.hbase.client.scanner.timeout.period",
+      "winhc.hbase.config.hbase.client.retries.number",
+      "winhc.hbase.config.hbase.client.pause",
+      "winhc.hbase.config.hbase.client.max.perserver.tasks",
+      "winhc.hbase.config.hbase.client.max.perregion.tasks",
+      "winhc.hbase.config.hbase.client.keyvalue.maxsize",
+      "winhc.hbase.config.hbase.client.ipc.pool.size",
+      "winhc.hbase.config.zookeeper.recovery.retry"
     ))
     connection = ConnectionFactory.createConnection(configuration)
   }

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/implicits/HbaseResultHelper.scala

@@ -13,7 +13,7 @@ import scala.collection.mutable.Map
 case class HbaseResultHelper(result: Result) {
   def toJsonString(): String = {
     if (result == null || result.isEmpty) {
-      null
+      "{}"
     } else {
       val map: Map[String, String] = Map()
       var rowkey: String = null

+ 0 - 43
src/main/scala/com/winhc/bigdata/flink/jobs/TestJob.scala

@@ -1,43 +0,0 @@
-package com.winhc.bigdata.flink.jobs
-
-import com.winhc.bigdata.flink.func.HbaseAsyncFunction
-import com.winhc.bigdata.flink.source.OdpsSourceBuilder
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
-import org.apache.flink.streaming.api.scala.async.AsyncFunction
-import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment}
-import org.apache.flink.table.data.RowData
-import org.apache.flink.api.scala._
-import scala.concurrent.duration.SECONDS
-
-/**
- * @author: XuJiakai
- * @date: 2021/8/30 14:32
- */
-object TestJob {
-
-  def test1(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val odpsSource = OdpsSourceBuilder.buildSourceFunction("winhc_ng.xjk_test_test")
-    val source = env.addSource[RowData](odpsSource)
-    source.addSink(new PrintSinkFunction[RowData])
-    env.execute("test job")
-  }
-
-  def test2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val odpsSource = OdpsSourceBuilder.buildSourceFunction(tableName = "winhc_ng.xjk_test_flink_ads_company", partition = "ds=20210803")
-    val source = env.addSource[RowData](odpsSource)
-    val value1: DataStream[(String, String)] = source.map(r => ("ng_company", r.getString(0).toString))
-    val function: AsyncFunction[(String, String), (String, String, String)] = HbaseAsyncFunction()
-    val value: DataStream[(String, String, String)] = AsyncDataStream.unorderedWait(value1, function, 100, SECONDS, 10)
-
-    value.addSink(new PrintSinkFunction[(String, String, String)])
-    env.execute("test job")
-  }
-
-
-  def main(args: Array[String]): Unit = {
-    test2()
-  }
-}

+ 6 - 2
src/main/scala/com/winhc/bigdata/flink/jobs/TestJob1.scala

@@ -23,10 +23,11 @@ import org.apache.flink.util.Collector
 import org.apache.hadoop.hbase.client.Put
 import org.json4s.JsonAST.{JNull, JObject, JValue}
 import org.json4s.{JNothing, JString}
-
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
+import com.alibaba.dcm.DnsCacheManipulator
+
 
 /**
  * @author ZhangJi
@@ -34,8 +35,9 @@ import java.util.concurrent.TimeUnit
  */
 object TestJob1 {
   def main(args: Array[String]): Unit = {
+    DnsCacheManipulator.loadDnsCacheConfig()
 
-    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     env.getConfig.setGlobalJobParameters(EnvConst.createParameterTool(args))
     val kafkaSource: KafkaSource[String] = KafkaSourceBuilder.buildSourceFunction("flink_test")
@@ -130,7 +132,9 @@ object TestJob1 {
         PutCollection.merge(a, b)
       }
     })
+
       .addSink(new HbaseSinkFunction)
+
     env.execute("flink start")
   }
 

+ 299 - 13
src/main/scala/com/winhc/bigdata/flink/utils/RowDataUtils.scala

@@ -9,8 +9,8 @@ import com.winhc.bigdata.flink.utils.BaseUtils.md5
 import org.json4s.JsonAST.{JArray, JObject, JString}
 import org.json4s.JsonDSL.jobject2assoc
 import org.json4s.jackson.JsonMethods._
+import java.io.{File, InputStream}
 
-import java.io.File
 import scala.collection.JavaConverters._
 
 /**
@@ -18,20 +18,302 @@ import scala.collection.JavaConverters._
  * @since 2021-10-13 14:54
  */
 object RowDataUtils {
+  val schemas: Map[String, JsonNode] = createAllSchema()
 
-  private val schema_dir: String = this.getClass.getResource("/schema").getPath
-  val schemas: Map[String, JsonNode] = createAllSchema
+  def createAllSchema(): Map[String, JsonNode] = {
+//    val schema_dir: String = this.getClass.getClassLoader.getResource("/schema").getPath
 
-  def createAllSchema: Map[String, JsonNode] = {
-    new File(schema_dir).listFiles().filter(!_.isDirectory)
-      .filter(t => t.toString.endsWith(".schema"))
-      .map(f =>
-        (f.getName.replace(".schema", ""), scala.io.Source.fromFile(f))
-      )
-      .map(t => {
-        val (n, s) = t
-        (n, try s.getLines().mkString("\n") finally s.close())
-      })
+    Seq(
+      ("company",
+        """
+          |{
+          |  "$schema": "http://json-schema.org/draft-04/schema#",
+          |  "type": ["null","object"],
+          |  "properties": {
+          |    "company_id": {
+          |      "type": ["string"]
+          |    },
+          |    "new_cid": {
+          |      "type": ["null","string"]
+          |    },
+          |    "base": {
+          |      "type": ["null","string"]
+          |    },
+          |    "name": {
+          |      "type": ["null","string"]
+          |    },
+          |    "name_en": {
+          |      "type": ["null","string"]
+          |    },
+          |    "name_alias": {
+          |      "type": ["null","string"]
+          |    },
+          |    "history_names": {
+          |      "type": ["null","array"],
+          |      "items": [
+          |        {
+          |          "type": ["string"]
+          |        }
+          |      ]
+          |    },
+          |    "legal_entity_id": {
+          |      "type": ["null","string"]
+          |    },
+          |    "legal_entity_name": {
+          |      "type": ["null","string"]
+          |    },
+          |    "legal_entity_type": {
+          |      "type": ["null","integer"]
+          |    },
+          |    "reg_number": {
+          |      "type": ["null","string"]
+          |    },
+          |    "company_org_type": {
+          |      "type": ["null","string"]
+          |    },
+          |    "reg_location": {
+          |      "type": ["null","string"]
+          |    },
+          |    "estiblish_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "from_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "to_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "business_scope": {
+          |      "type": ["null","string"]
+          |    },
+          |    "reg_institute": {
+          |      "type": ["null","string"]
+          |    },
+          |    "approved_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "reg_status": {
+          |      "type": ["null","string"]
+          |    },
+          |    "reg_capital": {
+          |      "type": ["null","string"]
+          |    },
+          |    "org_approved_institute": {
+          |      "type": ["null","string"]
+          |    },
+          |    "parent_company_id": {
+          |      "type": ["null","string"]
+          |    },
+          |    "company_type": {
+          |      "type": ["null","integer"]
+          |    },
+          |    "credit_code": {
+          |      "type": ["null","string"]
+          |    },
+          |    "org_number": {
+          |      "type": ["null","string"]
+          |    },
+          |    "score": {
+          |      "type": ["null","number"]
+          |    },
+          |    "cate_first_code": {
+          |      "type": ["null","string"]
+          |    },
+          |    "cate_second_code": {
+          |      "type": ["null","string"]
+          |    },
+          |    "cate_third_code": {
+          |      "type": ["null","string"]
+          |    },
+          |    "lat": {
+          |      "type": ["null","string"]
+          |    },
+          |    "lng": {
+          |      "type": ["null","string"]
+          |    },
+          |    "province_code": {
+          |      "type": ["null","string"]
+          |    },
+          |    "city_code": {
+          |      "type": ["null","string"]
+          |    },
+          |    "county_code": {
+          |      "type": ["null","string"]
+          |    },
+          |    "reg_capital_amount": {
+          |      "type": ["null","integer"]
+          |    },
+          |    "reg_capital_currency": {
+          |      "type": ["null","string"]
+          |    },
+          |    "actual_capital_amount": {
+          |      "type": ["null","integer"]
+          |    },
+          |    "actual_capital_currency": {
+          |      "type": ["null","string"]
+          |    },
+          |    "reg_status_std": {
+          |      "type": ["null","string"]
+          |    },
+          |    "social_security_staff_num": {
+          |      "type": ["null","integer"]
+          |    },
+          |    "cancel_date": {
+          |      "type": ["null","string"]
+          |    },
+          |    "cancel_reason": {
+          |      "type": ["null","string"]
+          |    },
+          |    "revoke_date": {
+          |      "type": ["null","string"]
+          |    },
+          |    "revoke_reason": {
+          |      "type": ["null","string"]
+          |    },
+          |    "emails": {
+          |      "type": ["null","string"]
+          |    },
+          |    "phones": {
+          |      "type": ["null","string"]
+          |    },
+          |    "wechat_public_num": {
+          |      "type": ["null","string"]
+          |    },
+          |    "logo": {
+          |      "type": ["null","string"]
+          |    },
+          |    "crawled_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "create_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "update_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "deleted": {
+          |      "type": ["null","integer"]
+          |    }
+          |  },
+          |  "required": [
+          |    "company_id"
+          |  ]
+          |}
+          |""".stripMargin),
+      ("company_holder",
+        """{
+          |  "$schema": "http://json-schema.org/draft-04/schema#",
+          |  "type": ["null","object"],
+          |  "properties": {
+          |    "capital_actual": {
+          |      "type": ["null","string"]
+          |    },
+          |    "amount": {
+          |      "type": ["null","number"]
+          |    },
+          |    "capital": {
+          |      "type": ["null","array"],
+          |      "items": [
+          |        {
+          |          "type": ["object"],
+          |          "properties": {
+          |            "amomon": {
+          |              "type": ["string"]
+          |            },
+          |            "paymet": {
+          |              "type": ["null","string"]
+          |            },
+          |            "time": {
+          |              "type": ["null","string"]
+          |            }
+          |          },
+          |          "required": [
+          |            "amomon"
+          |          ]
+          |        }
+          |      ]
+          |    },
+          |    "company_id": {
+          |      "type": ["string"]
+          |    },
+          |    "create_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "holder_type": {
+          |      "type": ["integer"]
+          |    },
+          |    "percent": {
+          |      "type": ["null","number"]
+          |    },
+          |    "update_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "deleted": {
+          |      "type": ["null","integer"]
+          |    },
+          |    "company_name": {
+          |      "type": ["null","string"]
+          |    },
+          |    "holder_id": {
+          |      "type": ["string"]
+          |    },
+          |    "holder_name": {
+          |      "type": ["string"]
+          |    }
+          |  },
+          |  "required": [
+          |    "company_id",
+          |    "company_name",
+          |    "holder_type",
+          |    "holder_name"
+          |  ]
+          |}""".stripMargin),
+      ("company_staff",
+        """{
+          |  "$schema": "http://json-schema.org/draft-04/schema#",
+          |  "type": ["null","object"],
+          |  "properties": {
+          |    "staff_name": {
+          |      "type": ["string"]
+          |    },
+          |    "hid": {
+          |      "type": ["null","string"]
+          |    },
+          |    "update_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "deleted": {
+          |      "type": ["null","integer"]
+          |    },
+          |    "company_id": {
+          |      "type": ["string"]
+          |    },
+          |    "staff_type": {
+          |      "type": ["null","string"]
+          |    },
+          |    "create_time": {
+          |      "type": ["null","string"]
+          |    },
+          |    "company_name": {
+          |      "type": ["null","string"]
+          |    }
+          |  },
+          |  "required": [
+          |    "staff_name",
+          |    "company_id"
+          |  ]
+          |}""".stripMargin)
+
+    )
+      //    new File(schema_dir).listFiles().filter(!_.isDirectory)
+      //      .filter(t => t.toString.endsWith(".schema"))
+      //      .map(f =>
+      //        (f.getName.replace(".schema", ""), scala.io.Source.fromFile(f))
+      //      )
+      //      .map(t => {
+      //        val (n, s) = t
+      //        (n, try s.getLines().mkString("\n") finally s.close())
+      //      })
       .map(t => {
         val (n, s) = t
         (n, asJsonNode(parse(s)))
@@ -150,4 +432,8 @@ object RowDataUtils {
     }
   }
 
+  /**
+   * Stub entry point: writes a single empty line to standard output.
+   *
+   * @param args command-line arguments; not read by this method
+   */
+  def main(args: Array[String]): Unit =
+    Console.println("")
+
 }