فهرست منبع

合并逻辑修改

xufei 3 سال پیش
والد
کامیت
4405a6a7f3

+ 3 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonIdUpdate.java

@@ -67,6 +67,9 @@ public class KafkaConsumerPersonIdUpdate {
             , topics = "${spring.kafka.topic_pid_update_v1}"
             , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "handlerV1")
     public void updatePid(List<String> messages) {
+        if(CompanyUtils.isWindows()){
+            return;
+        }
         List<Tuple<Map<String, String>, JSONObject>> list = messages.stream()
                 .flatMap(m ->
                         JSONArray.parseArray(m).stream()

+ 7 - 4
src/main/java/com/winhc/pojo/MergePerson.java

@@ -17,10 +17,12 @@ import java.util.Date;
 @AllArgsConstructor
 @NoArgsConstructor
 public class MergePerson {
-    private String new_human_pid;
-    private String new_human_name;
-    private String old_human_pid;
-    private String old_human_name;
+    public String new_human_pid;
+    public String new_human_name;
+    public String old_human_pid;
+    public String old_human_name;
+    @JSONField(serialize = false)
+    public int cnt = 0;
 
     @JSONField(serialize = false)
     public String getId() {
@@ -38,6 +40,7 @@ public class MergePerson {
                 ", new_human_name='" + new_human_name + '\'' +
                 ", old_human_pid='" + old_human_pid + '\'' +
                 ", old_human_name='" + old_human_name + '\'' +
+                ", cnt='" + cnt + '\'' +
                 '}';
     }
 }

+ 35 - 20
src/main/java/com/winhc/service/impl/PersonMergeV2Impl.java

@@ -43,29 +43,44 @@ public class PersonMergeV2Impl implements RelationService {
         if (batch_list.isEmpty()) return null;
         long start = System.currentTimeMillis();
         Session session = driver.session();
-
-        final String cql =
-                "\nWITH  {batch_list} AS batch_list \n" +
-                        "UNWIND batch_list AS row \n" +
-                        "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
-                        "WHERE ID(p) <> ID(q)\n" +
-                        "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(q))) as all_ids\n" +
-                        "UNWIND all_ids as all_id\n" +
-                        "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")-[r]-(x:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
-                        "WHERE ID(m) = all_id\n" +
-                        "WITH person_id, first_node, m as other_node, x, r\n" +
-                        "CALL apoc.merge.relationship.eager(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
-                        "SET first_node:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
-                        "SET other_node:" + CompanyUtils.getIncrPersonLabelV2("删除", CompanyEnum.SPLIT_HOUR) + "\n" +
-                        "DELETE r" + "\n" +
-                        "RETURN first_node.person_id as new_human_pid,first_node.name as new_human_name,other_node.person_id as old_human_pid,other_node.name as old_human_name" + "\n";
-        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
-        String data = CompanyUtils.writeNeo4j2(session, cql, new HashMap<String, Object>() {{
+        //1.查询关联人员
+        final String query_cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
+                "WHERE ID(p) <> ID(q)\n" +
+                "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
+                "UNWIND all_ids as all_id\n" +
+                "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")-[r]-(x:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
+                "WHERE ID(m) = all_id\n" +
+                "WITH person_id, first_node, m as other_node, count(distinct x) as cnt\n" +
+                "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name, cnt" + "\n";
+        log.info("query size: {}, cql:{}", batch_list.size(), query_cql);
+        List<Map<String, String>> mergeData = CompanyUtils.writeNeo4j1(session, query_cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
         }});
+
+        //2.合并逻辑
+        String data = null;
+        if (!mergeData.isEmpty()) {
+            final String merge_cql =
+                    "\nWITH  {merge_list} AS batch_list \n" +
+                            "UNWIND batch_list AS row\n" +
+                            "MATCH (p:个人 {person_id:row.new_human_pid}),(q:个人 {person_id:row.old_human_pid})-[r]-(x:企业)\n" +
+                            "WITH p as first_node, q as other_node, x, r\n" +
+                            "CALL apoc.merge.relationship.eager(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
+                            "SET first_node:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
+                            "SET other_node:" + CompanyUtils.getIncrPersonLabelV2("删除", CompanyEnum.SPLIT_HOUR) + "\n" +
+                            "DELETE r\n" +
+                            "RETURN first_node.person_id as new_human_pid,first_node.name as new_human_name,other_node.person_id as old_human_pid,other_node.name as old_human_name";
+            log.info("merge size: {}, cql2:{}", mergeData.size(), merge_cql);
+            data = CompanyUtils.writeNeo4j2(session, merge_cql, new HashMap<String, Object>() {{
+                put("merge_list", mergeData);
+            }});
+        }
         session.close();
-        log.info("class:{} | save size:{} | cost:{}", PersonMergeV2Impl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
-        //发送变更人员记录
+        log.info("class:{} | save size:{} |  merge size:{} |cost:{}", PersonMergeV2Impl.class.getSimpleName(), batch_list.size(), mergeData.size(), (System.currentTimeMillis() - start));
+
+        //3.发送变更人员记录
         if (StringUtils.isNotBlank(data)) {
             kafkaProduce.produce(configConstant.topic_pid_update_v1, data);
         }

+ 2 - 0
src/main/java/com/winhc/task/AsynMergePersonTask.java

@@ -12,6 +12,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -31,6 +32,7 @@ import static com.winhc.utils.DateUtil.getMinuteTime;
 @Slf4j
 @EnableScheduling
 @AllArgsConstructor
+@ConditionalOnProperty(prefix = "scheduling", name = "enabled", havingValue = "true")
 public class AsynMergePersonTask {
 
     private final MongoTemplate mongoTemplate;

+ 31 - 0
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -159,6 +159,37 @@ public class CompanyUtils {
         return values.size() > 0 ? JSONObject.toJSONString(data.values()) : null;
     }
 
+    public static List<Map<String, String>> writeNeo4j1(Session session, String cql, HashMap<String, Object> parameters) {
+        List<Record> dataList = session.writeTransaction(tx -> {
+            Result result = tx.run(cql, parameters);
+            return result.list();
+        });
+        Map<String, List<MergePerson>> collect = dataList.stream()
+                .map(record -> JSON.parseObject(JSON.toJSONString(record.asMap()), MergePerson.class))
+                .collect(Collectors.groupingBy(MergePerson::getNew_human_pid));
+        List<Map<String, String>> list = new ArrayList<>();
+        collect.forEach((k, v) -> {
+            v.stream().max(Comparator.comparing(MergePerson::getCnt)).ifPresent(maxPerson -> v.forEach(m -> {
+                if (!m.getOld_human_pid().equals(maxPerson.getOld_human_pid())) {
+                    HashMap<String, String> map = new HashMap<>();
+                    map.put("new_human_pid", maxPerson.getOld_human_pid());
+                    map.put("new_human_name", maxPerson.getOld_human_name());
+                    map.put("old_human_pid", m.getOld_human_pid());
+                    map.put("old_human_name", m.getOld_human_name());
+                    list.add(map);
+                }
+            }));
+        });
+        return list.stream().collect(
+                Collectors.collectingAndThen(
+                        Collectors.toCollection(
+                                () -> new TreeSet<>(Comparator.comparing(m -> m.get("new_human_pid") + m.get("old_human_pid")))
+                        ), ArrayList::new
+                )
+        );
+
+    }
+
     public static String getIncrPersonLabel() {
         return CompanyEnum.Lable.新增.code + DateUtil.getDateBefore(-1).replace("-", "");
     }

+ 1 - 1
src/test/java/com/winhc/test/TestJson.java

@@ -80,7 +80,7 @@ public class TestJson {
     @Test
     public void test4() {
         Collection<MergePerson> values = new ArrayList<>();
-        MergePerson m = new MergePerson("1", "n1", "2", "n2");
+        MergePerson m = new MergePerson("1", "n1", "2", "n2",1);
         values.add(m);
 //        MergePerson m1 = new MergePerson("1", "n1", "2", "n2");
 //        values.add(m1);

+ 33 - 1
src/test/java/com/winhc/test/TestSendKafka.java

@@ -44,13 +44,27 @@ public class TestSendKafka {
     @Test
     public void sendMergePerson() {
         String topic = "test3";
-        Map<String,String> m1 = new HashMap<String,String>(){
+        Map<String,String> m = new HashMap<String,String>(){
             {
                 put("person_id","p2");
                 put("name","钱胜前");
                 put("topic_type","800");
             }
         };
+        Map<String,String> m0 = new HashMap<String,String>(){
+            {
+                put("person_id","p1");
+                put("name","钱胜前");
+                put("topic_type","800");
+            }
+        };
+        Map<String,String> m1 = new HashMap<String,String>(){
+            {
+                put("person_id","p1");
+                put("name","钱胜前");
+                put("topic_type","800");
+            }
+        };
         Map<String,String> m2 = new HashMap<String,String>(){
             {
                 put("person_id","p6");
@@ -58,8 +72,26 @@ public class TestSendKafka {
                 put("topic_type","800");
             }
         };
+        Map<String,String> m3 = new HashMap<String,String>(){
+            {
+                put("person_id","p4");
+                put("name","曾妍");
+                put("topic_type","800");
+            }
+        };
+        Map<String,String> m4 = new HashMap<String,String>(){
+            {
+                put("person_id","p5");
+                put("name","杨凡");
+                put("topic_type","800");
+            }
+        };
+        kafkaProduce.produce(topic, JSONObject.toJSONString(m));
+        kafkaProduce.produce(topic, JSONObject.toJSONString(m0));
         kafkaProduce.produce(topic, JSONObject.toJSONString(m1));
         kafkaProduce.produce(topic, JSONObject.toJSONString(m2));
+        kafkaProduce.produce(topic, JSONObject.toJSONString(m3));
+        kafkaProduce.produce(topic, JSONObject.toJSONString(m4));
     }
 
     @Test