KafkaConsumerNeo4jV2.java 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package com.winhc.kafka.consumer;
  2. import com.winhc.common.enums.CompanyEnum;
  3. import com.winhc.db.mongodb.dataobject.NodeRelationError;
  4. import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
  5. import com.winhc.service.RelationService;
  6. import com.winhc.utils.CompanyUtils;
  7. import lombok.AllArgsConstructor;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.kafka.clients.consumer.ConsumerRecord;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.kafka.annotation.KafkaListener;
  13. import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
  14. import org.springframework.stereotype.Service;
  15. import java.util.List;
  16. import java.util.Map;
  17. /**
  18. * @author π
  19. * @Description:
  20. * @date 2021/1/8 16:04
  21. */
  22. @Slf4j
  23. @Service
  24. @AllArgsConstructor
  25. public class KafkaConsumerNeo4jV2 {
  26. private final Map<String, RelationService> map;
  27. @Autowired
  28. NodeRelatonErrorRepository nodeRelatonErrorRepository;
  29. @KafkaListener(id = "${spring.kafka.topic_node_relation_union}"
  30. , topics = "${spring.kafka.topic_node_relation_union}"
  31. , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandlerV2")
  32. public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
  33. List<Map<String, Object>> listMap = CompanyUtils.map(records);
  34. this.map.get(CompanyEnum.TopicType.COMPANY_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.COMPANY_NODE.CODE));
  35. this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V1.CODE));
  36. this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V2.CODE));
  37. this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.CODE));
  38. this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.CODE));
  39. this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
  40. this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
  41. }
  42. /**
  43. * 因为手动确认消费,若消费失败,记录重刷
  44. */
  45. @Bean("consumerAwareListenerErrorHandlerV2")
  46. public ConsumerAwareListenerErrorHandler dealError() {
  47. return (message, e, consumer) -> {
  48. List<NodeRelationError> nodeRelationErrors = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload(), e.getMessage());
  49. nodeRelatonErrorRepository.saveAll(nodeRelationErrors);
  50. log.error("consumer error: save mongo size: {} , message: {}", nodeRelationErrors.size(), e.getMessage());
  51. return null;
  52. };
  53. }
  54. }