123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- package com.winhc.bigdata.task.jobs;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.mongodb.client.MongoCollection;
- import com.winhc.bigdata.task.service.DingTalkService;
- import com.winhc.bigdata.task.util.ElasticsearchQueryUtil;
- import com.winhc.bigdata.task.util.ThrowableUtils;
- import lombok.Data;
- import lombok.RequiredArgsConstructor;
- import lombok.experimental.ExtensionMethod;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.bson.Document;
- import org.elasticsearch.common.TriFunction;
- import org.elasticsearch.common.collect.Tuple;
- import org.frameworkset.elasticsearch.boot.BBossESStarter;
- import org.frameworkset.elasticsearch.bulk.BulkProcessor;
- import org.frameworkset.elasticsearch.client.ClientInterface;
- import org.frameworkset.elasticsearch.client.ClientOptions;
- import org.frameworkset.elasticsearch.entity.ESDatas;
- import org.frameworkset.elasticsearch.entity.MetaMap;
- import org.frameworkset.util.CollectionUtils;
- import org.frameworkset.util.ObjectUtils;
- import org.springframework.data.mongodb.core.MongoTemplate;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import java.util.*;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ForkJoinPool;
- import java.util.function.Function;
- import java.util.function.Predicate;
- import java.util.stream.Collectors;
- import java.util.stream.Stream;
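/**
 * Kafka-driven job that maintains documents of the {@code winhc-company-dynamic} Elasticsearch index.
 * <p>
 * Messages from {@code inc_human_pid_change} rewrite an old person id to a new one via update-by-query.
 * Messages from {@code company_dynamic_update} are dispatched by their {@code flag}: {@code 0} runs the
 * create-person-id query, {@code 9} runs the delete-dynamic query, and {@code 8} scans matching
 * documents and prunes individual content entries.
 *
 * @author ZhangJi
 * @since 2021-08-02 18:46
 */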
- @Slf4j
- @Component
- @RequiredArgsConstructor
- @ExtensionMethod(DynamicPersonIdUpdateJob.StringStringMapExt.class)
- public class DynamicPersonIdUpdateJob {
- public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
- private final ClientInterface cRestClient;
- private final BBossESStarter bBossESStarter;
- private ClientInterface restClient;
- private final KafkaTemplate<String, String> kafkaTemplate;
- private final DingTalkService dingTalkService;
- private final BulkProcessor processor;
- private final MongoTemplate mongoTemplate;
- @PostConstruct
- public void init() {
- restClient = bBossESStarter.getRestClient();
- }
- public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
- Set<Object> seen = ConcurrentHashMap.newKeySet();
- return t -> seen.add(keyExtractor.apply(t));
- }
- @KafkaListener(id = "update-person-id",
- topics = "inc_human_pid_change",
- groupId = "xjk_group",
- containerFactory = "smallContainerFactory")
- public void updatePersonId(List<String> payloads) {
- List<Tuple<Map<String, String>, JSONObject>> params = payloads
- .stream()
- .map(JSON::parseObject)
- .map(j -> {
- Map<String, String> map = new HashMap<>();
- map.putOld(j.ohp());
- map.putNew(j.nhp());
- return Tuple.tuple(map, j);
- })
- .filter(distinctByKey(m -> m.v1().ohp()))
- .collect(Collectors.toList());
- batchFromUpdate(params);
- }
- private void batchFromUpdate(List<Tuple<Map<String, String>, JSONObject>> params) {
- try {
- CompletableFuture.allOf(params.stream().map(this::update).toArray(CompletableFuture[]::new)).get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("waiting processor error", e);
- dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
- params.stream().map(Tuple::v2).map(j -> j.retry()).forEach(s -> kafkaTemplate.send("inc_human_pid_change", s));
- }
- }
- private CompletableFuture<Void> update(Tuple<Map<String, String>, JSONObject> update) {
- if (!update.v1().valid()) {
- return CompletableFuture.completedFuture(null);
- }
- String query = ElasticsearchQueryUtil.updatePersonId(update.v1().ohp(), update.v1().nhp());
- return CompletableFuture.runAsync(() -> {
- try {
- String s = restClient.updateByQuery("winhc-company-dynamic/_update_by_query?conflicts=proceed", query);
- if (!(s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0"))) {
- log.warn(s);
- }
- } catch (Exception e) {
- dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
- }
- /* if (s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0")) {
- return;
- }
- //TODO ES更新处理版本冲突问题 1. 延迟队列,2.重新入队,待验证
- kafkaTemplate.send("inc_human_pid_change", update.v2().retry());*/
- }, TASK_FJ_POOL);
- }
- @KafkaListener(id = "create-person-id",
- topics = "company_dynamic_update",
- groupId = "${spring.kafka.consumer.group-id:xjk_group}",
- containerFactory = "smallContainerFactory")
- public void createPersonId(List<String> payloads) {
- Map<String, List<JSONObject>> collect = payloads.toJSONStream()
- .collect(Collectors.groupingBy(j -> j.getString("flag").orDefault("0")));
- // System.out.println(collect);
- batchFromCreate(collect);
- }
- /*=======================flag 0 8 9================*/
- private void batchFromCreate(Map<String, List<JSONObject>> params) {
- CompletableFuture<Void> do0 = doAll(params.get("0"), ElasticsearchQueryUtil::createPersonId);
- CompletableFuture<Void> do9 = doAll(params.get("9"), ElasticsearchQueryUtil::deleteDynamic);
- CompletableFuture<Void> do8 = do8(params.get("8"));
- try {
- CompletableFuture.allOf(do0, do9, do8).get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("waiting processor error", e);
- if (params.containsKey("8")) {
- log.error("xhasjkdfhjk:{}", params.get("8"));
- }
- dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
- //TODO 执行失败需要重新入队,待验证
- }
- }
- private CompletableFuture<Void> doAll(List<JSONObject> params, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
- if (CollectionUtils.isEmpty(params)) {
- return CompletableFuture.completedFuture(null);
- }
- return CompletableFuture.allOf(params.stream().map(j -> fromDynamic(j, function)).toArray(CompletableFuture[]::new));
- }
- private CompletableFuture<Void> fromDynamic(JSONObject param, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
- if (!param.createValid()) {
- log.warn("Illegal data\n{}", param);
- return CompletableFuture.completedFuture(null);
- }
- String tn = param.getString("tn");
- String rowkey = param.getString("rowkey");
- Object flag = param.getOrDefault("flag", "0");
- List<JSONObject> entity = param.getJSONArray("entity").orDefault(new JSONArray()).toJavaList(JSONObject.class);
- if (!"9".equals(flag) && entity.isEmpty()) {
- log.error("新增id没有实体对象\n{}", param);
- return CompletableFuture.completedFuture(null);
- }
- List<Tuple<String, String>> crates = entity.stream().map(j -> j.toCreateTuple()).collect(Collectors.toList());
- String createUpdate = function.apply(rowkey, tn, crates);
- // System.out.println(createUpdate);
- return CompletableFuture.runAsync(() -> {
- try {
- String s = restClient.updateByQuery("winhc-company-dynamic/_update_by_query?conflicts=proceed", createUpdate);
- if (s.contains("version_conflicts") && s.contains("\"version_conflicts\":0")) {
- //do something
- } else {
- log.error("update error {}", s);
- }
- } catch (Exception e) {
- log.error("execute update error", e);
- dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
- }
- }, TASK_FJ_POOL);
- }
- private CompletableFuture<Void> do8(List<JSONObject> params) {
- if (CollectionUtils.isEmpty(params)) {
- return CompletableFuture.completedFuture(null);
- }
- List<CompletableFuture<Void>> cr = params.stream().map(j -> new ScanParams(j.getString("tn"), j.getString("company_id"), j.getString("relation_id"))).map(p -> CompletableFuture.runAsync(() -> scan(p))).collect(Collectors.toList());
- List<CompletableFuture<Void>> rc = params.stream().map(j -> new ScanParams(j.getString("tn"), j.getString("relation_id"), j.getString("company_id"))).map(p -> CompletableFuture.runAsync(() -> scan(p))).collect(Collectors.toList());
- cr.addAll(rc);
- return CompletableFuture.allOf(cr.toArray(new CompletableFuture[0]));
- }
- @RequiredArgsConstructor
- @Data
- public static class ScanParams {
- private final String tn;
- private final String keyno;
- private final String validId;
- }
- public void scan(ScanParams params) {
- if (StringUtils.isEmpty(params.keyno) || StringUtils.isEmpty(params.tn) || StringUtils.isEmpty(params.validId)) {
- return;
- }
- cRestClient.scroll("winhc-company-dynamic/_search", "scrollQuery", "1m", params.toMap(), MetaMap.class, (response, handlerInfo) -> {
- Optional
- .ofNullable(response)
- .map(ESDatas::getDatas)
- .ifPresent(
- l -> l.forEach(m -> remove8(m.getId(), m.getIndex(), params.getKeyno(), params.getValidId(), m))
- // l -> l.forEach(hit-> remove8(hit.getId(), params.getKeyno(), params.getValidId(),hit.asMap()))
- );
- });
- }
- private void remove8(String id, String index, String keyno, String validId, Map<String, Object> data) {
- MongoCollection<Document> coll = mongoTemplate.getCollection("a_dynamic_update_flag_8_20210820");
- Document document = new Document();
- document.put("_id", id);
- document.put("keyno", keyno);
- document.put("validId", validId);
- document.put("content", data);
- try {
- coll.insertOne(document);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- int i = new Random().nextInt(3);
- if (i == 0) {
- log.info("riewqujir:\n{} : {}\n{}", keyno, validId, JSON.toJSONString(data));
- }
- if (StringUtils.isEmpty(id)) {
- return;
- }
- JSONObject jData = new JSONObject(data);
- String dynamic_info = jData.getString("dynamic_info");
- if (StringUtils.isEmpty(dynamic_info)) return;
- try {
- JSONObject jsonObject = JSON.parseObject(dynamic_info);
- JSONArray content = jsonObject.getJSONArray("content");
- if (ObjectUtils.isEmpty(content)) {
- log.warn("content为空");
- return;
- }
- List<JSONObject> collect = content.toJavaList(JSONObject.class).stream().filter(j -> {
- JSONObject entity = j.getJSONObject("entity");
- if (!keyno.equals(entity.getString("keyno"))) return true;
- try {
- return !j.anyMatchByNestedObject(validId, "keyno", "before", "after");
- } catch (Exception e) {
- log.error("jasdjf");
- log.error("data:{}\n{}", jData.toJSONString(), e.getMessage());
- log.error(e.getMessage(), e);
- return true;
- }
- }).collect(Collectors.toList());
- if (collect.isEmpty()) {
- Map<String, Object> map = new HashMap<>();
- map.put("deleted", "9");
- ClientOptions clientOptions = new ClientOptions();
- clientOptions.setId(id);
- processor.updateData(index, "dynamic", map, clientOptions);
- } else {
- jsonObject.put("content", collect);
- ClientOptions clientOptions = new ClientOptions();
- clientOptions.setId(id);
- processor.insertData(index, "dynamic", jsonObject, clientOptions);
- }
- } catch (Exception e) {
- log.error("parse json error", e);
- dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
- }
- }
- @SuppressWarnings("unused")
- public static class StringStringMapExt {
- public static boolean anyMatchByNestedObject(JSONObject jo, String val, String key, String... fields) {
- return Stream.of(fields).map(jo::getJSONObject).filter(Objects::nonNull).map(j -> j.getString(key)).anyMatch(val::equals);
- }
- public static Stream<JSONObject> toJSONStream(List<String> strings) {
- return strings.stream().map(JSONObject::parseObject);
- }
- public static Map<String, Object> toMap(ScanParams sp) {
- Map<String, Object> stringObjectHashMap = new HashMap<>();
- stringObjectHashMap.put("tn", sp.getTn());
- stringObjectHashMap.put("keyno", sp.getKeyno());
- return stringObjectHashMap;
- }
- public static JSONArray orDefault(JSONArray j, JSONArray dj) {
- return CollectionUtils.isEmpty(j) ? dj : j;
- }
- public static boolean createValid(JSONObject j) {
- return j.containsKey("tn") && j.containsKey("rowkey");
- }
- public static String orDefault(String s, String ds) {
- return StringUtils.isEmpty(s) ? ds : s;
- }
- public static Tuple<String, String> toCreateTuple(JSONObject j) {
- return Tuple.tuple(j.getString("name"), j.getString("keyno"));
- }
- public static String retry(JSONObject j) {
- return j.fluentPut("retry", j.getIntValue("retry") + 1).toJSONString();
- }
- public static String ohp(Map<String, String> update) {
- return update.get("old_human_pid");
- }
- public static String nhp(Map<String, String> update) {
- return update.get("new_human_pid");
- }
- public static String ohp(JSONObject update) {
- return update.getString("old_human_pid");
- }
- public static String nhp(JSONObject update) {
- return update.getString("new_human_pid");
- }
- public static void putOld(Map<String, String> update, String pid) {
- update.put("old_human_pid", pid);
- }
- public static void putNew(Map<String, String> update, String pid) {
- update.put("new_human_pid", pid);
- }
- public static boolean valid(Map<String, String> update) {
- return update.containsKey("old_human_pid") && update.containsKey("new_human_pid");
- }
- }
- }
|