Jelajahi Sumber

人员合并逻辑

xufei 3 tahun lalu
induk
melakukan
c244965219

+ 4 - 1
src/main/java/com/winhc/common/enums/CompanyEnum.java

@@ -28,7 +28,8 @@ public class CompanyEnum {
         CASE_RELATION_INCR("400", "caseRelationIncrImpl"),
         CASE_NODE_INCR("500", "caseNodeIncrImpl"),
         CASE_RELATION_UPDATE("600", "caseRelationUpdateImpl"),
-        CASE_NODE_UPDATE("700", "caseNodeUpdateImpl")
+        CASE_NODE_UPDATE("700", "caseNodeUpdateImpl"),
+        PERSON_MERGE_V2("800", "personMergeV2Impl")
         ;
 
         public final String CODE;
@@ -40,5 +41,7 @@ public class CompanyEnum {
         }
     }
 
+    public static final int SPLIT_HOUR = 10;
+
 }
 

+ 1 - 1
src/main/java/com/winhc/config/KafkaConfig.java

@@ -102,7 +102,7 @@ public class KafkaConfig {
         // 设置是否自动提交
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
         // 一次拉取消息数量
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5*60*1000);
         props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5*60*1000);
         // 最大处理时间

+ 1 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -42,6 +42,7 @@ public class KafkaConsumerNeo4jV2 {
         this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.CODE));
         this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
+        this.map.get(CompanyEnum.TopicType.PERSON_MERGE_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_V2.CODE));
         this.map.get(CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.CODE));
     }
 

+ 57 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonMergeV2.java

@@ -0,0 +1,57 @@
+package com.winhc.kafka.consumer;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.config.ConfigConstant;
+import com.winhc.kafka.KafkaProduce;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.stereotype.Service;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:人员合并
+ * @date 2021/5/17 16:12
+ */
+@Slf4j
+//@Service
+@AllArgsConstructor
+public class KafkaConsumerPersonMergeV2 {
+
+    private final Map<String, RelationService> map;
+    @Autowired
+    KafkaProduce kafkaProduce;
+    @Autowired
+    ConfigConstant configConstant;
+
+    @KafkaListener(id = "${spring.kafka.topic_person_merge_v2}"
+            , topics = "${spring.kafka.topic_person_merge_v2}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "KafkaConsumerPersonMergeV2")
+    public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
+        List<Map<String, Object>> listMap = CompanyUtils.map(records);
+        this.map.get(CompanyEnum.TopicType.PERSON_MERGE_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_V2.CODE));
+    }
+
+    /**
+     * 因为手动确认消费,若消费失败,记录重刷
+     */
+    @Bean("KafkaConsumerPersonMergeV2")
+    public ConsumerAwareListenerErrorHandler dealError() {
+        return (message, e, consumer) -> {
+            List<String> list = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload());
+            for (String msg : list) {
+                kafkaProduce.produce(configConstant.topic_case, msg);
+            }
+            log.error("KafkaConsumerCaseMerge error: {}", e.toString());
+            return null;
+        };
+    }
+}

+ 6 - 6
src/main/java/com/winhc/service/impl/CaseNodeIncrImpl.java

@@ -36,12 +36,12 @@ public class CaseNodeIncrImpl implements RelationService {
         final String cql = "\nWITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
                 " MERGE(s:CASE{case_id:row.start_id})\n" +
-                " SET s.case_id = row.start_id\n" +
-                " WITH s\n" +
-                " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
-                " WITH node,s\n" +
-                " SET node.component_id = -id(s)\n" +
-                " SET node:" + CompanyUtils.getIncrPersonLabelV2("新增");
+                " SET s.case_id = row.start_id\n" ;
+//                " WITH s\n" +
+//                " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
+//                " WITH node,s\n" +
+//                " SET node.component_id = -id(s)\n" +
+//                " SET node:" + CompanyUtils.getIncrPersonLabelV2("新增");
         log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
         String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);

+ 1 - 0
src/main/java/com/winhc/service/impl/CaseNodeUpdateImpl.java

@@ -40,6 +40,7 @@ public class CaseNodeUpdateImpl implements RelationService {
                 " WITH s\n" +
                 " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
                 " WITH node,s\n" +
+                " SET node.component_id = -id(s)\n" +
                 " SET node:" + CompanyUtils.getIncrPersonLabelV2("新增");
         log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
         String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{

+ 6 - 6
src/main/java/com/winhc/service/impl/CaseRelationIncrImpl.java

@@ -40,12 +40,12 @@ public class CaseRelationIncrImpl implements RelationService {
                 " MERGE(m:CASE{case_id:row.end_id})\n" +
                 " SET m.case_id = row.end_id\n" +
                 " MERGE(s)-[r:RELATION]->(m)\n" +
-                " SET r.connect_type=row.connect_type\n" +
-                " WITH s,m\n" +
-                " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
-                " WITH node,s\n" +
-                " SET node.component_id = -id(s)\n" +
-                " SET node:" + CompanyUtils.getIncrPersonLabelV2("新增");
+                " SET r.connect_type=row.connect_type\n" ;
+//                " WITH s,m\n" +
+//                " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
+//                " WITH node,s\n" +
+//                " SET node.component_id = -id(s)\n" +
+//                " SET node:" + CompanyUtils.getIncrPersonLabelV2("新增");
         log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
         String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);

+ 1 - 0
src/main/java/com/winhc/service/impl/CaseRelationUpdateImpl.java

@@ -44,6 +44,7 @@ public class CaseRelationUpdateImpl implements RelationService {
                 " WITH s,m\n" +
                 " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
                 " WITH node,s\n" +
+                " SET node.component_id = -id(s)\n" +
                 " SET node:" + CompanyUtils.getIncrPersonLabelV2("新增");
         log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
         String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{

+ 1 - 1
src/main/java/com/winhc/service/impl/HolderRelationV1ServiceImpl.java

@@ -37,7 +37,7 @@ public class HolderRelationV1ServiceImpl implements RelationService {
                 "UNWIND batch_list AS row \n" +
                 "MERGE(s:" + CompanyEnum.Lable.PERSON.code + "{person_id:row.start_id}) \n" +
                 "SET s.name=row.start_name, s.person_id=row.start_id \n" +
-                "SET s:" + CompanyUtils.getIncrPersonLabel() + " \n" +
+                "SET s:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + " \n" +
                 "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
                 "SET e.name=row.end_name, e.company_id=row.end_id \n" +
                 "WITH s,e,row \n" +

+ 1 - 1
src/main/java/com/winhc/service/impl/LegalEntityRelationV1ServiceImpl.java

@@ -37,7 +37,7 @@ public class LegalEntityRelationV1ServiceImpl implements RelationService {
                 "UNWIND batch_list AS row \n" +
                 "MERGE(s:" + CompanyEnum.Lable.PERSON.code + "{person_id:row.start_id}) \n" +
                 "SET s.name=row.start_name, s.person_id=row.start_id \n" +
-                "SET s:" + CompanyUtils.getIncrPersonLabel() + " \n" +
+                "SET s:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + " \n" +
                 "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
                 "SET e.name=row.end_name, e.company_id=row.end_id \n" +
                 "WITH s,e,row \n" +

+ 60 - 0
src/main/java/com/winhc/service/impl/PersonMergeV2Impl.java

@@ -0,0 +1,60 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:人员合并
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("personMergeV2Impl")
+@AllArgsConstructor
+public class PersonMergeV2Impl implements RelationService {
+
+    @Autowired
+    @Qualifier("DriverV1")
+    Driver driver;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        final String cql =
+                "\nWITH  {batch_list} AS batch_list \n" +
+                        "UNWIND batch_list AS row \n" +
+                        "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
+                        "WHERE ID(p) <> ID(q)\n" +
+                        //"SET p:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
+                        "WITH p as first_node, collect(distinct q) as other_nodes\n" +
+                        "UNWIND other_nodes as other_node\n" +
+                        "MATCH (other_node)-[r]-(x:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
+                        "WITH first_node,r,other_node,x\n" +
+                        "CALL apoc.merge.relationship(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
+                        "SET first_node:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
+                        "SET other_node:" + CompanyUtils.getIncrPersonLabelV2("删除", CompanyEnum.SPLIT_HOUR) + "\n" +
+                        "DELETE r";
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+        log.info("class:{} | save size:{} | cost:{}", PersonMergeV2Impl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+        return data;
+    }
+}

+ 1 - 1
src/main/java/com/winhc/service/impl/StaffRelationServiceImpl.java

@@ -37,7 +37,7 @@ public class StaffRelationServiceImpl implements RelationService {
                 "UNWIND batch_list AS row \n" +
                 "MERGE(s:" + CompanyEnum.Lable.PERSON.code + "{person_id:row.start_id}) \n" +
                 "SET s.name=row.start_name, s.person_id=row.start_id \n" +
-                "SET s:" + CompanyUtils.getIncrPersonLabel() + " \n" +
+                "SET s:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + " \n" +
                 "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
                 "SET e.name=row.end_name, e.company_id=row.end_id \n" +
                 "WITH s,e,row \n" +

+ 5 - 1
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -113,7 +113,10 @@ public class CompanyUtils {
     }
 
     public static String getIncrPersonLabelV2(String label) {
-        int split_hour = 12;
+        return getIncrPersonLabelV2(label, 12);
+    }
+
+    public static String getIncrPersonLabelV2(String label, int split_hour) {
         GregorianCalendar calendar = new GregorianCalendar();
         int now_hour = calendar.get(Calendar.HOUR_OF_DAY);
         int gap = now_hour > split_hour ? 0 : -1;
@@ -121,6 +124,7 @@ public class CompanyUtils {
     }
 
     public static void main(String[] args) {
+        System.out.println(getIncrPersonLabelV2("新增",10));
         System.out.println(getIncrPersonLabelV2("新增"));
     }
 }

+ 1 - 1
src/main/resources/application-dev.properties

@@ -44,7 +44,7 @@ spring.kafka.topic_node_relation_union=inc_node_relation_union_dev
 spring.kafka.topic_person_companys=inc_person_companys_dev
 spring.kafka.topic_person_merge=inc_person_merge_dev
 spring.kafka.topic_case=inc_judicial_case_union_dev
-
+spring.kafka.topic_person_merge_v2=inc_person_merge_v2_dev
 #spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
 #spring.kafka.topic=xf_test
 #=============== provider  =======================

+ 1 - 0
src/main/resources/application-prd.properties

@@ -38,6 +38,7 @@ spring.kafka.topic_node_relation_union=inc_node_relation_union
 spring.kafka.topic_person_companys=inc_person_companys
 spring.kafka.topic_person_merge=inc_person_merge
 spring.kafka.topic_case=inc_judicial_case_union
+spring.kafka.topic_person_merge_v2=inc_person_merge_v2
 #=============== provider  =======================
 spring.kafka.producer.retries=3
 # 每次批量发送消息的数量