部署zk
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.9.1/apache-zookeeper-3.9.1.tar.gz
tar -xf apache-zookeeper-3.9.1.tar.gz -C /apps cd /apps/ && ln -s apache-zookeeper-3.9.1 zookeeper 修改配置 ```bash grep -vE '^$|^#' conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/apps/zookeeper/data clientPort=2181 autopurge.snapRetainCount=3 autopurge.purgeInterval=24 server.1=192.168.1.60:2888:3888 #server.后面数字不要一样 server.2=192.168.1.61:2888:3888 server.3=192.168.1.62:2888:3888
root@ubuntu20:/apps/zookeeper/bin# cat /etc/profile.d/zk.sh
#!/bin/bash
export PATH=/apps/zookeeper/bin/:$PATH
mkdir /data
root@ubuntu20:/apps/zookeeper# echo 3 > data/myid
拷贝到其他节点并修改myid
配置环境变量
root@ubuntu20:/apps/zookeeper/bin# source /etc/profile.d/zk.sh
scp /etc/profile.d/zk.sh 192.168.1.62:/etc/profile.d
scp /etc/profile.d/zk.sh 192.168.1.61:/etc/profile.d
source /etc/profile.d/zk.sh
启动zk
zkServer.sh start
查看状态
root@ubuntu20:/apps/zookeeper# zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeeper/bin/…/conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
访问zk
zkCli.sh -server 192.168.1.60:2181
部署kafka集群
192.168.1.60
192.168.1.61
192.168.1.62
下载解压
修改 配置文件 root@ubuntu20:/apps/kafka/config# cat server.properties broker.id=62 #每个节点不要一样 listeners=PLAINTEXT://192.168.1.62:9092 #监听的地址:配置为本机ip ,注意格式 zookeeper.connect=192.168.1.60:2181,192.168.1.61:2181,192.168.1.62:2181 #修改配置zk的地址
所有节点配置环境变量
cat /etc/profile.d/kafka.sh #!/bin/bash export KAFKA_HOME=/apps/kafka export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile.d/kafka.sh root@ubuntu20:/apps/kafka/config# scp /etc/profile.d/kafka.sh 192.168.1.62:/etc/profile.d/
启动
kafka-server-start.sh -daemon /apps/kafka/config/server.properties
查看端口
vi /lib/systemd/system/kafka.service [Unit] Description=Apache Kafka After=network.target [Service] Type=simple #Environment=JAVA_HOME=/data/server/java PIDFile=/apps/kafka/kafka.pid ExecStart=/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties ExecStop=/bin/kill -TERM ${MAINPID} Restart=always RestartSec=20 [Install] WantedBy=multi-user.target
创建topic
root@ubuntu20:/apps/kafka/data# kafka-topics.sh --create --topic wang --bootstrap-server 192.168.1.60:9092 --partitions 3 --replication-factor 2 Created topic wang.
查看topic
kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --list
查看topic详细信息
root@ubuntu20:/apps/kafka# kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --topic luo --describe Topic: luo TopicId: 3Gm_DopHQb-HgvLih6WuiQ PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: luo Partition: 0 Leader: 60 Replicas: 60 Isr: 60 root@ubuntu20:/apps/kafka# kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --topic wang --describe Topic: wang TopicId: eZ7lTktaQ1mVarYE-Q_K1A PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: wang Partition: 0 Leader: 61 Replicas: 61,60 Isr: 61,60 Topic: wang Partition: 1 Leader: 60 Replicas: 60,62 Isr: 60,62 Topic: wang Partition: 2 Leader: 62 Replicas: 62,61 Isr: 62,61 读写都在leader节点上
启动生产者
kafka-console-producer.sh --topic luo --bootstrap-server 192.168.1.60:9092
该命令的目的是启动一个 Kafka 生产者,将消息发送到 luo 主题,并使用 192.168.1.60:9092 作为 Kafka 集群的地址。
启动消费者 --from-beginning 从头开始拿数据
kafka-console-consumer.sh --topic luo --from-beginning --bootstrap-server 192.168.1.60:9092
修改分区数为5
kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --topic wang --alter --partitions 5
删除topic
kafka-topics.sh --delete --bootstrap-server=192.168.1.60:9092 --topic test
kafka依赖zk提供元数据储存
分区:
副本:
kafka-producer-perf-test.sh --topic test --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=192.168.1.60:9092 acks=-1 linger.ms=2000 compression.type=lz4 32532 records sent, 6499.9 records/sec (6.35 MB/sec), 1536.5 ms avg latency, 2338.0 ms max latency. 32532 records sent:发送了 32532 条消息。 6499.9 records/sec:平均每秒发送了 6499.9 条消息。 (6.35 MB/sec):发送速度为每秒 6.35 MB 的数据量。 1536.5 ms avg latency:平均延迟为 1536.5 毫秒,这表示从消息发送到确认接收所需的平均时间。 2338.0 ms max latency:最大延迟为 2338.0 毫秒,这表示消息发送到确认接收期间的最长时间。
消费者组案列
启动生产者
kafka-console-producer.sh --topic oldboy --broker-list 192.168.1.60:9092
基于配置文件启动消费者
root@kafka01:~# kafka-console-consumer.sh --bootstrap-server 192.168.1.60:9092 --topic oldboy --consumer.config /apps/kafka/config/consumer.properties
ll
.dd
基于命令行启动消费者
root@kafka01:~# kafka-console-consumer.sh --bootstrap-server 192.168.1.60:9092 --topic oldboy --consumer-property group.id=luohuiwen
上面的案例启动了1个生产者,2个消费者。但同一时刻只有一个消费者接收到生产者的消息,不可能2个消费者同时接收到生产者发送的消息哟~
kafka高效读写数据的底层原理:顺序写磁盘 零拷贝技术 异步刷盘
堆内存调整
root@kafka01:/apps/kafka/bin# grep export kafka-server-start.sh export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" #修改此行就行 # export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" export KAFKA_HEAP_OPTS="-server -Xmx256M -Xms256M -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads =8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
添加修改端口 这个是jmx的端口,后面的监控要用到
部署
安装数据库创建数据库授权
下载https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz
解压修改配置
efak.zk.cluster.alias=cluster1,cluster2 cluster1.zk.list=192.168.1.60:2181,192.168.1.61:2181,192.168.1.62:2181 #填写kafka地址 efak.url=jdbc:mysql://127.0.0.1:3306/oldboyedu_kafka? useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #oldboyedu_kafka 创建的数据库 efak.username=kafka #数据库授权用户 efak.password=123456
访问
压力测试
kafka-consumer-perf-test.sh --broker-list 192.168.1.60:9092,192.168.1.61:9092,192.168.1.62:9092 --topic luohuiwen --messages 100000000 --fetch-size 1048576 --threads 10
告警
谁创建了topic就发生通知
调优
1.硬件架构选择
cpu:建议选择核心数多的优于主bi,
内存
磁盘
网卡
加大文件描述符
内核优化
export KAFKA_HEAP_OPTS="-server -Xmx256M -Xms256M -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads =8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" export JMX_PORT="8888"
-server -Xmx256M
-Xms256M
-XX:PermSize=128m
-XX:+UseG1GC 表示让jvm使用G1垃圾收集器
-XX:MaxGCPauseMillis=200 设置每次年轻代垃圾回收的最长时间为200ms,如果时间无法满足,jvm会自动调整年轻代大小,以满足此值
-XX:ParallelGCThreads=8 设置并行垃圾回收的线程数,此值可以设置与机器处理数相等
-XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
该参数可以指定当整个堆使用率到达多少时,触发并标记周期的执行。默认值是45,即当堆的使用45%,执行并标记周期,该值一旦设置,始终不会被G1修改
broken调优
auto.create.topics.enable=false #是否允许自动创建topic
log.retention.hours=168 # 日志保留时间
猜你喜欢
- 16小时前基于 Eureka 的 Ribbon 负载均衡实现原理【SpringCloud 源码分析】
- 16小时前计算机毕业设计 基于Hadoop的物品租赁系统的设计与实现 Java实战项目 附源码+文档+视频讲解
- 16小时前前端超好玩的小游戏合集来啦--周末两天用html5做一个3D飞行兔子萝卜小游戏
- 10小时前魔方六面还原公式(魔方六面还原公式口诀)
- 9小时前鸿蒙系统什么时候能用的简单介绍
- 3小时前除夕夜守岁打一歇后语(除夕夜守岁打歇后语是什么)
- 1小时前路考注意事项(路考注意事项和要领)
- 1小时前王者荣耀赛季多久更新(王者荣耀赛季多久更新一次?)
- 1小时前dnfcc是什么意思(dnfc是什么职业)
- 1小时前等位基因(等位基因和非等位基因的概念)
网友评论
- 搜索
- 最新文章
- 热门文章