上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

自定义kafka客户端消费topic

guduadmin11天前

文章目录

  • 自定义kafka客户端消费topic
    • 结论
    • 1 背景
    • 2 spring集成2.1.8.RELEASE版本不支持autoStartup属性
    • 3 自定义kafka客户端消费topic
      • 3.1 yml配置
      • 3.2 KafkaConfig客户端配置
      • 3.3 手动启动消费客户端

        自定义kafka客户端消费topic

        结论

        使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。

        1 背景

        后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic

        2 spring集成2.1.8.RELEASE版本不支持autoStartup属性

        使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没有找到可以直接配置属性autoStartup = "false"来手动启动topic,可能是版本低的原因,如果有可以支持的版本,也可以打在评论区,我去验证一下。

        
            org.springframework.kafka
            spring-kafka
            2.1.8.RELEASE
        
        
        @KafkaListener(topics = "", autoStartup = "false") 
        public void receive(String message) {    
        	// 处理接收到的消息 
        }
        

        3 自定义kafka客户端消费topic

        3.1 yml配置

        spring:
          kafka:
              bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092
              consumer:
                group-id: data-dev
                enable-auto-commit: true
                auto-offset-reset: latest
                auto-commit-interval: 1000
              topic:
                costomTopic: costomData
        

        3.2 KafkaConfig客户端配置

        kafka其他配置项和原有的kafka客户端配置一样,只有额外增加了一个cutomConsumer让spring来管理,方便手动启动客户端来使用

        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.springframework.beans.factory.annotation.Value;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
        import org.springframework.kafka.config.KafkaListenerContainerFactory;
        import org.springframework.kafka.core.*;
        import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
        import java.util.HashMap;
        import java.util.Map;
        @Configuration
        public class KafkaConfig {
            @Value("${spring.kafka.bootstrap-servers}")
            private String bootstrapServers;
            @Value("${spring.kafka.consumer.group-id}")
            private String groupId;
            @Value("${spring.kafka.consumer.enable-auto-commit}")
            private boolean enableAutoCommit;
            @Value("${spring.kafka.consumer.auto-offset-reset}")
            private String autoOffsetReset;
            //    @Value("${spring.kafka.listener.concurrency}")
        //    private Integer concurrency;
            @Value("${spring.kafka.consumer.auto-commit-interval}")
            private Integer autoCommitInterval;
            @Bean
            public KafkaTemplate kafkaTemplate() {
                return new KafkaTemplate<>(producerFactory());
            }
            @Bean
            KafkaListenerContainerFactory> kafkaContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
                factory.setConsumerFactory(consumerFactory());
                // concurrency
                factory.setConcurrency(3);
                factory.getContainerProperties().setPollTimeout(3000);
                return factory;
            }
            private ProducerFactory producerFactory() {
                return new DefaultKafkaProducerFactory<>(producerConfigs());
            }
            public ConsumerFactory consumerFactory() {
                return new DefaultKafkaConsumerFactory<>(consumerConfigs());
            }
            private Map producerConfigs() {
                Map props = new HashMap<>();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ProducerConfig.RETRIES_CONFIG, 0);
                props.put(ProducerConfig.ACKS_CONFIG, "1");
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                return props;
            }
            private Map consumerConfigs() {
                Map props = new HashMap<>();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
                return props;
            }
            @Bean
            public KafkaConsumer cutomConsumer() {
                // 新建一个自定义启动消费者
                KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs());
                return consumer;
            }
        }
        

        3.3 手动启动消费客户端

        这里手动启动消费客户端只有在配置了costomTopic才开始启动,如果需要动态指定启停topic

        @Component
        public class CutomKafkaConsumer {
            // 使用cutomConsumer实例消费
            @Autowired
            private KafkaConsumer cutomConsumer;
            @Value("${spring.kafka.topic.costomTopic:}")
            public void setCostomTopic(String costomTopic) {
                // 手动启动消费类,防止下级模块默认不配置costomTopic导致启动报错
                if (StringUtils.isEmpty(costomTopic)) {
                    return;
                }
                // 使这个消费者订阅对应话题
                cutomConsumer.subscribe(Collections.singleton(costomTopic));
                // 单线程拉取消息
                ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
                consumerExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        while (true) {
                            ConsumerRecords records = cutomConsumer.poll(3000);
                            if (!records.iterator().hasNext()) {
                                continue;
                            }
                            try {
                                // 捕获异常,防止顶级消费循环被异常中断
                                records.forEach(record -> operate(record));
                            } catch (Exception e) {
                                log.error("消费数据失败,失败原因: {}", e.getMessage(), e);
                            }
                            // 通过异步的方式提交位移
                            cutomConsumer.commitAsync(((offsets, exception) -> {
                                if (exception == null) {
                                    offsets.forEach((topicPartition, metadata) -> {
                                        System.out.println(topicPartition + " -> offset=" + metadata.offset());
                                    });
                                } else {
                                    exception.printStackTrace();
                                    // 如果出错了,同步提交位移
                                    cutomConsumer.commitSync(offsets);
                                }
                            }));
                        }
                    }
                });
            }
        }    
        public void operate(ConsumerRecord record) {
            log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value());
        }
        

        参考:

        Kafka消费者——API开发

        Kafka Consumer如何实现精确一次消费数据

        Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

        @KafkaListener 详解及消息消费启停控制

        kafka多个消费者消费一个topic_kafka消费者组与重平衡机制,了解一下

        kafka学习(五):消费者分区策略(再平衡机制)

        Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析

网友评论

搜索
最新文章
热门文章
热门标签