|
@@ -0,0 +1,225 @@
|
|
|
+package com.winhc.bigdata.task.jobs;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.winhc.bigdata.task.service.DingTalkService;
|
|
|
+import com.winhc.bigdata.task.util.ElasticsearchQueryUtil;
|
|
|
+import com.winhc.bigdata.task.util.ThrowableUtils;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.experimental.ExtensionMethod;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.elasticsearch.common.TriFunction;
|
|
|
+import org.elasticsearch.common.collect.Tuple;
|
|
|
+import org.frameworkset.elasticsearch.client.ClientInterface;
|
|
|
+import org.frameworkset.util.CollectionUtils;
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.ForkJoinPool;
|
|
|
+import java.util.function.Function;
|
|
|
+import java.util.function.Predicate;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author ZhangJi
|
|
|
+ * @since 2021-08-02 18:46
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+@RequiredArgsConstructor
|
|
|
+@ExtensionMethod(DynamicPersonIdUpdateJob.StringStringMapExt.class)
|
|
|
+public class DynamicPersonIdUpdateJob {
|
|
|
+ public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
|
|
|
+ private final ClientInterface restClient;
|
|
|
+ private final KafkaTemplate<String, String> kafkaTemplate;
|
|
|
+ private final DingTalkService dingTalkService;
|
|
|
+
|
|
|
+ public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
|
|
|
+ Set<Object> seen = ConcurrentHashMap.newKeySet();
|
|
|
+ return t -> seen.add(keyExtractor.apply(t));
|
|
|
+ }
|
|
|
+
|
|
|
+ @KafkaListener(id = "update-person-id",
|
|
|
+ topics = "inc_human_pid_change",
|
|
|
+ groupId = "update_person_group",
|
|
|
+ containerFactory = "smallContainerFactory")
|
|
|
+ public void updatePersonId(List<String> payloads) {
|
|
|
+ List<Tuple<Map<String, String>, JSONObject>> params = payloads
|
|
|
+ .stream()
|
|
|
+ .map(JSON::parseObject)
|
|
|
+ .map(j -> {
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ map.putOld(j.ohp());
|
|
|
+ map.putNew(j.nhp());
|
|
|
+ return Tuple.tuple(map, j);
|
|
|
+ })
|
|
|
+ .filter(distinctByKey(m -> m.v1().ohp()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ batchFromUpdate(params);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void batchFromUpdate(List<Tuple<Map<String, String>, JSONObject>> params) {
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(params.stream().map(this::update).toArray(CompletableFuture[]::new)).get();
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ log.error("waiting processor error", e);
|
|
|
+ dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ params.stream().map(Tuple::v2).map(j -> j.retry()).forEach(s -> kafkaTemplate.send("inc_human_pid_change", s));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<Void> update(Tuple<Map<String, String>, JSONObject> update) {
|
|
|
+ if (!update.v1().valid()) {
|
|
|
+ return CompletableFuture.completedFuture(null);
|
|
|
+ }
|
|
|
+ String query = ElasticsearchQueryUtil.updatePersonId(update.v1().ohp(), update.v1().nhp());
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+
|
|
|
+ String s = restClient.updateByQuery("winhc-company-dynamic/_update_by_query?conflicts=proceed", query);
|
|
|
+ if (!(s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0"))) {
|
|
|
+ log.warn(s);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ }
|
|
|
+ /* if (s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //TODO ES更新处理版本冲突问题 1. 延迟队列,2.重新入队,待验证
|
|
|
+ kafkaTemplate.send("inc_human_pid_change", update.v2().retry());*/
|
|
|
+ }, TASK_FJ_POOL);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @KafkaListener(id = "create-person-id",
|
|
|
+ topics = "company_dynamic_update",
|
|
|
+ groupId = "${spring.kafka.consumer.group-id:xjk_group}",
|
|
|
+ containerFactory = "smallContainerFactory")
|
|
|
+ public void createPersonId(List<String> payloads) {
|
|
|
+ Map<String, List<JSONObject>> collect = payloads.toJSONStream()
|
|
|
+ .collect(Collectors.groupingBy(j -> j.getString("flag").orDefault("0")));
|
|
|
+ System.out.println(collect);
|
|
|
+ batchFromCreate(collect);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void batchFromCreate(Map<String, List<JSONObject>> params) {
|
|
|
+ CompletableFuture<Void> do0 = doAll(params.get("0"), ElasticsearchQueryUtil::createPersonId);
|
|
|
+ CompletableFuture<Void> do9 = doAll(params.get("9"), ElasticsearchQueryUtil::deleteDynamic);
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(do0, do9).get();
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ log.error("waiting processor error", e);
|
|
|
+ dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ //TODO 执行失败需要重新入队,待验证
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<Void> doAll(List<JSONObject> params, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
|
|
|
+ if (CollectionUtils.isEmpty(params)) {
|
|
|
+ return CompletableFuture.completedFuture(null);
|
|
|
+ }
|
|
|
+ return CompletableFuture.allOf(params.stream().map(j -> fromDynamic(j, function)).toArray(CompletableFuture[]::new));
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<Void> fromDynamic(JSONObject param, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
|
|
|
+
|
|
|
+ if (!param.createValid()) {
|
|
|
+ log.warn("Illegal data\n{}", param);
|
|
|
+ return CompletableFuture.completedFuture(null);
|
|
|
+ }
|
|
|
+ String tn = param.getString("tn");
|
|
|
+ String rowkey = param.getString("rowkey");
|
|
|
+ Object flag = param.getOrDefault("flag", "0");
|
|
|
+ List<JSONObject> entity = param.getJSONArray("entity").orDefault(new JSONArray()).toJavaList(JSONObject.class);
|
|
|
+ if (!"9".equals(flag) && entity.isEmpty()) {
|
|
|
+ log.error("新增id没有实体对象\n{}", param);
|
|
|
+ return CompletableFuture.completedFuture(null);
|
|
|
+ }
|
|
|
+ List<Tuple<String, String>> crates = entity.stream().map(j -> j.toCreateTuple()).collect(Collectors.toList());
|
|
|
+ String createUpdate = function.apply(rowkey, tn, crates);
|
|
|
+ System.out.println(createUpdate);
|
|
|
+
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ String s = restClient.updateByQuery("winhc-company-dynamic_inc_v1/_update_by_query?conflicts=proceed", createUpdate);
|
|
|
+ if (s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0")) {
|
|
|
+ //do something
|
|
|
+ } else {
|
|
|
+ log.error("update error {}", s);
|
|
|
+
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("execute update error", e);
|
|
|
+ dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ }
|
|
|
+ }, TASK_FJ_POOL);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unused")
|
|
|
+ public static class StringStringMapExt {
|
|
|
+ public static Stream<JSONObject> toJSONStream(List<String> strings) {
|
|
|
+ return strings.stream().map(JSONObject::parseObject);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static JSONArray orDefault(JSONArray j, JSONArray dj) {
|
|
|
+ return CollectionUtils.isEmpty(j) ? dj : j;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static boolean createValid(JSONObject j) {
|
|
|
+ return j.containsKey("tn") && j.containsKey("rowkey");
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String orDefault(String s, String ds) {
|
|
|
+ return StringUtils.isEmpty(s) ? ds : s;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Tuple<String, String> toCreateTuple(JSONObject j) {
|
|
|
+ return Tuple.tuple(j.getString("name"), j.getString("keyno"));
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String retry(JSONObject j) {
|
|
|
+ return j.fluentPut("retry", j.getIntValue("retry") + 1).toJSONString();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String ohp(Map<String, String> update) {
|
|
|
+ return update.get("old_human_pid");
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String nhp(Map<String, String> update) {
|
|
|
+ return update.get("new_human_pid");
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String ohp(JSONObject update) {
|
|
|
+ return update.getString("old_human_pid");
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String nhp(JSONObject update) {
|
|
|
+ return update.getString("new_human_pid");
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void putOld(Map<String, String> update, String pid) {
|
|
|
+ update.put("old_human_pid", pid);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void putNew(Map<String, String> update, String pid) {
|
|
|
+ update.put("new_human_pid", pid);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static boolean valid(Map<String, String> update) {
|
|
|
+ return update.containsKey("old_human_pid") && update.containsKey("new_human_pid");
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|