文章目录
- 自定义kafka客户端消费topic
- 结论
- 1 背景
- 2 spring集成2.1.8.RELEASE版本不支持autoStartup属性
- 3 自定义kafka客户端消费topic
- 3.1 yml配置
- 3.2 KafkaConfig客户端配置
- 3.3 手动启动消费客户端
自定义kafka客户端消费topic
结论
使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。
1 背景
后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic
2 spring集成2.1.8.RELEASE版本不支持autoStartup属性
使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没有找到可以直接配置属性autoStartup = "false"来手动启动topic,可能是版本低的原因,如果有可以支持的版本,也可以打在评论区,我去验证一下。
org.springframework.kafka spring-kafka 2.1.8.RELEASE @KafkaListener(topics = "
", autoStartup = "false") public void receive(String message) { // 处理接收到的消息 } 3 自定义kafka客户端消费topic
3.1 yml配置
spring: kafka: bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092 consumer: group-id: data-dev enable-auto-commit: true auto-offset-reset: latest auto-commit-interval: 1000 topic: costomTopic: costomData
3.2 KafkaConfig客户端配置
kafka其他配置项和原有的kafka客户端配置一样,只有额外增加了一个cutomConsumer让spring来管理,方便手动启动客户端来使用
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; // @Value("${spring.kafka.listener.concurrency}") // private Integer concurrency; @Value("${spring.kafka.consumer.auto-commit-interval}") private Integer autoCommitInterval; @Bean public KafkaTemplate
kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory > kafkaContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // concurrency factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return props; } @Bean public KafkaConsumer cutomConsumer() { // 新建一个自定义启动消费者 KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs()); return consumer; } } 3.3 手动启动消费客户端
这里手动启动消费客户端只有在配置了costomTopic才开始启动,如果需要动态指定启停topic
@Component public class CutomKafkaConsumer { // 使用cutomConsumer实例消费 @Autowired private KafkaConsumer cutomConsumer; @Value("${spring.kafka.topic.costomTopic:}") public void setCostomTopic(String costomTopic) { // 手动启动消费类,防止下级模块默认不配置costomTopic导致启动报错 if (StringUtils.isEmpty(costomTopic)) { return; } // 使这个消费者订阅对应话题 cutomConsumer.subscribe(Collections.singleton(costomTopic)); // 单线程拉取消息 ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(); consumerExecutor.submit(new Runnable() { @Override public void run() { while (true) { ConsumerRecords
records = cutomConsumer.poll(3000); if (!records.iterator().hasNext()) { continue; } try { // 捕获异常,防止顶级消费循环被异常中断 records.forEach(record -> operate(record)); } catch (Exception e) { log.error("消费数据失败,失败原因: {}", e.getMessage(), e); } // 通过异步的方式提交位移 cutomConsumer.commitAsync(((offsets, exception) -> { if (exception == null) { offsets.forEach((topicPartition, metadata) -> { System.out.println(topicPartition + " -> offset=" + metadata.offset()); }); } else { exception.printStackTrace(); // 如果出错了,同步提交位移 cutomConsumer.commitSync(offsets); } })); } } }); } } public void operate(ConsumerRecord record) { log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value()); } 参考:
Kafka消费者——API开发
Kafka Consumer如何实现精确一次消费数据
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
@KafkaListener 详解及消息消费启停控制
kafka多个消费者消费一个topic_kafka消费者组与重平衡机制,了解一下
kafka学习(五):消费者分区策略(再平衡机制)
Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章