|
@@ -0,0 +1,166 @@
|
|
|
+package com.winhc.kafka.consumer;
|
|
|
+
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONAware;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.mongodb.client.MongoCollection;
|
|
|
+import com.winhc.common.constant.Base;
|
|
|
+import com.winhc.config.ConfigConstant;
|
|
|
+import com.winhc.kafka.KafkaProduce;
|
|
|
+import com.winhc.utils.CompanyUtils;
|
|
|
+import com.winhc.utils.ESUtils;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.bson.Document;
|
|
|
+import org.elasticsearch.common.collect.Tuple;
|
|
|
+import org.frameworkset.elasticsearch.boot.BBossESStarter;
|
|
|
+import org.frameworkset.elasticsearch.client.ClientInterface;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.data.mongodb.core.MongoTemplate;
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.function.Function;
|
|
|
+import java.util.function.Predicate;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author π
|
|
|
+ * @Description:pid变化更新映射
|
|
|
+ * @date 2021/1/8 16:04
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@AllArgsConstructor
|
|
|
+public class KafkaConsumerPersonIdUpdate {
|
|
|
+
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @Qualifier("bbossESStarterEs5")
|
|
|
+ private BBossESStarter bbossESStarterEs5;
|
|
|
+ private ClientInterface restClient;
|
|
|
+ @Autowired
|
|
|
+ KafkaProduce kafkaProduce;
|
|
|
+ private final MongoTemplate mongoTemplate;
|
|
|
+ @Autowired
|
|
|
+ ConfigConstant configConstant;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ restClient = bbossESStarterEs5.getRestClient("es5");
|
|
|
+ }
|
|
|
+
|
|
|
+ @KafkaListener(id = "${spring.kafka.topic_pid_update_v1}"
|
|
|
+ , topics = "${spring.kafka.topic_pid_update_v1}"
|
|
|
+ , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "handlerV1")
|
|
|
+ public void updatePid(List<String> messages) {
|
|
|
+ List<Tuple<Map<String, String>, JSONObject>> list = messages.stream()
|
|
|
+ .flatMap(m ->
|
|
|
+ JSONArray.parseArray(m).stream()
|
|
|
+ .map(x -> {
|
|
|
+ JSONObject j = (JSONObject) x;
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ map.put("old_human_pid", j.getOrDefault("old_human_pid", "").toString());
|
|
|
+ map.put("new_human_pid", j.getOrDefault("new_human_pid", "").toString());
|
|
|
+ return Tuple.tuple(map, j);
|
|
|
+ })).filter(distinctByKey(m -> m.v1().get("old_human_pid")))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(list.stream().map(this::update).toArray(CompletableFuture[]::new)).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ list.stream().map(Tuple::v2).forEach(d -> {
|
|
|
+ sendMessage(JSON.toJSONString(Collections.singletonList(d)), configConstant.topic_pid_update_v1);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<Void> update(Tuple<Map<String, String>, JSONObject> update) {
|
|
|
+ String old_human_pid = update.v1().get("old_human_pid");
|
|
|
+ String new_human_pid = update.v1().get("new_human_pid");
|
|
|
+ if (StringUtils.isBlank(old_human_pid) || StringUtils.isBlank(new_human_pid)) {
|
|
|
+ return CompletableFuture.completedFuture(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ CompletableFuture<Boolean> v1 = updateByQuery(old_human_pid, new_human_pid, "company-human-pid-mapping-v1", ESUtils.updateHumanMappingId(old_human_pid, new_human_pid));
|
|
|
+ CompletableFuture<Boolean> v2 = updateByQuery(old_human_pid, new_human_pid, "company-human-relation-v1", ESUtils.updateBossId(old_human_pid));
|
|
|
+
|
|
|
+ return CompletableFuture.allOf(v1, v2).thenApplyAsync(x -> {
|
|
|
+ try {
|
|
|
+ if (v1.get() && v2.get()) {
|
|
|
+ sendMessage(update.v2().toJSONString(), configConstant.topic_pid_update_v2);
|
|
|
+ } else {
|
|
|
+ sendMessage(JSON.toJSONString(Collections.singletonList(update.v2())), configConstant.topic_pid_update_v1);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ log.error("update es error old_human_pid :{} ,new_human_pid:{} ,message:{}", old_human_pid, new_human_pid, e.getMessage());
|
|
|
+ sendMessage(JSON.toJSONString(Collections.singletonList(update.v2())), configConstant.topic_pid_update_v1);
|
|
|
+ }
|
|
|
+ return x;
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<Boolean> updateByQuery(String old_human_pid, String new_human_pid, String index, String query) {
|
|
|
+ return CompletableFuture.supplyAsync(() -> {
|
|
|
+ try {
|
|
|
+ String res = restClient.updateByQuery(index + "/_update_by_query?conflicts=proceed&refresh=true", query);
|
|
|
+ if (res.contains("\"version_conflicts\":0")) {
|
|
|
+ log.info("update info old_human_pid : {} ,new_human_pid : {} , res : {}", old_human_pid, new_human_pid, res);
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }, Base.TASK_FJ_POOL);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendMessage(String message, String topic) {
|
|
|
+ kafkaProduce.produce(topic, message);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
|
|
|
+ Set<Object> seen = ConcurrentHashMap.newKeySet();
|
|
|
+ return t -> seen.add(keyExtractor.apply(t));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 因为手动确认消费,若消费失败,记录重刷
|
|
|
+ */
|
|
|
+ @Bean("handlerV1")
|
|
|
+ public ConsumerAwareListenerErrorHandler dealError() {
|
|
|
+ return (message, e, consumer) -> {
|
|
|
+ List<String> list = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload());
|
|
|
+ MongoCollection<Document> col = mongoTemplate.getCollection("xf_ng_rt_pid_change_error");
|
|
|
+ for (String msg : list) {
|
|
|
+ Document document = new Document();
|
|
|
+ document.put("message", msg);
|
|
|
+ try {
|
|
|
+ col.insertOne(document);
|
|
|
+ } catch (Exception e1) {
|
|
|
+ log.error(e1.getMessage(), e1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.error("kafkaConsumerPersonIdUpdate error: {}", e.toString());
|
|
|
+ return null;
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+}
|