|
@@ -0,0 +1,128 @@
|
|
|
+package com.winhc.phoenix.example.configuration;
|
|
|
+
|
|
|
+import com.google.common.collect.Maps;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
+import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
+import org.apache.kafka.clients.producer.ProducerConfig;
|
|
|
+import org.apache.kafka.common.serialization.StringDeserializer;
|
|
|
+import org.apache.kafka.common.serialization.StringSerializer;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+import org.springframework.kafka.annotation.EnableKafka;
|
|
|
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
|
|
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
|
|
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
|
|
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
+import org.springframework.kafka.core.ProducerFactory;
|
|
|
+
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author: XuJiakai
|
|
|
+ * 2020/12/22 10:20
|
|
|
+ */
|
|
|
+@Configuration
|
|
|
+@EnableKafka
|
|
|
+@ConditionalOnProperty(value = "spring.profiles.active", havingValue = "prod", matchIfMissing = true)
|
|
|
+public class KafkaConfiguration {
|
|
|
+
|
|
|
+ @Value("${spring.kafka.bootstrap-servers}")
|
|
|
+ private String bootstrapServers;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.consumer.enable-auto-commit}")
|
|
|
+ private Boolean autoCommit;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.consumer.auto-commit-interval}")
|
|
|
+ private Integer autoCommitInterval;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.consumer.group-id}")
|
|
|
+ private String groupId;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.consumer.max-poll-records}")
|
|
|
+ private Integer maxPollRecords;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.consumer.auto-offset-reset}")
|
|
|
+ private String autoOffsetReset;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.producer.retries}")
|
|
|
+ private Integer retries;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.producer.batch-size}")
|
|
|
+ private Integer batchSize;
|
|
|
+
|
|
|
+ @Value("${spring.kafka.producer.buffer-memory}")
|
|
|
+ private Integer bufferMemory;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生产者配置信息
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public Map<String, Object> producerConfigs() {
|
|
|
+ Map<String, Object> props = Maps.newHashMap();
|
|
|
+ props.put(ProducerConfig.ACKS_CONFIG, "0");
|
|
|
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
|
+ props.put(ProducerConfig.RETRIES_CONFIG, retries);
|
|
|
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
|
|
+ props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
|
|
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
|
|
|
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
+ return props;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生产者工厂
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public ProducerFactory<String, String> producerFactory() {
|
|
|
+ return new DefaultKafkaProducerFactory<>(producerConfigs());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生产者模板
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public KafkaTemplate<String, String> kafkaTemplate() {
|
|
|
+ return new KafkaTemplate<>(producerFactory());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消费者配置信息
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public Map<String, Object> consumerConfigs() {
|
|
|
+ Map<String, Object> props = Maps.newHashMap();
|
|
|
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
|
|
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
|
|
|
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
|
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
|
|
|
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
|
|
|
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
|
|
|
+ props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
|
|
|
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
|
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
|
+ return props;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消费者批量工程
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public KafkaListenerContainerFactory<?> batchFactory() {
|
|
|
+ ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
|
+ factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
|
|
|
+ //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
|
|
|
+ factory.setBatchListener(true);
|
|
|
+ return factory;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public KafkaConsumer<String, String> kafkaConsumer() {
|
|
|
+ KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(consumerConfigs());
|
|
|
+ return kafkaConsumer;
|
|
|
+ }
|
|
|
+}
|