12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package com.winhc.kafka.consumer;
- import com.winhc.common.enums.CompanyEnum;
- import com.winhc.db.mongodb.dataobject.NodeRelationError;
- import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
- import com.winhc.service.RelationService;
- import com.winhc.utils.CompanyUtils;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
- import org.springframework.stereotype.Service;
- import java.util.List;
- import java.util.Map;
- /**
- * @author π
- * @Description:
- * @date 2021/1/8 16:04
- */
- @Slf4j
- @Service
- @AllArgsConstructor
- public class KafkaConsumerNeo4jV2 {
- private final Map<String, RelationService> map;
- @Autowired
- NodeRelatonErrorRepository nodeRelatonErrorRepository;
- @KafkaListener(id = "${spring.kafka.topic_node_relation_union}"
- , topics = "${spring.kafka.topic_node_relation_union}"
- , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandlerV2")
- public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
- List<Map<String, Object>> listMap = CompanyUtils.map(records);
- this.map.get(CompanyEnum.TopicType.COMPANY_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.COMPANY_NODE.CODE));
- this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V1.CODE));
- this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V2.CODE));
- this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.CODE));
- this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.CODE));
- this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
- this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
- }
- /**
- * 因为手动确认消费,若消费失败,记录重刷
- */
- @Bean("consumerAwareListenerErrorHandlerV2")
- public ConsumerAwareListenerErrorHandler dealError() {
- return (message, e, consumer) -> {
- List<NodeRelationError> nodeRelationErrors = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload(), e.getMessage());
- nodeRelatonErrorRepository.saveAll(nodeRelationErrors);
- log.error("consumer error: save mongo size: {} , message: {}", nodeRelationErrors.size(), e.getMessage());
- return null;
- };
- }
- }
|