|
@@ -10,7 +10,6 @@ import lombok.Data;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.experimental.ExtensionMethod;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import lombok.var;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.elasticsearch.common.TriFunction;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
@@ -18,7 +17,6 @@ import org.frameworkset.elasticsearch.bulk.BulkProcessor;
|
|
|
import org.frameworkset.elasticsearch.client.ClientInterface;
|
|
|
import org.frameworkset.elasticsearch.entity.ESDatas;
|
|
|
import org.frameworkset.elasticsearch.entity.MetaMap;
|
|
|
-import org.frameworkset.elasticsearch.entity.SearchHit;
|
|
|
import org.frameworkset.util.CollectionUtils;
|
|
|
import org.frameworkset.util.ObjectUtils;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
@@ -116,7 +114,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
public void createPersonId(List<String> payloads) {
|
|
|
Map<String, List<JSONObject>> collect = payloads.toJSONStream()
|
|
|
.collect(Collectors.groupingBy(j -> j.getString("flag").orDefault("0")));
|
|
|
- System.out.println(collect);
|
|
|
+// System.out.println(collect);
|
|
|
batchFromCreate(collect);
|
|
|
}
|
|
|
|
|
@@ -130,6 +128,9 @@ public class DynamicPersonIdUpdateJob {
|
|
|
CompletableFuture.allOf(do0, do9, do8).get();
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
log.error("waiting processor error", e);
|
|
|
+ if (params.containsKey("8")) {
|
|
|
+ log.error("xhasjkdfhjk:{}", params.get("8"));
|
|
|
+ }
|
|
|
dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
//TODO 执行失败需要重新入队,待验证
|
|
|
}
|
|
@@ -159,7 +160,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
}
|
|
|
List<Tuple<String, String>> crates = entity.stream().map(j -> j.toCreateTuple()).collect(Collectors.toList());
|
|
|
String createUpdate = function.apply(rowkey, tn, crates);
|
|
|
- System.out.println(createUpdate);
|
|
|
+// System.out.println(createUpdate);
|
|
|
|
|
|
return CompletableFuture.runAsync(() -> {
|
|
|
try {
|
|
@@ -196,6 +197,9 @@ public class DynamicPersonIdUpdateJob {
|
|
|
}
|
|
|
|
|
|
public void scan(ScanParams params) {
|
|
|
+ if (StringUtils.isEmpty(params.keyno) || StringUtils.isEmpty(params.tn) || StringUtils.isEmpty(params.validId)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
restClient.scroll("winhc-company-dynamic/_search", "scrollQuery", "1m", params.toMap(), MetaMap.class, (response, handlerInfo) -> {
|
|
|
Optional
|
|
|
.ofNullable(response)
|
|
@@ -209,7 +213,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
|
|
|
|
|
|
private void remove8(String id, String index, String keyno, String validId, Map<String, Object> data) {
|
|
|
- dingTalkService.send(JSON.toJSONString(data));
|
|
|
+// dingTalkService.send(JSON.toJSONString(data));
|
|
|
JSONObject jData = new JSONObject(data);
|
|
|
String dynamic_info = jData.getString("dynamic_info");
|
|
|
if (StringUtils.isEmpty(dynamic_info)) return;
|
|
@@ -223,7 +227,14 @@ public class DynamicPersonIdUpdateJob {
|
|
|
List<JSONObject> collect = content.toJavaList(JSONObject.class).stream().filter(j -> {
|
|
|
JSONObject entity = j.getJSONObject("entity");
|
|
|
if (!keyno.equals(entity.getString("keyno"))) return true;
|
|
|
- return !j.anyMatchByNestedObject(validId, "keyno", "before", "after");
|
|
|
+ try {
|
|
|
+ return !j.anyMatchByNestedObject(validId, "keyno", "before", "after");
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("jasdjf");
|
|
|
+ log.error("data:{}\n{}", jData.toJSONString(), e.getMessage());
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
}).collect(Collectors.toList());
|
|
|
if (collect.isEmpty()) {
|
|
|
Map<String, Object> map = new HashMap<>();
|