|
@@ -0,0 +1,61 @@
|
|
|
+package com.winhc.kafka.consumer;
|
|
|
+
|
|
|
+import com.winhc.common.enums.CompanyEnum;
|
|
|
+import com.winhc.config.ConfigConstant;
|
|
|
+import com.winhc.kafka.KafkaProduce;
|
|
|
+import com.winhc.service.RelationService;
|
|
|
+import com.winhc.utils.CompanyUtils;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author π
|
|
|
+ * @Description:案件增量消费v2
|
|
|
+ * @date 2021/8/30 16:12
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@AllArgsConstructor
|
|
|
+public class KafkaConsumerCaseMergeV2 {
|
|
|
+
|
|
|
+ private final Map<String, RelationService> map;
|
|
|
+ @Autowired
|
|
|
+ KafkaProduce kafkaProduce;
|
|
|
+ @Autowired
|
|
|
+ ConfigConstant configConstant;
|
|
|
+
|
|
|
+ @KafkaListener(id = "${spring.kafka.topic_case_v2}"
|
|
|
+ , topics = "${spring.kafka.topic_case_v2}"
|
|
|
+ , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "KafkaConsumerCaseMergeV2")
|
|
|
+ public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
|
|
|
+ List<Map<String, Object>> listMap = CompanyUtils.map(records);
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_RELATION_INCR.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_RELATION_INCR.CODE));
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_NODE_INCR.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_NODE_INCR.CODE));
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_RELATION_UPDATE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_RELATION_UPDATE.CODE));
|
|
|
+ this.map.get(CompanyEnum.TopicType.CASE_NODE_UPDATE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_NODE_UPDATE.CODE));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 因为手动确认消费,若消费失败,记录重刷
|
|
|
+ */
|
|
|
+ @Bean("KafkaConsumerCaseMergeV2")
|
|
|
+ public ConsumerAwareListenerErrorHandler dealError() {
|
|
|
+ return (message, e, consumer) -> {
|
|
|
+ List<String> list = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload());
|
|
|
+ for (String msg : list) {
|
|
|
+ kafkaProduce.produce(configConstant.topic_case, msg);
|
|
|
+ }
|
|
|
+ log.error("KafkaConsumerCaseMerge error: {}", e.toString());
|
|
|
+ return null;
|
|
|
+ };
|
|
|
+ }
|
|
|
+}
|