123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
package com.winhc.task.job;

import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.winhc.task.common.Constant;
import com.winhc.task.framework.es.EsFastScan;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
- @Slf4j
- @Component
- @AllArgsConstructor
- public class EsScanJobSumAggPlus {
- private RestHighLevelClient restHighLevelClient;
- public void start() {
- Arrays.asList("x-CxhuRDT-6fNo9SBkzEFw", "kVrnS8W5RqilOiT_ONjZxg", "XK8AQhV3Ry2y72KEnEzNHA")
- .forEach(routing -> {
- new Thread(() -> {
- synData(routing);
- }).start();
- });
- // Arrays.asList("qzu3fuJTQD-OEy1-ppXcRw", "EU6niZMWS6uGhpicyaWi9w", "DGLNkh83SXa-Ry66dvwszA")
- // .forEach(routing -> executorService.submit(() -> synData(routing)));
- // Arrays.asList("qzu3fuJTQD-OEy1-ppXcRw", "EU6niZMWS6uGhpicyaWi9w", "DGLNkh83SXa-Ry66dvwszA")
- // .parallelStream().forEach(this::synData);
- }
- public void synData(String routing) {
- //ThreadPoolExecutor executorService = ThreadPoolFactory.getThreadPoolExecutor();
- int poolSize = 20;
- ArrayBlockingQueue<Runnable> objects = new ArrayBlockingQueue<>(poolSize * 2);
- ThreadPoolExecutor executorService = new ThreadPoolExecutor(
- poolSize, poolSize,
- 0L, TimeUnit.MILLISECONDS,
- objects,
- new ThreadFactoryBuilder().setNameFormat("ScanEs-pool").build(),
- (r, executor) -> {
- try {
- executor.getQueue().put(r);
- } catch (InterruptedException e) {
- throw new RejectedExecutionException("interrupted", e);
- }
- }
- );
- String dsl = "{\n" +
- " \"query\": {\n" +
- " \"match_all\": {}\n" +
- " }\n" +
- "}";
- Consumer<SearchHit[]> func = list -> {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout("10m");
- Arrays.stream(list).forEach(d -> {
- //String id = d.getId();
- Map<String, Object> sourceAsMap = d.getSourceAsMap();
- //System.out.println(sourceAsMap.toString());
- String sum_type = sourceAsMap.get("sum_type").toString();
- String tn = sourceAsMap.get("tn").toString();
- String id = sourceAsMap.get("id").toString();
- Map<String, Integer> m = (Map<String, Integer>) sourceAsMap.get("summary");
- m.forEach((k1, v1) -> {
- String index = k1.split("_del_")[0];
- if (Constant.indexs.contains(index)) {
- String pre = "";
- if ("person".equalsIgnoreCase(sum_type)) {
- pre = "person_";
- }
- UpdateRequest request = new UpdateRequest("ng_rt_summary_" + pre + index, "_doc", id);
- request.docAsUpsert(true);
- JSONObject j1 = new JSONObject();
- j1.put(k1, v1);
- JSONObject j2 = new JSONObject();
- j2.put("summary", j1);
- //j2.put("company_id", id);
- request.doc(JSONObject.toJSONString(j2), XContentType.JSON);
- request.retryOnConflict(6);
- bulkRequest.add(request);
- }
- });
- });
- executorService.submit(() -> {
- try {
- restHighLevelClient.bulk(bulkRequest);
- } catch (IOException e) {
- e.printStackTrace();
- }
- });
- };
- new EsFastScan(restHighLevelClient, func, "test_tmp_xf_sum_v9", "_doc", dsl, routing).scan();
- }
- }
|