Browse Source

新增案件逻辑

xufei 4 years ago
parent
commit
f57841a605

+ 24 - 0
src/main/java/com/winhc/config/CaseConfig.java

@@ -0,0 +1,24 @@
+package com.winhc.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/5/18 15:35
+ */
+@Configuration
+@Getter
+@Setter
+public class CaseConfig {
+
+    @Value("${case.task.flow}")
+    String flow;
+    @Value("${case.task.taskName}")
+    String taskName;
+    @Value("${case.task.topic}")
+    String topic;
+}

+ 11 - 6
src/main/java/com/winhc/config/Neo4jDriver.java

@@ -4,25 +4,30 @@ import org.neo4j.driver.AuthTokens;
 import org.neo4j.driver.Config;
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.GraphDatabase;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.stereotype.Component;
 
 import java.util.concurrent.TimeUnit;
 
 @Component
-public class Neo4jDriver {
+public class Neo4jDriverV1 {
 
-    @Value("${spring.data.neo4j.uri}")
+    @Value("${spring.data.neo4j.uri.v1}")
     private String uri;
-    @Value("${spring.data.neo4j.username}")
+    @Value("${spring.data.neo4j.username.v1}")
     private String username;
-    @Value("${spring.data.neo4j.password}")
+    @Value("${spring.data.neo4j.password.v1}")
     private String password;
 
-    @Bean
-    public Driver init() {
+    @Bean("DriverV1")
+    @Primary
+    public Driver init1() {
         Config config = Config.builder().withConnectionTimeout(100, TimeUnit.SECONDS).withMaxTransactionRetryTime(10, TimeUnit.SECONDS).build();
         return GraphDatabase.driver(uri, AuthTokens.basic(username, password), config);
     }
+
 }

+ 12 - 10
src/main/java/com/winhc/config/Neo4jDriver.java

@@ -4,6 +4,8 @@ import org.neo4j.driver.AuthTokens;
 import org.neo4j.driver.Config;
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.GraphDatabase;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.stereotype.Component;
@@ -11,18 +13,18 @@ import org.springframework.stereotype.Component;
 import java.util.concurrent.TimeUnit;
 
 @Component
-public class Neo4jDriver {
+public class Neo4jDriverV2 {
 
-    @Value("${spring.data.neo4j.uri}")
-    private String uri;
-    @Value("${spring.data.neo4j.username}")
-    private String username;
-    @Value("${spring.data.neo4j.password}")
-    private String password;
+    @Value("${spring.data.neo4j.uri.v2}")
+    private String uri_v2;
+    @Value("${spring.data.neo4j.username.v2}")
+    private String username_v2;
+    @Value("${spring.data.neo4j.password.v2}")
+    private String password_v2;
 
-    @Bean
-    public Driver init() {
+    @Bean("DriverV2")
+    public Driver init2() {
         Config config = Config.builder().withConnectionTimeout(100, TimeUnit.SECONDS).withMaxTransactionRetryTime(10, TimeUnit.SECONDS).build();
-        return GraphDatabase.driver(uri, AuthTokens.basic(username, password), config);
+        return GraphDatabase.driver(uri_v2, AuthTokens.basic(username_v2, password_v2), config);
     }
 }

+ 173 - 0
src/main/java/com/winhc/task/CaseIncrementTask.java

@@ -0,0 +1,173 @@
+package com.winhc.task;
+
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.winhc.bean.Constant;
+import com.winhc.bean.MergeParam;
+import com.winhc.config.CaseConfig;
+import com.winhc.kafka.produce.KafkaProduce;
+import com.winhc.service.TouchService;
+import com.winhc.utils.CompanyUtils;
+import com.winhc.utils.DateUtil;
+import com.winhc.utils.DingUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @date 2021年5月16日
+ * @des 案件关联回流
+ */
+@Component
+@Slf4j
+@EnableScheduling
+@AllArgsConstructor
+public class CaseIncrementTask {
+
+    @Autowired
+    JdbcTemplate jdbcTemplate;
+
+    @Autowired
+    @Qualifier("DriverV2")
+    Driver driver;
+
+    @Autowired
+    KafkaProduce kafkaProduce;
+
+    @Autowired
+    TouchService touchService;
+
+    @Autowired
+    DingUtils dingUtils;
+
+    @Autowired
+    CaseConfig caseConfig;
+
+    //TODO 启动合并任务
+    //@Scheduled(cron = "00 00 01 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "55 13 16 02 06 ?")
+    public void mergePersonScheduled() throws UnsupportedEncodingException {
+        long start = System.currentTimeMillis();
+        try {
+            MergeParam param = initParams();
+            exportIncrPerson2CSV(param.getLastPartition(), driver.session(), param.getIncrPath());
+            loadCSVSendKafka(param.getPathPre() + param.getIncrPath(), param.getTopic());
+        } catch (Exception e) {
+            log.error("MergePersonScheduled error | message:{} | .", e.getMessage());
+        }
+        log.info("MergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
+    }
+
+    public MergeParam initParams() {
+        String bizdate = DateUtil.getDateBefore(-1);
+        String currentPartition = DateUtil.getDateBefore(0).replace("-", "");
+        String afterPartition = DateUtil.getDateBefore(1).replace("-", "");
+        String before7DayPartition = DateUtil.getDateBefore(-14).replace("-", "");
+        String lastPartition = bizdate.replace("-", "");
+        String pathPre = "D:\\data\\opt\\";
+        if (!CompanyUtils.isWindows()) {
+            pathPre = "/data/opt/";
+        }
+        final String incrPath = "export/incr-case-" + lastPartition + ".csv";
+        MergeParam param = MergeParam.builder()
+                .flow(caseConfig.getFlow())
+                .taskName(caseConfig.getTaskName())
+                .topic(caseConfig.getTopic())
+                .bizdate(bizdate)
+                .lastPartition(lastPartition)
+                .currentPartition(currentPartition)
+                .afterPartition(afterPartition)
+                .before7DayPartition(before7DayPartition)
+                .pathPre(pathPre)
+                .incrPath(incrPath)
+                .build();
+        log.info("show params : {} !", param.toString());
+        return param;
+    }
+
+    /**
+     * 导出增量变化数据
+     *
+     * @param date
+     * @param session
+     * @param INCR_NAME_PATH
+     */
+    private void exportIncrPerson2CSV(String date, Session session, String INCR_NAME_PATH) {
+        log.info("exportIncrPerson2CSV start!");
+        long start = System.currentTimeMillis();
+        final String cql5 = " CALL apoc.export.csv.query('MATCH (p:" + CompanyUtils.getIncrPersonLabel("新增") + ")\n" +
+                "RETURN p.case_id as case_id,p.component_id as component_id',\n" +
+                "'" + INCR_NAME_PATH + "',\n" +
+                "{batchSize:20000,parallel:false,retries:3,iterateList:true})\n" +
+                "YIELD file,rows";
+        log.info("cql5 : \n {} ", cql5);
+        String res5 = CompanyUtils.runNeo4j(session, cql5);
+        log.info("exportIncrPerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res5);
+    }
+
+    /**
+     * 发送kafka
+     *
+     * @param path
+     * @param topic
+     * @throws InterruptedException
+     * @throws ExecutionException
+     * @throws TimeoutException
+     * @throws IOException
+     */
+    private void loadCSVSendKafka(String path, String topic) throws InterruptedException, ExecutionException, TimeoutException, IOException {
+        log.info("loadCSVSendKafka  | start !");
+        long start = System.currentTimeMillis();
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        int i = 0;
+        try {
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, String> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split("\",\"", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                String time = DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS(new Date());
+                String[] arr = list.get(0).split("_", -1);
+                map.put("create_time", time);
+                map.put("update_time", time);
+                map.put("rowkey", arr[0]);
+                map.put("tn", arr[1]);
+                map.put("component_id", list.get(1));
+                String message = JSON.toJSONString(map);
+                kafkaProduce.produce(topic, message);
+            }
+        }catch (Exception e1){
+            e1.printStackTrace();
+            log.error(e1.getMessage());
+        }finally {
+            LineIterator.closeQuietly(it);
+        }
+        log.info("loadCSVSendKafka | size:{} | cost:{} | end !", i - 1, (System.currentTimeMillis() - start));
+    }
+
+
+}

+ 5 - 6
src/main/java/com/winhc/task/PersonMergeIncremnetTask.java

@@ -2,9 +2,6 @@ package com.winhc.task;
 
 
 import cn.hutool.core.io.FileUtil;
-import cn.hutool.core.text.csv.CsvData;
-import cn.hutool.core.text.csv.CsvRow;
-import cn.hutool.core.text.csv.CsvUtil;
 import com.alibaba.fastjson.JSON;
 import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
 import com.aliyuncs.exceptions.ClientException;
@@ -17,12 +14,12 @@ import com.winhc.utils.DingUtils;
 import com.winhc.utils.SchemaInit;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import lombok.val;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.Session;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -49,7 +46,9 @@ public class PersonMergeIncremnetTask {
     @Autowired
     JdbcTemplate jdbcTemplate;
 
-    private final Driver driver;
+    @Autowired
+    @Qualifier("DriverV1")
+    Driver driver;
 
     @Autowired
     KafkaProduce kafkaProduce;
@@ -63,7 +62,7 @@ public class PersonMergeIncremnetTask {
     //TODO 启动合并任务
     @Scheduled(cron = "00 00 03 * * ?")
     //@Scheduled(cron = "*/20 * * * * ?")
-    //@Scheduled(cron = "50 45 09 * * ?")
+    //@Scheduled(cron = "50 53 12 * * ?")
     public void mergePersonScheduled() throws UnsupportedEncodingException {
 
         long start = System.currentTimeMillis();

+ 4 - 1
src/main/java/com/winhc/task/PersonMergeIncremnetTaskTmp.java

@@ -19,6 +19,7 @@ import org.apache.commons.io.LineIterator;
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.Session;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -45,7 +46,9 @@ public class PersonMergeIncremnetTaskTmp {
     @Autowired
     JdbcTemplate jdbcTemplate;
 
-    private final Driver driver;
+    @Autowired
+    @Qualifier("DriverV1")
+    Driver driver;
 
     @Autowired
     KafkaProduce kafkaProduce;

+ 301 - 0
src/main/java/com/winhc/tmp/PidUpdateExportTask.java

@@ -0,0 +1,301 @@
+package com.winhc.tmp;
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:临时测试任务
+ * @date 2021/2/26 13:53
+ */
+
+//@Component
+@Slf4j
+//@EnableScheduling
+//@AllArgsConstructor
+public class PidUpdateExportTask {
+
+    @Autowired
+    @Qualifier("DriverV2")
+    Driver driver;
+
+    //@Scheduled(cron = "00 00 01 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "55 55 13 01 06 ?")
+    public void parseCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\error_pid_all.csv";
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String person_id = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                map.put("person_id", person_id);
+                String message = JSON.toJSONString(map);
+                System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 5000){
+                    setLabel(batch_list, "个人", "update20210531");
+                    System.out.println("size: "+batch_list.size());
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setLabel(batch_list, "个人", "update20210531");
+                System.out.println("size: "+batch_list.size());
+                batch_list.clear();
+            }
+            System.out.println(batch_list.size());
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        }finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+    public void setLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MATCH(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s:" + toLabel + " \n";
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+    }
+
+    //@Scheduled(cron = "00 00 01 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "20 43 14 01 06 ?")
+    public void parseTouZiCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\export-touzi-all.csv";
+        if (!CompanyUtils.isWindows()) {
+            path = "/data/opt/export/export-touzi-all.csv";
+        }
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split(",", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                map.put("percent", list.get(2));
+                map.put("deleted", list.get(3));
+                map.put("company_id", list.get(4));
+                map.put("company_name", list.get(5));
+                String message = JSON.toJSONString(map);
+                //System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 1000){
+                    setTouZiLabel(batch_list, "个人", "update20210601");
+                    System.out.println("size: "+ i);
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setTouZiLabel(batch_list, "个人", "update20210601");
+                System.out.println("size: "+ i);
+                batch_list.clear();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+
+    public void setTouZiLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s.name=row.person_name, s.person_id=row.person_id \n" +
+                "SET s:" + toLabel + " \n" +
+                "MERGE(e:企业{company_id:row.company_id}) \n" +
+                "SET e.name=row.company_name, e.company_id=row.company_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:投资]->(e) \n" +
+                "SET r.percent=row.percent, r.deleted=row.deleted \n";
+
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        session.close();
+    }
+
+    //@Scheduled(cron = "00 00 01 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "50 22 15 01 06 ?")
+    public void parseFaRenCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\export-faren-all.csv";
+        if (!CompanyUtils.isWindows()) {
+            path = "/data/opt/export/export-faren-all.csv";
+        }
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split(",", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                map.put("deleted", list.get(2));
+                map.put("company_id", list.get(3));
+                map.put("company_name", list.get(4));
+                String message = JSON.toJSONString(map);
+                //System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 1000){
+                    setFaRenLabel(batch_list, "个人", "update20210601");
+                    System.out.println("size: "+ i);
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setFaRenLabel(batch_list, "个人", "update20210601");
+                System.out.println("size: "+ i);
+                batch_list.clear();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+
+    public void setFaRenLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s.name=row.person_name, s.person_id=row.person_id \n" +
+                "SET s:" + toLabel + " \n" +
+                "MERGE(e:企业{company_id:row.company_id}) \n" +
+                "SET e.name=row.company_name, e.company_id=row.company_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:法人]->(e) \n" +
+                "SET r.deleted=row.deleted \n";
+
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        session.close();
+    }
+
+    //@Scheduled(cron = "00 00 01 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "20 39 15 01 06 ?")
+    public void parseGaoGuanCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\export-gaoguan-all.csv";
+        if (!CompanyUtils.isWindows()) {
+            path = "/data/opt/export/export-gaoguan-all.csv";
+        }
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split(",", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                map.put("staff_type", list.get(2));
+                map.put("deleted", list.get(3));
+                map.put("company_id", list.get(4));
+                map.put("company_name", list.get(5));
+                String message = JSON.toJSONString(map);
+                System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 2000){
+                    setGaoGuanLabel(batch_list, "个人", "update20210601");
+                    System.out.println("size: "+ i);
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setGaoGuanLabel(batch_list, "个人", "update20210601");
+                System.out.println("size: "+ i);
+                batch_list.clear();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+
+    public void setGaoGuanLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s.name=row.person_name, s.person_id=row.person_id \n" +
+                "SET s:" + toLabel + " \n" +
+                "MERGE(e:企业{company_id:row.company_id}) \n" +
+                "SET e.name=row.company_name, e.company_id=row.company_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:高管]->(e) \n" +
+                "SET r.staff_type=row.staff_type , r.deleted=row.deleted \n";
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        session.close();
+    }
+
+
+}

+ 3 - 0
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -39,6 +39,9 @@ public class CompanyUtils {
         }
         return false;
     }
+    public static String getIncrPersonLabel(String label) {
+        return label + DateUtil.getDateBefore(-1).replace("-", "");
+    }
 
     public static void main(String[] args) {
         System.out.println(CompanyUtils.isWindows());

+ 24 - 10
src/main/resources/application-dev.properties

@@ -1,15 +1,23 @@
 #eureka.client.serviceUrl.defaultZone= http://106.14.81.247:8900/eureka/
 
-#Neo4j配置
-spring.data.neo4j.username=neo4j
-spring.data.neo4j.password=neo4j168
+
+#Neo4j配置(第一台机器)
+#spring.data.neo4j.username=neo4j
+#spring.data.neo4j.password=neo4j168
 #spring.data.neo4j.uri=bolt://127.0.0.1:7687
+#spring.data.neo4j.uri=bolt://139.196.165.100:7687
+
+spring.data.neo4j.username.v1=neo4j
+spring.data.neo4j.password.v1=neo4j168
+#spring.data.neo4j.uri.v1=bolt://139.196.165.100:7687
+spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
 
-#爬虫外网
-#spring.data.neo4j.uri=bolt://47.100.177.224:7687
+#Neo4j配置(第二台机器)
+spring.data.neo4j.username.v2=neo4j
+spring.data.neo4j.password.v2=neo4j168
+spring.data.neo4j.uri.v2=bolt://139.224.197.164:7687
+#spring.data.neo4j.uri.v2=bolt://192.168.2.60:7687
 
-#生产外网
-spring.data.neo4j.uri=bolt://139.196.165.100:7687
 
 
 #spring.datasource.url = jdbc:mysql://47.100.20.161:3306/prism1?useUnicode=true&characterEncoding=utf-8
@@ -22,9 +30,10 @@ scheduling.enabled = true
 
 #============== kafka ===================
 # 指定kafka 代理地址,可以多个
-spring.kafka.bootstrap-servers=47.100.177.224:9092
+spring.kafka.bootstrap-servers=47.101.221.131:9092
+#spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
 #topic
-spring.kafka.topic_node_relation_union=inc_node_relation_union
+#spring.kafka.topic_node_relation_union=inc_node_relation_union
 
 #spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
 #spring.kafka.topic=xf_test
@@ -40,7 +49,7 @@ spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.Str
 
 #=============== consumer  =======================
 # 指定默认消费者group id
-spring.kafka.consumer.group-id=neo4j_node_relation
+spring.kafka.consumer.group-id=neo4j_node_relation_dev
 
 spring.kafka.consumer.auto-offset-reset=earliest
 # 取消自动提交
@@ -56,3 +65,8 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
 
 #mongo
 spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997
+
+#案件配置
+case.task.flow=inc_case_task
+case.task.taskName=inc_case_task_kafka
+case.task.topic=inc_case_id_dev

+ 13 - 8
src/main/resources/application-prd.properties

@@ -1,14 +1,14 @@
 #eureka.client.serviceUrl.defaultZone= http://192.168.1.1:8900/eureka/
 
-#Neo4j配置
-spring.data.neo4j.username=neo4j
-spring.data.neo4j.password=neo4j168
+#Neo4j配置(第一台机器)
+spring.data.neo4j.username.v1=neo4j
+spring.data.neo4j.password.v1=neo4j168
+spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
 
-#爬虫
-#spring.data.neo4j.uri=bolt://192.168.2.56:7687
-
-#neo4j_prd
-spring.data.neo4j.uri=bolt://192.168.2.57:7687
+#Neo4j配置(第二台机器)
+spring.data.neo4j.username.v2=neo4j
+spring.data.neo4j.password.v2=neo4j168
+spring.data.neo4j.uri.v2=bolt://192.168.2.60:7687
 
 
 #spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
@@ -60,3 +60,8 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
 #mongo
 #spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997&maxIdleTimeMS=3000
 spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997
+
+#案件配置
+case.task.flow=inc_case_task
+case.task.taskName=inc_case_task_kafka
+case.task.topic=inc_case_id_prod

+ 299 - 0
src/test/java/com/winhc/PidUpdateExport.java

@@ -0,0 +1,299 @@
+package com.winhc;
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.winhc.utils.CompanyUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/2/26 13:53
+ */
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+@Slf4j
+public class PidUpdateExport {
+
+    @Autowired
+    @Qualifier("DriverV2")
+    Driver driver;
+
+    @Test
+    public void parseCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\error_pid_all.csv";
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String person_id = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                map.put("person_id", person_id);
+                String message = JSON.toJSONString(map);
+                //System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 5000){
+                    setLabel(batch_list, "个人", "update20210602");
+                    System.out.println("size: "+i);
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setLabel(batch_list, "个人", "update20210602");
+                System.out.println("size: "+i);
+                batch_list.clear();
+            }
+            System.out.println(batch_list.size());
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        }finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+    public void setLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MATCH(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s:" + toLabel + " \n";
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        session.close();
+    }
+
+    @Test
+    public void parseTouZiCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\export-touzi-all.csv";
+        if (!CompanyUtils.isWindows()) {
+            path = "/data/opt/export/export-touzi-all.csv";
+        }
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split(",", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                map.put("percent", list.get(2));
+                map.put("deleted", list.get(3));
+                map.put("company_id", list.get(4));
+                map.put("company_name", list.get(5));
+                String message = JSON.toJSONString(map);
+                //System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 2000){
+                    setTouZiLabel(batch_list, "个人", "新增20210603");
+                    System.out.println("size: "+ i);
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setTouZiLabel(batch_list, "个人", "新增20210603");
+                System.out.println("size: "+ i);
+                batch_list.clear();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+
+    public void setTouZiLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s.name=row.person_name, s.person_id=row.person_id \n" +
+                "SET s:" + toLabel + " \n" +
+                "MERGE(e:企业{company_id:row.company_id}) \n" +
+                "SET e.name=row.company_name, e.company_id=row.company_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:投资]->(e) \n" +
+                "SET r.percent=row.percent, r.deleted=row.deleted \n";
+
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        session.close();
+    }
+
+    @Test
+    public void parseFaRenCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\export-faren-all.csv";
+        if (!CompanyUtils.isWindows()) {
+            path = "/data/opt/export/export-faren-all.csv";
+        }
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split(",", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                map.put("deleted", list.get(2));
+                map.put("company_id", list.get(3));
+                map.put("company_name", list.get(4));
+                String message = JSON.toJSONString(map);
+                //System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 2000){
+                    setFaRenLabel(batch_list, "个人", "新增20210603");
+                    System.out.println("size: "+ i);
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setFaRenLabel(batch_list, "个人", "新增20210603");
+                System.out.println("size: "+ i);
+                batch_list.clear();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+
+    public void setFaRenLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s.name=row.person_name, s.person_id=row.person_id \n" +
+                "SET s:" + toLabel + " \n" +
+                "MERGE(e:企业{company_id:row.company_id}) \n" +
+                "SET e.name=row.company_name, e.company_id=row.company_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:法人]->(e) \n" +
+                "SET r.deleted=row.deleted \n";
+
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        session.close();
+    }
+
+    @Test
+    public void parseGaoGuanCSV() throws IOException {
+        String path = "D:\\data\\opt\\export\\export-gaoguan-all.csv";
+        if (!CompanyUtils.isWindows()) {
+            path = "/data/opt/export/export-gaoguan-all.csv";
+        }
+        LineIterator it = FileUtils.lineIterator(FileUtil.file(path), "UTF-8");
+        try {
+            int i = 0;
+            List<Map<String, Object>> batch_list = new ArrayList<>();
+            while (it.hasNext()) {
+                String line = it.nextLine();
+                ++i;
+                if (i == 1) continue;
+                Map<String, Object> map = new HashMap<>();
+                List<String> list = Arrays.asList(line.split(",", -1)).stream()
+                        .map(x -> x.replaceAll("\"", "")).collect(Collectors.toList());
+                if (list.size() < 2) continue;
+                map.put("person_id", list.get(0));
+                map.put("person_name", list.get(1));
+                map.put("staff_type", list.get(2));
+                map.put("deleted", list.get(3));
+                map.put("company_id", list.get(4));
+                map.put("company_name", list.get(5));
+                String message = JSON.toJSONString(map);
+                //System.out.println(message);
+                batch_list.add(map);
+                if(batch_list.size() == 4000){
+                    setGaoGuanLabel(batch_list, "个人", "新增20210603");
+                    System.out.println("size: "+ i);
+                    batch_list.clear();
+                }
+            }
+            if(batch_list.size() > 0){
+                setGaoGuanLabel(batch_list, "个人", "新增20210603");
+                System.out.println("size: "+ i);
+                batch_list.clear();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println(e.getMessage());
+        } finally {
+            LineIterator.closeQuietly(it);
+        }
+    }
+
+    public void setGaoGuanLabel(List<Map<String, Object>> batch_list, String fromLabel, String toLabel) {
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + fromLabel + "{person_id:row.person_id}) \n" +
+                "SET s.name=row.person_name, s.person_id=row.person_id \n" +
+                "SET s:" + toLabel + " \n" +
+                "MERGE(e:企业{company_id:row.company_id}) \n" +
+                "SET e.name=row.company_name, e.company_id=row.company_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:高管]->(e) \n" +
+                "SET r.staff_type=row.staff_type , r.deleted=row.deleted \n";
+        CompanyUtils.runNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        log.info("consumer size: {}, cost: {}, cql:{}", batch_list.size(), (System.currentTimeMillis() - start), cql);
+        session.close();
+    }
+
+    @Test
+    public void init() throws IOException {
+        parseTouZiCSV();
+        parseFaRenCSV();
+        parseGaoGuanCSV();
+    }
+
+}