浏览代码

实时版本

xufei 3 年之前
父节点
当前提交
f009ddaf9c

+ 3 - 0
src/main/java/com/winhc/config/ConfigConstant.java

@@ -13,5 +13,8 @@ public class ConfigConstant {
     @Value("${spring.kafka.topic_case_v2}")
     public String topic_case_v2;
 
+    @Value("${spring.kafka.topic_node_relation_union}")
+    public String topic_node_relation_union;
+
 
 }

+ 1 - 1
src/main/java/com/winhc/kafka/consumer/KafkaConsumerCaseMergeV2.java

@@ -23,7 +23,7 @@ import java.util.Map;
  * @date 2021/8/30 16:12
  */
 @Slf4j
-@Service
+//@Service
 @AllArgsConstructor
 public class KafkaConsumerCaseMergeV2 {
 

+ 6 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -4,6 +4,7 @@ import com.winhc.common.enums.CompanyEnum;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
 import com.winhc.service.RelationService;
+import com.winhc.service.SendMergeService;
 import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -29,6 +30,8 @@ public class KafkaConsumerNeo4jV2 {
     private final Map<String, RelationService> map;
     @Autowired
     NodeRelatonErrorRepository nodeRelatonErrorRepository;
+    @Autowired
+    SendMergeService sendMergeService;
 
     @KafkaListener(id = "${spring.kafka.topic_node_relation_union}"
             , topics = "${spring.kafka.topic_node_relation_union}"
@@ -43,7 +46,10 @@ public class KafkaConsumerNeo4jV2 {
         this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_MERGE_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_V2.CODE));
+        //发送合并人员kafka
+        this.sendMergeService.save(CompanyUtils.getMergeIds(listMap));
         this.map.get(CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.CODE));
+
     }
 
 

+ 1 - 1
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonMerge.java

@@ -20,7 +20,7 @@ import java.util.Map;
  * @date 2021/4/8 16:12
  */
 @Slf4j
-@Service
+//@Service
 @AllArgsConstructor
 public class KafkaConsumerPersonMerge {
 

+ 13 - 0
src/main/java/com/winhc/service/SendMergeService.java

@@ -0,0 +1,13 @@
+package com.winhc.service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/11/19 15:06
+ */
+public interface SendMergeService {
+    String save(List<String> messages);
+}

+ 32 - 0
src/main/java/com/winhc/service/impl/SendMergePersonIncrImpl.java

@@ -0,0 +1,32 @@
+package com.winhc.service.impl;
+
+import com.winhc.config.ConfigConstant;
+import com.winhc.kafka.KafkaProduce;
+import com.winhc.service.SendMergeService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import java.util.List;
+
+/**
+ * @author π
+ * @Description:案件-新增节点
+ * @date 2021/5/17 10:03
+ */
+@Slf4j
+@Service("sendMergePersonIncrImpl")
+@AllArgsConstructor
+public class SendMergePersonIncrImpl implements SendMergeService {
+
+    @Autowired
+    KafkaProduce kafkaProduce;
+
+    @Autowired
+    ConfigConstant configConstant;
+    @Override
+    public String save(List<String> m) {
+        m.forEach(x->kafkaProduce.produce(configConstant.topic_node_relation_union, x));
+        return null;
+    }
+}

+ 19 - 1
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -1,8 +1,11 @@
 package com.winhc.utils;
 
+import cn.hutool.core.lang.Tuple;
 import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.google.common.collect.ImmutableMap;
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -72,6 +75,21 @@ public class CompanyUtils {
         return list.stream().filter(r -> (r.getOrDefault("topic_type", "-1").equals(type))).collect(Collectors.toList());
     }
 
+    public static List<String> getMergeIds(List<Map<String, Object>> list) {
+        return list.stream()
+                .filter(r -> r.getOrDefault("start_id", "0").toString().length() == 33)
+                .collect(Collectors.toMap(t -> t.getOrDefault("start_id", "0").toString(), t -> t, (n, o) -> n))
+                .values().stream().map(x -> {
+                    Map<String, String> m = (Map) x;
+                    ImmutableMap<String, String> m2 = ImmutableMap.of(
+                            "person_id", m.get("start_id")
+                            , "name", m.get("start_name")
+                            , "topic_type", "800"
+                    );
+                    return JSONObject.toJSONString(m2, SerializerFeature.WriteMapNullValue);
+                }).collect(Collectors.toList());
+    }
+
     public static List<NodeRelationError> toMessage(List<ConsumerRecord> records, String errorMessage) {
         return records.stream().filter(r -> (r != null && r.value() != null)).map(r -> {
             String consumerMessage = r.value().toString();
@@ -124,7 +142,7 @@ public class CompanyUtils {
     }
 
     public static void main(String[] args) {
-        System.out.println(getIncrPersonLabelV2("新增",10));
+        System.out.println(getIncrPersonLabelV2("新增", 10));
         System.out.println(getIncrPersonLabelV2("新增"));
     }
 }

+ 4 - 4
src/main/resources/application-dev.properties

@@ -10,13 +10,13 @@ spring.data.neo4j.username.v1=neo4j
 spring.data.neo4j.password.v1=neo4j168
 #spring.data.neo4j.uri.v1=bolt://139.196.165.100:7687
 #spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
-spring.data.neo4j.uri.v1=bolt://127.0.0.1:7687
+spring.data.neo4j.uri.v1=bolt://139.224.197.164:7687
 
 #Neo4j配置(第二台机器)
 spring.data.neo4j.username.v2=neo4j
 spring.data.neo4j.password.v2=neo4j168
-#spring.data.neo4j.uri.v2=bolt://139.224.197.164:7687
-spring.data.neo4j.uri.v2=bolt://127.0.0.1:7687
+spring.data.neo4j.uri.v2=bolt://139.224.197.164:7687
+#spring.data.neo4j.uri.v2=bolt://127.0.0.1:7687
 
 #数据库uri地址
 #spring.data.neo4j.uri=http://10.29.26.76:7474
@@ -41,7 +41,7 @@ scheduling.enabled = false
 spring.kafka.bootstrap-servers=47.101.221.131:9092
 #spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
 #topic
-spring.kafka.topic_node_relation_union=inc_node_relation_union_dev
+spring.kafka.topic_node_relation_union=test3
 spring.kafka.topic_person_companys=inc_person_companys_dev
 spring.kafka.topic_person_merge=inc_person_merge_dev
 spring.kafka.topic_case=inc_judicial_case_union_dev

+ 5 - 3
src/main/resources/application-prd.properties

@@ -3,13 +3,15 @@
 #Neo4j配置(第一台机器)
 spring.data.neo4j.username.v1=neo4j
 spring.data.neo4j.password.v1=neo4j168
-spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
+#spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
+spring.data.neo4j.uri.v1=bolt://192.168.2.60:7687
+
 
 #Neo4j配置(第二台机器)
 spring.data.neo4j.username.v2=neo4j
 spring.data.neo4j.password.v2=neo4j168
 #spring.data.neo4j.uri.v2=bolt://192.168.2.60:7687
-spring.data.neo4j.uri.v2=bolt://192.168.2.62:7687
+spring.data.neo4j.uri.v2=bolt://192.168.2.60:7687
 
 
 #spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
@@ -35,7 +37,7 @@ spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.2
 
 
 #topic
-spring.kafka.topic_node_relation_union=inc_node_relation_union
+spring.kafka.topic_node_relation_union=ng_graph_pre
 spring.kafka.topic_person_companys=inc_person_companys
 spring.kafka.topic_person_merge=inc_person_merge
 spring.kafka.topic_case=inc_judicial_case_union