消费者的处理逻辑相对于生产者要复杂的多,本文将带你了解消费者的相关观念,以及消费消息的处理流程。
一、Kafka消费者基础概念
消费者定义:Kafka消费者是订阅并从Kafka主题(Topic)中消费消息的应用程序或服务组件。它们负责从生产者发送的消息队列中拉取消息,并对这些消息进行处理。
消费者组(Consumer Group):消费者通过加入消费者组来协同工作,同一组内的消费者共享主题的所有分区,但每个分区仅由一个消费者实例消费,实现负载均衡与故障转移。
Offset管理:消费者跟踪自己在每个分区中的消费进度,即Offset值。通过维护和提交Offset,消费者可以在重启后继续从上次停止的地方开始消费。
消费者的消费一般有两种模式;推模式和拉模式,推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求拉取消息。Kafka中的消息是基于拉模式的。为什么要基于拉模式呢?采用拉模式比较简单,kafka broker不用记录每个消费者的状态,不用感知consumer。
二、消费者工作流程
- 初始化连接:消费者启动时会连接到Kafka集群,根据配置获取元数据信息,包括主题列表及其分区详情。
- 订阅主题:消费者明确指定要订阅的主题列表,然后根据所属消费者组动态分配到不同的分区。
- 消息拉取与处理:消费者通过调用poll()方法从分配到的分区拉取消息,每次拉取都会获得一批消息,然后按照业务逻辑处理这些消息。
- 位移提交提交:消费者可以选择自动或手动提交偏移量,确保在成功处理完消息后更新 Offset,以防止消息重复消费或丢失。
- 错误处理与重试:在消息处理过程中遇到异常时,消费者可以配置重试策略,保证消息的最终一致性。
下面来详细了解下kafka的位移提交
1.位移提交
对于Kafka分区而言,它的每条消息都有位移 offset,用来表示消息在分区中的位置(偏移量)。对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置(位移)。
kafka有消息回溯功能,可以通过重新设置消费者的 offset 来指定消费位置。
在每次调用poll()方法时,它返回的是还没有被消费过的消息集,要做到这一点就需要记录上一次消费时的消费位移,并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。
在考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一个分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。
消费位移是持久化到哪里了呢?在旧消费者客户端中,消费位移是存储在Zookeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部主题__consumer_offset中。这里把将消费位移存储起来(持久化)的动作称为提交,消费者在消费完消息后需要执行消费位移的提交。
为什么要放弃zookeeper而使用__consumer_offset呢?
老版本Consumer的位移管理是依托与Zookeeper的,他会自动或手动的将位移提交到Zookeeper中保存。当Consumer重启后,它能自动从Zookeeper中读取位移数据,从而在上次消费截止的地方继续消费,这种设计使得Kafka不需要保存位移数据,减少了broker端需要持久的状态空间,有利于实现高伸缩性。
但是Zookeeper并不适用于这种高频的写操作,因此,从0.8.2.x版本开始,推出了全新的位移管理机制。就是将Consumer的位移操作,提交到__consumer_offsets中。可以这么说__consumer_offsets的主要作用是保存Kafka消费者位移信息。它要求这个提交过程不仅实现高持久性,还要支持高频写操作。显然,Kafka的主题设计天然就满足这个条件。
位移主题就是普通的Kafka主题,你可以创建、修改它,只不过是Kafka的内部主题。这个主题的消息可以简单的理解为 KV 对。key 和 value 分别表示消息的键和消息体。key首先需要知道是哪个消费者,所以key中包含的信息包括,Kafka的第一个consumer程序启动时Kafka会自动创建位移主题,那么也会有分区等概念,分区是怎么设置的呢?broker端参数offsets.topic.num.partitions设置,默认值为50,副本通过offsets.topic.replication.factor设置,默认值为3,当然你可以自己设置这些值,但是建议使用默认值。
2.自动提交与手动提交
在Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit配置,默认值为true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期由客户端参数auto.commit.interval.ms配置,默认5S,此参数生效的前提是enable.auto.commit参数为true。
自动提交虽然简单,但可能会带来重复消费和消息丢失的问题。假设刚提交完一次消费,然后拉取一批新的消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便造成了重复消费的现象。
自动提交是延时提交,重复消费可以理解,那么消息丢失是怎么发生的呢?
如上图,拉取线程A不断地拉取消息并存入本地缓存,比如BlockingQueue中,另一个处理线程B从缓存中读取消息并进行相应的逻辑处理。假设目前进行到了y+1次拉取,以及第m次位移提交,也就是X+4之前的位移已经确认提交了,处理线程B却还在处理X+2的消息,此时如果消费线程B发生了异常,待其恢复之后会从m处拉取消息,之间未处理的消息便丢失了。
自动提交可能还会造成磁盘被写满:只要consumer一直启动着,他就会无限期地向位移主题写入消息,一个极端的例子,consumer当前消费到某个主题的最新一条消息,位移是100,之后该主题没有任何消息产生,故consumer无消息可消费了,所以位移永远保持在100。由于是自动提交,位移主题中会不断的写入位移=100的消息。显然Kafka只要保留这类消息中的最新一条即可,之前的消息都应该被删除。这就要求Kafka必须针对位移主题消息特点的消息删除策略,否则这种消息越来越多,最终撑爆整个磁盘。
Kafka专门提供了后台线程定期巡检待Compact的主题,满足条件的即可删除,这个后台线程叫Log Cleaner,这个线程出问题会导致磁盘空间问题。
kafka提供了手动提交方式,这样消费者就有充足的时间处理消息。手动提交位移分为同步提交和异步提交。同步提交(commitSync)只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。异步提交(commitAsync)在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还没返回之前就开始了新的一次拉取。
3.指定位移消费
在kafka中当消费者查不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数默认值为lastest,表示从分区末尾开始消费消息。如果auto.offset.reset为earliest,那么消费者会从起始处,也就是0开始消费。
有时候我们并不知道特定的消费位移,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek()方法追溯到相应的位置。KafkaConsumer同样考虑到了这种情况,它提供了一个offsetForTimes()方法,通过时间戳来查询与此对应的位置。
三、再均衡
再均衡是指分区的所属权从一个消费者转移另一个消费者的行为,它为消费组具备高可用性和伸缩性提供了保障,使我们可以即方便又安全的删除消费组内的消费者或往消费组内添加消费者。
不过在再均衡发生期间,消费组内的消费者无法读取消息。也就是说在再均衡发生期间的这一小段时间内,消费组变得不可用。
另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没来的及提交位移就发生了再均衡操作,之后这个分区又被分配给另一个消费者,原来被消费完的那部分消息又被重新消费了一遍,也就是发生了重复消费。
那么再均衡会在什么情况下发生呢?
- 有新的消费者加入消费组。
- 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间GC、网络延迟导致消费者长时间未发送心跳等情况时,broker会认为消费者已经下线。比如采用了手动提交,但是处理消息耗时很长,在规定时间内没有进行位移提交,那就会认为该consumer宕机了,要进行再均衡。
- 有消费者退出消费组,被协调器错误的踢出group。
- 消费组所对应的GroupCoordinator(组协调器)发生了节点变更。
- 消费组内所订阅的任一主题的分区数发生变化。
哪些场景会使协调器认为某个消费者实例已挂而退出消费组呢?
每个消费者实例都会定期地向Coordinator(协调器)发送心跳请求,表明它还活着。如果某个consumer实例不能及时地发送这些心跳请求,Coordinator会认为该Consumer已死,从而从group中移除。然后开启新一轮的Rebalance。
Consumer端有个参数,sesson.timeout.ms就是用来表示这个的,默认值为10s。如果Coordinator在10s之内没有收到group下某个consumer实例的心跳,就会认为这个consumer已经挂了。sesson.timeout.ms决定了consumer存活性的时间间隔。
除了以上两个参数,consumer端还有一个参数,用来控制consumer实际消费能力对rebalance的影响,即max.poll.interval.ms。它设定了consumer端应用程序两次调用pool方法的最大时间间隔。默认值为5分钟,表示你的consumer程序如果在5分钟之内无法消费poll返回的消息,那么consumer会主动发起离开组的请求,coordinator会开启新一轮Rebalance,还可能会造成重复消费。
第一类非必要的Rebalance是因为未能及时发送心跳,导致consumer被踢出group而引发的。第二类非必要的Rebalance是consumer消费时间过程导致的。
关于消费者的相关内容就介绍到这里。
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章