
[Spring Boot Kafka configuration] Configuring multiple Kafka clusters in Spring Boot, with username/password authentication


Notes

This example configures consumers only; no producer is wired up. For producer setup, see the commented-out sections in configuration class 1 (section 3.1).

1. Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. yml configuration

spring:
  kafka:
    one:
      # test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
      consumer:
        # Default consumer group id. Within a group, each message is read by only one consumer; group.id names the group.
        group-id: defaultName
        # Disable auto-commit; offsets are committed manually in the listener
        enable-auto-commit: false
        # Where to start when no committed offset exists:
        # 'earliest' re-reads the partition from the beginning, 'latest' starts from the end of the log.
        auto-offset-reset: latest
        # key/value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
    two:
      # test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      consumer:
        # Consumer group id for the second cluster (see the comments for cluster one)
        group-id: defaultName_two
        # Disable auto-commit; offsets are committed manually in the listener
        enable-auto-commit: false
        auto-offset-reset: latest
        # key/value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
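
A note on these keys: `one` and `two` are custom sub-keys, so Spring Boot's auto-configuration (which only binds the standard `spring.kafka.*` properties) ignores them; they must be read explicitly, which is what the `@Value` fields in the configuration classes below do. As an alternative to long lists of `@Value` annotations, the subtree can be bound into a typed bean. A minimal sketch, where the `KafkaOneProperties` class is hypothetical and not part of the original setup:

import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical alternative to per-field @Value injection: bind the whole
// "spring.kafka.one" subtree into one bean. Register it via
// @ConfigurationPropertiesScan (or @EnableConfigurationProperties) on the application class.
@ConfigurationProperties(prefix = "spring.kafka.one")
public class KafkaOneProperties {

    private String bootstrapServers; // spring.kafka.one.bootstrap-servers
    private String topic;            // spring.kafka.one.topic

    public String getBootstrapServers() { return bootstrapServers; }
    public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; }

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
}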

3. Create the configuration classes

3.1 Configuration class 1

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class K1kafkaConfiguration {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.one.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.one.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.one.properties.security.protocol}")
    private String securityProtocol;
    @Value("${spring.kafka.one.properties.sasl.mechanism}")
    private String saslMechanism;
    @Value("${spring.kafka.one.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    // Producer wiring, left commented out as in the original example:
    //@Value("${spring.kafka.one.producer.linger-ms}")
    //private Integer lingerMs;
    //@Value("${spring.kafka.one.producer.max-request-size}")
    //private Integer maxRequestSize;
    //@Value("${spring.kafka.one.producer.batch-size}")
    //private Integer batchSize;
    //@Value("${spring.kafka.one.producer.buffer-memory}")
    //private Integer bufferMemory;

    //@Bean
    //public KafkaTemplate<String, String> kafkaOneTemplate() {
    //    return new KafkaTemplate<>(producerFactory());
    //}

    @Bean
    @Primary // the default container factory when a @KafkaListener does not name one
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // number of concurrent consumer threads
        factory.setConcurrency(1);
        // enable batch listening
        //factory.setBatchListener(true);
        // discard filtered-out records
        //factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        // manual acknowledgment: commit each offset as soon as the listener acknowledges it
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        // record filter strategy
        //factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
        //    @Override
        //    public boolean filter(ConsumerRecord<String, String> consumerRecord) {
        //        String msg = consumerRecord.value();
        //        if (Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0) {
        //            return false;
        //        }
        //        // returning true discards the record
        //        return true;
        //    }
        //});
        return factory;
    }

    //private ProducerFactory<String, String> producerFactory() {
    //    return new DefaultKafkaProducerFactory<>(producerConfigs());
    //}

    @Bean // consumer factory for the first cluster
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    //private Map<String, Object> producerConfigs() {
    //    Map<String, Object> props = new HashMap<>();
    //    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    //    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    //    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    //    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    //    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    //    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    return props;
    //}

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // SASL/SCRAM authentication for the secured cluster
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
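
If you also need to publish to cluster one, the commented-out producer code above can be enabled. A minimal sketch of that wiring, meant to live inside K1kafkaConfiguration (not the article's code; note the producer must present the same SASL credentials as the consumer, and imports for ProducerConfig, StringSerializer, ProducerFactory, DefaultKafkaProducerFactory and KafkaTemplate are required):

// Sketch: minimal producer wiring for cluster one, mirroring the commented-out code above.
@Bean
public KafkaTemplate<String, String> kafkaOneTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

private ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // the producer has to authenticate against the secured cluster as well
    props.put("security.protocol", securityProtocol);
    props.put("sasl.mechanism", saslMechanism);
    props.put("sasl.jaas.config", saslJaasConfig);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}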

3.2 Configuration class 2

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class K2kafkaConfiguration {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.two.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.two.consumer.max-poll-records}")
    private String maxPollRecords;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // number of concurrent consumer threads
        factory.setConcurrency(1);
        // enable batch listening
        //factory.setBatchListener(true);
        // discard filtered-out records
        //factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        // manual acknowledgment: commit each offset as soon as the listener acknowledges it
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
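
Cluster two is wired without authentication above. If it is also SASL-secured, the same three properties used in configuration class 1 can be added to its consumerConfigs(). A sketch with inline placeholder values (in practice they would come from spring.kafka.two.properties.*, mirroring cluster one):

// Sketch: only needed if cluster two also requires SASL authentication.
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"username\" password=\"password\";");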

4. Set up the consumers

4.1 Consumer 1

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer {
    @KafkaListener(topics = "#{'${spring.kafka.one.topic}'}", groupId = "defaultName", containerFactory = "kafkaListenerContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer] received Kafka message: {}", record.value());
        System.out.println(record);
        System.out.println(record.value());
        // Manual offset commit: required with AckMode.MANUAL_IMMEDIATE,
        // otherwise offsets are never committed and messages are redelivered on restart.
        ack.acknowledge();
    }
}

4.2 Consumer 2

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer2 {
    @KafkaListener(topics = "#{'${spring.kafka.two.topic}'}", groupId = "defaultName_two", containerFactory = "kafkaTwoContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer2] received Kafka message: {}", record.value());
        System.out.println(record);
        System.out.println(record.value());
        // Manual offset commit: required with AckMode.MANUAL_IMMEDIATE,
        // otherwise offsets are never committed and messages are redelivered on restart.
        ack.acknowledge();
    }
}
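
To verify the whole setup end to end, you could send a test message on startup. A hypothetical smoke test (the class below is made up for illustration and assumes the kafkaOneTemplate producer bean from the sketch in section 3.1 has been enabled):

import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical smoke test: sends one message to cluster one's topic, so the
// listener in section 4.1 should log it shortly after startup.
@Component
public class KafkaSmokeTest implements CommandLineRunner {

    private final KafkaTemplate<String, String> kafkaOneTemplate;

    public KafkaSmokeTest(KafkaTemplate<String, String> kafkaOneTemplate) {
        this.kafkaOneTemplate = kafkaOneTemplate;
    }

    @Override
    public void run(String... args) {
        kafkaOneTemplate.send("default_topic", "hello from cluster one");
    }
}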
