EsScanJobSumAggPlus.java 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package com.winhc.task.job;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.google.common.util.concurrent.ThreadFactoryBuilder;
  4. import com.winhc.task.common.Constant;
  5. import com.winhc.task.framework.es.EsFastScan;
  6. import lombok.AllArgsConstructor;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.elasticsearch.action.bulk.BulkRequest;
  9. import org.elasticsearch.action.update.UpdateRequest;
  10. import org.elasticsearch.client.RestHighLevelClient;
  11. import org.elasticsearch.common.xcontent.XContentType;
  12. import org.elasticsearch.search.SearchHit;
  13. import org.springframework.stereotype.Component;
  14. import java.io.IOException;
  15. import java.util.Arrays;
  16. import java.util.Map;
  17. import java.util.concurrent.ArrayBlockingQueue;
  18. import java.util.concurrent.RejectedExecutionException;
  19. import java.util.concurrent.ThreadPoolExecutor;
  20. import java.util.concurrent.TimeUnit;
  21. import java.util.function.Consumer;
  22. @Slf4j
  23. @Component
  24. @AllArgsConstructor
  25. public class EsScanJobSumAggPlus {
  26. private RestHighLevelClient restHighLevelClient;
  27. public void start() {
  28. Arrays.asList("x-CxhuRDT-6fNo9SBkzEFw", "kVrnS8W5RqilOiT_ONjZxg", "XK8AQhV3Ry2y72KEnEzNHA")
  29. .forEach(routing -> {
  30. new Thread(() -> {
  31. synData(routing);
  32. }).start();
  33. });
  34. // Arrays.asList("qzu3fuJTQD-OEy1-ppXcRw", "EU6niZMWS6uGhpicyaWi9w", "DGLNkh83SXa-Ry66dvwszA")
  35. // .forEach(routing -> executorService.submit(() -> synData(routing)));
  36. // Arrays.asList("qzu3fuJTQD-OEy1-ppXcRw", "EU6niZMWS6uGhpicyaWi9w", "DGLNkh83SXa-Ry66dvwszA")
  37. // .parallelStream().forEach(this::synData);
  38. }
  39. public void synData(String routing) {
  40. //ThreadPoolExecutor executorService = ThreadPoolFactory.getThreadPoolExecutor();
  41. int poolSize = 20;
  42. ArrayBlockingQueue<Runnable> objects = new ArrayBlockingQueue<>(poolSize * 2);
  43. ThreadPoolExecutor executorService = new ThreadPoolExecutor(
  44. poolSize, poolSize,
  45. 0L, TimeUnit.MILLISECONDS,
  46. objects,
  47. new ThreadFactoryBuilder().setNameFormat("ScanEs-pool").build(),
  48. (r, executor) -> {
  49. try {
  50. executor.getQueue().put(r);
  51. } catch (InterruptedException e) {
  52. throw new RejectedExecutionException("interrupted", e);
  53. }
  54. }
  55. );
  56. String dsl = "{\n" +
  57. " \"query\": {\n" +
  58. " \"match_all\": {}\n" +
  59. " }\n" +
  60. "}";
  61. Consumer<SearchHit[]> func = list -> {
  62. BulkRequest bulkRequest = new BulkRequest();
  63. bulkRequest.timeout("10m");
  64. Arrays.stream(list).forEach(d -> {
  65. //String id = d.getId();
  66. Map<String, Object> sourceAsMap = d.getSourceAsMap();
  67. //System.out.println(sourceAsMap.toString());
  68. String sum_type = sourceAsMap.get("sum_type").toString();
  69. String tn = sourceAsMap.get("tn").toString();
  70. String id = sourceAsMap.get("id").toString();
  71. Map<String, Integer> m = (Map<String, Integer>) sourceAsMap.get("summary");
  72. m.forEach((k1, v1) -> {
  73. String index = k1.split("_del_")[0];
  74. if (Constant.indexs.contains(index)) {
  75. String pre = "";
  76. if ("person".equalsIgnoreCase(sum_type)) {
  77. pre = "person_";
  78. }
  79. UpdateRequest request = new UpdateRequest("ng_rt_summary_" + pre + index, "_doc", id);
  80. request.docAsUpsert(true);
  81. JSONObject j1 = new JSONObject();
  82. j1.put(k1, v1);
  83. JSONObject j2 = new JSONObject();
  84. j2.put("summary", j1);
  85. //j2.put("company_id", id);
  86. request.doc(JSONObject.toJSONString(j2), XContentType.JSON);
  87. request.retryOnConflict(6);
  88. bulkRequest.add(request);
  89. }
  90. });
  91. });
  92. executorService.submit(() -> {
  93. try {
  94. restHighLevelClient.bulk(bulkRequest);
  95. } catch (IOException e) {
  96. e.printStackTrace();
  97. }
  98. });
  99. };
  100. new EsFastScan(restHighLevelClient, func, "test_tmp_xf_sum_v9", "_doc", dsl, routing).scan();
  101. }
  102. }