xufei 3 jaren geleden
bovenliggende
commit
f9e1a3df97

+ 28 - 5
src/main/java/com/winhc/bigdata/flink/java/entity/MetaInfo.java

@@ -9,7 +9,6 @@ import lombok.NoArgsConstructor;
  * @since 2021-10-22 11:13
  */
 @Data
-@AllArgsConstructor
 @NoArgsConstructor
 public class MetaInfo {
     private CompanyInfo company;
@@ -17,36 +16,60 @@ public class MetaInfo {
     private HbaseInfo hbase;
     private HologresInfo holo;
 
+    public MetaInfo(CompanyInfo company, ElasticSearchInfo es, HbaseInfo hbase, HologresInfo holo) {
+        this.company = company;
+        this.es = es;
+        this.hbase = hbase;
+        this.holo = holo;
+    }
+
     @Data
-    @AllArgsConstructor
     @NoArgsConstructor
     public static class CompanyInfo {
         private String companyId;
         private String companyName;
+
+        public CompanyInfo(String companyId, String companyName) {
+            this.companyId = companyId;
+            this.companyName = companyName;
+        }
     }
 
 
     @Data
-    @AllArgsConstructor
     @NoArgsConstructor
     public static class ElasticSearchInfo {
         private String index;
         private String type;
+
+        public ElasticSearchInfo(String index, String type) {
+            this.index = index;
+            this.type = type;
+        }
     }
 
     @Data
-    @AllArgsConstructor
     @NoArgsConstructor
     public static class HbaseInfo {
         private String table;
         private String cf;
+
+        public HbaseInfo(String table, String cf) {
+            this.table = table;
+            this.cf = cf;
+        }
     }
 
     @Data
-    @AllArgsConstructor
     @NoArgsConstructor
     public static class HologresInfo {
         private String table;
+
+        public HologresInfo(String table, String shema) {
+            this.table = table;
+            this.shema = shema;
+        }
+
         private String shema;
     }
 

+ 28 - 0
src/main/java/com/winhc/bigdata/flink/java/entity/PutDataCollection.java

@@ -0,0 +1,28 @@
+package com.winhc.bigdata.flink.java.entity;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @since 2021-10-22 11:33
+ */
+public class PutDataCollection {
+
+
+    public Map<String, List<Map<String, Object>>> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, List<Map<String, Object>>> data) {
+        this.data = data;
+    }
+
+    private Map<String, List<Map<String, Object>>> data = Maps.newHashMap();
+
+}
+

+ 85 - 0
src/main/java/com/winhc/bigdata/flink/java/sink/HoloPojoSink.java

@@ -0,0 +1,85 @@
+package com.winhc.bigdata.flink.java.sink;
+
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.Put;
+import com.winhc.bigdata.flink.java.entity.PutDataCollection;
+import com.winhc.bigdata.flink.java.utils.HoloUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.postgresql.model.TableSchema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/10/22 10:37
+ */
+public class HoloPojoSink extends RichSinkFunction<PutDataCollection> implements CheckpointedFunction, BufferedMutator.ExceptionListener {
+
+    static Log log = LogFactory.getLog(HoloPojoSink.class);
+
+    private HoloClient holoClient = null;
+
+    private AtomicReference<Throwable> failureThrowable =new AtomicReference<Throwable>();
+
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        log.info("init holo..");
+        holoClient = HoloUtils.init();
+    }
+
+    @Override
+    public void invoke(PutDataCollection data, Context context) throws Exception {
+        for (Map.Entry<String, List<Map<String, Object>>> entry : data.getData().entrySet()) {
+            String tableName = entry.getKey();
+            List<Map<String, Object>> list = entry.getValue();
+            TableSchema tableSchema = holoClient.getTableSchema(tableName);
+
+            List<Put> puts = list.stream().map(m -> {
+                Put put = new Put(tableSchema);
+                m.forEach(put::setObject);
+                return put;
+            }).collect(Collectors.toList());
+            holoClient.put(puts);
+            holoClient.flush(); //强制提交所有未提交put请求
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (holoClient != null) {
+            holoClient.close();
+        }
+    }
+
+    private void checkErrorAndRethrow() {
+        Throwable throwable = failureThrowable.get();
+        if (throwable != null) throw new RuntimeException("An error occurred in HoloPojoSink.", throwable);
+    }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkErrorAndRethrow();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+
+    }
+
+    @Override
+    public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator bufferedMutator) throws RetriesExhaustedWithDetailsException {
+        failureThrowable.compareAndSet(null, e);
+    }
+}

+ 57 - 0
src/main/java/com/winhc/bigdata/flink/java/sink/HoloSqlSink.java

@@ -0,0 +1,57 @@
+package com.winhc.bigdata.flink.java.sink;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.winhc.bigdata.flink.java.utils.HoloUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/10/19 10:37
+ */
+public class HoloSqlSink extends RichSinkFunction<JSONObject> {
+
+    static Log log = LogFactory.getLog(HoloSqlSink.class);
+
+    private HoloClient holoClient = null;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        log.info("init holo..");
+        holoClient = HoloUtils.init();
+    }
+
+    @Override
+    public void invoke(JSONObject jsonObject, Context context) throws Exception {
+        try {
+            //获取数据中的Key以及Value
+            JSONObject data = jsonObject.getJSONObject("data");
+            Set<String> keys = data.keySet();
+            Collection<Object> values = data.values();
+            Collection<Object> primaryKeys = jsonObject.getJSONArray("primaryKeys");
+            String tableName = jsonObject.getString("tn");
+            String sql = HoloUtils.genInsertSql(tableName, keys, values, primaryKeys);
+            CompletableFuture<Boolean> res1 = holoClient.sql(conn -> {
+                Boolean resultSet;
+                try (Statement stat = conn.createStatement()) {
+                    resultSet = stat.execute(sql);
+                }
+                return resultSet;
+            });
+        } catch (HoloClientException e) {
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        }
+    }
+
+
+}

+ 53 - 0
src/main/java/com/winhc/bigdata/flink/java/utils/HoloUtils.java

@@ -0,0 +1,53 @@
+package com.winhc.bigdata.flink.java.utils;
+
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.HoloConfig;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.alibaba.hologres.client.model.WriteMode;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/10/11 11:39
+ */
+public class HoloUtils {
+    private static String url;
+
+    static {
+        if (BaseUtils.isWindows()) {
+            url = "jdbc:postgresql://hgprecn-cn-zvp2dvgsr005-cn-shanghai.hologres.aliyuncs.com:80/winhc_test_db";
+        } else {
+            url = "jdbc:postgresql://hgprecn-cn-zvp2dvgsr005-cn-shanghai-vpc.hologres.aliyuncs.com:80/winhc_test_db";
+        }
+    }
+
+    private static final String username = "LTAI4G4n7pAW8tUbJVkkZQPD";
+    private static final String password = "uNJOBskzcDqHq1TYG3m2rebR4c1009";
+    private static HoloClient client = null;
+
+    public static HoloClient init() throws HoloClientException {
+        HoloConfig config = new HoloConfig();
+        config.setJdbcUrl(url);
+        config.setUsername(username);
+        config.setPassword(password);
+        config.setWriteMode(WriteMode.INSERT_OR_REPLACE);//配置主键冲突时策略
+        client = new HoloClient(config);
+        return client;
+    }
+
+    public static String genInsertSql(String tableName, Set<String> keys, Collection<Object> values, Collection<Object> primaryKeys) {
+        return "INSERT INTO " + tableName + " \n" +
+                "(" + StringUtils.join(keys, ",") + ") VALUES ('" + StringUtils.join(values, "','") + "') \n" +
+                "ON conflict(" + StringUtils.join(primaryKeys, ",") + ") DO UPDATE \n" +
+                "SET (" + StringUtils.join(keys, ",") + ") = ROW(excluded.*)\n";
+
+    }
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/utils/RowDataUtils.scala

@@ -405,7 +405,7 @@ object RowDataUtils {
 
     } else {
       import com.winhc.bigdata.flink.java.entity.MetaInfo._
-      val company = new CompanyInfo(companyId, companyName)
+      val company = new com.winhc.bigdata.flink.java.entity.MetaInfo.CompanyInfo(companyId, companyName)
       val es = new ElasticSearchInfo(m.esInfo._index, m.esInfo._type)
       val hbase = new HbaseInfo(m.hbaseInfo.table, m.hbaseInfo.cf)
       val holo = new HologresInfo(m.holoInfo.table, m.holoInfo.shema)