上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

kafka使用详解、最佳实践和问题排查

guduadmin14小时前

kafka是一个常用的分布式消息中间件,与RabbitMQ对比,特点是可以无限横向扩容,并保持高可靠性、高吞吐量和低延迟,因此比RabbitMQ有更高的市场占有率(网上搜了一下,kafka大约41%,RabbitMQ大约29%)。

一、kafka常见概念

一般正常的开发,了解到前6个概念就好,其余的概念更多用于kafka运维配置,或问题排查。

1、producer生产者

指生产消息,并把消息投递到kafka的外部应用程序 ,它不是kafka的组成部分

2、consumer消费者

指连接到kafka,接收/订阅消息,并进行后续逻辑处理的外部应用程序,它也不是kafka的组成部分。

一个消费者可以同时消费kafka的多个队列(主题)

3、Consumer Group消费者组

连接到kafka的消费者,必须指定消费者组,多个消费者可以指定相同的消费者组,这样可以避免同一个消息被重复消费。

如果2个消费者绑定了同一个队列(主题),指定了不同的消费者组,则每条消息,都会同时投递给这2个消费者。

4、topic 主题

kafka里,收发消息的逻辑集合,每个主题,都可以认为是一个队列;

生产者和消费者,都是通过连接主题 来处理消息。

5、partition分区

kafka里存储消息的物理集合,一个主题可以划分为1个或多个分区,可以理解为子队列;

每个分区只属于一个主题,且只能被一个消费者消费(同一个组)。

该主题收到的所有消息,会根据消息的key选择对应的分区进行投递;

如果消息未指定key,且没有定义分区规则时,则kafka会随机平均投递到主题的多个分区里。

注意:每个分区里的消息,一定是按队列的规则,保证先进先出;但是不同的分区的消息,则无法保证。

因此,如果要确保消费者能按消息的投递顺序进行消费:

  • 每个主题只建一个分区(这个不推荐)
  • 同一批需要保证顺序的消息,指定相同的key,比如使用用户ID作为消息的key,相同key的消息会投递到同一个分区

    6、offset偏移量

    指每个分区里的消息的唯一编号,并且是从0开始递增的。主题+分区+偏移量,可以唯一定位一条消息。

    注:每个分区里的每条消息,offset一定是不同的;不同分区的offset是会重复的。

    消费者会也记录每次消费的offset值,来标识自己当前处理到哪一条消息了,以便断开重连时,继续消费,消费者的offset也是存储在kafka中。

    7、broker集群节点

    kafka集群里的某个节点,通常是一台服务器上的kafka实例。

    8、replica副本

    主题的每个分区,可以指定多个副本,每个副本都存储一份完全相同的消息数据。

    一般建议同一个分区的不同副本,要保存在不同的broker上,避免broker故障导致该分区数据丢失。

    9、leader/follower

    主题的每个分区,如果有多个副本,那么其中一个副本会作为leader对外提供读写服务,其余的作为follower只同步数据。

    如果leader出现故障时,会从follower中选举一个副本作为leader重新提供服务。

    10、ISR(in-sync replicas)

    分区的同步副本集合,每个分区都会维护一个ISR列表,内容是那些与leader保持同步的follower清单。

    如果某个follower跟不上同步的进度,或无法保持同步时,会从ISR列表中移除。

    只有在ISR列表里的follower,才有机会提升为leader.

    注:已经成为leader的副本,也在ISR中

    11、LEO日志末端偏移量

    LEO指Log End Offset,即当前分区中下一条待写入消息的偏移量,该条消息是未指向具体的消息。

    分区的每个副本,都有自己的LEO。

    12、HW高水位线

    HW指High Watermark Offset,即当前分区中已经被提交并复制到所有副本的最高消息偏移量(offset),

    leader接收到消息,但是还未同步完时,不会更新HW值。

    leader会比对自己和所有follower的LEO,用其中的较小值,来更新HW值。

    13、LAG滞后消息数

    一个消费者组,在消费 主题的每个分区时,每个分区都会计算一个LAG值,指该分区的消息总数与消费者已消费的消息数的差值。

    通常是 该分区的HW 减 消费者组的offset。

    实践中,运维人员应当对LAG进行监控,比如超出10000时进行告警和处理。

    理解了这些概念,网上找了一张kafka工作原理图:

    kafka使用详解、最佳实践和问题排查,在这里插入图片描述,第1张

    二、kafka与RabbitMQ对比

    相对于RabbitMQ,Kafka有如下特点:

    • kafka的消息消费完并不会立即删除,而是保存一定时间后才删除,默认是7天。而RabbitMQ是消费完就删除。

      kafka不删消息这点我很喜欢,尤其是排查问题需要数据恢复时。

    • kafka消费者只支持pull模式,不支持push模式,即消费者只能主动轮询kafka获取消息,默认是每500ms拉一次,每次最多拉500条数据,轮询的优点就是灵活,缺点就是没消息时空耗性能。

      RabbitMQ默认只支持push模式,主动推送消息给消费者,实时性更好。

    • kafka通过topic主题连接生产者和消费者,多个消费者连接到同一个topic,即可消费该主题的所有消息,如果有不需要的消息,也只能由消费者自行判断和抛弃处理。

      RabbitMQ则是通过Exchange接收消息,再通过指定的规则转发到具体的Queue,由消费者消费,可以参考我以前写的文章:https://youbl.blog.csdn.net/article/details/80401945

      RabbitMQ可以通过配置很多路由,避免消息投递给不必要的消费者,

      不过RabbitMQ也支持直接通过Queue接收和投递消息。

    • 性能上,RabbitMQ是单线程模型,大数据上会有瓶颈;而kafka可以几乎无限扩展。
    • 有序性,kafka对于主题的每个分区,因为有且只能有一个消费者,所以能保证消息的有序性,不同分区则无法保证;

      而RabbitMQ在多消费者时,会平均分配消息,无法保证有序,并且在消息消费失败重新投递时,也会破坏消息顺序。

      三、kafka最佳实践

      1、生产者配置

      • 生产者发送时,有个acks配置,说明如下:

        • 为0,生产者发消息后,不等borker响应就返回成功,性能最高,丢数据概率也最高;
        • 为1,生产者发消息后,leader节点返回成功,就算成功;但是leader未同步给其它副本前就挂了,也会丢数据;
        • 为all 或 -1,则必须等所有副本都同步成功,才返回成功,保证数据不丢失,但性能最低。

          生产环境,建议配置为-1,另外2个配置都有丢数据的可能性。

        • min.insync.replicas 最小副本数要求,默认值1,建议为2(当然要求每个topic副本数都在3以上)

          因为如果配置为1,当leader收到数据,还未同步就故障了,会丢失数据。

        • retries: 重试次数,设置为较大值,默认值为Integer.MAX_VALUE,确保发送成功。

          注:虽然重试次数默认很大,但是重试还受到另一个时间配置的影响:delivery.timeout.ms(默认2分钟),retries还没用完,这个超时时间就到了,也会中断发送。

          另外,如果设置了较大的retries,请使用异步发消息的方式,避免同步操作导致线程堵塞,影响用户体验,或其它业务问题。

        • 配置参考:

          spring:
            kafka:
              producer:
                bootstrap-servers: 10.1.1.1:9092
                key-serializer: org.apache.kafka.common.serialization.StringSerializer
                value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
                retries: 10000
              properties:
                delivery.timeout.ms: 2000 # 发送消息上报成功或失败的最大时间,默认120000,两分钟
                linger.ms: 0              # 生产者把数据组合到一个批处理进行请求的最大延迟时间,默认0
                # 参考 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
                request.timeout.ms: 1000  # 批处理就绪后到响应的等待时长,含网络+服务器复制时间
                batch.size: 1000
          

          2、消费者配置

          • 为了避免消息丢失,消费者需要开启手动ack,消息业务逻辑处理完成再提交偏移量
          • 参考后面的死循环问题,建议使用String反序列化器
          • 根据业务情况,配置合适的批量拉取数量max-poll-records,默认值500
          • 根据业务情况,配置合适的auto-offset-reset值,默认值latest
            • latest:消费者在消费主题的某个分区时,如果没有之前的消费记录(以前提交的偏移量),则只拉取最新消息,忽略历史消息。
            • earliest:与latest相反,没有之前的消费记录时,从最早的消息开始处理。
            • none:没有之前的消费记录时,抛出异常。
            • 配置参考:
              spring:
                kafka:
                  consumer:
                    bootstrap-servers: 10.1.1.1:9092
                    max-poll-records: 100
                    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                    auto-offset-reset: latest
                  listener:
                    type: batch
                    ack-mode: manual_immediate
              

              3、其它

              • 配置多个 bootstrap server url,避免单节点故障,导致连接失败
              • 如果是异步发消息,不要在kafkaTemplate的成功回调和失败回调方法里有太多业务逻辑,回调方法是单线程处理,里面的业务逻辑会占用delivery.timeout.ms的超时时间配置,可能导致后续消息发送超时。
              • 同理,消费者也是单线程处理,消费逻辑太重的话,可能导致session.timeout.ms超时,从而被认为消费者离线,导致问题

                四、kafka工具介绍

                1、图形化工具

                推荐 OffsetExplorer,下载地址:https://www.kafkatool.com/download.html

                kafka使用详解、最佳实践和问题排查,在这里插入图片描述,第2张

                2、命令行工具

                kafka安装包内置了很多脚本工具,可以方便的查询kafka的状态,这些工具只需要下载就可以使用,无需安装。

                • 下载地址: https://kafka.apache.org/downloads

                  下载后解压,在bin目录下有很多sh文件,这些是在linux上使用的;

                  如果在Windows下使用,要用 bin\windows\ 下的那些bat文件。

                  下面用windows的bat文件命令举例(linux改用对应的sh文件执行即可)

                • 使用说明,请参考官方文档:https://kafka.apache.org/documentation/

                  查询某个消费者组下有哪些消费者,以及这些消费者对主题的消费状态:

                  d:\kafka_2.13-3.4.0\bin\windows\kafka-consumer-groups.bat --describe --group=cb_consumers --bootstrap-server=10.0.0.1:9092

                  kafka使用详解、最佳实践和问题排查,在这里插入图片描述,第3张

                  字段说明:

                  • GROUP 消费者分组
                  • TOPIC 消费的主题
                  • PARTITION 消费的分区
                  • CURRENT-OFFSET 当前消费到的消息偏移量
                  • LOG-END-OFFSET 当前分区的最大消息偏移量
                  • LAG 滞后消息条数
                  • CONSUMER-ID 消费者的ID
                  • HOST 消费者所在的主机
                  • CLIENT-ID 客户端ID

                    注:LAG可以简单理解为 LOG-END-OFFSET 减 CURRENT-OFFSET,但是实际上LAG=HW 减 CURRENT-OFFSET

                    四、springboot项目使用

                    1、生产者Demo代码:

                    1.1、添加pom依赖:

                    
                        org.springframework.kafka
                        spring-kafka
                    
                    

                    1.2、添加application.yml配置:

                    spring:
                      kafka:
                        producer:
                          bootstrap-servers: 10.1.1.1:9092
                          key-serializer: org.apache.kafka.common.serialization.StringSerializer
                          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
                          retries: 2  # 失败重发次数
                    

                    1.3、Java发送代码:

                    private final KafkaTemplate kafkaTemplate; // 注入的Bean
                    // 同步发送消息
                    String topic = "beinetTest111";
                    Object result = kafkaTemplate.send(topic, "我是key", objData).get();
                    

                    2、消费者Demo代码:

                    2.1、添加pom依赖:

                    
                        org.springframework.kafka
                        spring-kafka
                    
                    

                    2.2、添加application.yml配置:

                    spring:
                      kafka:
                        consumer:
                          bootstrap-servers: 10.1.1.1:9092
                          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
                        listener:
                          type: batch
                          ack-mode: manual_immediate
                    

                    2.3、Java发送代码:

                    @KafkaListener(topics = "${kafka-topic.reports}")
                    public void consumerCreateTask(List> consumerRecordList, Acknowledgment ack) {
                        if (consumerRecordList == null || consumerRecordList.size() <= 0)
                            return;
                        long start = System.nanoTime();
                        ConsumerRecord lastRecord = consumerRecordList.get(0);
                        try {
                            // 转换dto,并进行业务逻辑处理
                            long elapsedTime = System.nanoTime() - start;
                            log.debug("Topic:{} 分区:{} 偏移:{} 条数:{} 耗时:{}ns",
                                    lastRecord.topic(),
                                    lastRecord.partition(),
                                    lastRecord.offset(),
                                    dtos.size(),
                                    elapsedTime);
                        } catch (Exception exp) {
                            long elapsedTime = System.nanoTime() - start;
                            log.error("Topic:{} 分区:{} 偏移:{} 耗时:{}ns 出错:",
                                    lastRecord.topic(),
                                    lastRecord.partition(),
                                    lastRecord.offset(),
                                    elapsedTime,
                                    exp);
                        } finally {
                            // 不论成败,都提交,避免出错导致死循环,避免丢消息的逻辑,可以在catch里备份
                            ack.acknowledge();
                        }
                    }
                    

                    五、kafka常见问题

                    1、有多个消费者,但是总会有一个消费者拿不到消息数据

                    对一个消费者组而言,一个主题有几个分区,就最多接受几个消费者;

                    比如主题有2个分区,那么每个分区只能分配给组里的一个消费者,最多只有2个消费者连接上来,如果组里有3个消费者,那么肯定会有一个消费者处于空闲状态,没活干。

                    如果主题有2个分区,但是组里只有一个消费者,那么2个分区的消息,都会投递给这一个消费者。

                    2、主题的分区分配策略是怎么样的?

                    当主题存在多个分区和多个消费者时,kafka的源码实现里有如下几种分区分配策略:

                    • Range策略(默认策略):

                      把当前消费者组消费的每个主题的所有分区,逐个分配给消费者,注意是每个主题单独处理,所以会出现不均衡的情况。

                      例如:a主题有3个分区a0/a1/a2,b主题3个分区b0/b1/b2,有2个消费者C0/C1,分配过程大致是:

                      第1步分配a主题: a0->C0, a1->C1, a2->C0

                      第2步分配b主题: b0->C0, b1->C1, b2->C0

                      这样可以看出,【消费者C0要维护4个分区的数据,而C1只要维护2个分区的数据】,出现了明显的不均衡问题。

                    • Round-Robin策略:

                      把所有的分区,排序后,轮询方式逐一分配给所有的消费者,

                      例如:a主题有3个分区a0/a1/a2,b主题3个分区b0/b1/b2,有2个消费者C0/C1,分配过程大致是:

                      第1步分配a主题: a0->C0, a1->C1, a2->C0

                      第2步分配b主题: b0->C1, b1->C0, b2->C1

                      注意:第2步不是从头开始,而是接着第1步,继续分配,所以排除了Range方案的不均衡问题,

                      最终的分配结果是【2个消费者,各自负责3个分区】。

                      但是,如果2个消费者消费的主题,只有部分交集,并不完全相同时,还是会出现不均衡的情况。

                      如果希望改用这种策略,目前暂时没有配置方法,要在代码里修改partition.assignment.strategy属性,参考代码:

                      @Configuration
                      @RequiredArgsConstructor
                      public class KafkaConfiguration {
                          private final KafkaProperties kafkaProperties;
                          private final ConcurrentKafkaListenerContainerFactory kafkaFactory;
                          @Bean("myKafkaFactory")
                          public KafkaListenerContainerFactory> batchFactory() {
                              Map props = kafkaProperties.buildConsumerProperties();
                              props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
                              kafkaFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
                              return kafkaFactory;
                          }
                      }
                      

                      然后在消费者代码上指定使用这个工厂Bean:

                       @KafkaListener(id = "beinetHandler1", groupId = "beinetGroup", topicPattern = "beinetTest.*",
                                  containerFactory = "myKafkaFactory")
                          public void msgHandler(List message, Acknowledgment ack) {
                      
                      • 在kafka的源码里还有2种实现,本文暂不深入介绍:

                        org.apache.kafka.clients.consumer.CooperativeStickyAssignor

                        org.apache.kafka.clients.consumer.StickyAssignor

                        3、消费者加入或退出时,消息还能正常消费吗?

                        结论:只要有存活的消费者存在,那么所有的消息都能正常消费。

                        当有新的消费者加入组,或组中有消费者下线/退出,都会触发消费者重新平衡的动作,就是重新为所有的消费者分配分区。

                        重平衡发生时,默认停止所有消费者工作,直到分配结束。

                        4、有兄弟说已经往kafka写入消息了,但是消费者那边没有数据入库

                        • 首先,确认kafka有消息,用上面的图形化工具offset explorer,去对应的topic主题查找数据,发现确实有数据
                        • 再在工具里,查看Consumers下的对应Group,发现Lag为0,说明消息已经被正常消费了
                        • 查看消费者的应用日志,没有消费日志产生
                        • 再继续排查消费者的应用日志,发现有如下日志:cb_consumers: partitions assigned: []

                          这表示,该消费者没有分配到分区,不在工作中。

                          初步判断,是不是有人在其它地方启动了消费者,把数据给消费掉了。

                        • offset explorer这个工具,不能展示消费者IP信息,只能使用上面的命令kafka-consumer-groups.bat查看消费者IP,

                          再找运维看看这个IP是谁的。

                        • 最后定位到是测试环境配置错误,把开发环境的数据消费掉了。

                          5、反序列化失败,导致消费者死循环问题

                          某天发布到测试环境后,发现程序启动后,一直抛如下异常,且持续几十分钟也不会中断:

                          <#6d8d6458> j.l.IllegalStateException: No type information in headers and no default type provided
                              at o.s.util.Assert.state(Assert.java:76)
                              at o.s.k.s.s.JsonDeserializer.deserialize(JsonDeserializer.java:535)
                              at o.a.k.c.c.i.Fetcher.parseRecord(Fetcher.java:1387)
                              at o.a.k.c.c.i.Fetcher.access$3400(Fetcher.java:133)
                              at o.a.k.c.c.i.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618)
                              at o.a.k.c.c.i.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454)
                              at o.a.k.c.c.i.Fetcher.fetchRecords(Fetcher.java:687)
                              at o.a.k.c.c.i.Fetcher.fetchedRecords(Fetcher.java:638)
                              at o.a.k.c.c.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
                              at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1233)
                              at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1206)
                              at j.i.r.GeneratedMethodAccessor109.invoke(Unknown Source)
                              at j.i.r.DelegatingMethodAccessorImpl.invoke(Unknown Source)
                              at j.l.reflect.Method.invoke(Unknown Source)
                              at o.s.a.s.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
                              at o.s.a.f.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
                              at c.s.proxy.$Proxy186.poll(Unknown Source)
                              at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1413)
                              at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1250)
                              at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1162)
                              at j.u.c.Executors$RunnableAdapter.call(Unknown Source)
                              at j.u.c.FutureTask.run(Unknown Source)
                              at java.lang.Thread.run(Unknown Source)
                          

                          放google搜索了一下,说是反序列化找不到类型信息导致的,并建议不要使用JSON反序列化。

                          查了一下配置变化记录,确实加了一个kafka的反序列化配置变更:

                          spring: 
                            kafka:
                              producer:
                                value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
                              consumer:
                                value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
                          

                          把spring.consumer.value-deserializer 改成:org.apache.kafka.common.serialization.StringDeserializer 就恢复了。

                          了解了一下,同事希望在消费者的方法参数上,直接使用对象,而不是使用String,所以做了这个修改。

                          而正好出错的这个消费者,消费的是其它项目生产的消息,里面并不包含类型信息。

                          而这个异常,是在spring的底层抛出的,业务代码上无法进行try 捕捉,代码同时又设置了手工提交ack,导致代码进入了死循环。

                          为了避免这种问题,还是建议使用StringDeserializer反序列化,自己在代码里反序列化比较好。

                          6、broker单个故障,导致消费者无法提交偏移量的问题

                          生产环境,为了性能和故障转移,部署了6个broker,某天有一个broker故障下线了,按理应该会自动切换,实际上,所有的消费者都开始抛异常:“error when storing group assignment during syncgroup”

                          直到人工恢复broker并上线,故障才恢复。

                          最终排查结果:

                          • 运维把kafka的一个内部topic: __consumer_offsets 副本数配置为2,
                          • 同时配置 min.insync.replicas=2 ,该配置的含义是ISR列表最小同步副本数不得少于2个
                          • 故障下线的broker,正好包含该topic: __consumer_offsets的一个副本,导致该主题的副本数只剩下一个,不符合 min.insync.replicas=2配置要求,从而停止工作
                          • topic: __consumer_offsets的作用,是接收并存储所有消费者组的消费偏移量,该主题不工作,就会导致消费者无法提交偏移量,从而导致所有消费不正常,会重复消费数据。

                            知道问题了,调整就是把 topic: __consumer_offsets的副本数调整为3(默认值就是3,运维改错了)

                            7、所有消费者都不消费任何消息

                            如果消费者先启动,然后才创建topic,会导致消费者消费不到数据,可以重启消费者试试

                            8、spring中的kafka,是否存在线程安全问题

                            生产者使用的KafkaTemplate是线程安全的,经测试,都是使用同一个线程进行消息发送。

                            同样,消费者也是线程安全的,每个消费者也是单线程处理所有接收到的消息。

                            9、kafka消息堵塞怎么处理

                            • 如果消息不重要,可以直接删除主题重建,该主题下的所有消息自然就没了,注意需要重建topic,然后重启消费者;
                            • 如果所有消息都需要消费,基于kafka的分区特性,每个分区只能一个消费者,因此无法简单通过增加消费者来解决问题
                              • 确认消费者有没有出现异常,需要注意的是,有些开发人员会吞掉异常,导致你认为消费者正常的,可以通过业务数据是否持续增长来判别,如果是消费者异常了,修复bug即可。
                              • 如果消费正常,再确认是否突发消息数据增长,简单判断就是主题的LAG是否在持续按正常速率降低,观察几分钟,如果持续降低,基于判断是突发消息,可以耐心等待。
                              • 如果消息不会正常下降,基本判断是消费速度慢,
                                • 先用工具确认该主题下每个分区的LAG,如果只是某个分区的LAG特别高,其它分区正常(未堵塞),那么应该是消息分配不均衡,这个分区的消息特别多,考虑调整生产者的消息key,确保所有分区的消息数量均衡;
                                • 如果对消息时序无特别要求,可以代码里通过线程池异步处理消息,注意要控制线程池数量,不要导致应用oom了;
                                • 考虑增加几个分区,再增加几个消费者,这样新生产的消息会重新分散到不同分区,降低旧消费者的压力;
                                • 堵塞的消息,考虑加个新消费者,消费到另外的临时主题,临时主题多加个几倍的分区和消费者,进行快速消费,注意不要对下游或数据库造成冲击,导致其它问题。

网友评论

搜索
最新文章
热门文章
热门标签