许家凯 3 years ago
Commit
79d357bed6
52 changed files with 2445 additions and 0 deletions
  1. .gitignore (+251 -0)
  2. pom.xml (+396 -0)
  3. src/main/java/com/winhc/bigdata/filnk/java/BaseEntity.java (+13 -0)
  4. src/main/java/com/winhc/bigdata/filnk/java/CompanyData.java (+35 -0)
  5. src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/Company.java (+89 -0)
  6. src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyHolder.java (+49 -0)
  7. src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyStaff.java (+45 -0)
  8. src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyTm.java (+68 -0)
  9. src/main/java/com/winhc/bigdata/filnk/java/constant/EnvConst.java (+26 -0)
  10. src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyIndexEntity.java (+46 -0)
  11. src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyName.java (+19 -0)
  12. src/main/java/com/winhc/bigdata/filnk/java/entity/Entity.java (+20 -0)
  13. src/main/java/com/winhc/bigdata/filnk/java/sink/OdpsSinkBuilder4J.java (+94 -0)
  14. src/main/java/com/winhc/bigdata/filnk/java/source/KafkaSourceBuilder4J.java (+35 -0)
  15. src/main/java/com/winhc/bigdata/filnk/java/source/OdpsSourceBuilder4J.java (+68 -0)
  16. src/main/java/com/winhc/bigdata/filnk/java/utils/EnvironmentProperties.java (+100 -0)
  17. src/main/java/com/winhc/bigdata/filnk/java/utils/FieldNameUtils.java (+25 -0)
  18. src/main/java/com/winhc/bigdata/filnk/java/utils/Json2EntityUtils.java (+102 -0)
  19. src/main/resources/dns-cache.properties (+5 -0)
  20. src/main/resources/env-dev.yml (+43 -0)
  21. src/main/resources/env-prod.yml (+34 -0)
  22. src/main/resources/env.yml (+12 -0)
  23. src/main/scala/com/winhc/bigdata/flink/CompanyData.scala (+14 -0)
  24. src/main/scala/com/winhc/bigdata/flink/TestJob.scala (+32 -0)
  25. src/main/scala/com/winhc/bigdata/flink/config/ElasticsearchConfig.scala (+66 -0)
  26. src/main/scala/com/winhc/bigdata/flink/config/HbaseConfig.scala (+23 -0)
  27. src/main/scala/com/winhc/bigdata/flink/constant/CompanySummaryConst.scala (+15 -0)
  28. src/main/scala/com/winhc/bigdata/flink/func/ElasticsearchAsyncFunction.scala (+70 -0)
  29. src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala (+46 -0)
  30. src/main/scala/com/winhc/bigdata/flink/func/HbaseSinkFunction.scala (+39 -0)
  31. src/main/scala/com/winhc/bigdata/flink/implicits/CaseClass2JsonHelper.scala (+18 -0)
  32. src/main/scala/com/winhc/bigdata/flink/implicits/HbaseResultHelper.scala (+33 -0)
  33. src/main/scala/com/winhc/bigdata/flink/implicits/package.scala (+8 -0)
  34. src/main/scala/com/winhc/bigdata/flink/jobs/CompanyStreamJob.scala (+22 -0)
  35. src/main/scala/com/winhc/bigdata/flink/jobs/CustomWatermarkStrategy.scala (+13 -0)
  36. src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/package.scala (+8 -0)
  37. src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_01_kafka_transform.scala (+31 -0)
  38. src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_02_sink_2_ods.scala (+46 -0)
  39. src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_03_etl.scala (+16 -0)
  40. src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_04_builder_data.scala (+13 -0)
  41. src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_05_save_data.scala (+19 -0)
  42. src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_06_builder_company_index.scala (+16 -0)
  43. src/main/scala/com/winhc/bigdata/flink/sink/ElasticsearchSinkBuilder.scala (+59 -0)
  44. src/main/scala/com/winhc/bigdata/flink/sink/HBaseSinkBuilder.scala (+10 -0)
  45. src/main/scala/com/winhc/bigdata/flink/sink/HoloSinkBuilder.scala (+75 -0)
  46. src/main/scala/com/winhc/bigdata/flink/sink/JsonObjectToMutationConverter.scala (+19 -0)
  47. src/main/scala/com/winhc/bigdata/flink/sink/OdpsSinkBuilder.scala (+17 -0)
  48. src/main/scala/com/winhc/bigdata/flink/source/KafkaSourceBuilder.scala (+14 -0)
  49. src/main/scala/com/winhc/bigdata/flink/source/OdpsSourceBuilder.scala (+33 -0)
  50. src/main/scala/com/winhc/bigdata/flink/test/MyTest.scala (+40 -0)
  51. src/main/scala/com/winhc/bigdata/flink/utils/BaseUtils.scala (+23 -0)
  52. src/main/scala/com/winhc/bigdata/flink/utils/DateUtils.scala (+32 -0)

+ 251 - 0
.gitignore

@@ -0,0 +1,251 @@
+### Scala template
+*.class
+*.log
+
+### Example user template template
+### Example user template
+
+# IntelliJ project files
+.idea
+*.iml
+out
+gen
+### Java template
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+### Python template
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+### JetBrains template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn.  Uncomment if using
+# auto-import.
+# .idea/artifacts
+# .idea/compiler.xml
+# .idea/jarRepositories.xml
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+dependency-reduced-pom.xml
+/src/main/scala/com/winhc/bigdata/flink/xjk/

+ 396 - 0
pom.xml

@@ -0,0 +1,396 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.winhc.bigdata</groupId>
+    <artifactId>flink-winhc_flink</artifactId>
+    <packaging>jar</packaging>
+    <version>1.0</version>
+
+    <properties>
+
+        <flink.version>1.12.3</flink.version>
+        <scala.version>2.11.12</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <log4j.version>2.12.1</log4j.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <!--        <connector.version>1.12-vvr-3.0.2</connector.version>-->
+        <connector.version>1.13-vvr-4.0.7-SNAPSHOT</connector.version>
+        <scope>compile</scope>
+        <!--        <scope>provided</scope>-->
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>oss.sonatype.org-snapshot</id>
+            <name>OSS Sonatype Snapshot Repository</name>
+            <url>http://oss.sonatype.org/content/repositories/snapshots</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>apache.snapshots</id>
+            <name>Apache Development Snapshot Repository</name>
+            <url>https://repository.apache.org/content/repositories/snapshots/</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <!-- Apache Flink dependencies -->
+        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+
+        <!-- Scala Library, provided by Flink as well. -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+
+        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+
+        <!-- Add logging framework, to produce console output when running in the IDE. -->
+        <!-- These dependencies are excluded from the application JAR by default. -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.17</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>dns-cache-manipulator</artifactId>
+            <version>1.5.1</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.78</version>
+        </dependency>
+
+
+        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.20</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.7.10</version>
+        </dependency>
+        <dependency>
+            <groupId>org.json4s</groupId>
+            <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
+            <version>3.7.0-M7</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>ververica-connector-odps</artifactId>
+            <version>${connector.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>ververica-connector-hologres</artifactId>
+            <version>${connector.version}</version>
+
+         <!--   <exclusions>
+                <exclusion>
+                    <groupId>com.alibaba.hologres</groupId>
+                    <artifactId>holo-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.alibaba.blink</groupId>
+                    <artifactId>hologres-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.aliyun.datahub</groupId>
+                    <artifactId>aliyun-sdk-datahub-holo</artifactId>
+                </exclusion>
+            </exclusions>-->
+
+        </dependency>
+      <!--  <dependency>
+            <groupId>com.alibaba.hologres</groupId>
+            <artifactId>holo-client</artifactId>
+            <version>1.2.13.6</version>
+        </dependency>-->
+
+
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>ververica-connector-elasticsearch6</artifactId>
+            <version>${connector.version}</version>
+         <!--   <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
+                </exclusion>
+            </exclusions>-->
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>ververica-connector-cloudhbase</artifactId>
+            <version>${connector.version}</version>
+          <!--  <exclusions>
+                <exclusion>
+                    <groupId>com.aliyun.hbase</groupId>
+                    <artifactId>alihbase-client</artifactId>
+                </exclusion>
+            </exclusions>-->
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>ververica-connector-kafka</artifactId>
+            <version>${connector.version}</version>
+           <!-- <exclusions>
+                <exclusion>
+                    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>-->
+        </dependency>
+
+       <!-- <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+            <version>${flink.version}</version>
+        </dependency>-->
+
+    </dependencies>
+
+    <build>
+        <finalName>winhc_flink-${project.version}</finalName>
+        <plugins>
+            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.1.1</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>org.apache.logging.log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.myorg.quickstart.StreamingJob</mainClass>
+                                </transformer>
+
+                                <!-- The service transformer is needed to merge META-INF/services files -->
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                    <projectName>Apache Flink</projectName>
+                                    <encoding>UTF-8</encoding>
+                                </transformer>
+
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+
+            <!-- Scala Compiler -->
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <args>
+                        <arg>-nobootcp</arg>
+                    </args>
+                </configuration>
+            </plugin>
+
+            <!-- Eclipse Scala Integration -->
+            <!--            <plugin>-->
+            <!--                <groupId>org.apache.maven.plugins</groupId>-->
+            <!--                <artifactId>maven-eclipse-plugin</artifactId>-->
+            <!--                <version>2.8</version>-->
+            <!--                <configuration>-->
+            <!--                    <downloadSources>true</downloadSources>-->
+            <!--                    <projectnatures>-->
+            <!--                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>-->
+            <!--                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>-->
+            <!--                    </projectnatures>-->
+            <!--                    <buildcommands>-->
+            <!--                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>-->
+            <!--                    </buildcommands>-->
+            <!--                    <classpathContainers>-->
+            <!--                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>-->
+            <!--                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>-->
+            <!--                    </classpathContainers>-->
+            <!--                    <excludes>-->
+            <!--                        <exclude>org.scala-lang:scala-library</exclude>-->
+            <!--                        <exclude>org.scala-lang:scala-compiler</exclude>-->
+            <!--                    </excludes>-->
+            <!--                    <sourceIncludes>-->
+            <!--                        <sourceInclude>**/*.scala</sourceInclude>-->
+            <!--                        <sourceInclude>**/*.java</sourceInclude>-->
+            <!--                    </sourceIncludes>-->
+            <!--                </configuration>-->
+            <!--            </plugin>-->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.7</version>
+                <executions>
+                    <!-- Add src/main/scala to eclipse build path -->
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <!-- Add src/test/scala to eclipse build path -->
+                    <execution>
+                        <id>add-test-source</id>
+                        <phase>generate-test-sources</phase>
+                        <goals>
+                            <goal>add-test-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/test/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 13 - 0
src/main/java/com/winhc/bigdata/filnk/java/BaseEntity.java

@@ -0,0 +1,13 @@
+package com.winhc.bigdata.filnk.java;
+
+import org.apache.flink.types.Row;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 17:32
+ */
+public interface BaseEntity {
+
+    Row toRow();
+
+}

+ 35 - 0
src/main/java/com/winhc/bigdata/filnk/java/CompanyData.java

@@ -0,0 +1,35 @@
+//package com.winhc.bigdata.filnk;
+//
+//import com.alibaba.fastjson.JSONObject;
+//import com.alibaba.fastjson.serializer.SerializerFeature;
+//import lombok.AllArgsConstructor;
+//import lombok.Getter;
+//import lombok.NoArgsConstructor;
+//import lombok.Setter;
+//
+//import java.util.List;
+//import java.util.Map;
+//
+///**
+// * @author: XuJiakai
+// * 2021/9/1 09:40
+// */
+//@Getter
+//@Setter
+//@AllArgsConstructor
+//@NoArgsConstructor
+//public class CompanyData {
+//    private Map<String, List<BaseEntity>> content;
+//    private String companyId;
+//    private String companyName;
+//    private Map<String, List<BaseEntity>> oldContent;
+//    /**
+//     * json格式
+//     */
+//    private String otherInfo;
+//
+//    @Override
+//    public String toString() {
+//        return JSONObject.toJSONString(this, SerializerFeature.WriteMapNullValue);
+//    }
+//}

File diff suppressed because it is too large
+ 89 - 0
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/Company.java


+ 49 - 0
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyHolder.java

@@ -0,0 +1,49 @@
+
+package com.winhc.bigdata.filnk.java.company_data_entity;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.winhc.bigdata.filnk.java.BaseEntity;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.flink.types.Row;
+import java.util.Date;
+/**
+ * @author: XuJiakai
+ * 2021/8/31 17:32
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class CompanyHolder  implements BaseEntity {
+
+ private String rowkey;
+ private String companyId;
+ private String companyName;
+ private String holderName;
+ private String holderId;
+ private Long holderType;
+ private Double amount;
+ private String capital;
+ private String capitalActual;
+ private Double percent;
+ private Date createTime;
+ private Date updateTime;
+ private Long deleted;
+
+
+    @Override
+    public String toString() {
+        return JSONObject.toJSONString(this, SerializerFeature.WriteMapNullValue);
+    }
+    
+    @Override
+    public Row toRow() {
+        return Row.of(rowkey,companyId,companyName,holderName,holderId,holderType,amount,capital,capitalActual,percent,createTime,updateTime,deleted);
+    }
+
+}
+    

+ 45 - 0
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyStaff.java

@@ -0,0 +1,45 @@
+
+package com.winhc.bigdata.filnk.java.company_data_entity;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.winhc.bigdata.filnk.java.BaseEntity;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.flink.types.Row;
+import java.util.Date;
+/**
+ * @author: XuJiakai
+ * 2021/8/31 17:32
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class CompanyStaff  implements BaseEntity {
+
+ private String rowkey;
+ private String companyId;
+ private String companyName;
+ private String hid;
+ private String staffName;
+ private String staffType;
+ private Date createTime;
+ private Date updateTime;
+ private Long deleted;
+
+
+    @Override
+    public String toString() {
+        return JSONObject.toJSONString(this, SerializerFeature.WriteMapNullValue);
+    }
+    
+    @Override
+    public Row toRow() {
+        return Row.of(rowkey,companyId,companyName,hid,staffName,staffType,createTime,updateTime,deleted);
+    }
+
+}
+    

+ 68 - 0
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyTm.java

@@ -0,0 +1,68 @@
+
+package com.winhc.bigdata.filnk.java.company_data_entity;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.winhc.bigdata.filnk.java.BaseEntity;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.flink.types.Row;
+import java.util.Date;
+/**
+ * @author: XuJiakai
+ * 2021/8/31 17:32
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class CompanyTm  implements BaseEntity {
+
+ private String rowkey;
+ private String keyno;
+ private String regNo;
+ private Long intCls;
+ private String tmName;
+ private Date appDate;
+ private String applicantCn;
+ private String addressCn;
+ private String applicant1;
+ private String applicant2;
+ private String applicantEn;
+ private String addressEn;
+ private Long announcemenIssue;
+ private Date announcementDate;
+ private Long regIssue;
+ private Date regDate;
+ private Date privateDateStart;
+ private Date privateDateEnd;
+ private String agent;
+ private Long category;
+ private String hqzdrq;
+ private String gjzcrq;
+ private String yxqrq;
+ private String color;
+ private Long status;
+ private String source;
+ private String secondClassCode;
+ private String appYear;
+ private String logo;
+ private Date createTime;
+ private Date updateTime;
+ private Long deleted;
+
+
+    @Override
+    public String toString() {
+        return JSONObject.toJSONString(this, SerializerFeature.WriteMapNullValue);
+    }
+    
+    @Override
+    public Row toRow() {
+        return Row.of(rowkey,keyno,regNo,intCls,tmName,appDate,applicantCn,addressCn,applicant1,applicant2,applicantEn,addressEn,announcemenIssue,announcementDate,regIssue,regDate,privateDateStart,privateDateEnd,agent,category,hqzdrq,gjzcrq,yxqrq,color,status,source,secondClassCode,appYear,logo,createTime,updateTime,deleted);
+    }
+
+}
+    

+ 26 - 0
src/main/java/com/winhc/bigdata/filnk/java/constant/EnvConst.java

@@ -0,0 +1,26 @@
+package com.winhc.bigdata.filnk.java.constant;
+
+import com.winhc.bigdata.filnk.java.utils.EnvironmentProperties;
+
+import java.util.Map;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 16:17
+ */
+public class EnvConst {
+    private static final EnvironmentProperties env = new EnvironmentProperties();
+
+    public static Map<String, String> getMapByPrefix(String prefix) {
+        return env.getMapByPrefix(prefix);
+    }
+
+    public static String getValue(String key) {
+        return getValue(key, null);
+    }
+
+    public static String getValue(String key, String orDefault) {
+        String value = env.getValue(key);
+        return value == null ? orDefault : value;
+    }
+}
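
Usage sketch for EnvConst (illustrative only; the dotted keys come from the flattened env.yml / env-<profile>.yml files added in this commit):

    import com.winhc.bigdata.filnk.java.constant.EnvConst;

    import java.util.Map;

    class EnvConstSketch {
        public static void main(String[] args) {
            // single value lookup, e.g. the ODPS endpoint configured in env-dev.yml / env-prod.yml
            String endPoint = EnvConst.getValue("odps.url.end-point");
            // prefix lookup, e.g. all HBase client settings under winhc.hbase.config
            Map<String, String> hbaseConf = EnvConst.getMapByPrefix("winhc.hbase.config");
            System.out.println(endPoint);
            System.out.println(hbaseConf);
        }
    }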

+ 46 - 0
src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyIndexEntity.java

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.filnk.java.entity;
+
+import java.util.List;
+
+/**
+ * @author: XuJiakai
+ * 2021/9/1 17:38
+ */
+public class CompanyIndexEntity {
+    private String _id;
+    private CompanyName cname;
+    private List<CompanyName> history_name;
+    private Long new_cid;
+    private String legal_entity_id;
+    private Integer legal_entity_type;
+    private String legal_entity_name;
+    private List<Entity> holder;
+    private List<Entity> staff;
+    private String province_code;
+    private String city_code;
+    private String county_code;
+    private Long estiblish_time;
+    private String category_first_code;
+    private String category_second_code;
+    private String category_third_code;
+    private String reg_status;
+    private String reg_status_std;
+    private String company_org_type_std;
+    private String company_type;
+    private String credit_code;
+    private String reg_capital;
+    private Double reg_capital_amount;
+    private String reg_location;
+    private List<String> phones;
+    private List<String> emails;
+    private String geo;
+    private String logo;
+    private Double company_sort_score;
+    private Double company_rank;
+    private String reg_number;
+    private List<String> company_tm;
+    private List<String> app_info;
+    private List<String> icp;
+    private String deleted;
+    private Double company_score_weight;
+}

+ 19 - 0
src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyName.java

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.filnk.java.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * @author: XuJiakai
+ * 2021/9/1 17:40
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class CompanyName {
+    private String show;
+    private String value;
+}

+ 20 - 0
src/main/java/com/winhc/bigdata/filnk/java/entity/Entity.java

@@ -0,0 +1,20 @@
+package com.winhc.bigdata.filnk.java.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * @author: XuJiakai
+ * 2021/9/1 17:42
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class Entity {
+    private String id;
+    private Integer type;
+    private String name;
+}

+ 94 - 0
src/main/java/com/winhc/bigdata/filnk/java/sink/OdpsSinkBuilder4J.java

@@ -0,0 +1,94 @@
+package com.winhc.bigdata.filnk.java.sink;
+
+import com.alibaba.ververica.connectors.common.sink.TupleOutputFormatSinkFunction;
+import com.alibaba.ververica.connectors.odps.sink.OdpsOutputFormat;
+import com.aliyun.odps.Column;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.type.TypeInfo;
+import com.winhc.bigdata.filnk.java.constant.EnvConst;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 16:12
+ */
+public class OdpsSinkBuilder4J {
+
+    /**
+     * For a dynamic partition, pass only the partition column name ("ds");
+     * for a static partition, pass a full spec such as "ds=20210903".
+     *
+     * @param tableName ODPS table name, optionally prefixed with the project ("project.table")
+     * @param partition "ds" or "ds=20210903"
+     * @return a SinkFunction that writes Tuple2<Boolean, Row> records to the given table
+     */
+    public static SinkFunction<Tuple2<Boolean, Row>> buildSinkFunction(String tableName, String partition) {
+        String endPoint = EnvConst.getValue("odps.url.end-point");
+        String tunnelEndPoint = EnvConst.getValue("odps.url.tunnel-end-point");
+        String accessId = EnvConst.getValue("odps.prod-account.access-id");
+        String accessKey = EnvConst.getValue("odps.prod-account.access-key");
+
+        String project = EnvConst.getValue("odps.default-project");
+        String tn = tableName;
+        if (tableName.contains(".")) {
+            String[] strings = tableName.split(".");
+            project = strings[0];
+            tn = strings[1];
+        }
+
+
+        DescriptorProperties properties = new DescriptorProperties();
+        properties.putString("endpoint", endPoint);
+        properties.putString("tunnelEndpoint", tunnelEndPoint);
+        properties.putString("project", project);
+        properties.putString("tablename", tn);
+        properties.putString("accessid", accessId);
+        properties.putString("accesskey", accessKey);
+        if (partition != null) {
+            properties.putString("partition", partition);
+        }
+
+        AliyunAccount account = new AliyunAccount(accessId, accessKey);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(endPoint);
+        odps.setDefaultProject(project);
+        Table table = odps.tables().get(tableName);
+
+
+        TableSchema.Builder builder = TableSchema.builder();
+        for (Column column : table.getSchema().getColumns()) {
+            builder.field(column.getName(), switchOdpsType(column.getTypeInfo()));
+        }
+        if (StringUtils.isNotBlank(partition) && !partition.contains("=")) {
+            builder.field("ds", DataTypes.STRING());
+        }
+
+        return new TupleOutputFormatSinkFunction<>(
+                new OdpsOutputFormat(builder.build(), properties)
+        );
+    }
+
+    private static DataType switchOdpsType(TypeInfo typeInfo) {
+        switch (typeInfo.getOdpsType()) {
+            case STRING:
+                return DataTypes.STRING();
+            case INT:
+                return DataTypes.INT();
+            case DOUBLE:
+                return DataTypes.DOUBLE();
+            case BIGINT:
+                return DataTypes.BIGINT();
+            default:
+                throw new RuntimeException("data type is not fount !" + typeInfo.getTypeName());
+        }
+    }
+
+}
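
Usage sketch for OdpsSinkBuilder4J (the stream variable, table name and partition spec are illustrative; the sink consumes Tuple2<Boolean, Row>, where the Row side is typically produced by BaseEntity.toRow()):

    import com.winhc.bigdata.filnk.java.sink.OdpsSinkBuilder4J;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.types.Row;

    class OdpsSinkUsageSketch {
        // tupleStream: an existing stream of (flag, row) pairs
        static void attachSink(DataStream<Tuple2<Boolean, Row>> tupleStream) {
            // "ds" alone enables dynamic partitioning; "ds=20210903" writes a fixed partition
            SinkFunction<Tuple2<Boolean, Row>> odpsSink =
                    OdpsSinkBuilder4J.buildSinkFunction("winhc_bigdata_learn_dev.company_demo", "ds=20210903");
            tupleStream.addSink(odpsSink);
        }
    }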

+ 35 - 0
src/main/java/com/winhc/bigdata/filnk/java/source/KafkaSourceBuilder4J.java

@@ -0,0 +1,35 @@
+package com.winhc.bigdata.filnk.java.source;
+
+import com.winhc.bigdata.filnk.java.constant.EnvConst;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
+
+/**
+ * @author: XuJiakai
+ * 2021/9/1 16:36
+ */
+public class KafkaSourceBuilder4J {
+    public static KafkaSource<String> buildSourceFunction(String topic) {
+        String profile = EnvConst.getValue("profile.activate");
+        OffsetsInitializer offsets;
+        if (profile.equals("dev")) {
+            offsets = OffsetsInitializer.earliest();
+        } else {
+            offsets = OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);
+        }
+        return buildSourceFunction(topic, offsets);
+    }
+
+    public static KafkaSource<String> buildSourceFunction(String topic, OffsetsInitializer offsetsInitializer) {
+        String bootstrapServers = EnvConst.getValue("winhc.bootstrap-servers");
+        String groupId = EnvConst.getValue("winhc.group-id");
+        KafkaSource<String> build = KafkaSource.<String>builder()
+                .setBootstrapServers(bootstrapServers)
+                .setGroupId(groupId)
+                .setTopics(topic)
+                .setStartingOffsets(offsetsInitializer)
+                .build();
+        return build;
+    }
+}
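
Usage sketch for KafkaSourceBuilder4J (topic name and wiring are illustrative; depending on the connector version, a value deserializer may also need to be set on the KafkaSource builder before build()):

    import com.winhc.bigdata.filnk.java.source.KafkaSourceBuilder4J;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class KafkaSourceUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // hypothetical topic; bootstrap servers and group id come from env*.yml via EnvConst
            KafkaSource<String> source = KafkaSourceBuilder4J.buildSourceFunction("flink_company_data");

            DataStream<String> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
            stream.print();

            env.execute("kafka source sketch");
        }
    }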

+ 68 - 0
src/main/java/com/winhc/bigdata/filnk/java/source/OdpsSourceBuilder4J.java

@@ -0,0 +1,68 @@
+package com.winhc.bigdata.filnk.java.source;
+
+import com.alibaba.ververica.connectors.odps.ODPSStreamSource;
+import com.alibaba.ververica.connectors.odps.OdpsConf;
+import com.alibaba.ververica.connectors.odps.OdpsOptions;
+import com.alibaba.ververica.connectors.odps.schema.ODPSColumn;
+import com.aliyun.odps.Column;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.utils.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/30 17:29
+ */
+public class OdpsSourceBuilder4J {
+    private Configuration conf;
+    private Odps odps;
+
+    private String tableName;
+    private List<String> ds;
+    private String accessId;
+    private String accessKey;
+    private String endPoint;
+    private String projectName;
+    private String tunnelEndpoint;
+
+    public OdpsSourceBuilder4J(Configuration conf) {
+        this.conf = conf;
+
+        this.tableName = conf.getString(OdpsOptions.TABLE_NAME);
+        this.ds = Arrays.stream(conf.getString(OdpsOptions.PARTITION, "").split(","))
+                .filter(StringUtils::isNotBlank).collect(Collectors.toList());
+        this.accessId = conf.getString(OdpsOptions.ACCESS_ID);
+        this.accessKey = conf.getString(OdpsOptions.ACCESS_KEY);
+        this.endPoint = conf.getString(OdpsOptions.END_POINT);
+        this.projectName = conf.getString(OdpsOptions.PROJECT_NAME);
+        this.tunnelEndpoint = conf.getString(OdpsOptions.TUNNEL_END_POINT);
+
+
+        AliyunAccount account = new AliyunAccount(accessId, accessKey);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(endPoint);
+        odps.setDefaultProject(projectName);
+        this.odps = odps;
+    }
+
+    public ODPSStreamSource buildSourceFunction() {
+        OdpsConf odpsConf = new OdpsConf(accessId, accessKey, endPoint, projectName, tunnelEndpoint);
+        Table table = odps.tables().get(tableName);
+
+        List<Column> columns = table.getSchema().getColumns();
+        ODPSColumn[] odpsColumns = columns.stream().map(c -> new ODPSColumn(c.getName(), c.getTypeInfo().getOdpsType())).collect(Collectors.toList())
+                .toArray(new ODPSColumn[columns.size()]);
+
+        TypeInformation info = TypeInformation.of(GenericRowData.class);
+        ODPSStreamSource source = new ODPSStreamSource(odpsConf, tableName, odpsColumns, ds, info, 1000L, 1000L);
+        return source;
+    }
+}

+ 100 - 0
src/main/java/com/winhc/bigdata/filnk/java/utils/EnvironmentProperties.java

@@ -0,0 +1,100 @@
+package com.winhc.bigdata.filnk.java.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 09:32
+ */
+public class EnvironmentProperties {
+    private static final String fileName = "env.yml";
+
+    private Map<String, String> source;
+
+    public String getValue(String key) {
+        return source.getOrDefault(key, null);
+    }
+
+    public Map<String, String> getMapByPrefix(String prefix) {
+        return source.entrySet().stream().filter(e->e.getKey().startsWith(prefix)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,(e1, e2)->e1));
+    }
+
+    public EnvironmentProperties() {
+        Map<String, String> source = getYmlByFileName(null,null);
+        String profile = source.getOrDefault("profile.activate", null);
+        if (StringUtils.isNotBlank(profile)) {
+            source = getYmlByFileName("env-" + profile + ".yml", source);
+        }
+        this.source = source;
+    }
+
+    /**
+     * Loads the yml file with the given name and flattens its nested keys into the given map.
+     *
+     * @param file   yml file name on the classpath (defaults to env.yml when null)
+     * @param source map to merge the flattened entries into (created when null)
+     * @return the flattened key-value map
+     */
+    private static Map<String, String> getYmlByFileName(String file, Map<String, String> source) {
+        if (source == null) {
+            source = new HashMap<>();
+        }
+        if (file == null)
+            file = fileName;
+        InputStream in = EnvironmentProperties.class.getClassLoader().getResourceAsStream(file);
+        Yaml props = new Yaml();
+        Object obj = props.loadAs(in, Map.class);
+        Map<String, Object> param = (Map<String, Object>) obj;
+
+        for (Map.Entry<String, Object> entry : param.entrySet()) {
+            String key = entry.getKey();
+            Object val = entry.getValue();
+
+            if (val instanceof Map) {
+                forEachYaml(source, key, (Map<String, Object>) val);
+            } else {
+                source.put(key, val.toString());
+            }
+        }
+        return source;
+    }
+
+    /**
+     * Recursively walks a nested yml map and flattens it into dotted keys.
+     *
+     * @param key_str key prefix accumulated so far ("" for the top level)
+     * @param obj     nested map to flatten
+     * @return the flattened key-value map
+     */
+    private static Map<String, String> forEachYaml(Map<String, String> source, String key_str, Map<String, Object> obj) {
+        for (Map.Entry<String, Object> entry : obj.entrySet()) {
+            String key = entry.getKey();
+            Object val = entry.getValue();
+
+            String str_new = "";
+            if (StringUtils.isNotBlank(key_str)) {
+                str_new = key_str + "." + key;
+            } else {
+                str_new = key;
+            }
+            if (val instanceof Map) {
+                forEachYaml(source, str_new, (Map<String, Object>) val);
+            } else {
+                source.put(str_new, val.toString());
+            }
+        }
+        return source;
+    }
+
+
+    public static void main(String[] args) {
+        EnvironmentProperties environmentProperties = new EnvironmentProperties();
+        String value = environmentProperties.getValue("es.a");
+        System.out.println(value);
+    }
+
+}
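
In short, the loader flattens nested YAML into dotted keys and, when profile.activate is set, overlays env-<profile>.yml on top of env.yml. A small illustration using keys that exist in this commit:

    import com.winhc.bigdata.filnk.java.utils.EnvironmentProperties;

    class EnvironmentPropertiesSketch {
        public static void main(String[] args) {
            EnvironmentProperties props = new EnvironmentProperties();

            // nested YAML  winhc: / kafka: / group-id: xjk_test  becomes the flat key below
            System.out.println(props.getValue("winhc.kafka.group-id"));          // from env.yml
            System.out.println(props.getValue("winhc.kafka.bootstrap-servers")); // overlaid from env-dev.yml when profile.activate is dev
        }
    }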

+ 25 - 0
src/main/java/com/winhc/bigdata/filnk/java/utils/FieldNameUtils.java

@@ -0,0 +1,25 @@
+package com.winhc.bigdata.filnk.java.utils;
+
+import java.util.Locale;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 18:26
+ */
+public class FieldNameUtils {
+    public static String hump2Underline(String str) {
+        if (str == null) {
+            return null;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < str.length(); i++) {
+            String s = str.charAt(i) + "";
+            if (s.toUpperCase().equals(s) && i != 0) {
+                sb.append('_');
+            }
+            sb.append(s);
+        }
+        return sb.toString().toLowerCase(Locale.ROOT);
+    }
+
+}
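
This camelCase-to-snake_case helper is what Json2EntityUtils uses to derive table names from entity class names, for example:

    import com.winhc.bigdata.filnk.java.utils.FieldNameUtils;

    class FieldNameUtilsSketch {
        public static void main(String[] args) {
            System.out.println(FieldNameUtils.hump2Underline("CompanyHolder")); // company_holder
            System.out.println(FieldNameUtils.hump2Underline("CompanyTm"));     // company_tm
            System.out.println(FieldNameUtils.hump2Underline("companyName"));   // company_name
        }
    }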

+ 102 - 0
src/main/java/com/winhc/bigdata/filnk/java/utils/Json2EntityUtils.java

@@ -0,0 +1,102 @@
+package com.winhc.bigdata.filnk.java.utils;
+
+import cn.hutool.core.util.ClassUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.winhc.bigdata.filnk.java.BaseEntity;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 18:03
+ */
+public class Json2EntityUtils {
+    private static final Map<String, ? extends Class<?>> collect;
+
+    static {
+        Set<Class<?>> c = ClassUtil.scanPackageBySuper("com.winhc.bigdata.filnk.java", BaseEntity.class);
+        collect = c.stream().collect(Collectors.toMap(e -> FieldNameUtils.hump2Underline(e.getSimpleName()), e -> e));
+    }
+
+    public static BaseEntity parseEntity(String tn, String json) {
+        BaseEntity o = ((BaseEntity) JSONObject.parseObject(json, collect.get(tn)));
+        return o;
+    }
+
+    public static List<BaseEntity> parseArrayEntity(String tn, String json) {
+        List<?> objects = JSONObject.parseArray(json, collect.get(tn));
+        return ((List<BaseEntity>) objects);
+    }
+
+    public static Class<BaseEntity> getClazz(String tn) {
+        return ((Class<BaseEntity>) collect.get(tn));
+    }
+
+    public static List<String> getTn() {
+        return new ArrayList<>(collect.keySet());
+    }
+
+    public static void main(String[] args) {
+        String json = "{\n" +
+                "    \"ACTUAL_CAPITAL_AMOUNT\": \"\",\n" +
+                "    \"LNG\": \"118.74012977757219\",\n" +
+                "    \"NEW_CID\": \"3031495173\",\n" +
+                "    \"CREDIT_CODE\": \"91320100MA1N042DXN\",\n" +
+                "    \"LOGO\": \"/logo/lll/135d6a29aa603f758a4297c71021731b.png@!f_200x200\",\n" +
+                "    \"HISTORY_NAMES\": \"\",\n" +
+                "    \"PROVINCE_CODE\": \"\",\n" +
+                "    \"WECHAT_PUBLIC_NUM\": \"\",\n" +
+                "    \"ACTUAL_CAPITAL_CURRENCY\": \"人民币\",\n" +
+                "    \"rowkey\": \"00e30d8f19b9a54d1fbdf561a339918a\",\n" +
+                "    \"CREATE_TIME\": \"2021-02-13 03:05:53\",\n" +
+                "    \"NAME_ALIAS\": \"懿馨四\",\n" +
+                "    \"REG_STATUS\": \"注销\",\n" +
+                "    \"CITY_CODE\": \"\",\n" +
+                "    \"REG_CAPITAL_AMOUNT\": \"152400\",\n" +
+                "    \"REG_INSTITUTE\": \"南京市市场监督管理局\",\n" +
+                "    \"REG_STATUS_STD\": \"存续\",\n" +
+                "    \"CATE_FIRST_CODE\": \"28\",\n" +
+                "    \"ESTIBLISH_TIME\": \"2016-11-17 00:00:00\",\n" +
+                "    \"LEGAL_ENTITY_ID\": \"f3e1b78560e5c5f968559ad810c604b5\",\n" +
+                "    \"TO_TIME\": \"\",\n" +
+                "    \"CATE_SECOND_CODE\": \"63\",\n" +
+                "    \"NAME\": \"南京懿馨四一八号文化艺术品投资合伙企业(有限合伙)\",\n" +
+                "    \"BUSINESS_SCOPE\": \"文化艺术品投资;艺术品、工艺品销售。(依法须经批准的项目,经相关部门批准后方可开展经营活动)\",\n" +
+                "    \"REG_CAPITAL\": \"0.1524万元人民币\",\n" +
+                "    \"CANCEL_DATE\": \"2021-02-11 00:00:00\",\n" +
+                "    \"NAME_EN\": \"\",\n" +
+                "    \"REG_NUMBER\": \"320100001729363\",\n" +
+                "    \"ORG_APPROVED_INSTITUTE\": \"\",\n" +
+                "    \"ORG_NUMBER\": \"MA1N042DX\",\n" +
+                "    \"REVOKE_REASON\": \"\",\n" +
+                "    \"REG_CAPITAL_CURRENCY\": \"人民币\",\n" +
+                "    \"COMPANY_TYPE\": \"1\",\n" +
+                "    \"SCORE\": \"4736\",\n" +
+                "    \"LEGAL_ENTITY_TYPE\": \"2\",\n" +
+                "    \"CANCEL_REASON\": \"其他原因\",\n" +
+                "    \"SOCIAL_SECURITY_STAFF_NUM\": \"0\",\n" +
+                "    \"DELETED\": \"0\",\n" +
+                "    \"EMAILS\": \"2221222014@qq.com\\t;\\tzxy@zixingyun.com\\t;\\tbenyue_ec@126.com\\t;\\t470936908@qq.com\\t;\\t\",\n" +
+                "    \"BASE\": \"js\",\n" +
+                "    \"PARENT_COMPANY_ID\": \"\",\n" +
+                "    \"REVOKE_DATE\": \"\",\n" +
+                "    \"APPROVED_TIME\": \"2021-02-11 00:00:00\",\n" +
+                "    \"PHONES\": \"025-58807399\\t;\\t\",\n" +
+                "    \"COUNTY_CODE\": \"\",\n" +
+                "    \"COMPANY_ID\": \"00e30d8f19b9a54d1fbdf561a339918a\",\n" +
+                "    \"REG_LOCATION\": \"南京市建邺区嘉陵江东街18号05栋7层\",\n" +
+                "    \"FROM_TIME\": \"2016-11-17 00:00:00\",\n" +
+                "    \"UPDATE_TIME\": \"2021-02-13 03:05:53\",\n" +
+                "    \"COMPANY_ORG_TYPE\": \"有限合伙企业\",\n" +
+                "    \"LEGAL_ENTITY_NAME\": \"南京犇越投资管理有限公司\",\n" +
+                "    \"CATE_THIRD_CODE\": \"250\",\n" +
+                "    \"LAT\": \"31.986383793889924\",\n" +
+                "    \"CRAWLED_TIME\": \"2021-02-13 03:05:52\"\n" +
+                "  }";
+
+        BaseEntity company = parseEntity("company", json);
+        System.out.println(company);
+
+    }
+}
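
Arrays can be parsed the same way; the first argument is the snake_case form of the entity class name. A brief sketch with a made-up payload:

    import com.winhc.bigdata.filnk.java.BaseEntity;
    import com.winhc.bigdata.filnk.java.utils.Json2EntityUtils;

    import java.util.List;

    class Json2EntityUtilsSketch {
        public static void main(String[] args) {
            // hypothetical array of two holders; field names follow the CompanyHolder entity
            String json = "[{\"companyId\":\"1\",\"holderName\":\"a\"},{\"companyId\":\"1\",\"holderName\":\"b\"}]";
            List<BaseEntity> holders = Json2EntityUtils.parseArrayEntity("company_holder", json);
            holders.forEach(h -> System.out.println(h.toRow()));
        }
    }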

+ 5 - 0
src/main/resources/dns-cache.properties

@@ -0,0 +1,5 @@
+hb-uf63a7d09rpl8mcvm-001.hbase.rds.aliyuncs.com=47.101.250.84
+hb-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com=47.101.253.8
+hb-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com=106.15.29.178
+hb-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com=106.15.29.202
+hb-uf6m8e1nu4ivp06m5-core-001.hbase.rds.aliyuncs.com=106.15.29.252
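
These host-to-IP entries pair with the com.alibaba:dns-cache-manipulator dependency declared in pom.xml; a hedged sketch of how they could be pinned into the JVM DNS cache, assuming the library's default behaviour of loading dns-cache.properties from the classpath:

    import com.alibaba.dcm.DnsCacheManipulator;

    class DnsCacheSketch {
        public static void main(String[] args) {
            // Reads host=ip pairs from dns-cache.properties on the classpath and pins them
            // in the JVM DNS cache, so the hb-*.hbase.rds.aliyuncs.com endpoints resolve directly.
            DnsCacheManipulator.loadDnsCacheConfig();
        }
    }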

+ 43 - 0
src/main/resources/env-dev.yml

@@ -0,0 +1,43 @@
+odps:
+  url:
+    end-point: http://service.cn-shanghai.maxcompute.aliyun.com/api
+    tunnel-end-point: http://dt.cn-shanghai.maxcompute.aliyun.com
+
+winhc:
+  holo:
+    endpoint: http://dfasjlk
+    batch-size: 10
+    username: ua
+    password: pwd
+    database: base
+    rpc-retries: 5
+    user-rpc-mode: true
+    ignore-delete: false
+    mutate-type: insertOrReplace
+  kafka:
+    bootstrap-servers: 47.101.221.131:9092
+  hbase:
+    config:
+      hbase.zookeeper.quorum: hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181
+      hbase.client.scanner.timeout.period: 120000
+      hbase.client.retries.number: 5
+      hbase.client.pause: 1000
+      hbase.client.max.perserver.tasks: 10
+      hbase.client.max.perregion.tasks: 10
+      hbase.client.keyvalue.maxsize: 524288000
+      hbase.client.ipc.pool.size: 5
+      zookeeper.recovery.retry: 5
+
+  elasticsearch:
+    primary: new
+    rest:
+      old:
+        username: elastic
+        password: elastic_168
+        uris: es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com:9200
+      new:
+        username: elastic
+        password: elastic_168
+        uris: es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com:9200
+
+

+ 34 - 0
src/main/resources/env-prod.yml

@@ -0,0 +1,34 @@
+odps:
+  url:
+    end-point: http://service.cn-shanghai.maxcompute.aliyun-inc.com/api
+    tunnel-end-point: http://dt.cn-shanghai.maxcompute.aliyun-inc.com
+
+
+winhc:
+  kafka:
+    bootstrap-servers: 192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
+  hbase:
+    config:
+      hbase.zookeeper.quorum: hb-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181
+      hbase.client.scanner.timeout.period: 120000
+      hbase.client.retries.number: 5
+      hbase.client.pause: 1000
+      hbase.client.max.perserver.tasks: 10
+      hbase.client.max.perregion.tasks: 10
+      hbase.client.keyvalue.maxsize: 524288000
+      hbase.client.ipc.pool.size: 5
+      zookeeper.recovery.retry: 5
+
+  elasticsearch:
+    primary: new
+    rest:
+      old:
+        username: elastic
+        password: elastic_168
+        uris: es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com:9200
+      new:
+        username: elastic
+        password: elastic_168
+        uris: es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com:9200
+
+

+ 12 - 0
src/main/resources/env.yml

@@ -0,0 +1,12 @@
+profile:
+  activate: dev
+
+odps:
+  default-project: winhc_bigdata_learn_dev
+  prod-account:
+    access-id: LTAI4FynxS5nNuKyZ3LHhMX5
+    access-key: r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE
+
+winhc:
+  kafka:
+    group-id: xjk_test

+ 14 - 0
src/main/scala/com/winhc/bigdata/flink/CompanyData.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.flink
+
+import com.winhc.bigdata.filnk.java.BaseEntity
+import com.winhc.bigdata.flink.implicits._
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 14:23
+ */
+case class CompanyData(companyId: String, companyName: String, content: Map[String, Seq[BaseEntity]]) {
+  var oldData: Map[String, Seq[BaseEntity]] = _
+
+  override def toString: String = this.toJson
+}

+ 32 - 0
src/main/scala/com/winhc/bigdata/flink/TestJob.scala

@@ -0,0 +1,32 @@
+package com.winhc.bigdata.flink
+
+import com.winhc.bigdata.flink.func.HbaseAsyncFunction
+import com.winhc.bigdata.flink.source.OdpsSourceBuilder
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
+import org.apache.flink.streaming.api.scala.async.AsyncFunction
+import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.data.RowData
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/30 14:32
+ */
+object TestJob {
+  def main(args: Array[String]): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val odpsSource = OdpsSourceBuilder.buildSourceFunction(tableName = "xjk_test_flink_ads_company", partition = "ds=20210803")
+    val source = env.addSource[RowData](odpsSource)
+    val value1: DataStream[(String, String)] = source.map(r => ("ng_company", r.getString(0).toString))
+    val function: AsyncFunction[(String, String), (String, String, String)] = HbaseAsyncFunction()
+    val value: DataStream[(String, String, String)] = AsyncDataStream.unorderedWait(value1, function, 100, TimeUnit.SECONDS, 10)
+
+
+    value.addSink(new PrintSinkFunction[(String, String, String)])
+
+    env.execute("test job")
+  }
+}

+ 66 - 0
src/main/scala/com/winhc/bigdata/flink/config/ElasticsearchConfig.scala

@@ -0,0 +1,66 @@
+package com.winhc.bigdata.flink.config
+
+import com.winhc.bigdata.filnk.java.constant.EnvConst
+import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost
+import org.apache.flink.elasticsearch6.shaded.org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.flink.elasticsearch6.shaded.org.apache.http.client.config.RequestConfig
+import org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.{RestClient, RestClientBuilder, RestHighLevelClient}
+
+import java.time.Duration
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/31 13:51
+ */
+object ElasticsearchConfig {
+  def getConfig(clusterName: String): (Array[HttpHost], RestClientBuilder.HttpClientConfigCallback, RestClientBuilder.RequestConfigCallback) = {
+    val hosts: Array[HttpHost] = EnvConst.getValue("winhc.elasticsearch.rest." + clusterName + ".uris")
+      .split(",").map(h => HttpHost.create(h))
+    val username = EnvConst.getValue("winhc.elasticsearch.rest." + clusterName + ".username")
+    val password = EnvConst.getValue("winhc.elasticsearch.rest." + clusterName + ".password")
+    // java.time.Duration.parse expects ISO-8601 durations, e.g. "PT100S"
+    val connectionTimeout = EnvConst.getValue("winhc.elasticsearch.rest." + clusterName + ".connection-timeout", "PT100S")
+    val readTimeout = EnvConst.getValue("winhc.elasticsearch.rest." + clusterName + ".read-timeout", "PT300S")
+
+
+    val credentialsProvider: BasicCredentialsProvider = new BasicCredentialsProvider
+    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
+    val clientCallback: RestClientBuilder.HttpClientConfigCallback = new RestClientBuilder.HttpClientConfigCallback {
+      override def customizeHttpClient(httpAsyncClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+    }
+
+    val requestCallback: RestClientBuilder.RequestConfigCallback = new RestClientBuilder.RequestConfigCallback {
+      override def customizeRequestConfig(builder: RequestConfig.Builder): RequestConfig.Builder = {
+        builder.setConnectTimeout(Duration.parse(connectionTimeout).toMillis().toInt)
+        builder.setSocketTimeout(Duration.parse(readTimeout).toMillis().toInt)
+        builder
+      }
+    }
+    (hosts, clientCallback, requestCallback)
+  }
+
+
+  private def getClientBuilder(clusterName: String): RestClientBuilder = {
+    val tuple = getConfig(clusterName)
+
+    val hosts = tuple._1
+    val clientCallback = tuple._2
+    val requestCallback = tuple._3
+
+    val builder = RestClient.builder(hosts: _*)
+    builder.setHttpClientConfigCallback(clientCallback)
+    builder.setRequestConfigCallback(requestCallback)
+    builder
+  }
+
+  def getRestHighLevelClient(clusterName: String): RestHighLevelClient = {
+    val builder = getClientBuilder(clusterName)
+    new RestHighLevelClient(builder)
+  }
+
+  def getRestClient(clusterName: String): RestClient = {
+    getClientBuilder(clusterName).build()
+  }
+
+}
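A minimal usage sketch for the client factory above; the cluster name matches the rest entries in env-*.yml, while the index, doc type and id are hypothetical (the shaded GetRequest import is assumed to follow the same prefix as the imports above):

    import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetRequest

    val client = ElasticsearchConfig.getRestHighLevelClient("new")
    try {
      // hypothetical index and id, purely for illustration
      val resp = client.get(new GetRequest("winhc_index_company", "_doc", "some-company-id"))
      println(resp.getSourceAsString)
    } finally {
      client.close()
    }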

+ 23 - 0
src/main/scala/com/winhc/bigdata/flink/config/HbaseConfig.scala

@@ -0,0 +1,23 @@
+package com.winhc.bigdata.flink.config
+
+import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
+import com.winhc.bigdata.filnk.java.constant.EnvConst
+import org.apache.hadoop.hbase.HBaseConfiguration
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/31 17:11
+ */
+object HbaseConfig {
+  def getHbaseConfiguration(): Configuration = {
+    val map: util.Map[String, String] = EnvConst.getMapByPrefix("winhc.hbase.config")
+    val configuration: Configuration = HBaseConfiguration.create
+    for (elem <- map.asScala) {
+      configuration.set(elem._1.replace("winhc.hbase.config.", ""), elem._2)
+    }
+    configuration
+  }
+}

+ 15 - 0
src/main/scala/com/winhc/bigdata/flink/constant/CompanySummaryConst.scala

@@ -0,0 +1,15 @@
+package com.winhc.bigdata.flink.constant
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/31 14:39
+ */
+case class CompanySummaryConst(tn: String, companyIdField: String, aggField: String, aggKey: String, aliasName: String)
+
+object CompanySummaryConst {
+
+  private val seq = Seq(
+    CompanySummaryConst(tn = "company_staff", companyIdField = "company_id", aggField = "deleted", aggKey = "0", aliasName = "company_staff_del_0")
+  )
+  val companyGroupMap: Map[String, Seq[CompanySummaryConst]] = seq.groupBy(_.tn)
+}
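A small sketch of how the grouped constants can be looked up when computing per-company summaries:

    val staffRules: Seq[CompanySummaryConst] =
      CompanySummaryConst.companyGroupMap.getOrElse("company_staff", Seq.empty)
    staffRules.foreach(r =>
      println(s"${r.aliasName}: count ${r.tn} rows where ${r.aggField} = ${r.aggKey}"))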

+ 70 - 0
src/main/scala/com/winhc/bigdata/flink/func/ElasticsearchAsyncFunction.scala

@@ -0,0 +1,70 @@
+package com.winhc.bigdata.flink.func
+
+import com.winhc.bigdata.flink.config.ElasticsearchConfig
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient
+import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}
+
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/30 18:18
+ *        Asynchronously load data from Elasticsearch
+ */
+case class ElasticsearchAsyncFunction(clusterName: String) extends RichAsyncFunction[(String, String), (String, String, Integer)] {
+  private var client: RestHighLevelClient = null
+
+  private val docType = ""
+
+
+
+  override def open(parameters: Configuration): Unit = {
+    client = ElasticsearchConfig.getRestHighLevelClient(clusterName)
+  }
+
+  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, Integer)]): Unit = ???
+
+  override def close(): Unit = {
+    client.close()
+  }
+
+
+
+
+
+  /*private def getSummary(tn: String, companyId: String): Int = {
+    val builder = SearchSourceBuilder.searchSource().size(0).query(QueryBuilders.termQuery(companyKey(tn), companyId))
+    val index = s"winhc_index_$tn"
+    new SearchRequest().indices(index)
+      .types(docType)
+      .source(builder)
+
+    import com.alibaba.fastjson.JSONObject
+    import org.elasticsearch.action.ActionListener
+    import org.elasticsearch.action.search.SearchResponse
+    import org.elasticsearch.search.SearchHit
+    import java.util.Collections
+    val listener = new ActionListener[SearchResponse]() { // on success
+      override def onResponse(searchResponse: SearchResponse): Unit = {
+        var stationPercent:String = null
+        val searchHits = searchResponse.getHits.getHits
+        if (searchHits.length > 0) {
+          val jsonObject = JSON.parseObject(searchHits(0).getSourceAsString)
+          stationPercent = jsonObject.getString("section_search_percent")
+        }
+        System.out.println("get data from the es :" + stationPercent)
+        resultFuture.complete(Collections.singleton(new Nothing(input.f0, stationPercent)))
+      } // on failure
+
+      override def onFailure(e: Exception): Unit = {
+        resultFuture.complete(Collections.singleton(new Nothing(input.f0, null)))
+      }
+    }
+
+    client.searchAsync(searchRequest, listener)
+
+    0
+  }*/
+
+  override def timeout(input: (String, String), resultFuture: ResultFuture[(String, String, Integer)]): Unit = super.timeout(input, resultFuture)
+}

+ 46 - 0
src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.flink.func
+
+import com.winhc.bigdata.flink.config.HbaseConfig
+import com.winhc.bigdata.flink.implicits._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/31 17:07
+ */
+case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (String, String, String)] {
+  var connection: Connection = null
+
+  override def open(parameters: Configuration): Unit = {
+    val configuration = HbaseConfig.getHbaseConfiguration()
+    connection = ConnectionFactory.createConnection(configuration)
+  }
+
+  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
+    val tn = input._1
+    val rowkey = input._2
+    try {
+      val table = connection.getTable(TableName.valueOf(tn.toUpperCase()))
+      val get = new Get(rowkey.getBytes)
+      val result = table.get(get)
+      val string = result.toJsonString
+      resultFuture.complete(Seq((tn, rowkey, string)))
+    } catch {
+      case ex: Exception => {
+        resultFuture.completeExceptionally(ex)
+      }
+    }
+  }
+  //  override def timeout(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
+  //
+  //  }
+
+  override def close(): Unit = {
+    if (connection != null) {
+      connection.close()
+    }
+  }
+}

+ 39 - 0
src/main/scala/com/winhc/bigdata/flink/func/HbaseSinkFunction.scala

@@ -0,0 +1,39 @@
+package com.winhc.bigdata.flink.func
+
+import com.alibaba.fastjson.JSONObject
+import com.winhc.bigdata.flink.config.HbaseConfig
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, RetriesExhaustedWithDetailsException}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 17:52
+ */
+case class HbaseSinkFunction(tableName: String, jsonObject: JSONObject) extends RichSinkFunction[JSONObject] with CheckpointedFunction with BufferedMutator.ExceptionListener {
+  var connection: Connection = null
+
+  override def open(parameters: Configuration): Unit = {
+    val configuration = HbaseConfig.getHbaseConfiguration()
+    connection = ConnectionFactory.createConnection(configuration)
+  }
+
+
+  override def invoke(value: JSONObject, context: SinkFunction.Context): Unit = {
+
+  }
+
+  override def close(): Unit = {
+    if (connection != null) {
+      connection.close()
+    }
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = ???
+
+  override def initializeState(context: FunctionInitializationContext): Unit = ???
+
+  override def onException(e: RetriesExhaustedWithDetailsException, bufferedMutator: BufferedMutator): Unit = ???
+}

+ 18 - 0
src/main/scala/com/winhc/bigdata/flink/implicits/CaseClass2JsonHelper.scala

@@ -0,0 +1,18 @@
+package com.winhc.bigdata.flink.implicits
+
+import org.json4s.jackson.Serialization
+import org.json4s.{Formats, NoTypeHints}
+
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/11/23 10:51
+ */
+case class CaseClass2JsonHelper[A <: AnyRef](that: A) {
+  def toJson()(implicit formats: Formats = Serialization.formats(NoTypeHints)): String = {
+    if (that == null) {
+      return null
+    }
+    Serialization.write(that)
+  }
+}

+ 33 - 0
src/main/scala/com/winhc/bigdata/flink/implicits/HbaseResultHelper.scala

@@ -0,0 +1,33 @@
+package com.winhc.bigdata.flink.implicits
+
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/31 17:29
+ */
+case class HbaseResultHelper(result: Result) {
+  def toJsonString(): String = {
+    if (result == null || result.isEmpty) {
+      null
+    } else {
+      val map: Map[String, String] = Map()
+      var rowkey: String = null
+      for (cell <- result.listCells().asScala) {
+        if (rowkey == null) {
+          rowkey = Bytes.toString(cell.getRowArray, cell.getRowOffset, cell.getRowLength)
+        }
+        val key = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
+        val v = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
+        map += key -> v
+      }
+      map += "rowkey" -> rowkey
+      map.toJson
+    }
+  }
+}
+

+ 8 - 0
src/main/scala/com/winhc/bigdata/flink/implicits/package.scala

@@ -0,0 +1,8 @@
+package com.winhc.bigdata.flink
+
+import org.apache.hadoop.hbase.client.Result
+
+package object implicits {
+  implicit def caseClass2JsonEnhancer(result: Result): HbaseResultHelper = HbaseResultHelper(result)
+  implicit def caseClass2JsonEnhancer[A <: AnyRef](that: A): CaseClass2JsonHelper[A] = CaseClass2JsonHelper(that)
+}
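With these conversions in scope, any case class gains toJson and an HBase Result gains toJsonString. A minimal sketch (the Person case class is made up for illustration):

    import com.winhc.bigdata.flink.implicits._

    case class Person(name: String, age: Int)
    println(Person("xjk", 1).toJson())  // {"name":"xjk","age":1}
    // for an org.apache.hadoop.hbase.client.Result: result.toJsonString()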

+ 22 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/CompanyStreamJob.scala

@@ -0,0 +1,22 @@
+package com.winhc.bigdata.flink.jobs
+
+import com.winhc.bigdata.flink.jobs.company_job_step._
+import com.winhc.bigdata.flink.source.KafkaSourceBuilder
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 11:34
+ */
+object CompanyStreamJob {
+
+  def main(args: Array[String]): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val kafkaSource = KafkaSourceBuilder.buildSourceFunction("xjk_test_topic")
+    val source: DataStream[String] = env.fromSource[String](kafkaSource, CustomWatermarkStrategy(), "Kafka Source")
+    source.winhcTransform()
+
+    env.execute("CompanyStreamJob")
+  }
+}

+ 13 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/CustomWatermarkStrategy.scala

@@ -0,0 +1,13 @@
+package com.winhc.bigdata.flink.jobs
+
+import org.apache.flink.api.common.eventtime.{NoWatermarksGenerator, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 16:47
+ */
+case class CustomWatermarkStrategy() extends WatermarkStrategy[String] {
+  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[String] = {
+    new NoWatermarksGenerator[String]
+  }
+}

+ 8 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/package.scala

@@ -0,0 +1,8 @@
+package com.winhc.bigdata.flink.jobs
+
+import org.apache.flink.streaming.api.scala.DataStream
+
+package object company_job_step {
+  implicit def kafkaTransformEnhancer(source: DataStream[String]) = step_01_kafka_transform(source)
+
+}
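Only the step_01 enhancer is registered so far. A sketch of how the later steps could be exposed the same way, assuming the corresponding defs are added to this package object:

    import com.winhc.bigdata.flink.CompanyData

    implicit def sink2OdsEnhancer(ds: DataStream[CompanyData]): step_02_sink_2_ods = step_02_sink_2_ods(ds)
    implicit def etlEnhancer(ds: DataStream[CompanyData]): step_03_etl = step_03_etl(ds)
    // a job could then chain: source.winhcTransform().saveToOdpsOds().etl()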

+ 31 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_01_kafka_transform.scala

@@ -0,0 +1,31 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.filnk.java.BaseEntity
+import com.winhc.bigdata.filnk.java.utils.Json2EntityUtils
+import com.winhc.bigdata.flink.CompanyData
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.collection.JavaConverters._
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 14:10
+ *
+ *        Receive data from Kafka and convert it into CompanyData
+ */
+case class step_01_kafka_transform(source: DataStream[String]) {
+  def winhcTransform(): DataStream[CompanyData] = {
+    source.map(str => {
+      val jsonObject = JSON.parseObject(str)
+      val companyId = jsonObject.getString("company_id")
+      val companyName = jsonObject.getString("company_name")
+      val content = jsonObject.getJSONObject("content")
+      val map: Map[String, Seq[BaseEntity]] = content.keySet().asScala.map(tn => {
+        (tn, content.getJSONArray(tn).toJavaList(Json2EntityUtils.getClazz(tn)).asScala.toSeq)
+      }).toMap
+      CompanyData(companyId = companyId, companyName = companyName, content = map)
+    })
+  }
+}

+ 46 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_02_sink_2_ods.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+import com.winhc.bigdata.filnk.java.BaseEntity
+import com.winhc.bigdata.filnk.java.utils.Json2EntityUtils
+import com.winhc.bigdata.flink.CompanyData
+import com.winhc.bigdata.flink.sink.OdpsSinkBuilder
+import com.winhc.bigdata.flink.utils.DateUtils
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.{DataStream, OutputTag}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 14:31
+ *        Write the raw data out to ODS as a backup (writing is currently disabled)
+ */
+case class step_02_sink_2_ods(dataStream: DataStream[CompanyData]) {
+
+  def saveToOdpsOds(): DataStream[CompanyData] = {
+    val map = Json2EntityUtils.getTn.asScala.map(t => {
+      (t, new OutputTag[(String, Seq[BaseEntity])](t) {})
+    }).toMap
+
+    val processStream = dataStream.flatMap(e => e.content)
+      .process(new ProcessFunction[(String, Seq[BaseEntity]), (String, Seq[BaseEntity])]() {
+        override def processElement(value: (String, Seq[BaseEntity]), ctx: ProcessFunction[(String, Seq[BaseEntity]), (String, Seq[BaseEntity])]#Context, out: Collector[(String, Seq[BaseEntity])]): Unit = ctx.output(map(value._1), value)
+      })
+
+    for (elem <- map) {
+      val tn = elem._1
+      val sink: SinkFunction[Tuple2[Boolean, Row]] = OdpsSinkBuilder.buildSinkFunction(s"inc_ods_$tn", s"ds=${DateUtils.getYesterday()}")
+      processStream.getSideOutput(elem._2)
+        .flatMap(r => r._2)
+        .map(r => new Tuple2(true, r.toRow))
+        .addSink(sink)
+    }
+
+    dataStream
+  }
+}

+ 16 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_03_etl.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+import com.winhc.bigdata.flink.CompanyData
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 15:10
+ *        ETL step: clean and transform the data
+ */
+case class step_03_etl(dataStream: DataStream[CompanyData]) {
+
+  def etl(): DataStream[CompanyData] = {
+    dataStream
+  }
+}

+ 13 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_04_builder_data.scala

@@ -0,0 +1,13 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+import com.winhc.bigdata.flink.CompanyData
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/2 14:25
+ *        Load existing data and apply any necessary filtering (not implemented yet)
+ */
+case class step_04_builder_data(dataStream: DataStream[CompanyData]) {
+
+}

+ 19 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_05_save_data.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+import com.winhc.bigdata.flink.CompanyData
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 17:17
+ *        Data output:
+ *        1. write to ADS / Hologres
+ *        2. write list-page data to Elasticsearch
+ *        3. write data to HBase
+ */
+case class step_05_save_data(dataStream: DataStream[CompanyData]) {
+
+  def save2Ads(): DataStream[CompanyData] = {
+    dataStream
+  }
+}

+ 16 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_06_builder_company_index.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+import com.winhc.bigdata.flink.CompanyData
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 17:18
+ *        Build the company index
+ */
+case class step_06_builder_company_index(dataStream: DataStream[CompanyData]) {
+
+  def builderCompanyIndex(): DataStream[CompanyData] = {
+    dataStream
+  }
+}

+ 59 - 0
src/main/scala/com/winhc/bigdata/flink/sink/ElasticsearchSinkBuilder.scala

@@ -0,0 +1,59 @@
+package com.winhc.bigdata.flink.sink
+
+import com.alibaba.fastjson.JSONObject
+import com.winhc.bigdata.flink.config.ElasticsearchConfig
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.update.UpdateRequest
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClientBuilder
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.xcontent.XContentType
+import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
+import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}
+
+import scala.collection.JavaConversions
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 17:19
+ */
+object ElasticsearchSinkBuilder {
+  case class JsonObjectElasticsearchSinkFunction[T](index: String, docType: String, handle: (T) => (String, JSONObject)) extends ElasticsearchSinkFunction[T] {
+    override def process(t: T, runtimeContext: RuntimeContext, indexer: RequestIndexer): Unit = {
+      val tuple = handle(t)
+      val id = tuple._1
+      val json: JSONObject = tuple._2
+      if (id != null) {
+        val updateRequest = new UpdateRequest(index, docType, id)
+          .doc(json, XContentType.JSON)
+          .upsert(json, XContentType.JSON)
+        indexer.add(updateRequest)
+      } else {
+        val indexRequest: IndexRequest = new IndexRequest(index, docType, id)
+          .source(json, XContentType.JSON)
+        indexer.add(indexRequest)
+      }
+    }
+  }
+
+  def buildSinkFunction[T](clusterName: String, index: String, docType: String, handle: (T) => (String, JSONObject)): ElasticsearchSink[T] = {
+
+    val tuple = ElasticsearchConfig.getConfig(clusterName)
+    val hosts = tuple._1
+    val clientCallback = tuple._2
+    val requestCallback = tuple._3
+
+    val esSinkBuilder = new ElasticsearchSink.Builder[T](JavaConversions.seqAsJavaList(hosts), JsonObjectElasticsearchSinkFunction(index, docType, handle))
+
+    esSinkBuilder.setBulkFlushMaxActions(500)
+
+    // provide a RestClientFactory for custom configuration on the internally created REST client
+    esSinkBuilder.setRestClientFactory(new RestClientFactory {
+      override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
+        restClientBuilder.setHttpClientConfigCallback(clientCallback)
+        restClientBuilder.setRequestConfigCallback(requestCallback)
+      }
+    })
+
+    esSinkBuilder.build()
+  }
+}
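A minimal sketch of wiring this sink into a stream; the cluster name comes from env-*.yml, while the index name and JSON layout are hypothetical:

    import com.alibaba.fastjson.JSONObject
    import org.apache.flink.streaming.api.scala.DataStream

    def sinkCompanies(stream: DataStream[(String, String)]): Unit = {
      val sink = ElasticsearchSinkBuilder.buildSinkFunction[(String, String)](
        clusterName = "new",
        index = "winhc_index_company",  // hypothetical index
        docType = "_doc",
        handle = t => {
          val json = new JSONObject()
          json.put("company_id", t._1)
          json.put("company_name", t._2)
          (t._1, json)                  // (_id, document body)
        })
      stream.addSink(sink)
    }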

+ 10 - 0
src/main/scala/com/winhc/bigdata/flink/sink/HBaseSinkBuilder.scala

@@ -0,0 +1,10 @@
+package com.winhc.bigdata.flink.sink
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/2 09:11
+ */
+object HBaseSinkBuilder {
+
+
+}

+ 75 - 0
src/main/scala/com/winhc/bigdata/flink/sink/HoloSinkBuilder.scala

@@ -0,0 +1,75 @@
+package com.winhc.bigdata.flink.sink
+
+import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction
+import com.alibaba.ververica.connectors.hologres.api.{AbstractHologresWriter, HologresTableSchema}
+import com.alibaba.ververica.connectors.hologres.config.{HologresConfigs, HologresConnectionParam}
+import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCWriter
+import com.alibaba.ververica.connectors.hologres.rpc.HologresRpcWriter
+import com.alibaba.ververica.connectors.hologres.sink.HologresSinkFunction
+import com.alibaba.ververica.connectors.hologres.utils.HologresUtils
+import com.winhc.bigdata.filnk.java.constant.EnvConst
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.{DataTypes, TableSchema}
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.types.DataType
+import org.postgresql.model
+import org.postgresql.model.Column
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/3 16:54
+ */
+object HoloSinkBuilder {
+  def builderSinkFunction(tableName: String): OutputFormatSinkFunction[RowData] = {
+    val config = new Configuration()
+    config.setString(HologresConfigs.TABLE, tableName)
+    config.setString(HologresConfigs.ENDPOINT, EnvConst.getValue("winhc.holo.endpoint"))
+    config.setString(HologresConfigs.OPTIONAL_WRITE_BATCH_SIZE.key(), EnvConst.getValue("winhc.holo.batch-size"))
+    config.setString(HologresConfigs.USERNAME, EnvConst.getValue("winhc.holo.username"))
+    config.setString(HologresConfigs.PASSWORD, EnvConst.getValue("winhc.holo.password"))
+    config.setString(HologresConfigs.DATABASE, EnvConst.getValue("winhc.holo.database"))
+    config.setString(HologresConfigs.RPC_RETRIES.key(), EnvConst.getValue("winhc.holo.rpc-retries"))
+    config.setString(HologresConfigs.USE_RPC_MODE.key(), EnvConst.getValue("winhc.holo.user-rpc-mode"))
+    config.setString(HologresConfigs.MUTATE_TYPE, EnvConst.getValue("winhc.holo.mutate-type"))
+    config.setString(HologresConfigs.OPTIONAL_SINK_IGNORE_DELETE.key(), EnvConst.getValue("winhc.holo.ignore-delete"))
+    val connectionParam = new HologresConnectionParam(config)
+    val writer = buildHoloWriter(config, connectionParam)
+    new HologresSinkFunction(connectionParam, writer)
+  }
+
+
+  private def getHoloSchema(tableSchema: HologresTableSchema): TableSchema = {
+    val schema: model.TableSchema = tableSchema.get()
+    val builder = TableSchema.builder()
+    for (elem: Column <- schema.getColumnSchema.toList) {
+      builder.field(elem.getName, switchHoldSchema(elem))
+    }
+    builder.build()
+  }
+
+
+  private def buildHoloWriter(config: Configuration, hologresConnectionParam: HologresConnectionParam): AbstractHologresWriter[RowData] = {
+    val tableSchema = HologresTableSchema.get(hologresConnectionParam)
+    val schema = getHoloSchema(tableSchema)
+    var writer: AbstractHologresWriter[RowData] = null
+    if (HologresUtils.shouldUseRpc(config)) writer = HologresRpcWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
+    else writer = HologresJDBCWriter.createTableWriter(hologresConnectionParam, schema, tableSchema)
+    writer
+  }
+
+
+  private def switchHoldSchema(col: Column): DataType = {
+    val t = col.getTypeName.toLowerCase
+    t match {
+      case "string" => DataTypes.STRING()
+      case "int" => DataTypes.INT()
+      case "double" => DataTypes.DOUBLE()
+      case "bigint" => DataTypes.BIGINT()
+      case "date" => DataTypes.DATE()
+      case _ => {
+        println("not fount type: " + t)
+        DataTypes.STRING()
+      }
+    }
+  }
+}
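A sketch of attaching the Hologres sink to a RowData stream; the table name is hypothetical and all winhc.holo.* keys are assumed to be present in the environment config:

    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.table.data.RowData

    def sinkToHolo(rows: DataStream[RowData]): Unit = {
      val sink = HoloSinkBuilder.builderSinkFunction("ads_company_summary")  // hypothetical table
      rows.addSink(sink)
    }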

+ 19 - 0
src/main/scala/com/winhc/bigdata/flink/sink/JsonObjectToMutationConverter.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.flink.sink
+
+import com.alibaba.fastjson.JSONObject
+import com.alibaba.ververica.connector.cloudhbase.schema.HBaseMutationConverter
+import org.apache.hadoop.hbase.client.Mutation
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/2 09:15
+ */
+case class JsonObjectToMutationConverter() extends HBaseMutationConverter[JSONObject] {
+  override def open(): Unit = {}
+
+  override def convertToMutation(t: JSONObject): Mutation = {
+
+    null
+
+  }
+}

+ 17 - 0
src/main/scala/com/winhc/bigdata/flink/sink/OdpsSinkBuilder.scala

@@ -0,0 +1,17 @@
+package com.winhc.bigdata.flink.sink
+
+import com.winhc.bigdata.filnk.java.sink.OdpsSinkBuilder4J
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.types.Row
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 15:32
+ */
+object OdpsSinkBuilder {
+  def buildSinkFunction(tableName: String, partition: String): SinkFunction[Tuple2[Boolean, Row]] = {
+    OdpsSinkBuilder4J.buildSinkFunction(tableName, partition).asInstanceOf[SinkFunction[Tuple2[Boolean, Row]]]
+  }
+
+}

+ 14 - 0
src/main/scala/com/winhc/bigdata/flink/source/KafkaSourceBuilder.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.flink.source
+
+import com.winhc.bigdata.filnk.java.source.KafkaSourceBuilder4J
+import org.apache.flink.connector.kafka.source.KafkaSource
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 11:50
+ */
+object KafkaSourceBuilder {
+  def buildSourceFunction(topic: String): KafkaSource[String] = {
+    KafkaSourceBuilder4J.buildSourceFunction(topic)
+  }
+}

+ 33 - 0
src/main/scala/com/winhc/bigdata/flink/source/OdpsSourceBuilder.scala

@@ -0,0 +1,33 @@
+package com.winhc.bigdata.flink.source
+
+import com.alibaba.ververica.connectors.odps.{ODPSStreamSource, OdpsOptions}
+import com.winhc.bigdata.filnk.java.constant.EnvConst
+import com.winhc.bigdata.filnk.java.source.OdpsSourceBuilder4J
+import org.apache.flink.configuration.Configuration
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/31 11:48
+ */
+object OdpsSourceBuilder {
+  def buildSourceFunction(tableName: String, partition: String = null): ODPSStreamSource = {
+    var project = EnvConst.getValue("odps.default-project")
+    var tn = tableName
+    if (tableName.contains(".")) {
+      val strings = tableName.split("\\.")
+      project = strings(0)
+      tn = strings(1)
+    }
+    val conf = new Configuration()
+    conf.setString(OdpsOptions.END_POINT.key(), EnvConst.getValue("odps.url.end-point"))
+    conf.setString(OdpsOptions.TUNNEL_END_POINT.key(), EnvConst.getValue("odps.url.tunnel-end-point"))
+    conf.setString(OdpsOptions.PROJECT_NAME.key(), project)
+    conf.setString(OdpsOptions.TABLE_NAME.key, tn)
+    conf.setString(OdpsOptions.ACCESS_ID.key(), EnvConst.getValue("odps.prod-account.access-id"))
+    conf.setString(OdpsOptions.ACCESS_KEY.key(), EnvConst.getValue("odps.prod-account.access-key"))
+    conf.setString(OdpsOptions.PARTITION.key, partition)
+    val odpsSource = new OdpsSourceBuilder4J(conf).buildSourceFunction
+    odpsSource
+  }
+
+}

+ 40 - 0
src/main/scala/com/winhc/bigdata/flink/test/MyTest.scala

@@ -0,0 +1,40 @@
+package com.winhc.bigdata.flink.test
+
+import com.winhc.bigdata.filnk.java.sink.OdpsSinkBuilder4J
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.types.Row
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 10:15
+ */
+object MyTest {
+  def main(args: Array[String]): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val odpsSink: SinkFunction[Tuple2[Boolean, Row]] = OdpsSinkBuilder4J.buildSinkFunction("xjk_test_test", null).asInstanceOf[SinkFunction[Tuple2[Boolean, Row]]]
+    val odpsSink2: SinkFunction[Tuple2[Boolean, Row]] = OdpsSinkBuilder4J.buildSinkFunction("xjk_test_test_2", null).asInstanceOf[SinkFunction[Tuple2[Boolean, Row]]]
+
+    import org.apache.flink.api.scala._
+    val value: DataStream[Tuple2[Boolean, Row]] = env.fromCollection(Seq(
+      Row.of(Long.box(123), Double.box(123.1d)),
+      Row.of(Long.box(456), Double.box(123.2d))
+    ))
+      .map(new MapFunction[Row, Tuple2[Boolean, Row]]() {
+        @throws[Exception]
+        override def map(row: Row): Tuple2[Boolean, Row] = new Tuple2(true, row)
+      })
+    value.addSink(odpsSink)
+    value.addSink(odpsSink2)
+
+
+
+    /*  val odpsSource = OdpsSource.buildSourceFunction("xjk_test_test")
+      val source = env.addSource[RowData](odpsSource)
+      source.addSink(new PrintSinkFunction[RowData])*/
+
+    env.execute("test job")
+  }
+
+}

+ 23 - 0
src/main/scala/com/winhc/bigdata/flink/utils/BaseUtils.scala

@@ -0,0 +1,23 @@
+package com.winhc.bigdata.flink.utils
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 15:25
+ */
+object BaseUtils {
+  // keep only CJK characters, digits and ASCII letters
+  private val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
+
+  // strip other symbols and whitespace; returns "" for null/blank input
+  def cleanup(s: String): String = {
+    if (StringUtils.isBlank(s))
+      ""
+    else
+      pattern replaceAllIn(s, "")
+  }
+
+  def isWindows: Boolean = System.getProperty("os.name").contains("Windows")
+
+}
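For illustration, cleanup keeps only CJK characters, digits and ASCII letters:

    BaseUtils.cleanup("  winhc(测试)-2021! ")  // "winhc测试2021"
    BaseUtils.cleanup(null)                     // ""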

+ 32 - 0
src/main/scala/com/winhc/bigdata/flink/utils/DateUtils.scala

@@ -0,0 +1,32 @@
+package com.winhc.bigdata.flink.utils
+
+import org.apache.commons.lang3.time.DateFormatUtils
+
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Date, Locale}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/9/1 15:24
+ */
+object DateUtils {
+
+  def nowDate(pattern: String = "yyyy-MM-dd"): String = {
+    new SimpleDateFormat(pattern).format(new Date)
+  }
+
+  def getYesterday(): String = {
+    atDaysAfter(-1, nowDate("yyyyMMdd"))
+  }
+
+  def atDaysAfter(n: Int, time: String, pattern: String = "yyyyMMdd"): String = {
+    // the input is always expected as yyyyMMdd; `pattern` only controls the output format
+    val newtime: Date = new SimpleDateFormat("yyyyMMdd").parse(time)
+    val c = Calendar.getInstance(Locale.CHINA)
+    c.setTimeInMillis(newtime.getTime)
+    c.add(Calendar.DATE, n)
+    DateFormatUtils.format(c.getTime.getTime, pattern)
+  }
+
+
+}
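For illustration:

    DateUtils.atDaysAfter(-1, "20210902")               // "20210901"
    DateUtils.atDaysAfter(7, "20210902", "yyyy-MM-dd")  // "2021-09-09"
    DateUtils.getYesterday()                            // yesterday, formatted as yyyyMMdd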