xufei 4 anni fa
parent
commit
0abbefe519

+ 2 - 2
src/main/java/com/winhc/task/PersonMergeIncremnetTask.java

@@ -61,9 +61,9 @@ public class PersonMergeIncremnetTask {
     DingUtils dingUtils;
 
     //TODO 启动合并任务
-    @Scheduled(cron = "00 00 01 * * ?")
+    @Scheduled(cron = "00 00 03 * * ?")
     //@Scheduled(cron = "*/20 * * * * ?")
-    //@Scheduled(cron = "50 57 16 * * ?")
+    //@Scheduled(cron = "50 45 09 * * ?")
     public void mergePersonScheduled() throws UnsupportedEncodingException {
 
         long start = System.currentTimeMillis();

+ 298 - 0
src/main/java/com/winhc/task/PersonMergeIncremnetTaskTmp.java

@@ -0,0 +1,298 @@
+package com.winhc.task;
+
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import com.aliyuncs.exceptions.ClientException;
+import com.winhc.bean.*;
+import com.winhc.kafka.produce.KafkaProduce;
+import com.winhc.service.TouchService;
+import com.winhc.utils.CompanyUtils;
+import com.winhc.utils.DateUtil;
+import com.winhc.utils.DingUtils;
+import com.winhc.utils.SchemaInit;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @date 2021年1月27日
+ * @des 测试
+ */
+@Component
+@Slf4j
+@EnableScheduling
+@AllArgsConstructor
+public class PersonMergeIncremnetTaskTmp {
+
+    @Autowired
+    JdbcTemplate jdbcTemplate;
+
+    private final Driver driver;
+
+    @Autowired
+    KafkaProduce kafkaProduce;
+
+    @Autowired
+    TouchService touchService;
+
+    @Autowired
+    DingUtils dingUtils;
+
+    //TODO 启动合并任务
+    //@Scheduled(cron = "00 00 01 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "50 09 13 17 04 ?")
+    public void mergePersonScheduled() throws UnsupportedEncodingException {
+
+        long start = System.currentTimeMillis();
+        try {
+            MergeParam param = initParams();
+                //mergeAndExport(param);
+                //sendKafka(param);
+                startJob(param);
+        } catch (Exception e) {
+            log.error("MergePersonScheduled error | message:{} | .", e.getMessage());
+
+        }
+
+        log.info("MergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
+    }
+
+    private void sendKafka(MergeParam param) throws InterruptedException, ExecutionException, TimeoutException, IOException {
+        //移除人员发送kafka
+        loadCSVSendKafka(param.getPathPre(), param.getDeletedPath(), param.getTopic(), "1");
+        //保证先移除,再更新
+        Thread.sleep(10 * 1000);
+        //增量人员发送kafka
+        //loadCSVSendKafka(param.getPathPre(), param.getIncrPath(), param.getTopic(), "0");
+        //合并人员发送kafka
+        loadCSVSendKafka(param.getPathPre(), param.getMergePath(), param.getTopic(), "0");
+        //更新job运行阶段
+    }
+
+    private void mergeAndExport(MergeParam param) {
+        Session session = driver.session();
+        //合并昨天逻辑
+        mergePerson(param.getLastPartition(), session);
+        //导出昨天合并csv
+        exportMergePerson2CSV(param.getLastPartition(), session, param.getMergePath());
+        //导出昨天删除csv
+        exportDeletedPerson2CSV(param.getLastPartition(), session, param.getDeletedPath());
+        //导出昨天新增csv
+        exportIncrPerson2CSV(param.getLastPartition(), session, param.getIncrPath());
+        //TODO 删除七天前标签(新增)
+        deletedPersonLabel(Constant.新增, param, session);
+        deletedPersonLabel(Constant.合并, param, session);
+        deletedPersonLabel(Constant.删除, param, session);
+        session.close();
+        //更新job运行阶段
+    }
+
+    public MergeParam initParams() {
+        String bizdate = DateUtil.getDateBefore(-1);
+        String currentPartition = DateUtil.getDateBefore(0).replace("-", "");
+        String afterPartition = DateUtil.getDateBefore(1).replace("-", "");
+        String before7DayPartition = DateUtil.getDateBefore(-14).replace("-", "");
+        String lastPartition = bizdate.replace("-", "");
+        String pathPre = "D:\\data\\opt\\";
+        if (!CompanyUtils.isWindows()) {
+            pathPre = "/data/opt/";
+        }
+        //final String mergePath = "export/merge-person-" + lastPartition + ".csv";
+        final String mergePath = "export/update-20210415.csv";
+        //final String deletedPath = "export/deleted-person-" + lastPartition + ".csv";
+        final String deletedPath = "export/deleted-20210415.csv";
+        final String incrPath = "export/incr-20210415.csv";
+        MergeParam param = MergeParam.builder()
+                .flow(Constant.flow)
+                .taskName(Constant.taskName)
+                .topic(Constant.topic)
+                .bizdate(bizdate)
+                .lastPartition(lastPartition)
+                .currentPartition(currentPartition)
+                .afterPartition(afterPartition)
+                .before7DayPartition(before7DayPartition)
+                .pathPre(pathPre)
+                .mergePath(mergePath)
+                .deletedPath(deletedPath)
+                .incrPath(incrPath)
+                .build();
+        log.info("show params : {} !", param.toString());
+        return param;
+    }
+
+    public void startJob(MergeParam param) throws InterruptedException, ClientException, IOException {
+        log.info("startJob start !");
+        long start = System.currentTimeMillis();
+        DataWorksFlowJob dataWorksFlowJob = SchemaInit.getJobs().stream().filter(j -> j.getFlow().equals(param.getFlow())).findFirst().orElseThrow(NullPointerException::new);
+        DataWorksFlowTask dataWorksFlowTask = dataWorksFlowJob.getTask().stream().filter(t -> t.getTaskName().equals(param.getTaskName())).findFirst().orElseThrow(NullPointerException::new);
+        TaskParam build = TaskParam.builder()
+                .projectName(dataWorksFlowJob.getProject())
+                .bizDate(param.getBizdate())
+                .flowName(dataWorksFlowJob.getFlow())
+                .nodeParam(dataWorksFlowTask.toNodeParam(param))
+                .build();
+        CreateManualDagResponse touch = touchService.touch(build);
+        if (touch == null) {
+            return;
+        }
+        while (true) {
+            Map<String, TaskFlowEnum> query = touchService.query(dataWorksFlowJob.getProject(), touch.getReturnValue());
+            long count = query.values()
+                    .stream()
+                    .filter(e -> !(TaskFlowEnum.SUCCESS.equals(e) || TaskFlowEnum.FAILURE.equals(e)))
+                    .count();
+
+            long count1 = query.values().stream().filter(TaskFlowEnum.FAILURE::equals).count();
+            if (count1 != 0) {
+                if (!CompanyUtils.isWindows()) {
+                    log.error("startJob job run error : \n" + "回流job失败" + "\n!!!!!!! ");
+                    dingUtils.send("startJob job run error : \n" + "回流job失败" + "\n!!!!!!! ");
+                    //更新job运行阶段
+                    return;
+                }
+            }
+
+            if (count != 0) {
+                Thread.sleep(2 * 60 * 1000);
+            } else {
+                log.info("startJob end | cost:{} | !", (System.currentTimeMillis() - start));
+                //更新job运行阶段
+                return;
+            }
+        }
+    }
+
+    private void loadCSVSendKafka(String pre, String path, String topic, String flag) throws InterruptedException, ExecutionException, TimeoutException, IOException {
+        log.info("loadCSVSendKafka | flag:{} | start !", flag);
+        long start = System.currentTimeMillis();
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(pre + path), "UTF-8");
+        int i = 0;
+        try {
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, String> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split("\",\"", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                String time = DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS(new Date());
+                map.put("create_time", time);
+                map.put("update_time", time);
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                if ("0".equals(flag)) {//合并
+                    map.put("company_id", list.get(2));
+                    map.put("deleted", list.get(3));
+                    map.put("label", list.get(4));
+                } else {//删除
+                    map.put("company_id", "");
+                    map.put("deleted", "9");
+                    map.put("label", "");
+                }
+                String message = JSON.toJSONString(map);
+                kafkaProduce.produce(topic, message);
+            }
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+        log.info("loadCSVSendKafka | flag:{} | size:{} | cost:{} | end !", flag, i - 1, (System.currentTimeMillis() - start));
+    }
+
+    private void exportDeletedPerson2CSV(String date, Session session, String DELETED_NAME_PATH) {
+        log.info("exportDeletedPerson2CSV start!");
+        long start = System.currentTimeMillis();
+        final String cql3 = "CALL apoc.export.csv.query('MATCH (person:删除" + date + ")\n" +
+                "RETURN person.person_id as person_id,person.name as person_name', \n" +
+                "'" + DELETED_NAME_PATH + "', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true})\n" +
+                "YIELD file,rows";
+        log.info("cql3 : \n {} ", cql3);
+        String res3 = CompanyUtils.runNeo4j(session, cql3);
+        log.info("exportDeletedPerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res3);
+    }
+
+    private void exportMergePerson2CSV(String date, Session session, String MERGE_NAME_PATH) {
+        log.info("exportMergePerson2CSV start!");
+        long start = System.currentTimeMillis();
+        final String cql2 = "CALL apoc.export.csv.query('MATCH (person:合并" + date + ")-[r]-(company:企业) \n" +
+                "RETURN person.person_id as person_id,person.name as person_name,company.company_id  as company_id,r.deleted as  deleted,type(r) as label', \n" +
+                "'" + MERGE_NAME_PATH + "', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true}) \n" +
+                "YIELD file,rows";
+        log.info("cql2 : \n {} ", cql2);
+        String res2 = CompanyUtils.runNeo4j(session, cql2);
+        log.info("exportMergePerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res2);
+    }
+
+    private void exportIncrPerson2CSV(String date, Session session, String INCR_NAME_PATH) {
+        log.info("exportIncrPerson2CSV start!");
+        long start = System.currentTimeMillis();
+        final String cql5 = "CALL apoc.export.csv.query('MATCH (person:新增" + date + ")-[r]-(company:企业) \n" +
+                "RETURN person.person_id as person_id,person.name as person_name,company.company_id  as company_id,r.deleted as  deleted,type(r) as label', \n" +
+                "'" + INCR_NAME_PATH + "', \n" +
+                "{batchSize:20000,parallel:false,retries:3,iterateList:true}) \n" +
+                "YIELD file,rows";
+        log.info("cql5 : \n {} ", cql5);
+        String res5 = CompanyUtils.runNeo4j(session, cql5);
+        log.info("exportIncrPerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res5);
+    }
+
+    private void mergePerson(String date, Session session) {
+        log.info("mergePerson start!");
+        long start = System.currentTimeMillis();
+        final String cql1 = "CALL apoc.periodic.iterate( \n" +
+                "'MATCH (p:新增" + date + ")-[:投资*1..3]-(q:个人) \n" +
+                "WHERE p.name=q.name AND ID(p)<>ID(q) \n" +
+                "WITH p.name AS nn,max(ID(p)) as m_id \n" +
+                "MATCH (p:个人)-[:投资*1..3]-(q:个人) \n" +
+                "WHERE ID(p) = m_id AND p.name=q.name AND ID(p)<>ID(q) \n" +
+                "WITH p,q \n" +
+                "MATCH (q)-[r]-(x) \n" +
+                "WHERE x<>p \n" +
+                "RETURN p,q,r,x', \n" +
+                "'CALL apoc.merge.relationship(p, TYPE(r), properties(r),{}, x,{}) YIELD rel \n" +
+                "SET p:合并" + date + " \n" +
+                "SET q:删除" + date + " \n" +
+                "DELETE r', \n" +
+                "{batchSize:20000,parallel:false,retries:3,iterateList:true}\n" +
+                ") YIELD batches, total";
+        log.info("cql1 : \n {} ", cql1);
+        String res1 = CompanyUtils.runNeo4j(session, cql1);
+        log.info("mergePerson | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res1);
+    }
+
+    private void deletedPersonLabel(String labelPre, MergeParam param, Session session) {
+        String label = labelPre + param.getBefore7DayPartition();
+        log.info("deletedPersonLabel | label:{} | start!", label);
+        long start = System.currentTimeMillis();
+        final String cql4 = "CALL apoc.periodic.iterate( \n" +
+                "'MATCH (m:" + label + ") \n" +
+                "RETURN m', \n" +
+                "'REMOVE m:" + label + "', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true} \n" +
+                ") YIELD batches, total";
+        log.info("cql4 : \n {} ", cql4);
+        String res4 = CompanyUtils.runNeo4j(session, cql4);
+        log.info("deletedPersonLabel | label:{} | cost:{} | result:{} | end !", label, (System.currentTimeMillis() - start), res4);
+    }
+}

+ 31 - 0
src/test/java/com/winhc/Test1.java

@@ -0,0 +1,31 @@
+package com.winhc;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/4/7 17:06
+ */
+public class Test1 {
+
+    @Test
+    public void parseJson() {
+        String s = "{\"company_infos\":[{\"company_name\":\"内蒙古福事特液压有限公司\",\"company_id\":\"6c2167b6302f66f1b81acb33e401d0bb\"},{\"company_name\":\"江西福事特液压有限公司\",\"company_id\":\"54e9a3dc2ff4a7d9b37251d57107d663\"},{\"company_name\":\"长沙福事特液压有限公司\",\"company_id\":\"bce86f859c2193b51a7630841cc5e892\"}],\"person_name\":\"郭少华\"}";
+        JSONObject jsonObject = JSONObject.parseObject(s);
+        String person_name = jsonObject.getString("person_name");
+        System.out.println(person_name);
+
+        JSONArray company_infos = jsonObject.getJSONArray("company_infos");
+        for (Iterator iterator = company_infos.iterator(); iterator.hasNext();) {
+            JSONObject jsonObject1 = (JSONObject) iterator.next();
+            String company_id = jsonObject1.getString("company_id");
+            System.out.println(company_id);
+        }
+    }
+}