xufei 2 年 前
コミット
50d2622c5e

+ 12 - 10
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -52,21 +52,23 @@ public class KafkaConsumerNeo4jV2 {
             , topics = "${spring.kafka.topic_node_relation_union}"
             , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandlerV2")
     public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
+
         List<Map<String, Object>> listMap = CompanyUtils.map(records);
-        this.map.get(CompanyEnum.TopicType.COMPANY_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.COMPANY_NODE.CODE));
-        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V1.CODE));
-        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V2.CODE));
-        this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.CODE));
-        this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.CODE));
-        this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
-        this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_MERGE_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_V2.CODE));
-        this.map.get(CompanyEnum.TopicType.PERSON_MERGE_All_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_All_V2.CODE));
+
+//        this.map.get(CompanyEnum.TopicType.COMPANY_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.COMPANY_NODE.CODE));
+//        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V1.CODE));
+//        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V2.CODE));
+//        this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.CODE));
+//        this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.CODE));
+//        this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
+//        this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
+//        this.map.get(CompanyEnum.TopicType.PERSON_MERGE_All_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_All_V2.CODE));
 
         //todo 等两分钟,再触发合并程序
-        this.pidToMongoService.save(CompanyUtils.getMergeIds2(listMap));
+//        this.pidToMongoService.save(CompanyUtils.getMergeIds2(listMap));
         //发送合并人员kafka
-        this.map.get(CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.CODE));
+//        this.map.get(CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.CODE));
 
     }
 

+ 3 - 3
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonIdUpdate.java

@@ -60,9 +60,9 @@ public class KafkaConsumerPersonIdUpdate {
         restClient = bbossESStarterEs5.getRestClient("es5");
     }
 
-    @KafkaListener(id = "${spring.kafka.topic_pid_update_v1}"
-            , topics = "${spring.kafka.topic_pid_update_v1}"
-            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "handlerV1")
+//    @KafkaListener(id = "${spring.kafka.topic_pid_update_v1}"
+//            , topics = "${spring.kafka.topic_pid_update_v1}"
+//            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "handlerV1")
     public void updatePid(List<String> messages) {
         if(CompanyUtils.isWindows()){
             return;

+ 6 - 6
src/main/java/com/winhc/service/impl/PersonMergeV2Impl.java

@@ -48,7 +48,7 @@ public class PersonMergeV2Impl implements RelationService {
         //1.查询关联人员
         final String query_cql = "\nWITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
-                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
+                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..4]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
                 "WHERE ID(p) <> ID(q)\n" +
                 "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
                 "UNWIND all_ids as all_id\n" +
@@ -64,14 +64,14 @@ public class PersonMergeV2Impl implements RelationService {
         }})));
 
         //2.合并逻辑
-        String data = mergePerson(session, mergeData);
+//        String data = mergePerson(session, mergeData);
         session.close();
         log.info("class:{} | save size:{} |  merge size:{} |cost:{}", PersonMergeV2Impl.class.getSimpleName(), batch_list.size(), mergeData.size(), (System.currentTimeMillis() - start));
 
         //3.发送变更人员记录
-        if (StringUtils.isNotBlank(data)) {
-            kafkaProduce.produce(configConstant.topic_pid_update_v1, data);
-        }
-        return data;
+//        if (StringUtils.isNotBlank(data)) {
+//            kafkaProduce.produce(configConstant.topic_pid_update_v1, data);
+//        }
+        return null;
     }
 }

+ 1 - 1
src/main/java/com/winhc/task/AsynAllPersonMergeTask.java

@@ -42,7 +42,7 @@ public class AsynAllPersonMergeTask {
     ConfigConstant configConstant;
 
     //@Scheduled(cron = "50 20 17 07 05 ?")
-    @Scheduled(cron = "*/10 * * * * ?")
+    //@Scheduled(cron = "*/10 * * * * ?")
     //@Scheduled(cron = "0 /2 * * * ? ")
     public void start() throws InterruptedException {
         if(CompanyUtils.isWindows()) return;

+ 1 - 1
src/main/java/com/winhc/task/AsynMergePersonTask.java

@@ -43,7 +43,7 @@ public class AsynMergePersonTask {
     @Autowired
     ConfigConstant configConstant;
 
-    @Scheduled(cron = "*/15 * * * * ?")
+    //@Scheduled(cron = "*/15 * * * * ?")
     //@Scheduled(cron = "0 /2 * * * ? ")
     public void start() throws InterruptedException {
         if (isWindows()) return;

+ 7 - 5
src/main/resources/application-prd.properties

@@ -3,15 +3,15 @@
 #Neo4j配置(第一台机器)
 spring.data.neo4j.username.v1=neo4j
 spring.data.neo4j.password.v1=neo4j168
-#spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
-spring.data.neo4j.uri.v1=bolt://192.168.2.60:7687
+spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
+#spring.data.neo4j.uri.v1=bolt://192.168.2.60:7687
 
 
 #Neo4j配置(第二台机器)
 spring.data.neo4j.username.v2=neo4j
 spring.data.neo4j.password.v2=neo4j168
+spring.data.neo4j.uri.v2=bolt://192.168.2.57:7687
 #spring.data.neo4j.uri.v2=bolt://192.168.2.60:7687
-spring.data.neo4j.uri.v2=bolt://192.168.2.60:7687
 
 
 #spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
@@ -40,7 +40,8 @@ spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.2
 spring.kafka.topic_pid_update_v1=ng_rt_pid_change_v1
 spring.kafka.topic_pid_update_v2=ng_rt_pid_change_v2
 
-spring.kafka.topic_node_relation_union=ng_graph_pre
+#spring.kafka.topic_node_relation_union=ng_graph_pre
+spring.kafka.topic_node_relation_union=ng_graph_dev
 spring.kafka.topic_person_companys=inc_person_companys
 spring.kafka.topic_person_merge=inc_person_merge
 spring.kafka.topic_case=inc_judicial_case_union
@@ -58,7 +59,8 @@ spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.Str
 
 #=============== consumer  =======================
 # 指定默认消费者group id
-spring.kafka.consumer.group-id=neo4j_node_relation
+#spring.kafka.consumer.group-id=neo4j_node_relation
+spring.kafka.consumer.group-id=xf_group
 
 spring.kafka.consumer.auto-offset-reset=earliest
 # 取消自动提交

+ 11 - 0
src/test/java/com/winhc/test/TestMongo.java

@@ -3,6 +3,7 @@ package com.winhc.test;
 import com.google.common.collect.ImmutableMap;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
+import com.winhc.kafka.KafkaProduce;
 import com.winhc.service.EsQueryService;
 import com.winhc.service.PidToMongoService;
 import com.winhc.service.impl.NodeRelationSuccessStatusServiceImpl;
@@ -37,6 +38,9 @@ public class TestMongo {
     @Autowired
     EsQueryService esQueryService;
 
+    @Autowired
+    KafkaProduce kafkaProduce;
+
     @Test
     public void saveData() {
         ArrayList<NodeRelationError> list = new ArrayList<>();
@@ -96,4 +100,11 @@ public class TestMongo {
         System.out.println("cost : " + (System.currentTimeMillis() - start));
         System.out.println(res);
     }
+
+    @Test
+    public void sendKafka() {
+        String topic = "test3";
+        String message = "{\"person_id\":\"pcbcb10f7f7326fc960ef9e5611ac14f8\",\"name\":\"翁燕玲\",\"topic_type\":\"800\"}";
+        kafkaProduce.produce(topic, message);
+    }
 }