1、简介
之前博客中记录了直接使用Kafka客户端实现生产者和消费者之间的交互,这种方式通过设置各种参数编码繁琐,因此通过SpringBoot集成Kafka成为一种常用的实现,下面就详细介绍 SpringBoot 是如何和Kafka进行集成的,本文主要参考官网进行学习(Messaging)。
2、引入依赖
org.springframework.kafka spring-kafka
3、生产者
3.1、生产者配置文件
spring: kafka: bootstrap-servers: node-1:9092,node-2:9092 producer: # 指定key-value 序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 指定缓冲区一批大小,默认16k batch-size: * # 指定缓冲区总大小,默认32m buffer-memory: * # 指定消息确认方式 acks: -1 # 指定消息发送压缩方式 compression-type: snappy # 配置额外参数 properties: # 自定义分区 partitioner.class: # 指定发送延迟时间,默认0ms linger.ms:
3.2、java代码实现生产者发送消息
public class KafkaProducerController { @Autowired private KafkaTemplatetemplate; public String test(){ // 发送消息,send方法有不同的重载 template.send("topic1", "hello long!"); return "ok!"; } }
3.3、自定义 KafkaTemplate
通过自定义的 KafkaTemplate 可以快速指定需要的生产者参数,能够做到高度可控,灵活编码。
@SpringBootConfiguration @ConfigurationProperties(prefix="kafka") public class KafkaProducerConfig { private String bootstrap_servers_config; private String retries_config; private String batch_size_config; private String linger_ms_config; private String buffer_memory_config; private String topic; @Bean public KafkaTemplate kafkaTemplate(){ HashMapconfigs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap_servers_config); configs.put(ProducerConfig.RETRIES_CONFIG,retries_config); configs.put(ProducerConfig.BATCH_SIZE_CONFIG,batch_size_config); configs.put(ProducerConfig.LINGER_MS_CONFIG,linger_ms_config); configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG,buffer_memory_config); configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class); //设置序列化 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); //设置自定义分区 DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configs); return new KafkaTemplate(producerFactory); } }
4、消费者
4.1、消费者配置文件
spring: kafka: bootstrap-servers: node-1:9092,node-2:9092 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: test # 指定消费者组
4.2、消费者java代码实现
@SpringBootConfiguration public class KafkaConsumerConfig { @KafkaListener(topics = {"topic1"}) public void consumer(String msg){ System.out.println("consumer massage from kafka: " + msg); } }
4.3、@KafkaListener参数详解
注:topicPartitions和topics、topicPattern不能同时使用
示例如下:
@KafkaListener(id = "test1", // 监听器ID(唯一) groupId = "test", // 设置消费者组 topicPartitions = { // 配置topic和分区:有两个topic,分别为topic1、topic2,topic1只接收分区0,2的消息; // topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为10 @TopicPartition(topic = "topic1", partitions = { "0", "2" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "9"))}, properties = {"enable.auto.commit:false","max.poll.interval.ms:6000"})
参数详解:
参数 | 描述 |
topic | 指定要监听哪些topic(与topicPattern、topicPartitions 三选一) |
topicPattern | 匹配Topic进行监听(与topics、topicPartitions 三选一) |
topicPartitions | 显式分区分配 |
errorHandler | 异常处理,填写 beanName |
properties | 配置其他属性 |
注意:
1)、消费者组使用优先级:groupId > id > 配置文件中指定的消费者组。
2)、如果 groupId 不存在,id 存在,但是在注解中将 idIsGroup 设置为 false,则使用配置文件中的消费者组。
3)、@KafkaListener 注解中和配置文件中的相同配置优先级高于配置文件。
4.4、自定义消费者异常
@Component public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler { @Override public Object handleError(Message> message, ListenerExecutionFailedException exception) { return null; } @Override public Object handleError(Message> message, ListenerExecutionFailedException exception, Consumer, ?> consumer) { //do someting return null; } }
5、总结
本文详细介绍 SpringBoot 集成 kafka,举例说明生产者和消费者的使用方式,以及一些自定义参数如何配置,帮助大家进一步熟悉在 SpringBoot 框架下 kafka的使用。关于kafka 幂等性、事务等更高级用法,将会在公众号分享。
本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)
猜你喜欢
- 5天前(希尔顿2021活动)希尔顿集团618盛夏大促开启
- 5天前(安徽民宿发展报告)首届安徽省乡村民宿创意设计大赛启动
- 5天前(甘州区文化旅游局)2025甘津文旅资源对接推介会在兰州举办
- 5天前(罗马尼亚的匈牙利族自治)江苏赴匈牙利、罗马尼亚开展文旅交流推广活动
- 5天前(2025年“文化和自然遗产日”广东主会场活动举办)2025年“文化和自然遗产日”广东主会场活动举办
- 5天前(新西兰航空官方网站)新西兰航空85周年焕新启航 全方位客舱升级,飞「悦」快意时光
- 5天前(美诺酒店集团旗下臻选品牌m collection)美诺酒店集团启动盛橡品牌战略焕新 开启全球扩张新篇章
- 5天前(“百场黄梅唱响百家景区”示范演出活动在黄山风景区举行)“百场黄梅唱响百家景区”示范演出活动在黄山风景区举行
- 5天前(携程租车加盟合作)携程租车加盟优势全解析:开启旅游出行市场新篇章
- 5天前(第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章