一、简介
1.1、消息队列简介
1.1.1、什么是消息队列
消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:
// 1. 创建一个保存字符串的队列 QueuestringQueue = new LinkedList (); // 2. 往消息队列中放入消息 stringQueue.offer("hello"); // 3. 从消息队列中取出消息并打印 System.out.println(stringQueue.poll());
上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。
我们可以简单理解消息队列就是将需要传输的数据存放在队列中。
1.1.2、消息队列中间件
- 消息队列——用于存放消息的组件
- 程序员可以将消息放入到队列中,也可以从消息队列中获取消息
- 很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)
- 消息队列中间件:消息队列的组件,例如:Kafka、Active MQ、RabbitMQ、RocketMQ、ZeroMQ
1.1.2.1、为什么叫Kafka呢
Kafka的架构师jay kreps非常喜欢franz kafka(弗兰兹·卡夫卡),并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。
1.1.3、消息队列的应用场景
1.1.3.1、异步处理
电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。
- 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
- 比较常见的:发送短信验证码、发送邮件
1.1.3.2、系统解耦
- 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
- 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦
1.1.3.3、流量削峰
- 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发
1.1.3.4、日志处理(大数据领域常见)
大型电商网站(淘宝、京东、国美、苏宁…)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。
- 可以使用消息队列作为临时存储,或者一种通信管道
1.1.4、生产者、消费者模型
我们之前学习过Java的服务器开发,Java服务器端开发的交互模型是这样的:
我们之前也学习过使用Java JDBC来访问操作MySQL数据库,它的交互模型是这样的:
它也是一种请求响应模型,只不过它不再是基于http协议,而是基于MySQL数据库的通信协议。而如果我们基于消息队列来编程,此时的交互模式成为:生产者、消费者模型。
- 生产者、消费者模型
- 生产者负责将消息生产到MQ中
- 消费者负责从MQ中获取消息
- 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序
1.1.5、消息队列的两种模式
1.1.5.1、点对点模式
消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。
点对点模式特点:
- 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
- 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
- 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
1.1.5.2、发布订阅模式
发布/订阅模式特点:
- 每个消息可以有多个订阅者;
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
- 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;
1.2、Kafka简介
Kafka官网:http://kafka.apache.org
1.2.1、什么是Kafka
Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
- 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
- 以容错的持久化方式存储数据流
- 处理数据流
我们重点关键三个部分的关键词:
- Publish and subscribe:发布与订阅
- Store:存储
- Process:处理
特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以分多个分区, 消费组对分区进行消费操作;
- 可扩展性:kafka集群支持热扩展;
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
- 高并发:支持数千个客户端同时读写;
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
- 消息系统:解耦和生产者和消费者、缓存消息等;
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
- 流式处理:比如spark streaming和storm;
技术优势
-
可伸缩性:Kafka 的两个重要特性造就了它的可伸缩性。
- Kafka 集群在运行期间可以轻松地扩展或收缩(可以添加或删除代理),而不会宕机。
- 可以扩展一个 Kafka 主题来包含更多的分区。由于一个分区无法扩展到多个代理,所以它的容量受到代理磁盘空间的限制。能够增加分区和代理的数量意味着单个主题可以存储的数据量是没有限制的。
-
容错性和可靠性:Kafka 的设计方式使某个代理的故障能够被集群中的其他代理检测到。由于每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复并继续运行。
-
吞吐量:代理能够以超快的速度有效地存储和检索数据。
1.2.2、Kafka的应用场景
我们通常将Apache Kafka用在两类程序:
- 建立实时数据管道,以可靠地在系统或应用程序之间获取数据
- 构建实时流应用程序,以转换或响应数据流
上图,我们可以看到:
- Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
- Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
- Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到数据库中。
- Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。
1.2.3、Kafka诞生背景
kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师jay kreps便开始组织团队进行消息传递系统的研发。
1.3、Kafka的优势
ActiveMQ RabbitMQ Kafka RocketMQ 所属社区/公司 Apache Mozilla Public License Apache Apache/Ali 成熟度 成熟 成熟 成熟 比较成熟 生产者-消费者模式 支持 支持 支持 支持 发布-订阅 支持 支持 支持 支持 REQUEST-REPLY 支持 支持 - 支持 API完备性 高 高 高 低(静态配置) 多语言支持 支持JAVA优先 语言无关 支持,JAVA优先 支持 单机呑吐量 万级(最差) 万级 十万级 十万级(最高) 消息延迟 - 微秒级 毫秒级 - 可用性 高(主从) 高(主从) 非常高(分布式) 高 消息丢失 - 低 理论上不会丢失 - 消息重复 - 可控制 理论上会有重复 - 事务 支持 不支持 支持 支持 文档的完备性 高 高 高 中 提供快速入门 有 有 有 无 首次部署难度 - 低 中 高 在大数据技术领域,一些重要的组件、框架都支持Apache Kafka,不论成成熟度、社区、性能、可靠性,Kafka都是非常有竞争力的一款产品。
1.4、Kafka生态圈介绍
Apache Kafka这么多年的发展,目前也有一个较庞大的生态圈。
Kafka生态圈官网地址:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
1.5、Kafka版本
http://kafka.apache.org/downloads 可以查看到每个版本的发布时间。
二、环境搭配
2.1、 java环境
首先需要安装Java环境,同时配置环境变量,步骤如下:
- 官网下载JDK
- 解压缩,放到指定目录
- 配置环境变量
在/etc/profile文件中配置如下变量
export JAVA_HOME=/java/jdk-12.0.1 export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH export PATH=.:$JAVA_HOME/bin:$JRE_HOME/bin:$KE_HOME/bin:${MAVEN_HOME}/bin:$PATH
- 测试jdk
java -version
2.2、ZooKeeper的安装
Zookeeper是安装Kafka集群的必要组件,Kafka通过Zookeeper来实施对元数据信息的管理,包括集群、主题、分区等内容。
同样在官网下载安装包到指定目录解压缩,步骤如下:
- ZooKeeper官网:http://zookeeper.apache.org/
- 修改Zookeeper的配置文件,首先进入安装路径conf目录,并将zoo_sample.cfg文件修改为zoo.cfg,并对核心参数进行配置。
文件内容如下:
# The number of milliseconds of each tick # zk服务器的心跳时间 tickTime=2000 # The number of ticks that the initial # synchronization phase can take # 投票选举新Leader的初始化时间 initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement # do not use /tmp for storage, /tmp here is just # example sakes. # 数据目录 dataDir=temp/zookeeper/data # 日志目录 dataLogDir=temp/zookeeper/log # the port at which the clients will connect # Zookeeper对外服务端口,保持默认 clientPort=2181
- 启动Zookeeper命令:bin/zkServer.sh start
angyan@Server-node:/mnt/d/zookeeper-3.4.14$ bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /mnt/d/zookeeper-3.4.14/bin/../conf/zoo.cfg //启动成功 Starting zookeeper ... STARTED angyan@Server-node:/mnt/d/zookeeper-3.4.14$
2.3、搭建Kafka集群
- 将Kafka的安装包上传到虚拟机,并解压
cd /export/software/ tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/ cd /export/server/kafka_2.12-2.4.1/
- 修改 server.properties
cd /export/server/kafka_2.12-2.4.1/config vim server.properties # 指定broker的id broker.id=0 # 指定Kafka数据的位置 log.dirs=/export/server/kafka_2.12-2.4.1/data # 配置zk的三个节点 zookeeper.connect=node1.angyan.cn:2181,node2.angyan.cn:2181,node3.angyan.cn:2181
- 将安装好的kafka复制到另外两台服务器
cd /export/server scp -r kafka_2.12-2.4.1/ node2.angyan.cn:$PWD scp -r kafka_2.12-2.4.1/ node3.angyan.cn:$PWD 修改另外两个节点的broker.id分别为1和2 ---------node2.angyan.cn-------------- cd /export/server/kafka_2.12-2.4.1/config vim erver.properties broker.id=1 --------node3.angyan.cn-------------- cd /export/server/kafka_2.12-2.4.1/config vim server.properties broker.id=2
- 配置KAFKA_HOME环境变量
vim /etc/profile export KAFKA_HOME=/export/server/kafka_2.12-2.4.1 export PATH=:$PATH:${KAFKA_HOME} 分发到各个节点 scp /etc/profile node2.angyan.cn:$PWD scp /etc/profile node3.angyan.cn:$PWD 每个节点加载环境变量 source /etc/profile
- 启动服务器
# 启动ZooKeeper nohup bin/zookeeper-server-start.sh config/zookeeper.properties & # 启动Kafka cd /export/server/kafka_2.12-2.4.1 nohup bin/kafka-server-start.sh config/server.properties & # 测试Kafka集群是否启动成功 bin/kafka-topics.sh --bootstrap-server node1.angyan.cn:9092 --list
注意:
- 每一个Kafka的节点都需要修改broker.id(每个节点的标识,不能重复)
- log.dir数据存储目录需要配置
2.4、目录结构分析
目录名称 说明 bin Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等 config Kafka的所有配置文件 libs 运行Kafka所需要的所有JAR包 logs Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息 site-docs Kafka的网站帮助文件 2.5、Kafka一键启动/关闭脚本
为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。
- 在节点1中创建 /export/onekey 目录
cd /export/onekey
- 准备slave配置文件,用于保存要启动哪几个节点上的kafka
node1.angyan.cn node2.angyan.cn node3.angyan.cn
- 编写start-kafka.sh脚本
vim start-kafka.sh cat /export/onekey/slave | while read line do { echo $line ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & " }& wait done
- 编写stop-kafka.sh脚本
vim stop-kafka.sh cat /export/onekey/slave | while read line do { echo $line ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9" }& wait done
- 给start-kafka.sh、stop-kafka.sh配置执行权限
chmod u+x start-kafka.sh chmod u+x stop-kafka.sh
- 执行一键启动、一键关闭
./start-kafka.sh ./stop-kafka.sh
2.6、Kafka测试消息生产与消费
- 首先创建一个主题
命令如下:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic angyan --partitions 2 --replication-factor 1
–zookeeper:指定了Kafka所连接的Zookeeper服务地址。多个zookeeper用 ‘,’分开。
–topic:指定了所要创建主题的名称
–partitions:指定了分区个数
–replication-factor:指定了副本因子。每个副本分布在不通节点,不能超过总节点数。如你只有一个节点,但是创建时指定副本数为2,就会报错。
–create:创建主题的动作指令
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic angyan --partitions 2 --replication-factor 1 //主题创建成功 Created topic angyan. angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$
- 展示所有主题
命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --list
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --zookeeper localhost:2181 --list angyan angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$
- 查看主题详情
命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic angyan
–describe 查看详情动作指令
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic angyan Topic: angyan PartitionCount:2 ReplicationFactor:1 Configs: Topic: angyan Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: angyan Partition: 1 Leader: 0 Replicas: 0 Isr: 0 angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$
命令:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic angyan
–bootstrap-server 指定了连接Kafka集群的地址
–topic 指定了消费端订阅的主题
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic angyan Hello,Kafka!
- 生产端发送消息
命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic angyan
–broker-list 指定了连接的Kafka集群的地址
–topic 指定了发送消息时的主题
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic angyan >Hello,Kafka! >
三、基础操作
3.1、创建topic
创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。
# 创建名为test的主题 bin/kafka-topics.sh --create --bootstrap-server node1.angyan.cn:9092 --topic test # 查看目前Kafka中的主题 bin/kafka-topics.sh --list --bootstrap-server node1.angyan.cn:9092
3.2、生产消息到Kafka
使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。
bin/kafka-console-producer.sh --broker-list node1.angyan.cn:9092 --topic test
3.3、从Kafka消费消息
使用下面的命令来消费 test 主题中的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1.angyan.cn:9092 --topic test --from-beginning
3.4、使用Kafka Tools操作Kafka
- 安装Kafka集群,可以测试以下
- 创建一个topic主题(消息都是存放在topic中,类似mysql建表的过程)
- 基于kafka的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到topic中
- 基于kafka的内置测试消费者脚本来消费topic中的数据
- 推荐大家开发的使用Kafka Tool
- 浏览Kafka集群节点、多少个topic、多少个分区
- 创建topic/删除topic
- 浏览ZooKeeper中的数据
四、Kafka基准测试
4.1、基准测试
基准测试(benchmark testing)是一种测量和评估软件性能指标的活动。我们可以通过基准测试,了解到软件、硬件的性能水平。主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。
4.1.1、基于1个分区1个副本的基准测试
测试步骤:
- 启动Kafka集群
- 创建一个1个分区1个副本的topic: benchmark
- 同时运行生产者、消费者基准测试程序
- 观察结果
4.1.1.1、创建topic
bin/kafka-topics.sh --zookeeper node1.angyan.cn:2181 --create --topic benchmark --partitions 1 --replication-factor 1
4.1.1.2、生产消息基准测试
在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。为了方便测试,课程上演示测试500W的消息作为基准测试。
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=1
bin/kafka-producer-perf-test.sh --topic topic的名字 --num-records 总共指定生产数据量(默认5000W) --throughput 指定吞吐量——限流(-1不指定) --record-size record数据大小(字节) --producer-props bootstrap.servers=192.168.1.20:9092,192.168.1.21:9092,192.168.1.22:9092 acks=1 指定Kafka集群地址,ACK模式
4.1.1.3、消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 --topic benchmark --fetch-size 1048576 --messages 5000000
bin/kafka-consumer-perf-test.sh --broker-list 指定kafka集群地址 --topic 指定topic的名称 --fetch-size 每次拉取的数据大小 --messages 总共要消费的消息个数
五、Java编程操作Kafka
5.1、同步生产消息到Kafka中
5.1.1、需求
接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。
5.1.2、准备工作
5.1.2.1、导入Maven Kafka POM依赖
central http://maven.aliyun.com/nexus/content/groups/public// true true always fail org.apache.kafka kafka-clients 2.4.1 org.apache.commons commons-io 1.3.2 org.slf4j slf4j-log4j12 1.7.6 log4j log4j 1.2.16 org.apache.maven.plugins maven-compiler-plugin 3.7.0 1.8 5.1.2.2、导入log4j.properties
将log4j.properties配置文件放入到resources文件夹中
log4j.rootLogger=INFO,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
5.1.2.3、创建包和类
创建包cn.angyan.kafka,并创建KafkaProducerTest类。
5.1.3、开发步骤
- 创建用于连接Kafka的Properties配置
- 创建一个生产者对象KafkaProducer
- 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
- 再调用一个Future.get()方法等待响应
- 关闭生产者
5.1.4、代码开发
必要参数配置
public static Properties initConfig() { Properties props = new Properties(); // 该属性指定 brokers 的地址清单,格式为 host:port。清单里不需要包含所有的 broker地址, // 生产者会从给定的 broker 里查找到其它 broker 的信息。——建议至少提供两个 broker的信息,因为一旦其中一个宕机,生产者仍然能够连接到集群上。 props.put("bootstrap.servers", brokerList); // 将 key 转换为字节数组的配置,必须设定为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类, // 生产者会用这个类把键对象序列化为字节数组。 // ——kafka 默认提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。当然也可以自定义序列化器。 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 和 key.serializer 一样,用于 value 的序列化 props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 内容形式如:"producer-1" props.put("client.id", "producer.client.id.demo"); return props; }
public class KafkaProducerTest { public static void main(String[] args) { // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.88.100:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer
producer = new KafkaProducer (props); // 3. 调用send发送1-100消息到指定Topic test for(int i = 0; i < 100; ++i) { try { // 获取返回值Future,该对象封装了返回值 Future future = producer.send(new ProducerRecord ("test", null, i + "")); // 调用一个Future.get()方法等待响应 future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 5. 关闭生产者 producer.close(); } } 5.1.5、序列化器
消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。
Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),
还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。
5.1.6、自定义序列化器
/** * 自定义序列化器 */ public class CompanySerializer implements Serializer
{ @Override public void configure(Map configs, boolean isKey) { } @Override public byte[] serialize(String topic, Company data) { if (data == null) { return null; } byte[] name, address; try { if (data.getName() != null) { name = data.getName().getBytes("UTF-8"); } else { name = new byte[0]; } if (data.getAddress() != null) { address = data.getAddress().getBytes("UTF-8"); } else { address = new byte[0]; } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length); buffer.putInt(name.length); buffer.put(name); buffer.putInt(address.length); buffer.put(address); return buffer.array(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return new byte[0]; } @Override public void close() { } } 使用自定义的序列化器
public class ProducerDefineSerializer { public static final String brokerList = "localhost:9092"; public static final String topic = "angyan"; public static void main(String[] args) throws ExecutionException, InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName()); // properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ProtostuffSerializer.class.getName()); properties.put("bootstrap.servers", brokerList); KafkaProducer
producer =new KafkaProducer<>(properties); Company company = Company.builder().name("kafka").address("北京").build(); // Company company = Company.builder().name("hiddenkafka").address("China").telphone("13000000000").build(); ProducerRecord record =new ProducerRecord<>(topic, company); producer.send(record).get(); } } 5.1.7、分区器
本身kafka有自己的分区策略的,如果未指定,就会使用默认的分区策略:
Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。如果Key相同的话,那么就会分配到统一分区。
实现自定义分区器需要通过配置参数ProducerConfig.PARTITIONER_CLASS_CONFIG来实现
// 自定义分区器的使用 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());
5.1.8、拦截器
Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
生产者拦截器可以用在消息发送前做一些准备工作。
使用场景
1、按照某个规则过滤掉不符合要求的消息
2、修改消息的内容
3、统计类需求
实现自定义拦截器之后需要在配置参数中指定这个拦截器,此参数的默认值为空,如下:
// 自定义拦截器使用 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerDefineSerializer.class.getName());
5.1.9、retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。
5.1.10、batch.size
当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销。
5.1.11、max.request.size
该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。 broker 对可接收的消息最大值也有自己的限制( message.max.size ),所以两边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。
5.2、从Kafka的topic中消费消息
5.2.1、需求
从 test topic中,将消息都消费,并将记录的offset、key、value打印出来
5.2.2、准备工作
在cn.test.kafka包下创建KafkaConsumerTest类
5.2.3、开发步骤
- 创建Kafka消费者配置
- 创建Kafka消费者
- 订阅要消费的主题
- 使用一个while循环,不断从Kafka的topic中拉取消息
- 将将记录(record)的offset、key、value都打印出来
5.2.4、参考代码
必要参数设置
public static final String brokerList = "localhost:9092"; public static final String groupId = "group.angyan"; public static Properties initConfig() { Properties props = new Properties(); // 与KafkaProducer中设置保持一致 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 必填参数,该参数和KafkaProducer中的相同,制定连接Kafka集群所需的broker地址清单,可以设置一个或者多个 props.put("bootstrap.servers", brokerList); // 消费者隶属于的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称 props.put("group.id", groupId); // 指定KafkaConsumer对应的客户端ID,默认为空,如果不设置KafkaConsumer会自动生成一个非空字符串 props.put("client.id", "consumer.client.id.demo"); // 指定消费者拦截器 props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class.getName()); return props; }
public class KafkaProducerTest { public static void main(String[] args) { // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "node1.angyan.cn:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer
producer = new KafkaProducer (props); // 3. 调用send发送1-100消息到指定Topic test for(int i = 0; i < 100; ++i) { try { // 获取返回值Future,该对象封装了返回值 Future future = producer.send(new ProducerRecord ("test", null, i + "")); // 调用一个Future.get()方法等待响应 future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 5. 关闭生产者 producer.close(); } } 5.3、异步使用带有回调函数方法生产消息
如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。
需求:
- 在发送消息出现异常时,能够及时打印出异常信息
- 在发送消息成功时,打印Kafka的topic名字、分区id、offset
public class KafkaProducerTest { public static void main(String[] args) { // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "node1.angyan.cn:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer
producer = new KafkaProducer (props); // 3. 调用send发送1-100消息到指定Topic test for(int i = 0; i < 100; ++i) { // 一、同步方式 // 获取返回值Future,该对象封装了返回值 // Future future = producer.send(new ProducerRecord ("test", null, i + "")); // 调用一个Future.get()方法等待响应 // future.get(); // 二、带回调函数异步方式 producer.send(new ProducerRecord ("test", null, i + ""), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null) { System.out.println("发送消息出现异常"); } else { String topic = metadata.topic(); int partition = metadata.partition(); long offset = metadata.offset(); System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!"); } } }); } // 5. 关闭生产者 producer.close(); } } 5.4、服务端常用参数配置
参数配置:config/server.properties
broker.id=0 listeners=PLAINTEXT://:9092 # it uses the value for "listeners" if configured. Otherwise, it will use the value #advertised.listeners=PLAINTEXT://your.host.name:9092 #log.dirs=/tmp/kafka-logs log.dirs=/tmp/kafka/log # Zookeeper connection string (see zookeeper docs for details). zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000
-
zookeeper.connect
- 指明Zookeeper主机地址,如果zookeeper是集群则以逗号隔开,如172.6.14.61:2181,172.6.14.62:2181,172.6.14.63:2181
-
listeners
- 监听列表,broker对外提供服务时绑定的IP和端口。多个以逗号隔开,如果监听器名称不是一个安全的协议, listener.security.protocol.map也必须设置。主机名称设置0.0.0.0绑定所有的接口,主机名称为空则绑定默认的接口。如:PLAINTEXT://myhost:9092,SSL://:9091,CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
-
broker.id
- broker的唯一标识符,如果不配置则自动生成,建议配置且一定要保证集群中必须唯一,默认-1
-
log.dirs
- 日志数据存放的目录,如果没有配置则使用log.dir,建议此项配置。
-
message.max.bytes
- 服务器接受单个消息的最大大小,默认1000012 约等于976.6KB。
六、架构
6.1、Kafka重要概念
6.1.1、broker
- Kafka服务器进程,生产者、消费者都要连接broker
- 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
- broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
- 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能
6.1.2、zookeeper
- ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
- ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。
注意:Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
6.1.3、producer(生产者)
- 生产者负责将数据推送给broker的topic
6.1.4、consumer(消费者)
- 消费者负责从broker的topic中拉取数据,并自己进行处理
6.1.5、consumer group(消费者组)
- consumer group是kafka提供的可扩展且具有容错性的消费者机制
- 一个消费者组可以包含多个消费者
- 一个消费者组有一个唯一的ID(group Id),配置group.id一样的消费者是属于同一个组中
- 组内的消费者一起消费主题的所有分区数据
6.1.6、分区(Partitions)
- 在Kafka集群中,主题被分为多个分区
- Kafka集群的分布式就是由分区来实现的。一个topic中的消息可以分布在topic中的不同partition中
6.1.7、副本(Replicas)
- 实现Kafkaf集群的容错,实现partition的容错。一个topic至少应该包含大于1个的副本
- 副本可以确保某个服务器出现故障时,确保数据依然可用
- 在Kafka中,一般都会设计副本的个数>1
6.1.8、主题(Topic)
- 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
- 一个Kafka集群中,可以包含多个topic。一个topic可以包含多个分区
- Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
- 在主题中的消息是有结构的,一般一个主题包含某一类消息
- 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)
6.1.9、偏移量(offset)
- offset记录着下一条将要发送给Consumer的消息的序号
- 默认Kafka将offset存储在ZooKeeper中
- 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
- 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的
- 相对消费者、partition来说,可以通过offset来拉取数据
6.2、消费者组
- 一个消费者组中可以包含多个消费者,共同来消费topic中的数据
- 一个topic中如果只有一个分区,那么这个分区只能被某个组中的一个消费者消费
- 有多少个分区,那么就可以被同一个组内的多少个消费者消费
七、Kafka生产者幂等性与事务
7.1、幂等性
7.1.1、简介
拿http举例来说,一次或多次请求,得到地响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。
如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。
7.1.2、Kafka生产者幂等性
在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。
- 生产者消息重复问题
- Kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,但Kafka会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一模一样的消息
7.1.3、配置幂等性
props.put("enable.idempotence",true);
7.1.4、幂等性原理
为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。
- PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
- Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。
- 在Kafka中可以开启幂等性
- 当Kafka的生产者生产消息时,会增加一个pid(生产者的唯一编号)和sequence number(针对消息的一个递增序列)
- 发送消息,会连着pid和sequence number一块发送
- kafka接收到消息,会将消息和pid、sequence number一并保存下来
- 如果ack响应失败,生产者重试,再次发送消息时,Kafka会根据pid、sequence number是否需要再保存一条消息
- 判断条件:生产者发送过来的sequence number 是否小于等于 partition中消息对应的sequence
7.2、Kafka事务
7.2.1、简介
Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)
7.2.2、事务操作API
Producer接口中定义了以下5个事务相关方法:
- initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
- beginTransaction(开始事务):启动一个Kafka事务
- sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
- commitTransaction(提交事务):提交事务
- abortTransaction(放弃事务):取消事务
7.3、【理解】Kafka事务编程
7.3.1、事务相关属性配置
7.3.1.1、生产者
// 配置事务的id,开启了事务会默认开启幂等性 props.put("transactional.id", "first-transactional");
-
生产者
- 初始化事务
- 开启事务
- 需要使用producer来将消费者的offset提交到事务中
- 提交事务
- 如果出现异常回滚事务
如果使用了事务,不要使用异步发送
7.3.1.2、消费者
// 1. 消费者需要设置隔离级别 props.put("isolation.level","read_committed"); // 2. 关闭自动提交 props.put("enable.auto.commit", "false");
7.3.2、Kafka事务编程
7.3.2.1、需求
在Kafka的topic 「ods_user」中有一些用户数据,数据格式如下:
姓名,性别,出生日期 张三,1,1980-10-09 李四,0,1985-11-01
我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic 「dwd_user」中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。
7.3.2.2、启动生产者控制台程序模拟数据
# 创建名为ods_user和dwd_user的主题 bin/kafka-topics.sh --create --bootstrap-server node1.angyan.cn:9092 --topic ods_user bin/kafka-topics.sh --create --bootstrap-server node1.angyan.cn:9092 --topic dwd_user # 生产数据到 ods_user bin/kafka-console-producer.sh --broker-list node1.angyan.cn:9092 --topic ods_user # 从dwd_user消费数据 bin/kafka-console-consumer.sh --bootstrap-server node1.angyan.cn:9092 --topic dwd_user --from-beginning --isolation-level read_committed
7.3.2.3、编写创建消费者代码
编写一个方法 createConsumer,该方法中返回一个消费者,订阅「ods_user」主题。注意:需要配置事务隔离级别、关闭自动提交。
实现步骤:
- 创建Kafka消费者配置
Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1.angyan.cn:9092"); props.setProperty("group.id", "ods_user"); props.put("isolation.level","read_committed"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- 创建消费者,并订阅 ods_user 主题
// 1. 创建消费者 public static Consumer
createConsumer() { // 1. 创建Kafka消费者配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1.angyan.cn:9092"); props.setProperty("group.id", "ods_user"); props.put("isolation.level","read_committed"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2. 创建Kafka消费者 KafkaConsumer consumer = new KafkaConsumer<>(props); // 3. 订阅要消费的主题 consumer.subscribe(Arrays.asList("ods_user")); return consumer; } 7.3.2.4、编写创建生产者代码
编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。
- 创建生产者配置
Properties props = new Properties(); props.put("bootstrap.servers", "node1.angyan.cn:9092"); props.put("transactional.id", "dwd_user"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- 创建生产者对象
public static Producer
createProduceer() { // 1. 创建生产者配置 Properties props = new Properties(); props.put("bootstrap.servers", "node1.angyan.cn:9092"); props.put("transactional.id", "dwd_user"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建生产者 Producer producer = new KafkaProducer<>(props); return producer; } 7.3.2.5、编写代码消费并生产数据
实现步骤:
- 调用之前实现的方法,创建消费者、生产者对象
- 生产者调用initTransactions初始化事务
- 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
(1) 生产者开启事务
(2) 消费者拉取消息
(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
(4) 生产消息到dwd_user topic中
(5) 提交偏移量到事务中
(6) 提交事务
(7) 捕获异常,如果出现异常,则取消事务
public static void main(String[] args) { Consumer
consumer = createConsumer(); Producer producer = createProducer(); // 初始化事务 producer.initTransactions(); while(true) { try { // 1. 开启事务 producer.beginTransaction(); // 2. 定义Map结构,用于保存分区对应的offset Map offsetCommits = new HashMap<>(); // 2. 拉取消息 ConsumerRecords records = consumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord record : records) { // 3. 保存偏移量 offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)); // 4. 进行转换处理 String[] fields = record.value().split(","); fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女"; String message = fields[0] + "," + fields[1] + "," + fields[2]; // 5. 生产消息到dwd_user producer.send(new ProducerRecord<>("dwd_user", message)); } // 6. 提交偏移量到事务 producer.sendOffsetsToTransaction(offsetCommits, "ods_user"); // 7. 提交事务 producer.commitTransaction(); } catch (Exception e) { // 8. 放弃事务 producer.abortTransaction(); } } } 7.3.2.6、测试
7.3.2.7、模拟异常测试事务
// 3. 保存偏移量 offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)); // 4. 进行转换处理 String[] fields = record.value().split(","); fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女"; String message = fields[0] + "," + fields[1] + "," + fields[2]; // 模拟异常 int i = 1/0; // 5. 生产消息到dwd_user producer.send(new ProducerRecord<>("dwd_user", message));
启动程序一次,抛出异常。
再启动程序一次,还是抛出异常。
直到我们处理该异常为止。
我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。
八、分区和副本机制
8.1 生产者分区写入策略
生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中
- 轮询分区策略
- 随机分区策略
- 按key分区分配策略
- 自定义分区策略
8.1.1、轮询策略
- 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区
- 如果在生产消息时,key为null,则使用轮询算法均衡地分配分区
8.1.2、随机策略(不用)
随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。
8.1.3、按key分配策略
按key分配策略,有可能会出现「数据倾斜」,key.hash() % 分区的数量。例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。
8.1.4、乱序问题
轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。
- 在Kafka中生产者是有写入策略,如果topic有多个分区,就会将数据分散在不同的partition中存储
- 当partition数量大于1的时候,数据(消息)会打散分布在不同的partition中
- 如果只有一个分区,消息是有序的
8.1.5、自定义分区策略
实现步骤:
- 创建自定义分区器
public class KeyWithRandomPartitioner implements Partitioner { private Random r; @Override public void configure(Map
configs) { r = new Random(); } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // cluster.partitionCountForTopic 表示获取指定topic的分区数量 return r.nextInt(1000) % cluster.partitionCountForTopic(topic); } @Override public void close() { } } - 在Kafka生产者配置中,自定使用自定义分区器的类名
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());
8.2、消费者组Rebalance机制
8.2.1、Rebalance再均衡
Kafka中的Rebalance称之为再均衡,是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。
Rebalance触发的时机有:
- 消费者组中consumer的个数发生变化。例如:有新的consumer加入到消费者组,或者是某个consumer停止了。
-
订阅的topic个数发生变化
消费者可以订阅多个主题,假设当前的消费者组订阅了三个主题,但有一个主题突然被删除了,此时也需要发生再均衡。
-
订阅的topic分区数发生变化
8.2.2、Rebalance的不良影响
- 发生Rebalance时,consumer group下的所有consumer都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配
- Rebalance过程会对consumer group产生非常严重的影响,Rebalance的过程中所有的消费者都将停止工作,直到Rebalance完成
8.3、消费者分区分配策略
保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少
8.3.1、Range范围分配策略
Range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个
剩余消费者消费n个
8.3.2、RoundRobin轮询策略
RoundRobinAssignor轮询策略是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
消费者挨个分配消费的分区
配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。
8.3.3、Stricky粘性分配策略
从Kafka 0.11.x开始,引入此类分配策略。主要目的:
- 分区分配尽可能均匀
- 在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同
没有发生rebalance时,Striky粘性分配策略和RoundRobin分配策略类似。
发生了rebalance,轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可
减少上下文的切换
上面如果consumer2崩溃了,此时需要进行rebalance。如果是Range分配和轮询分配都会重新进行分配,例如:
通过上图,我们发现,consumer0和consumer1原来消费的分区大多发生了改变。接下来我们再来看下粘性分配策略。
我们发现,Striky粘性分配策略,保留rebalance之前的分配结果。这样,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。这样可以明显减少系统资源的浪费,例如:之前consumer0、consumer1之前正在消费某几个分区,但由于rebalance发生,导致consumer0、consumer1需要重新消费之前正在处理的分区,导致不必要的系统开销。(例如:某个事务正在进行就必须要取消了)
8.4 副本机制
副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。
8.4.1、producer的acks参数
对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。
注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常
配置:
Properties props = new Properties(); props.put("bootstrap.servers", "node1.angyan.cn:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
8.4.2、acks配置为0
- 生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的
- 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
ACK为0,基准测试:
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=0
8.4.3、acks配置为1
- 当生产者的ACK配置为1时,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。
- 默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。
8.4.4、acks配置为-1或者all
- 确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的
- 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。
ACK基准测试:
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=all
根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。
分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup。
九、高级(High Level)API与低级(Low Level)API
- 高级API就是直接让Kafka帮助管理、处理分配、数据
- offset存储在ZK中
- 由kafka的rebalance来控制消费者分配的分区
- 开发起来比较简单,无需开发者关注底层细节
- 无法做到细粒度的控制
- 低级API:由编写的程序自己控制逻辑
- 自己来管理Offset,可以将offset存储在ZK、MySQL、Redis、HBase、Flink的状态存储
- 指定消费者拉取某个分区的数据
- 可以做到细粒度的控制
- 原有的Kafka的策略会失效,需要我们自己来实现消费机制
9.1、高级API
/** * 消费者程序:从test主题中消费数据 */ public class consumerTest { public static void main(String[] args) { // 1. 创建Kafka消费者配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.88.100:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2. 创建Kafka消费者 KafkaConsumer
consumer = new KafkaConsumer<>(props); // 3. 订阅要消费的主题 consumer.subscribe(Arrays.asList("test")); // 4. 使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // 定义100毫秒超时 ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } - 上面是之前编写的代码,消费Kafka的消息很容易实现,写起来比较简单
- 不需要执行去管理offset,直接通过ZK管理;也不需要管理分区、副本,由Kafka统一管理
- 消费者会自动根据上一次在ZK中保存的offset去接着获取数据
- 在ZK中,不同的消费者组(group)同一个topic记录不同的offset,这样不同程序读取同一个topic,不会受offset的影响
高级API的缺点
-
不能控制offset,例如:想从指定的位置读取
-
不能细化控制分区、副本、ZK等
9.2、低级API
通过使用低级API,我们可以自己来控制offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前offset是自动保存在ZK中,使用低级API,我们可以将offset不一定要使用ZK存储,我们可以自己来存储offset。例如:存储在文件、MySQL、或者内存中。但是低级API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。
9.3、手动消费分区数据
之前的代码,我们让Kafka根据消费组中的消费者动态地为topic分配要消费的分区。但在某些时候,我们需要指定要消费的分区,例如:
- 如果某个程序将某个指定分区的数据保存到外部存储中,例如:Redis、MySQL,那么保存数据的时候,只需要消费该指定的分区数据即可
- 如果某个程序是高可用的,在程序出现故障时将自动重启(例如:后面我们将学习的Flink、Spark程序)。这种情况下,程序将从指定的分区重新开始消费数据。
如何进行手动消费分区中的数据呢?
- 不再使用之前的 subscribe 方法订阅主题,而使用 「assign」方法指定想要消费的消息
String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
- 一旦指定了分区,就可以就像前面的示例一样,在循环中调用「poll」方法消费消息
注意
- 当手动管理消费分区时,即使GroupID是一样的,Kafka的组协调器都将不再起作用
- 如果消费者失败,也将不再自动进行分区重新分配
十、监控工具Kafka-eagle介绍
10.1、Kafka-Eagle简介
在开发工作中,当业务前提不复杂时,可以使用Kafka命令来进行一些集群的管理工作。但如果业务变得复杂,例如:我们需要增加group、topic分区,此时,我们再使用命令行就感觉很不方便,此时,如果使用一个可视化的工具帮助我们完成日常的管理工作,将会大大提高对于Kafka集群管理的效率,而且我们使用工具来监控消费者在Kafka中消费情况。
早期,要监控Kafka集群我们可以使用Kafka Monitor以及Kafka Manager,但随着我们对监控的功能要求、性能要求的提高,这些工具已经无法满足。
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。
官网地址:https://www.kafka-eagle.org/
10.2、安装Kafka-Eagle
10.2.1、开启Kafka JMX端口
10.2.1.1、JMX接口
JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。
10.2.1.2、开启Kafka JMX
在启动Kafka的脚本前,添加:
cd ${KAFKA_HOME} export JMX_PORT=9988 nohup bin/kafka-server-start.sh config/server.properties &
10.2.2、安装Kafka-Eagle
- 安装JDK,并配置好JAVA_HOME。
- 将kafka_eagle上传,并解压到 /export/server 目录中。
cd cd /export/software/ tar -xvzf kafka-eagle-bin-1.4.6.tar.gz -C ../server/ cd /export/server/kafka-eagle-bin-1.4.6/ tar -xvzf kafka-eagle-web-1.4.6-bin.tar.gz cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
- 配置 kafka_eagle 环境变量。
vim /etc/profile
export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6 export PATH=$PATH:$KE_HOME/bin
source /etc/profile
- 配置 kafka_eagle。使用vi打开conf目录下的system-config.properties
vim conf/system-config.properties
# 修改第4行,配置kafka集群别名 kafka.eagle.zk.cluster.alias=cluster1 # 修改第5行,配置ZK集群地址 cluster1.zk.list=node1.angyan.cn:2181,node2.angyan.cn:2181,node3.angyan.cn:2181 # 注释第6行 #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 # 修改第32行,打开图标统计 kafka.eagle.metrics.charts=true kafka.eagle.metrics.retain=30 # 注释第69行,取消sqlite数据库连接配置 #kafka.eagle.driver=org.sqlite.JDBC #kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db #kafka.eagle.username=root #kafka.eagle.password=www.kafka-eagle.org # 修改第77行,开启mys kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://node1.angyan.cn:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=123456
- 配置JAVA_HOME
cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin vim ke.sh # 在第24行添加JAVA_HOME环境配置 export JAVA_HOME=/export/server/jdk1.8.0_241
- 修改Kafka eagle可执行权限
cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin chmod +x ke.sh
- 启动 kafka_eagle。
./ke.sh start
- 访问Kafka eagle,默认用户为admin,密码为:123456
http://node1.angyan.cn:8048/ke
10.3、Kafka度量指标
指标 意义 Brokers Spread broker使用率 Brokers Skew 分区是否倾斜 Brokers Leader Skew leader partition是否存在倾斜 十一、Kafka原理
11.1 、分区的leader与follower
11.1.1、Leader和Follower
在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上。我们正常使用kafka是感觉不到leader、follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。所以,可以这样说:
- Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步
- 如果leader出现故障,其他follower会被重新选举为leader
- follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中
- Kafka中的leader和follower是相对分区有意义,不是相对broker
- Kafka在创建topic的时候,会尽量分配分区的leader在不同的broker中,其实就是负载均衡
- leader职责:读写数据
- follower职责:同步数据、参与选举(leader crash之后,会选举一个follower重新成为分区的leader
- 注意和ZooKeeper区分
- ZK的leader负责读、写,follower可以读取
- Kafka的leader负责读写、follower不能读写数据(确保每个消费者消费的数据是一致的),Kafka一个topic有多个分区leader,一样可以实现数据操作的负载均衡
11.1.2、查看某个partition的leader
使用Kafka-eagle查看某个topic的partition的leader在哪个服务器中。为了方便观察,我们创建一个名为test的3个分区、3个副本的topic。
11.1.3、AR、ISR、OSR
在实际环境中,leader有可能会出现一些故障,所以Kafka一定会选举出新的leader。在讲解leader选举之前,我们先要明确几个概念。Kafka中,把follower可以按照不同状态分为三类——AR、ISR、OSR。
- 一个topic下的所有副本称为 「AR」(Assigned Replicas——已分配的副本)
- 所有与leader副本保持一定程度同步的副本(包括 leader 副本在内)组成 「ISR」(In-Sync Replicas——正在同步的副本,可以理解为当前有几个follower是存活的)
- 由于follower副本同步滞后过多的副本(不包括 leader 副本)组成 「OSR」(Out-of-Sync Replias——不再同步的副本)
- AR = ISR + OSR
- 正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISR,OSR集合为空。
11.1.4、查看分区的ISR
- 使用Kafka Eagle查看某个Topic的partition的ISR有哪几个节点。
- 尝试关闭id为0的broker(杀掉该broker的进程),参看topic的ISR情况。
11.1.5、Leader选举
leader对于消息的写入以及读取是非常关键的,此时有两个疑问:
- Kafka如何确定某个partition是leader、哪个partition是follower呢?
- 某个leader崩溃了,如何快速确定另外一个leader呢?因为Kafka的吞吐量很高、延迟很低,所以选举leader必须非常快
11.1.5.1、如果leader崩溃,Kafka会如何?
leader在崩溃后,Kafka又从其他的follower中快速选举出来了leader。
11.1.5.2、Controller介绍
- controller是kafka集群的老大,是针对Broker的一个角色
- Leader是针对partition的一个角色,是通过ISR来进行快速选举
- Kafka启动时,会在所有的broker中选择一个controller
- 前面leader和follower是针对partition,而controller是针对broker的
- 创建topic、或者添加分区、修改副本数量之类的管理任务都是由controller完成的
- Kafka分区leader的选举,也是由controller决定的
- Controller是高可用的,是用过ZK来进行选举
11.1.5.3、Controller的选举
- 在Kafka集群启动的时候,每个broker都会尝试去ZooKeeper上注册成为Controller(ZK临时节点)
- 但只有一个竞争成功,其他的broker会注册该节点的监视器
- 一点该临时节点状态发生变化,就可以进行相应的处理
- Controller也是高可用的,一旦某个broker崩溃,其他的broker会重新注册为Controller
- 如果Kafka是基于ZK来进行选举,ZK的压力可能会比较大。例如:某个节点崩溃,这个节点上不仅仅只有一个leader,是有不少的leader需要选举。通过ISR快速进行选举。
11.1.5.4、找到当前Kafka集群的controller
- 点击Kafka Tools的「Tools」菜单,找到「ZooKeeper Brower…」
- 点击左侧树形结构的controller节点,就可以查看到哪个broker是controller了。
11.1.5.5、测试controller选举
通过kafka tools找到controller所在的broker对应的kafka进程,杀掉该进程,重新打开ZooKeeper brower,观察kafka是否能够选举出来新的Controller。
11.1.5.6、Controller选举partition leader
- 所有Partition的leader选举都由controller决定
- controller会将leader的改变直接通过RPC的方式通知需为此作出响应的Broker
- controller读取到当前分区的ISR,只要有一个Replica还幸存,就选择其中一个作为leader否则,则任意选这个一个Replica作为leader
- 如果该partition的所有Replica都已经宕机,则新的leader为-1
为什么不能通过ZK的方式来选举partition的leader?
- Kafka集群如果业务很多的情况下,会有很多的partition
- 假设某个broker宕机,就会出现很多的partiton都需要重新选举leader
- 如果使用zookeeper选举leader,会给zookeeper带来巨大的压力。所以,kafka中leader的选举不能使用ZK来实现
11.1.6、leader负载均衡
11.1.6.1、Preferred Replica
- Kafka中引入了一个叫做「preferred-replica」的概念,意思就是:优先的Replica
- 在ISR列表中,第一个replica就是preferred-replica
- 第一个分区存放的broker,肯定就是preferred-replica
- 执行以下脚本可以将preferred-replica设置为leader,均匀分配每个分区的leader。
./kafka-leader-election.sh --bootstrap-server node1.angyan.cn:9092 --topic 主题 --partition=1 --election-type preferred
11.1.6.2、确保leader在broker中负载均衡
杀掉test主题的某个broker,这样kafka会重新分配leader。等到Kafka重新分配leader之后,再次启动kafka进程。此时:观察test主题各个分区leader的分配情况。
此时,会造成leader分配是不均匀的,所以可以执行以下脚本来重新分配leader:
bin/kafka-leader-election.sh --bootstrap-server node1.angyan.cn:9092 --topic test --partition=2 --election-type preferred
–partition:指定需要重新分配leader的partition编号
11.2、Kafka生产、消费数据工作流程
11.2.1、Kafka数据写入流程
-
生产者先从 zookeeper 的 "/brokers/topics/主题名/partitions/分区名/state"节点找到该 partition 的leader
-
生产者在ZK中找到该ID找到对应的broker
-
broker进程上的leader将消息写入到本地log中
-
follower从leader上拉取消息,写入到本地log,并向leader发送ACK
-
leader接收到所有的ISR中的Replica的ACK后,并向生产者返回ACK。
消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象
11.2.2、Kafka数据消费流程
11.2.2.1、两种消费模式
-
kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息
-
消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
11.2.2.2、Kafka消费数据流程
- 每个consumer都可以根据分配策略(默认RangeAssignor),获得要消费的分区
- 获取到consumer对应的offset(默认从ZK中获取上一次消费的offset)
- 找到该分区的leader,拉取数据
- 消费者提交offset
11.3、Kafka的数据存储形式
- Kafka的数据组织结构
- topic
- partition
- segment
- .log数据文件
- .index(稀疏索引)
- .timeindex(根据时间做的索引)
- 一个topic由多个分区组成
- 一个分区(partition)由多个segment(段)组成
- 一个segment(段)由多个文件组成(log、index、timeindex)
11.3.1、存储日志
我们来看一下Kafka中的数据到底是如何在磁盘中存储的。
- Kafka中的数据是保存在 /export/server/kafka_2.12-2.4.1/data中
- 消息是保存在以:「主题名-分区ID」的文件夹中的
- 数据文件夹中包含以下内容:
这些分别对应:
文件名 说明 00000000000000000000.index 索引文件,根据offset查找数据就是通过该索引文件来操作的 00000000000000000000.log 日志数据文件 00000000000000000000.timeindex 时间索引 leader-epoch-checkpoint 持久化每个partition leader对应的LEO(log end offset、日志文件中下一条待写入消息的offset) - 每个日志文件的文件名为起始偏移量,因为每个分区的起始偏移量是0,所以,分区的日志文件都以0000000000000000000.log开始
- 默认的每个日志文件最大为「log.segment.bytes =102410241024」1G
- 为了简化根据offset查找消息,Kafka日志文件名设计为开始的偏移量
11.3.1.1、观察测试
为了方便测试观察,新创建一个topic:「test_10m」,该topic每个日志数据文件最大为10M
bin/kafka-topics.sh --create --zookeeper node1.angyan.cn --topic test_10m --replication-factor 2 --partitions 3 --config segment.bytes=10485760
使用之前的生产者程序往「test_10m」主题中生产数据,可以观察到如下:
11.3.1.2、写入消息
- 新的消息总是写入到最后的一个日志文件中
- 该文件如果到达指定的大小(默认为:1GB)时,将滚动到一个新的文件中
11.3.1.3、读取消息
- 根据「offset」首先需要找到存储数据的 segment 段(注意:offset指定分区的全局偏移量)
- 然后根据这个「全局分区offset」找到相对于文件的「segment段offset」
- 最后再根据 「segment段offset」读取消息
- 为了提高查询效率,每个文件都会维护对应的范围内存,查找的时候就是使用简单的二分查找
11.3.1.4、删除消息
- 在Kafka中,消息是会被定期清理的。一次删除一个segment段的日志文件
- Kafka的日志管理器,会根据Kafka的配置,来决定哪些文件可以被删除
11.3.2、消息传递的语义性
Flink里面有对应的每种不同机制的保证,提供Exactly-Once保障(二阶段事务提交方式)
- At-most once:最多一次(只管把数据消费到,不管有没有成功,可能会有数据丢失)
- At-least once:最少一次(有可能会出现重复消费)
- Exactly-Once:仅有一次(事务性性的保障,保证消息有且仅被处理一次)
11.4、消息不丢失机制
11.4.1、broker数据不丢失
消息丢失
- 从ZK中拉取offset,读取消息
- 在Java程序中处理这条消息,并将处理后的结果写入到存储中
- 在写入的时候,出现了故障,导致写入失败
- 这个时候又提交offset到ZK中
- 下一次就会从新的offset开始消费,之前的offset就丢失了
消息重复消费
- 根据offset来消费partition中的数据
- 消费者Java程序处理数据,并将结果写入到存储中
- 将offset提交到ZK中,提交失败
- 写入到存储中是成功的,但是写入到ZK中是失败的,会出现重复消费
如何防止消息的重复消费
一条消息被消费者消费多次。如果为了消息的不重复消费,而把生产端的重试机制关闭、消费端的手动提交改成自动提交,这样反而会出现消息丢失,那么可以直接在防止消息丢失的手段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题。
幂等性如何保证:
- mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条。
- 使用redis或zk的分布式锁 (主流的方案)
解决消息重复消费
- 根据offset来消费partition中的数据
- 消费者Java程序处理数据,并将结果写入到存储中(开启事务)
- 将offset提交到ZK中,只有都成功以后提交事务,如果不成功回滚事务
- 通过低级(Low Level)来控制offset
生产者通过分区的leader写入数据后,所有在ISR中follower都会从leader中复制数据,这样,可以确保即使leader崩溃了,其他的follower的数据仍然是可用的
如何防止消息丢失
- 发送方:ack是1 或者-1/all 可以防止消息丢失,如果要做到99.9999%,ack设成all,把min.insync.replicas配置成分区备份数。
- 消费方:把自动提交改为手动提交
如何做到顺序消费
- 发送方:在发送时将ack不能设置0,关闭重试,使用同步发送,等到发送成功再发送下一条。确保消息是顺序发送的。
- 接收方:消息是发送到一个分区中,只能有一个消费组的消费者来接收消息。因此,kafka的顺序消费会牺牲掉性能。
11.4.2、生产者数据不丢失
-
生产者连接leader写入数据时,可以通过ACK机制来确保数据已经成功写入。ACK机制有三个可选配置
-
配置ACK响应要求为 -1 时 —— 表示所有的节点都收到数据(leader和follower都接
收到数据)
-
配置ACK响应要求为 1 时 —— 表示leader收到数据
-
配置ACK影响要求为 0 时 —— 生产者只负责发送数据,不关心数据是否丢失(这种情况可能会产生数据丢失,但性能是最好的)
-
生产者可以采用同步和异步两种方式发送数据
-
同步:发送一批数据给kafka后,等待kafka返回结果
-
异步:发送一批数据给kafka,只是提供一个回调函数。
说明:如果broker迟迟不给ack,而buffer又满了,开发者可以设置是否直接清空buffer中的数据。
11.4.3、消费者数据不丢失
在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失。
11.5、数据积压
Kafka消费者消费数据的速度是非常快的,但如果由于处理Kafka消息时,由于有一些外部IO、或者是产生网络拥堵,就会造成Kafka中的数据积压(或称为数据堆积)。如果数据一直积压,会导致数据出来的实时性受到较大影响。
消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多消费者寻址的性能会越来越差,最后导致整个kafka对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩。
11.5.1、使用Kafka-Eagle查看数据积压情况
11.5.2、解决数据积压问题
- 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息
- 通过业务的架构设计,提升业务层面消费的性能。
- 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度
- 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将pol下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。一一不常用
11.5.2.1、数据写入MySQL失败
问题描述
某日运维人员找到开发人员,说某个topic的一个分区发生数据积压,这个topic非常重要,而且开始有用户投诉。运维非常紧张,赶紧重启了这台机器。重启之后,还是无济于事。
问题分析
消费这个topic的代码比较简单,主要就是消费topic数据,然后进行判断在进行数据库操作。运维通过kafka-eagle找到积压的topic,发现该topic的某个分区积压了几十万条的消息。
最后,通过查看日志发现,由于数据写入到MySQL中报错,导致消费分区的offset一自没有提交,所以数据积压严重。
11.5.2.2、因为网络延迟消费失败
问题描述
基于Kafka开发的系统平稳运行了两个月,突然某天发现某个topic中的消息出现数据积压,大概有几万条消息没有被消费。
问题分析
通过查看应用程序日志发现,有大量的消费超时失败。后查明原因,因为当天网络抖动,通过查看Kafka的消费者超时配置为50ms,随后,将消费的时间修改为500ms后问题解决。
11.6、延迟队列
延迟队列的应用场景:在订单创建成功后如果超过30分钟没有付款,则需要取消订单,此时可用延时队列来实现。
-
创建多个topic,每个topic表示延时的间隔
- topic_5s: 延时5s执行的队列
- topic_1m:延时1分钟执行的队列
- topic_30m:延时30分钟执行的队列
-
消息发送者发送消息到相应的topic,并带上消息的发送时间
-
消费者订阅相应的topic,消费时轮询消费整个topic中的消息
- 如果消息的发送时间,和消费的当前时间超过预设的值,比如30分钟
- 如果消息的发送时间,和消费的当前时间没有超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费。
- 下次继续消费该offset处的消息,判断时间是否已满足预设值
十二、Kafka中数据清理(Log Deletion)
Kafka的消息存储在磁盘中,为了控制磁盘占用空间,Kafka需要不断地对过去的一些消息进行清理工作。Kafka的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在Kafka中,提供两种日志清理方式:
- 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。
- 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。
在Kafka的broker或topic配置中:
配置项 配置值 说明 log.cleaner.enable true(默认) 开启自动清理日志功能 log.cleanup.policy delete(默认) 删除日志 log.cleanup.policy compaction 压缩日志 log.cleanup.policy delete,compact 同时支持删除、压缩 12.1、日志删除
日志删除是以段(segment日志)为单位来进行定期清理的。
12.1.1、定时日志删除任务
Kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:
- 基于时间的保留策略
- 基于日志大小的保留策略
- 基于日志起始偏移量的保留策略
12.1.2、基于时间的保留策略
以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours。默认情况,在broker中,配置如下:
log.retention.hours=168
也就是,默认日志的保留时间为168小时,相当于保留7天。
删除日志分段时:
- 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
- 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)
- Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。
12.1.2.1、设置topic 5秒删除一次
设置topic的删除策略
- key: retention.ms
- value: 5000
12.1.3、基于日志大小的保留策略
日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过broker端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。
注意:
log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
12.1.4、基于日志起始偏移量保留策略
每个segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。
12.2 日志压缩(Log Compaction)
Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的key对应的数据只保留一个版本。
- Log Compaction执行后,offset将不再连续,但依然可以查询Segment
- Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
- Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
- 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态
十三、Kafka配额限速机制(Quotas)
生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务压爆服务器。
13.1、限制producer端速率
为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试,观察生产消息的速率
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=1
结果:
50000 records sent, 1108.156028 records/sec (1.06 MB/sec)
13.2、限制consumer端速率
对consumer限速与producer类似,只不过参数名不一样。
为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试:
bin/kafka-consumer-perf-test.sh --broker-list node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 --topic test --fetch-size 1048576 --messages 500000
结果为:
MB.sec:1.0743
13.3、取消Kafka的Quota配置
使用以下命令,删除Kafka的Quota配置
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default
十四、Kafka实战
14.1、生产者
14.1.1、导入依赖
org.springframework.kafka spring-kafka 14.1.2、配置文件
spring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093 spring.kafka.angyan.clientId=TEST_DEMO_MESSAGE spring.kafka.angyan.producer.compressionType=gzip spring.kafka.angyan.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.angyan.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 提交延时 spring.kafka.angyan.linger.ms=1000 spring.kafka.angyan.template.defaultTopic=TEST_DEMO_TOPIC
14.1.3、发送消息
@Service public class KafkaServiceImpl implements KafkaService { @Qualifier("kafkaTemplate") @Autowired private KafkaTemplate kafkaRecordTemplate; @Override public String sendMessage(String key, byte[] bytes) { Map header = new HashMap(); header.put(KafkaHeaders.KEY,key); MessageHeaders messageHeaders = new MessageHeaders(header); Message message = MessageBuilder.createMessage(bytes, messageHeaders); kafkaRecordTemplate.send(message); return null; } }
14.2、消费者
14.2.1、配置类
pring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093 spring.kafka.angyan.consumer.group.id=TEST_DEMO_MESSAGE spring.kafka.angyan.consumer.clientId=TEST_DEMO_MESSAGE spring.kafka.angyan.defaultTopic=TEST_DEMO_TOPIC spring.kafka.angyan.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.angyan.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
14.2.2、消费消息
@Component public class KafkaCustomer { @KafkaListener(topics = "${spring.kafka.angyan.defaultTopic}",containerFactory = "kafkaTemplateConsumer") public void testKafka(ConsumerRecord
record){ //处理业务逻辑 } }
-
-
-
-
- Kafka的数据组织结构
-
-
-
- 高级API就是直接让Kafka帮助管理、处理分配、数据
-
- 在Kafka中可以开启幂等性
- Kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,但Kafka会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一模一样的消息
- 生产者消息重复问题
- 消费者负责从broker的topic中拉取数据,并自己进行处理
- 生产者负责将数据推送给broker的topic
- 服务器接受单个消息的最大大小,默认1000012 约等于976.6KB。
-
- 安装Kafka集群,可以测试以下
- 生产端发送消息
- 查看主题详情
- 展示所有主题
- 首先创建一个主题
- 执行一键启动、一键关闭
- 给start-kafka.sh、stop-kafka.sh配置执行权限
- 编写stop-kafka.sh脚本
- 编写start-kafka.sh脚本
- 准备slave配置文件,用于保存要启动哪几个节点上的kafka
- 在节点1中创建 /export/onekey 目录
- 启动服务器
- 配置KAFKA_HOME环境变量
- 将安装好的kafka复制到另外两台服务器
- 修改 server.properties
- 将Kafka的安装包上传到虚拟机,并解压
- 启动Zookeeper命令:bin/zkServer.sh start
- 测试jdk
-
- 生产者、消费者模型
- 可以使用消息队列作为临时存储,或者一种通信管道
- 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发
- 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦
- 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
- 比较常见的:发送短信验证码、发送邮件
- 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章