RabbitMQ----生产者可靠性
生产者可靠性主要分为两个方面:
- 生产者重连
- 生产者确认
一、生产者重连
有的时候由于网络波动,可能会出现客户端连接MO失败的情况。通过配置我们可以开启连接失败后的重连机制:
spring: rabbitmq : connection-timeout: 1s # 设MO的连接超时时间 template: retry: enabled: true # 开户超时重试机制 initial-interval: 1ms # 失败后的初始等待时间 multiplier:1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier max-attempts:3 # 最大重试次数
注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
二、生产者确认
RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MO成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MO,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
Publisher Return确认机制只有在交换机路由失败的时候才return返回ACK
SpringAMQP实现生产者确认
1、在publisher(生产者)这个微服务的application.yml中添加配置:
spring: rabbitmq: publisher-confirm-type: correlated # 开ublisher confirm机制,并设置confirm类型 publisher-returns: true # 开publisher return机制(交换机路由失败时返回ACK,大部分原因是开发问题)
配置说明:这里publisher-confirm-type有三种模式可选
- none:关闭confirm机制
- simple:同步阻塞等待MO的回执消息
- correlated:MO异步回调方式返回回执消息
2、每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
@Slf4j @Configuration public class configConfirmConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //huoRabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //设置ReturnsCallback rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.debug("收到return callback,exchange:{},key:{},msg:{},code:{},text:{}", returned.getExchange(), returned.getRoutingKey(),returned.getMessage(), returned.getReplyCode(),returned.getReplyText()); } }); } }
3、发送消息,指定消息ID,消息ConfirmCallback
@Test void testConfirmCallback() { //1、创建CorrelationData CorrelationData data = new CorrelationData(UUID.randomUUID().toString()); //2、添加ConfirmCallback data.getFuture().addCallback(new ListenableFutureCallback
() { @Override public void onFailure(Throwable ex) { log.error("消息回调失败",ex); } @Override public void onSuccess(CorrelationData.Confirm result) { log.debug("收到confirm callback 回执"); if (result.isAck()){ //消息发送成功 log.debug("消息发送成功,收到ack"); }else { //消息发送失败,这里需要重发消息 log.error("消息发送失败,收到nack, 原因:{}",result.getReason()); } } }); rabbitTemplate.convertAndSend("hmall.direct","blue","hello",data); } 总结
SpringAMQP中生产者消息确认的几种返回值情况:
- 消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK
- 临时消息投递到了MQ,并且入队成功,返回ACK
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK
- 其它情况都会返回NACK,告知投递失败
如何处理生产者的确认消息?
- 生产者确认需要额外的网络和系统资源开销,尽量不要使用
- 如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
- 对于nack消息可以有限次数重试,依然失败则记录异常消息
猜你喜欢
- 2小时前新手Python环境配置以及pip安装教程
- 2小时前一文搞懂Python文件读取报错UnicodeDecodeError: ‘gbk‘ codec can‘t decode byte
- 2小时前低代码技术杂谈
- 2小时前【开源】基于JAVA的河南软件客服系统
- 2小时前【Python】Python读写.xlsx文件(基本操作、空值补全等)
- 2小时前年龄性别预测2:Pytorch实现年龄性别预测和识别(含训练代码和数据)
- 2小时前TCP 的三次握手和四次挥手
- 2小时前js下载pdf文件并预览(base64),但文件太大无法正常显示
- 2小时前使用Docker部署PDF多功能工具Stirling-PDF
- 2小时前6个提升Python编程能力的PyCharm插件
网友评论
- 搜索
- 最新文章
- 热门文章