手动提交offset
虽然offset十分遍历,但是由于其是基于时间提交的,开发人员难以把握offset提交的实际。因此Kafka还提供了手动提交offset的API
手动提交offset的方法有两种:分别commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交:不同点是,同步提交阻塞当前线程,一致到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)而异步提交则没有重试机制,故有可能提交失败。
commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了
同步提交
是否自动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
同步提交offset kafkaConsumer.commitSync();
由于同步提交offset有失败重试机制,故更加可靠,但是由于一致等待提交结果,提交的效率比较低。以下为同步提交offset的示例
package com.longer.handsync; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumerByHandSync { public static void main(String[] args) { //创建消费者的配置对象 Properties properties=new Properties(); //2、给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092"); //配置序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //配置消费者组(组名任意起名)必须 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //修改分区策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor"); // properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false"); //是否自动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //创建消费者对象 KafkaConsumerkafkaConsumer=new KafkaConsumer (properties); //注册要消费的主题 ArrayList topics=new ArrayList<>(); topics.add("two"); kafkaConsumer.subscribe(topics); while (true){ //设置1s中消费一批数据 ConsumerRecords consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1)); //打印消费到的数据 for(ConsumerRecord record:consumerRecords){ System.out.println(record); } //同步提交offset kafkaConsumer.commitSync(); } } }
异步提交
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响,因此更多情况下会选择异步offset的方式
kafkaConsumer.commitAsync();
package com.longer.handasync; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; /** * 同步提交 */ public class CustomConsumerByHandAsync { public static void main(String[] args) { //创建消费者的配置对象 Properties properties=new Properties(); //2、给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092"); //配置序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //配置消费者组(组名任意起名)必须 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //修改分区策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor"); // properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false"); //是否自动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //创建消费者对象 KafkaConsumerkafkaConsumer=new KafkaConsumer (properties); //注册要消费的主题 ArrayList topics=new ArrayList<>(); topics.add("two"); kafkaConsumer.subscribe(topics); while (true){ //设置1s中消费一批数据 ConsumerRecords consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1)); //打印消费到的数据 for(ConsumerRecord record:consumerRecords){ System.out.println(record); } //同步提交offset kafkaConsumer.commitAsync(); } } }
指定 Offset 消费
auto.offset.reset = earliest | latest | none 默认是latest
当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
2) latest(默认值):自动将偏移量重置为最新偏移量
3)如果未找到消费者组的先前偏移量,则向消费者抛出异常。
主要代码
Setassigment=new HashSet<>(); while (assigment.size()==0){ kafkaConsumer.poll(Duration.ofSeconds(1)); //获取消费者分区分配信息(有了分区分配信息才能开始消费) assigment= kafkaConsumer.assignment(); } //遍历所有分区,并指定从100得位置开始消费 for (TopicPartition tp : assigment) { kafkaConsumer.seek(tp,100); }
package com.longer.seek; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.Properties; import java.util.Set; public class CustomConsumerSeek { public static void main(String[] args) { //创建消费者的配置对象 Properties properties=new Properties(); //2、给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092"); //配置序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //配置消费者组(组名任意起名)必须 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //修改分区策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor"); // properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false"); //创建消费者对象 KafkaConsumerkafkaConsumer=new KafkaConsumer (properties); //注册要消费的主题 ArrayList topics=new ArrayList<>(); topics.add("two"); kafkaConsumer.subscribe(topics); Set assigment=new HashSet<>(); while (assigment.size()==0){ kafkaConsumer.poll(Duration.ofSeconds(1)); //获取消费者分区分配信息(有了分区分配信息才能开始消费) assigment= kafkaConsumer.assignment(); } //遍历所有分区,并指定从100得位置开始消费 for (TopicPartition tp : assigment) { kafkaConsumer.seek(tp,100); } while (true){ //设置1s中消费一批数据 ConsumerRecords consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1)); //打印消费到的数据 for(ConsumerRecord record:consumerRecords){ System.out.println(record); } } } }
猜你喜欢
- 6天前(从“见世面”到“内在需要”:在海南,追问旅行的意义)从“见世面”到“内在需要”:在海南,追问旅行的意义
- 6天前(2020海丝之路文化博览会)2023海丝之路文化和旅游博览会开幕
- 6天前(三亚太阳湾柏悦度假酒店)三亚太阳湾柏悦酒店携手ROSEONLY诺誓缔造浪漫七夕
- 6天前(天气预报 华为)2025HDC华为天气上新系统级天气智能体,引领更智能的气象服务
- 6天前(云南南博会展馆)旅居云南馆亮相第9届南博会
- 6天前(新西兰航空官方网站)新西兰航空85周年焕新启航 全方位客舱升级,飞「悦」快意时光
- 6天前(武隆旅游门票)炸了!519中国旅游日武隆甩出王炸福利,59.9元通玩6大景点?!
- 6天前(辽宁新增6个国家4a级旅游景区有哪些)辽宁新增6个国家4A级旅游景区
- 6天前(“三天跨两城”催生租车新需求,神州租车清明跨城订单同比增长416%)“三天跨两城”催生租车新需求,神州租车清明跨城订单同比增长416%
- 6天前(筑格集团有限公司)洲际酒店集团旗下筑格酒店品牌正式亮相大中华区
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章