package com.winhc.phoenix.example.task;

import com.mongodb.client.MongoCollection;
import com.winhc.phoenix.example.configuration.DynamicElasticSearchClient;
import com.winhc.phoenix.example.framework.es.EsFastScan;
import lombok.AllArgsConstructor;
import org.bson.Document;
import org.elasticsearch.search.SearchHit;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Asynchronous tasks that copy Elasticsearch query results into MongoDB.
 *
 * @author: XuJiakai
 * 2021/5/18 13:56
 */
@Component
@AllArgsConstructor
public class AsyncTask {

    private static final Logger LOG = Logger.getLogger(AsyncTask.class.getName());

    /** Elasticsearch index that is scanned. */
    private static final String SOURCE_INDEX = "credit_punishment_case_info_v1";

    /** Elasticsearch mapping type passed to the scanner. */
    private static final String SOURCE_TYPE = "_doc";

    /** MongoDB collection that receives the copied documents. */
    private static final String TARGET_COLLECTION = "credit_punishment_case_info_xjk";

    /**
     * Query DSL for the scan. It fetches only {@code case_no}, {@code name} and {@code rowkey},
     * and filters on documents with {@code deleted = "0"} that match either branch of the
     * {@code should} clause:
     * <ul>
     *   <li>{@code label} is one of the three values in the {@code terms} clause, or</li>
     *   <li>{@code label} equals the single {@code term} value and the script accepts the
     *       document ({@code keyno} is non-null and its length is not 32).</li>
     * </ul>
     */
    private static final String CREDIT_PUNISHMENT_TOUCH_DSL = "{\n" +
            " \"_source\": [\"case_no\", \"name\", \"rowkey\"],\n" +
            " \"query\": {\n" +
            " \"bool\": {\n" +
            " \"filter\": {\n" +
            " \"bool\": {\n" +
            " \"should\": [\n" +
            " {\n" +
            " \"bool\": {\n" +
            " \"must\": [\n" +
            " {\"terms\": {\n" +
            " \"label\": [\n" +
            " \"失信被执行人\",\n" +
            " \"被执行人\",\n" +
            " \"终本案件\"\n" +
            " ]\n" +
            " }}, {\n" +
            " \"term\": {\n" +
            " \"deleted\": {\n" +
            " \"value\": \"0\"\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            " ]\n" +
            " }\n" +
            " }, {\n" +
            " \"bool\": {\n" +
            " \"must\": [\n" +
            " {\"term\": {\n" +
            " \"deleted\": {\n" +
            " \"value\": \"0\"\n" +
            " }\n" +
            " }}, {\n" +
            " \"term\": {\n" +
            " \"label\": {\n" +
            " \"value\": \"限制高消费\"\n" +
            " }\n" +
            " }\n" +
            " }, {\n" +
            " \"script\": {\n" +
            " \"script\": \"if(doc['keyno'].value==null)return false; else return !(doc['keyno'].value.length()==32)\"\n" +
            " }\n" +
            " }\n" +
            " ]\n" +
            " }\n" +
            " }\n" +
            " ]\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            "}";

    private final DynamicElasticSearchClient dynamicElasticSearchClient;
    private final MongoTemplate mongoTemplate;

    /**
     * Scans {@value #SOURCE_INDEX} with {@link #CREDIT_PUNISHMENT_TOUCH_DSL} and inserts one
     * MongoDB document per hit into {@value #TARGET_COLLECTION}.
     *
     * <p>Any exception raised while obtaining the collection, scanning or inserting is logged
     * and reported through the returned future instead of being rethrown.
     *
     * @return a completed future holding {@code true} when the scan finished without an
     *         exception, {@code false} otherwise
     */
    @Async
    public Future<Boolean> pullCreditPunishmentTouch() {
        try {
            MongoCollection<Document> target = mongoTemplate.getCollection(TARGET_COLLECTION);

            // NOTE(review): EsFastScan presumably calls this consumer once per batch of hits;
            // its contract is not visible from this file.
            Consumer<SearchHit[]> sink = hits -> {
                // insertMany rejects an empty list, so a batch with no hits is skipped.
                if (hits == null || hits.length == 0) {
                    return;
                }
                List<Document> batch = Arrays.stream(hits)
                        .map(AsyncTask::toDocument)
                        .collect(Collectors.toList());
                target.insertMany(batch);
            };

            new EsFastScan(dynamicElasticSearchClient.getRestHighLevelClient(), sink,
                    SOURCE_INDEX, SOURCE_TYPE, CREDIT_PUNISHMENT_TOUCH_DSL).scan();
            return new AsyncResult<>(true);
        } catch (Exception e) {
            // Keep the "false on failure" contract, but record the cause.
            LOG.log(Level.SEVERE, "pullCreditPunishmentTouch failed while copying "
                    + SOURCE_INDEX + " into " + TARGET_COLLECTION, e);
            return new AsyncResult<>(false);
        }
    }

    /**
     * Builds the MongoDB document for one search hit: the hit id is stored as {@code doc_id}
     * and {@code case_no}, {@code name}, {@code rowkey} are copied from the hit's source map
     * (a field absent from the source is stored as {@code null}).
     */
    private static Document toDocument(SearchHit hit) {
        Map<String, Object> source = hit.getSourceAsMap();
        Document document = new Document();
        document.put("doc_id", hit.getId());
        document.put("case_no", source.get("case_no"));
        document.put("name", source.get("name"));
        document.put("rowkey", source.get("rowkey"));
        return document;
    }
}