浏览代码

替换topic

xufei 4 年之前
父节点
当前提交
2ca2cef256

+ 5 - 6
src/main/java/com/winhc/task/PersonMergeIncremnetTask.java

@@ -61,7 +61,7 @@ public class PersonMergeIncremnetTask {
     DingUtils dingUtils;
 
     //TODO 启动合并任务
-    @Scheduled(cron = "00 30 01 * * ?")
+    @Scheduled(cron = "00 00 01 * * ?")
     //@Scheduled(cron = "*/20 * * * * ?")
     //@Scheduled(cron = "50 57 16 * * ?")
     public void mergePersonScheduled() throws UnsupportedEncodingException {
@@ -176,7 +176,7 @@ public class PersonMergeIncremnetTask {
         String bizdate = DateUtil.getDateBefore(-1);
         String currentPartition = DateUtil.getDateBefore(0).replace("-", "");
         String afterPartition = DateUtil.getDateBefore(1).replace("-", "");
-        String before7DayPartition = DateUtil.getDateBefore(-7).replace("-", "");
+        String before7DayPartition = DateUtil.getDateBefore(-14).replace("-", "");
         String lastPartition = bizdate.replace("-", "");
         String pathPre = "D:\\data\\opt\\";
         if (!CompanyUtils.isWindows()) {
@@ -316,7 +316,7 @@ public class PersonMergeIncremnetTask {
         final String cql5 = "CALL apoc.export.csv.query('MATCH (person:新增" + date + ")-[r]-(company:企业) \n" +
                 "RETURN person.person_id as person_id,person.name as person_name,company.company_id  as company_id,r.deleted as  deleted,type(r) as label', \n" +
                 "'" + INCR_NAME_PATH + "', \n" +
-                "{batchSize:10000,parallel:false,retries:3,iterateList:true}) \n" +
+                "{batchSize:20000,parallel:false,retries:3,iterateList:true}) \n" +
                 "YIELD file,rows";
         log.info("cql5 : \n {} ", cql5);
         String res5 = CompanyUtils.runNeo4j(session, cql5);
@@ -336,12 +336,11 @@ public class PersonMergeIncremnetTask {
                 "MATCH (q)-[r]-(x) \n" +
                 "WHERE x<>p \n" +
                 "RETURN p,q,r,x', \n" +
-                "'CALL apoc.merge.relationship(p, TYPE(r), {},{}, x) YIELD rel \n" +
-                "SET rel = r \n" +
+                "'CALL apoc.merge.relationship(p, TYPE(r), properties(r),{}, x,{}) YIELD rel \n" +
                 "SET p:合并" + date + " \n" +
                 "SET q:删除" + date + " \n" +
                 "DELETE r', \n" +
-                "{batchSize:10000,parallel:false,retries:3,iterateList:true}\n" +
+                "{batchSize:20000,parallel:false,retries:3,iterateList:true}\n" +
                 ") YIELD batches, total";
         log.info("cql1 : \n {} ", cql1);
         String res1 = CompanyUtils.runNeo4j(session, cql1);

+ 8 - 0
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -24,6 +24,14 @@ public class CompanyUtils {
         return data;
     }
 
+    public static String runNeo4j(Session session, String cql, HashMap<String,Object> parameters) {
+        String data = session.writeTransaction(tx -> {
+            Result result = tx.run(cql, parameters);
+            return "success";
+        });
+        return data;
+    }
+
     public static Boolean isWindows() {
         Properties props = System.getProperties();
         if (props.containsKey("os.name") && props.get("os.name").toString().contains("Windows")) {

+ 3 - 1
src/main/resources/application-prd.properties

@@ -24,8 +24,10 @@ scheduling.enabled = true
 
 #============== kafka ===================
 # 指定kafka 代理地址,可以多个
+#老版本kafka
+#spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
 #prod
-spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
+spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
 
 #topic
 spring.kafka.topic_node_relation_union=inc_node_relation_union

+ 126 - 0
src/test/java/com/winhc/ParseMergeCSV.java

@@ -0,0 +1,126 @@
+package com.winhc;
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.winhc.utils.CompanyUtils;
+import com.winhc.utils.DateUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:解析 merge CSV
+ * @date 2021/2/26 13:53
+ */
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+@Slf4j
+public class ParseMergeCSV {
+    @Autowired
+    Driver driver;
+
+    @Test
+    public void parseCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\merge-person-20210219.csv";
+        String flag = "0";
+
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            Set<String> set = new HashSet<String>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, String> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split("\",\"", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                String time = DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS(new Date());
+                map.put("create_time", time);
+                map.put("update_time", time);
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                if ("0".equals(flag)) {//合并
+                    map.put("company_id", list.get(2));
+                    map.put("deleted", list.get(3));
+                    map.put("label", list.get(4));
+                } else {//删除
+                    map.put("company_id", "");
+                    map.put("deleted", "9");
+                    map.put("label", "");
+                }
+                String message = JSON.toJSONString(map);
+                if (StringUtils.isBlank(map.get("deleted"))) {
+                    System.out.println(message);
+                }
+                set.add(map.get("person_id"));
+                //System.out.println(message);
+
+            }
+            System.out.println(set.size());
+
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+
+            for (String s : set) {
+                HashMap<String, Object> m2 = new HashMap<>();
+                m2.put("person_id", s);
+                batch_list.add(m2);
+            }
+
+            setLabel(batch_list, "个人", "合并20210219");
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+    public void setLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MATCH(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s:" + toLabel + " \n";
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+    }
+
+    @Test
+    public void combineLabel() throws IOException {
+//        String[] fromLabels = {"合并20210219", "合并20210220", "合并20210221", "合并20210222", "合并20210223"
+//                , "合并20210224", "合并20210225", "合并20210226", "合并20210227", "合并20210301"};
+        String[] fromLabels = {"合并20210302", "合并20210303", "合并20210304", "合并20210304"};
+        String toLabel = "更新20210301";
+        for (String fromLabel : fromLabels) {
+            combineLabel(fromLabel, toLabel);
+        }
+    }
+
+    public void combineLabel(String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "MATCH (m:" + fromLabel + ") \n" +
+                "SET m:" + toLabel + " \n";
+        log.info("consumer cost: {}, cql:{}", (System.currentTimeMillis() - start), cql);
+        CompanyUtils.runNeo4j(session, cql, null);
+        session.close();
+    }
+
+}

+ 101 - 0
src/test/java/com/winhc/PidReplaceNameCSV.java

@@ -0,0 +1,101 @@
+package com.winhc;
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.winhc.utils.CompanyUtils;
+import com.winhc.utils.DateUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:解析 merge CSV
+ * @date 2021/2/26 13:53
+ */
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+@Slf4j
+public class PidReplaceNameCSV {
+    @Autowired
+    Driver driver;
+
+    @Test
+    public void parseCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\pid-replace-name-20210305.csv";
+
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split(",", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                map.put("person_id", list.get(0));
+                map.put("error_name", list.get(1));
+                map.put("person_name", list.get(2));
+                String message = JSON.toJSONString(map);
+                System.out.println(message);
+                batch_list.add(map);
+            }
+            System.out.println(batch_list.size());
+            setLabel(batch_list, "个人", "新增20210305");
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+    public void setLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MATCH(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s.name = row.person_name \n" +
+                "SET s:" + toLabel + " \n";
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+    }
+
+    @Test
+    public void combineLabel() throws IOException {
+        String[] fromLabels = {"合并20210219", "合并20210220", "合并20210221", "合并20210222", "合并20210223"
+                , "合并20210224", "合并20210225", "合并20210226", "合并20210227", "合并20210301"};
+        String toLabel = "更新20210301";
+        for (String fromLabel : fromLabels) {
+            combineLabel(fromLabel, toLabel);
+        }
+    }
+
+    public void combineLabel(String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "MATCH (m:" + fromLabel + ") \n" +
+                "SET m:" + toLabel + " \n";
+        log.info("consumer cost: {}, cql:{}", (System.currentTimeMillis() - start), cql);
+        CompanyUtils.runNeo4j(session, cql, null);
+        session.close();
+    }
+
+}