package com.winhc.config;

import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer and consumer wiring: exposes a {@link KafkaTemplate} for
 * publishing String key/value messages and a batch listener container
 * factory for consuming them. Connection and tuning values come from
 * {@code spring.kafka.*} properties.
 *
 * @author π
 * @date 2020/12/24 10:06
 */
//@Component
@Configuration
@EnableKafka
//@ConditionalOnProperty(value = "spring.profiles.active", havingValue = "dev", matchIfMissing = true)
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    /**
     * Producer configuration properties.
     *
     * @return the property map handed to {@link DefaultKafkaProducerFactory}
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        // acks=0: fire-and-forget, no broker acknowledgement awaited.
        // NOTE(review): with acks=0 the RETRIES_CONFIG below has no effect
        // (there is no failed ack to retry on) — confirm this is intended.
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Only consulted when transactions are enabled (no transactional.id is
        // set here, so this entry is currently inert).
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "3000");
        return props;
    }

    /**
     * Producer factory for String key/value records.
     *
     * @return factory backing {@link #kafkaTemplate()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Template used by application code to publish messages.
     *
     * @return a {@link KafkaTemplate} bound to {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Batch listener container factory for {@code @KafkaListener} consumers.
     *
     * @return a single-threaded, batch-mode listener container factory
     */
    @Bean("containerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        // Concurrency: must be less than or equal to the topic's partition count.
        container.setConcurrency(1);
        // Poll timeout
        //container.getContainerProperties().setPollTimeout(1500);
        // Deliver records to the listener in batches.
        container.setBatchListener(true);
        // Offset commit mode
        //container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return container;
    }

    /**
     * Consumer configuration properties.
     *
     * @return the property map handed to {@link DefaultKafkaConsumerFactory}
     */
    private Map<String, Object> consumerProps() {
        // 8 entries are stored; capacity 16 avoids the resize a capacity of 8
        // would trigger at the load-factor-0.75 threshold (6 entries).
        Map<String, Object> props = new HashMap<>(16);
        // Kafka broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Whether offsets are committed automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // Maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
        // NOTE(review): a 120 s session timeout may exceed the broker's
        // group.max.session.timeout.ms default — verify against the cluster.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        // Maximum time allowed to process one poll's worth of records
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
        // Deserialization
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}