Browse Source

统一合并逻辑

xufei 4 năm trước cách đây
mục cha
commit
31c2bc132b

+ 1 - 0
src/main/java/com/winhc/bean/MergeParam.java

@@ -26,6 +26,7 @@ public class MergeParam {
     private String before7DayPartition;
     private String pathPre;
     private String mergePath;
+    private String incrPath;
     private String deletedPath;
 
     @Override

+ 27 - 7
src/main/java/com/winhc/task/PersonMergeIncremnetTask.java

@@ -58,9 +58,9 @@ public class PersonMergeIncremnetTask {
     DingUtils dingUtils;
 
     //TODO 启动合并任务
-    @Scheduled(cron = "00 10 00 * * ?")
+    @Scheduled(cron = "00 30 05 * * ?")
     //@Scheduled(cron = "*/20 * * * * ?")
-    //@Scheduled(cron = "30 20 22 * * ?")
+    //@Scheduled(cron = "40 06 17 * * ?")
     public void mergePersonScheduled() throws UnsupportedEncodingException {
         log.info("start  MergePersonScheduled !!!");
         if (!CompanyUtils.isWindows()) {
@@ -85,21 +85,26 @@ public class PersonMergeIncremnetTask {
     }
 
     private void sendKafka(MergeParam param) throws InterruptedException, ExecutionException, TimeoutException {
-        //加载文件发送kafka
+        //移除人员发送kafka
         loadCSVSendKafka(param.getPathPre(), param.getDeletedPath(), param.getTopic(), "1");
         //保证先移除,再更新
         Thread.sleep(10*1000);
-        loadCSVSendKafka(param.getPathPre(), param.getMergePath(), param.getTopic(), "0");
+        //增量人员发送kafka
+        loadCSVSendKafka(param.getPathPre(), param.getIncrPath(), param.getTopic(), "0");
+        //合并人员发送kafka
+        //loadCSVSendKafka(param.getPathPre(), param.getMergePath(), param.getTopic(), "0");
     }
 
     private void mergeAndExport(MergeParam param) {
         Session session = driver.session();
-        //合并逻辑
+        //合并昨天逻辑
         mergePerson(param.getLastPartition(), session);
-        //导出合并csv
+        //导出昨天合并csv
         exportMergePerson2CSV(param.getLastPartition(), session, param.getMergePath());
-        //导出删除csv
+        //导出昨天删除csv
         exportDeletedPerson2CSV(param.getLastPartition(), session, param.getDeletedPath());
+        //导出昨天新增csv
+        exportIncrPerson2CSV(param.getLastPartition(), session, param.getIncrPath());
         //TODO 删除七天前标签(新增)
         deletedPersonLabel(Constant.新增, param, session);
         deletedPersonLabel(Constant.合并, param, session);
@@ -119,6 +124,7 @@ public class PersonMergeIncremnetTask {
         }
         final String mergePath = "export/merge-person-" + lastPartition + ".csv";
         final String deletedPath = "export/deleted-person-" + lastPartition + ".csv";
+        final String incrPath = "export/incr-person-" + lastPartition + ".csv";
         MergeParam param = MergeParam.builder()
                 .flow(Constant.flow)
                 .taskName(Constant.taskName)
@@ -131,6 +137,7 @@ public class PersonMergeIncremnetTask {
                 .pathPre(pathPre)
                 .mergePath(mergePath)
                 .deletedPath(deletedPath)
+                .incrPath(incrPath)
                 .build();
         log.info("show params : {} !", param.toString());
         return param;
@@ -233,6 +240,19 @@ public class PersonMergeIncremnetTask {
         log.info("exportMergePerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res2);
     }
 
+    private void exportIncrPerson2CSV(String date, Session session, String INCR_NAME_PATH) {
+        log.info("exportIncrPerson2CSV start!");
+        long start = System.currentTimeMillis();
+        final String cql5 = "CALL apoc.export.csv.query('MATCH (person:新增" + date + ")-[r]-(company:企业) \n" +
+                "RETURN person.person_id as person_id,person.name as person_name,company.company_id  as company_id,r.deleted as  deleted,type(r) as label', \n" +
+                "'" + INCR_NAME_PATH + "', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true}) \n" +
+                "YIELD file,rows";
+        log.info("cql5 : \n {} ", cql5);
+        String res5 = CompanyUtils.runNeo4j(session, cql5);
+        log.info("exportIncrPerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res5);
+    }
+
     private void mergePerson(String date, Session session) {
         log.info("mergePerson start!");
         long start = System.currentTimeMillis();