|
@@ -52,7 +52,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
|
|
|
@KafkaListener(id = "update-person-id",
|
|
|
topics = "inc_human_pid_change",
|
|
|
- groupId = "update_person_group",
|
|
|
+ groupId = "xjk_group",
|
|
|
containerFactory = "smallContainerFactory")
|
|
|
public void updatePersonId(List<String> payloads) {
|
|
|
List<Tuple<Map<String, String>, JSONObject>> params = payloads
|
|
@@ -74,7 +74,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
CompletableFuture.allOf(params.stream().map(this::update).toArray(CompletableFuture[]::new)).get();
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
log.error("waiting processor error", e);
|
|
|
- dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
params.stream().map(Tuple::v2).map(j -> j.retry()).forEach(s -> kafkaTemplate.send("inc_human_pid_change", s));
|
|
|
}
|
|
|
}
|
|
@@ -92,7 +92,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
log.warn(s);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
}
|
|
|
/* if (s.contains("version_conflicts") && s.contains("\"version_conflicts\": 0")) {
|
|
|
return;
|
|
@@ -122,7 +122,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
CompletableFuture.allOf(do0, do9).get();
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
log.error("waiting processor error", e);
|
|
|
- dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
//TODO 执行失败需要重新入队,待验证
|
|
|
}
|
|
|
}
|
|
@@ -163,7 +163,7 @@ public class DynamicPersonIdUpdateJob {
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
log.error("execute update error", e);
|
|
|
- dingTalkService.error("%s\n%s", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
+ dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
|
|
|
}
|
|
|
}, TASK_FJ_POOL);
|
|
|
}
|