消息的可靠性
消息的可靠性保证:保证消息在传输过程中不会出现消息丢失的情况,确保发送的消息至少被消费一次。
消息的可靠性问题
消息从生产者生成,到消费者消费,大致可分为三个阶段,这三阶段都有可能出现消息丢失的情况
- 阶段一中:如果生产者实现代码中的交换机名称填写错误,那么在mq上找不到对应的交换机,发送的消息会出现丢失。
- 阶段二中:生产者实现代码中的routingKey为“a”,交换机与队列绑定的routingKey为“b”,这时交换机将消息发送到队列时,由于两个key不相等,找不到对应的队列,消息存储失败,丢失
- 阶段二中:消息默认存储在内存中,在消费者消费之前,如果mq服务器宕机,内存就会释放,消息出现丢失。
- 阶段三中:消费者消费消息后,会自动给mq服务端返回一个ack标志,然后mq将消息从队列中删除。如果消费者在获取到消息以后,然后在进行业务处理中消费者宕机了,这时这个消息没有被消费,但是由于之前已经返回了ack,所以mq中删除了这个消息,这时消息出现丢失。
保证生产者发送的消息不丢失
生产者确认机制:可以让生产者感知到消息是否正确发送给了交换机,如果消息正常到mq服务端的交换机,那么mq会返回一个ack给生产者表示发送消息接收到了。如果消息没有正常到交换机,那么mq服务端会给生产者一个nack。当返回nack时,在生产者端可重试消息的发送或其它处理。
生产者回退机制:可以让生产者感知到消息是否正确投递给了队列,交换机并不存储消息,消息只有到达队列才算是发送成功。如果交换机投递消息到队列失败了,这时队列会返回一个nack到生产者,在生产者端可重试消息的发送或其它处理。
配置文件中添加确认机制的配置
spring: rabbitmq: # 解决生产者发送消息到交换机失败的问题 # 异步回调,MQ返回结果到生产者端时会回调这个ConfirmCallback接口的实现 publisher-confirm-type: correlated # 解决交换机到队列路由失败的问题,失败则调用ReturnCallback函数 publisher-returns: true
自定义RabbitTemplate,实现回调机制,配置确认交换机和队列
@Slf4j @Configuration public class RabbitmqConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); /** * 接口ConfirmCallback confirmCallback:生产者把消息发送到交换机的结果回调 * correlationData:可以封装消息的ID,需要在发送消息时传入此参数,这里才能接收到,否则是null * boolean ack:消息发送的结果状态,发送成功是true,失败是false * String cause:发送失败的描述信息,如果发送成功是null。 */ rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { if (ack) { // ack 消息发送到交换机成功 log.info("消息发送到交换机成功, ID:{}", correlationData.getId()); } else { // nack 消息发送到交换机失败 log.info("消息发送到交换机失败, ID:{}, 原因{}",correlationData.getId(), cause); /** * 消息发送失败后 --> 根据业务具体分析,然后处理 * 比如:所有要发送的消息在发送前都会记录在数据库的一张表里 * 记录字段包括:消息状态表示是否发送成功,消息ID,消息重试次数,消息内容等 * 发送失败后在这里可以根据ID从数据库重新获取到这条消息,然后如果重试次数为3, * 那么这里就重试发送3次,如果中间有一次成功了,在成功逻辑里将状态改为成功, * 或是超过3次,还是失败则不再重试。 */ } }); //定义消息从交换机路由到队列时失败的策略。true,则调用ReturnCallback;false:则直接丢弃消息 rabbitTemplate.setMandatory(true); //解决交换机到队列路由失败的问题,失败则调用ReturnCallback函数 rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { log.info("交换机发送消息到队列失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有业务需要,可以重发消息 }); return rabbitTemplate; } @Bean public DirectExchange confirmDirectExchange(){ return new DirectExchange("confirm_direct_exchange"); } @Bean public Queue confirmDirectQueue(){ return new Queue("confirm_direct_queue"); } @Bean public Binding bindingConfirmQueue(){ return BindingBuilder.bind(confirmDirectQueue()) .to(confirmDirectExchange()) .with("info_confirm"); } }
生产者实现代码
@SpringBootTest public class PublisherTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testConfirmDirectExchange() { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); String message = "hello confirm !"; //输出:消息发送到交换机成功, ID:4d435bc4-0c92-4f59-91ba-a592cc12af40 //rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm",message,correlationData); /** * 消息发送到交换机失败, ID:a076d4bf-ae3d-4c3b-ac22-4a38d5095287, * 原因channel error; protocol method: #method
(reply-code=404, * reply-text=NOT_FOUND - no exchange 'confirm_direct_exchange_1' in vhost '/', * class-id=60, method-id=40) * 指在虚拟主机'/'中,没有交换机'confirm_direct_exchange_1' */ //rabbitTemplate.convertAndSend("confirm_direct_exchange_1","info_confirm",message,correlationData); /** * 消息发送到交换机成功, ID:3dc943df-c2cf-4562-937b-1485764413c5 * 交换机发送消息到队列失败,应答码312,原因NO_ROUTE,交换机confirm_direct_exchange, * 路由键info_confirm_1,消息(Body:'hello confirm !' ......) */ rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm_1",message,correlationData); } } 消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
对交换机,队列,消息三者持久化。
交换机与队列持久化
SpringAMQP中声明的队列和交换机默认都是持久化的
@Bean public DirectExchange confirmDirectExchange(){ return new DirectExchange("confirm_direct_exchange"); }
如上交换机默认持久化,也可以选择多参数的构造方法自己设置,队列也是同样的设置。
消息的持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:持久化和非持久化,代码如下:
@Test public void testDurableMessage() { Message message = MessageBuilder.withBody("hello,durable,message" .getBytes(StandardCharsets.UTF_8)) //消息持久化模式 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm",message,correlationData); }
所有发送的消息都会转为Message类型,底层默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意创建设置。
保证消费者消费的消息不丢失
消费者与mq之间的消息丢失问题描述:
RabbitMQ投递消息给消费者 ---> 消费者获取消息后,返回ack给RabbitMQ ---> RabbitMQ删除消息 ---> 消费者宕机,消息尚未处理,消息丢失。
所以消费者返回ack的时机非常重要,如果能等到处理完消息后再返回ack,消息就不会丢失。
SpringAMQP则允许配置三种消息确认模式:
- none:mq假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- auto:由spring监测listener代码是否出现异常,没有异常则返回ack,删除队列消息;抛出异常则返回nack,不会将队列的消息删除。
- manual:手动实现,需要在业务代码结束后,调用api发送ack。
none模式
yml文件配置新增
spring: rabbitmq: listener: simple: acknowledge-mode: none
模拟一个消息处理异常
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { log.info("消费者接收到simple.queue的消息:{}", msg); // 模拟异常 System.out.println(1 / 0); log.debug("消息处理完成!"); }
测试可以发现,当消息处理抛异常时,消息依然被mq删除了。
auto模式(默认模式)
yml配置新增(或者不配置,默认就是auto模式)
spring: rabbitmq: listener: simple: acknowledge-mode: auto
测试,抛出异常后,mq上消息没有删除
当消费者出现异常后,消息会不断(重入队)到队列,再重新发送给消费者,然后再次异常,再次入队到队列,无限循环,导致mq的消息处理飙升,带来不必要的压力:
可以使用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的到mq队列。
application.yml文件,添加内容:
spring: rabbitmq: listener: simple: acknowledge-mode: auto retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初始的失败等待时长为1秒 max-attempts: 3 # 最大重试次数
重新启动消费者服务,消费者会消费之前因错误未正常消费的消息。因为设置了重试机制,所以在重试完后,不会在循环发送消息,mq上会直接删除这条消息。所以重试达到最大次数后,Spring会返回ack,消息会被丢弃。 最后抛出重试耗尽的异常。
在以上的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
那么如果不想丢失这些发送失败的消息,而需要另做处理的话,可以通过MessageRecovery接口来实现。将失败后的消息投递到一个指定的,专门存放异常消息的队列,后续由人员自由处理。
实现代码如下:
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("errorMessageExchange"); } @Bean public Queue errorMessageQueue(){ return new Queue("errorMessageQueue"); } @Bean public Binding errorBinding(){ return BindingBuilder.bind(errorMessageQueue()) .to(errorMessageExchange()) .with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ //重试耗尽后,将失败消息投递到指定的交换机 return new RepublishMessageRecoverer(rabbitTemplate, "errorMessageExchange", "error"); }
测试,可以看到所有因异常导致消息处理错误,并重试完最大次数后还是失败的消息,都发送到了这个专门处理错误的队列里,这样即使是重试后仍然失败的消息也不会丢失。
消费端限流问题
实际项目中,可能在mq中堆积了成千上万的消息,如果不进行限流,当我们打开消费端时,这些消息都会一下子过来,造成服务器的宕机,所以需要进行消费者限流处理。
Rabbitmq提供了一种QoS(服务质量保证功能),来应对这种巨量的消息瞬间全部喷涌推送过来,通过Rabbitmq控制消费端的消费速度,进行必要的限流。
在yml配置中新增配置如下:
spring: rabbitmq: listener: simple: acknowledge-mode: manual # 开启手动ack prefetch: 3 # 每次只能获取3条消息,处理完成才能获取下次的3个消息 concurrency: 1 # 消费者最小数量为1 max-concurrency: 10 # 消费者最大数量为10
消费者端代码实现,新增两个消费者
@RabbitListener(bindings = @QueueBinding( value = @Queue("current_limit_queue"), exchange = @Exchange(value = "current_limit_exchange",type = "topic"), key = {"current.limit.#"} )) /** * message:封装了消息的相关信息,包括消息ID * channel:表示信道,封装Rabbitmq通信的相关消息的配置信息,如果当前消息被成功消费, * 通过信道进行标记(已消费),Rabbitmq获取到响应ack的确认信息。 */ public void currentLimitConsumer1(Message message, Channel channel) throws Exception { //模拟业务耗时3秒 TimeUnit.SECONDS.sleep(3); //消息的唯一标识 long deliveryTag = message.getMessageProperties().getDeliveryTag(); //手动确认消息是否接收,通过消息的id来指定这条消息被成功处理了,true表示这条消息被成功消费 channel.basicAck(deliveryTag,true); log.info("消费者currentLimitConsumer1接收到消息:{}", new String(message.getBody())); } @RabbitListener(bindings = @QueueBinding( value = @Queue("current_limit_queue"), exchange = @Exchange(value = "current_limit_exchange",type = "topic"), key = {"current.limit.#"} )) public void currentLimitConsumer2(Message message, Channel channel) throws Exception { TimeUnit.SECONDS.sleep(3); long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,true); log.info("消费者currentLimitConsumer2接收到消息:{}", new String(message.getBody())); }
生产者端代码实现
@Test public void testMessageLimitPushlisher() { String exchange = "current_limit_exchange"; String key = "current.limit.key"; for (int i = 1; i <= 1000; i++) { String message = "消息限流实现,这是第【" + i + "】条消息"; rabbitTemplate.convertAndSend(exchange,key,message); } }
测试:先启动消费者监听队列消息,然后启动生产者发送消息。限流效果如下:
Ready:表示处于队列中等待被消费者消费的消息数量。
Unacked:表示已被消费者取出但还未确认的消息数量。
死信交换机
死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
-
消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false,表示消费失败的消息不重新加入队列,这样这个消息可以成为死信。
-
消息是一个过期消息,超时无人消费
-
要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(DLX)。
大致流程如下:
给队列绑定一个死信交换机,这样这个队列如果出现死信的消息,就将这个消息发送到死信交换机
- 给队列设置dead-letter-exchange属性,指定一个交换机
- 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey
配置文件中,配置ack模式acknowledge-mode=auto,配置retry重试机制。
定义普通队列,死信交换机,死信队列
@Bean public Queue commonQueue(){ return QueueBuilder.durable("common.queue") .deadLetterExchange("dl.direct") // 指定死信交换机 .deadLetterRoutingKey("dl_key") .build(); } @Bean public DirectExchange dlExchange(){ return new DirectExchange("dl.direct"); } @Bean public Queue dlQueue(){ return new Queue("dl.queue"); } @Bean public Binding dlBinding(){ return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl_key"); }
监听common.queue队列,由于有异常,因为模式为auto,设置了重试机制,所以消费失败后会按设置的重试次数去重试,重试完还是失败,这时这个消息会成为死信,因为这个队列绑定了死信交换机,所以这个死信的消息会发送到死信交换机里,间接传递到死信队列里
@RabbitListener(queues = "common.queue") public void listenCommonQueue(String msg) { log.info("消费者listenCommonQueue接收到消息:{}", msg); int i = 1 / 0; log.debug("消息处理完成!"); }
发送消息代码
@Test public void testCommonQueue() { String queueName = "common.queue"; String message = "hello, common.queue!"; rabbitTemplate.convertAndSend(queueName, message); }
最终消息到了死信队列里,防止了消息的丢失
消息的延迟消费
要求:发送消息后,消费者需要过10秒后才能消费到。
实现流程:消息发送到延迟队列(ttl.queue),过10秒后消息过期,将这个消息发送到死信交换机,转到死信队列,消费者监听死信队列,这样就可以在10s后获取到这个消息。
配置延迟队列,并绑定死信交换机
@Bean public Queue ttlQueue(){ return QueueBuilder.durable("ttl.queue") .ttl(10000) // 设置队列的超时时间,10秒 .deadLetterExchange("dl.direct") .deadLetterRoutingKey("dl_key") .build(); } @Bean public DirectExchange ttlExchange(){ return new DirectExchange("ttl.exchange"); } @Bean public Binding ttlBinding(){ return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl"); }
发送消息到延迟队列,队列设置了延迟时间10s,10s后这条消息成为死信,传递到死信交换机
@Test public void testTtlQueue() { String message = "这是一条延迟的消息"; rabbitTemplate.convertAndSend("ttl.exchange","ttl",message); }
消费端监听死信队列,10s后获取到过期消息。
@RabbitListener(queues = "dl.queue") public void listenDlQueue(String msg){ log.info("消费者从死信队列里获取到消息:{}",msg); }
消息超时方法:除了给队列设置ttl属性,也可以在消息处设置过期时间
如果两个都设置了,哪个时间短就使用哪个。
@Test public void testTTLMsg() { Message message = MessageBuilder .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)) //设置这条消息的过期时间为5秒 .setExpiration("5000") .build(); rabbitTemplate.convertAndSend("ttl.exchange","ttl",message); }
延迟队列
通过死信交换机和TTL实现消息的延时消费,配置起来相对麻烦,可以直接用延迟队列实现。
RabbitMQ的官方也推出了DelayExchange插件,实现延迟队列效果。
- 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
- 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列
(queue)并把消息给它
- 队列(queue)再把消息发送给监听它的消费者(customer)
下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-
3.8.4/plugins
启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看插件:rabbitmq-plugins list
重启Rabbitmq服务:systemctl restart rabbitmq-server
交换机指定属性 delayed = " true " ,代码实现如下:
@RabbitListener(bindings = @QueueBinding( value = @Queue("delay_queue"), //交换机的类型可以是任意类型,只需要设定delayed属性为true即可。 exchange = @Exchange(value = "delay_exchange",delayed = "true"), key = "delay" )) public void listenDelayedQueue(String msg) { log.info("消费者listenDelayedQueue接收到消息:{}",msg); }
发送消息,要携带x-delay属性,指定延迟的时间:
@Test public void testDelayedMsg() { Message message = MessageBuilder .withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8)) .setHeader("x-delay",10000)//延迟10s .build(); rabbitTemplate.convertAndSend("delay_exchange","delay",message); }
消息的重复消费问题
消息的重复消费问题:指一条消息被消费者多次消费,在一些特殊的业务中是不能允许的。
重复消费场景示例如下:
解决消息重复消费问题:需要在消费端考虑消息的幂等性
幂等性:指对一个接口的多次调用其结果都是一样的。
解决方法:让生产者发送每条消息的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 redis 里查一下,之前消费过吗?(或是直接设计一张数据库表,表字段设为唯一约束存这个id,查到就表示消费过了),如果没有消费过,你就处理,然后这个 id 写 redis。如果消费过了,那就不处理了,保证别重复处理相同的消息即可。
-
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章