@@ -18,6 +18,8 @@ import com.winhc.utils.SchemaInit;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Session;
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,6 +33,7 @@ import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;

/**
* @author π
@@ -60,13 +63,13 @@ public class PersonMergeIncremnetTask {
    //TODO kick off the merge task
@Scheduled(cron = "00 30 01 * * ?")
//@Scheduled(cron = "*/20 * * * * ?")
- //@Scheduled(cron = "50 36 19 * * ?")
+ //@Scheduled(cron = "50 57 16 * * ?")
    public void mergePersonScheduled() throws UnsupportedEncodingException {

        long start = System.currentTimeMillis();
try {
MergeParam param = initParams();
- if(querySussessStatus(param)){
+ if (querySussessStatus(param)) {
mergeAndExport(param);
sendKafka(param);
startJob(param);
@@ -83,12 +86,33 @@ public class PersonMergeIncremnetTask {
log.info("MergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
    }

+
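+    // Record which pipeline stage this partition has reached. Stage codes used in this task:
+    // 1 = upstream ready, 2 = merge/export done, 3 = kafka sent, 4 = write-back job done, 9 = write-back job failed.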
+ private boolean updateMysqlStatus(String status, String id) {
+ boolean flag = false;
+        int updateResult = jdbcTemplate.update("update prism1.node_relation_success_status set status = ?, update_time = now() where id = ?", status, id);
+ if (updateResult > 0) {
+ flag = true;
+ }
+ return flag;
+ }
+
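+    // True once this partition's row has reached at least the given stage.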
+ private boolean queryMysqlStatus(String status, String id) {
+ boolean flag = false;
+        List<Map<String, Object>> list = jdbcTemplate.queryForList("select id,status,topic_type,create_time,update_time from prism1.node_relation_success_status where status >= ? and id = ?", status, id);
+ if (list.size() > 0) {
+ flag = true;
+ }
+ return flag;
+ }
+
private boolean querySussessStatus(MergeParam param) throws UnsupportedEncodingException, InterruptedException {
boolean flag = false;
ArrayList<Integer> hours = new ArrayList<>();
while (true) {
- List<Map<String, Object>> list = jdbcTemplate.queryForList("select id,status,topic_type,create_time from node_relation_success_status where status = '1' and id = " + param.getLastPartition());
- if (list.size() > 0) {
+ if (queryMysqlStatus("1", param.getLastPartition())) {
log.info("start MergePersonScheduled !!!");
if (!CompanyUtils.isWindows()) {
dingUtils.send("MergePersonScheduled start !!!");
@@ -96,10 +120,10 @@ public class PersonMergeIncremnetTask {
flag = true;
break;
}
- Thread.sleep(60*1000);
+ Thread.sleep(60 * 1000);
log.info("waiting success status !!!");
int hour = DateUtil.getCurrentHours();
- if( hour >= 8 && !hours.contains(hour)){
+ if (hour >= 8 && !hours.contains(hour)) {
hours.add(hour);
                log.info("upstream blocked alert !!!!!!");
if (!CompanyUtils.isWindows()) {
@@ -108,7 +132,7 @@ public class PersonMergeIncremnetTask {
            }

            //exit on timeout
- if(DateUtil.getCurrentHours() >= 22){
+ if (DateUtil.getCurrentHours() >= 22) {
                log.info("timed out, exiting !!!!!!");
if (!CompanyUtils.isWindows()) {
                    dingUtils.send("timed out, exiting !!!!!!");
@@ -119,7 +143,7 @@ public class PersonMergeIncremnetTask {
return flag;
    }

-    private void sendKafka(MergeParam param) throws InterruptedException, ExecutionException, TimeoutException {
+ private void sendKafka(MergeParam param) throws InterruptedException, ExecutionException, TimeoutException, IOException {
        //send removed persons to kafka
loadCSVSendKafka(param.getPathPre(), param.getDeletedPath(), param.getTopic(), "1");
        //ensure removals are sent before updates
@@ -128,6 +152,8 @@ public class PersonMergeIncremnetTask {
loadCSVSendKafka(param.getPathPre(), param.getIncrPath(), param.getTopic(), "0");
        //send merged persons to kafka
//loadCSVSendKafka(param.getPathPre(), param.getMergePath(), param.getTopic(), "0");
+        //update the job stage
+ updateMysqlStatus("3", param.getLastPartition());
    }

    private void mergeAndExport(MergeParam param) {
@@ -145,6 +171,8 @@ public class PersonMergeIncremnetTask {
deletedPersonLabel(Constant.合并, param, session);
deletedPersonLabel(Constant.删除, param, session);
session.close();
+        //update the job stage
+ updateMysqlStatus("2", param.getLastPartition());
    }

    public MergeParam initParams() {
@@ -205,6 +233,8 @@ public class PersonMergeIncremnetTask {
if (!CompanyUtils.isWindows()) {
                log.error("startJob job run error : \n" + "write-back job failed" + "\n!!!!!!! ");
                dingUtils.send("startJob job run error : \n" + "write-back job failed" + "\n!!!!!!! ");
+                //update the job stage
+ updateMysqlStatus("9", param.getLastPartition());
return;
}
}
@@ -213,38 +243,48 @@ public class PersonMergeIncremnetTask {
Thread.sleep(2 * 60 * 1000);
} else {
log.info("startJob end | cost:{} | !", (System.currentTimeMillis() - start));
+                //update the job stage
+ updateMysqlStatus("4", param.getLastPartition());
return;
}
}
    }

-    private void loadCSVSendKafka(String pre, String path, String topic, String flag) throws InterruptedException, ExecutionException, TimeoutException {
+ private void loadCSVSendKafka(String pre, String path, String topic, String flag) throws InterruptedException, ExecutionException, TimeoutException, IOException {
log.info("loadCSVSendKafka | flag:{} | start !", flag);
long start = System.currentTimeMillis();
- CsvData data = CsvUtil.getReader().read(FileUtil.file(pre + path));
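+        // Stream the export file line by line instead of reading the whole CSV into memory.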
+ LineIterator it = FileUtils.lineIterator(FileUtil.file(pre + path), "UTF-8");
int i = 0;
- for (CsvRow csvRow : data.getRows()) {
- ++i;
- if (i == 1) continue;
- Map<String, String> map = new HashMap<>();
- List<String> list = csvRow.getRawList();
- if (list.size() < 2) continue;
- String time = DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS(new Date());
- map.put("create_time", time);
- map.put("update_time", time);
- map.put("person_id", list.get(0).replaceAll("\"\"", "\""));
- map.put("person_name", list.get(1).replaceAll("\"\"", "\""));
-            if ("0".equals(flag)) {//merge
- map.put("company_id", list.get(2).replaceAll("\"\"", "\""));
- map.put("deleted", list.get(3).replaceAll("\"\"", "\""));
- map.put("label", list.get(4).replaceAll("\"\"", "\""));
-            } else {//delete
- map.put("company_id", "");
- map.put("deleted", "9");
- map.put("label", "");
+ try {
+ while (it.hasNext()) {
+ String line = it.nextLine();
+ ++i;
+ if (i == 1) continue;
+ Map<String, String> map = new HashMap<>();
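+                // Naive CSV parsing: split on "," delimiters, then strip any remaining quotes.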
+                List<String> list = Arrays.stream(line.split("\",\"", -1))
+                        .map(x -> x.replace("\"", "")).collect(Collectors.toList());
+ if (list.size() < 2) continue;
+ String time = DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS(new Date());
+ map.put("create_time", time);
+ map.put("update_time", time);
+ map.put("person_id", list.get(0));
+ map.put("person_name", list.get(1));
+                if ("0".equals(flag)) {//merge
+ map.put("company_id", list.get(2));
+ map.put("deleted", list.get(3));
+ map.put("label", list.get(4));
+                } else {//delete
+ map.put("company_id", "");
+ map.put("deleted", "9");
+ map.put("label", "");
+ }
+ String message = JSON.toJSONString(map);
+ kafkaProduce.produce(topic, message);
}
- String message = JSON.toJSONString(map);
- kafkaProduce.produce(topic, message);
+ } finally {
+ LineIterator.closeQuietly(it);
}
log.info("loadCSVSendKafka | flag:{} | size:{} | cost:{} | end !", flag, i - 1, (System.currentTimeMillis() - start));
}