上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Apache Kafka - 灵活控制Kafka消费

guduadmin111小时前

文章目录

  • 概述
  • 思路
  • Code
  • 扩展
    • KafkaListenerEndpointRegistry

      Apache Kafka - 灵活控制Kafka消费,在这里插入图片描述,第1张


      概述

      在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。

      在Spring Boot中,要实现动态的控制或关闭消费以及动态开启或关闭监听,可以使用Spring Kafka提供的一些功能。


      思路

      首先,需要配置Kafka消费者的相关属性。在Spring Boot中,可以通过在application.properties或application.yml文件中添加相应的配置来实现。

      以下是一个示例配置:

      spring.kafka.consumer.bootstrap-servers=
      spring.kafka.consumer.group-id=<消费者组ID>
      

      接下来,可以创建一个Kafka消费者,使用@KafkaListener注解来指定要监听的Kafka主题,并编写相应的消息处理方法。例如:

      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Component;
      @Component
      public class KafkaConsumer {
          @KafkaListener(topics = "")
          public void receive(String message) {
              // 处理接收到的消息
          }
      }
      

      现在,你可以使用以下两种方法来控制或关闭消费以及动态开启或关闭监听:

      方法1:使用@KafkaListener注解的autoStartup属性

      @KafkaListener注解具有一个名为autoStartup的属性,可以用于控制是否自动启动消费者。默认情况下,它的值为true,表示自动启动。如果将其设置为false,则消费者将不会自动启动。

      @KafkaListener(topics = "", autoStartup = "false")
      public void receive(String message) {
          // 处理接收到的消息
      }
      

      要在运行时动态启动消费者,你可以通过KafkaListenerEndpointRegistry bean来手动启动:

      @Autowired
      private KafkaListenerEndpointRegistry endpointRegistry;
      // 启动消费者
      endpointRegistry.getListenerContainer("").start();
      

      同样,你也可以使用stop()方法来停止消费者:

      // 停止消费者
      endpointRegistry.getListenerContainer("").stop();
      

      方法2:使用KafkaListenerEndpointRegistry bean的pause()和resume()方法

      KafkaListenerEndpointRegistry bean提供了pause()和resume()方法,用于暂停和恢复消费者的监听。

      @Autowired
      private KafkaListenerEndpointRegistry endpointRegistry;
      // 暂停消费者监听
      endpointRegistry.getListenerContainer("").pause();
      // 恢复消费者监听
      endpointRegistry.getListenerContainer("").resume();
      

      使用这些方法,可以在运行时动态地控制或关闭消费,以及动态地开启或关闭监听。


      Code

      import lombok.extern.slf4j.Slf4j;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
      import org.springframework.kafka.config.KafkaListenerContainerFactory;
      import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
      import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
      import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
      import org.springframework.kafka.listener.ContainerProperties;
      import java.util.HashMap;
      import java.util.Map;
      /**
       * @author artisan
       */
      @Slf4j
      @Configuration
      public class KafkaConfig {
          @Value("${spring.kafka.bootstrap-servers}")
          private String bootstrapServer;
          @Value("${spring.kafka.consumer.auto-offset-reset}")
          private String autoOffsetReset;
          @Value("${spring.kafka.consumer.enable-auto-commit}")
          private String enableAutoCommit;
          @Value("${spring.kafka.consumer.key-deserializer}")
          private String keyDeserializer;
          @Value("${spring.kafka.consumer.value-deserializer}")
          private String valueDeserializer;
          @Value("${spring.kafka.consumer.group-id}")
          private String group_id;
          @Value("${spring.kafka.consumer.max-poll-records}")
          private String maxPollRecords;
          @Value("${spring.kafka.consumer.max-poll-interval-ms}")
          private String maxPollIntervalMs;
          @Value("${spring.kafka.listener.concurrency}")
          private Integer concurrency;
          private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor";
          /**
           * 消费者配置信息
           */
          @Bean
          public Map consumerConfigs() {
              Map props = new HashMap<>(32);
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
              props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
              props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
              props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);
              props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);
              props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );
              return props;
          }
          /**
           * 消费者批量工厂
           */
          @Bean
          public KafkaListenerContainerFactory> batchFactory() {
              ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
              factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
              factory.setBatchListener(true);
              factory.setConcurrency(concurrency);
              return factory;
          }
      
          /**
           * 异常处理器
           *
           * @return
           */
          @Bean
          public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
              return (message, exception, consumer) -> {
      //            log.error("消息{} , 异常原因{}", message, exception.getMessage());
                  log.error("consumerAwareListenerErrorHandler called");
                  return null;
              };
          }
      }
      

      使用

         @KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*",
                  containerFactory = "batchFactory",
                  errorHandler = "consumerAwareListenerErrorHandler",
                  id = "attackConsumer")
          public void processMessage(List records, Acknowledgment ack)  {
              log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size());
              try {
                  List attackMessages = new ArrayList();
                  records.stream().forEach(record -> {
                      messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);
                  });
                  if (!attackMessages.isEmpty()) {
                      attackMessageESService.addDocuments(attackMessages, false);
                  }
              } finally {
                  ack.acknowledge();
              }
          }
      

      在这段代码中,@KafkaListener注解表示这是一个Kafka消费者,

      • topicPattern参数指定了该消费者要监听的主题的模式,即以 KafkaTopicConstant.ATTACK_MESSAGE开头的所有主题。
      • containerFactory参数指定了用于创建Kafka监听器容器的工厂类别名。
      • errorHandler参数指定了用于处理监听器抛出异常的错误处理器。id参数指定了该消费者的ID。

        在该消费者的方法中,当有消息到达时,records参数将包含一组消息记录,ack参数用于手动确认已经消费了这些消息。

        在方法中,首先记录了当前线程ID和拉取的数据总量。将消息记录逐一处理,并将处理结果存储在一个名为attackMessages的列表中。如果列表不为空,则将其添加到ES搜索引擎中。

        最后,手动确认已经消费了这些消息。


        【控制】

        import lombok.extern.slf4j.Slf4j;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RestController;
         
        @Slf4j
        @RestController
        public class KafkaConsumerController {
            @Autowired
            private KafkaListenerEndpointRegistry registry;
            /**
             * 开启监听
             */
            @GetMapping("/start")
            public void start() {
                // 判断监听容器是否启动,未启动则将其启动
                if (!registry.getListenerContainer("attackConsumer").isRunning()) {
                    log.info("start  ");
                    registry.getListenerContainer("attackConsumer").start();
                }
                // 将其恢复
                registry.getListenerContainer("attackConsumer").resume();
                log.info("resume over ");
            }
            /**
             * 关闭监听
             */
            @GetMapping("/pause")
            public void pause() {
                // 暂停监听
                registry.getListenerContainer("attackConsumer").pause();
                log.info("pause");
            }
        }
            
        

        扩展

        KafkaListenerEndpointRegistry

        KafkaListenerEndpointRegistry是 Spring Kafka 提供的一个组件,用于管理 Kafka 消费者监听器的注册和启动。它是一个接口,提供了管理 Kafka 监听器容器的方法,如注册和启动监听器容器,以及暂停和恢复监听器容器等。

        在 Spring Boot 应用程序中使用 @KafkaListener 注解时,Spring Kafka 会自动创建一个 KafkaListenerEndpointRegistry 实例,并使用它来管理所有的 Kafka 监听器容器。 它是 Spring Kafka 中的一个核心组件,用于实现 Kafka 消费者的监听和控制。

        Apache Kafka - 灵活控制Kafka消费,在这里插入图片描述,第2张

网友评论

搜索
最新文章
热门文章
热门标签