DynamicPersonIdUpdateJob.java 15 KB


  1. package com.winhc.bigdata.task.jobs;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.mongodb.client.MongoCollection;
  6. import com.winhc.bigdata.task.service.DingTalkService;
  7. import com.winhc.bigdata.task.util.ElasticsearchQueryUtil;
  8. import com.winhc.bigdata.task.util.ThrowableUtils;
  9. import lombok.Data;
  10. import lombok.RequiredArgsConstructor;
  11. import lombok.experimental.ExtensionMethod;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.apache.commons.lang3.StringUtils;
  14. import org.bson.Document;
  15. import org.elasticsearch.common.TriFunction;
  16. import org.elasticsearch.common.collect.Tuple;
  17. import org.frameworkset.elasticsearch.boot.BBossESStarter;
  18. import org.frameworkset.elasticsearch.bulk.BulkProcessor;
  19. import org.frameworkset.elasticsearch.client.ClientInterface;
  20. import org.frameworkset.elasticsearch.client.ClientOptions;
  21. import org.frameworkset.elasticsearch.entity.ESDatas;
  22. import org.frameworkset.elasticsearch.entity.MetaMap;
  23. import org.frameworkset.util.CollectionUtils;
  24. import org.frameworkset.util.ObjectUtils;
  25. import org.springframework.data.mongodb.core.MongoTemplate;
  26. import org.springframework.kafka.annotation.KafkaListener;
  27. import org.springframework.kafka.core.KafkaTemplate;
  28. import org.springframework.stereotype.Component;
  29. import javax.annotation.PostConstruct;
  30. import java.util.*;
  31. import java.util.concurrent.CompletableFuture;
  32. import java.util.concurrent.ConcurrentHashMap;
  33. import java.util.concurrent.ExecutionException;
  34. import java.util.concurrent.ForkJoinPool;
  35. import java.util.function.Function;
  36. import java.util.function.Predicate;
  37. import java.util.stream.Collectors;
  38. import java.util.stream.Stream;
  39. /**
  40. * @author ZhangJi
  41. * @since 2021-08-02 18:46
  42. */
  43. @Slf4j
  44. @Component
  45. @RequiredArgsConstructor
  46. @ExtensionMethod(DynamicPersonIdUpdateJob.StringStringMapExt.class)
  47. public class DynamicPersonIdUpdateJob {
  48. public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
  49. private final ClientInterface cRestClient;
  50. private final BBossESStarter bBossESStarter;
  51. private ClientInterface restClient;
  52. private final KafkaTemplate<String, String> kafkaTemplate;
  53. private final DingTalkService dingTalkService;
  54. private final BulkProcessor processor;
  55. private final MongoTemplate mongoTemplate;
  56. @PostConstruct
  57. public void init() {
  58. restClient = bBossESStarter.getRestClient();
  59. }
  60. public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
  61. Set<Object> seen = ConcurrentHashMap.newKeySet();
  62. return t -> seen.add(keyExtractor.apply(t));
  63. }
  64. @KafkaListener(id = "update-person-id",
  65. topics = "inc_human_pid_change",
  66. groupId = "xjk_group",
  67. containerFactory = "smallContainerFactory")
  68. public void updatePersonId(List<String> payloads) {
  69. List<Tuple<Map<String, String>, JSONObject>> params = payloads
  70. .stream()
  71. .map(JSON::parseObject)
  72. .map(j -> {
  73. Map<String, String> map = new HashMap<>();
  74. map.putOld(j.ohp());
  75. map.putNew(j.nhp());
  76. return Tuple.tuple(map, j);
  77. })
  78. .filter(distinctByKey(m -> m.v1().ohp()))
  79. .collect(Collectors.toList());
  80. batchFromUpdate(params);
  81. }
  82. private void batchFromUpdate(List<Tuple<Map<String, String>, JSONObject>> params) {
  83. try {
  84. CompletableFuture.allOf(params.stream().map(this::update).toArray(CompletableFuture[]::new)).get();
  85. } catch (InterruptedException | ExecutionException e) {
  86. log.error("waiting processor error", e);
  87. dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
  88. params.stream().map(Tuple::v2).map(j -> j.retry()).forEach(s -> kafkaTemplate.send("inc_human_pid_change", s));
  89. }
  90. }
  91. private CompletableFuture<Void> update(Tuple<Map<String, String>, JSONObject> update) {
  92. if (!update.v1().valid()) {
  93. return CompletableFuture.completedFuture(null);
  94. }
  95. String query = ElasticsearchQueryUtil.updatePersonId(update.v1().ohp(), update.v1().nhp());
  96. return CompletableFuture.runAsync(() -> {
  97. try {
  98. String s = restClient.updateByQuery("winhc-company-dynamic/_update_by_query?conflicts=proceed", query);
  99. if (!(s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0"))) {
  100. log.warn(s);
  101. }
  102. } catch (Exception e) {
  103. dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
  104. }
  105. /* if (s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0")) {
  106. return;
  107. }
  108. //TODO ES更新处理版本冲突问题 1. 延迟队列,2.重新入队,待验证
  109. kafkaTemplate.send("inc_human_pid_change", update.v2().retry());*/
  110. }, TASK_FJ_POOL);
  111. }
  112. @KafkaListener(id = "create-person-id",
  113. topics = "company_dynamic_update",
  114. groupId = "${spring.kafka.consumer.group-id:xjk_group}",
  115. containerFactory = "smallContainerFactory")
  116. public void createPersonId(List<String> payloads) {
  117. Map<String, List<JSONObject>> collect = payloads.toJSONStream()
  118. .collect(Collectors.groupingBy(j -> j.getString("flag").orDefault("0")));
  119. // System.out.println(collect);
  120. batchFromCreate(collect);
  121. }
  122. /*=======================flag 0 8 9================*/
  123. private void batchFromCreate(Map<String, List<JSONObject>> params) {
  124. CompletableFuture<Void> do0 = doAll(params.get("0"), ElasticsearchQueryUtil::createPersonId);
  125. CompletableFuture<Void> do9 = doAll(params.get("9"), ElasticsearchQueryUtil::deleteDynamic);
  126. CompletableFuture<Void> do8 = do8(params.get("8"));
  127. try {
  128. CompletableFuture.allOf(do0, do9, do8).get();
  129. } catch (InterruptedException | ExecutionException e) {
  130. log.error("waiting processor error", e);
  131. if (params.containsKey("8")) {
  132. log.error("xhasjkdfhjk:{}", params.get("8"));
  133. }
  134. dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
  135. //TODO 执行失败需要重新入队,待验证
  136. }
  137. }
  138. private CompletableFuture<Void> doAll(List<JSONObject> params, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
  139. if (CollectionUtils.isEmpty(params)) {
  140. return CompletableFuture.completedFuture(null);
  141. }
  142. return CompletableFuture.allOf(params.stream().map(j -> fromDynamic(j, function)).toArray(CompletableFuture[]::new));
  143. }
  144. private CompletableFuture<Void> fromDynamic(JSONObject param, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
  145. if (!param.createValid()) {
  146. log.warn("Illegal data\n{}", param);
  147. return CompletableFuture.completedFuture(null);
  148. }
  149. String tn = param.getString("tn");
  150. String rowkey = param.getString("rowkey");
  151. Object flag = param.getOrDefault("flag", "0");
  152. List<JSONObject> entity = param.getJSONArray("entity").orDefault(new JSONArray()).toJavaList(JSONObject.class);
  153. if (!"9".equals(flag) && entity.isEmpty()) {
  154. log.error("新增id没有实体对象\n{}", param);
  155. return CompletableFuture.completedFuture(null);
  156. }
  157. List<Tuple<String, String>> crates = entity.stream().map(j -> j.toCreateTuple()).collect(Collectors.toList());
  158. String createUpdate = function.apply(rowkey, tn, crates);
  159. // System.out.println(createUpdate);
  160. return CompletableFuture.runAsync(() -> {
  161. try {
  162. String s = restClient.updateByQuery("winhc-company-dynamic/_update_by_query?conflicts=proceed", createUpdate);
  163. if (s.contains("version_conflicts") && s.contains("\"version_conflicts\":0")) {
  164. //do something
  165. } else {
  166. log.error("update error {}", s);
  167. }
  168. } catch (Exception e) {
  169. log.error("execute update error", e);
  170. dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
  171. }
  172. }, TASK_FJ_POOL);
  173. }
  174. private CompletableFuture<Void> do8(List<JSONObject> params) {
  175. if (CollectionUtils.isEmpty(params)) {
  176. return CompletableFuture.completedFuture(null);
  177. }
  178. List<CompletableFuture<Void>> cr = params.stream().map(j -> new ScanParams(j.getString("tn"), j.getString("company_id"), j.getString("relation_id"))).map(p -> CompletableFuture.runAsync(() -> scan(p))).collect(Collectors.toList());
  179. List<CompletableFuture<Void>> rc = params.stream().map(j -> new ScanParams(j.getString("tn"), j.getString("relation_id"), j.getString("company_id"))).map(p -> CompletableFuture.runAsync(() -> scan(p))).collect(Collectors.toList());
  180. cr.addAll(rc);
  181. return CompletableFuture.allOf(cr.toArray(new CompletableFuture[0]));
  182. }
  183. @RequiredArgsConstructor
  184. @Data
  185. public static class ScanParams {
  186. private final String tn;
  187. private final String keyno;
  188. private final String validId;
  189. }
  190. public void scan(ScanParams params) {
  191. if (StringUtils.isEmpty(params.keyno) || StringUtils.isEmpty(params.tn) || StringUtils.isEmpty(params.validId)) {
  192. return;
  193. }
  194. cRestClient.scroll("winhc-company-dynamic/_search", "scrollQuery", "1m", params.toMap(), MetaMap.class, (response, handlerInfo) -> {
  195. Optional
  196. .ofNullable(response)
  197. .map(ESDatas::getDatas)
  198. .ifPresent(
  199. l -> l.forEach(m -> remove8(m.getId(), m.getIndex(), params.getKeyno(), params.getValidId(), m))
  200. // l -> l.forEach(hit-> remove8(hit.getId(), params.getKeyno(), params.getValidId(),hit.asMap()))
  201. );
  202. });
  203. }
  204. private void remove8(String id, String index, String keyno, String validId, Map<String, Object> data) {
  205. MongoCollection<Document> coll = mongoTemplate.getCollection("a_dynamic_update_flag_8_20210820");
  206. Document document = new Document();
  207. document.put("_id", id);
  208. document.put("keyno", keyno);
  209. document.put("validId", validId);
  210. document.put("content", data);
  211. try {
  212. coll.insertOne(document);
  213. } catch (Exception e) {
  214. log.error(e.getMessage(), e);
  215. }
  216. int i = new Random().nextInt(3);
  217. if (i == 0) {
  218. log.info("riewqujir:\n{} : {}\n{}", keyno, validId, JSON.toJSONString(data));
  219. }
  220. if (StringUtils.isEmpty(id)) {
  221. return;
  222. }
  223. JSONObject jData = new JSONObject(data);
  224. String dynamic_info = jData.getString("dynamic_info");
  225. if (StringUtils.isEmpty(dynamic_info)) return;
  226. try {
  227. JSONObject jsonObject = JSON.parseObject(dynamic_info);
  228. JSONArray content = jsonObject.getJSONArray("content");
  229. if (ObjectUtils.isEmpty(content)) {
  230. log.warn("content为空");
  231. return;
  232. }
  233. List<JSONObject> collect = content.toJavaList(JSONObject.class).stream().filter(j -> {
  234. JSONObject entity = j.getJSONObject("entity");
  235. if (!keyno.equals(entity.getString("keyno"))) return true;
  236. try {
  237. return !j.anyMatchByNestedObject(validId, "keyno", "before", "after");
  238. } catch (Exception e) {
  239. log.error("jasdjf");
  240. log.error("data:{}\n{}", jData.toJSONString(), e.getMessage());
  241. log.error(e.getMessage(), e);
  242. return true;
  243. }
  244. }).collect(Collectors.toList());
  245. if (collect.isEmpty()) {
  246. Map<String, Object> map = new HashMap<>();
  247. map.put("deleted", "9");
  248. ClientOptions clientOptions = new ClientOptions();
  249. clientOptions.setId(id);
  250. processor.updateData(index, "dynamic", map, clientOptions);
  251. } else {
  252. jsonObject.put("content", collect);
  253. ClientOptions clientOptions = new ClientOptions();
  254. clientOptions.setId(id);
  255. processor.insertData(index, "dynamic", jsonObject, clientOptions);
  256. }
  257. } catch (Exception e) {
  258. log.error("parse json error", e);
  259. dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
  260. }
  261. }
  262. @SuppressWarnings("unused")
  263. public static class StringStringMapExt {
  264. public static boolean anyMatchByNestedObject(JSONObject jo, String val, String key, String... fields) {
  265. return Stream.of(fields).map(jo::getJSONObject).filter(Objects::nonNull).map(j -> j.getString(key)).anyMatch(val::equals);
  266. }
  267. public static Stream<JSONObject> toJSONStream(List<String> strings) {
  268. return strings.stream().map(JSONObject::parseObject);
  269. }
  270. public static Map<String, Object> toMap(ScanParams sp) {
  271. Map<String, Object> stringObjectHashMap = new HashMap<>();
  272. stringObjectHashMap.put("tn", sp.getTn());
  273. stringObjectHashMap.put("keyno", sp.getKeyno());
  274. return stringObjectHashMap;
  275. }
  276. public static JSONArray orDefault(JSONArray j, JSONArray dj) {
  277. return CollectionUtils.isEmpty(j) ? dj : j;
  278. }
  279. public static boolean createValid(JSONObject j) {
  280. return j.containsKey("tn") && j.containsKey("rowkey");
  281. }
  282. public static String orDefault(String s, String ds) {
  283. return StringUtils.isEmpty(s) ? ds : s;
  284. }
  285. public static Tuple<String, String> toCreateTuple(JSONObject j) {
  286. return Tuple.tuple(j.getString("name"), j.getString("keyno"));
  287. }
  288. public static String retry(JSONObject j) {
  289. return j.fluentPut("retry", j.getIntValue("retry") + 1).toJSONString();
  290. }
  291. public static String ohp(Map<String, String> update) {
  292. return update.get("old_human_pid");
  293. }
  294. public static String nhp(Map<String, String> update) {
  295. return update.get("new_human_pid");
  296. }
  297. public static String ohp(JSONObject update) {
  298. return update.getString("old_human_pid");
  299. }
  300. public static String nhp(JSONObject update) {
  301. return update.getString("new_human_pid");
  302. }
  303. public static void putOld(Map<String, String> update, String pid) {
  304. update.put("old_human_pid", pid);
  305. }
  306. public static void putNew(Map<String, String> update, String pid) {
  307. update.put("new_human_pid", pid);
  308. }
  309. public static boolean valid(Map<String, String> update) {
  310. return update.containsKey("old_human_pid") && update.containsKey("new_human_pid");
  311. }
  312. }
  313. }