package com.winhc.task.job; import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.winhc.task.common.Constant; import com.winhc.task.framework.es.EsFastScan; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.SearchHit; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Arrays; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @Slf4j @Component @AllArgsConstructor public class EsScanJobSumAggPlus { private RestHighLevelClient restHighLevelClient; public void start() { Arrays.asList("x-CxhuRDT-6fNo9SBkzEFw", "kVrnS8W5RqilOiT_ONjZxg", "XK8AQhV3Ry2y72KEnEzNHA") .forEach(routing -> { new Thread(() -> { synData(routing); }).start(); }); // Arrays.asList("qzu3fuJTQD-OEy1-ppXcRw", "EU6niZMWS6uGhpicyaWi9w", "DGLNkh83SXa-Ry66dvwszA") // .forEach(routing -> executorService.submit(() -> synData(routing))); // Arrays.asList("qzu3fuJTQD-OEy1-ppXcRw", "EU6niZMWS6uGhpicyaWi9w", "DGLNkh83SXa-Ry66dvwszA") // .parallelStream().forEach(this::synData); } public void synData(String routing) { //ThreadPoolExecutor executorService = ThreadPoolFactory.getThreadPoolExecutor(); int poolSize = 20; ArrayBlockingQueue objects = new ArrayBlockingQueue<>(poolSize * 2); ThreadPoolExecutor executorService = new ThreadPoolExecutor( poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, objects, new ThreadFactoryBuilder().setNameFormat("ScanEs-pool").build(), (r, executor) -> { try { executor.getQueue().put(r); } catch (InterruptedException e) { throw new RejectedExecutionException("interrupted", e); } } ); String dsl = "{\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " }\n" + "}"; Consumer func = list -> { BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout("10m"); Arrays.stream(list).forEach(d -> { //String id = d.getId(); Map sourceAsMap = d.getSourceAsMap(); //System.out.println(sourceAsMap.toString()); String sum_type = sourceAsMap.get("sum_type").toString(); String tn = sourceAsMap.get("tn").toString(); String id = sourceAsMap.get("id").toString(); Map m = (Map) sourceAsMap.get("summary"); m.forEach((k1, v1) -> { String index = k1.split("_del_")[0]; if (Constant.indexs.contains(index)) { String pre = ""; if ("person".equalsIgnoreCase(sum_type)) { pre = "person_"; } UpdateRequest request = new UpdateRequest("ng_rt_summary_" + pre + index, "_doc", id); request.docAsUpsert(true); JSONObject j1 = new JSONObject(); j1.put(k1, v1); JSONObject j2 = new JSONObject(); j2.put("summary", j1); //j2.put("company_id", id); request.doc(JSONObject.toJSONString(j2), XContentType.JSON); request.retryOnConflict(6); bulkRequest.add(request); } }); }); executorService.submit(() -> { try { restHighLevelClient.bulk(bulkRequest); } catch (IOException e) { e.printStackTrace(); } }); }; new EsFastScan(restHighLevelClient, func, "test_tmp_xf_sum_v9", "_doc", dsl, routing).scan(); } }