feat: standardize package names; HbaseSink

JimZhang 3 years ago
parent
commit
db2ac4b4ce
58 files changed with 1982 additions and 124 deletions
  1. 37 1
      pom.xml
  2. 0 26
      src/main/java/com/winhc/bigdata/filnk/java/constant/EnvConst.java
  3. 1 1
      src/main/java/com/winhc/bigdata/filnk/java/BaseEntity.java
  4. 0 0
      src/main/java/com/winhc/bigdata/flink/java/CompanyData.java
  5. 2 2
      src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/Company.java
  6. 2 2
      src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyHolder.java
  7. 2 2
      src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyStaff.java
  8. 2 2
      src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyTm.java
  9. 46 0
      src/main/java/com/winhc/bigdata/flink/java/constant/EnvConst.java
  10. 167 0
      src/main/java/com/winhc/bigdata/flink/java/entity/CompanyDataReceive.java
  11. 1 1
      src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyIndexEntity.java
  12. 1 1
      src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyName.java
  13. 1 1
      src/main/java/com/winhc/bigdata/filnk/java/entity/Entity.java
  14. 89 0
      src/main/java/com/winhc/bigdata/flink/java/entity/PutCollection.java
  15. 21 0
      src/main/java/com/winhc/bigdata/flink/java/entity/ReceiveMeta.java
  16. 238 0
      src/main/java/com/winhc/bigdata/flink/java/entity/SealedPut.java
  17. 83 0
      src/main/java/com/winhc/bigdata/flink/java/entity/ValidationExceptionEntity.java
  18. 2 2
      src/main/java/com/winhc/bigdata/filnk/java/sink/OdpsSinkBuilder4J.java
  19. 7 4
      src/main/java/com/winhc/bigdata/filnk/java/source/KafkaSourceBuilder4J.java
  20. 1 1
      src/main/java/com/winhc/bigdata/filnk/java/source/OdpsSourceBuilder4J.java
  21. 11 4
      src/main/java/com/winhc/bigdata/filnk/java/utils/EnvironmentProperties.java
  22. 1 1
      src/main/java/com/winhc/bigdata/filnk/java/utils/FieldNameUtils.java
  23. 2 2
      src/main/java/com/winhc/bigdata/filnk/java/utils/Json2EntityUtils.java
  24. 21 0
      src/main/resources/log4j2.xml
  25. 173 0
      src/main/resources/schema/company.schema
  26. 67 0
      src/main/resources/schema/company_holder.schema
  27. 34 0
      src/main/resources/schema/company_staff.schema
  28. 1 1
      src/main/scala/com/winhc/bigdata/flink/CompanyData.scala
  29. 36 0
      src/main/scala/com/winhc/bigdata/flink/config/ArgsCompanyJob.scala
  30. 1 1
      src/main/scala/com/winhc/bigdata/flink/config/ElasticsearchConfig.scala
  31. 19 1
      src/main/scala/com/winhc/bigdata/flink/config/HbaseConfig.scala
  32. 12 0
      src/main/scala/com/winhc/bigdata/flink/event/DimEntity.scala
  33. 18 0
      src/main/scala/com/winhc/bigdata/flink/event/MetaInfo.scala
  34. 34 0
      src/main/scala/com/winhc/bigdata/flink/event/RowBaseDimEntity.scala
  35. 35 0
      src/main/scala/com/winhc/bigdata/flink/event/UpdateEntity.scala
  36. 27 0
      src/main/scala/com/winhc/bigdata/flink/event/ValidationExceptionEntity.scala
  37. 9 0
      src/main/scala/com/winhc/bigdata/flink/event/ValidationResult.scala
  38. 74 25
      src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala
  39. 54 11
      src/main/scala/com/winhc/bigdata/flink/func/HbaseSinkFunction.scala
  40. 8 4
      src/main/scala/com/winhc/bigdata/flink/implicits/CaseClass2JsonHelper.scala
  41. 15 2
      src/main/scala/com/winhc/bigdata/flink/jobs/CompanyStreamJob.scala
  42. 4 5
      src/main/scala/com/winhc/bigdata/flink/TestJob.scala
  43. 137 0
      src/main/scala/com/winhc/bigdata/flink/jobs/TestJob1.scala
  44. 21 0
      src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/OutputTags.scala
  45. 78 0
      src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/gs_01_kafka_transform_and_validation.scala
  46. 4 2
      src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/package.scala
  47. 4 3
      src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_01_kafka_transform.scala
  48. 2 2
      src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_02_sink_2_ods.scala
  49. 1 1
      src/main/scala/com/winhc/bigdata/flink/sink/HoloSinkBuilder.scala
  50. 1 1
      src/main/scala/com/winhc/bigdata/flink/sink/OdpsSinkBuilder.scala
  51. 1 1
      src/main/scala/com/winhc/bigdata/flink/source/KafkaSourceBuilder.scala
  52. 2 2
      src/main/scala/com/winhc/bigdata/flink/source/OdpsSourceBuilder.scala
  53. 101 0
      src/main/scala/com/winhc/bigdata/flink/streaming/trigger/TimeCountTrigger.scala
  54. 1 1
      src/main/scala/com/winhc/bigdata/flink/test/MyTest.scala
  55. 14 8
      src/main/scala/com/winhc/bigdata/flink/utils/BaseUtils.scala
  56. 66 0
      src/main/scala/com/winhc/bigdata/flink/utils/PreUDF.scala
  57. 153 0
      src/main/scala/com/winhc/bigdata/flink/utils/RowDataUtils.scala
  58. 37 0
      src/test/scala/com/winhc/bigdata/flink/utils/BaseUtilsTI.scala

+ 37 - 1
pom.xml

@@ -18,7 +18,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
-        <connector.version>1.13-vvr-4.0.7-SNAPSHOT</connector.version>
+        <connector.version>1.13-vvr-4.0.7</connector.version>
         <scope>compile</scope>
         <!--        <scope>provided</scope>-->
     </properties>
@@ -53,6 +53,19 @@
         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-scala_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>${scope}</scope>
@@ -199,6 +212,28 @@
             <version>3.7.0-M7</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.java-json-tools</groupId>
+            <artifactId>json-schema-validator</artifactId>
+            <version>2.2.14</version>
+        </dependency>
+
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
     </dependencies>
 
     <build>
@@ -284,6 +319,7 @@
                     </execution>
                 </executions>
                 <configuration>
+                    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                     <args>
                         <arg>-nobootcp</arg>
                     </args>
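
The pom now adds flink-avro, flink-runtime-web, json-schema-validator (used by the *.schema files added below), and Flink test utilities. A minimal sketch of what flink-runtime-web enables during local runs; the class name and port are illustrative, not part of this commit:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUiSketch {
    public static void main(String[] args) throws Exception {
        // With flink-runtime-web on the classpath, a local environment can serve the Flink Web UI.
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8081); // UI at http://localhost:8081
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.fromElements("a", "b", "c").print();
        env.execute("local-web-ui-sketch");
    }
}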

+ 0 - 26
src/main/java/com/winhc/bigdata/filnk/java/constant/EnvConst.java

@@ -1,26 +0,0 @@
-package com.winhc.bigdata.filnk.java.constant;
-
-import com.winhc.bigdata.filnk.java.utils.EnvironmentProperties;
-
-import java.util.Map;
-
-/**
- * @author: XuJiakai
- * 2021/8/31 16:17
- */
-public class EnvConst {
-    private static final EnvironmentProperties env = new EnvironmentProperties();
-
-    public static Map<String, String> getMapByPrefix(String prefix) {
-        return env.getMapByPrefix(prefix);
-    }
-
-    public static String getValue(String key) {
-        return getValue(key, null);
-    }
-
-    public static String getValue(String key, String orDefault) {
-        String value = env.getValue(key);
-        return value == null ? orDefault : value;
-    }
-}

+ 1 - 1
src/main/java/com/winhc/bigdata/filnk/java/BaseEntity.java

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.filnk.java;
+package com.winhc.bigdata.flink.java;
 
 import org.apache.flink.types.Row;
 

src/main/java/com/winhc/bigdata/filnk/java/CompanyData.java → src/main/java/com/winhc/bigdata/flink/java/CompanyData.java


+ 2 - 2
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/Company.java

@@ -1,9 +1,9 @@
 
-package com.winhc.bigdata.filnk.java.company_data_entity;
+package com.winhc.bigdata.flink.java.company_data_entity;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.winhc.bigdata.filnk.java.BaseEntity;
+import com.winhc.bigdata.flink.java.BaseEntity;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;

+ 2 - 2
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyHolder.java

@@ -1,9 +1,9 @@
 
-package com.winhc.bigdata.filnk.java.company_data_entity;
+package com.winhc.bigdata.flink.java.company_data_entity;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.winhc.bigdata.filnk.java.BaseEntity;
+import com.winhc.bigdata.flink.java.BaseEntity;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;

+ 2 - 2
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyStaff.java

@@ -1,9 +1,9 @@
 
-package com.winhc.bigdata.filnk.java.company_data_entity;
+package com.winhc.bigdata.flink.java.company_data_entity;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.winhc.bigdata.filnk.java.BaseEntity;
+import com.winhc.bigdata.flink.java.BaseEntity;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;

+ 2 - 2
src/main/java/com/winhc/bigdata/filnk/java/company_data_entity/CompanyTm.java

@@ -1,9 +1,9 @@
 
-package com.winhc.bigdata.filnk.java.company_data_entity;
+package com.winhc.bigdata.flink.java.company_data_entity;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.winhc.bigdata.filnk.java.BaseEntity;
+import com.winhc.bigdata.flink.java.BaseEntity;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;

+ 46 - 0
src/main/java/com/winhc/bigdata/flink/java/constant/EnvConst.java

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.flink.java.constant;
+
+import com.winhc.bigdata.flink.java.utils.EnvironmentProperties;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 16:17
+ */
+public class EnvConst {
+    private static final EnvironmentProperties env = new EnvironmentProperties();
+    public static ParameterTool createParameterTool(final String[] args) throws Exception {
+        return ParameterTool
+                .fromMap(env.getSource())
+                .mergeWith(ParameterTool.fromArgs(args))
+                .mergeWith(ParameterTool.fromSystemProperties())
+                .mergeWith(ParameterTool.fromMap(getenv()));// later mergeWith calls override earlier values
+    }
+
+    // Get the environment variables set on the Job
+    private static Map<String, String> getenv() {
+        Map<String, String> map = new HashMap<>();
+        for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
+            map.put(entry.getKey().toLowerCase().replace('_', '.'), entry.getValue());
+        }
+        return map;
+    }
+
+
+
+    public static Map<String, String> getMapByPrefix(String prefix) {
+        return env.getMapByPrefix(prefix);
+    }
+
+    public static String getValue(String key) {
+        return getValue(key, null);
+    }
+
+    public static String getValue(String key, String orDefault) {
+        String value = env.getValue(key);
+        return value == null ? orDefault : value;
+    }
+}
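
A minimal usage sketch for the new createParameterTool (the class name is illustrative; the keys are those used later by KafkaSourceBuilder4J). Since later mergeWith calls override earlier ones, the precedence from lowest to highest is: env*.yml, program args, JVM system properties, OS environment variables:

import com.winhc.bigdata.flink.java.constant.EnvConst;
import org.apache.flink.api.java.utils.ParameterTool;

public class EnvConstSketch {
    public static void main(String[] args) throws Exception {
        // env*.yml < program args < system properties < environment variables
        ParameterTool params = EnvConst.createParameterTool(args);
        System.out.println(params.get("winhc.kafka.bootstrap-servers"));
        System.out.println(params.get("winhc.kafka.group-id", "default-group"));
    }
}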

+ 167 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/CompanyDataReceive.java

@@ -0,0 +1,167 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Data;
+import lombok.var;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-14 10:07
+ */
+@Data
+public class CompanyDataReceive {
+    private String requestId;
+    private String sessionId;
+    private ReceiveMeta meta;
+    private Map<String, List<JsonNode>> data;
+    private JsonNode company;
+
+    public ReceiveMeta getMeta() {
+        return meta;
+    }
+
+    public Map<String, List<JsonNode>> getData() {
+        return data;
+    }
+
+    public void setCompany(JsonNode company) {
+        this.company = company;
+    }
+
+    public JsonNode getCompany() {
+        return company;
+    }
+
+    public static void main(String[] args) throws JsonProcessingException {
+        var s = "{ \n" +
+                "    \"meta\":{\n" +
+                "      \n" +
+                "    },\n" +
+                "    \"company\" : {\n" +
+                "        \"company_id\" : \"a04963caf9fb971d639caff75a9bf653\", \n" +
+                "        \"new_cid\" : null, \n" +
+                "        \"base\" : \"js\", \n" +
+                "        \"name\" : \"南京绿野仙踪摄影有限公司\", \n" +
+                "        \"name_en\" : \"Nanjing Wizard of Oz Photography Co., Ltd.\", \n" +
+                "        \"name_alias\" : \"绿野仙踪\", \n" +
+                "        \"history_names\" : null, \n" +
+                "        \"legal_entity_id\" : \"p1444360499ac9d07c02da55b73d52c48\", \n" +
+                "        \"legal_entity_name\" : \"潘振\", \n" +
+                "        \"legal_entity_type\" : 1, \n" +
+                "        \"reg_number\" : \"320106000094121\", \n" +
+                "        \"company_org_type\" : \"有限责任公司(自然人投资或控股)\", \n" +
+                "        \"reg_location\" : \"南京市鼓楼区江东北路319号二楼B2075-B2077号\", \n" +
+                "        \"estiblish_time\" : \"2007-10-19 00:00:00\", \n" +
+                "        \"from_time\" : \"2007-10-19 00:00:00\", \n" +
+                "        \"to_time\" : \"2027-10-18 00:00:00\", \n" +
+                "        \"business_scope\" : \"摄影服务;玩具、服装、鞋类、文化用品销售。(依法须经批准的项目,经相关部门批准后方可开展经营活动)\", \n" +
+                "        \"reg_institute\" : \"南京市鼓楼区市场监督管理局\", \n" +
+                "        \"approved_time\" : \"2021-03-22 00:00:00\", \n" +
+                "        \"reg_status\" : \"存续(在营、开业、在册)\", \n" +
+                "        \"reg_capital\" : \"3万元人民币\", \n" +
+                "        \"org_approved_institute\" : null, \n" +
+                "        \"parent_company_id\" : null, \n" +
+                "        \"company_type\" : 1, \n" +
+                "        \"credit_code\" : \"91320106667355766T\", \n" +
+                "        \"org_number\" : \"66735576-6\", \n" +
+                "        \"score\" : null, \n" +
+                "        \"cate_first_code\" : \"27\", \n" +
+                "        \"cate_second_code\" : \"24\", \n" +
+                "        \"cate_third_code\" : \"493\", \n" +
+                "        \"lat\" : \"32.0661519547779\", \n" +
+                "        \"lng\" : \"118.745513991916\", \n" +
+                "        \"province_code\" : \"32\", \n" +
+                "        \"city_code\" : \"01\", \n" +
+                "        \"county_code\" : \"06\", \n" +
+                "        \"reg_capital_amount\" : 3000000, \n" +
+                "        \"reg_capital_currency\" : \"人民币\", \n" +
+                "        \"actual_capital_amount\" : 3000000, \n" +
+                "        \"actual_capital_currency\" : \"人民币\", \n" +
+                "        \"reg_status_std\" : \"存续\", \n" +
+                "        \"social_security_staff_num\" : 2, \n" +
+                "        \"cancel_date\" : null, \n" +
+                "        \"cancel_reason\" : null, \n" +
+                "        \"revoke_date\" : null, \n" +
+                "        \"revoke_reason\" : null, \n" +
+                "        \"emails\" : \"425372106@qq.com\", \n" +
+                "        \"phones\" : \"13952093889\", \n" +
+                "        \"wechat_public_num\" : null, \n" +
+                "        \"logo\" : null, \n" +
+                "        \"crawled_time\" : \"2021-09-07 23:24:12\", \n" +
+                "        \"create_time\" : \"2021-09-07 23:24:12\", \n" +
+                "        \"update_time\" : \"2021-09-07 23:24:12\", \n" +
+                "        \"deleted\" : 0\n" +
+                "    }, \n" +
+                "    \"data\":{\"company_holder\" : [\n" +
+                "        {\n" +
+                "            \"rowkey\" : null, \n" +
+                "            \"company_id\" : \"a04963caf9fb971d639caff75a9bf653\", \n" +
+                "            \"company_name\" : \"南京绿野仙踪摄影有限公司\", \n" +
+                "            \"holder_name\" : \"潘振\", \n" +
+                "            \"holder_id\" : \"p1444360499ac9d07c02da55b73d52c48\", \n" +
+                "            \"holder_type\" : 1, \n" +
+                "            \"amount\" : 2.1, \n" +
+                "            \"capital\" : \"[{\\\"amomon\\\": \\\"2.1万人民币\\\", \\\"paymet\\\": null, \\\"time\\\": \\\"2014-05-30\\\"}]\", \n" +
+                "            \"capital_actual\" : \"[]\", \n" +
+                "            \"percent\" : 0.7, \n" +
+                "            \"create_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"update_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"deleted\" : 0\n" +
+                "        }, \n" +
+                "        {\n" +
+                "            \"rowkey\" : null, \n" +
+                "            \"company_id\" : \"a04963caf9fb971d639caff75a9bf653\", \n" +
+                "            \"company_name\" : \"南京绿野仙踪摄影有限公司\", \n" +
+                "            \"holder_name\" : \"夏天\", \n" +
+                "            \"holder_id\" : \"p68fedadbd70d991d15bde4e4fcd72d52\", \n" +
+                "            \"holder_type\" : 1, \n" +
+                "            \"amount\" : 0.9, \n" +
+                "            \"capital\" : \"[{\\\"amomon\\\": \\\"0.9万人民币\\\", \\\"paymet\\\": null, \\\"time\\\": \\\"2014-05-30\\\"}]\", \n" +
+                "            \"capital_actual\" : \"[]\", \n" +
+                "            \"percent\" : 0.3, \n" +
+                "            \"create_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"update_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"deleted\" : 0\n" +
+                "        }\n" +
+                "    ], \n" +
+                "    \"company_staff\" : [\n" +
+                "        {\n" +
+                "            \"rowkey\" : null, \n" +
+                "            \"company_id\" : \"a04963caf9fb971d639caff75a9bf653\", \n" +
+                "            \"company_name\" : \"南京绿野仙踪摄影有限公司\", \n" +
+                "            \"hid\" : \"p1444360499ac9d07c02da55b73d52c48\", \n" +
+                "            \"staff_name\" : \"潘振\", \n" +
+                "            \"staff_type\" : \"执行董事兼总经理\", \n" +
+                "            \"create_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"update_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"deleted\" : 0\n" +
+                "        }, \n" +
+                "        {\n" +
+                "            \"rowkey\" : null, \n" +
+                "            \"company_id\" : \"a04963caf9fb971d639caff75a9bf653\", \n" +
+                "            \"company_name\" : \"南京绿野仙踪摄影有限公司\", \n" +
+                "            \"hid\" : \"p68fedadbd70d991d15bde4e4fcd72d52\", \n" +
+                "            \"staff_name\" : \"夏天\", \n" +
+                "            \"staff_type\" : \"监事\", \n" +
+                "            \"create_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"update_time\" : \"2021-09-07 23:24:12\", \n" +
+                "            \"deleted\" : 0\n" +
+                "        }\n" +
+                "    ]},\n" +
+                "    \"time\" : \"2021-09-07\", \n" +
+                "    \"company_cid\" : \"a04963caf9fb971d639caff75a9bf653\", \n" +
+                "    \"yhc_time\" : \"2021-09-08\", \n" +
+                "    \"yhc_source\" : 1\n" +
+                "}\n";
+        ObjectMapper objectMapper = new ObjectMapper();
+        CompanyDataReceive cdr = objectMapper.readValue(s, CompanyDataReceive.class);
+
+        System.out.println(cdr);
+
+    }
+}

+ 1 - 1
src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyIndexEntity.java

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.filnk.java.entity;
+package com.winhc.bigdata.flink.java.entity;
 
 import java.util.List;
 

+ 1 - 1
src/main/java/com/winhc/bigdata/filnk/java/entity/CompanyName.java

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.filnk.java.entity;
+package com.winhc.bigdata.flink.java.entity;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;

+ 1 - 1
src/main/java/com/winhc/bigdata/filnk/java/entity/Entity.java

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.filnk.java.entity;
+package com.winhc.bigdata.flink.java.entity;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;

+ 89 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/PutCollection.java

@@ -0,0 +1,89 @@
+package com.winhc.bigdata.flink.java.entity;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.util.Bytes;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-11 15:33
+ */
+public class PutCollection {
+
+
+    public Map<String, Map<String, SealedPut>> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, Map<String, SealedPut>> data) {
+        this.data = data;
+    }
+
+    private Map<String, Map<String, SealedPut>> data = Maps.newHashMap();
+
+    public void putAll(String tableName, Collection<SealedPut> puts) {
+        data.putIfAbsent(tableName,Maps.newHashMap());
+        if (!data.containsKey(tableName)) {
+            data.put(tableName, Maps.newHashMap());
+        }
+        final Map<String, SealedPut> table = data.get(tableName);
+        for (SealedPut put : puts) {
+            final String hexRow = Bytes.toHex(put.getRowKey());
+            if (!table.containsKey(hexRow)) {
+                table.put(hexRow, put);
+            } else {
+                table.put(
+                        hexRow,
+                        table.get(hexRow).appendSealedPut(put)
+                );
+            }
+        }
+    }
+
+    public void put(String tableName, SealedPut put) {
+        data.putIfAbsent(tableName,Maps.newHashMap());
+//        if (!data.containsKey(tableName)) {
+//            data.put(tableName, Maps.newHashMap());
+//        }
+        final Map<String, SealedPut> table = data.get(tableName);
+        final String hexRow = Bytes.toHex(put.getRowKey());
+        if (!table.containsKey(hexRow)) {
+            table.put(hexRow, put);
+        } else {
+            table.put(
+                    hexRow,
+                    table.get(hexRow).appendSealedPut(put)
+            );
+        }
+    }
+
+
+    /**
+     * Merge multiple PutCollections into one.
+     *
+     * @param cs the collections to merge
+     * @return a new PutCollection containing all puts, merged per table and row key
+     */
+    public static PutCollection merge(PutCollection... cs) {
+        PutCollection c0 = new PutCollection();
+        for (PutCollection ci : cs) {
+            for (Map.Entry<String, Map<String, SealedPut>> entry : ci.data.entrySet()) {
+                c0.putAll(entry.getKey(), entry.getValue().values());
+            }
+        }
+        return c0;
+    }
+
+    public static PutCollection create(String tableName, Collection<SealedPut> puts) {
+        PutCollection result = new PutCollection();
+        result.putAll(tableName, puts);
+        return result;
+    }
+
+    public static PutCollection create(String tableName, SealedPut put) {
+        PutCollection result = new PutCollection();
+        result.put(tableName, put);
+        return result;
+    }
+}
+
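
A minimal sketch of how PutCollection groups SealedPuts per table and merges puts targeting the same row key (table name and column values are illustrative):

import com.winhc.bigdata.flink.java.entity.PutCollection;
import com.winhc.bigdata.flink.java.entity.SealedPut;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutCollectionSketch {
    public static void main(String[] args) {
        Put p1 = new Put(Bytes.toBytes("rk1"));
        p1.addColumn(Bytes.toBytes("F"), Bytes.toBytes("name"), Bytes.toBytes("a"));
        Put p2 = new Put(Bytes.toBytes("rk1"));
        p2.addColumn(Bytes.toBytes("F"), Bytes.toBytes("deleted"), Bytes.toBytes("0"));

        // Puts for the same table and row key are merged column by column.
        PutCollection c1 = PutCollection.create("NG_RT_COMPANY_HOLDER", new SealedPut(p1));
        PutCollection c2 = PutCollection.create("NG_RT_COMPANY_HOLDER", new SealedPut(p2));
        PutCollection merged = PutCollection.merge(c1, c2);
        System.out.println(merged.getData().get("NG_RT_COMPANY_HOLDER").size()); // 1 row, 2 columns
    }
}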

+ 21 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/ReceiveMeta.java

@@ -0,0 +1,21 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import lombok.Data;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-14 10:07
+ */
+@Data
+public class ReceiveMeta {
+    private String companyId;
+    private String companyName;
+
+    public String getCompanyId() {
+        return companyId;
+    }
+
+    public String getCompanyName() {
+        return companyName;
+    }
+}

+ 238 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/SealedPut.java

@@ -0,0 +1,238 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+/**
+ * A serialization-friendly wrapper around HBase Put, which avoids odd errors in some frameworks
+ *
+ * @author ZhangJi
+ * @since 2021-10-11 15:33
+ */
+public class SealedPut {
+    public byte[] getRowKey() {
+        return rowKey;
+    }
+
+    /**
+     * Make `POJO` Happy
+     *
+     * @param rowKey
+     */
+    @Deprecated
+    public void setRowKey(byte[] rowKey) {
+        this.rowKey = rowKey;
+    }
+
+    /**
+     * Make `POJO` Happy
+     *
+     * @param resultData
+     */
+    @Deprecated
+    public void setResultData(SortedMap<String, SortedMap<String, byte[]>> resultData) {
+        this.resultData = resultData;
+    }
+
+    private byte[] rowKey;
+
+    public SortedMap<String, SortedMap<String, byte[]>> getResultData() {
+        return resultData;
+    }
+
+    /**
+     * Layout:
+     * <p>
+     * Map[ColumnFamily(HexString),Map[Qualifier(HexString),Value(Bytes)]]
+     */
+    private SortedMap<String, SortedMap<String, byte[]>> resultData = Maps.newTreeMap();
+
+    public SealedPut() {
+
+    }
+
+    /**
+     * Create a SealedPut for the given row.
+     *
+     * @param row row key bytes
+     */
+    public SealedPut(byte[] row) {
+        rowKey = row;
+    }
+
+    /**
+     * Create directly from an HBase Put.
+     *
+     * @param hbasePut the HBase Put to wrap
+     */
+    public SealedPut(Put hbasePut) {
+        this(hbasePut.getRow());
+        for (Map.Entry<byte[], List<Cell>> cfData : hbasePut.getFamilyCellMap().entrySet()) {
+            final byte[] columnFamily = cfData.getKey();
+            for (Cell cell : cfData.getValue()) {
+                put(columnFamily, CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
+            }
+        }
+    }
+
+    /**
+     * Append another SealedPut to this one.
+     *
+     * @param v the SealedPut to append (must share this row key)
+     * @return a new merged SealedPut
+     */
+    public SealedPut appendSealedPut(SealedPut v) {
+        return merge(this, v);
+    }
+
+    /**
+     * Merge several SealedPuts that share the same row key into one.
+     *
+     * @param vs the SealedPuts to merge
+     * @return the merged SealedPut
+     */
+    public static SealedPut merge(SealedPut... vs) {
+        if (vs.length <= 1) {
+            throw new RuntimeException("No data need to merge.");
+        }
+        // Check is all the rowkey same
+        final SealedPut v0 = vs[0];
+        for (SealedPut vi : vs) {
+            if (!Bytes.equals(vi.rowKey, v0.rowKey)) {
+                throw new RuntimeException(
+                        String.format(
+                                "Can't merge Puts caused by row key not equal (v0=%s, vi=%s).",
+                                Bytes.toHex(v0.rowKey),
+                                Bytes.toHex(vi.rowKey)
+                        )
+                );
+            }
+        }
+        // Merge all put into one
+        final SealedPut newValue = new SealedPut(v0.rowKey);
+        for (SealedPut vi : vs) {
+            for (Map.Entry<String, SortedMap<String, byte[]>> value : vi.resultData.entrySet()) {
+                final byte[] cf = Bytes.fromHex(value.getKey());
+                for (Map.Entry<String, byte[]> field : value.getValue().entrySet()) {
+                    final byte[] qualifier = Bytes.fromHex(field.getKey());
+                    newValue.put(cf, qualifier, field.getValue());
+                }
+            }
+        }
+        return newValue;
+
+    }
+
+    /**
+     * Add a single column value.
+     * <p>
+     * Note: columns must be put in row-key order.
+     *
+     * @param family    column family
+     * @param qualifier column qualifier
+     * @param value     value bytes
+     */
+    public void put(byte[] family, byte[] qualifier, byte[] value) {
+        final String familyHex = Bytes.toHex(family);
+        final String qualifierHex = Bytes.toHex(qualifier);
+        resultData.putIfAbsent(familyHex, Maps.newTreeMap());
+/*        if (!resultData.containsKey(familyHex)) {
+            resultData.put(familyHex, Maps.newTreeMap());
+        }*/
+        resultData.get(familyHex).put(qualifierHex, value);
+    }
+
+    /**
+     * Convert to a Put that HBase can accept.
+     *
+     * @return the equivalent HBase Put
+     */
+    public Put toPut() {
+        return convertToPut(this);
+    }
+
+    public static List<Put> toPuts(Collection<SealedPut> sealedPuts) {
+        return sealedPuts.stream().map(SealedPut::toPut).collect(Collectors.toList());
+    }
+
+    /**
+     * Factory method.
+     *
+     * @param legacyPut the original HBase Put
+     * @return the wrapped SealedPut
+     */
+    public static SealedPut createSealedPut(Put legacyPut) {
+        return new SealedPut(legacyPut);
+    }
+
+    /**
+     * Convert a SealedPut back into the Put type used by HBase.
+     *
+     * @param sealedPut the SealedPut to convert
+     * @return the equivalent HBase Put
+     */
+    public static Put convertToPut(SealedPut sealedPut) {
+        Put put = new Put(sealedPut.rowKey);
+        for (Map.Entry<String, SortedMap<String, byte[]>> familyValue : sealedPut.resultData.entrySet()) {
+            for (Map.Entry<String, byte[]> columnValue : familyValue.getValue().entrySet()) {
+                put.addColumn(Bytes.fromHex(familyValue.getKey()), Bytes.fromHex(columnValue.getKey()), columnValue.getValue());
+
+            }
+        }
+        return put;
+    }
+
+    @Override
+    public String toString() {
+        return getStringInHex();
+    }
+
+    public String getStringInHex() {
+        final StringBuilder sb = new StringBuilder(String.format(
+                "RowKey:%s\n", Bytes.toHex(getRowKey())
+        ));
+        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
+            sb.append("  FamilyColumn:").append(entry.getKey()).append("\n");
+            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
+                sb.append("    Column: ").append(
+                        qv.getKey()
+                ).append(" -> ").append(
+                        Bytes.toHex(qv.getValue())
+                ).append("\n");
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * For debugging use only
+     *
+     * @return a rendering with families and qualifiers decoded as UTF-8 strings, values in hex
+     */
+    @Deprecated
+    public String getStringInUTF8() {
+        final StringBuilder sb = new StringBuilder(String.format(
+                "RowKey:%s\n", Bytes.toHex(getRowKey())
+        ));
+        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
+            final String columnFamily = Bytes.toString(Bytes.fromHex(entry.getKey()));
+            sb.append("  FamilyColumn: ").append(columnFamily).append("\n");
+            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
+                final String qualifier = Bytes.toString(Bytes.fromHex(qv.getKey()));
+                sb.append("    Column: ").append(
+                        qualifier
+                ).append(" -> ").append(
+                        Bytes.toHex(qv.getValue())
+                ).append("\n");
+            }
+        }
+        return sb.toString();
+    }
+}
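
A minimal round-trip sketch for SealedPut: wrap an HBase Put so it can be shipped between Flink operators without serialization surprises, then unwrap it before writing (row key and values are illustrative):

import com.winhc.bigdata.flink.java.entity.SealedPut;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SealedPutSketch {
    public static void main(String[] args) {
        Put legacy = new Put(Bytes.toBytes("rowkey-demo"));
        legacy.addColumn(Bytes.toBytes("F"), Bytes.toBytes("holder_name"), Bytes.toBytes("foo"));

        // Wrap for transport between operators...
        SealedPut sealed = SealedPut.createSealedPut(legacy);
        System.out.println(sealed.getStringInHex());

        // ...and convert back right before the HBase write.
        Put restored = sealed.toPut();
        System.out.println(Bytes.toString(restored.getRow())); // rowkey-demo
    }
}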

+ 83 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/ValidationExceptionEntity.java

@@ -0,0 +1,83 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import com.alibaba.fastjson.JSON;
+
+import java.util.Objects;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-18 17:37
+ */
+public class ValidationExceptionEntity<T> {
+    private String name;
+    private String message;
+    private T data;
+
+    public ValidationExceptionEntity() {
+    }
+
+    public ValidationExceptionEntity(String name, String message, T data) {
+        this.name = name;
+        this.message = message;
+        this.data = data;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ValidationExceptionEntity<?> that = (ValidationExceptionEntity<?>) o;
+
+        if (!Objects.equals(name, that.name)) return false;
+        if (!Objects.equals(message, that.message)) return false;
+        return Objects.equals(data, that.data);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + (message != null ? message.hashCode() : 0);
+        result = 31 * result + (data != null ? data.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "ValidationExceptionEntity{" +
+                "name='" + name + '\'' +
+                ", message='" + message + '\'' +
+                ", data=" + data +
+                '}';
+    }
+
+    public static <T> ValidationExceptionEntity<T> of(Exception e, T data) {
+        return new ValidationExceptionEntity<>(e.getClass().getName(), e.getMessage(), data);
+    }
+
+
+}

+ 2 - 2
src/main/java/com/winhc/bigdata/filnk/java/sink/OdpsSinkBuilder4J.java

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.filnk.java.sink;
+package com.winhc.bigdata.flink.java.sink;
 
 import com.alibaba.ververica.connectors.common.sink.TupleOutputFormatSinkFunction;
 import com.alibaba.ververica.connectors.odps.sink.OdpsOutputFormat;
@@ -7,7 +7,7 @@ import com.aliyun.odps.Odps;
 import com.aliyun.odps.Table;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.type.TypeInfo;
-import com.winhc.bigdata.filnk.java.constant.EnvConst;
+import com.winhc.bigdata.flink.java.constant.EnvConst;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;

+ 7 - 4
src/main/java/com/winhc/bigdata/filnk/java/source/KafkaSourceBuilder4J.java

@@ -1,6 +1,7 @@
-package com.winhc.bigdata.filnk.java.source;
+package com.winhc.bigdata.flink.java.source;
 
-import com.winhc.bigdata.filnk.java.constant.EnvConst;
+import com.winhc.bigdata.flink.java.constant.EnvConst;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -21,11 +22,13 @@ public class KafkaSourceBuilder4J {
         return buildSourceFunction(topic, offsets);
     }
 
+    //TODO unfixed: Deserialization schema is required but not provided.
     public static KafkaSource<String> buildSourceFunction(String topic, OffsetsInitializer offsetsInitializer) {
-        String bootstrapServers = EnvConst.getValue("winhc.bootstrap-servers");
-        String groupId = EnvConst.getValue("winhc.group-id");
+        String bootstrapServers = EnvConst.getValue("winhc.kafka.bootstrap-servers");
+        String groupId = EnvConst.getValue("winhc.kafka.group-id");
         KafkaSource<String> build = KafkaSource.<String>builder()
                 .setBootstrapServers(bootstrapServers)
+                .setValueOnlyDeserializer(new SimpleStringSchema())
                 .setGroupId(groupId)
                 .setTopics(topic)
                 .setStartingOffsets(offsetsInitializer)
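
A minimal sketch of consuming the source built above; the value-only SimpleStringSchema added in this commit is what satisfies the "Deserialization schema is required" TODO. The topic name is illustrative, and the winhc.kafka.* keys must be present in the environment config:

import com.winhc.bigdata.flink.java.source.KafkaSourceBuilder4J;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source =
                KafkaSourceBuilder4J.buildSourceFunction("flink_test", OffsetsInitializer.latest());
        // FLIP-27 sources are attached with fromSource rather than addSource.
        env.fromSource(source, WatermarkStrategy.<String>noWatermarks(), "kafka-source").print();
        env.execute("kafka-source-sketch");
    }
}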

+ 1 - 1
src/main/java/com/winhc/bigdata/filnk/java/source/OdpsSourceBuilder4J.java

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.filnk.java.source;
+package com.winhc.bigdata.flink.java.source;
 
 import com.alibaba.ververica.connectors.odps.ODPSStreamSource;
 import com.alibaba.ververica.connectors.odps.OdpsConf;

+ 11 - 4
src/main/java/com/winhc/bigdata/filnk/java/utils/EnvironmentProperties.java

@@ -1,8 +1,9 @@
-package com.winhc.bigdata.filnk.java.utils;
+package com.winhc.bigdata.flink.java.utils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.yaml.snakeyaml.Yaml;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
@@ -22,11 +23,11 @@ public class EnvironmentProperties {
     }
 
     public Map<String, String> getMapByPrefix(String prefix) {
-        return source.entrySet().stream().filter(e->e.getKey().startsWith(prefix)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,(e1, e2)->e1));
+        return source.entrySet().stream().filter(e -> e.getKey().startsWith(prefix)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1));
     }
 
     public EnvironmentProperties() {
-        Map<String, String> source = getYmlByFileName(null,null);
+        Map<String, String> source = getYmlByFileName(null, null);
         String profile = source.getOrDefault("profile.activate", null);
         if (StringUtils.isNotBlank(profile)) {
             source = getYmlByFileName("env-" + profile + ".yml", source);
@@ -34,6 +35,11 @@ public class EnvironmentProperties {
         this.source = source;
     }
 
+    public Map<String, String> getSource() {
+        return source;
+    }
+
+
     /**
      * Get the yml file contents for the given file name
      *
@@ -91,8 +97,9 @@ public class EnvironmentProperties {
     }
 
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws IOException {
         EnvironmentProperties environmentProperties = new EnvironmentProperties();
+
         String value = environmentProperties.getValue("es.a");
         System.out.println(value);
     }

+ 1 - 1
src/main/java/com/winhc/bigdata/filnk/java/utils/FieldNameUtils.java

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.filnk.java.utils;
+package com.winhc.bigdata.flink.java.utils;
 
 import java.util.Locale;
 

+ 2 - 2
src/main/java/com/winhc/bigdata/filnk/java/utils/Json2EntityUtils.java

@@ -1,8 +1,8 @@
-package com.winhc.bigdata.filnk.java.utils;
+package com.winhc.bigdata.flink.java.utils;
 
 import cn.hutool.core.util.ClassUtil;
 import com.alibaba.fastjson.JSONObject;
-import com.winhc.bigdata.filnk.java.BaseEntity;
+import com.winhc.bigdata.flink.java.BaseEntity;
 
 import java.util.*;
 import java.util.stream.Collectors;

+ 21 - 0
src/main/resources/log4j2.xml

@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration monitorInterval="5">
+    <Properties>
+        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
+        <property name="LOG_LEVEL" value="INFO" />
+    </Properties>
+
+    <appenders>
+        <console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="${LOG_PATTERN}"/>
+            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
+        </console>
+    </appenders>
+
+    <loggers>
+        <root level="${LOG_LEVEL}">
+            <appender-ref ref="Console"/>
+        </root>
+    </loggers>
+
+</configuration>

+ 173 - 0
src/main/resources/schema/company.schema

@@ -0,0 +1,173 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": ["null","object"],
+  "properties": {
+    "company_id": {
+      "type": ["string"]
+    },
+    "new_cid": {
+      "type": ["null","string"]
+    },
+    "base": {
+      "type": ["null","string"]
+    },
+    "name": {
+      "type": ["null","string"]
+    },
+    "name_en": {
+      "type": ["null","string"]
+    },
+    "name_alias": {
+      "type": ["null","string"]
+    },
+    "history_names": {
+      "type": ["null","array"],
+      "items": [
+        {
+          "type": ["string"]
+        }
+      ]
+    },
+    "legal_entity_id": {
+      "type": ["null","string"]
+    },
+    "legal_entity_name": {
+      "type": ["null","string"]
+    },
+    "legal_entity_type": {
+      "type": ["null","integer"]
+    },
+    "reg_number": {
+      "type": ["null","string"]
+    },
+    "company_org_type": {
+      "type": ["null","string"]
+    },
+    "reg_location": {
+      "type": ["null","string"]
+    },
+    "estiblish_time": {
+      "type": ["null","string"]
+    },
+    "from_time": {
+      "type": ["null","string"]
+    },
+    "to_time": {
+      "type": ["null","string"]
+    },
+    "business_scope": {
+      "type": ["null","string"]
+    },
+    "reg_institute": {
+      "type": ["null","string"]
+    },
+    "approved_time": {
+      "type": ["null","string"]
+    },
+    "reg_status": {
+      "type": ["null","string"]
+    },
+    "reg_capital": {
+      "type": ["null","string"]
+    },
+    "org_approved_institute": {
+      "type": ["null","string"]
+    },
+    "parent_company_id": {
+      "type": ["null","string"]
+    },
+    "company_type": {
+      "type": ["null","integer"]
+    },
+    "credit_code": {
+      "type": ["null","string"]
+    },
+    "org_number": {
+      "type": ["null","string"]
+    },
+    "score": {
+      "type": ["null","number"]
+    },
+    "cate_first_code": {
+      "type": ["null","string"]
+    },
+    "cate_second_code": {
+      "type": ["null","string"]
+    },
+    "cate_third_code": {
+      "type": ["null","string"]
+    },
+    "lat": {
+      "type": ["null","string"]
+    },
+    "lng": {
+      "type": ["null","string"]
+    },
+    "province_code": {
+      "type": ["null","string"]
+    },
+    "city_code": {
+      "type": ["null","string"]
+    },
+    "county_code": {
+      "type": ["null","string"]
+    },
+    "reg_capital_amount": {
+      "type": ["null","integer"]
+    },
+    "reg_capital_currency": {
+      "type": ["null","string"]
+    },
+    "actual_capital_amount": {
+      "type": ["null","integer"]
+    },
+    "actual_capital_currency": {
+      "type": ["null","string"]
+    },
+    "reg_status_std": {
+      "type": ["null","string"]
+    },
+    "social_security_staff_num": {
+      "type": ["null","integer"]
+    },
+    "cancel_date": {
+      "type": ["null","string"]
+    },
+    "cancel_reason": {
+      "type": ["null","string"]
+    },
+    "revoke_date": {
+      "type": ["null","string"]
+    },
+    "revoke_reason": {
+      "type": ["null","string"]
+    },
+    "emails": {
+      "type": ["null","string"]
+    },
+    "phones": {
+      "type": ["null","string"]
+    },
+    "wechat_public_num": {
+      "type": ["null","string"]
+    },
+    "logo": {
+      "type": ["null","string"]
+    },
+    "crawled_time": {
+      "type": ["null","string"]
+    },
+    "create_time": {
+      "type": ["null","string"]
+    },
+    "update_time": {
+      "type": ["null","string"]
+    },
+    "deleted": {
+      "type": ["null","integer"]
+    }
+  },
+  "required": [
+    "company_id"
+  ]
+}
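
A minimal sketch of validating an incoming record against this schema with the json-schema-validator dependency added in the pom (class name and sample payload are illustrative; only company_id is required):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.fge.jackson.JsonLoader;
import com.github.fge.jsonschema.core.report.ProcessingReport;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;

public class CompanySchemaValidationSketch {
    public static void main(String[] args) throws Exception {
        // Load the schema from src/main/resources/schema/company.schema.
        JsonNode schemaNode = JsonLoader.fromResource("/schema/company.schema");
        JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);

        JsonNode company = new ObjectMapper()
                .readTree("{\"company_id\":\"a04963caf9fb971d639caff75a9bf653\"}");
        ProcessingReport report = schema.validate(company);
        System.out.println(report.isSuccess()); // true
    }
}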

+ 67 - 0
src/main/resources/schema/company_holder.schema

@@ -0,0 +1,67 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": ["null","object"],
+  "properties": {
+    "capital_actual": {
+      "type": ["null","string"]
+    },
+    "amount": {
+      "type": ["null","number"]
+    },
+    "capital": {
+      "type": ["null","array"],
+      "items": [
+        {
+          "type": ["object"],
+          "properties": {
+            "amomon": {
+              "type": ["string"]
+            },
+            "paymet": {
+              "type": ["null","string"]
+            },
+            "time": {
+              "type": ["null","string"]
+            }
+          },
+          "required": [
+            "amomon"
+          ]
+        }
+      ]
+    },
+    "company_id": {
+      "type": ["string"]
+    },
+    "create_time": {
+      "type": ["null","string"]
+    },
+    "holder_type": {
+      "type": ["integer"]
+    },
+    "percent": {
+      "type": ["null","number"]
+    },
+    "update_time": {
+      "type": ["null","string"]
+    },
+    "deleted": {
+      "type": ["null","integer"]
+    },
+    "company_name": {
+      "type": ["null","string"]
+    },
+    "holder_id": {
+      "type": ["string"]
+    },
+    "holder_name": {
+      "type": ["string"]
+    }
+  },
+  "required": [
+    "company_id",
+    "company_name",
+    "holder_type",
+    "holder_name"
+  ]
+}

+ 34 - 0
src/main/resources/schema/company_staff.schema

@@ -0,0 +1,34 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": ["null","object"],
+  "properties": {
+    "staff_name": {
+      "type": ["string"]
+    },
+    "hid": {
+      "type": ["null","string"]
+    },
+    "update_time": {
+      "type": ["null","string"]
+    },
+    "deleted": {
+      "type": ["null","integer"]
+    },
+    "company_id": {
+      "type": ["string"]
+    },
+    "staff_type": {
+      "type": ["null","string"]
+    },
+    "create_time": {
+      "type": ["null","string"]
+    },
+    "company_name": {
+      "type": ["null","string"]
+    }
+  },
+  "required": [
+    "staff_name",
+    "company_id"
+  ]
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/CompanyData.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.flink
 
-import com.winhc.bigdata.filnk.java.BaseEntity
+import com.winhc.bigdata.flink.java.BaseEntity
 import com.winhc.bigdata.flink.implicits._
 
 /**

+ 36 - 0
src/main/scala/com/winhc/bigdata/flink/config/ArgsCompanyJob.scala

@@ -0,0 +1,36 @@
+package com.winhc.bigdata.flink.config
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.winhc.bigdata.flink.event.{ElasticSearchInfo, HbaseInfo}
+import com.winhc.bigdata.flink.utils.BaseUtils.cleanup
+import com.winhc.bigdata.flink.utils.{BaseUtils, PreUDF}
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-18 11:42
+ */
+case class ArgsCompanyJob(
+                           //                             tn: String,
+                           hbaseInfo: HbaseInfo,
+                           esInfo: ElasticSearchInfo,
+                           md5_fields: Seq[String],
+                           rowkey_udf: Function2[String, JsonNode, String] = null
+
+                         )
+
+object ArgsCompanyJob {
+  val job_args = Map(
+    "company_holder" -> ArgsCompanyJob(HbaseInfo("NG_RT_COMPANY_HOLDER", "F"),
+      ElasticSearchInfo("winhc_rt_index_company_holder", "_doc"),
+      null,
+      (companyId, j) => {
+        val row = PreUDF.company_holder_rowkey(j.get("holder_type").asInt(), j.get("holder_id").asText(), j.get("holder_name").asText())
+        BaseUtils.concatws("_", companyId, BaseUtils.md5(cleanup(row)))
+      }),
+    "company_staff" -> ArgsCompanyJob(
+      HbaseInfo("NG_RT_COMPANY_STAFF", "F"),
+      ElasticSearchInfo("winhc_rt_index_company_staff", "_doc"),
+      Seq("staff_name")
+    )
+  )
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/config/ElasticsearchConfig.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.flink.config
 
-import com.winhc.bigdata.filnk.java.constant.EnvConst
+import com.winhc.bigdata.flink.java.constant.EnvConst
 import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost
 import org.apache.flink.elasticsearch6.shaded.org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
 import org.apache.flink.elasticsearch6.shaded.org.apache.http.client.config.RequestConfig

+ 19 - 1
src/main/scala/com/winhc/bigdata/flink/config/HbaseConfig.scala

@@ -2,7 +2,8 @@ package com.winhc.bigdata.flink.config
 
 
 import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
-import com.winhc.bigdata.filnk.java.constant.EnvConst
+import com.winhc.bigdata.flink.java.constant.EnvConst
+import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
 import org.apache.hadoop.hbase.HBaseConfiguration
 
 import java.util
@@ -14,6 +15,7 @@ import scala.collection.JavaConverters._
  */
 object HbaseConfig {
   def getHbaseConfiguration(): Configuration = {
+
     val map: util.Map[String, String] = EnvConst.getMapByPrefix("winhc.hbase.config")
     val configuration: Configuration = HBaseConfiguration.create
     for (elem <- map.asScala) {
@@ -21,4 +23,20 @@ object HbaseConfig {
     }
     configuration
   }
+
+  /**
+   * Read parameters inside the open() function
+   * @param parameters Flink RichFunction Configuration
+   * @param configs Flink ConfigOption
+   * @return HBaseConfiguration
+   */
+  def getHbaseConfiguration(parameters: org.apache.flink.configuration.Configuration, configs: Seq[ConfigOption[String]]): Configuration = {
+    val configuration: Configuration = HBaseConfiguration.create
+    for (config <- configs) {
+      val value = parameters.getString(config)
+      val key = config.key()
+      configuration.set(key.replace("winhc.hbase.config.", ""), value)
+    }
+    configuration
+  }
 }

+ 12 - 0
src/main/scala/com/winhc/bigdata/flink/event/DimEntity.scala

@@ -0,0 +1,12 @@
+package com.winhc.bigdata.flink.event
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-13 13:52
+ */
+trait DimEntity {
+  def key: String = companyId
+
+  def companyId: String
+
+}

+ 18 - 0
src/main/scala/com/winhc/bigdata/flink/event/MetaInfo.scala

@@ -0,0 +1,18 @@
+package com.winhc.bigdata.flink.event
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-13 14:40
+ */
+
+
+case class MetaInfo(company: Option[CompanyInfo], es: Option[ElasticSearchInfo], hbase: Option[HbaseInfo])
+
+case class CompanyInfo(companyId: String, companyName: String)
+
+case class ElasticSearchInfo(_index: String, _type: String)
+
+case class HbaseInfo(table: String, cf: String)
+
+
+

+ 34 - 0
src/main/scala/com/winhc/bigdata/flink/event/RowBaseDimEntity.scala

@@ -0,0 +1,34 @@
+package com.winhc.bigdata.flink.event
+
+import com.alibaba.fastjson.JSONObject
+import com.fasterxml.jackson.databind.JsonNode
+import com.winhc.bigdata.flink.utils.RowDataUtils
+import org.json4s.JsonAST.JObject
+import org.json4s.jackson.JsonMethods._
+/**
+ * @author ZhangJi
+ * @since 2021-10-13 13:56
+ */
+object RowBaseDimEntity {
+
+
+  def ofJsonNode(entry:(String,Seq[JsonNode]), companyMeta: (String, String)): RowBaseDimEntity = {
+    val (dim, data) = entry
+    ofJObject((dim,data.map(j=>fromJsonNode(j).asInstanceOf[JObject])),companyMeta)
+  }
+  def ofJObject(entry:(String,Seq[JObject]), companyMeta: (String, String)): RowBaseDimEntity = {
+    val (dim, data) = entry
+    val mapData = data.map(j => (RowDataUtils.generateRowKey(dim, asJsonNode(j)), j)).filter(t=>t._1!=null).toMap
+    val (companyId, companyName) = companyMeta
+    new RowBaseDimEntity(RowDataUtils.generateMetaInfo(dim, companyId, companyName), mapData, Map.empty)
+  }
+
+
+}
+
+case class RowBaseDimEntity(metaInfo: MetaInfo, data: Map[String, JObject], var old: Map[String, JObject]) extends DimEntity {
+  override def companyId: String = {
+    Some(metaInfo).flatMap(m => m.company).map(c => c.companyId).orNull
+  }
+
+}

File diff suppressed because it is too large
+ 35 - 0
src/main/scala/com/winhc/bigdata/flink/event/UpdateEntity.scala


+ 27 - 0
src/main/scala/com/winhc/bigdata/flink/event/ValidationExceptionEntity.scala

@@ -0,0 +1,27 @@
+package com.winhc.bigdata.flink.event
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-19 17:28
+ */
+
+
+import java.util.Objects
+import scala.beans.BeanProperty
+
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-18 17:37
+ */
+object ValidationExceptionEntity {
+  def of[T](e: Exception, data: T) = new ValidationExceptionEntity[T](e.getClass.getName, e.getMessage, data)
+}
+
+case class ValidationExceptionEntity[T](@BeanProperty var name: String,
+                                        @BeanProperty var message: String,
+                                        @BeanProperty var data: T) {
+
+
+}
+

+ 9 - 0
src/main/scala/com/winhc/bigdata/flink/event/ValidationResult.scala

@@ -0,0 +1,9 @@
+package com.winhc.bigdata.flink.event
+
+import org.json4s.JsonAST.JObject
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-19 11:24
+ */
+case class ValidationResult(success:Map[String,Seq[JObject]],error:Map[String,Seq[JObject]])

+ 74 - 25
src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala

@@ -1,52 +1,101 @@
 package com.winhc.bigdata.flink.func
 
-import com.alibaba.ververica.connector.cloudhbase.util.HBaseConfigurationUtil
 import com.winhc.bigdata.flink.config.HbaseConfig
+import com.winhc.bigdata.flink.event.UpdateEntity
 import com.winhc.bigdata.flink.implicits._
-import org.apache.flink.configuration.Configuration
+import com.winhc.bigdata.flink.utils.BaseUtils
+import org.apache.flink.configuration.{ConfigOptions, Configuration}
 import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
 import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, HTable}
+import org.json4s.JsonAST.JObject
+import org.json4s.jackson.JsonMethods._
 import org.slf4j.LoggerFactory
 
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
+
 /**
  * @author: XuJiakai
  * @date: 2021/8/31 17:07
  */
-case class HbaseAsyncFunction() extends RichAsyncFunction[(String, String), (String, String, String)] {
+case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEntity] {
   private val LOG = LoggerFactory.getLogger(classOf[HbaseAsyncFunction])
 
-  @transient private var connection: Connection = null
+  @transient private var connection: Connection = _
 
 
+  override def open(parameters: Configuration): Unit = {
+    /*    import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
+        val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
+        val config: Configuration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
+       connection = ConnectionFactory.createConnection(config)
+       */
+    val quorum = ConfigOptions.key("winhc.hbase.config.hbase.zookeeper.quorum").stringType().defaultValue("hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181")
+    val period = ConfigOptions.key("winhc.hbase.config.hbase.client.scanner.timeout.period").stringType().defaultValue("120000")
+    val number = ConfigOptions.key("winhc.hbase.config.hbase.client.retries.number").stringType().defaultValue("5")
+    val pause = ConfigOptions.key("winhc.hbase.config.hbase.client.pause").stringType().defaultValue("1000")
+    val perserver_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perserver.tasks").stringType().defaultValue("10")
+    val perregion_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perregion.tasks").stringType().defaultValue("10")
+    val maxsize = ConfigOptions.key("winhc.hbase.config.hbase.client.keyvalue.maxsize").stringType().defaultValue("524288000")
+    val size = ConfigOptions.key("winhc.hbase.config.base.client.ipc.pool.size").stringType().defaultValue("5")
+    val retry = ConfigOptions.key("winhc.hbase.config.zookeeper.recovery.retry").stringType().defaultValue("5")
+    val configuration = HbaseConfig.getHbaseConfiguration(parameters, Seq(
+      quorum,
+      period,
+      number,
+      pause,
+      perserver_tasks,
+      perregion_tasks,
+      maxsize,
+      size,
+      retry
+    ))
+    connection = ConnectionFactory.createConnection(configuration)
 
+  }
 
+  /**
+   * The body must run asynchronously; a synchronous implementation has a bug.
+   *
+   * @param input        the UpdateEntity to enrich with existing HBase rows
+   * @param resultFuture result
+   */
+  override def asyncInvoke(input: UpdateEntity, resultFuture: ResultFuture[UpdateEntity]): Unit = {
 
-  override def open(parameters: Configuration): Unit = {
-    import com.alibaba.ververica.connector.shaded.hadoop3.org.apache.hadoop.conf.Configuration
-    val configuration: Configuration = HbaseConfig.getHbaseConfiguration()
-    val config: Configuration = HBaseConfigurationUtil.prepareRuntimeConfiguration(HBaseConfigurationUtil.serializeConfiguration(configuration), LOG)
-    connection = ConnectionFactory.createConnection(config)
-  }
+    Future {
+      val company = input.company
+      val companyId = input.companyId()
+      if (company != null && companyId != null) {
+        val companyTable = connection.getTable(TableName.valueOf("NG_COMPANY")).asInstanceOf[HTable]
+        val get = new Get(companyId.getBytes)
+        val companyResult = companyTable.get(get)
+        val oldCompany = parse(companyResult.toJson()).asInstanceOf[JObject]
+        input.oldCompany = oldCompany
+        companyTable.close()
+      }
+      input.dims.foreach {
+        t => {
+          val (k, v) = t
+          val gs = v.data.keys.map(key => new Get(key.getBytes)).toList.asJava
+          val ht = connection.getTable(TableName.valueOf(BaseUtils.hbaseTableName(k))).asInstanceOf[HTable]
+          val rs = ht.get(gs)
+          val rss = rs.map(_.toJson).map(parse(_).asInstanceOf[JObject])
+          val rowkeys = rs.map(r => new String(r.getRow))
+          val zrs = (rowkeys zip rss).toMap
 
-  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
-    val tn = input._1
-    val rowkey = input._2
-    try {
-      val table = connection.getTable(TableName.valueOf(tn.toUpperCase())).asInstanceOf[HTable]
-      val get = new Get(rowkey.getBytes)
-      val result = table.get(get)
-      val string = result.toJson
-      resultFuture.complete(Seq((tn, rowkey, string)))
-    } catch {
-      case ex: Exception => {
-        resultFuture.completeExceptionally(ex)
+          v.old = zrs
+        }
       }
+
+    } onComplete {
+      case Success(_) => resultFuture.complete(Seq(input))
+      case Failure(ex) => resultFuture.completeExceptionally(ex)
     }
   }
-  //  override def timeout(input: (String, String), resultFuture: ResultFuture[(String, String, String)]): Unit = {
-  //
-  //  }
+
 
   override def close(): Unit = {
     if (connection != null) {

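A minimal sketch of how this async function is typically wired into a stream, assuming an upstream `DataStream[UpdateEntity]` (the stream name and the timeout/capacity values are placeholders, not part of this commit):

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.scala._

    // enrich each UpdateEntity with the current HBase state; results may arrive out of order
    val updateStream: DataStream[UpdateEntity] = ??? // e.g. the output of transform_and_validation()
    val enriched: DataStream[UpdateEntity] =
      AsyncDataStream.unorderedWait(updateStream, HbaseAsyncFunction(), 10, TimeUnit.SECONDS, 100)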
+ 54 - 11
src/main/scala/com/winhc/bigdata/flink/func/HbaseSinkFunction.scala

@@ -1,28 +1,62 @@
 package com.winhc.bigdata.flink.func
 
-import com.alibaba.fastjson.JSONObject
+
 import com.winhc.bigdata.flink.config.HbaseConfig
-import org.apache.flink.configuration.Configuration
+import com.winhc.bigdata.flink.java.entity.{PutCollection, SealedPut}
+import org.apache.flink.configuration.{ConfigOptions, Configuration}
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, RetriesExhaustedWithDetailsException}
 
+import java.util.concurrent.atomic.AtomicReference
+import scala.collection.JavaConverters.asScalaSetConverter
+
 /**
  * @author: XuJiakai
  * @date: 2021/9/1 17:52
  */
-case class HbaseSinkFunction(tableName:String,jsonObject:JSONObject) extends RichSinkFunction[JSONObject] with CheckpointedFunction with BufferedMutator.ExceptionListener {
-  var connection: Connection = null
+class HbaseSinkFunction() extends RichSinkFunction[PutCollection] with CheckpointedFunction with BufferedMutator.ExceptionListener {
+  var connection: Connection = _
+  private val failureThrowable: AtomicReference[Throwable] = new AtomicReference[Throwable]
 
   override def open(parameters: Configuration): Unit = {
-    val configuration = HbaseConfig.getHbaseConfiguration()
+    val quorum = ConfigOptions.key("winhc.hbase.config.hbase.zookeeper.quorum").stringType().defaultValue("hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181")
+    val period = ConfigOptions.key("winhc.hbase.config.hbase.client.scanner.timeout.period").stringType().defaultValue("120000")
+    val number = ConfigOptions.key("winhc.hbase.config.hbase.client.retries.number").stringType().defaultValue("5")
+    val pause = ConfigOptions.key("winhc.hbase.config.hbase.client.pause").stringType().defaultValue("1000")
+    val perserver_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perserver.tasks").stringType().defaultValue("10")
+    val perregion_tasks = ConfigOptions.key("winhc.hbase.config.hbase.client.max.perregion.tasks").stringType().defaultValue("10")
+    val maxsize = ConfigOptions.key("winhc.hbase.config.hbase.client.keyvalue.maxsize").stringType().defaultValue("524288000")
+    val size = ConfigOptions.key("winhc.hbase.config.base.client.ipc.pool.size").stringType().defaultValue("5")
+    val retry = ConfigOptions.key("winhc.hbase.config.zookeeper.recovery.retry").stringType().defaultValue("5")
+    val configuration = HbaseConfig.getHbaseConfiguration(parameters, Seq(
+      quorum,
+      period,
+      number,
+      pause,
+      perserver_tasks,
+      perregion_tasks,
+      maxsize,
+      size,
+      retry
+    ))
     connection = ConnectionFactory.createConnection(configuration)
   }
 
-
-  override def invoke(value: JSONObject, context: SinkFunction.Context): Unit = {
-
+  // TODO: write mutations in batches
+  override def invoke(value: PutCollection, context: SinkFunction.Context): Unit = {
+    value
+      .getData
+      .entrySet()
+      .asScala.map(e => (e.getKey, e.getValue))
+      .par
+      .foreach(t => {
+        val (k, v) = t
+        val table = connection.getTable(TableName.valueOf(k))
+        table.put(SealedPut.toPuts(v.values()))
+      })
   }
 
   override def close(): Unit = {
@@ -31,9 +65,18 @@ case class HbaseSinkFunction(tableName:String,jsonObject:JSONObject) extends Ric
     }
   }
 
-  override def snapshotState(context: FunctionSnapshotContext): Unit = ???
+  private def checkErrorAndRethrow(): Unit = {
+    val cause = failureThrowable.get
+    if (cause != null) throw new RuntimeException("An error occurred in HbaseSink.", cause)
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    checkErrorAndRethrow()
+  }
 
-  override def initializeState(context: FunctionInitializationContext): Unit = ???
+  override def initializeState(context: FunctionInitializationContext): Unit = {}
 
-  override def onException(e: RetriesExhaustedWithDetailsException, bufferedMutator: BufferedMutator): Unit = ???
+  override def onException(e: RetriesExhaustedWithDetailsException, bufferedMutator: BufferedMutator): Unit = {
+    failureThrowable.compareAndSet(null, e)
+  }
 }

+ 8 - 4
src/main/scala/com/winhc/bigdata/flink/implicits/CaseClass2JsonHelper.scala

@@ -1,8 +1,13 @@
 package com.winhc.bigdata.flink.implicits
 
+
+import com.winhc.bigdata.flink.event.ValidationExceptionEntity
+import com.winhc.bigdata.flink.java.entity.CompanyDataReceive
+import com.winhc.bigdata.flink.utils.BaseUtils
 import org.apache.hadoop.hbase.client.Result
 import org.json4s.jackson.Serialization
 import org.json4s.{Formats, NoTypeHints}
+
 /**
  * @author: XuJiakai
  * @date: 2020/11/23 10:51
@@ -13,10 +18,9 @@ case class CaseClass2JsonHelper[A <: AnyRef](that: A) {
       return null
     }
     that match {
-      case result: Result =>
-        result.toJsonString()
-      case _ =>
-        Serialization.write(that)
+      case result: Result => result.toJsonString()
+      case x: ValidationExceptionEntity[CompanyDataReceive] => BaseUtils.objectmapper.writeValueAsString(x)
+      case _ => Serialization.write(that)
     }
   }
 }
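The helper is exposed through the implicits package object, so case classes (and HBase `Result`) gain a `toJson()` method; a small hedged usage sketch (the `Demo` class is hypothetical):

    import com.winhc.bigdata.flink.implicits._

    case class Demo(companyId: String, name: String)
    val json: String = Demo("123", "demo").toJson()   // serialized with json4s Serialization.write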

+ 15 - 2
src/main/scala/com/winhc/bigdata/flink/jobs/CompanyStreamJob.scala

@@ -1,9 +1,14 @@
 package com.winhc.bigdata.flink.jobs
 
+import com.winhc.bigdata.flink.java.constant.EnvConst
 import com.winhc.bigdata.flink.jobs.company_job_step._
 import com.winhc.bigdata.flink.source.KafkaSourceBuilder
+import com.winhc.bigdata.flink.streaming.trigger.TimeCountTrigger
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 
 /**
  * @author: XuJiakai
@@ -13,10 +18,18 @@ object CompanyStreamJob {
 
   def main(args: Array[String]): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // Register global job parameters so Rich functions can pick up the configuration automatically
+    env.getConfig.setGlobalJobParameters(EnvConst.createParameterTool(args))
+
     val kafkaSource = KafkaSourceBuilder.buildSourceFunction("xjk_test_topic")
     val source: DataStream[String] = env.fromSource[String](kafkaSource, CustomWatermarkStrategy(), "Kafka Source")
-    source.winhcTransform()
 
-    env.execute("CompanyStreamJob")
+
+    source.winhcTransform().keyBy(o=>o.companyId).window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(10)))
+      .trigger(TimeCountTrigger.processingTime[TimeWindow](2000))
+
+    System.out.println(env.getExecutionPlan)
+    //    env.execute("CompanyStreamJob")
   }
 }
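Setting the global job parameters makes the configuration visible to every Rich function; a hedged sketch of reading it back, assuming `EnvConst.createParameterTool` returns a Flink `ParameterTool` (the class below is illustrative only):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.configuration.Configuration

    class QuorumTagger extends RichMapFunction[String, String] {
      private var quorum: String = _

      override def open(parameters: Configuration): Unit = {
        val params = getRuntimeContext.getExecutionConfig
          .getGlobalJobParameters.asInstanceOf[ParameterTool]
        quorum = params.get("winhc.hbase.config.hbase.zookeeper.quorum")
      }

      override def map(value: String): String = s"$quorum -> $value"
    }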

+ 4 - 5
src/main/scala/com/winhc/bigdata/flink/TestJob.scala

@@ -1,14 +1,13 @@
-package com.winhc.bigdata.flink
+package com.winhc.bigdata.flink.jobs
 
 import com.winhc.bigdata.flink.func.HbaseAsyncFunction
 import com.winhc.bigdata.flink.source.OdpsSourceBuilder
-import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
 import org.apache.flink.streaming.api.scala.async.AsyncFunction
 import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.data.RowData
-
-import java.util.concurrent.TimeUnit
+import org.apache.flink.api.scala._
+import scala.concurrent.duration.SECONDS
 
 /**
  * @author: XuJiakai
@@ -31,7 +30,7 @@ object TestJob {
     val source = env.addSource[RowData](odpsSource)
     val value1: DataStream[(String, String)] = source.map(r => ("ng_company", r.getString(0).toString))
     val function: AsyncFunction[(String, String), (String, String, String)] = HbaseAsyncFunction()
-    val value: DataStream[(String, String, String)] = AsyncDataStream.unorderedWait(value1, function, 100, TimeUnit.SECONDS, 10)
+    val value: DataStream[(String, String, String)] = AsyncDataStream.unorderedWait(value1, function, 100, SECONDS, 10)
 
     value.addSink(new PrintSinkFunction[(String, String, String)])
     env.execute("test job")

+ 137 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/TestJob1.scala

@@ -0,0 +1,137 @@
+package com.winhc.bigdata.flink.jobs
+
+import com.winhc.bigdata.flink.event.{ElasticSearchInfo, HbaseInfo, UpdateEntity}
+import com.winhc.bigdata.flink.func.{HbaseAsyncFunction, HbaseSinkFunction}
+import com.winhc.bigdata.flink.implicits._
+import com.winhc.bigdata.flink.java.constant.EnvConst
+import com.winhc.bigdata.flink.java.entity.{PutCollection, SealedPut}
+import com.winhc.bigdata.flink.jobs.company_job_step._
+import com.winhc.bigdata.flink.source.KafkaSourceBuilder
+import com.winhc.bigdata.flink.streaming.trigger.ProcessingTimeCountTrigger
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.AggregateFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.connector.kafka.source.KafkaSource
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.scala.{AsyncDataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+import org.apache.hadoop.hbase.client.Put
+import org.json4s.JsonAST.{JNull, JObject, JValue}
+import org.json4s.{JNothing, JString}
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-14 10:11
+ */
+object TestJob1 {
+  def main(args: Array[String]): Unit = {
+
+    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
+
+    env.getConfig.setGlobalJobParameters(EnvConst.createParameterTool(args))
+    val kafkaSource: KafkaSource[String] = KafkaSourceBuilder.buildSourceFunction("flink_test")
+
+    val source = env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
+
+    val allSource = source.transform_and_validation()
+    allSource.getSideOutput(OutputTags.ExceptionTag).map(e => e.toJson()).print()
+    allSource.getSideOutput(OutputTags.TransformErrorTag).map(e => e.toJson()).print()
+    allSource.getSideOutput(OutputTags.ValidationErrorTag).map(e => e.toJson()).print()
+    val asyncDataStream = AsyncDataStream.unorderedWait(allSource, new HbaseAsyncFunction, 10, TimeUnit.SECONDS)
+
+    val afterOutput = asyncDataStream.process(new ProcessFunction[UpdateEntity, (String, Map[String, JObject])] {
+      override def processElement(ue: UpdateEntity, ctx: ProcessFunction[UpdateEntity, (String, Map[String, JObject])]#Context, out: Collector[(String, Map[String, JObject])]): Unit = {
+        val company = ue.company
+        if (company != null) {
+          ctx.output(OutputTags.HBASE_SINK_TAG, ("NG_RT_COMPANY", "F", ue.companyId(), company))
+        }
+        for (table <- ue.dims.values) {
+          val meta = table.metaInfo
+          val data = table.data
+          meta.hbase match {
+            case Some(hi) =>
+              val HbaseInfo(h0, h1) = hi
+              data.foreach(t => {
+                val (rowkey, jo) = t
+                ctx.output(OutputTags.HBASE_SINK_TAG, (h0, h1, rowkey, jo))
+              })
+            case _ =>
+          }
+          // ES sink not implemented yet
+          /* meta.es match {
+            case Some(ei) =>
+              val ElasticSearchInfo(index, doc_type) = ei
+              data.foreach(t => {
+                val (rowkey, jo) = t
+                ctx.output(OutputTags.ELASTICSEARCH_SINK_TAG, (index, doc_type, rowkey, jo))
+              })
+            case _ =>
+          }*/
+        }
+      }
+    })
+    val hbaseSource = afterOutput.getSideOutput(OutputTags.HBASE_SINK_TAG)
+    hbaseSource
+      .map(t => {
+        val (table, cf, row, value) = t
+        val rowb = row.getBytes("utf-8")
+        val cfb = cf.getBytes("utf-8")
+
+        value match {
+          case JObject(x) =>
+            val put: Put = new Put(rowb)
+            x.foreach(t => {
+              val (k, v) = t
+              val stringv = v match {
+                case JNull => ""
+                case JNothing => ""
+                case x: JObject => x.toJson()
+                case JString(x) => x
+                case x: JValue => x.toString
+              }
+              put.addColumn(cfb, k.getBytes("utf-8"), stringv.getBytes("utf-8"))
+            })
+            val sp = new SealedPut(put)
+            (table, sp)
+          case _ => throw new RuntimeException("unexpected stream element")
+        }
+      })
+      .keyBy(t => {
+        val row = new String(t._2.getRowKey, StandardCharsets.UTF_8)
+        s"${t._1}${StringUtils.left(row, 2)}"
+      })
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
+      .trigger(
+        new ProcessingTimeCountTrigger[TimeWindow](1000)
+      ).aggregate(new AggregateFunction[(String, SealedPut), PutCollection, PutCollection] {
+      override def createAccumulator(): PutCollection = {
+        new PutCollection
+      }
+
+      override def add(value: (String, SealedPut), accumulator: PutCollection): PutCollection = {
+        accumulator.put(value._1, value._2)
+        accumulator
+      }
+
+      override def getResult(accumulator: PutCollection): PutCollection = {
+        accumulator
+      }
+
+      override def merge(a: PutCollection, b: PutCollection): PutCollection = {
+        PutCollection.merge(a, b)
+      }
+    })
+      .addSink(new HbaseSinkFunction)
+    env.execute("flink start")
+  }
+
+}

+ 21 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/OutputTags.scala

@@ -0,0 +1,21 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+
+import com.winhc.bigdata.flink.event.ValidationExceptionEntity
+import com.winhc.bigdata.flink.java.entity.CompanyDataReceive
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.json4s.JsonAST.JObject
+import org.apache.flink.api.scala._
+/**
+ * @author ZhangJi
+ * @since 2021-10-19 14:44
+ */
+object OutputTags {
+  val TransformErrorTag = new OutputTag[ValidationExceptionEntity[String]]("transformException")
+  val ExceptionTag = new OutputTag[ValidationExceptionEntity[CompanyDataReceive]]("exception")
+  val ValidationErrorTag = new OutputTag[Map[String, Seq[JObject]]]("validation")
+  val HBASE_SINK_TAG = new OutputTag[(String, String, String, JObject)]("hbase")
+  val ELASTICSEARCH_SINK_TAG = new OutputTag[(String, String, String, JObject)]("es")
+  val HOLO_SINK_TAG = new OutputTag[(String, JObject)]("holo")
+
+}
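Each tag is read back with `getSideOutput` on the stream produced by the process function that emitted it; a brief hedged sketch (`processed` is a placeholder for such a stream):

    import org.apache.flink.api.scala._

    val hbaseSide = processed.getSideOutput(OutputTags.HBASE_SINK_TAG)
    hbaseSide.map { case (table, cf, rowkey, _) => s"$table/$cf/$rowkey" }.print()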

+ 78 - 0
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/gs_01_kafka_transform_and_validation.scala

@@ -0,0 +1,78 @@
+package com.winhc.bigdata.flink.jobs.company_job_step
+
+import com.winhc.bigdata.flink.event.{RowBaseDimEntity, UpdateEntity, ValidationExceptionEntity}
+import com.winhc.bigdata.flink.java.entity.CompanyDataReceive
+import com.winhc.bigdata.flink.jobs.company_job_step.OutputTags._
+import com.winhc.bigdata.flink.utils.{BaseUtils, RowDataUtils}
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.util.Collector
+import org.json4s.JsonAST.JObject
+import org.json4s.jackson.JsonMethods._
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-18 17:21
+ */
+object gs_01_kafka_transform_and_validation {
+  private val logger = LoggerFactory.getLogger(classOf[gs_01_kafka_transform_and_validation])
+}
+
+case class gs_01_kafka_transform_and_validation(source: DataStream[String]) {
+
+
+  def transform_and_validation(): DataStream[UpdateEntity] = {
+    source
+      .process(new ProcessFunction[String, UpdateEntity]() {
+        override def processElement(value: String, ctx: ProcessFunction[String, UpdateEntity]#Context, out: Collector[UpdateEntity]): Unit = {
+          try {
+            val c = BaseUtils.objectmapper.readValue(value, classOf[CompanyDataReceive])
+            try {
+              val meta = c.getMeta
+              val data = c.getData
+              val base = c.getCompany
+              val joMap = data.asScala
+                .map(e => (e._1, e._2.asScala.map(fromJsonNode(_).asInstanceOf[JObject])))
+                .toMap
+
+              val validationResult = RowDataUtils.validateDims(joMap)
+              val error = validationResult.error.filter(t => t._2.nonEmpty)
+              if (error.nonEmpty) ctx.output(ValidationErrorTag, error)
+              val dims = validationResult
+                .success
+                .filter(t => t._2.nonEmpty)
+                .map(t => (t._1, RowBaseDimEntity.ofJObject(t, (meta.getCompanyId, meta.getCompanyName))))
+              if (dims.nonEmpty) {
+                val entity = if (base == null) {
+                  UpdateEntity(null, dims)
+                } else {
+                  RowDataUtils.validateCompany(base) match {
+                    case Right(c) => UpdateEntity(fromJsonNode(c).asInstanceOf[JObject], dims)
+                    case Left(c) =>
+                      val companyDataReceive = new CompanyDataReceive()
+                      companyDataReceive.setCompany(c)
+                      val rte = new RuntimeException("company base info failed schema validation")
+                      ctx.output(ExceptionTag, ValidationExceptionEntity.of(rte, companyDataReceive))
+                      UpdateEntity(null, dims)
+                  }
+                }
+                out.collect(entity)
+              }
+            } catch {
+              case e: Exception =>
+                gs_01_kafka_transform_and_validation.logger.error("error", e)
+                ctx.output(ExceptionTag, ValidationExceptionEntity.of(e, c))
+            }
+          } catch {
+            case e: Exception =>
+              ctx.output(TransformErrorTag, ValidationExceptionEntity.of(e, value))
+          }
+        }
+      })
+  }
+}
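A hedged sketch of running this step and routing its side outputs, mirroring the usage in TestJob1 above (`source` is a `DataStream[String]` of raw Kafka messages):

    import com.winhc.bigdata.flink.implicits._
    import com.winhc.bigdata.flink.jobs.company_job_step._
    import org.apache.flink.api.scala._

    val main = source.transform_and_validation()                                // DataStream[UpdateEntity]
    main.getSideOutput(OutputTags.TransformErrorTag).map(_.toJson()).print()    // unparsable messages
    main.getSideOutput(OutputTags.ExceptionTag).map(_.toJson()).print()         // processing failures
    main.getSideOutput(OutputTags.ValidationErrorTag).map(_.toString).print()   // schema violations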

+ 4 - 2
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/package.scala

@@ -2,7 +2,9 @@ package com.winhc.bigdata.flink.jobs
 
 import org.apache.flink.streaming.api.scala.DataStream
 
-package object company_job_step {
-  implicit def kafkaTransformEnhancer(source: DataStream[String]) = step_01_kafka_transform(source)
+import scala.language.implicitConversions
 
+package object company_job_step {
+  implicit def kafkaTransformEnhancer(source: DataStream[String]): step_01_kafka_transform = step_01_kafka_transform(source)
+  implicit def transformValidation(source: DataStream[String]): gs_01_kafka_transform_and_validation = gs_01_kafka_transform_and_validation(source)
 }
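With these implicit conversions in scope, the step classes read like methods on `DataStream[String]`; a brief sketch (`source` is a placeholder stream):

    import com.winhc.bigdata.flink.jobs.company_job_step._

    val parsed    = source.winhcTransform()             // via step_01_kafka_transform
    val validated = source.transform_and_validation()   // via gs_01_kafka_transform_and_validation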

+ 4 - 3
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_01_kafka_transform.scala

@@ -1,9 +1,10 @@
 package com.winhc.bigdata.flink.jobs.company_job_step
 
 import com.alibaba.fastjson.JSON
-import com.winhc.bigdata.filnk.java.BaseEntity
-import com.winhc.bigdata.filnk.java.utils.Json2EntityUtils
+import com.winhc.bigdata.flink.java.BaseEntity
+import com.winhc.bigdata.flink.java.utils.Json2EntityUtils
 import com.winhc.bigdata.flink.CompanyData
+
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
 
@@ -23,7 +24,7 @@ case class step_01_kafka_transform(source: DataStream[String]) {
       val companyName = jsonObject.getString("company_name")
       val content = jsonObject.getJSONObject("content")
       val map: Map[String, Seq[BaseEntity]] = content.keySet().asScala.map(tn => {
-        (tn, content.getJSONArray(tn).toJavaList(Json2EntityUtils.getClazz(tn)).asScala.toSeq)
+        (tn, content.getJSONArray(tn).toJavaList(Json2EntityUtils.getClazz(tn)).asScala)
       }).toMap
       CompanyData(companyId = companyId, companyName = companyName, content = map)
     })

+ 2 - 2
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/step_02_sink_2_ods.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.flink.jobs.company_job_step
 
-import com.winhc.bigdata.filnk.java.BaseEntity
-import com.winhc.bigdata.filnk.java.utils.Json2EntityUtils
+import com.winhc.bigdata.flink.java.BaseEntity
+import com.winhc.bigdata.flink.java.utils.Json2EntityUtils
 import com.winhc.bigdata.flink.CompanyData
 import com.winhc.bigdata.flink.sink.OdpsSinkBuilder
 import com.winhc.bigdata.flink.utils.DateUtils

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/sink/HoloSinkBuilder.scala

@@ -7,7 +7,7 @@ import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCWriter
 import com.alibaba.ververica.connectors.hologres.rpc.HologresRpcWriter
 import com.alibaba.ververica.connectors.hologres.sink.HologresSinkFunction
 import com.alibaba.ververica.connectors.hologres.utils.HologresUtils
-import com.winhc.bigdata.filnk.java.constant.EnvConst
+import com.winhc.bigdata.flink.java.constant.EnvConst
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.{DataTypes, TableSchema}
 import org.apache.flink.table.data.RowData

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/sink/OdpsSinkBuilder.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.flink.sink
 
-import com.winhc.bigdata.filnk.java.sink.OdpsSinkBuilder4J
+import com.winhc.bigdata.flink.java.sink.OdpsSinkBuilder4J
 import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.types.Row

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/source/KafkaSourceBuilder.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.flink.source
 
-import com.winhc.bigdata.filnk.java.source.KafkaSourceBuilder4J
+import com.winhc.bigdata.flink.java.source.KafkaSourceBuilder4J
 import org.apache.flink.connector.kafka.source.KafkaSource
 
 /**

+ 2 - 2
src/main/scala/com/winhc/bigdata/flink/source/OdpsSourceBuilder.scala

@@ -1,8 +1,8 @@
 package com.winhc.bigdata.flink.source
 
 import com.alibaba.ververica.connectors.odps.{ODPSStreamSource, OdpsOptions}
-import com.winhc.bigdata.filnk.java.constant.EnvConst
-import com.winhc.bigdata.filnk.java.source.OdpsSourceBuilder4J
+import com.winhc.bigdata.flink.java.constant.EnvConst
+import com.winhc.bigdata.flink.java.source.OdpsSourceBuilder4J
 import org.apache.flink.configuration.Configuration
 
 /**

+ 101 - 0
src/main/scala/com/winhc/bigdata/flink/streaming/trigger/TimeCountTrigger.scala

@@ -0,0 +1,101 @@
+package com.winhc.bigdata.flink.streaming.trigger
+
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
+import org.apache.flink.api.common.typeutils.base.LongSerializer
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+
+/**
+ * 
+ * @author ZhangJi
+ * @since 2021-10-11 15:37
+ */
+
+object TimeCountTrigger {
+  def processingTime[W<:TimeWindow](maxCount: Long): TimeCountTrigger[W] = {
+    new ProcessingTimeCountTrigger[W](maxCount)
+  }
+
+  def eventTime[W<:TimeWindow](maxCount: Long): TimeCountTrigger[W] = {
+    new EventTimeCountTrigger[W](maxCount)
+  }
+}
+
+abstract class TimeCountTrigger[W <: TimeWindow](maxCount: Long) extends Trigger[Object, W] {
+
+
+  /**
+   * Per-window element count state
+   */
+  private val countState: ReducingStateDescriptor[java.lang.Long] = new ReducingStateDescriptor[java.lang.Long](
+    "count", new Sum(), LongSerializer.INSTANCE
+  )
+
+  /**
+   * Flink pitfall: returning `TriggerResult.FIRE_AND_PURGE` does not reset the count state, so it is cleared explicitly here.
+   *
+   * @param window window being fired
+   * @param ctx    trigger context
+   * @return
+   */
+  protected def fireAndPurge(window: W, ctx: TriggerContext): TriggerResult = {
+    clear(window, ctx)
+    TriggerResult.FIRE_AND_PURGE
+  }
+
+  override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
+    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
+    count.add(1L)
+    if (count.get >= maxCount || timestamp >= window.getEnd) {
+      fireAndPurge(window, ctx)
+    } else {
+      TriggerResult.CONTINUE
+    }
+  }
+
+  override def clear(window: W, ctx: TriggerContext): Unit = {
+    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
+    count.clear()
+    count.add(0L)
+  }
+
+  /**
+   * Sums two partial counts
+   */
+  class Sum extends ReduceFunction[java.lang.Long] {
+    def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
+  }
+}
+
+
+class ProcessingTimeCountTrigger[W <: TimeWindow](maxCount: Long) extends TimeCountTrigger[W](maxCount) {
+  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
+    if (time >= window.getEnd) {
+      TriggerResult.CONTINUE
+    } else {
+      fireAndPurge(window, ctx)
+    }
+  }
+
+  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
+    TriggerResult.CONTINUE
+  }
+}
+
+class EventTimeCountTrigger[W <: TimeWindow](maxCount: Long) extends TimeCountTrigger[W](maxCount) {
+  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
+    TriggerResult.CONTINUE
+  }
+
+  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
+    if (time >= window.getEnd) {
+      TriggerResult.CONTINUE
+    } else {
+      fireAndPurge(window, ctx)
+    }
+  }
+}
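A hedged usage sketch: fire a processing-time window either when it ends or after 1000 elements, whichever comes first (`keyed` is a placeholder for any keyed stream):

    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow

    keyed
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(TimeCountTrigger.processingTime[TimeWindow](1000))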

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/test/MyTest.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.flink.test
 
-import com.winhc.bigdata.filnk.java.sink.OdpsSinkBuilder4J
+import com.winhc.bigdata.flink.java.sink.OdpsSinkBuilder4J
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.streaming.api.functions.sink.SinkFunction

+ 14 - 8
src/main/scala/com/winhc/bigdata/flink/utils/BaseUtils.scala

@@ -1,23 +1,29 @@
 package com.winhc.bigdata.flink.utils
 
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.utils.EncodingUtils
 
 /**
- * @author: XuJiakai
- * @date: 2021/9/1 15:25
+ * @author XuJiakai
+ * @since 2021/9/1 15:25
  */
 object BaseUtils {
  // strip symbols and whitespace
+  val objectmapper: ObjectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
   private val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
 
  // strip symbols and whitespace; return "" for null/blank input
-  def cleanup(s: String): String = {
-    if (StringUtils.isBlank(s))
-      ""
-    else
-      pattern replaceAllIn(s, "")
-  }
+  def cleanup(s: String): String = if (StringUtils.isBlank(s)) "" else pattern replaceAllIn(s, "")
+
+
+  def hbaseTableName(dim: String): String = s"NG_${dim.toUpperCase}"
+
+  def md5(s: String): String = EncodingUtils.hex(EncodingUtils.md5(s))
+
+  def concatws(seq: String, strings: String*): String = strings.mkString(seq)
 
   def isWindows: Boolean = System.getProperty("os.name").contains("Windows")
 
+
 }
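Expected behaviour of the helpers above, inferred from the code and the BaseUtilsTI test at the end of this commit:

    BaseUtils.cleanup(" 阿里巴巴(中国)有限公司 ")   // "阿里巴巴中国有限公司" – symbols and spaces stripped
    BaseUtils.hbaseTableName("company_holder")        // "NG_COMPANY_HOLDER"
    BaseUtils.concatws(",", "1", "2", "3", "4")       // "1,2,3,4"
    BaseUtils.md5("winhc")                            // 32-character hex digest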

+ 66 - 0
src/main/scala/com/winhc/bigdata/flink/utils/PreUDF.scala

@@ -0,0 +1,66 @@
+package com.winhc.bigdata.flink.utils
+
+import com.alibaba.fastjson.JSON
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.mutable
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-15 11:28
+ */
+object PreUDF {
+  def replace_rowkey(use_user_defined_rowkey: String, new_rowkey: String): String = {
+    if (StringUtils.isEmpty(use_user_defined_rowkey)) {
+      new_rowkey
+    } else {
+      use_user_defined_rowkey
+    }
+  }
+
+  def company_holder_rowkey(holder_type: Long, holder_id: String, holder_name: String): String = {
+    if (2 == holder_type) {
+      holder_id
+    } else {
+      holder_name
+    }
+  }
+
+  def get_fixed_val(key_no: String, name: String): String = {
+    if (StringUtils.isEmpty(key_no)) {
+      name
+    } else {
+      if (key_no.length == 32) {
+        key_no
+      } else {
+        name
+      }
+    }
+  }
+
+  def equity_info_rowkey(pledgee_info: String, pledgor_info: String): String = {
+    try {
+      def get_seq(key: String, value: String): String = {
+        val jSONArray = JSON.parseArray(value)
+        if (jSONArray.isEmpty) {
+          return ""
+        }
+        val arr = mutable.ArrayBuffer[String]()
+        for (i <- 0 until jSONArray.size()) {
+          val jSONObject = jSONArray.getJSONObject(i)
+          arr.append(jSONObject.getString(key))
+        }
+        arr.distinct.sorted.mkString("、")
+      }
+
+      val str = get_seq("pledgee", pledgee_info) + " " + get_seq("pledgor", pledgor_info)
+      str
+    } catch {
+      case _: Exception => ""
+    }
+  }
+
+
+}
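Worked examples for the row-key helpers above (the input values are illustrative only):

    PreUDF.company_holder_rowkey(2, "holder-id", "holder-name")   // "holder-id": holder_type 2 keys by id
    PreUDF.get_fixed_val("", "holder-name")                       // "holder-name": falls back when key_no is empty
    PreUDF.equity_info_rowkey(
      """[{"pledgee":"B"},{"pledgee":"A"}]""",
      """[{"pledgor":"C"}]"""
    )                                                             // "A、B C": distinct, sorted, joined with 、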

+ 153 - 0
src/main/scala/com/winhc/bigdata/flink/utils/RowDataUtils.scala

@@ -0,0 +1,153 @@
+package com.winhc.bigdata.flink.utils
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import com.github.fge.jsonschema.main.JsonSchemaFactory
+import com.winhc.bigdata.flink.config.ArgsCompanyJob
+import com.winhc.bigdata.flink.event.{CompanyInfo, MetaInfo, ValidationResult}
+import com.winhc.bigdata.flink.utils.BaseUtils.md5
+import org.json4s.JsonAST.{JArray, JObject, JString}
+import org.json4s.JsonDSL.jobject2assoc
+import org.json4s.jackson.JsonMethods._
+
+import java.io.File
+import scala.collection.JavaConverters._
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-13 14:54
+ */
+object RowDataUtils {
+
+  private val schema_dir: String = this.getClass.getResource("/schema").getPath
+  val schemas: Map[String, JsonNode] = createAllSchema
+
+  def createAllSchema: Map[String, JsonNode] = {
+    new File(schema_dir).listFiles().filter(!_.isDirectory)
+      .filter(t => t.toString.endsWith(".schema"))
+      .map(f =>
+        (f.getName.replace(".schema", ""), scala.io.Source.fromFile(f))
+      )
+      .map(t => {
+        val (n, s) = t
+        (n, try s.getLines().mkString("\n") finally s.close())
+      })
+      .map(t => {
+        val (n, s) = t
+        (n, asJsonNode(parse(s)))
+      })
+      .toMap
+  }
+
+
+  /**
+   * TODO: not fully implemented
+   *
+   * @param dim dimension name
+   * @return rowkey
+   */
+  def generateRowKey(dim: String, json: JsonNode): String = {
+
+    ArgsCompanyJob.job_args.get(dim).map(acj => {
+      val md5_fields = acj.md5_fields
+      val companyId = json.get("company_id").asText()
+      if (md5_fields != null) {
+        val fields = md5(md5_fields.map(field => json.get(field).asText()).filter(s => s != null && s.nonEmpty).mkString)
+        s"${companyId}_$fields".stripMargin
+      } else if (acj.rowkey_udf != null) {
+        acj.rowkey_udf(companyId, json)
+      } else {
+        null
+      }
+    }).orNull
+  }
+
+  /**
+   * Builds the meta info for a dimension
+   *
+   * @param dim         dimension name
+   * @param companyId   company id
+   * @param companyName company name
+   * @return
+   */
+  def generateMetaInfo(dim: String, companyId: String, companyName: String): MetaInfo = {
+    val m = ArgsCompanyJob.job_args.get(dim).orNull
+    if (m == null) {
+      MetaInfo(Some(CompanyInfo(companyId, companyName)), None, None)
+    } else {
+      MetaInfo(Some(CompanyInfo(companyId, companyName)), Some(m.esInfo), Some(m.hbaseInfo))
+    }
+  }
+
+
+  /**
+   * Validates dimension rows against their JSON schemas
+   *
+   * @param dims dimension name -> rows
+   */
+  def validateDims(dims: Map[String, Seq[JObject]]): ValidationResult = {
+    val group = dims.flatMap(t => {
+      val (k, v) = t
+      if (!schemas.contains(k)) {
+        Seq(Left((k, v.map(j => {
+          j ~ ("error" -> JString("unknown dimension"))
+        }))))
+      } else {
+        val validator = JsonSchemaFactory.byDefault().getValidator
+        val data = v.map(j => asJsonNode(j))
+          .map(j => (validator.validate(schemas(k), j), j))
+          .map(t => {
+            val (p, j) = t
+            (p, fromJsonNode(j).asInstanceOf[JObject])
+          })
+          .groupBy(t => {
+            val (p, _) = t
+            p.isSuccess
+          })
+        val successList = data.get(true).map(s => s.map(_._2)).getOrElse(Seq.empty)
+        val errorList = data.get(false).map(s =>
+          s.map(t => {
+            val (p, j) = t
+            val errors = p.iterator().asScala.map(m => fromJsonNode(m.asJson())).toList
+            j ~ ("error" -> JArray(errors))
+          })
+        )
+          .getOrElse(Seq.empty)
+        Seq(Right((k, successList)), Left((k, errorList)))
+      }
+    }
+    )
+      .groupBy {
+        case Right(_) => true
+        case Left(_) => false
+      }
+    val success = group.getOrElse(true, Seq.empty).map {
+      case Right(x) => x
+      case _ => null
+    }
+      .filter(_ != null)
+      .toMap
+    val error = group.getOrElse(false, Seq.empty).map {
+      case Left(x) => x
+      case _ => null
+    }
+      .filter(_ != null)
+      .toMap
+    ValidationResult(success = success, error = error)
+  }
+
+  def validateCompany(company: JsonNode): Either[JsonNode, JsonNode] = {
+    if (company == null) return Right(null)
+    val companySchema = schemas("company")
+    val validator = JsonSchemaFactory.byDefault().getValidator
+    val p = validator.validate(companySchema, company)
+    if (p.isSuccess) {
+      Right(company)
+    } else {
+      val error = p.iterator().asScala.map(m => m.asJson()).toSeq.asJava
+      company.asInstanceOf[ObjectNode].putArray("error").addAll(error)
+      Left(company)
+    }
+  }
+
+}
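A hedged sketch of validating one dimension's rows (the field names are illustrative; the real constraints live in src/main/resources/schema/*.schema):

    import org.json4s.JsonAST.JObject
    import org.json4s.JsonDSL._

    val rows: Seq[JObject] = Seq(("company_id" -> "123") ~ ("holder_name" -> "demo"))
    val result = RowDataUtils.validateDims(Map("company_holder" -> rows))

    result.success.foreach { case (dim, ok)  => println(s"$dim: ${ok.size} valid rows") }
    result.error.foreach   { case (dim, bad) => println(s"$dim: ${bad.size} rows annotated with an error field") }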

+ 37 - 0
src/test/scala/com/winhc/bigdata/flink/utils/BaseUtilsTI.scala

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.flink.utils
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-20 15:19
+ */
+
+class BaseUtilsTI {
+  @Test
+  def testHbaseTableName(): Unit = {
+    assertEquals("NG_COMPANY", BaseUtils.hbaseTableName("company"))
+    assertEquals("NG_COMPANY_HOLDER", BaseUtils.hbaseTableName("company_holder"))
+  }
+
+  @Test
+  def testConcatws(): Unit = {
+    assertEquals("1,2,3,4", BaseUtils.concatws(",", "1", "2", "3", "4"))
+  }
+
+  @Test
+  def testRandomString(): Unit = {
+    var i = 0
+    while (true) {
+      i += 1
+      if (i > 1000000) return
+      val b = BigInt(8, 0, scala.util.Random).toString(16)
+      println(b)
+      if (b.length < 2) {
+        return
+      }
+    }
+
+  }
+}