Add full-volume person merge (增加全量合并)

xufei committed 3 years ago
parent commit 08fbf7ab71

+ 1 - 0
src/main/java/com/winhc/common/constant/Base.java

@@ -11,5 +11,6 @@ import java.util.concurrent.ForkJoinPool;
 public class Base {
     public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
     public static String PID_WAIT_MERGE_V9 = "pid_wait_merge_v9";
+    public static String PID_ALL_PERSON_MERGE_V9 = "pid_all_person_merge_v9";
 
 }

+ 3 - 1
src/main/java/com/winhc/common/enums/CompanyEnum.java

@@ -29,7 +29,9 @@ public class CompanyEnum {
         CASE_NODE_INCR("500", "caseNodeIncrImpl"),
         CASE_RELATION_UPDATE("600", "caseRelationUpdateImpl"),
         CASE_NODE_UPDATE("700", "caseNodeUpdateImpl"),
-        PERSON_MERGE_V2("800", "personMergeV2Impl")
+        PERSON_MERGE_V2("800", "personMergeV2Impl"),
+        PERSON_MERGE_All("900", "personMergeAllImpl"),
+        PERSON_MERGE_All_V2("1000", "personMergeAllV2Impl")
         ;
 
         public final String CODE;

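For context on how the two new codes are consumed: each Kafka message carries a topic_type equal to a TopicType CODE, and the consumer resolves the handler bean by the matching VALUE (visible in KafkaConsumerNeo4jV2 below). A minimal dispatch sketch, with a plain HashMap standing in for Spring's by-bean-name map injection:

    import java.util.*;

    // Simplified routing sketch: topic_type ("900"/"1000") -> bean name -> handler.
    // The real consumer gets Map<String, RelationService> injected by Spring.
    public class TopicTypeDispatchSketch {
        interface RelationService { String save(List<Map<String, Object>> batch); }

        public static void main(String[] args) {
            Map<String, RelationService> beans = new HashMap<>();
            beans.put("personMergeAllImpl",   b -> "all-merge v1, rows=" + b.size());
            beans.put("personMergeAllV2Impl", b -> "all-merge v2, rows=" + b.size());

            Map<String, String> codeToBean = new HashMap<>(); // mirrors CompanyEnum.TopicType
            codeToBean.put("900",  "personMergeAllImpl");
            codeToBean.put("1000", "personMergeAllV2Impl");

            Map<String, Object> msg = new HashMap<>();
            msg.put("topic_type", "1000");
            String beanName = codeToBean.get(msg.get("topic_type"));
            System.out.println(beans.get(beanName).save(Collections.singletonList(msg)));
        }
    }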
+ 1 - 1
src/main/java/com/winhc/kafka/KafkaProduce.java

@@ -37,7 +37,7 @@ public class KafkaProduce {
     public void produce(String topic,String message) {
         ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
         send.addCallback(o -> {
-            log.info("send success:" + message);
+            //log.info("send success:" + message);
         }, throwable -> {
             log.error("send error:" + message);
         });

+ 5 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -23,6 +23,8 @@ import org.springframework.stereotype.Service;
 import java.util.List;
 import java.util.Map;
 
+import static com.winhc.utils.CompanyUtils.isWindows;
+
 /**
  * @author π
  * @Description:
@@ -59,6 +61,8 @@ public class KafkaConsumerNeo4jV2 {
         this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_MERGE_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_V2.CODE));
+        //this.map.get(CompanyEnum.TopicType.PERSON_MERGE_All.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_All.CODE));
+        this.map.get(CompanyEnum.TopicType.PERSON_MERGE_All_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_All_V2.CODE));
 
         //todo wait two minutes, then trigger the merge job
         this.pidToMongoService.save(CompanyUtils.getMergeIds2(listMap));
@@ -76,6 +80,7 @@ public class KafkaConsumerNeo4jV2 {
     public ConsumerAwareListenerErrorHandler dealError() {
         return (message, e, consumer) -> {
             List<String> list = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload());
+            if(isWindows()) return null;
             for (String msg : list) {
                 kafkaProduce.produce(configConstant.topic_node_relation_union, msg);
             }

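CompanyUtils.isWindows() is imported here (and in AsynMergePersonTask below) but its body is not part of this diff; it evidently short-circuits the error handler and the scheduled task on Windows dev machines. A plausible implementation, offered as an assumption rather than the repo's actual code:

    // Assumed implementation; the real CompanyUtils.isWindows() is not shown in this diff.
    public class OsCheckSketch {
        public static boolean isWindows() {
            return System.getProperty("os.name", "").toLowerCase().contains("windows");
        }

        public static void main(String[] args) {
            System.out.println(isWindows()); // true on a Windows dev PC, false on prod hosts
        }
    }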
+ 37 - 0
src/main/java/com/winhc/pojo/MergeAllPerson.java

@@ -0,0 +1,37 @@
+package com.winhc.pojo;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * @author π
+ * @Description:
+ * @date 2022/4/22 15:45
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class MergeAllPerson {
+    public String new_human_pid;
+    public int new_cnt;
+    public String old_human_pid;
+    public int old_cnt;
+
+    @JSONField(serialize = false)
+    public String getId() {
+        return new_human_pid + old_human_pid;
+    }
+
+    @Override
+    public String toString() {
+        return "MergeAllPerson{" +
+                "new_human_pid='" + new_human_pid + '\'' +
+                ", new_cnt=" + new_cnt +
+                ", old_human_pid='" + old_human_pid + '\'' +
+                ", old_cnt=" + old_cnt +
+                '}';
+    }
+}

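The @JSONField(serialize = false) annotation keeps the synthetic getId() value out of serialized payloads while leaving it available as a deduplication key. A quick check of that behavior with fastjson, assuming only the POJO above:

    import com.alibaba.fastjson.JSON;
    import com.winhc.pojo.MergeAllPerson;

    public class MergeAllPersonJsonDemo {
        public static void main(String[] args) {
            MergeAllPerson p = new MergeAllPerson("p1", 3, "p2", 1);
            // getId() is skipped because of @JSONField(serialize = false):
            System.out.println(JSON.toJSONString(p));
            // -> {"new_cnt":3,"new_human_pid":"p1","old_cnt":1,"old_human_pid":"p2"}
            System.out.println(p.getId()); // -> p1p2, still usable as a dedup key
        }
    }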
+ 73 - 0
src/main/java/com/winhc/service/impl/PersonMergeAllImpl.java

@@ -0,0 +1,73 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.config.ConfigConstant;
+import com.winhc.kafka.KafkaProduce;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.winhc.utils.CompanyUtils.mergePerson;
+
+/**
+ * @author π
+ * @Description: full person merge processing logic
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("personMergeAllImpl")
+@AllArgsConstructor
+public class PersonMergeAllImpl implements RelationService {
+
+    @Autowired
+    @Qualifier("DriverV1")
+    Driver driver;
+    @Autowired
+    KafkaProduce kafkaProduce;
+    @Autowired
+    ConfigConstant configConstant;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        //1. query related persons
+        final String query_cql =
+                "\nWITH  {batch_list} AS batch_list \n" +
+                        "UNWIND batch_list AS row \n" +
+                        "MATCH (c1:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.first_company_id})-[r1]-(p1:" + CompanyEnum.Lable.PERSON.code + "{name:row.first_human_name})\n" +
+                        "MATCH (p1)-[r2]-(c2:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
+                        ",(c3:" + CompanyEnum.Lable.COMPANY.code + " {company_id:row.second_company_id})-[r3]-(p3:" + CompanyEnum.Lable.PERSON.code + "{name:row.second_human_name})\n" +
+                        "MATCH (p3)-[r4]-(c4:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
+                        "RETURN p1.person_id as new_human_pid, p3.person_id  as old_human_pid, count(distinct c2) as new_cnt, count(distinct c4) as old_cnt";
+        log.info("query size: {}, cql:{}", batch_list.size(), query_cql);
+        List<Map<String, String>> mergeData = CompanyUtils.writeNeo4j3(session, query_cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+
+        //2. merge logic
+        String data = mergePerson(session, mergeData);
+        session.close();
+        log.info("class:{} | save size:{} |  merge size:{} |cost:{}", PersonMergeAllImpl.class.getSimpleName(), batch_list.size(), mergeData.size(), (System.currentTimeMillis() - start));
+
+        //3. send changed-person records
+        if (StringUtils.isNotBlank(data)) {
+            kafkaProduce.produce(configConstant.topic_pid_update_v1, data);
+        }
+        return data;
+    }
+
+
+}

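The query returns one row per matched pair: both person_ids plus each person's distinct-company count, and writeNeo4j3 (added to CompanyUtils below) compares the counts to decide which pid survives. A small sketch of that decision, reusing only the MergeAllPerson POJO above:

    import com.winhc.pojo.MergeAllPerson;

    // One RETURN row after JSON-mapping in writeNeo4j3: the pid attached to more
    // distinct companies is kept as new_human_pid, the other is merged away.
    public class QueryRowDemo {
        public static void main(String[] args) {
            MergeAllPerson row = new MergeAllPerson("p1", 5, "p2", 2);
            boolean keepNew = row.getNew_cnt() >= row.getOld_cnt();
            System.out.println("survivor: "
                    + (keepNew ? row.getNew_human_pid() : row.getOld_human_pid())); // p1
        }
    }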
+ 72 - 0
src/main/java/com/winhc/service/impl/PersonMergeAllV2Impl.java

@@ -0,0 +1,72 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.config.ConfigConstant;
+import com.winhc.kafka.KafkaProduce;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description: full person merge processing logic
+ * @date 2022/5/12 10:03
+ */
+@Slf4j
+@Service("personMergeAllV2Impl")
+@AllArgsConstructor
+public class PersonMergeAllV2Impl implements RelationService {
+
+    @Autowired
+    @Qualifier("DriverV1")
+    Driver driver;
+    @Autowired
+    KafkaProduce kafkaProduce;
+    @Autowired
+    ConfigConstant configConstant;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        //1. match related persons and merge directly
+        final String merge_cql =
+                "\nWITH  {merge_list} AS batch_list \n" +
+                        "UNWIND batch_list AS row\n" +
+                        "MATCH (c0:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.first_company_id})-[r0]-(p:" + CompanyEnum.Lable.PERSON.code + "{name:row.first_human_name})\n" +
+                        ", (q:" + CompanyEnum.Lable.PERSON.code + " {person_id:row.second_human_pid})-[r]-(x:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
+                        "WHERE p.person_id <> q.person_id\n" +
+                        "WITH p as first_node, q as other_node, x, r\n" +
+                        "CALL apoc.merge.relationship.eager(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
+                        "SET first_node:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
+                        "SET other_node:" + CompanyUtils.getIncrPersonLabelV2("删除", CompanyEnum.SPLIT_HOUR) + "\n" +
+                        "DELETE r\n" +
+                        "RETURN first_node.person_id as new_human_pid,first_node.name as new_human_name,other_node.person_id as old_human_pid,other_node.name as old_human_name";
+        log.info("merge size: {}, cql2:{}", batch_list.size(), merge_cql);
+        String data = CompanyUtils.writeNeo4j2(session, merge_cql, new HashMap<String, Object>() {{
+            put("merge_list", batch_list);
+        }});
+
+        session.close();
+        log.info("class:{} | save size:{} |  merge size:{} |cost:{}", PersonMergeAllV2Impl.class.getSimpleName(), batch_list.size(), batch_list.size(), (System.currentTimeMillis() - start));
+
+        //2. send changed-person records
+        if (StringUtils.isNotBlank(data)) {
+            kafkaProduce.produce(configConstant.topic_pid_update_v1, data);
+        }
+        return data;
+    }
+
+
+}

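The difference from the "900" path above is that no count query is needed: each row names the surviving side by first_company_id plus first_human_name and the side to fold in directly by second_human_pid (second_company_id/second_human_name also appear in the test data below, but this CQL does not reference them). One row in that shape, with values taken from the sendMergeAllPerson3 test:

    import java.util.HashMap;
    import java.util.Map;

    // One topic_type "1000" row (values from sendMergeAllPerson3 below).
    public class MergeAllV2RowDemo {
        public static void main(String[] args) {
            Map<String, String> row = new HashMap<>();
            row.put("first_company_id", "c4");
            row.put("first_human_name", "周立磐");
            row.put("second_human_pid", "p10");
            row.put("topic_type", "1000");
            System.out.println(row);
        }
    }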
+ 3 - 17
src/main/java/com/winhc/service/impl/PersonMergeV2Impl.java

@@ -20,6 +20,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static com.winhc.utils.CompanyUtils.mergePerson;
+
 /**
  * @author π
  * @Description: person merge
@@ -60,23 +62,7 @@ public class PersonMergeV2Impl implements RelationService {
         }});
 
         //2. merge logic
-        String data = null;
-        if (!mergeData.isEmpty()) {
-            final String merge_cql =
-                    "\nWITH  {merge_list} AS batch_list \n" +
-                            "UNWIND batch_list AS row\n" +
-                            "MATCH (p:个人 {person_id:row.new_human_pid}),(q:个人 {person_id:row.old_human_pid})-[r]-(x:企业)\n" +
-                            "WITH p as first_node, q as other_node, x, r\n" +
-                            "CALL apoc.merge.relationship.eager(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
-                            "SET first_node:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
-                            "SET other_node:" + CompanyUtils.getIncrPersonLabelV2("删除", CompanyEnum.SPLIT_HOUR) + "\n" +
-                            "DELETE r\n" +
-                            "RETURN first_node.person_id as new_human_pid,first_node.name as new_human_name,other_node.person_id as old_human_pid,other_node.name as old_human_name";
-            log.info("merge size: {}, cql2:{}", mergeData.size(), merge_cql);
-            data = CompanyUtils.writeNeo4j2(session, merge_cql, new HashMap<String, Object>() {{
-                put("merge_list", mergeData);
-            }});
-        }
+        String data = mergePerson(session, mergeData);
         session.close();
         log.info("class:{} | save size:{} |  merge size:{} |cost:{}", PersonMergeV2Impl.class.getSimpleName(), batch_list.size(), mergeData.size(), (System.currentTimeMillis() - start));
 

+ 3 - 1
src/main/java/com/winhc/service/impl/PidToMongoServiceImpl.java

@@ -38,7 +38,9 @@ public class PidToMongoServiceImpl implements PidToMongoService {
     public Integer save(List<Map<String, Object>> ids) {
         BulkWriteResult bulkWriteResult = null;
         if (ids.size() > 0) {
-            MongoCollection<Document> pid = mongoTemplate.getCollection(Base.PID_WAIT_MERGE_V9);
+            //todo writing to a temporary collection for now; remember to switch back
+            //MongoCollection<Document> pid = mongoTemplate.getCollection(Base.PID_WAIT_MERGE_V9);
+            MongoCollection<Document> pid = mongoTemplate.getCollection("pid_wait_merge_v9_tmp");
 //            List<Document> documents = messages.stream().map(Document::new).collect(Collectors.toList());
 //            pid.insertMany(documents, new InsertManyOptions().ordered(false));
             List<WriteModel<Document>> updateDocuments = new ArrayList<>();

+ 67 - 0
src/main/java/com/winhc/task/AsynAllPersonMergeTask.java

@@ -0,0 +1,67 @@
+package com.winhc.task;
+
+import com.alibaba.fastjson.JSON;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Filters;
+import com.winhc.common.constant.Base;
+import com.winhc.config.ConfigConstant;
+import com.winhc.kafka.KafkaProduce;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.mongodb.client.model.Filters.in;
+import static com.winhc.utils.DateUtil.getMinuteTime;
+
+/**
+ * @author π
+ * @Description: Tianyancha clue data merge
+ * @date 2021/6/22 17:07
+ */
+
+@Component
+@Slf4j
+@EnableScheduling
+@AllArgsConstructor
+@ConditionalOnProperty(prefix = "scheduling", name = "enabled", havingValue = "true")
+public class AsynAllPersonMergeTask {
+
+    private final MongoTemplate mongoTemplate;
+    private final KafkaProduce kafkaProduce;
+    @Autowired
+    ConfigConstant configConstant;
+
+    //@Scheduled(cron = "50 20 17 07 05 ?")
+    @Scheduled(cron = "*/10 * * * * ?")
+    //@Scheduled(cron = "0 /2 * * * ? ")
+    public void start() throws InterruptedException {
+        log.info("start AsynAllPersonMergeTask !!! ");
+        MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_ALL_PERSON_MERGE_V9);
+        //1. iterate over mongo
+        List<ObjectId> ids = new ArrayList<>();
+        for (Document d : collection.find().batchSize(200).noCursorTimeout(true).limit(200)) {
+            ids.add(new ObjectId(d.get("_id").toString()));
+            d.remove("_id");
+            kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d));
+        }
+        //2. delete after successful send
+        if (!ids.isEmpty()) {
+            long deleteResult = collection.deleteMany(in("_id", ids)).getDeletedCount();
+            log.info("deleted size : {} ,ids : {}", deleteResult, ids.get(0));
+        }
+        log.info("stop AsynAllPersonMergeTask !!! ");
+
+    }
+}

+ 3 - 1
src/main/java/com/winhc/task/AsynMergePersonTask.java

@@ -20,6 +20,7 @@ import org.springframework.stereotype.Component;
 import java.util.ArrayList;
 import java.util.List;
 import static com.mongodb.client.model.Filters.in;
+import static com.winhc.utils.CompanyUtils.isWindows;
 import static com.winhc.utils.DateUtil.getMinuteTime;
 
 /**
@@ -40,9 +41,10 @@ public class AsynMergePersonTask {
     @Autowired
     ConfigConstant configConstant;
 
-    @Scheduled(cron = "*/10 * * * * ?")
+    @Scheduled(cron = "*/15 * * * * ?")
     //@Scheduled(cron = "0 /2 * * * ? ")
     public void start() throws InterruptedException {
+        if(isWindows()) return;
         MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_WAIT_MERGE_V9);
         while (true) {
             //1. query mongo for data older than 2 minutes

+ 107 - 9
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -1,6 +1,5 @@
 package com.winhc.utils;
 
-import cn.hutool.core.lang.Tuple;
 import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
@@ -9,7 +8,9 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.google.common.collect.ImmutableMap;
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
+import com.winhc.pojo.MergeAllPerson;
 import com.winhc.pojo.MergePerson;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.neo4j.driver.Record;
 import org.neo4j.driver.Result;
@@ -19,18 +20,20 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.winhc.utils.DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS;
+import static java.util.stream.Collectors.toList;
 
 /**
  * @author π
  * @Description:
  * @date 2021/1/11 21:31
  */
+@Slf4j
 public class CompanyUtils {
     public static List<Map<String, Object>> map(List<ConsumerRecord<?, ?>> records) {
         return records.stream().filter(r -> (r != null && r.value() != null)).map(r -> {
             Map<String, Object> m = JSONUtil.parseObj(r.value().toString());
             return m;
-        }).collect(Collectors.toList());
+        }).collect(toList());
     }
 
     public static List<Map<String, Object>> mapPlus(List<ConsumerRecord<?, ?>> records) {
@@ -48,7 +51,7 @@ public class CompanyUtils {
             map.put("name", person_name);
             map.put("company_ids", listToString(list));
             return map;
-        }).collect(Collectors.toList());
+        }).collect(toList());
     }
 
     public static List<Map<String, Object>> mapPlusV1(List<ConsumerRecord<?, ?>> records) {
@@ -61,7 +64,7 @@ public class CompanyUtils {
             map.put("merge_human_pid", merge_human_pid);
             map.put("deleted_human_pid", deleted_human_pid);
             return map;
-        }).collect(Collectors.toList());
+        }).collect(toList());
     }
 
     public static String listToString(List<String> list) {
@@ -77,7 +80,7 @@ public class CompanyUtils {
     }
 
     public static List<Map<String, Object>> filterList(List<Map<String, Object>> list, String type) {
-        return list.stream().filter(r -> (r.getOrDefault("topic_type", "-1").equals(type))).collect(Collectors.toList());
+        return list.stream().filter(r -> (r.getOrDefault("topic_type", "-1").equals(type))).collect(toList());
     }
 
     public static List<String> getMergeIds(List<Map<String, Object>> list) {
@@ -92,7 +95,7 @@ public class CompanyUtils {
                             , "topic_type", "800"
                     );
                     return JSONObject.toJSONString(m2, SerializerFeature.WriteMapNullValue);
-                }).collect(Collectors.toList());
+                }).collect(toList());
     }
 
     public static List<Map<String, Object>> getMergeIds2(List<Map<String, Object>> list) {
@@ -112,7 +115,7 @@ public class CompanyUtils {
                                     , "topic_type", "800"
                             )
                     );
-                }).collect(Collectors.toList());
+                }).collect(toList());
     }
 
     public static List<NodeRelationError> toMessage(List<ConsumerRecord> records, String errorMessage) {
@@ -129,14 +132,14 @@ public class CompanyUtils {
             n1.setStatus(0);
             n1.setTopicType(String.valueOf(topic_type));
             return n1;
-        }).collect(Collectors.toList());
+        }).collect(toList());
     }
 
     public static List<String> toMessage(List<ConsumerRecord> records) {
         return records.stream()
                 .filter(r -> (r != null && r.value() != null))
                 .map(r -> r.value().toString())
-                .collect(Collectors.toList());
+                .collect(toList());
     }
 
     public static String writeNeo4j(Session session, String cql, HashMap<String, Object> parameters) {
@@ -190,6 +193,101 @@ public class CompanyUtils {
 
     }
 
+    public static List<Map<String, String>> writeNeo4j3(Session session, String cql, HashMap<String, Object> parameters) {
+        List<Record> dataList = session.writeTransaction(tx -> {
+            Result result = tx.run(cql, parameters);
+            return result.list();
+        });
+        Set<String> newIds = new HashSet<>();
+        Set<String> oldIds = new HashSet<>();
+        List<Map<String, String>> list = dataList.stream()
+                .map(record -> JSON.parseObject(JSON.toJSONString(record.asMap()), MergeAllPerson.class))
+                .filter(m -> !m.getNew_human_pid().equals(m.getOld_human_pid()))
+                .map(x -> {
+                    HashMap<String, String> map = new HashMap<>();
+                    if (x.getNew_cnt() >= x.getOld_cnt()) {
+                        map.put("new_human_pid", x.getNew_human_pid());
+                        map.put("old_human_pid", x.getOld_human_pid());
+                        newIds.add(x.getNew_human_pid());
+                        oldIds.add(x.getOld_human_pid());
+                    } else {
+                        map.put("new_human_pid", x.getOld_human_pid());
+                        map.put("old_human_pid", x.getNew_human_pid());
+                        newIds.add(x.getOld_human_pid());
+                        oldIds.add(x.getNew_human_pid());
+                    }
+                    return map;
+                })
+                .collect(
+                        Collectors.collectingAndThen(
+                                Collectors.toCollection(
+                                        () -> new TreeSet<>(Comparator.comparing(m -> m.get("new_human_pid") + m.get("old_human_pid")))
+                                ), ArrayList::new
+                        )
+                );
+        // pids that are both a survivor and merged away elsewhere; need chain lookup
+        List<String> in = newIds.stream().filter(oldIds::contains).collect(toList());
+
+        return list.stream().peek(m -> {
+            String new_human_pid = m.get("new_human_pid");
+            if (in.contains(new_human_pid)) {
+                String newPid = getNewPid(list, new_human_pid);
+                m.put("new_human_pid", newPid);
+            }
+        }).collect(
+                Collectors.collectingAndThen(
+                        Collectors.toCollection(
+                                () -> new TreeSet<>(Comparator.comparing(m -> m.get("new_human_pid") + m.get("old_human_pid")))
+                        ), ArrayList::new
+                )
+        );
+    }
+
+    public static String getNewPid(List<Map<String, String>> list, String newHumanPid) {
+        Set<String> tmpIds = new HashSet<>(Collections.singletonList(newHumanPid));
+        String tmpId = newHumanPid;
+        while (true) {
+            String queryId = getQuery(list, tmpId);
+            if (tmpIds.contains(queryId) || tmpId.equalsIgnoreCase(queryId)) {
+                return queryId;
+            }
+            tmpId = queryId;
+            tmpIds.add(queryId);
+        }
+
+    }
+
+    public static String getQuery(List<Map<String, String>> list, String id) {
+        Map<String, String> map = list.stream().filter(m -> m.get("old_human_pid").equalsIgnoreCase(id)).findFirst().orElse(null);
+        if (map != null && !map.isEmpty()) {
+            return map.get("new_human_pid");
+        } else {
+            return id;
+        }
+    }
+
+    public static String mergePerson(Session session, List<Map<String, String>> mergeData) {
+        String data = null;
+        if (!mergeData.isEmpty()) {
+            final String merge_cql =
+                    "\nWITH  {merge_list} AS batch_list \n" +
+                            "UNWIND batch_list AS row\n" +
+                            "MATCH (p:" + CompanyEnum.Lable.PERSON.code + " {person_id:row.new_human_pid}),(q:" + CompanyEnum.Lable.PERSON.code + " {person_id:row.old_human_pid})-[r]-(x:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
+                            "WITH p as first_node, q as other_node, x, r\n" +
+                            "CALL apoc.merge.relationship.eager(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
+                            "SET first_node:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
+                            "SET other_node:" + CompanyUtils.getIncrPersonLabelV2("删除", CompanyEnum.SPLIT_HOUR) + "\n" +
+                            "DELETE r\n" +
+                            "RETURN first_node.person_id as new_human_pid,first_node.name as new_human_name,other_node.person_id as old_human_pid,other_node.name as old_human_name";
+            log.info("merge size: {}, cql2:{}", mergeData.size(), merge_cql);
+            data = CompanyUtils.writeNeo4j2(session, merge_cql, new HashMap<String, Object>() {{
+                put("merge_list", mergeData);
+            }});
+        }
+        return data;
+    }
+
+
     public static String getIncrPersonLabel() {
         return CompanyEnum.Lable.新增.code + DateUtil.getDateBefore(-1).replace("-", "");
     }

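A note on the chain handling in writeNeo4j3: after the count comparison a pid can be the survivor of one pair and the merged-away side of another (p3 folds into p2 while p2 folds into p1), so getNewPid follows each survivor pid onward until it reaches a terminal pid, with the visited set guarding against cycles. A self-contained walk-through, assuming nothing beyond the algorithm shown above:

    import java.util.*;

    // Chain resolution demo; getNewPid/getQuery mirror CompanyUtils above.
    public class MergeChainDemo {
        public static void main(String[] args) {
            // Suppose the count comparison produced: p2 <- p3 and p1 <- p2.
            List<Map<String, String>> list = new ArrayList<>();
            list.add(pair("p2", "p3"));
            list.add(pair("p1", "p2"));

            // p2 is both a survivor and merged away, so rows pointing at p2
            // are rewritten to the end of the chain: p3 -> p2 -> p1 ends at p1.
            for (Map<String, String> m : list) {
                System.out.println(getNewPid(list, m.get("new_human_pid"))
                        + " <- " + m.get("old_human_pid"));
            }
            // prints: p1 <- p3, then p1 <- p2
        }

        static Map<String, String> pair(String newPid, String oldPid) {
            Map<String, String> m = new HashMap<>();
            m.put("new_human_pid", newPid);
            m.put("old_human_pid", oldPid);
            return m;
        }

        static String getNewPid(List<Map<String, String>> list, String pid) {
            Set<String> seen = new HashSet<>(Collections.singletonList(pid));
            String cur = pid;
            while (true) {
                String next = getQuery(list, cur);
                if (seen.contains(next) || cur.equalsIgnoreCase(next)) return next;
                cur = next;
                seen.add(next);
            }
        }

        static String getQuery(List<Map<String, String>> list, String id) {
            return list.stream()
                    .filter(m -> m.get("old_human_pid").equalsIgnoreCase(id))
                    .map(m -> m.get("new_human_pid"))
                    .findFirst().orElse(id);
        }
    }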
+ 1 - 1
src/main/resources/application-dev.properties

@@ -33,7 +33,7 @@ spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
 #spring.datasource.username = firefly
 #spring.datasource.password = firefly
 
-scheduling.enabled = false
+scheduling.enabled = true
 
 
 

+ 3 - 0
src/test/java/com/winhc/test/TestMongo.java

@@ -9,6 +9,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import java.util.*;
@@ -32,6 +33,8 @@ public class TestMongo {
     @Autowired
     PidToMongoService pidToMongoService;
 
+
+
     @Test
     public void saveData() {
         ArrayList<NodeRelationError> list = new ArrayList<>();

+ 271 - 29
src/test/java/com/winhc/test/TestSendKafka.java

@@ -9,7 +9,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -24,17 +27,20 @@ public class TestSendKafka {
     @Autowired
     KafkaProduce kafkaProduce;
 
+    @Autowired
+    MongoTemplate mongoTemplate;
+
     @Test
     public void sendKafka() {
         String topic = "inc_judicial_case_monitor";
         long start = System.currentTimeMillis();
         HashMap<Object, Object> map = new HashMap<>();
         HashMap<Object, Object> map1 = new HashMap<>();
-        map.put("topic_type","1");
-        map1.put("justice_case_id","221111486");
-        map1.put("dimension_id","b97afac1bc2c9d6755413fa0069699e0");
-        map1.put("dimension_type","7");
-        map.put("data",map1);
+        map.put("topic_type", "1");
+        map1.put("justice_case_id", "221111486");
+        map1.put("dimension_id", "b97afac1bc2c9d6755413fa0069699e0");
+        map1.put("dimension_type", "7");
+        map.put("data", map1);
         String msg = JSON.toJSONString(map);
         System.out.println(msg);
         //kafkaProduce.produce(topic, msg);
@@ -44,46 +50,46 @@ public class TestSendKafka {
     @Test
     public void sendMergePerson() {
         String topic = "test3";
-        Map<String,String> m = new HashMap<String,String>(){
+        Map<String, String> m = new HashMap<String, String>() {
             {
-                put("person_id","p2");
-                put("name","钱胜前");
-                put("topic_type","800");
+                put("person_id", "p2");
+                put("name", "钱胜前");
+                put("topic_type", "800");
             }
         };
-        Map<String,String> m0 = new HashMap<String,String>(){
+        Map<String, String> m0 = new HashMap<String, String>() {
             {
-                put("person_id","p1");
-                put("name","钱胜前");
-                put("topic_type","800");
+                put("person_id", "p1");
+                put("name", "钱胜前");
+                put("topic_type", "800");
             }
         };
-        Map<String,String> m1 = new HashMap<String,String>(){
+        Map<String, String> m1 = new HashMap<String, String>() {
             {
-                put("person_id","p1");
-                put("name","钱胜前");
-                put("topic_type","800");
+                put("person_id", "p1");
+                put("name", "钱胜前");
+                put("topic_type", "800");
             }
         };
-        Map<String,String> m2 = new HashMap<String,String>(){
+        Map<String, String> m2 = new HashMap<String, String>() {
             {
-                put("person_id","p6");
-                put("name","曾妍");
-                put("topic_type","800");
+                put("person_id", "p6");
+                put("name", "曾妍");
+                put("topic_type", "800");
             }
         };
-        Map<String,String> m3 = new HashMap<String,String>(){
+        Map<String, String> m3 = new HashMap<String, String>() {
             {
-                put("person_id","p4");
-                put("name","曾妍");
-                put("topic_type","800");
+                put("person_id", "p4");
+                put("name", "曾妍");
+                put("topic_type", "800");
             }
         };
-        Map<String,String> m4 = new HashMap<String,String>(){
+        Map<String, String> m4 = new HashMap<String, String>() {
             {
-                put("person_id","p5");
-                put("name","杨凡");
-                put("topic_type","800");
+                put("person_id", "p5");
+                put("name", "杨凡");
+                put("topic_type", "800");
             }
         };
         kafkaProduce.produce(topic, JSONObject.toJSONString(m));
@@ -109,4 +115,240 @@ public class TestSendKafka {
     }
 
 
+    @Test
+    public void sendMergeAllPerson() {
+        String topic = "test3";
+        Map<String, String> m = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c3");
+                put("first_human_name", "钱胜前");
+                put("second_company_id", "c1");
+                put("second_human_name", "钱胜前");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m0 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c3");
+                put("first_human_name", "钱胜前");
+                put("second_company_id", "c3");
+                put("second_human_name", "钱胜前");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m1 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c2");
+                put("first_human_name", "钱胜前");
+                put("second_company_id", "c1");
+                put("second_human_name", "钱胜前");
+                put("topic_type", "900");
+            }
+        };
+
+        Map<String, String> m2 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c1");
+                put("first_human_name", "曾妍");
+                put("second_company_id", "c2");
+                put("second_human_name", "曾妍");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m3 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c1");
+                put("first_human_name", "曾妍");
+                put("second_company_id", "c6");
+                put("second_human_name", "曾妍");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m4 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c2");
+                put("first_human_name", "杨凡");
+                put("second_company_id", "c1");
+                put("second_human_name", "杨凡");
+                put("topic_type", "900");
+            }
+        };
+        Arrays.asList(m, m0, m1, m2, m3, m4).forEach(x -> {
+//            kafkaProduce.produce(topic, JSONObject.toJSONString(x));
+            mongoTemplate.save(x, "pid_all_person_merge_v9");
+        });
+    }
+
+    @Test
+    public void sendMergeAllPerson2() {
+        String topic = "test3";
+        Map<String, String> m = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c6");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m0 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c7");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m1 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c8");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+
+        Map<String, String> m2 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c9");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m3 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c10");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m4 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c6");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c4");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m5 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c4");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+        Map<String, String> m6 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c5");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c1");
+                put("second_human_name", "周立磐");
+                put("topic_type", "900");
+            }
+        };
+        Arrays.asList(m, m0, m1, m2, m3, m4, m5, m6).forEach(x -> {
+            mongoTemplate.save(x, "pid_all_person_merge_v9");
+            //kafkaProduce.produce(topic, JSONObject.toJSONString(x));
+        });
+    }
+
+    @Test
+    public void sendMergeAllPerson3() {
+        String topic = "test3";
+        Map<String, String> m = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c6");
+                put("second_human_name", "周立磐");
+                put("second_human_pid", "p10");
+                put("topic_type", "1000");
+            }
+        };
+        Map<String, String> m0 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c7");
+                put("second_human_name", "周立磐");
+                put("second_human_pid", "p9");
+                put("topic_type", "1000");
+            }
+        };
+        Map<String, String> m1 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c8");
+                put("second_human_name", "周立磐");
+                put("second_human_pid", "p11");
+                put("topic_type", "1000");
+            }
+        };
+
+        Map<String, String> m2 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c9");
+                put("second_human_name", "周立磐");
+                put("second_human_pid", "p12");
+                put("topic_type", "1000");
+            }
+        };
+        Map<String, String> m3 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c10");
+                put("second_human_name", "周立磐");
+                put("second_human_pid", "p13");
+                put("topic_type", "1000");
+            }
+        };
+        Map<String, String> m4 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c4");
+                put("second_human_name", "周立磐");
+                put("second_human_pid", "p8");
+                put("topic_type", "1000");
+            }
+        };
+        Map<String, String> m5 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c4");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c10");
+                put("second_human_name", "周立磐");
+                put("second_human_pid", "p13");
+                put("topic_type", "1000");
+            }
+        };
+        Map<String, String> m6 = new HashMap<String, String>() {
+            {
+                put("first_company_id", "c9");
+                put("first_human_name", "周立磐");
+                put("second_company_id", "c1");
+                put("second_human_name", "周立磐");
+                //put("second_human_pid", "p10");
+                put("topic_type", "1000");
+            }
+        };
+        Arrays.asList(m, m0, m1, m2, m3, m4, m5, m6).forEach(x -> {
+            mongoTemplate.save(x, "pid_all_person_merge_v9");
+            //kafkaProduce.produce(topic, JSONObject.toJSONString(x));
+        });
+    }
+
 }