Quellcode durchsuchen

feat: add java version

JimZhang vor 2 Jahren
Ursprung
Commit
35eba5de95

+ 25 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/CompanyDataReceive2.java

@@ -0,0 +1,25 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Data;
+import lombok.var;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-14 10:07
+ */
/**
 * Inbound company-data message as received from Kafka (fastjson-based
 * variant; compare {@code CompanyDataReceive}). Parsed via
 * {@code JSON.parseObject(json, CompanyDataReceive2.class)} in TestJob2.
 */
@Data
public class CompanyDataReceive2 {
    // Correlation identifiers propagated from the upstream producer.
    private String requestId;
    private String sessionId;
    // Envelope metadata; TestJob2 reads companyId/companyName from it.
    private ReceiveMeta meta;
    // Dimension name -> raw rows for that dimension (keys are used as the
    // "dim" argument to RowBaseDimEntity.of in TestJob2).
    private Map<String, List<JSONObject>> data;
    // Raw company record itself.
    private JSONObject company;

}

+ 14 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/DimEntity.java

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.flink.java.entity;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-13 13:52
+ */
/**
 * Contract for dimension entities that can be addressed by a key.
 *
 * @author ZhangJi
 * @since 2021-10-13 13:52
 */
public interface DimEntity {
    /** Routing/storage key; by default the owning company id. */
    default String key() {
        return companyId();
    }

    /** Id of the company this dimension belongs to (may be null — see RowBaseDimEntity). */
    String companyId();

}

+ 53 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/MetaInfo.java

@@ -0,0 +1,53 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-22 11:13
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class MetaInfo {
+    private CompanyInfo company;
+    private ElasticSearchInfo es;
+    private HbaseInfo hbase;
+    private HologresInfo holo;
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class CompanyInfo {
+        private String companyId;
+        private String companyName;
+    }
+
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class ElasticSearchInfo {
+        private String index;
+        private String type;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class HbaseInfo {
+        private String table;
+        private String cf;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class HologresInfo {
+        private String table;
+        private String shema;
+    }
+
+}

+ 45 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/RowBaseDimEntity.java

@@ -0,0 +1,45 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import cn.hutool.core.lang.Tuple;
+import com.alibaba.fastjson.JSONObject;
+import com.winhc.bigdata.flink.utils.PreUDF;
+import com.winhc.bigdata.flink.utils.RowDataUtils;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.var;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-22 11:09
+ */
/**
 * One dimension's rows (keyed by generated row key) together with the sink
 * metadata for that dimension.
 *
 * @author ZhangJi
 * @since 2021-10-22 11:09
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RowBaseDimEntity implements DimEntity {
    private MetaInfo metaInfo;
    // rowKey -> current row payload.
    private Map<String, JSONObject> data;
    // rowKey -> previous row payload; presumably filled in by a later stage
    // (of() always leaves it null) — TODO confirm.
    private Map<String, JSONObject> old;

    /** Company id from the meta info, or null when meta/company is absent. */
    @Override
    public String companyId() {
        return Optional.ofNullable(metaInfo).map(MetaInfo::getCompany).map(MetaInfo.CompanyInfo::getCompanyId).orElse(null);
    }

    /**
     * Builds an entity from untyped hutool tuples.
     *
     * @param dataT (dimension name, rows as List&lt;JSONObject&gt;)
     * @param metaT (companyId, companyName)
     */
    public static RowBaseDimEntity of(Tuple dataT, Tuple metaT) {
        String dim = dataT.get(0);
        List<JSONObject> js = dataT.get(1);
        var data = js.stream()
                // Pair each row with its generated row key; rows whose key
                // cannot be generated (null) are dropped.
                .map(j -> new Tuple(RowDataUtils.generateRowKey(dim, j), j))
                .filter(t -> t.get(0) != null)
                // Duplicate row keys keep the value already in the map.
                .collect(Collectors.toMap(t -> (String) t.get(0), t -> (JSONObject) t.get(1), (n, o) -> n));
        var meta = RowDataUtils.generateMetaInfoJava(dim, metaT.get(0), metaT.get(1));
        return new RowBaseDimEntity(meta, data, null);
    }
}
+

+ 22 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/UpdateEntity.java

@@ -0,0 +1,22 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-22 11:18
+ */
/**
 * Aggregated update for one company: the company record, its previous
 * version, and the per-dimension row bundles.
 *
 * @author ZhangJi
 * @since 2021-10-22 11:18
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UpdateEntity {
    private JSONObject company;
    // Previous company snapshot; null where not yet resolved (TestJob2 passes null).
    private JSONObject oldCompany;
    // Dimension name -> that dimension's rows + sink metadata.
    private Map<String, RowBaseDimEntity> dims;

}

+ 32 - 9
src/main/java/com/winhc/bigdata/flink/java/jobs/TestJob2.java

@@ -1,13 +1,23 @@
 package com.winhc.bigdata.flink.java.jobs;
 
+import cn.hutool.core.lang.Tuple;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.winhc.bigdata.flink.java.entity.CompanyDataReceive;
+import com.winhc.bigdata.flink.java.entity.CompanyDataReceive2;
+import com.winhc.bigdata.flink.java.entity.RowBaseDimEntity;
+import com.winhc.bigdata.flink.java.entity.UpdateEntity;
 import com.winhc.bigdata.flink.java.utils.KafkaSourceUtils;
+import lombok.val;
+import lombok.var;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.types.Row;
+
+import java.util.stream.Collectors;
 
 /**
  * @author π
@@ -18,15 +28,28 @@ public class TestJob2 {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //env.setParallelism(1);
-        FlinkKafkaConsumer<String> kafkaSource = KafkaSourceUtils.getKafkaSource("flink_test", "xjk_test");
-        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
-        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(json->{
-            System.out.println("json: "+json);
-            return JSON.parseObject(json);
-        }).name("map name");
-        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter((FilterFunction<JSONObject>) value -> !value.isEmpty()).name("filter name");
-        filterDS.print();
-        //filterDS.addSink(new HoloSink()).name("Holo Sink");
+        var kafkaSource = KafkaSourceUtils.getKafkaSource("flink_test", "xjk_test");
+        var kafkaDS = env.addSource(kafkaSource);
+        var receiveDS = kafkaDS.map(json -> JSON.parseObject(json, CompanyDataReceive2.class)).name("map name");
+        var transformDS = receiveDS.map(c -> {
+            val meta = c.getMeta();
+            val data = c.getData();
+            val base = c.getCompany();
+            var dims = data
+                    .entrySet()
+                    .stream()
+                    .map(
+                            e ->
+                                    new Tuple(e.getKey(),
+                                            RowBaseDimEntity.of(
+                                                    new Tuple(e.getKey(), e.getValue()),
+                                                    new Tuple(meta.getCompanyId(), meta.getCompanyName()))
+                                    )
+                    )
+                    .collect(Collectors.toMap(t -> (String) t.get(0), t -> (RowBaseDimEntity) t.get(1), (n, o) -> n));
+            return new UpdateEntity(base, null, dims);
+        }).name("to update");
+        transformDS.print();
         env.execute("BaseJob");
     }
 }

+ 8 - 3
src/main/scala/com/winhc/bigdata/flink/config/ArgsCompanyJob.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.flink.config
 
+import com.alibaba.fastjson.JSONObject
 import com.fasterxml.jackson.databind.JsonNode
 import com.winhc.bigdata.flink.event.{ElasticSearchInfo, HbaseInfo, HologresInfo}
 import com.winhc.bigdata.flink.utils.BaseUtils.cleanup
@@ -15,7 +16,7 @@ case class ArgsCompanyJob(
                            esInfo: ElasticSearchInfo,
                            holoInfo: HologresInfo,
                            md5_fields: Seq[String],
-                           rowkey_udf: Function2[String, JsonNode, String] = null
+                           rowkey_udf: Function3[String, JsonNode, JSONObject, String] = null
 
                          )
 
@@ -25,8 +26,12 @@ object ArgsCompanyJob {
       ElasticSearchInfo("winhc_rt_index_company_holder", "_doc"),
       HologresInfo("ng_rt_company_holder", "public"),
       null,
-      (companyId, j) => {
-        val row = PreUDF.company_holder_rowkey(j.get("holder_type").asInt(), j.get("holder_id").asText(), j.get("holder_name").asText())
+      (companyId, j, alij) => {
+        val row = if (j != null) {
+          PreUDF.company_holder_rowkey(j.get("holder_type").asInt(), j.get("holder_id").asText(), j.get("holder_name").asText())
+        } else {
+          PreUDF.company_holder_rowkey(alij.getIntValue("holder_type"), alij.getString("holder_id"), alij.getString("holder_name"))
+        }
         BaseUtils.concatws("_", companyId, BaseUtils.md5(cleanup(row)))
       }),
     "company_staff" -> ArgsCompanyJob(

+ 0 - 1
src/main/scala/com/winhc/bigdata/flink/event/ValidationExceptionEntity.scala

@@ -6,7 +6,6 @@ package com.winhc.bigdata.flink.event
  */
 
 
-import java.util.Objects
 import scala.beans.BeanProperty
 
 

+ 37 - 4
src/main/scala/com/winhc/bigdata/flink/utils/RowDataUtils.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.flink.utils
 
+import com.alibaba.fastjson.JSONObject
 import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node.ObjectNode
 import com.github.fge.jsonschema.main.JsonSchemaFactory
@@ -9,8 +10,8 @@ import com.winhc.bigdata.flink.utils.BaseUtils.md5
 import org.json4s.JsonAST.{JArray, JObject, JString}
 import org.json4s.JsonDSL.jobject2assoc
 import org.json4s.jackson.JsonMethods._
-import java.io.{File, InputStream}
 
+import java.io.{File, InputStream}
 import scala.collection.JavaConverters._
 
 /**
@@ -21,7 +22,7 @@ object RowDataUtils {
   val schemas: Map[String, JsonNode] = createAllSchema()
 
   def createAllSchema(): Map[String, JsonNode] = {
-//    val schema_dir: String = this.getClass.getClassLoader.getResource("/schema").getPath
+    //    val schema_dir: String = this.getClass.getClassLoader.getResource("/schema").getPath
 
     Seq(
       ("company",
@@ -340,6 +341,21 @@ object RowDataUtils {
       .toMap
   }
 
+  def generateRowKey(dim: String, json: JSONObject): String = {
+
+    ArgsCompanyJob.job_args.get(dim).map(acj => {
+      val md5_fields = acj.md5_fields
+      val companyId = json.getString("company_id")
+      if (md5_fields != null) {
+        val fields = md5(md5_fields.map(field => json.getString(field)).filter(s => s != null && s.nonEmpty).mkString)
+        s"${companyId}_$fields".stripMargin
+      } else if (acj.rowkey_udf != null) {
+        acj.rowkey_udf(companyId, null, json)
+      } else {
+        null
+      }
+    }).orNull
+  }
 
   /**
    * TODO 未实现
@@ -356,7 +372,7 @@ object RowDataUtils {
         val fields = md5(md5_fields.map(field => json.get(field).asText()).filter(s => s != null && s.nonEmpty).mkString)
         s"${companyId}_$fields".stripMargin
       } else if (acj.rowkey_udf != null) {
-        acj.rowkey_udf(companyId, json)
+        acj.rowkey_udf(companyId, json, null)
       } else {
         null
       }
@@ -380,6 +396,23 @@ object RowDataUtils {
     }
   }
 
+  def generateMetaInfoJava(dim: String, companyId: String, companyName: String): com.winhc.bigdata.flink.java.entity.MetaInfo = {
+
+    val m = ArgsCompanyJob.job_args.get(dim).orNull
+    if (m == null) {
+      val company = new com.winhc.bigdata.flink.java.entity.MetaInfo.CompanyInfo(companyId, companyName)
+      new com.winhc.bigdata.flink.java.entity.MetaInfo(company, null, null, null)
+
+    } else {
+      import com.winhc.bigdata.flink.java.entity.MetaInfo._
+      val company = new CompanyInfo(companyId, companyName)
+      val es = new ElasticSearchInfo(m.esInfo._index, m.esInfo._type)
+      val hbase = new HbaseInfo(m.hbaseInfo.table, m.hbaseInfo.cf)
+      val holo = new HologresInfo(m.holoInfo.table, m.holoInfo.shema)
+      new com.winhc.bigdata.flink.java.entity.MetaInfo(company, es, hbase, holo)
+    }
+  }
+
 
   /**
    * 实体类校验
@@ -452,7 +485,7 @@ object RowDataUtils {
   }
 
  // Ad-hoc entry point; currently prints an empty line and exits.
  def main(args: Array[String]): Unit = {
    println("")
  }
 
 }