소스 검색

feat(kafka): 动态人员id变更

JimZhang 3 년 전
부모
커밋
45ecee335c

+ 11 - 0
pom.xml

@@ -134,6 +134,17 @@
             <artifactId>logging-interceptor</artifactId>
             <version>3.10.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.bbossgroups.plugins</groupId>
+            <artifactId>bboss-elasticsearch-spring-boot-starter</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+            <version>6.3.1</version>
+        </dependency>
 
 
         <!--        <dependency>-->

+ 18 - 0
src/main/java/com/winhc/bigdata/task/configuration/BBossESConfiguration.java

@@ -0,0 +1,18 @@
+package com.winhc.bigdata.task.configuration;
+
+import org.frameworkset.elasticsearch.boot.BBossESStarter;
+import org.frameworkset.elasticsearch.client.ClientInterface;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author ZhangJi
+ * @since 2021-08-03 09:36
+ */
+@Configuration
+public class BBossESConfiguration {
+    @Bean
+    public ClientInterface bbossESClient(BBossESStarter bBossESStarter) {
+        return bBossESStarter.getRestClient();
+    }
+}

+ 21 - 3
src/main/java/com/winhc/bigdata/task/configuration/KafkaConfiguration.java

@@ -90,6 +90,24 @@ public class KafkaConfiguration {
         return container;
     }
 
+    @Bean("smallContainerFactory")
+    public ConcurrentKafkaListenerContainerFactory<String, String> smallContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
+        Map<String, Object> props = consumerProps();
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 16);
+        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
+        // 设置并发量,小于或等于Topic的分区数
+        container.setConcurrency(1);
+        // 拉取超时时间
+        //container.getContainerProperties().setPollTimeout(1500);
+        // 设置为批量监听
+        container.setBatchListener(true);
+
+        // 设置提交偏移量的方式
+        //container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        return container;
+    }
+
     private Map<String, Object> consumerProps() {
         Map<String, Object> props = new HashMap<>(8);
         // kafka服务地址
@@ -98,10 +116,10 @@ public class KafkaConfiguration {
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
         // 一次拉取消息数量
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
-        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5*60*1000);
-        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5*60*1000);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5 * 60 * 1000);
+        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5 * 60 * 1000);
         // 最大处理时间
-        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5*60*1000);
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5 * 60 * 1000);
         // 序列化
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

+ 14 - 4
src/main/java/com/winhc/bigdata/task/controller/TestController.java

@@ -6,10 +6,13 @@ import com.winhc.bigdata.task.framework.odps.service.PullRequestRecordService;
 import com.winhc.bigdata.task.framework.odps.utils.ExecOdpsSqlUtil;
 import com.winhc.bigdata.task.framework.odps.vo.MaxComputeSqlInstance;
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.concurrent.ExecutionException;
 
 /**
  * @author: XuJiakai
@@ -18,12 +21,13 @@ import org.springframework.web.bind.annotation.RestController;
 @Slf4j
 @RestController
 @RequestMapping("test")
-@AllArgsConstructor
+@RequiredArgsConstructor
 public class TestController {
 
     private final PullRequestRecordService pullRequestRecordService;
     private final ExportDataCenter exportDataCenter;
     private final ExecOdpsSqlUtil execOdpsSqlUtil;
+    private final KafkaTemplate<String, String> kafkaTemplate;
 
     @GetMapping("pull")
     public Object pull(String sqlId, String pullBy) {
@@ -58,4 +62,10 @@ public class TestController {
             return e.getMessage();
         }
     }
+
+    @PostMapping("push-kafka/{topic}")
+    public Object push(@RequestBody String payload, @PathVariable String topic) throws ExecutionException, InterruptedException {
+        return kafkaTemplate.send(topic, payload)
+                .get().toString();
+    }
 }

+ 225 - 0
src/main/java/com/winhc/bigdata/task/jobs/DynamicPersonIdUpdateJob.java

@@ -0,0 +1,225 @@
+package com.winhc.bigdata.task.jobs;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.winhc.bigdata.task.service.DingTalkService;
+import com.winhc.bigdata.task.util.ElasticsearchQueryUtil;
+import com.winhc.bigdata.task.util.ThrowableUtils;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.ExtensionMethod;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.elasticsearch.common.TriFunction;
+import org.elasticsearch.common.collect.Tuple;
+import org.frameworkset.elasticsearch.client.ClientInterface;
+import org.frameworkset.util.CollectionUtils;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * @author ZhangJi
+ * @since 2021-08-02 18:46
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@ExtensionMethod(DynamicPersonIdUpdateJob.StringStringMapExt.class)
+public class DynamicPersonIdUpdateJob {
+    public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
+    private final ClientInterface restClient;
+    private final KafkaTemplate<String, String> kafkaTemplate;
+    private final DingTalkService dingTalkService;
+
+    public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
+        Set<Object> seen = ConcurrentHashMap.newKeySet();
+        return t -> seen.add(keyExtractor.apply(t));
+    }
+
+    @KafkaListener(id = "update-person-id",
+            topics = "inc_human_pid_change",
+            groupId = "update_person_group",
+            containerFactory = "smallContainerFactory")
+    public void updatePersonId(List<String> payloads) {
+        List<Tuple<Map<String, String>, JSONObject>> params = payloads
+                .stream()
+                .map(JSON::parseObject)
+                .map(j -> {
+                    Map<String, String> map = new HashMap<>();
+                    map.putOld(j.ohp());
+                    map.putNew(j.nhp());
+                    return Tuple.tuple(map, j);
+                })
+                .filter(distinctByKey(m -> m.v1().ohp()))
+                .collect(Collectors.toList());
+        batchFromUpdate(params);
+    }
+
+    private void batchFromUpdate(List<Tuple<Map<String, String>, JSONObject>> params) {
+        try {
+            CompletableFuture.allOf(params.stream().map(this::update).toArray(CompletableFuture[]::new)).get();
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("waiting processor error", e);
+            dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
+            params.stream().map(Tuple::v2).map(j -> j.retry()).forEach(s -> kafkaTemplate.send("inc_human_pid_change", s));
+        }
+    }
+
+    private CompletableFuture<Void> update(Tuple<Map<String, String>, JSONObject> update) {
+        if (!update.v1().valid()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        String query = ElasticsearchQueryUtil.updatePersonId(update.v1().ohp(), update.v1().nhp());
+        return CompletableFuture.runAsync(() -> {
+            try {
+
+                String s = restClient.updateByQuery("winhc-company-dynamic/_update_by_query?conflicts=proceed", query);
+                if (!(s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0"))) {
+                    log.warn(s);
+                }
+            } catch (Exception e) {
+                dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
+            }
+           /* if (s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0")) {
+                return;
+            }
+            //TODO ES更新处理版本冲突问题 1. 延迟队列,2.重新入队,待验证
+            kafkaTemplate.send("inc_human_pid_change", update.v2().retry());*/
+        }, TASK_FJ_POOL);
+
+    }
+
+
+    @KafkaListener(id = "create-person-id",
+            topics = "company_dynamic_update",
+            groupId = "${spring.kafka.consumer.group-id:xjk_group}",
+            containerFactory = "smallContainerFactory")
+    public void createPersonId(List<String> payloads) {
+        Map<String, List<JSONObject>> collect = payloads.toJSONStream()
+                .collect(Collectors.groupingBy(j -> j.getString("flag").orDefault("0")));
+        System.out.println(collect);
+        batchFromCreate(collect);
+    }
+
+    private void batchFromCreate(Map<String, List<JSONObject>> params) {
+        CompletableFuture<Void> do0 = doAll(params.get("0"), ElasticsearchQueryUtil::createPersonId);
+        CompletableFuture<Void> do9 = doAll(params.get("9"), ElasticsearchQueryUtil::deleteDynamic);
+        try {
+            CompletableFuture.allOf(do0, do9).get();
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("waiting processor error", e);
+            dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
+            //TODO 执行失败需要重新入队,待验证
+        }
+    }
+
+    private CompletableFuture<Void> doAll(List<JSONObject> params, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
+        if (CollectionUtils.isEmpty(params)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return CompletableFuture.allOf(params.stream().map(j -> fromDynamic(j, function)).toArray(CompletableFuture[]::new));
+    }
+
+    private CompletableFuture<Void> fromDynamic(JSONObject param, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
+
+        if (!param.createValid()) {
+            log.warn("Illegal data\n{}", param);
+            return CompletableFuture.completedFuture(null);
+        }
+        String tn = param.getString("tn");
+        String rowkey = param.getString("rowkey");
+        Object flag = param.getOrDefault("flag", "0");
+        List<JSONObject> entity = param.getJSONArray("entity").orDefault(new JSONArray()).toJavaList(JSONObject.class);
+        if (!"9".equals(flag) && entity.isEmpty()) {
+            log.error("新增id没有实体对象\n{}", param);
+            return CompletableFuture.completedFuture(null);
+        }
+        List<Tuple<String, String>> crates = entity.stream().map(j -> j.toCreateTuple()).collect(Collectors.toList());
+        String createUpdate = function.apply(rowkey, tn, crates);
+        System.out.println(createUpdate);
+
+        return CompletableFuture.runAsync(() -> {
+            try {
+                String s = restClient.updateByQuery("winhc-company-dynamic_inc_v1/_update_by_query?conflicts=proceed", createUpdate);
+                if (s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0")) {
+                    //do something
+                } else {
+                    log.error("update error {}", s);
+
+                }
+            } catch (Exception e) {
+                log.error("execute update error", e);
+                dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
+            }
+        }, TASK_FJ_POOL);
+    }
+
+    @SuppressWarnings("unused")
+    public static class StringStringMapExt {
+        public static Stream<JSONObject> toJSONStream(List<String> strings) {
+            return strings.stream().map(JSONObject::parseObject);
+        }
+
+        public static JSONArray orDefault(JSONArray j, JSONArray dj) {
+            return CollectionUtils.isEmpty(j) ? dj : j;
+        }
+
+        public static boolean createValid(JSONObject j) {
+            return j.containsKey("tn") && j.containsKey("rowkey");
+        }
+
+        public static String orDefault(String s, String ds) {
+            return StringUtils.isEmpty(s) ? ds : s;
+        }
+
+        public static Tuple<String, String> toCreateTuple(JSONObject j) {
+            return Tuple.tuple(j.getString("name"), j.getString("keyno"));
+        }
+
+        public static String retry(JSONObject j) {
+            return j.fluentPut("retry", j.getIntValue("retry") + 1).toJSONString();
+        }
+
+        public static String ohp(Map<String, String> update) {
+            return update.get("old_human_pid");
+        }
+
+        public static String nhp(Map<String, String> update) {
+            return update.get("new_human_pid");
+        }
+
+        public static String ohp(JSONObject update) {
+            return update.getString("old_human_pid");
+        }
+
+        public static String nhp(JSONObject update) {
+            return update.getString("new_human_pid");
+        }
+
+        public static void putOld(Map<String, String> update, String pid) {
+            update.put("old_human_pid", pid);
+        }
+
+        public static void putNew(Map<String, String> update, String pid) {
+            update.put("new_human_pid", pid);
+        }
+
+        public static boolean valid(Map<String, String> update) {
+            return update.containsKey("old_human_pid") && update.containsKey("new_human_pid");
+        }
+    }
+}

+ 107 - 0
src/main/java/com/winhc/bigdata/task/util/ElasticsearchQueryUtil.java

@@ -0,0 +1,107 @@
+package com.winhc.bigdata.task.util;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import lombok.experimental.ExtensionMethod;
+import org.elasticsearch.common.collect.Tuple;
+import org.frameworkset.util.Assert;
+import org.frameworkset.util.CollectionUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author ZhangJi
+ * @since 2021-08-03 09:09
+ */
+@ExtensionMethod(ElasticsearchQueryUtil.QueryUtilExt.class)
+public class ElasticsearchQueryUtil {
+    public static String updatePersonId(String oldKeyno, String newKeyno) {
+        return "{\n" +
+                "  \"query\": {\n" +
+                "    \"term\": {\n" +
+                "      \"association_entity_info.keyno\": {\n" +
+                "        \"value\": \"" + oldKeyno + "\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  },\n" +
+                "  \"script\": {\n" +
+                "    \"stored\": \"update-person-id\",\n" +
+                "    \"params\": {\n" +
+                "      \"old_keyno\": \"" + oldKeyno + "\",\n" +
+                "\"new_keyno\": \"" + newKeyno + "\"\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+    }
+
+    public static String deleteDynamic(String rowkey, String tn, List<Tuple<String, String>> creates) {
+        return new JSONObject()
+                .fluentPut("query", new JSONObject()
+                        .fluentPut("bool", new JSONObject()
+                                .fluentPut("filter", new JSONArray()
+                                        .fluentAdd(
+                                                new JSONObject()
+                                                        .fluentPut("term", new JSONObject()
+                                                                .fluentPut("rowkey", rowkey)
+                                                        )
+                                        )
+                                        .fluentAdd(
+                                                new JSONObject()
+                                                        .fluentPut("term", new JSONObject()
+                                                                .fluentPut("tn", tn)
+                                                        )
+                                        )
+                                )
+                        )
+                )
+                .fluentPut("script", new JSONObject()
+                        .fluentPut("stored", "delete-dynamic")
+                ).toJSONString();
+    }
+
+    public static String createPersonId(String rowkey, String tn, List<Tuple<String, String>> creates) {
+        Assert.isTrue(!CollectionUtils.isEmpty(creates), "未找到新增人名");
+        return new JSONObject()
+                .fluentPut("query", new JSONObject()
+                        .fluentPut("bool", new JSONObject()
+                                .fluentPut("filter", new JSONArray()
+                                        .fluentAdd(
+                                                new JSONObject()
+                                                        .fluentPut("term", new JSONObject()
+                                                                .fluentPut("rowkey", rowkey)
+                                                        )
+                                        )
+                                        .fluentAdd(
+                                                new JSONObject()
+                                                        .fluentPut("term", new JSONObject()
+                                                                .fluentPut("tn", tn)
+                                                        )
+                                        )
+                                )
+                        )
+                )
+                .fluentPut("script", new JSONObject()
+                        .fluentPut("stored", "create-person-id")
+                        .fluentPut("params", creates.toMap())
+                ).toJSONString();
+
+    }
+
+    public static class QueryUtilExt {
+        public static Map<String, String> toMap(List<Tuple<String, String>> tuples) {
+            return tuples.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2, (o, o2) -> o));
+        }
+    }
+
+    public static void main(String[] args) {
+        System.out.println(createPersonId("1234", "company", Collections.singletonList(Tuple.tuple("张三", "122333444"))
+        ));
+    }
+
+
+}

+ 18 - 0
src/main/java/com/winhc/bigdata/task/util/ThrowableUtils.java

@@ -0,0 +1,18 @@
+package com.winhc.bigdata.task.util;
+
+/**
+ * @author ZhangJi
+ * @since 2021-08-13 16:54
+ */
+public class ThrowableUtils {
+    public static String getStackTraceByPn(Throwable e, String packagePrefix) {
+        StringBuilder s = new StringBuilder("\n").append(e);
+        for (StackTraceElement traceElement : e.getStackTrace()) {
+            if (!traceElement.getClassName().startsWith(packagePrefix)) {
+                break;
+            }
+            s.append("\n\tat ").append(traceElement);
+        }
+        return s.toString();
+    }
+}

+ 29 - 0
src/main/resources/application-dev.yml

@@ -25,6 +25,35 @@ spring:
     hikari:
       max-lifetime: 500000
   elasticsearch:
+    bboss:
+      name: old
+      elasticUser: elastic
+      elasticPassword: elastic_168
+      elasticsearch:
+        rest:
+          hostNames: es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com:9200
+        dateFormat: yyyy.MM.dd
+        timeZone: Asia/Shanghai
+        showTemplate: true
+        discoverHost: false
+      dslfile:
+        refreshInterval: -1
+      http:
+        timeoutConnection: 600000
+        timeoutSocket: 600000
+        connectionRequestTimeout: 600000
+        retryTime: -1
+        maxLineLength: -1
+        maxHeaderCount: 200
+        maxTotal: 400
+        defaultMaxPerRoute: 200
+        soReuseAddress: false
+        soKeepAlive: false
+        timeToLive: 3600000
+        keepAlive: 3600000
+        keystore:
+        keyPassword:
+        hostnameVerifier:
     rest:
       uris: es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com:9200
       username: elastic

+ 29 - 0
src/main/resources/application-prod.yml

@@ -13,6 +13,35 @@ spring:
     hikari:
       max-lifetime: 500000
   elasticsearch:
+    bboss:
+      name: old
+      elasticUser: elastic
+      elasticPassword: elastic_168
+      elasticsearch:
+        rest:
+          hostNames: es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com:9200
+        dateFormat: yyyy.MM.dd
+        timeZone: Asia/Shanghai
+        showTemplate: true
+        discoverHost: false
+      dslfile:
+        refreshInterval: -1
+      http:
+        timeoutConnection: 600000
+        timeoutSocket: 600000
+        connectionRequestTimeout: 600000
+        retryTime: -1
+        maxLineLength: -1
+        maxHeaderCount: 200
+        maxTotal: 400
+        defaultMaxPerRoute: 200
+        soReuseAddress: false
+        soKeepAlive: false
+        timeToLive: 3600000
+        keepAlive: 3600000
+        keystore:
+        keyPassword:
+        hostnameVerifier:
     rest:
       uris: es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com:9200
       username: elastic