最近遇到了关于 RabbitMQ 的问题,打比方说:某个微服务模块中,RabbitMQ 的大部分消费者需要重试两次,而小部分消费者由于特殊原因并不需要进行重试。这就涉及到自定义重试次数的话题了,但在网上找了一圈没发现相关的,但是功夫不负有心人,最后还是解决了这个问题,接下来给大家分享一下~
目录
1 默认配置重试次数
2 自定义重试次数
2.1 消费者
① 配置文件
② 配置队列,绑定交换机
③ 消费者文件
2.2 生产者
① 配置文件
② 生产者文件
③ 测试文件
2.3 启动测试文件
1 默认配置重试次数
一般来说,关于 RabbitMQ 的重试次数是直接在配置文件中进行定义(比如 application.yml),那么所有的消费者都将遵循这个配置条件,比如 👇
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制 spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重试次数 spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔
该配置中的 max-attempts 决定了消费者的重试次数,不过有一点需求注意:max-attempts 指的是尝试次数,就是说最开始消费的那一次也是计算在内的,那么 max-attempts: 3 便是重试两次,另外一次是正常消费~
同时消费者遵循的是本模块的 RabbitMQ 配置,并不会读取生产者的配置。打比方说,生产者模块配置重试 3 次,而消费者模块配置重试 1 次,那么生产者给消费者发送消息,消费者进行消费,如果触发了重试,消费者也只会重试一次,它只遵循消费者模块的配置!!
如上,默认配置重试次数就算完成了,但是并没有实现针对不同消费者的自定义重试功能,请继续看第二章内容。
2 自定义重试次数
以应用广泛的订阅模式为例,由于消费者和生产者配置不一,注意消费者和生产者不在同一模块!因此分开阐述:
2.1 消费者
主要配置是在消费者这!!
① 配置文件
对于消费者来说,该配置不仅起到了连接作用,同时也启动了重试机制,默认重试 2 次。
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制 spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重试次数 spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔
② 配置队列,绑定交换机
package com.yinyu.consumer.rabbitmq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; import org.springframework.retry.interceptor.RetryOperationsInterceptor; @Configuration public class FanoutRabbitConfig { @Autowired private ConnectionFactory connectionFactory; //自定义工厂 @Bean public SimpleRabbitListenerContainerFactory listenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jsonMessageConverter()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); factory.setAdviceChain(retries()); return factory; } @Bean public RetryOperationsInterceptor retries() { return RetryInterceptorBuilder.stateless() .maxAttempts(1) //设置最大尝试次数为1(不重试) .backOffOptions(1000, 3.0, 10000) .recoverer(new RejectAndDontRequeueRecoverer()).build(); } @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public Queue retryTest1() { return new Queue("yinyu.retryTest1"); } @Bean public Queue retryTest2() { return new Queue("yinyu.retryTest2"); } @Bean public Exchange topicExchange() { return new TopicExchange("yinyu");//交换机命名 } //队列绑定交换机 @Bean public ListallActivateBinding() { return Arrays.asList(BindingBuilder.bind( BindingBuilder.bind(retryTest1()).to(topicExchange()).with("yinyu.retryTest1").noargs(), BindingBuilder.bind(retryTest2()).to(topicExchange()).with("yinyu.retryTest2").noargs()); } }
③ 消费者文件
用于接收消息,设置了一个对照组,一个自定义配置,一个默认配置
package com.yinyu.consumer.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ReceiverA { @RabbitListener(queues = "yinyu.retryTest1", containerFactory = "listenerContainerFactory") public void retryReceiver1(Mapmap) { log.info("retryTest 自定义配置开始, key: {}", map.get("key")); if (!Objects.equals(map.get("key"), "yinyu")){ throw new RuntimeException("value 值匹配不准确,请重新进行请求!!"); } log.info("retryTest 结束"); } @RabbitListener(queues = "yinyu.retryTest2") public void retryReceiver2(Map map) { log.info("retryTest 默认配置开始, key: {}", map.get("key")); if (!Objects.equals(map.get("key"), "yinyu")){ throw new RuntimeException("value 值匹配不准确,请重新进行请求!!"); } log.info("retryTest 结束"); } }
2.2 生产者
生产者不需要过多的配置,它的作用是发送消息
① 配置文件
写在 application.properties 中,对于生产者来说奇起的是连接 rabbitmq 作用,如果它是调用其他模块的消费者,那么这个重试配置是不起作用的。
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制 spring.rabbitmq.listener.simple.retry.max-attempts=5 # 最大重试次数 spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔
② 生产者文件
用于发送消息,
package com.yinyu.producer.rabbitmq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; @Component public class Sender { @Autowired private RabbitTemplate rabbitTemplate; public void retryProducer1(){ Mapmap = new HashMap<>(); map.put("key","yinyu自定义"); rabbitTemplate.convertAndSend("yinyu", "yinyu.retryTest1", map); } public void retryProducer2(){ Map map = new HashMap<>(); map.put("key","yinyu默认"); rabbitTemplate.convertAndSend("yinyu", "yinyu.retryTest2", map); } }
③ 测试文件
package com.yinyu.producer.rabbitmq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SenderTest { @Autowired private Sender sender; //测试自定义配置 @Test public void testCustomConfig() { sender.retryProducer1(); } //测试默认配置 @Test public void testDefaultConfig() { sender.retryProducer2(); } }
2.3 启动测试文件
有条件的各位可以启动一下生产者的测试文件中这两个方法,最终结果:
- retryProducer1 发送消息后,retryReceiver1 消费消息,虽然报错,但没有重试(遵循自定义配置)
- retryProducer2 发送消息后,retryReceiver2 消费消息,报错且重试 4 次(遵循默认配置)
完美实现自定义重试次数的需求!!
猜你喜欢
- 5天前(兰州旅游文化产业发展有限公司)甘肃省兰州市2023年乡村旅游暨A级旅游景区管理工作培训班开班
- 5天前(fender japan hybrid)Fender东京旗舰店盛大开幕在即,开售商品和店内服务提前揭晓
- 5天前(七尚酒店百度百科)Lohkah七尚酒店首度开创充满新知的闽地研学旅程
- 5天前(三亚太阳湾柏悦度假酒店)三亚太阳湾柏悦酒店携手ROSEONLY诺誓缔造浪漫七夕
- 5天前(兵团猛进秦剧团持续开展“戏曲进校园”活动)兵团猛进秦剧团持续开展“戏曲进校园”活动
- 5天前(澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布)澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布
- 5天前(苏梅岛普吉岛哪个好玩)苏梅岛金普顿基塔蕾度假酒店推出家庭度假套餐
- 5天前(星级饭店的发展困境)星级饭店转型之路:从市场逻辑到行业实践的深度探索
- 5天前(锦州新增两家国家aaa级旅游景区有哪些)锦州新增两家国家AAA级旅游景区
- 5天前(内蒙古交通旅游图)内蒙古着力提升交通与旅游服务水平
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章