|
@@ -60,9 +60,12 @@ public class PersonMergeIncremnetTask {
|
|
//TODO 启动合并任务
|
|
//TODO 启动合并任务
|
|
@Scheduled(cron = "00 10 00 * * ?")
|
|
@Scheduled(cron = "00 10 00 * * ?")
|
|
//@Scheduled(cron = "*/20 * * * * ?")
|
|
//@Scheduled(cron = "*/20 * * * * ?")
|
|
- //@Scheduled(cron = "00 13 15 * * ?")
|
|
|
|
|
|
+ //@Scheduled(cron = "30 20 22 * * ?")
|
|
public void mergePersonScheduled() throws UnsupportedEncodingException {
|
|
public void mergePersonScheduled() throws UnsupportedEncodingException {
|
|
- log.info("start mergePersonScheduled !");
|
|
|
|
|
|
+ log.info("start MergePersonScheduled !!!");
|
|
|
|
+ if (!CompanyUtils.isWindows()) {
|
|
|
|
+ dingUtils.send("MergePersonScheduled start !!!");
|
|
|
|
+ }
|
|
long start = System.currentTimeMillis();
|
|
long start = System.currentTimeMillis();
|
|
try {
|
|
try {
|
|
MergeParam param = initParams();
|
|
MergeParam param = initParams();
|
|
@@ -70,12 +73,15 @@ public class PersonMergeIncremnetTask {
|
|
sendKafka(param);
|
|
sendKafka(param);
|
|
startJob(param);
|
|
startJob(param);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- log.error("mergePersonScheduled error | message:{} | .", e.getMessage());
|
|
|
|
|
|
+ log.error("MergePersonScheduled error | message:{} | .", e.getMessage());
|
|
if (!CompanyUtils.isWindows()) {
|
|
if (!CompanyUtils.isWindows()) {
|
|
- dingUtils.send("mergePersonScheduled job run error : \n" + e.getMessage() + "\n!!!!!!! ");
|
|
|
|
|
|
+ dingUtils.send("MergePersonScheduled job run error : \n" + e.getMessage() + "\n!!!!!!! ");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.info("mergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
|
|
|
|
|
|
+ if (!CompanyUtils.isWindows()) {
|
|
|
|
+ dingUtils.send("MergePersonScheduled end !!!");
|
|
|
|
+ }
|
|
|
|
+ log.info("MergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
|
|
}
|
|
}
|
|
|
|
|
|
private void sendKafka(MergeParam param) throws InterruptedException, ExecutionException, TimeoutException {
|
|
private void sendKafka(MergeParam param) throws InterruptedException, ExecutionException, TimeoutException {
|
|
@@ -153,13 +159,14 @@ public class PersonMergeIncremnetTask {
|
|
long count1 = query.values().stream().filter(TaskFlowEnum.FAILURE::equals).count();
|
|
long count1 = query.values().stream().filter(TaskFlowEnum.FAILURE::equals).count();
|
|
if (count1 != 0) {
|
|
if (count1 != 0) {
|
|
if (!CompanyUtils.isWindows()) {
|
|
if (!CompanyUtils.isWindows()) {
|
|
- log.info("startJob job run error : \n" + "回流job失败" + "\n!!!!!!! ");
|
|
|
|
|
|
+ log.error("startJob job run error : \n" + "回流job失败" + "\n!!!!!!! ");
|
|
dingUtils.send("startJob job run error : \n" + "回流job失败" + "\n!!!!!!! ");
|
|
dingUtils.send("startJob job run error : \n" + "回流job失败" + "\n!!!!!!! ");
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
if (count != 0) {
|
|
if (count != 0) {
|
|
- Thread.sleep(10000);
|
|
|
|
|
|
+ Thread.sleep(2*60*1000);
|
|
} else {
|
|
} else {
|
|
log.info("startJob end | cost:{} | !", (System.currentTimeMillis() - start));
|
|
log.info("startJob end | cost:{} | !", (System.currentTimeMillis() - start));
|
|
return;
|
|
return;
|
|
@@ -185,10 +192,12 @@ public class PersonMergeIncremnetTask {
|
|
map.put("person_name", list.get(1).replaceAll("\"\"", "\""));
|
|
map.put("person_name", list.get(1).replaceAll("\"\"", "\""));
|
|
if ("0".equals(flag)) {//合并
|
|
if ("0".equals(flag)) {//合并
|
|
map.put("company_id", list.get(2).replaceAll("\"\"", "\""));
|
|
map.put("company_id", list.get(2).replaceAll("\"\"", "\""));
|
|
- map.put("deleted", "0");
|
|
|
|
|
|
+ map.put("deleted", list.get(3).replaceAll("\"\"", "\""));
|
|
|
|
+ map.put("label", list.get(4).replaceAll("\"\"", "\""));
|
|
} else {//删除
|
|
} else {//删除
|
|
map.put("company_id", "");
|
|
map.put("company_id", "");
|
|
- map.put("deleted", "1");
|
|
|
|
|
|
+ map.put("deleted", "9");
|
|
|
|
+ map.put("label", "");
|
|
}
|
|
}
|
|
String message = JSON.toJSONString(map);
|
|
String message = JSON.toJSONString(map);
|
|
kafkaProduce.produce(topic, message);
|
|
kafkaProduce.produce(topic, message);
|
|
@@ -213,7 +222,7 @@ public class PersonMergeIncremnetTask {
|
|
log.info("exportMergePerson2CSV start!");
|
|
log.info("exportMergePerson2CSV start!");
|
|
long start = System.currentTimeMillis();
|
|
long start = System.currentTimeMillis();
|
|
final String cql2 = "CALL apoc.export.csv.query('MATCH (person:合并" + date + ")-[r]-(company:企业) \n" +
|
|
final String cql2 = "CALL apoc.export.csv.query('MATCH (person:合并" + date + ")-[r]-(company:企业) \n" +
|
|
- "RETURN person.person_id as person_id,person.name as person_name,company.company_id as company_id', \n" +
|
|
|
|
|
|
+ "RETURN person.person_id as person_id,person.name as person_name,company.company_id as company_id,r.deleted as deleted,type(r) as label', \n" +
|
|
"'" + MERGE_NAME_PATH + "', \n" +
|
|
"'" + MERGE_NAME_PATH + "', \n" +
|
|
"{batchSize:10000,parallel:false,retries:3,iterateList:true}) \n" +
|
|
"{batchSize:10000,parallel:false,retries:3,iterateList:true}) \n" +
|
|
"YIELD file,rows";
|
|
"YIELD file,rows";
|