上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【Spring连载】使用Spring访问 Apache Kafka(三)----接收消息

guduadmin13小时前

【Spring连载】使用Spring访问 Apache Kafka(三)----接收消息

  • 一、消息监听器Message Listeners
  • 二、消息监听器容器Message Listener Containers
    • 2.1 使用KafkaMessageListenerContainer
    • 2.2 使用ConcurrentMessageListenerContainer
    • 2.3 监听器容器自动启动Listener Container Auto Startup
    • 2.4 提交偏移量Committing Offsets
    • 2.5 手动提交偏移量Manually Committing Offsets
    • 三、@KafkaListener 注解
      • 3.1 Record监听器
      • 3.2 明确的分区分配Explicit Partition Assignment
      • 3.3 手动确认Manual Acknowledgment
      • 3.4 消费者记录元数据Consumer Record Metadata
      • 3.5 Batch监听器
      • 3.6 注解属性Annotation Properties
      • 3.7 获取 Consumer的group.id
      • 3.8 容器线程命名Container Thread Naming
      • 3.9 @KafkaListener做为元注解Meta Annotation
      • 3.10 @KafkaListener on a Class
      • 3.11 @KafkaListener属性修改 Attribute Modification
      • 3.12 @KafkaListener生命周期管理 Lifecycle Management
      • 3.13 @KafkaListener @Payload Validation
      • 3.14 再平衡监听器Rebalancing Listeners
      • 3.15 使用@SendTo转发监听器结果Forwarding Listener Results using @SendTo
      • 3.16 过滤消息Filtering Messages
      • 3.17 顺序启动Starting @KafkaListener s in Sequence
      • 3.18 使用KafkaTemplate接收Using KafkaTemplate to Receive

        一、消息监听器Message Listeners

        当你使用消息监听器容器,你必须提供一个监听器接收数据。现在有8个受支持的接口做为消息监听器。见以下代码:

        public interface MessageListener { 
            void onMessage(ConsumerRecord data);
        }
        public interface AcknowledgingMessageListener { 
            void onMessage(ConsumerRecord data, Acknowledgment acknowledgment);
        }
        public interface ConsumerAwareMessageListener extends MessageListener { 
            void onMessage(ConsumerRecord data, Consumer consumer);
        }
        public interface AcknowledgingConsumerAwareMessageListener extends MessageListener { 
            void onMessage(ConsumerRecord data, Acknowledgment acknowledgment, Consumer consumer);
        }
        public interface BatchMessageListener { 
            void onMessage(List> data);
        }
        public interface BatchAcknowledgingMessageListener { 
            void onMessage(List> data, Acknowledgment acknowledgment);
        }
        public interface BatchConsumerAwareMessageListener extends BatchMessageListener { 
            void onMessage(List> data, Consumer consumer);
        }
        public interface BatchAcknowledgingConsumerAwareMessageListener extends BatchMessageListener { 
            void onMessage(List> data, Acknowledgment acknowledgment, Consumer consumer);
        }
        

        MessageListener:,当使用自动提交或者容器管理的提交方法之一时,使用MessageListener接口处理从kafka consumer的poll方法收到的单个ConsumerRecord实例。

        AcknowledgingMessageListener:当使用手动提交方法之一时,使用AcknowledgingMessageListener接口处理Kafka consumer的poll()方法收到的单个ConsumerRecord实例。

        ConsumerAwareMessageListener:当使用自动提交或者容器管理的提交方法之一时,使用ConsumerAwareMessageListener接口处理从 Kafka consumer的poll方法收到的单个ConsumerRecord 实例。接口提供了对Consumer对象的访问。

        AcknowledgingConsumerAwareMessageListener:当使用手动提交方法之一时,使用AcknowledgingConsumerAwareMessageListener处理从Kafka consumer的poll方法收到的单个ConsumerRecord实例。接口提供了对Consumer对象的访问。

        BatchMessageListener:当使用自动提交或者容器管理的提交方法之一,使用BatchMessageListener处理从Kafka consumer的poll()方法收到的所有ConsumerRecord实例。因为已为监听器提供了完整的batch,当使用这个接口时AckMode.RECORD不被支持。

        BatchAcknowledgingMessageListener:当使用手动提交方法之一时,使用BatchAcknowledgingMessageListener处理Kafka consumer的poll()方法收到的所有ConsumerRecord实例。

        BatchConsumerAwareMessageListener:当使用自动提交或者容器管理的提交方法之一,使用BatchConsumerAwareMessageListener接口处理从 Kafka consumer的poll()方法收到的所有ConsumerRecord实例。因为已为监听器提供了完整的batch,当使用这个接口时AckMode.RECORD不被支持。接口提供了对Consumer对象的访问。

        BatchAcknowledgingConsumerAwareMessageListener:当使用手动提交方法之一时,使用BatchAcknowledgingConsumerAwareMessageListener接口处理从Kafka consumer的poll()方法收到的所有ConsumerRecord实例。接口提供了对Consumer对象的访问。

        注意,Consumer对象线程不安全,你只能在调用监听器的线程唤起它的方法。

        你不能在监听器中执行任何影响consumer的位置或者已提交的偏移量的Consumer方法;容器需要管理这些信息。

        二、消息监听器容器Message Listener Containers

        框架提供了2个MessageListenerContainer实现:

        • KafkaMessageListenerContainer
        • ConcurrentMessageListenerContainer

          KafkaMessageListenerContainer 以单线程方式接收所有topic和分区的所有消息。ConcurrentMessageListenerContainer委派给一个或多个KafkaMessageListenerContainer实例提供多线程消费。

          你可以为监听器容器增加一个RecordInterceptor;它会在调用监听器前被唤起,并允许检查和修改记录。如果拦截器返回null,监听器不会被调用。它还有额外的方法会在监听器退出时被调用(正常触发或者抛异常触发)。现在还有一个BatchInterceptor,提供类似的功能给Batch监听器。另外,ConsumerAwareRecordInterceptor 和BatchInterceptor提供了对Consumer的访问。这可能有用,比如在拦截器中访问consumer metrics。

          你不可以在拦截器中执行任何影响consumer位置和已提交偏移量的方法,容器需要管理这些信息。

          如果拦截器转换一条记录(创建一条新的),topic、partition、offset都必须保持原样,以避免类似于数据丢失等的意想不到的情况。

          CompositeRecordInterceptor和CompositeBatchInterceptor 可以被用来唤起多个拦截器。

          默认情况下,当使用事务的时候,拦截器在事务开始之前被唤起。你可以设置监听器容器的interceptBeforeTx 属性为false,以便在事务开始之后唤起拦截器。这个配置将应用在任何事务管理器上,不只是KafkaAwareTransactionManager。比如,它允许拦截器参与容器启动的JDBC事务。

          当concurrency 大于1的时候,ConcurrentMessageListenerContainer现在支持kafka的Static Membership特性。group.instance.id的值带有-n的后缀,n从1开始。这个属性连同一个增加的session.timeout.ms(45秒)可以用来减少rebalance 事件,比如在应用重启的时候。

          2.1 使用KafkaMessageListenerContainer

          KafkaMessageListenerContainer的构造方法如下:

          public KafkaMessageListenerContainer(ConsumerFactory consumerFactory,
                              ContainerProperties containerProperties)
          

          这个构造方法接收一个ConsumerFactory和关于topic和partition的信息,还有其他配置,都包含在ContainerProperties 对象。ContainerProperties 有如下构造方法:

          public ContainerProperties(TopicPartitionOffset... topicPartitions)
          public ContainerProperties(String... topics)
          public ContainerProperties(Pattern topicPattern)
          

          第一个构造方法接收一个TopicPartitionOffset数组,明确指示容器使用哪一个分区(使用consumer 的assign()方法),并带有一个可选的初始偏移量。默认情况下,正值是绝对偏移;负值是相对于一个分区中的当前最后偏移量。框架提供了TopicPartitionOffset 的一个带有额外的布尔值参数的构造方法。如果参数为true,初始偏移量(正值或负值)是相对于consumer的当前位置。偏移量在容器启动时应用。

          第二个构造方法接收一个topic数组,Kafka根据group.id 分配分区–在整个消费者组分发分区。

          第三个构造方法接收一个正则表达式,用来选择分区。

          你可以在创建容器的时候使用ContainerProps.setMessageListener来分配一个MessageListener给容器。详见以下代码:

          ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
          containerProps.setMessageListener(new MessageListener() {
              ...
          });
          DefaultKafkaConsumerFactory cf =
                                  new DefaultKafkaConsumerFactory<>(consumerProps());
          KafkaMessageListenerContainer container =
                                  new KafkaMessageListenerContainer<>(cf, containerProps);
          return container;
          

          注意当创建DefaultKafkaConsumerFactory时,使用接收以上代码中的properties的构造方法意味着key和value的Deserializer是从配置中发现的。或者,key和value的Deserializer实例可以被传给DefaultKafkaConsumerFactory的构造方法,这种情况下,所有Consumers共享同样的实例。还有一个选择是提供Supplier,它会被用来为每个Consumer获取单独的Deserializer实例:

          DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
          KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps);
          return container;
          

          ContainerProperties中有一个logContainerConfig属性,当设置为true时INFO级别的日志被开启,每个监听器容器会写一条日志汇总容器的配置属性。

          默认情况下,topic偏移量提交的日志是在debug日志级别执行的。ContainerProperties中的commitLogLevel属性可以指定这些消息的日志级别。例如,将日志级别改为INFO:

          containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.
          

          ContainerProperties中还有一个missingTopicsFatal属性,默认值是false。如果broker上没有配置的topic,它会阻止容器的启动。如果容器被配置为监听topic的正则表达式,这个配置不会起效。

          ContainerProperties的父类ConsumerProperties中的authExceptionRetryInterval,在从KafkaConsumer获取到任何AuthenticationException或者AuthorizationException后,会引起容器重试获取消息。例如,这可以发生在配置的用户被拒绝访问特定topic或者credentials不正确。定义authExceptionRetryInterval 允许容器在被授予合适的权限时恢复。

          默认情况下,没有interval被配置–authentication和authorization错误被认为是严重的,会导致容器停止。

          当创建consumer工厂时,如果你把deserializers做为对象提供(在构造方法或者setter),工厂将调用configure()方法来使用配置属性配置他们。

          2.2 使用ConcurrentMessageListenerContainer

          ConcurrentMessageListenerContainer唯一的构造方法类似于KafkaMessageListenerContainer的构造方法:

          public ConcurrentMessageListenerContainer(ConsumerFactory consumerFactory,ContainerProperties containerProperties)
          

          这个容器还有一个concurrency属性,例如,container.setConcurrency(3)创建3个KafkaMessageListenerContainer实例。对于这个构造方法,Kafka使用它的组管理能力向所有consumer分发分区。

          当监听多个topic时,默认的分区分布可能不是你期望的。例如,如果你有3个topic,每topic5个分区,你希望使用concurrency=15,但是结果你只能看到5个活跃的consumer,每个consumer分配了每个topic的一个分区,留下10个空闲的consumer。这是因为默认的kafka 分区分配器是RangeAssignor。在这个场景下,你可能需要考虑使用RoundRobinAssignor,它会向所有consumer分发分区。然后,每个consumer被分配一个分区。要修改分区分配器,你可以设置提供给DefaultKafkaConsumerFactory的consumer属性partition.assignment.strategy(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。如果使用spring boot,也可以这样配置:

          spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
          

          当容器属性配置了TopicPartitionOffset,ConcurrentMessageListenerContainer 向所有委派的KafkaMessageListenerContainer 实例分发TopicPartitionOffset 实例。比如,有6个TopicPartitionOffset实例, concurrency 是3,则每个容器获得2个分区。如果有5个TopicPartitionOffset 实例,2个容器分别获得2个分区,第三个容器获得1个分区。如果concurrency 大于TopicPartitions的数量,concurrency 会被降低使得每个容器获得一个分区。

          CommonClientConfigs的client.id属性在ConcurrentMessageListenerContainer中被修改,它会被追加一个-n,n是对应concurrency的index。当JMX启用时,这是给MBeans 提供唯一的名字所需要的。

          MessageListenerContainer 提供了对底层KafkaConsumer的metrics 的访问。在ConcurrentMessageListenerContainer中,metrics()方法返回所有目标KafkaMessageListenerContainer 实例的metrics。根据为底层KafkaConsumer提供的client-id,这些metrics被分组到Map

          ContainerProperties 的idleBetweenPolls 选项让监听器容器中的主循环在KafkaConsumer.poll()调用间休眠。实际休眠间隔选的是idleBetweenPolls和max.poll.interval.ms(consumer配置)与记录批处理时间差值的较小值。

          2.3 监听器容器自动启动Listener Container Auto Startup

          监听器容器实现了SmartLifecycle,默认情况下autoStartup为true。容器是在后期启动的(Integer.MAX-VALUE-100)。其他实现了SmartLifecycle的组件应该在更早的阶段启动,以处理来自监听器的数据。-100为后面的阶段留出了空间,使组件能够在容器之后自动启动。

          2.4 提交偏移量Committing Offsets

          有几个提交偏移量的选择。如果consumer的enable.auto.commit属性为true,kafka根据这个配置自动提交偏移。如果这个属性为false,容器支持几种AckMode设置。默认的AckMode是BATCH。KafkaMessageListenerContainer中,enable.auto.commit默认被设置为了false。

          consumer的poll()方法返回一条或多条ConsumerRecords。MessageListener 为每条记录被调用。以下列表描述了容器在每个AckMode 的行为(没有使用事务):

          • RECORD: 在监听器处理完记录返回时提交偏移量。
          • BATCH:在poll()返回的所有记录被处理之后提交偏移量。
          • TIME:在poll()返回的所有记录被处理后,只要自上次提交后的时间超过了ackTime,就提交偏移量。
          • COUNT:在poll()返回的所有记录被处理后,只要自上次提交后收到了ackCount条记录就提交偏移量。
          • COUNT_TIME:类似于TIME和COUNT,当任一条件为true时就提交。
          • MANUAL:消息监听器负责acknowledge()操作,除了这个,与BATCH 相同的语义被应用。
          • MANUAL_IMMEDIATE:在acknowledge()被监听器调用时立即提交偏移量。

            在使用事务的时候,偏移量被发往事务,语义等同于RECORD或者BATCH,具体取决于监听器类型(record或者batch)。

            MANUAL和 MANUAL_IMMEDIATE要求监听器类型为AcknowledgingMessageListener 或者BatchAcknowledgingMessageListener。

            依赖于容器的syncCommits属性,consumer的commitSync() 或者commitAsync()方法被调用。syncCommits默认是true。setCommitCallback可以获取异步提交的结果,默认的callback 是LoggingCommitCallback。它会记录error(会在debug级别记录success)。

            因为监听器容器有自己的机制来提交偏移量,容器需要ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG设置为false。现在这个属性无条件地被设置为false,除非在consumer工厂或者容器的consumer属性覆盖。

            Acknowledgment接口有如下方法:

            public interface Acknowledgment {
                void acknowledge();
                default void nack(Duration sleep) {
            		throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
            	}
            	default void nack(int index, Duration sleep) {
            		throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
            	}
            }
            

            acknowledge方法让监听器控制何时偏移量被提交。

            nack(long sleep) 配合record监听器使用;nack(int index, long sleep)配合batch监听器使用。为监听器类型调用错误的方法会抛出IllegalStateException。

            如果你想提交批次的一部分,使用nack()。当使用事务时,设置AckMode 为MANUAL,调用nack() 将发送成功处理的记录偏移量到事务。

            nack()只能在唤起监听器的consumer线程被调用。

            当使用Out of Order Commits,nack()是不允许的。

            使用record监听器,在调用nack()时,任何待定的偏移量被提交,从上次poll留下的记录被丢弃,并且查找会在分区上被执行,以便失败的记录和未处理的记录在下一次poll的时候被重新传送。通过设置sleep参数,consumer在重新传送之前可以被暂停。这与当容器配置了DefaultErrorHandler时抛异常的功能类似。

            当使用batch监听器,你可以指定发生故障的batch中的索引。当调用nack()时,在该索引之前的记录的偏移量将被提交 ,然后对失败的和丢弃的记录的查找会在分区上被执行 ,以便在下一次poll()中重新传送这些记录。

            consumer在睡眠期间暂停,因此我们继续轮询broker以保持consumer活跃。实际的睡眠时间取决于容器的pollTimeout,默认值为5秒。最短睡眠时间等于pollTimeout,所有睡眠时间都是它的倍数。对于较小的睡眠时间,或者为了提高其准确性,可以考虑减少容器的pollTimeout。

            batch监听器可以调用acknowledge(index)方法来提交batch的一部分偏移量。当调用这个方法,在index位置的记录以及在index之前的所有记录的偏移量将被提交。在执行部分batch提交后调用acknowledge()将提交batch其余部分的偏移量。这有以下限制:

            • AckMode.MANUAL_IMMEDIATE是需要的
            • 该方法必须在监听器线程被调用
            • 监听器必须消费一个List而不是原始的ConsumerRecords
            • index参数必须在list的元素范围内
            • index参数必须大于上一次调用使用的值

              这些限制是强制执行的,该方法将根据违反情况抛出IllegalArgumentException或IllegalStateException。

              2.5 手动提交偏移量Manually Committing Offsets

              通常,当使用“AckMode.MANUAL”或“AckMode.MANUAL_IMMEDIATE”时,acknowledgments必须按顺序执行,因为Kafka不维护每个记录的状态,只为每个组/分区保留一个已提交的偏移量。你现在可以设置容器属性asyncAcks,它允许acknowledgments 以任何顺序确认轮询返回的记录。监听器容器将推迟无序提交,直到收到缺少的acknowledgments 为止。consumer将被暂停(没有新记录被传送),直到提交了上一次轮询的所有偏移量。

              虽然此功能允许应用程序异步处理记录,但应该理解,它增加了失败后重复(duplicate )传送的可能性。

              三、@KafkaListener 注解

              @KafkaListener注解用于指定一个bean方法为监听器容器中的监听器。这个bean被封装在一个MessagingMessageListenerAdapter中,该适配器配置了各种功能,如转换器,以便在必要时转换数据以匹配方法参数。

              你可以使用SpEL#{…​}或者属性占位符${…​} 在注解上配置大部分的属性。

              3.1 Record监听器

              @KafkaListener注解提供了简单POJO 监听器的机制。见如下例子:

              public class Listener {
                  @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
                  public void listen(String data) {
                      ...
                  }
              }
              

              这个机制需要在你的一个@Configuration类上有@EnableKafka注释和一个监听器容器工厂,该工厂用于配置底层的ConcurrentMessageListenerContainer。默认情况下,需要一个名为kafkaListenerContainerFactory的bean。以下示例显示了如何使用ConcurrentMessage ListenerContainer:

              @Configuration
              @EnableKafka
              public class KafkaConfig {
                  @Bean
                  KafkaListenerContainerFactory>
                                      kafkaListenerContainerFactory() {
                      ConcurrentKafkaListenerContainerFactory factory =
                                              new ConcurrentKafkaListenerContainerFactory<>();
                      factory.setConsumerFactory(consumerFactory());
                      factory.setConcurrency(3);
                      factory.getContainerProperties().setPollTimeout(3000);
                      return factory;
                  }
                  @Bean
                  public ConsumerFactory consumerFactory() {
                      return new DefaultKafkaConsumerFactory<>(consumerConfigs());
                  }
                  @Bean
                  public Map consumerConfigs() {
                      Map props = new HashMap<>();
                      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
                      ...
                      return props;
                  }
              }
              

              请注意,要设置容器属性,必须在工厂上调用getContainerProperties()方法。它被用作注入到容器中的实际属性的模板。

              现在可以为注解创建的consumer设置client.id属性。clientIdPrefix的后缀是-n,其中n是一个整数,表示使用并发时的容器编号。

              现在还可以通过使用注解的属性来覆盖容器工厂的concurrency 和autoStartup属性。属性可以是简单的值、属性占位符或SpEL表达式。以下示例显示了如何执行此操作:

              @KafkaListener(id = "myListener", topics = "myTopic",
                      autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
              public void listen(String data) {
                  ...
              }
              

              3.2 明确的分区分配Explicit Partition Assignment

              你可以使用明确的主题和分区(以及可选的初始偏移量)配置POJO监听器。以下示例展示了如何执行此操作:

              @KafkaListener(id = "thing2", topicPartitions =
                      { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
                        @TopicPartition(topic = "topic2", partitions = "0",
                           partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
                      })
              public void listen(ConsumerRecord record) {
                  ...
              }
              

              你可以在partitions或partitionOffsets属性中指定每个分区,但不能同时指定两者。

              与大多数注解属性一样,你可以使用SpEL表达式。

              你可以将初始偏移initialOffset应用于所有已分配的分区:

              @KafkaListener(id = "thing3", topicPartitions =
                      { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
                           partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
                      })
              public void listen(ConsumerRecord record) {
                  ...
              }
              

              *通配符表示partitions属性中的所有分区。每个@TopicPartition中只能有一个带有通配符的@PartitionOffset。

              此外,当监听器实现ConsumerSeekAware时,即使在使用手动分配的情况下,onPartitionsAssigned也会被调用。例如,这允许在那个时候进行任意的查找操作。

              你可以指定一个逗号分隔的partitions列表或partition范围:

              @KafkaListener(id = "pp", autoStartup = "false",
                      topicPartitions = @TopicPartition(topic = "topic1",
                              partitions = "0-5, 7, 10-15"))
              public void process(String in) {
                  ...
              }
              

              以上范围包括了起始位置;该示例将分配分区0、1、2、3、4、5、7、10、11、12、13、14、15。

              相同的技术也可以用来指定初始偏移initial offsets:

              @KafkaListener(id = "thing3", topicPartitions =
                      { @TopicPartition(topic = "topic1",
                           partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
                      })
              public void listen(ConsumerRecord record) {
                  ...
              }
              

              initial offset将会被应用在所有6个分区上。

              3.3 手动确认Manual Acknowledgment

              当使用手动AckMode时,你还可以向监听器提供Acknowledgement。以下示例还展示了如何使用不同的容器工厂container factory:

              @KafkaListener(id = "cat", topics = "myTopic",
                        containerFactory = "kafkaManualAckListenerContainerFactory")
              public void listen(String data, Acknowledgment ack) {
                  ...
                  ack.acknowledge();
              }
              

              3.4 消费者记录元数据Consumer Record Metadata

              有关record的元数据可从消息头headers中获得。您可以使用以下header名称来获取message的headers:

              • KafkaHeaders.OFFSET
              • KafkaHeaders.RECEIVED_KEY
              • KafkaHeaders.RECEIVED_TOPIC
              • KafkaHeaders.RECEIVED_PARTITION
              • KafkaHeaders.RECEIVED_TIMESTAMP
              • KafkaHeaders.TIMESTAMP_TYPE

                如果传入的record 具有空key,则RECEIVED_KEY 不存在。这是为了使框架与spring-messaging的空值headers不传递约定保持一致。

                以下示例显示了如何使用headers:

                @KafkaListener(id = "qux", topicPattern = "myTopic1")
                public void listen(@Payload String foo,
                        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
                        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
                        ) {
                    ...
                }
                

                必须在监听器方法的具体实现上指定参数注解(@Payload、@Header);如果在接口上定义了它们,就不会检测到它们。

                另外,你可以在ConsumerRecordMetadata参数中接收record元数据,而不是使用headers:

                @KafkaListener(...)
                public void listen(String str, ConsumerRecordMetadata meta) {
                    ...
                }
                

                ConsumerRecordMetadata包含了除了key和value的所有从ConsumerRecord来的数据。

                3.5 Batch监听器

                你可以配置@KafkaListener方法来接收从consumer轮询中接收的整个batch的consumer记录。

                batch监听器不支持Non-Blocking Retries。

                要配置监听器容器工厂来创建batch监听器,可以设置batchListener属性。以下示例展示了如何执行此操作:

                @Bean
                public KafkaListenerContainerFactory batchFactory() {
                    ConcurrentKafkaListenerContainerFactory factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
                    factory.setConsumerFactory(consumerFactory());
                    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
                    return factory;
                }
                

                你可以使用@KafkaListener注解上的batch属性覆盖工厂的batchListener属性。这与对容器Error Handlers的更改一起,允许将同一工厂用于record和batch监听器。

                容器工厂为recordMessageConverter和batchMessageConverter属性提供了单独的setter。

                以下示例展示了如何接收payloads列表:

                @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
                public void listen(List list) {
                    ...
                }
                

                topic、partition、offset等在与payloads并行的headers中可用。以下示例展示了如何使用标头:

                @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
                public void listen(List list,
                        @Header(KafkaHeaders.RECEIVED_KEY) List keys,
                        @Header(KafkaHeaders.RECEIVED_PARTITION) List partitions,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) List topics,
                        @Header(KafkaHeaders.OFFSET) List offsets) {
                    ...
                }
                

                另一种方案,您可以接收Message<?>对象列表,每个消息中具有offset和其他详细信息,但它必须是方法上定义的唯一参数(除了当使用手动提交时,可选的Acknowledgement,和Consumer<?,?>参数)。以下示例展示了如何执行此操作:

                @KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
                public void listen14(List> list) {
                    ...
                }
                @KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
                public void listen15(List> list, Acknowledgment ack) {
                    ...
                }
                @KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
                public void listen16(List> list, Acknowledgment ack, Consumer consumer) {
                    ...
                }
                

                在这种情况下,不会对payloads执行转换。

                如果BatchMessagingMessageConverter被配置了一个RecordMessageConverter,你也可以将泛型类型添加到Message参数中,payloads会被转换。

                您还可以收到ConsumerRecord<?,?>的列表对象,但它必须是方法上定义的唯一参数(除了当使用手动提交时,可选的Acknowledgement,和Consumer<?,?>参数)。以下示例展示了如何执行此操作:

                KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
                public void listen(List> list) {
                    ...
                }
                @KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
                public void listen(List> list, Acknowledgment ack) {
                    ...
                }
                

                监听器可以接收完整的被poll()方法返回的ConsumerRecords<?,?>对象,允许监听器访问其他方法,如partitions()(返回列表中的TopicPartition实例)和records(TopicPartiction)(获取选择性记录)。再重复一遍,这必须是方法上的唯一参数(除了当使用手动提交时,可选的Acknowledgement,和Consumer<?,?>参数)。以下示例展示了如何执行此操作:

                @KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
                public void pollResults(ConsumerRecords records) {
                    ...
                }
                

                如果容器工厂配置了RecordFilterStrategy,ConsumerRecords<?,?>监听器会忽略它,并发出WARN级别的日志消息。如果>形式的监听器被使用,Records只能被batch监听器过滤。默认情况下,每次过滤一条记录;但是,你可以覆盖filterBatch以在一次调用中过滤整个batch。

                3.6 注解属性Annotation Properties

                id属性如果存在,会被用作Kafka consumer group.id属性,并覆盖consumer工厂中配置的属性。你也可以显式设置groupId或将idIsGroup设置为false,以恢复以前使用consumer工厂group.id的行为。

                你可以在大多数注解属性中使用属性占位符或SpEL表达式,如下例所示:

                @KafkaListener(topics = "${some.property}")
                @KafkaListener(topics = "#{someBean.someProperty}",
                    groupId = "#{someBean.someProperty}.group")
                

                SpEL表达式支持一个特殊的标记:__listener。它是一个伪bean名称,表示存在此注解的当前bean实例。

                考虑以下示例:

                @Bean
                public Listener listener1() {
                    return new Listener("topic1");
                }
                @Bean
                public Listener listener2() {
                    return new Listener("topic2");
                }
                

                给定前面示例中的bean,我们可以使用以下内容:

                public class Listener {
                    private final String topic;
                    public Listener(String topic) {
                        this.topic = topic;
                    }
                    @KafkaListener(topics = "#{__listener.topic}",
                        groupId = "#{__listener.topic}.group")
                    public void listen(...) {
                        ...
                    }
                    public String getTopic() {
                        return this.topic;
                    }
                }
                

                如果在不太可能的情况下,你有一个名为__listener的实际bean,那么你可以使用beanRef属性来更改表达式标记。以下示例展示了如何执行此操作:

                @KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
                    groupId = "#{__x.topic}.group")
                

                你可以直接在注解上指定Kafka consumer属性,这些属性将覆盖consumer工厂中配置的任何同名属性。注意,你不能以这种方式指定group.id和client.id属性;它们将被忽略;而要使用groupId和clientIdPrefix属性。

                属性被指定为单独的字符串,具有正常的Java属性文件格式:foo:bar、foo=bar或foo-bar。

                @KafkaListener(topics = "myTopic", groupId = "group", properties = {
                    "max.poll.interval.ms:60000",
                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
                })
                

                以下是一个设置deserializer的例子:

                @KafkaListener(id = "one", topics = "one")
                public void listen1(String in) {
                    System.out.println("1: " + in);
                }
                @KafkaListener(id = "two", topics = "two",
                        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
                public void listen2(byte[] in) {
                    System.out.println("2: " + new String(in));
                }
                

                3.7 获取 Consumer的group.id

                当在多个容器中运行相同的监听器代码时,能够确定record来自哪个容器(由其consumer属性group.id 标识)可能会很有用。

                你可以在监听器线程上调用KafkaUtils.getConsumerGroupId()来完成此操作。或者,您可以访问方法参数中的group id。

                @KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
                public void listener(@Payload String foo,
                        @Header(KafkaHeaders.GROUP_ID) String groupId) {
                ...
                }
                

                这在record监听器和接收records List<?>的batch监听器中可用。它在接收ConsumerRecords<?,?>的参数的batch监听器中不可用,在这种情况下,请使用KafkaUtils机制。

                3.8 容器线程命名Container Thread Naming

                一个TaskExecutor用于唤起consumer 和监听器。你可以通过设置容器的ContainerProperties的consumerExecutor属性来提供自定义执行器。当使用pooled executors时,要确保有足够的线程可用于处理使用它们的所有容器的并发性。当使用ConcurrentMessageListenerContainer时,来自执行器的线程将用于每个consumer(并发)。

                如果不提供consumer 执行器,则会为每个容器使用SimpleAsyncTaskExecutor。这个执行器创建的线程名称类似于-C-。对于ConcurrentMessageListenerContainer,线程名称的部分变为-m,其中m表示consumer实例。每次启动容器时,n递增。因此,使用容器的bean名称,在容器第一次启动后,该容器中的线程将被命名为container-0-C-1、container-1-C-1等;在容器停止再启动后,container-0-C-2、container-1-C-2等。

                无论使用哪个executor你都可以更改线程的名称。将AbstractMessageListenerContainer.changeConsumerThreadName 属性设置为true,将调用AbstractMessage ListenerContAINer.threadNameSupplier 来获取线程名称。这是一个Function,默认实现返回container.getListenerId()。

                3.9 @KafkaListener做为元注解Meta Annotation

                你现在可以使用@KafkaListener作为元注解。以下示例展示了如何执行此操作:

                @Target(ElementType.METHOD)
                @Retention(RetentionPolicy.RUNTIME)
                @KafkaListener
                public @interface MyThreeConsumersListener {
                    @AliasFor(annotation = KafkaListener.class, attribute = "id")
                    String id();
                    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
                    String[] topics();
                    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
                    String concurrency() default "3";
                }
                

                你必须使用topics、topicPattern或topicPartitions 中的至少一个(然后,也包括id或groupId,除非你在consumer工厂配置中指定了group.id)。以下示例展示了如何执行此操作:

                @MyThreeConsumersListener(id = "my.group", topics = "my.topic")
                public void listen1(String in) {
                    ...
                }
                

                3.10 @KafkaListener on a Class

                在类级别使用@KafkaListener时,必须在方法级别指定@KafkaHandler。传递消息时,转换后的消息payload 类型用于确定调用哪个方法。以下示例展示了如何执行此操作:

                @KafkaListener(id = "multi", topics = "myTopic")
                static class MultiListenerBean {
                    @KafkaHandler
                    public void listen(String foo) {
                        ...
                    }
                    @KafkaHandler
                    public void listen(Integer bar) {
                        ...
                    }
                    @KafkaHandler(isDefault = true)
                    public void listenDefault(Object object) {
                        ...
                    }
                }
                

                你可以指定一个@KafkaHandler方法作为默认方法,如果其他方法不匹配,则会调用该方法。最多可以指定一个方法。当使用@KafkaHandler方法时,payload 必须已经转换为domain 对象(因此可以执行匹配)。使用自定义deserializer、JsonDeserializer或者TypePrecedence 设置为TYPE_ID的JsonMessageConverter。

                由于Spring解析方法参数的方式存在一些限制,默认的@KafkaHandler无法接收discrete headers;它必须使用ConsumerRecordMetadata 。

                例如:

                @KafkaHandler(isDefault = true)
                public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
                    ...
                }
                

                如果object是一个String,则这将不起作用;topic参数仍将获得对object的引用。

                如果您需要默认方法中有关record的元数据,请使用以下方法:

                @KafkaHandler(isDefault = true)
                void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
                    String topic = meta.topic();
                    ...
                }
                

                3.11 @KafkaListener属性修改 Attribute Modification

                现在可以在容器被创建之前以编程方式修改注解属性。为此,将一个或多个“KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer”添加到应用程序上下文中。AnnotationEnhancer是一个BiFunction<Map<String,Object>、AnnotatedElement、Map<String、Object>,并且必须返回一个属性的map。属性值可以包含SpEL或属性占位符;在执行任何解析之前调用enhancer。如果存在多个enhancer,并且它们实现了Ordered,则将按顺序调用它们。

                AnnotationEnhancer bean定义必须声明为静态的,因为它们在应用程序上下文的生命周期的早期是必需的。

                示例如下:

                @Bean
                public static AnnotationEnhancer groupIdEnhancer() {
                    return (attrs, element) -> {
                        attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
                                ? ((Class) element).getSimpleName()
                                : ((Method) element).getDeclaringClass().getSimpleName()
                                        +  "." + ((Method) element).getName()));
                        return attrs;
                    };
                }
                

                3.12 @KafkaListener生命周期管理 Lifecycle Management

                为@KafkaListener注解创建的监听器容器不是应用程序上下文中的bean。相反,它们使用KafkaListenerEndpointRegistry类型的infrastructure bean进行注册。这个bean由框架自动声明,并管理容器的生命周期;它将自动启动任何autoStartup设置为true的容器。所有容器工厂创建的所有容器都必须处于同一阶段。你可以使用registry以编程方式管理生命周期。启动或停止registry将启动或停止所有已注册的容器。或者,你可以通过使用单个容器的id属性来获取对该容器的引用。你可以在注解上设置autoStartup,它将覆盖容器工厂中配置的默认设置。你可以从应用程序上下文中获得对这个bean的引用,例如auto-wiring,以管理其注册的容器。以下示例展示了如何执行此操作:

                @KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
                public void listen(...) { ... }
                @Autowired
                private KafkaListenerEndpointRegistry registry;
                ...
                    this.registry.getListenerContainer("myContainer").start();
                ...
                

                registry只维护其管理的容器的生命周期;声明为bean的容器不由registry管理,可以从应用程序上下文中获取。可以通过调用registry的getListenerContainers()方法来获取托管容器的集合。另一个一个方便的方法getAllListenerContainers(),它返回所有容器的集合,包括由registry管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的prototype bean,但不会初始化任何lazy bean声明。

                刷新应用程序上下文后注册的Endpoints将立即启动,无论其autoStartup属性如何,以符合SmartLifecycle约定,注意autoStartup仅在应用程序上下文初始化期间被考虑。延迟注册的一个例子是在prototype scope中使用@KafkaListener的bean,在初始化上下文后创建实例。你可以将registry的alwaysStartAfterRefresh属性设置为false,然后容器的autoStartup属性将定义是否启动容器。

                3.13 @KafkaListener @Payload Validation

                现在可以添加一个Validator来验证@KafkaListener@Payload参数。你可以将validator添加到registrar本身。以下代码展示了如何执行此操作:

                @Configuration
                @EnableKafka
                public class Config implements KafkaListenerConfigurer {
                    ...
                    @Override
                    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                      registrar.setValidator(new MyValidator());
                    }
                }
                

                当你将Spring Boot与validation starter一起使用时,会自动配置LocalValidatorFactoryBean,如下例所示:

                @Configuration
                @EnableKafka
                public class Config implements KafkaListenerConfigurer {
                    @Autowired
                    private LocalValidatorFactoryBean validator;
                    ...
                    @Override
                    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                      registrar.setValidator(this.validator);
                    }
                }
                

                以下示例展示了如何验证:

                public static class ValidatedClass {
                  @Max(10)
                  private int bar;
                  public int getBar() {
                    return this.bar;
                  }
                  public void setBar(int bar) {
                    this.bar = bar;
                  }
                }
                @KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
                      containerFactory = "kafkaJsonListenerContainerFactory")
                public void validatedListener(@Payload @Valid ValidatedClass val) {
                    ...
                }
                @Bean
                public KafkaListenerErrorHandler validationErrorHandler() {
                    return (m, e) -> {
                        ...
                    };
                }
                

                validation也可以在类级别的监听器中对@KafkaHandler方法的payloads进行验证。

                3.14 再平衡监听器Rebalancing Listeners

                ContainerProperties有一个名为consumerRebalanceListener的属性,它了使用了Kafka客户端的ConsumerRebalanceListener接口的实现。如果未提供此属性,则容器配置一个日志监听器,该监听器在INFO级别记录再平衡事件。框架还添加了一个子接口ConsumerAwareRebalanceListener。以下列表显示了ConsumerAwareRebalanceListener接口定义:

                public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
                    void onPartitionsRevokedBeforeCommit(Consumer consumer, Collection partitions);
                    void onPartitionsRevokedAfterCommit(Consumer consumer, Collection partitions);
                    void onPartitionsAssigned(Consumer consumer, Collection partitions);
                    void onPartitionsLost(Consumer consumer, Collection partitions);
                }
                

                注意,在撤销分区时有两个回调。第一个立即调用。第二个是在任何挂起的偏移被提交后调用的。如果你希望在某些外部存储中维护偏移量,这将非常有用,如下例所示:

                containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
                    @Override
                    public void onPartitionsRevokedBeforeCommit(Consumer consumer, Collection partitions) {
                        // acknowledge any pending Acknowledgments (if using manual acks)
                    }
                    @Override
                    public void onPartitionsRevokedAfterCommit(Consumer consumer, Collection partitions) {
                        // ...
                            store(consumer.position(partition));
                        // ...
                    }
                    @Override
                    public void onPartitionsAssigned(Collection partitions) {
                        // ...
                            consumer.seek(partition, offsetTracker.getOffset() + 1);
                        // ...
                    }
                });
                

                另一个方法onPartitionsLost()(类似于ConsumerRebalanceListener中同名的方法)。ConsumerRebalanceListener上的默认实现仅调用了onPartionsRevoked。ConsumerAwareRebalanceListener上的默认实现不执行任何操作。当为监听器容器提供自定义监听器(任意类型)时,重要的是你的实现不要从onPartitionsLost调用onPartitionsRevoked。如果你实现ConsumerReblanceListener,则应覆盖默认方法。这是因为在调用你的实现方法后,监听器容器将从其onPartitionsLost的实现中调用自己的onPartitionsRevoked。如果你的实现委托给默认行为,那么每当Consumer在容器的监听器上调用该方法时,onPartitionsRevoked将被调用两次。

                3.15 使用@SendTo转发监听器结果Forwarding Listener Results using @SendTo

                如果你还用@SendTo注解对@KafkaListener进行了注解,并且方法调用返回了一个结果,则该结果将转发到@SendTo指定的主题。

                @SendTo值可以有几种形式:

                • @SendTo(“someTopic”)路由到字面定义的主题
                • @SendTo(“#{someExpression}”)路由到由应用程序在上下文初始化期间对表达式求值一次而确定的主题。
                • @SendTo(“!{someExpression}”)路由到通过在运行时计算表达式而确定的主题。用于求值的#root对象有三个属性:
                  • request: 传入的ConsumerRecord(或batch监听器的ConsumerRecords对象)
                  • source:从请求转换而来的org.springframework.messageing.Message<?>。
                  • result:该方法返回结果。
                  • @SendTo(无属性):这被视为!{source.headers[‘kafka_replyTopic’]}。

                    属性占位符在@SendTo值中解析。

                    表达式求值的结果必须是表示主题名称的字符串。以下示例展示了使用@SendTo的各种方法:

                    @KafkaListener(topics = "annotated21")
                    @SendTo("!{request.value()}") // runtime SpEL
                    public String replyingListener(String in) {
                        ...
                    }
                    @KafkaListener(topics = "${some.property:annotated22}")
                    @SendTo("#{myBean.replyTopic}") // config time SpEL
                    public Collection replyingBatchListener(List in) {
                        ...
                    }
                    @KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
                    @SendTo("annotated23reply") // static reply topic definition
                    public String replyingListenerWithErrorHandler(String in) {
                        ...
                    }
                    ...
                    @KafkaListener(topics = "annotated25")
                    @SendTo("annotated25reply1")
                    public class MultiListenerSendTo {
                        @KafkaHandler
                        public String foo(String in) {
                            ...
                        }
                        @KafkaHandler
                        @SendTo("!{'annotated25reply2'}")
                        public String bar(@Payload(required = false) KafkaNull nul,
                                @Header(KafkaHeaders.RECEIVED_KEY) int key) {
                            ...
                        }
                    }
                    

                    为了支持@SendTo,监听器容器工厂必须提供一个KafkaTemplate(在其replyTemplate属性中),用于发送回复。这应该是一个KafkaTemplate,而不是producer端上用于request/reply处理的ReplyingKafkaTemplate。当使用Spring Boot时,Boot会自动将template配置到工厂中;配置你自己的工厂时,必须按照以下示例所示进行设置。

                    你可以向监听器容器工厂添加ReplyHeadersConfigurer。将参考此信息以确定要在reply消息中设置哪些标头。以下示例展示了如何添加ReplyHeadersConfigurer:

                    @Bean
                    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
                        ConcurrentKafkaListenerContainerFactory factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
                        factory.setConsumerFactory(cf());
                        factory.setReplyTemplate(template());
                        factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
                        return factory;
                    }
                    

                    如果你愿意,也可以添加更多的headers。以下示例展示了如何执行此操作:

                    @Bean
                    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
                        ConcurrentKafkaListenerContainerFactory factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
                        factory.setConsumerFactory(cf());
                        factory.setReplyTemplate(template());
                        factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
                          @Override
                          public boolean shouldCopy(String headerName, Object headerValue) {
                            return false;
                          }
                          @Override
                          public Map additionalHeaders() {
                            return Collections.singletonMap("qux", "fiz");
                          }
                        });
                        return factory;
                    }
                    

                    当您使用@SendTo时,您必须在ConcurrentKafkaListenerContainerFactory的replyTemplate属性中配置一个KafkaTemplate来执行发送。Spring Boot将自动织入其自动配置的template(如果存在单个实例,则为任意template)。

                    除非你使用request/reply语义,否则简单的send(topic, value)方法会被使用,因此你可能希望创建一个子类来生成分区或键。以下示例显示了如何执行此操作:

                    @Bean
                    public KafkaTemplate myReplyingTemplate() {
                        return new KafkaTemplate(producerFactory()) {
                            @Override
                            public CompletableFuture> send(String topic, String data) {
                                return super.send(topic, partitionForData(data), keyForData(data), data);
                            }
                            ...
                        };
                    }
                    

                    如果监听器方法返回Message或Collection

                    @KafkaListener(id = "messageReturned", topics = "someTopic")
                    public Message listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
                            @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
                        return MessageBuilder.withPayload(in.toUpperCase())
                                .setHeader(KafkaHeaders.TOPIC, replyTo)
                                .setHeader(KafkaHeaders.KEY, 42)
                                .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
                                .setHeader("someOtherHeader", "someValue")
                                .build();
                    }
                    

                    当使用request/reply语义时,发送方可以请求目标分区。

                    即使没有返回结果,您也可以使用@SendTo对@KafkaListener方法进行注解。这是为了允许配置errorHandler,该errorHandler可以将有关失败消息传递的信息转发到某个主题。以下示例展示了如何执行此操作:

                    @KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
                            errorHandler = "voidSendToErrorHandler")
                    @SendTo("failures")
                    public void voidListenerWithReplyingErrorHandler(String in) {
                        throw new RuntimeException("fail");
                    }
                    @Bean
                    public KafkaListenerErrorHandler voidSendToErrorHandler() {
                        return (m, e) -> {
                            return ... // some information about the failure and input data
                        };
                    }
                    

                    如果监听器方法返回一个Iterable,则默认情况下,发送value时每个元素都会有一条记录。将@KafkaListener上的splitIterables属性设置为false,整个结果将作为单个ProducerRecord的value发送。这需要在reply template的生产者配置中有一个合适的serializer。但是,如果reply是Iterable>该属性将被忽略,并且每条消息将分别发送。

                    3.16 过滤消息Filtering Messages

                    在某些情况下,例如rebalance,可能会重新传递已处理的消息。框架无法知道这样的消息是否已被处理。这是一个应用程序级别的函数。这被称为Idempotent(幂等性)接收者模式,Spring Integration提供了它的实现。

                    Spring for Apache Kafka项目还通过FilteringMessageListenerAdapter类提供了一些帮助,该类可以包装您的MessageListener。此类采用RecordFilterStrategy的实现,在该实现中,你可以实现filter方法来发出消息是重复的并且应该丢弃的信号。FilteringMessageListenerAdapter有一个名为ackDiscarded的附加属性,它指示适配器是否应该acknowledge丢弃的记录。默认情况下为false。

                    当您使用@KafkaListener时,在容器工厂上设置RecordFilterStrategy(以及可选的ackDiscarded),以便将监听器封装在适当的过滤适配器中。

                    此外,框架还提供了FilteringBatchMessageListenerAdapter,用于当使用batch消息监听器时。

                    如果你的@KafkaListener收到ConsumerRecords而不是List>,FilteringBatchMessageListenerAdapter将被忽略,因为ConsumerRecords是不可变的。

                    你可以通过使用监听器注解上的filter属性来覆盖监听器容器工厂的默认RecordFilterStrategy。

                    @KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
                    public void listen(Thing thing) {
                        ...
                    }
                    

                    3.17 顺序启动Starting @KafkaListener s in Sequence

                    一个常见的用例是在一个监听器消费完一个主题中的所有记录后启动另一个监听器。例如,在处理其他主题的记录之前,你可能希望将一个或多个compacted topics的内容加载到内存中。一个新的组件ContainerGroupSequencer使用@KafkaListener containerGroup属性将容器分组在一起,并在当前组中的所有容器都空闲时启动下一组中的容器。

                    最好用一个例子来说明:

                    @KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
                    public void listen1(String in) {
                    }
                    @KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
                    public void listen2(String in) {
                    }
                    @KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
                    public void listen3(String in) {
                    }
                    @KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
                    public void listen4(String in) {
                    }
                    @Bean
                    ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
                        return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
                    }
                    

                    在这里,我们有4个监听器,分为两组,g1和g2。

                    在应用程序上下文初始化期间,sequencer将所提供的组中所有容器的autoStartup属性设置为false。它还将任何容器(尚未设置)的idleEventInterval设置为提供的值(在这种情况下为5000ms)。然后,当应用程序上下文启动sequencer时,第一组中的容器就会启动。当接收到ListenerContainerIdleEvent时,每个容器中的每个单独的子容器都将停止。当ConcurrentMessageListenerContainer中的所有子容器都停止时,父容器也会停止。当一个组中的所有容器都已停止时,下一组中的容器将启动。组的数量或组中容器的数量没有限制。

                    默认情况下,最后一组(上述g2)中的容器在空闲时不会停止。要修改该行为,在sequencer上将stopLastGroupWhenIdle设置为true。

                    以前,每个组中的容器都被添加到Collection类型的bean中,bean名称为containerGroup。这些集合现在被弃用,取而代之的是ContainerGroup类型的bean,其bean名称是group name,后缀为.group;在上面的例子中,将有两个bean g1.group和g2.group。Collection bean将在未来的版本中删除。

                    3.18 使用KafkaTemplate接收Using KafkaTemplate to Receive

                    本节介绍如何使用KafkaTemplate接收消息。该template有四个receive()方法:

                    ConsumerRecord receive(String topic, int partition, long offset);
                    ConsumerRecord receive(String topic, int partition, long offset, Duration pollTimeout);
                    ConsumerRecords receive(Collection requested);
                    ConsumerRecords receive(Collection requested, Duration pollTimeout);
                    

                    正如你所看到的,你需要知道待获取的记录的分区和偏移量;为每个操作创建(并关闭)一个新的Consumer。

                    使用最后两种方法,将分别获取每条记录,并将结果组合到ConsumerRecords对象中。为请求创建TopicPartitionOffset时,仅支持正的绝对offset。

网友评论

搜索
最新文章
热门文章
热门标签