Kafka安装配置
安装
kafka官网https://kafka.apache.org/downloads
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
tar xf kafka_2.12-2.7.2.tgz -C /usr/local/ #将kafka安装到了/usr/local目录下
mv /usr/local/kafka_2.12-2.7.2 /usr/local/kafka #新建存放日志和数据的文件夹
mkdir /usr/local/kafka/logs
配置
afka的主配置文件为/usr/local/kafka/config/server.properties,这里以节点kafkazk1为例,重点介绍一些常用配置项的含义:
broker.id=1 port=19092 #当前kafka对外提供服务的端口默认是9092 host.name=10.0.0.6 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。 listeners=PLAINTEXT://10.0.0.6:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 log.dirs=/usr/local/kafka/logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 num.partitions=6 #默认的分区数,一个topic默认1个分区数 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 message.max.byte=5242880 #消息保存的最大值5M transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 replica.fetch.max.bytes=5242880 #取消息的最大直接数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 zookeeper.connect=localhost:2181 #不是集群,所以可以写成localhost #zookeeper.connect=10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 #集群 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true delete.topic.enable=true
每个配置项含义如下:
- broker.id:每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。
- listeners:设置kafka的监听地址与端口,可以将监听地址设置为主机名或IP地址,这里将监听地址设置为IP地址。
- log.dirs:这个参数用于配置kafka保存数据的位置,kafka中所有的消息都会存在这个目录下。可以通过逗号来指定多个路径, kafka会根据最少被使用的原则选择目录分配新的parition。需要注意的是,kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是根据分配的 parition的个数多小而定。
- num.partitions:这个参数用于设置新创建的topic有多少个分区,可以根据消费者实际情况配置,配置过小会影响消费性能。这里配置6个。
- log.retention.hours:这个参数用于配置kafka中消息保存的时间,还支持log.retention.minutes和 log.retention.ms配置项。这三个参数都会控制删除过期数据的时间,推荐使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。
- log.segment.bytes:配置partition中每个segment数据文件的大小,默认是1GB,超过这个大小会自动创建一个新的segment file。
- zookeeper.connect:这个参数用于指定zookeeper所在的地址,它存储了broker的元信息。 这个值可以通过逗号设置多个值,每个值的格式均为:hostname:port/path,每个部分的含义如下:
- hostname:表示zookeeper服务器的主机名或者IP地址,这里设置为IP地址。
- port: 表示是zookeeper服务器监听连接的端口号。
- /path:表示kafka在zookeeper上的根目录。如果不设置,会使用根目录。
- auto.create.topics.enable:这个参数用于设置是否自动创建topic,如果请求一个topic时发现还没有创建, kafka会在broker上自动创建一个topic,如果需要严格的控制topic的创建,那么可以设置auto.create.topics.enable为false,禁止自动创建topic。
- delete.topic.enable:在0.8.2版本之后,Kafka提供了删除topic的功能,但是默认并不会直接将topic数据物理删除。如果要从物理上删除(即删除topic后,数据文件也会一同删除),就需要设置此配置项为true。
设置环境变量
$ vim /etc/profile export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin #生效 $ . /etc/profile
启动脚本
$ vim /usr/lib/systemd/system/kafka.service [Unit] Description=Apache Kafka server (broker) After=network.target zookeeper.service [Service] Type=simple User=root Group=root ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target $ systemctl daemon-reload
启动kafka
在启动kafka集群前,需要确保ZooKeeper集群已经正常启动。接着,依次在kafka各个节点上执行如下命令即可。这里将kafka放到后台运行,启动后,会在启动kafka的当前目录下生成一个nohup.out文件,可通过此文件查看kafka的启动和运行状态。通过jps指令,可以看到有个Kafka标识,这是kafka进程成功启动的标志。
$ cd /usr/local/kafka $ nohup bin/kafka-server-start.sh config/server.properties & # 或者 $ systemctl start kafka $ jps 21840 Kafka 15593 Jps 15789 QuorumPeerMain
Kafka相关知识
下图展示了Kafka的相关术语以及之间的关系:
上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。
如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
brokerKafka 集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名
Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
相关参考链接:Kafka详解(包括kafka集群搭建)-CSDN博客
https://www.cnblogs.com/duanxz/p/4492870.html
Java kakfa配置application.yml
spring: kafka: bootstrap-servers: 10.0.0.6:9092 producer: acks: all retries: 0 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: linger.ms: 1
pom.xml依赖
org.springframework.cloud spring-cloud-starter-bus-kafkaorg.springframework.integration spring-integration-kafkaorg.springframework.integration spring-integration-kafka${spring-integration.version} 发送kafka消息工具类实现
package com.test.util; /*发送kafka消息工具类*/ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import com.test.common.util.JsonUtil; import lombok.extern.slf4j.Slf4j; @Slf4j @Component @EnableKafka public class KafkaSender { @Autowired(required = false) private KafkaTemplate
template; @Value("${kafka.debug:true}") private boolean kafka_send_debug; public void sendMsgAsyncAndLog(String topic, Object msg) { if (kafka_send_debug) { log.info("sendMsgAsyncAndLog to {} msg: {}", topic, JsonUtil.objToStr(msg)); } sendMsgAsync(topic, msg, new ListenableFutureCallback >() { @Override public void onSuccess(SendResult result) { log.info("sendMsgAysnc suc! msg:{}", msg); } @Override public void onFailure(Throwable ex) { log.error(ex.getMessage(), ex); log.error("sendMsgAysnc error! msg:{}", msg); } }); } public void sendMsgAsync(String topic, Object msg, ListenableFutureCallback > callback) { if (this.template != null) { ListenableFuture > future = this.template.send(topic, msg); future.addCallback(callback); } } } 监听topic并消费示例
package com.receive.test.listener; /*监听kafka topic消息并消费*/ import java.util.Optional; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.test.route.RouteDeal; import lombok.extern.slf4j.Slf4j; @Slf4j @Component public class TestKafkaListener { @Autowired private RouteDeal routeDeal; @KafkaListener(topics = { "topic.test_topic" }) public void kakfaListenerDeal(ConsumerRecord, String> record) { try { Optional
kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { String message = kafkaMessage.get(); if (Boolean.getBoolean("debug_log")) { log.info("receive log: {}", message); } routeDeal.putEle(message); } } catch (Exception e) { log.error(e.getMessage(), e); } } } Kafka调试
kefka提供了多个命令用于查看、创建、修改、删除topic信息,也可以通过命令测试如何生产消息、消费消息等,这些命令位于kafka安装目录的bin目录下,这里是/usr/local/kafka/bin,包括createKafkaOnly.sh createKafka.sh deleteKafka.sh 创建与删除topic。
登录任意一台kafka集群节点,切换到此目录下,即可进行命令操作。
下面列举kafka的一些常用命令的使用方法。
(1)显示topic列表
#kafka-topics.sh --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --list $ kafka-topics.sh --zookeeper 10.0.0.6:2181 --list topic123
(2)创建一个topic,并指定topic属性(副本数、分区数等)
#kafka-topics.sh --create --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --replication-factor 1 --partitions 3 --topic topic123 $ kafka-topics.sh --create --zookeeper 10.0.0.6:2181 --replication-factor 1 --partitions 3 --topic topic123 Created topic topic123. #--replication-factor表示指定副本的个数
(3)查看某个topic的状态
#kafka-topics.sh --describe --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123 $ kafka-topics.sh --describe --zookeeper 10.0.0.6:2181 --topic topic123 Topic: topic123 PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: topic123 Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: topic123 Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: topic123 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
(4)生产消息 阻塞状态
#kafka-console-producer.sh --broker-list 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 $ kafka-console-producer.sh --broker-list 10.0.0.6:9092 --topic topic123
(5)消费消息 阻塞状态
#kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 $ kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092 --topic topic123 #从头开始消费消息 #kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092 --topic topic123 --from-beginning $ kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 --from-beginning
(6)删除topic
#kafka-topics.sh --delete --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123 $ kafka-topics.sh --delete --zookeeper 10.0.0.6:2181 --topic topic123
(7)其他
#kafka查看topic里n条消息 sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning --max-messages n #模拟发送数据 /home/kafka/software/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic_name #消费数据 /home/kafka/software/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_name --from-beginning #查看topic列表 /home/kafka/software/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
kafka启动常见问题
1.端口被占用Socket server failed to bind to 0.0.0.0:9092: Address already in use.
解决办法:netstat -nltp | grep 9092 找到并kill掉对应的进程,重新启动kafka进程。
2.配置文件错误,如示例将log.retention.hours的值配置成了字母,实际应该是数值类型,导致kafka无法启动
解决办法:修改错误的配置,重新启动kafka进程
3.节点已注册
从kafka的server.log日志中发现如下信息,这种方式一般是因为磁盘问题,回想下是否进行过磁盘的重分配,将原来其它节点的磁盘,分配给了现在的节点,这种现象一般发生在单机版kafka多broker节点的场景
此时如果将磁盘分配还原成最开始的分配方式并启动kafka进程后,会出现下面第二张图的问题,出现了同一个kafka主题分区有两个目录的情况
解决办法:如果是初次部署,还没有流量进来,先卸载kafka节点,再重新部署;
其它情况,先停kafka服务,删除两个相同的目录中的一个,只保留其中一个目录即可,然后重启kafka进程
4. 当日志出现类似于这样的ERROR信息时,可能是出现了脏副本
Exiting because log truncation is not allowed for partition test01-1, current leader's latest offset 28025 is less than replica's latest offset 28402
出现这样的错误信息后,kafka启动会失败,这时,server.properties文件中添加参数unclean.leader.election.enable=true,启动kafka服务即可以启动
5. 当出现下面的报错时,需要检查目录的权限
FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.io.IOException: Permission denied
6. 当出现下面的报错时,需要检查磁盘问题
org.apache.kafka.common.KafkaException: java.io.IOException: Input/output error
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章