xufei 2 年 前
コミット
9092bc360d

+ 3 - 2
src/main/java/com/winhc/common/enums/CompanyEnum.java

@@ -4,7 +4,7 @@ public class CompanyEnum {
 
     public enum Lable {
         COMPANY("企业"), PERSON("个人"), 新增("新增"),
-        法人("法人"), 高管("高管"), 投资("投资");
+        法人("法人"), 高管("高管"), 投资("投资"), 分支机构("分支机构");
 
         public final String code;
 
@@ -31,7 +31,8 @@ public class CompanyEnum {
         CASE_NODE_UPDATE("700", "caseNodeUpdateImpl"),
         PERSON_MERGE_V2("800", "personMergeV2Impl"),
         PERSON_MERGE_All("900", "personMergeAllImpl"),
-        PERSON_MERGE_All_V2("1000", "personMergeAllV2Impl")
+        PERSON_MERGE_All_V2("1000", "personMergeAllV2Impl"),
+        BRANCH_RELATION("1100", "branchRelationServiceImpl")
         ;
 
         public final String CODE;

+ 1 - 1
src/main/java/com/winhc/config/KafkaConfig.java

@@ -102,7 +102,7 @@ public class KafkaConfig {
         // 设置是否自动提交
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
         // 一次拉取消息数量
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5*60*1000);
         props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5*60*1000);
         // 最大处理时间

+ 3 - 1
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -56,6 +56,8 @@ public class KafkaConsumerNeo4jV2 {
         List<Map<String, Object>> listMap = CompanyUtils.map(records);
         this.map.get(CompanyEnum.TopicType.PERSON_MERGE_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_V2.CODE));
 
+        this.map.get(CompanyEnum.TopicType.BRANCH_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.BRANCH_RELATION.CODE));
+
 //        this.map.get(CompanyEnum.TopicType.COMPANY_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.COMPANY_NODE.CODE));
 //        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V1.CODE));
 //        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V2.CODE));
@@ -80,7 +82,7 @@ public class KafkaConsumerNeo4jV2 {
     public ConsumerAwareListenerErrorHandler dealError() {
         return (message, e, consumer) -> {
             List<String> list = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload());
-            if(isWindows()) return null;
+            if (isWindows()) return null;
             for (String msg : list) {
                 kafkaProduce.produce(configConstant.topic_node_relation_union, msg);
             }

+ 54 - 0
src/main/java/com/winhc/service/impl/BranchRelationServiceImpl.java

@@ -0,0 +1,54 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:分支机构 公司->公司
+ * @date 2023/4/20 10:03
+ */
+@Slf4j
+@Service("branchRelationServiceImpl")
+@AllArgsConstructor
+public class BranchRelationServiceImpl implements RelationService {
+
+    @Autowired
+    @Qualifier("DriverV1")
+    Driver driver;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.start_id}) \n" +
+                "SET s.name=row.start_name, s.company_id=row.start_id \n" +
+                "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
+                "SET e.company_id=row.end_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:" + CompanyEnum.Lable.分支机构.code + "]->(e) \n" +
+                "SET r.deleted=row.deleted \n";
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+        log.info("class:{} | save size:{} | cost:{}", BranchRelationServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+        return null;
+    }
+}