目录
一、Kafka中数据清理(Log Deletion)
1.1、日志删除
1.1.1、定时日志删除任务
1.1.2、基于时间的保留策略
1.1.2.1、设置topic 5秒删除一次
1.1.3、基于日志大小的保留策略
1.1.4、基于日志起始偏移量保留策略
1.2 日志压缩(Log Compaction)
二、Kafka配额限速机制(Quotas)
2.1、限制producer端速率
2.2、限制consumer端速率
2.3、取消Kafka的Quota配置
三、Kafka实战
3.1、生产者
3.1.1、导入依赖
3.1.2、配置文件
3.1.3、发送消息
3.2、消费者
3.2.1、配置类
3.2.2、消费消息
一、Kafka中数据清理(Log Deletion)
Kafka的消息存储在磁盘中,为了控制磁盘占用空间,Kafka需要不断地对过去的一些消息进行清理工作。Kafka的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在Kafka中,提供两种日志清理方式:
- 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。
- 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。
在Kafka的broker或topic配置中:
配置项 配置值 说明 log.cleaner.enable true(默认) 开启自动清理日志功能 log.cleanup.policy delete(默认) 删除日志 log.cleanup.policy compaction 压缩日志 log.cleanup.policy delete,compact 同时支持删除、压缩 1.1、日志删除
日志删除是以段(segment日志)为单位来进行定期清理的。
1.1.1、定时日志删除任务
Kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:
- 基于时间的保留策略
- 基于日志大小的保留策略
- 基于日志起始偏移量的保留策略
1.1.2、基于时间的保留策略
以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours。默认情况,在broker中,配置如下:
log.retention.hours=168
也就是,默认日志的保留时间为168小时,相当于保留7天。
删除日志分段时:
- 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
- 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)
- Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。
1.1.2.1、设置topic 5秒删除一次
设置topic的删除策略
- key: retention.ms
- value: 5000
1.1.3、基于日志大小的保留策略
日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过broker端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。
注意:
log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
1.1.4、基于日志起始偏移量保留策略
每个segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。
1.2 日志压缩(Log Compaction)
Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的key对应的数据只保留一个版本。
- Log Compaction执行后,offset将不再连续,但依然可以查询Segment
- Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
- Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
- 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态
二、Kafka配额限速机制(Quotas)
生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务压爆服务器。
2.1、限制producer端速率
为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试,观察生产消息的速率
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=1
结果:
50000 records sent, 1108.156028 records/sec (1.06 MB/sec)
2.2、限制consumer端速率
对consumer限速与producer类似,只不过参数名不一样。
为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试:
bin/kafka-consumer-perf-test.sh --broker-list node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 --topic test --fetch-size 1048576 --messages 500000
结果为:
MB.sec:1.0743
2.3、取消Kafka的Quota配置
使用以下命令,删除Kafka的Quota配置
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default
三、Kafka实战
3.1、生产者
3.1.1、导入依赖
org.springframework.kafka spring-kafka3.1.2、配置文件
spring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093 spring.kafka.angyan.clientId=TEST_DEMO_MESSAGE spring.kafka.angyan.producer.compressionType=gzip spring.kafka.angyan.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.angyan.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 提交延时 spring.kafka.angyan.linger.ms=1000 spring.kafka.angyan.template.defaultTopic=TEST_DEMO_TOPIC
3.1.3、发送消息
@Service public class KafkaServiceImpl implements KafkaService { @Qualifier("kafkaTemplate") @Autowired private KafkaTemplate kafkaRecordTemplate; @Override public String sendMessage(String key, byte[] bytes) { Map header = new HashMap(); header.put(KafkaHeaders.KEY,key); MessageHeaders messageHeaders = new MessageHeaders(header); Message message = MessageBuilder.createMessage(bytes, messageHeaders); kafkaRecordTemplate.send(message); return null; } }
3.2、消费者
3.2.1、配置类
pring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093 spring.kafka.angyan.consumer.group.id=TEST_DEMO_MESSAGE spring.kafka.angyan.consumer.clientId=TEST_DEMO_MESSAGE spring.kafka.angyan.defaultTopic=TEST_DEMO_TOPIC spring.kafka.angyan.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.angyan.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
3.2.2、消费消息
@Component public class KafkaCustomer { @KafkaListener(topics = "${spring.kafka.angyan.defaultTopic}",containerFactory = "kafkaTemplateConsumer") public void testKafka(ConsumerRecord
record){ //处理业务逻辑 } }
猜你喜欢
- 10天前(七尚酒店百度百科)Lohkah七尚酒店首度开创充满新知的闽地研学旅程
- 10天前(天气预报 华为)2025HDC华为天气上新系统级天气智能体,引领更智能的气象服务
- 10天前(甘肃文化旅游宣传片)甘肃文旅推介走进重庆
- 10天前(云南滇陇工程咨询有限公司)陇滇携手谋发展 文旅合作谱新篇
- 10天前(新西兰“空降”上海:新西兰旅游局邀请你来“玩真的”!)新西兰“空降”上海:新西兰旅游局邀请你来“玩真的”!
- 10天前(“清透会呼吸”轻松拿捏春日出游氛围感)“清透会呼吸”轻松拿捏春日出游氛围感
- 10天前(曼谷丽思卡尔顿公寓价格)在曼谷丽思卡尔顿酒店CALEŌ 邂逅鸡尾酒的浪漫艺术
- 10天前(希尔顿集团2021年筹建的酒店)希尔顿集团两大重点项目亮相第四届上海旅游投资促进大会
- 10天前(天津四季酒店开业时间)天津四季酒店邀你开启灿烂暑假
- 10天前(大黄山景区高质量发展联盟成立多少年)大黄山景区高质量发展联盟成立
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章