|
@@ -1,11 +1,16 @@
|
|
|
package com.winhc.kafka.consumer;
|
|
|
|
|
|
import com.winhc.common.enums.CompanyEnum;
|
|
|
+import com.winhc.config.ConfigConstant;
|
|
|
+import com.winhc.db.mongodb.dataobject.NodeRelationError;
|
|
|
+import com.winhc.kafka.KafkaProduce;
|
|
|
import com.winhc.service.RelationService;
|
|
|
import com.winhc.utils.CompanyUtils;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
|
|
@@ -25,14 +30,20 @@ import java.util.Map;
|
|
|
public class KafkaConsumerCaseMerge {
|
|
|
|
|
|
private final Map<String, RelationService> map;
|
|
|
+ @Autowired
|
|
|
+ KafkaProduce kafkaProduce;
|
|
|
+ @Autowired
|
|
|
+ ConfigConstant configConstant;
|
|
|
|
|
|
@KafkaListener(id = "${spring.kafka.topic_case}"
|
|
|
, topics = "${spring.kafka.topic_case}"
|
|
|
, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "KafkaConsumerCaseMerge")
|
|
|
public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
|
|
|
List<Map<String, Object>> listMap = CompanyUtils.map(records);
|
|
|
- this.map.get(CompanyEnum.TopicType.CASE_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_RELATION.CODE));
|
|
|
- this.map.get(CompanyEnum.TopicType.CASE_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_NODE.CODE));
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_RELATION_INCR.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_RELATION_INCR.CODE));
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_NODE_INCR.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_NODE_INCR.CODE));
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_RELATION_UPDATE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_RELATION_UPDATE.CODE));
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_NODE_UPDATE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_NODE_UPDATE.CODE));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -41,8 +52,11 @@ public class KafkaConsumerCaseMerge {
|
|
|
@Bean("KafkaConsumerCaseMerge")
|
|
|
public ConsumerAwareListenerErrorHandler dealError() {
|
|
|
return (message, e, consumer) -> {
|
|
|
- System.out.println(e.toString());
|
|
|
- System.out.println(message);
|
|
|
+ List<String> list = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload());
|
|
|
+ for (String msg : list) {
|
|
|
+ kafkaProduce.produce(configConstant.topic_case, msg);
|
|
|
+ }
|
|
|
+ log.error("KafkaConsumerCaseMerge error: {}", e.toString());
|
|
|
return null;
|
|
|
};
|
|
|
}
|