|
@@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.elasticsearch.common.TriFunction;
|
|
import org.elasticsearch.common.TriFunction;
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
|
+import org.frameworkset.elasticsearch.boot.BBossESStarter;
|
|
import org.frameworkset.elasticsearch.bulk.BulkProcessor;
|
|
import org.frameworkset.elasticsearch.bulk.BulkProcessor;
|
|
import org.frameworkset.elasticsearch.client.ClientInterface;
|
|
import org.frameworkset.elasticsearch.client.ClientInterface;
|
|
import org.frameworkset.elasticsearch.entity.ESDatas;
|
|
import org.frameworkset.elasticsearch.entity.ESDatas;
|
|
@@ -23,6 +24,7 @@ import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -43,11 +45,18 @@ import java.util.stream.Stream;
|
|
@ExtensionMethod(DynamicPersonIdUpdateJob.StringStringMapExt.class)
|
|
@ExtensionMethod(DynamicPersonIdUpdateJob.StringStringMapExt.class)
|
|
public class DynamicPersonIdUpdateJob {
|
|
public class DynamicPersonIdUpdateJob {
|
|
public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
|
|
public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
|
|
- private final ClientInterface restClient;
|
|
|
|
|
|
+ private final ClientInterface cRestClient;
|
|
|
|
+ private final BBossESStarter bBossESStarter;
|
|
|
|
+ private ClientInterface restClient;
|
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
|
private final DingTalkService dingTalkService;
|
|
private final DingTalkService dingTalkService;
|
|
private final BulkProcessor processor;
|
|
private final BulkProcessor processor;
|
|
|
|
|
|
|
|
+ @PostConstruct
|
|
|
|
+ public void init() {
|
|
|
|
+ restClient = bBossESStarter.getRestClient();
|
|
|
|
+ }
|
|
|
|
+
|
|
public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
|
|
public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
|
|
Set<Object> seen = ConcurrentHashMap.newKeySet();
|
|
Set<Object> seen = ConcurrentHashMap.newKeySet();
|
|
return t -> seen.add(keyExtractor.apply(t));
|
|
return t -> seen.add(keyExtractor.apply(t));
|
|
@@ -200,7 +209,7 @@ public class DynamicPersonIdUpdateJob {
|
|
if (StringUtils.isEmpty(params.keyno) || StringUtils.isEmpty(params.tn) || StringUtils.isEmpty(params.validId)) {
|
|
if (StringUtils.isEmpty(params.keyno) || StringUtils.isEmpty(params.tn) || StringUtils.isEmpty(params.validId)) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- restClient.scroll("winhc-company-dynamic/_search", "scrollQuery", "1m", params.toMap(), MetaMap.class, (response, handlerInfo) -> {
|
|
|
|
|
|
+ cRestClient.scroll("winhc-company-dynamic/_search", "scrollQuery", "1m", params.toMap(), MetaMap.class, (response, handlerInfo) -> {
|
|
Optional
|
|
Optional
|
|
.ofNullable(response)
|
|
.ofNullable(response)
|
|
.map(ESDatas::getDatas)
|
|
.map(ESDatas::getDatas)
|