目录
RabbitMQ如何保证消息不丢失?
哪些环节会有丢消息的可能?
RabbitMQ消息零丢失方案
1. 生产者保证消息正确发送到RibbitMQ
2. RabbitMQ消息存盘不丢消息
3. RabbitMQ 主从消息同步时不丢消息
4. RabbitMQ消费者不丢失消息
如何保证消息幂等?
如何保证消息的顺序?
关于RabbitMQ的数据堆积问题
RabbitMQ如何保证消息不丢失?
哪些环节会有丢消息的可能?
其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。这也是任何用户态的应用程序无法避免的。
RabbitMQ消息零丢失方案
1. 生产者保证消息正确发送到RibbitMQ
对于单个数据,可以使用生产者确认机制。通过多次确认的方式,保证生产者的消息能够正确的发送到RabbitMQ中。
RabbitMQ的生产者确认机制分为同步确认和异步确认。同步确认主要是通过在生产者端使用Channel.waitForConfirmsOrDie()指定一个等待确认的完成时间。异步确认机制则是通过channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)在生产者端注入两个回调确认函数。第一个函数是在生产者消息发送成功时调用,第二个函数则是生产者消息发送失败时调用。两个函数需要通过sequenceNumber自行完成消息的前后对应。sequenceNumber的生成方式需要通过channel的序列获取。int sequenceNumber = channel.getNextPublishSeqNo();之前文章中做过介绍。
当前版本的RabbitMQ,可以在Producer中添加一个ReturnListener,监听那些成功发到Exchange,但是却没有路由到Queue的消息。如果不想将这些消息返回给Producer,就可以在Exchange中,也可以声明一个alternate-exchange参数,将这些无法正常路由的消息转发到指定的备份Exchange上。
如果发送批量消息,在RabbitMQ中,另外还有一种手动事务的方式,可以保证消息正确发送。手动事务机制主要有几个关键的方法: channel.txSelect() 开启事务; channel.txCommit() 提交事务; channel.txRollback() 回滚事务; 用这几个方法来进行事务管理。但是这种方式需要手动控制事务逻辑,并且手动事务会对channel产生阻塞,造成吞吐量下降。
2. RabbitMQ消息存盘不丢消息
对于Classic经典队列,直接将队列声明成为持久化队列即可。而新增的Quorum队列和Stream队列,都是明显的持久化队列,能更好的保证服务端消息不会丢失。
3. RabbitMQ 主从消息同步时不丢消息
RabbitMQ的集群架构。普通集群模式,消息是分散存储的,不会主动进行消息同步了,是有可能丢失消息的。镜像模式集群,数据会主动在集群各个节点当中同步,这时丢失消息的概率不会太高。另外,启用Federation联邦机制,给包含重要消息的队列建立一个远端备份,也可以降低消息丢失的概率。
4. RabbitMQ消费者不丢失消息
RabbitMQ在消费消息时可以指定是自动应答,还是手动应答。如果是自动应答模式,消费者会在完成业务处理后自动进行应答,而如果消费者的业务逻辑抛出异常,RabbitMQ会将消息进行重试,这样是不会丢失消息的,但是有可能会造成消息一直重复消费。
将RabbitMQ的应答模式设定为手动应答可以提高消息消费的可靠性。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); } }); channel.basicConsume(queueName, true, myconsumer);
任何用户态的应用程序都无法保证绝对的数据安全,所以备份与恢复的方案都需要考虑。
如何保证消息幂等?
当消费者消费消息处理业务逻辑时,如果抛出异常,或者不向RabbitMQ返回响应,默认情况下,RabbitMQ会无限次数的重复进行消息消费。处理幂等问题,可以设定RabbitMQ的重试次数。在SpringBoot集成RabbitMQ时,可以在配置文件中指定spring.rabbitmq.listener.simple.retry开头的一系列属性,来制定重试策略。 然后,需要在业务上处理幂等问题。
处理幂等问题的关键是要给每个消息一个唯一的标识。在SpringBoot框架集成RabbitMQ后,可以给每个消息指定一个全局唯一的MessageID,在消费者端针对MessageID做幂等性判断。
//这里用的message要是org.springframework.amqp.core.Message //发送者指定ID字段 Message message2 = MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build(); rabbitTemplate.send(message2); //消费者获取MessageID,自己做幂等性判断 @RabbitListener(queues = "fanout_email_queue") public void process(Message message) throws Exception { // 获取消息Id String messageId = message.getMessageProperties().getMessageId(); ... }
原生API当中,也是支持MessageId的。比如,针对订单消息,那就用订单ID来做唯一键。在RabbitMQ中,消息的头部就是一个很好的携带数据的地方。
// ==== 发送消息时,携带sequenceNumber和orderNo AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()); builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()); //携带消息ID builder.messageId(""+channel.getNextPublishSeqNo()); Mapheaders = new HashMap<>(); //携带订单号 headers.put("order", "123"); builder.headers(headers); channel.basicPublish("", QUEUE_NAME, builder.build(), message.getBytes("UTF-8")); // ==== 接收消息时,拿到sequenceNumber Consumer myconsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { //获取消息ID System.out.println("messageId:"+properties.getMessageId()); //获取订单ID properties.getHeaders().forEach((key,value)-> System.out.println("key: "+key +"; value: "+value)); // (process the message components here ...) //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。 //没有答复过的消息,服务器会一直不停转发。 channel.basicAck(deliveryTag, false); } }; channel.basicConsume(QUEUE_NAME, false, myconsumer);
如何保证消息的顺序?
某些场景下,需要保证消息的消费顺序,例如一个下单过程,需要先完成扣款,然后扣减库存,然后通知快递发货,这个顺序不能乱。如果每个步骤都通过消息进行异步通知的话,这一组消息就必须保证他们的消费顺序是一致的。
在RabbitMQ当中,针对消息顺序的设计其实是比较弱的。唯一比较好的策略就是 单队列+单消息推送。即一组有序消息,只发到一个队列中,利用队列的FIFO特性保证消息在队列内顺序不会乱。显然这是以极度消耗性能作为代价的,在实际适应过程中,应该尽量避免这种场景。然后在消费者进行消费时,保证只有一个消费者,同时指定prefetch属性为1,即每次RabbitMQ都只往客户端推送一个消息。
spring.rabbitmq.listener.simple.prefetch=1
在多队列情况下,如何保证消息的顺序性,目前使用RabbitMQ的话,还没有比较好的解决方案。在使用时,应该尽量避免这种情况。
关于RabbitMQ的数据堆积问题
RabbitMQ一直以来都有一个缺点,就是对于消息堆积问题的处理不好。当RabbitMQ中有大量消息堆积时,整体性能会严重下降。而目前新推出的Quorum队列以及Stream队列,目的就在于解决这个核心问题。但是这两种队列的稳定性和周边生态都还不够完善,因此,在使用RabbitMQ时,还是要非常注意消息堆积的问题。尽量让消息的消费速度和生产速度保持一致。
如果确实出现了消息堆积比较严重的场景,就需要从数据流转的各个环节综合考虑,设计适合的解决方案。
消息生产者端
最明显的方式自然是降低消息生产的速度。但是,生产者端产生消息的速度通常是跟业务息息相关的,一般情况下不太好直接优化。可以选择尽量多采用批量消息的方式,降低IO频率。
RabbitMQ服务端
RabbitMQ本身其实也在着力于提高服务端的消息堆积能力。对于消息堆积严重的队列,可以预先添加懒加载机制,或者创建Sharding分片队列,这些措施都有助于优化服务端的消息堆积能力。
消息消费者端
最直接的方式,就是增加消费者数量。尤其当消费端的服务出现问题,已经有大量消息堆积时。这时,可以尽量多的申请机器,部署消费端应用,争取在最短的时间内消费掉积压的消息。
对于单个消费者端,可以通过配置提升消费者端的吞吐量。例如
# 单次推送消息数量 spring.rabbitmq.listener.simple.prefetch=1 # 消费者的消费线程数量 spring.rabbitmq.listener.simple.concurrency=5
灵活配置相关参数,能够在一定程度上调整每个消费者实例的吞吐量,减少消息堆积数量。当遇到紧急状况,来不及调整消费者端时,可以紧急上线一个消费者组,专门用来将消息快速转录。保存到数据库或者Redis,然后再慢慢进行处理。
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章