Browse Source

异常存储,代码优化

xufei 4 years ago
parent
commit
521f3a7ffb

+ 4 - 0
pom.xml

@@ -147,6 +147,10 @@
             <artifactId>hutool-all</artifactId>
             <version>4.5.16</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-mongodb</artifactId>
+        </dependency>
     </dependencies>
 
     <dependencyManagement>

+ 1 - 1
src/main/java/com/winhc/common/enums/CompanyEnum.java

@@ -3,7 +3,7 @@ package com.winhc.common.enums;
 public class CompanyEnum {
 
     public enum Lable {
-        COMPANY("COMPANY1"), PERSON("PERSON1"),
+        COMPANY("企业"), PERSON("个人"),
         法人("法人"), 高管("高管"), 投资("投资");
 
         public final String code;

+ 1 - 1
src/main/java/com/winhc/config/KafkaConfig.java

@@ -102,7 +102,7 @@ public class KafkaConfig {
         // 设置是否自动提交
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
         // 一次拉取消息数量
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
         props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
         // 最大处理时间

+ 40 - 0
src/main/java/com/winhc/config/MongoConfig.java

@@ -0,0 +1,40 @@
+package com.winhc.config;
+
+import com.mongodb.MongoClientOptions;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.convert.CustomConversions;
+import org.springframework.data.mongodb.MongoDbFactory;
+import org.springframework.data.mongodb.core.convert.DbRefResolver;
+import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
+import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
+import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
+import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
+
+@Configuration
+public class MongoConfig {
+
+    @Bean
+    public MappingMongoConverter mappingMongoConverter(MongoDbFactory factory, MongoMappingContext context, BeanFactory beanFactory) {
+        DbRefResolver dbRefResolver = new DefaultDbRefResolver(factory);
+        MappingMongoConverter mappingConverter = new MappingMongoConverter(dbRefResolver, context);
+        try {
+            mappingConverter.setCustomConversions(beanFactory.getBean(CustomConversions.class));
+        } catch (NoSuchBeanDefinitionException ignore) {
+        }
+
+        // Don't save _class to mongo
+        mappingConverter.setTypeMapper(new DefaultMongoTypeMapper(null));
+
+        return mappingConverter;
+    }
+
+    @Bean
+    public MongoClientOptions mongoOptions() {
+        return MongoClientOptions.builder().maxConnectionIdleTime(3000).build();
+
+    }
+}
+ 

+ 39 - 0
src/main/java/com/winhc/db/mongodb/dataobject/NodeRelationError.java

@@ -0,0 +1,39 @@
+package com.winhc.db.mongodb.dataobject;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.mongodb.core.mapping.Document;
+import java.util.Date;
+
+/**
+ * @description: 企业债券信息
+ * @author: yujie
+ * @date 2020.1.14 17:34
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Document(collection = "node_relation_error")
+public class NodeRelationError {
+
+    // 创建时间
+    private Date createTime;
+
+    // 更新时间
+    private Date updateTime;
+
+    // 消息
+    private String consumerMessage;
+
+    // 消息
+    private String errorMessage;
+
+    // 类型
+    private String topicType;
+
+    // 类型
+    private Integer status;
+
+
+}

+ 11 - 0
src/main/java/com/winhc/db/mongodb/repository/NodeRelatonErrorRepository.java

@@ -0,0 +1,11 @@
+package com.winhc.db.mongodb.repository;
+
+import com.winhc.db.mongodb.dataobject.NodeRelationError;
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+/**
+ * @author: π
+ * @date 2021.1.15
+ */
+public interface NodeRelatonErrorRepository extends MongoRepository<NodeRelationError, String> {
+}

+ 8 - 8
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -1,16 +1,18 @@
 package com.winhc.kafka.consumer;
 
 import com.winhc.common.enums.CompanyEnum;
+import com.winhc.db.mongodb.dataobject.NodeRelationError;
+import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
 import com.winhc.service.RelationService;
 import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
 import org.springframework.stereotype.Service;
-
 import java.util.List;
 import java.util.Map;
 
@@ -25,6 +27,8 @@ import java.util.Map;
 public class KafkaConsumerNeo4jV2 {
 
     private final Map<String, RelationService> map;
+    @Autowired
+    NodeRelatonErrorRepository nodeRelatonErrorRepository;
 
     @KafkaListener(id = "${spring.kafka.topic_node_relation_union}"
             , topics = "${spring.kafka.topic_node_relation_union}"
@@ -46,13 +50,9 @@ public class KafkaConsumerNeo4jV2 {
     @Bean("consumerAwareListenerErrorHandlerV2")
     public ConsumerAwareListenerErrorHandler dealError() {
         return (message, e, consumer) -> {
-            List<ConsumerRecord> records = (List<ConsumerRecord>) message.getPayload();
-            log.error("consumer error: {}", e.getMessage());
-            //System.out.println("consumer error:" + e.getMessage());
-            //System.out.println("consumer 1:" + consumer.toString());
-            //System.out.println("consumer 2:" + records);
-            log.error("consumer message: {}", records.get(0).toString());
-            // TODO 将失败的记录保存到数据库,再用定时任务查询记录,并重刷数据
+            List<NodeRelationError> nodeRelationErrors = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload(), e.getMessage());
+            nodeRelatonErrorRepository.saveAll(nodeRelationErrors);
+            log.error("consumer error: save mongo size: {} , message: {}", nodeRelationErrors.size(), e.getMessage());
             return null;
         };
     }

+ 4 - 10
src/main/java/com/winhc/service/impl/CompanyNodeServiceImpl.java

@@ -2,13 +2,12 @@ package com.winhc.service.impl;
 
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.neo4j.driver.Driver;
-import org.neo4j.driver.Result;
 import org.neo4j.driver.Session;
 import org.springframework.stereotype.Service;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,20 +28,15 @@ public class CompanyNodeServiceImpl implements RelationService {
     public String save(List<Map<String, Object>> batch_list) {
         if (batch_list.isEmpty()) return null;
         long start = System.currentTimeMillis();
-
         Session session = driver.session();
         final String cql = "WITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
                 "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.id}) \n" +
                 "SET e.name=row.name, e.company_id=row.id \n";
-        Map parameters = new HashMap() {{
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
-        }};
-        //log.info("cql:" + cql);
-        String data = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return "success";
-        });
+        }});
         session.close();
         log.info("class:{} | save size:{} | cost:{}", CompanyNodeServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
         return data;

+ 4 - 9
src/main/java/com/winhc/service/impl/HolderRelationV1ServiceImpl.java

@@ -2,11 +2,11 @@ package com.winhc.service.impl;
 
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.neo4j.driver.*;
 import org.springframework.stereotype.Service;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,7 +27,6 @@ public class HolderRelationV1ServiceImpl implements RelationService {
     public String save(List<Map<String, Object>> batch_list) {
         if (batch_list.isEmpty()) return null;
         long start = System.currentTimeMillis();
-
         Session session = driver.session();
         final String cql = "WITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
@@ -38,14 +37,10 @@ public class HolderRelationV1ServiceImpl implements RelationService {
                 "WITH s,e,row \n" +
                 "MERGE(s)-[r:" + CompanyEnum.Lable.投资.code + "]->(e) \n" +
                 "SET r.percent=row.percent, r.deleted=row.deleted \n";
-        Map parameters = new HashMap() {{
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
-        }};
-        //log.info("cql:" + cql);
-        String data = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return "success";
-        });
+        }});
         session.close();
         log.info("class:{} | save size:{} | cost:{}", HolderRelationV1ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
         return data;

+ 6 - 12
src/main/java/com/winhc/service/impl/HolderRelationV2ServiceImpl.java

@@ -2,11 +2,10 @@ package com.winhc.service.impl;
 
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.neo4j.driver.Driver;
-import org.neo4j.driver.Result;
-import org.neo4j.driver.Session;
+import org.neo4j.driver.*;
 import org.springframework.stereotype.Service;
 import java.util.HashMap;
 import java.util.List;
@@ -28,7 +27,6 @@ public class HolderRelationV2ServiceImpl implements RelationService {
     public String save(List<Map<String, Object>> batch_list) {
         if (batch_list.isEmpty()) return null;
         long start = System.currentTimeMillis();
-
         Session session = driver.session();
         final String cql = "WITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
@@ -39,16 +37,12 @@ public class HolderRelationV2ServiceImpl implements RelationService {
                 "WITH s,e,row \n" +
                 "MERGE(s)-[r:" + CompanyEnum.Lable.投资.code + "]->(e) \n" +
                 "SET r.percent=row.percent, r.deleted=row.deleted \n";
-        Map parameters = new HashMap() {{
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
-        }};
-        //log.info("cql:" + cql);
-        String data = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return "success";
-        });
+        }});
         session.close();
         log.info("class:{} | save size:{} | cost:{}", HolderRelationV2ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
-        return data;
+        return null;
     }
 }

+ 4 - 9
src/main/java/com/winhc/service/impl/LegalEntityRelationV1ServiceImpl.java

@@ -2,10 +2,10 @@ package com.winhc.service.impl;
 
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.neo4j.driver.Driver;
-import org.neo4j.driver.Result;
 import org.neo4j.driver.Session;
 import org.springframework.stereotype.Service;
 import java.util.HashMap;
@@ -28,7 +28,6 @@ public class LegalEntityRelationV1ServiceImpl implements RelationService {
     public String save(List<Map<String, Object>> batch_list) {
         if (batch_list.isEmpty()) return null;
         long start = System.currentTimeMillis();
-
         Session session = driver.session();
         final String cql = "WITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
@@ -39,14 +38,10 @@ public class LegalEntityRelationV1ServiceImpl implements RelationService {
                 "WITH s,e,row \n" +
                 "MERGE(s)-[r:" + CompanyEnum.Lable.法人.code + "]->(e) \n" +
                 "SET r.deleted=row.deleted \n";
-        Map parameters = new HashMap() {{
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
-        }};
-        //log.info("cql:" + cql);
-        String data = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return "success";
-        });
+        }});
         session.close();
         log.info("class:{} | save size:{} | cost:{}", LegalEntityRelationV1ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
         return data;

+ 4 - 9
src/main/java/com/winhc/service/impl/LegalEntityRelationV2ServiceImpl.java

@@ -2,10 +2,10 @@ package com.winhc.service.impl;
 
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.neo4j.driver.Driver;
-import org.neo4j.driver.Result;
 import org.neo4j.driver.Session;
 import org.springframework.stereotype.Service;
 import java.util.HashMap;
@@ -28,7 +28,6 @@ public class LegalEntityRelationV2ServiceImpl implements RelationService {
     public String save(List<Map<String, Object>> batch_list) {
         if (batch_list.isEmpty()) return null;
         long start = System.currentTimeMillis();
-
         Session session = driver.session();
         final String cql = "WITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
@@ -39,14 +38,10 @@ public class LegalEntityRelationV2ServiceImpl implements RelationService {
                 "WITH s,e,row \n" +
                 "MERGE(s)-[r:" + CompanyEnum.Lable.法人.code + "]->(e) \n" +
                 "SET r.deleted=row.deleted \n";
-        Map parameters = new HashMap() {{
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
-        }};
-        //log.info("cql:" + cql);
-        String data = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return "success";
-        });
+        }});
         session.close();
         log.info("class:{} | save size:{} | cost:{}", LegalEntityRelationV2ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
         return data;

+ 4 - 9
src/main/java/com/winhc/service/impl/StaffRelationServiceImpl.java

@@ -2,10 +2,10 @@ package com.winhc.service.impl;
 
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.neo4j.driver.Driver;
-import org.neo4j.driver.Result;
 import org.neo4j.driver.Session;
 import org.springframework.stereotype.Service;
 import java.util.HashMap;
@@ -28,7 +28,6 @@ public class StaffRelationServiceImpl implements RelationService {
     public String save(List<Map<String, Object>> batch_list) {
         if (batch_list.isEmpty()) return null;
         long start = System.currentTimeMillis();
-
         Session session = driver.session();
         final String cql = "WITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
@@ -39,14 +38,10 @@ public class StaffRelationServiceImpl implements RelationService {
                 "WITH s,e,row \n" +
                 "MERGE(s)-[r:" + CompanyEnum.Lable.高管.code + "]->(e) \n" +
                 "SET r.staff_type=row.staff_type, r.deleted=row.deleted \n";
-        Map parameters = new HashMap() {{
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
-        }};
-        //log.info("cql:" + cql);
-        String data = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return "success";
-        });
+        }});
         session.close();
         log.info("class:{} | save size:{} | cost:{}", StaffRelationServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
         return data;

+ 31 - 2
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -1,8 +1,12 @@
 package com.winhc.utils;
 
 import cn.hutool.json.JSONUtil;
+import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -21,6 +25,31 @@ public class CompanyUtils {
     }
 
     public static List<Map<String, Object>> filterList(List<Map<String, Object>> list, String type) {
-        return list.stream().filter(r -> (r.get("topic_type").equals(type))).collect(Collectors.toList());
+        return list.stream().filter(r -> (r.getOrDefault("topic_type","-1").equals(type))).collect(Collectors.toList());
+    }
+
+    public static List<NodeRelationError> toMessage(List<ConsumerRecord> records, String errorMessage) {
+        return records.stream().filter(r -> (r != null && r.value() != null)).map(r -> {
+            String consumerMessage = r.value().toString();
+            Map<String, Object> m = JSONUtil.parseObj(consumerMessage);
+            Object topic_type = m.getOrDefault("topic_type", "-1");
+            Date date = new Date();
+            NodeRelationError n1 = new NodeRelationError();
+            n1.setCreateTime(date);
+            n1.setUpdateTime(date);
+            n1.setConsumerMessage(consumerMessage);
+            n1.setErrorMessage(errorMessage);
+            n1.setStatus(0);
+            n1.setTopicType(String.valueOf(topic_type));
+            return n1;
+        }).collect(Collectors.toList());
+    }
+
+    public static String writeNeo4j(Session session, String cql, HashMap<String,Object> parameters) {
+        String data = session.writeTransaction(tx -> {
+            Result result = tx.run(cql, parameters);
+            return "success";
+        });
+        return data;
     }
 }

+ 7 - 9
src/main/resources/application-dev.properties

@@ -5,6 +5,7 @@ spring.data.neo4j.username=neo4j
 spring.data.neo4j.password=neo4j168
 #spring.data.neo4j.uri=http://106.14.211.187:7474
 spring.data.neo4j.uri=bolt://106.14.211.187:7687
+#spring.data.neo4j.uri=bolt://127.0.0.1:7687
 #prod
 #spring.data.neo4j.uri=bolt://10.29.30.244:7687
 
@@ -31,13 +32,6 @@ scheduling.enabled = false
 # 指定kafka 代理地址,可以多个
 spring.kafka.bootstrap-servers=106.14.211.187:9092
 #topic
-spring.kafka.topic_company_node=inc_company_node
-spring.kafka.topic_holder_v1=inc_holder_relation_v1
-spring.kafka.topic_holder_v2=inc_holder_relation_v2
-spring.kafka.topic_staff_relation=inc_staff_relation
-spring.kafka.topic_legal_entity_relation_v1=inc_legal_entity_relation_v1
-spring.kafka.topic_legal_entity_relation_v2=inc_legal_entity_relation_v2
-
 spring.kafka.topic_node_relation_union=inc_node_relation_union
 
 #spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
@@ -54,7 +48,7 @@ spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.Str
 
 #=============== consumer  =======================
 # 指定默认消费者group id
-spring.kafka.consumer.group-id=user-log-group1
+spring.kafka.consumer.group-id=neo4j_node_relation
 
 spring.kafka.consumer.auto-offset-reset=earliest
 # 取消自动提交
@@ -65,4 +59,8 @@ spring.kafka.consumer.auto-commit-interval=100
 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 # 手动提交
-#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
+#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
+
+
+#mongo
+spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997

+ 20 - 11
src/main/resources/application-prd.properties

@@ -3,8 +3,19 @@
 #Neo4j配置
 spring.data.neo4j.username=neo4j
 spring.data.neo4j.password=neo4j168
+
+#187
 #spring.data.neo4j.uri=http://192.168.2.55:7474
-spring.data.neo4j.uri=bolt://192.168.2.55:7687
+#spring.data.neo4j.uri=bolt://192.168.2.55:7687
+#spring.data.neo4j.uri=bolt://106.14.211.187:7687
+
+#爬虫
+spring.data.neo4j.uri=bolt://192.168.2.56:7687
+
+
+#local
+#spring.data.neo4j.uri=bolt://127.0.0.1:7687
+#spring.data.neo4j.uri=bolt://10.1.10.213:7687
 
 
 #spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
@@ -19,19 +30,13 @@ scheduling.enabled = true
 
 #============== kafka ===================
 # 指定kafka 代理地址,可以多个
-#spring.kafka.bootstrap-servers=106.14.211.187:9092
-#spring.kafka.topic=test
 
+#dev
+#spring.kafka.bootstrap-servers=106.14.211.187:9092
+#prod
 spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
 
 #topic
-spring.kafka.topic_company_node=inc_company_node
-spring.kafka.topic_holder_v1=inc_holder_relation_v1
-spring.kafka.topic_holder_v2=inc_holder_relation_v2
-spring.kafka.topic_staff_relation=inc_staff_relation
-spring.kafka.topic_legal_entity_relation_v1=inc_legal_entity_relation_v1
-spring.kafka.topic_legal_entity_relation_v2=inc_legal_entity_relation_v2
-
 spring.kafka.topic_node_relation_union=inc_node_relation_union
 
 #=============== provider  =======================
@@ -57,4 +62,8 @@ spring.kafka.consumer.auto-commit-interval=100
 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 # 手动提交
-#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
+#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
+
+#mongo
+#spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997&maxIdleTimeMS=3000
+spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997

+ 15 - 72
src/test/java/com/winhc/test/TestCreateNode.java

@@ -206,80 +206,11 @@ public class TestCreateNode {
         System.out.println("cost" + (System.currentTimeMillis() - start));
     }
 
-    public void saveEdges(List<Edge> edges) {
-        StringBuilder sb = new StringBuilder();
-
-        sb.append("UNWIND {batch} as row ") //
-                .append(" WITH split(row.properties.from, '/') AS fromInfo, " //
-                        + "split(row.properties.to, '/') AS toInfo, row ") //
-                .append(" CALL apoc.cypher.doIt(" //
-                        + "'MATCH (from:`' + fromInfo[0] + '` {id: {fromId}})" //
-                        + " MATCH (to:`' + toInfo[0] + '` {id: {toId}}) " //
-                        + " MERGE (from)-[r:`' + row.properties.label + '` {id: {id}}]->(to) " //
-                        + " SET n += {properties}', " //
-                        + "{ fromId: row.properties.from, toId: row.properties.to, " //
-                        + " properties: row.properties, id: row.properties.id }" //
-                        + ") YIELD value") //
-                .append(" RETURN 1 ");
-
-        String statement = sb.toString();
-
-//        Map<String, Object> params = new HashMap<>();
-//        List<Map<String, Object>> batches = new ArrayList<>();
-//        for (Edge e : edges) {
-//            Map<String, Object> map = new HashMap<>();
-//            map.put("id", e.getId());
-//            map.put("from", e.getFrom());
-//            map.put("to", e.getTo());
-//            map.put("properties", e.getProperties());
-//            batches.add(map);
-//        }
-//        params.put("batch", batches);
-//
-//        cypher.query(statement, params, null);
-    }
-
-
 //"CALL apoc.create.relationship(person, row.relType,{status:row.status,percent:row.percent}, company) YIELD rel RETURN count(*) \n" ;
     //Procedure apoc.create.relationship has signature: apoc.create.relationship(from :: NODE?, relType :: STRING?, props :: MAP?, to :: NODE?) :: rel :: RELATIONSHIP?
     //Procedure apoc.merge.relationship has signature: apoc.merge.relationship(startNode :: NODE?, relationshipType :: STRING?, identProps :: MAP?, props :: MAP?, endNode :: NODE?) :: rel :: RELATIONSHIP?
 
     @Test
-    public void sendKafka() {
-        String topic = "compamy_relation3";
-        Long start = System.currentTimeMillis();
-
-//        params.put("companyId", "222");
-//        params.put("name", "bbb");
-//        String msg = JSON.toJSONString(params);
-//        System.out.println(msg);
-//        kafkaProduce.produce(topic, msg);
-
-//        for (int i = 0; i < 100000; i++) {
-//            params.put("companyId", "id" + i);
-//            params.put("name", "name" + i);
-//            String msg = JSON.toJSONString(params);
-//            kafkaProduce.produce(topic, msg);
-//        }
-//        System.out.println("cost: " + (System.currentTimeMillis() - start));
-
-        for (int i = 200000; i <= 300000; i++) {
-            Map<String, Object> m1 = new HashMap<>();
-            m1.put("companyId", "companyId" + i);
-            m1.put("personId", "personId" + i);
-            m1.put("companyName", "companyName_7_" + i);
-            m1.put("personName", "personName_7_" + i);
-            m1.put("relType", "relType_7_" + i + 2);
-            m1.put("status", "0");
-            m1.put("percent", i * 0.5 + "");
-            m1.put("rid", i * 3 + "");
-            String msg = JSON.toJSONString(m1);
-            System.out.println(msg);
-            kafkaProduce.produce(topic, msg);
-        }
-    }
-
-    @Test
     public void sendKafkaTest() {
         String topic = "test";
         for (int i = 260; i <= 270; i++) {
@@ -317,7 +248,7 @@ public class TestCreateNode {
     public void sendKafkaHolderV1() {
         //String topic = "inc_holder_relation_v1";
         String topic = "inc_node_relation_union";
-        for (int i = 1; i <= 10; i++) {
+        for (int i = 1; i <= 1000; i++) {
             HashMap<String, Object> m1 = new HashMap<>();
             m1.put("start_id", "start_id" + i);
             m1.put("end_id", "end_id" + i);
@@ -356,7 +287,7 @@ public class TestCreateNode {
     public void sendKafkaLegalEntityV1() {
         //String topic = "inc_legal_entity_relation_v1";
         String topic = "inc_node_relation_union";
-        for (int i = 20000; i <= 200000; i++) {
+        for (int i = 1; i <= 1000; i++) {
             HashMap<String, Object> m1 = new HashMap<>();
             m1.put("start_id", "start_id" + i);
             m1.put("end_id", "end_id" + i);
@@ -410,7 +341,7 @@ public class TestCreateNode {
     @Test
     public void sendKafkStaff1() {
         String topic = "inc_node_relation_union";
-        String s = "C:\\Users\\batmr\\Downloads\\8ec932c2-eb74-42c0-bfb0-4e2779383af7.csv";
+        String s = "C:\\Users\\batmr\\Downloads\\35c14491-fbbd-4aa4-b76c-93017b13dc90.csv";
         CsvReader reader = CsvUtil.getReader();
         //从文件中读取CSV数据
         CsvData data = reader.read(FileUtil.file(s));
@@ -425,5 +356,17 @@ public class TestCreateNode {
 
     }
 
+    @Test
+    public void sendKafkStaff2() {
+        String topic = "inc_node_relation_union";
+        String s = "D:\\Soft\\odpscmd_public\\bin\\tmp_xf_test4.csv";
+        FileReader fileReader = new FileReader(s);
+        List<String> lists = fileReader.readLines();
+        for (String mesaage : lists) {
+            kafkaProduce.produce(topic, mesaage);
+        }
+
+    }
+
 
 }

+ 4 - 3
src/test/java/com/winhc/test/TestMongo.java

@@ -1,6 +1,7 @@
 package com.winhc.test;
 
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
+import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -17,8 +18,8 @@ import java.util.Date;
 @RunWith(SpringRunner.class)
 @SpringBootTest
 public class TestMongo {
-    //@Autowired
-    //NodeRelatonErrorRepository nodeRelatonErrorRepository;
+    @Autowired
+    NodeRelatonErrorRepository nodeRelatonErrorRepository;
 
     @Test
     public void saveData() {
@@ -31,6 +32,6 @@ public class TestMongo {
         n1.setStatus(1);
         n1.setTopicType("5");
         list.add(n1);
-        //nodeRelatonErrorRepository.saveAll(list);
+        nodeRelatonErrorRepository.saveAll(list);
     }
 }