上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

RabbitMQ高级特性

guduadmin118小时前

消息的可靠性

消息的可靠性保证:保证消息在传输过程中不会出现消息丢失的情况,确保发送的消息至少被消费一次。

消息的可靠性问题

消息从生产者生成,到消费者消费,大致可分为三个阶段,这三阶段都有可能出现消息丢失的情况

RabbitMQ高级特性,第1张

  • 阶段一中:如果生产者实现代码中的交换机名称填写错误,那么在mq上找不到对应的交换机,发送的消息会出现丢失。
  • 阶段二中:生产者实现代码中的routingKey为“a”,交换机与队列绑定的routingKey为“b”,这时交换机将消息发送到队列时,由于两个key不相等,找不到对应的队列,消息存储失败,丢失
  • 阶段二中:消息默认存储在内存中,在消费者消费之前,如果mq服务器宕机,内存就会释放,消息出现丢失。
  • 阶段三中:消费者消费消息后,会自动给mq服务端返回一个ack标志,然后mq将消息从队列中删除。如果消费者在获取到消息以后,然后在进行业务处理中消费者宕机了,这时这个消息没有被消费,但是由于之前已经返回了ack,所以mq中删除了这个消息,这时消息出现丢失。

    保证生产者发送的消息不丢失

    生产者确认机制:可以让生产者感知到消息是否正确发送给了交换机,如果消息正常到mq服务端的交换机,那么mq会返回一个ack给生产者表示发送消息接收到了。如果消息没有正常到交换机,那么mq服务端会给生产者一个nack。当返回nack时,在生产者端可重试消息的发送或其它处理。

     生产者回退机制:可以让生产者感知到消息是否正确投递给了队列,交换机并不存储消息,消息只有到达队列才算是发送成功。如果交换机投递消息到队列失败了,这时队列会返回一个nack到生产者,在生产者端可重试消息的发送或其它处理。

    配置文件中添加确认机制的配置

    spring:
      rabbitmq:
        # 解决生产者发送消息到交换机失败的问题
        # 异步回调,MQ返回结果到生产者端时会回调这个ConfirmCallback接口的实现
        publisher-confirm-type: correlated 
        # 解决交换机到队列路由失败的问题,失败则调用ReturnCallback函数
        publisher-returns: true  

    自定义RabbitTemplate,实现回调机制,配置确认交换机和队列

    @Slf4j
    @Configuration
    public class RabbitmqConfig {
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            /**
             * 接口ConfirmCallback confirmCallback:生产者把消息发送到交换机的结果回调
             * correlationData:可以封装消息的ID,需要在发送消息时传入此参数,这里才能接收到,否则是null
             * boolean ack:消息发送的结果状态,发送成功是true,失败是false
             * String cause:发送失败的描述信息,如果发送成功是null。
             */
            rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
                if (ack) {
                    // ack 消息发送到交换机成功
                    log.info("消息发送到交换机成功, ID:{}", correlationData.getId());
                } else {
                    // nack 消息发送到交换机失败
                    log.info("消息发送到交换机失败, ID:{}, 原因{}",correlationData.getId(), cause);
                    /**
                     * 消息发送失败后 --> 根据业务具体分析,然后处理
                     * 比如:所有要发送的消息在发送前都会记录在数据库的一张表里
                     *      记录字段包括:消息状态表示是否发送成功,消息ID,消息重试次数,消息内容等
                     *      发送失败后在这里可以根据ID从数据库重新获取到这条消息,然后如果重试次数为3,
                     *      那么这里就重试发送3次,如果中间有一次成功了,在成功逻辑里将状态改为成功,
                     *      或是超过3次,还是失败则不再重试。
                     */
                }
            });
            //定义消息从交换机路由到队列时失败的策略。true,则调用ReturnCallback;false:则直接丢弃消息
            rabbitTemplate.setMandatory(true);
            //解决交换机到队列路由失败的问题,失败则调用ReturnCallback函数
            rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
                log.info("交换机发送消息到队列失败,应答码{},原因{},交换机{},路由键{},消息{}",
                        replyCode, replyText, exchange, routingKey, message.toString());
                // 如果有业务需要,可以重发消息
            });
            return rabbitTemplate;
        }
        @Bean
        public DirectExchange confirmDirectExchange(){
            return new DirectExchange("confirm_direct_exchange");
        }
        @Bean
        public Queue confirmDirectQueue(){
            return new Queue("confirm_direct_queue");
        }
        @Bean
        public Binding bindingConfirmQueue(){
            return BindingBuilder.bind(confirmDirectQueue())
                                 .to(confirmDirectExchange())
                                 .with("info_confirm");
        }
    }
    

    生产者实现代码

    @SpringBootTest
    public class PublisherTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Test
        public void testConfirmDirectExchange() {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            String message = "hello confirm !";
            //输出:消息发送到交换机成功, ID:4d435bc4-0c92-4f59-91ba-a592cc12af40
            //rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm",message,correlationData);
            /**
             * 消息发送到交换机失败, ID:a076d4bf-ae3d-4c3b-ac22-4a38d5095287,
             * 原因channel error; protocol method: #method(reply-code=404,
             * reply-text=NOT_FOUND - no exchange 'confirm_direct_exchange_1' in vhost '/',
             * class-id=60, method-id=40)
             * 指在虚拟主机'/'中,没有交换机'confirm_direct_exchange_1'
             */
            //rabbitTemplate.convertAndSend("confirm_direct_exchange_1","info_confirm",message,correlationData);
            /**
             * 消息发送到交换机成功, ID:3dc943df-c2cf-4562-937b-1485764413c5
             * 交换机发送消息到队列失败,应答码312,原因NO_ROUTE,交换机confirm_direct_exchange,
             * 路由键info_confirm_1,消息(Body:'hello confirm !' ......)
             */
            rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm_1",message,correlationData);
        }
    }
    

    消息持久化

    生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

    对交换机,队列,消息三者持久化。

    交换机与队列持久化

    SpringAMQP中声明的队列和交换机默认都是持久化的

    @Bean
    public DirectExchange confirmDirectExchange(){
       return new DirectExchange("confirm_direct_exchange");
    }

    RabbitMQ高级特性,第2张

    RabbitMQ高级特性,第3张

    如上交换机默认持久化,也可以选择多参数的构造方法自己设置,队列也是同样的设置。

    消息的持久化

    利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:持久化和非持久化,代码如下:

    @Test
    public void testDurableMessage() {
        Message message = MessageBuilder.withBody("hello,durable,message"
                                        .getBytes(StandardCharsets.UTF_8))
                                        //消息持久化模式
                                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                                        .build();
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm",message,correlationData);
    }

    所有发送的消息都会转为Message类型,底层默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意创建设置。

    RabbitMQ高级特性,第4张

    保证消费者消费的消息不丢失

    消费者与mq之间的消息丢失问题描述:

    RabbitMQ投递消息给消费者 ---> 消费者获取消息后,返回ack给RabbitMQ ---> RabbitMQ删除消息 ---> 消费者宕机,消息尚未处理,消息丢失。

    所以消费者返回ack的时机非常重要,如果能等到处理完消息后再返回ack,消息就不会丢失。

    SpringAMQP则允许配置三种消息确认模式:

    • none:mq假定消费者获取消息后会成功处理,因此消息投递后立即被删除
    • auto:由spring监测listener代码是否出现异常,没有异常则返回ack,删除队列消息;抛出异常则返回nack,不会将队列的消息删除。
    • manual:手动实现,需要在业务代码结束后,调用api发送ack。

      none模式

      yml文件配置新增

      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: none 

      模拟一个消息处理异常

      @RabbitListener(queues = "simple.queue")
      public void listenSimpleQueue(String msg) {
          log.info("消费者接收到simple.queue的消息:{}", msg);
          // 模拟异常
          System.out.println(1 / 0);
          log.debug("消息处理完成!");
      }

      测试可以发现,当消息处理抛异常时,消息依然被mq删除了。

      auto模式(默认模式)

      yml配置新增(或者不配置,默认就是auto模式)

      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: auto

      测试,抛出异常后,mq上消息没有删除

      RabbitMQ高级特性,第5张

      当消费者出现异常后,消息会不断(重入队)到队列,再重新发送给消费者,然后再次异常,再次入队到队列,无限循环,导致mq的消息处理飙升,带来不必要的压力:

      RabbitMQ高级特性,第6张

      可以使用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的到mq队列。

      application.yml文件,添加内容:

      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: auto
              retry:
                enabled: true # 开启消费者失败重试
                initial-interval: 1000 # 初始的失败等待时长为1秒
                max-attempts: 3 # 最大重试次数

      重新启动消费者服务,消费者会消费之前因错误未正常消费的消息。因为设置了重试机制,所以在重试完后,不会在循环发送消息,mq上会直接删除这条消息。所以重试达到最大次数后,Spring会返回ack,消息会被丢弃。 最后抛出重试耗尽的异常。

      RabbitMQ高级特性,第7张

      在以上的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

      那么如果不想丢失这些发送失败的消息,而需要另做处理的话,可以通过MessageRecovery接口来实现。将失败后的消息投递到一个指定的,专门存放异常消息的队列,后续由人员自由处理。

      实现代码如下:

      @Bean
      public DirectExchange errorMessageExchange(){
          return new DirectExchange("errorMessageExchange");
      }
      @Bean
      public Queue errorMessageQueue(){
          return new Queue("errorMessageQueue");
      }
      @Bean
      public Binding errorBinding(){
          return BindingBuilder.bind(errorMessageQueue())
                               .to(errorMessageExchange())
                               .with("error");
      }
      @Bean
      public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
          //重试耗尽后,将失败消息投递到指定的交换机
          return new RepublishMessageRecoverer(rabbitTemplate, "errorMessageExchange", "error");
      }

      测试,可以看到所有因异常导致消息处理错误,并重试完最大次数后还是失败的消息,都发送到了这个专门处理错误的队列里,这样即使是重试后仍然失败的消息也不会丢失。

      RabbitMQ高级特性,第8张

      消费端限流问题

      实际项目中,可能在mq中堆积了成千上万的消息,如果不进行限流,当我们打开消费端时,这些消息都会一下子过来,造成服务器的宕机,所以需要进行消费者限流处理。

      Rabbitmq提供了一种QoS(服务质量保证功能),来应对这种巨量的消息瞬间全部喷涌推送过来,通过Rabbitmq控制消费端的消费速度,进行必要的限流。

      在yml配置中新增配置如下:

      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: manual # 开启手动ack
              prefetch: 3 # 每次只能获取3条消息,处理完成才能获取下次的3个消息
              concurrency: 1 # 消费者最小数量为1
              max-concurrency: 10 # 消费者最大数量为10

      消费者端代码实现,新增两个消费者

      @RabbitListener(bindings = @QueueBinding(
              value = @Queue("current_limit_queue"),
              exchange = @Exchange(value = "current_limit_exchange",type = "topic"),
              key = {"current.limit.#"}
      ))
      /**
       * message:封装了消息的相关信息,包括消息ID
       * channel:表示信道,封装Rabbitmq通信的相关消息的配置信息,如果当前消息被成功消费,
       *          通过信道进行标记(已消费),Rabbitmq获取到响应ack的确认信息。
       */
      public void currentLimitConsumer1(Message message, Channel channel) throws Exception {
          //模拟业务耗时3秒
          TimeUnit.SECONDS.sleep(3);
          //消息的唯一标识
          long deliveryTag = message.getMessageProperties().getDeliveryTag();
          //手动确认消息是否接收,通过消息的id来指定这条消息被成功处理了,true表示这条消息被成功消费
          channel.basicAck(deliveryTag,true);
          log.info("消费者currentLimitConsumer1接收到消息:{}", new String(message.getBody()));
      }
      @RabbitListener(bindings = @QueueBinding(
              value = @Queue("current_limit_queue"),
              exchange = @Exchange(value = "current_limit_exchange",type = "topic"),
              key = {"current.limit.#"}
      ))
      public void currentLimitConsumer2(Message message, Channel channel) throws Exception {
          TimeUnit.SECONDS.sleep(3);
          long deliveryTag = message.getMessageProperties().getDeliveryTag();
          channel.basicAck(deliveryTag,true);
          log.info("消费者currentLimitConsumer2接收到消息:{}", new String(message.getBody()));
      }

      生产者端代码实现

      @Test
      public void testMessageLimitPushlisher() {
          String exchange = "current_limit_exchange";
          String key = "current.limit.key";
          for (int i = 1; i <= 1000; i++) {
              String message = "消息限流实现,这是第【" + i + "】条消息";
              rabbitTemplate.convertAndSend(exchange,key,message);
          }
      }

      测试:先启动消费者监听队列消息,然后启动生产者发送消息。限流效果如下:

      Ready:表示处于队列中等待被消费者消费的消息数量。

      Unacked:表示已被消费者取出但还未确认的消息数量。

      RabbitMQ高级特性,第9张

      死信交换机

      死信交换机

      当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

      • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false,表示消费失败的消息不重新加入队列,这样这个消息可以成为死信。

      • 消息是一个过期消息,超时无人消费

      • 要投递的队列消息满了,无法投递

        如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(DLX)。

        大致流程如下:

        RabbitMQ高级特性,第10张

        给队列绑定一个死信交换机,这样这个队列如果出现死信的消息,就将这个消息发送到死信交换机

        • 给队列设置dead-letter-exchange属性,指定一个交换机
        • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

          配置文件中,配置ack模式acknowledge-mode=auto,配置retry重试机制。

          定义普通队列,死信交换机,死信队列

          @Bean
          public Queue commonQueue(){
              return QueueBuilder.durable("common.queue")
                      .deadLetterExchange("dl.direct") // 指定死信交换机
                      .deadLetterRoutingKey("dl_key")
                      .build();
          }
          @Bean
          public DirectExchange dlExchange(){
              return new DirectExchange("dl.direct");
          }
          @Bean
          public Queue dlQueue(){
              return new Queue("dl.queue");
          }
          @Bean
          public Binding dlBinding(){
              return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl_key");
          }

          监听common.queue队列,由于有异常,因为模式为auto,设置了重试机制,所以消费失败后会按设置的重试次数去重试,重试完还是失败,这时这个消息会成为死信,因为这个队列绑定了死信交换机,所以这个死信的消息会发送到死信交换机里,间接传递到死信队列里

          @RabbitListener(queues = "common.queue")
          public void listenCommonQueue(String msg) {
              log.info("消费者listenCommonQueue接收到消息:{}", msg);
              int i = 1 / 0;
              log.debug("消息处理完成!");
          }

          发送消息代码

          @Test
          public void testCommonQueue() {
              String queueName = "common.queue";
              String message = "hello, common.queue!";
              rabbitTemplate.convertAndSend(queueName, message);
          }

          最终消息到了死信队列里,防止了消息的丢失

          RabbitMQ高级特性,第11张

           消息的延迟消费

          要求:发送消息后,消费者需要过10秒后才能消费到。

          实现流程:消息发送到延迟队列(ttl.queue),过10秒后消息过期,将这个消息发送到死信交换机,转到死信队列,消费者监听死信队列,这样就可以在10s后获取到这个消息。

          配置延迟队列,并绑定死信交换机

          @Bean
          public Queue ttlQueue(){
              return QueueBuilder.durable("ttl.queue")
                      .ttl(10000) // 设置队列的超时时间,10秒
                      .deadLetterExchange("dl.direct")
                      .deadLetterRoutingKey("dl_key")
                      .build();
          }
          @Bean
          public DirectExchange ttlExchange(){
              return new DirectExchange("ttl.exchange");
          }
          @Bean
          public Binding ttlBinding(){
              return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
          }

          发送消息到延迟队列,队列设置了延迟时间10s,10s后这条消息成为死信,传递到死信交换机

          @Test
          public void testTtlQueue() {
              String message = "这是一条延迟的消息";
              rabbitTemplate.convertAndSend("ttl.exchange","ttl",message);
          }

          消费端监听死信队列,10s后获取到过期消息。

          @RabbitListener(queues = "dl.queue")
          public void listenDlQueue(String msg){
              log.info("消费者从死信队列里获取到消息:{}",msg);
          }

          消息超时方法:除了给队列设置ttl属性,也可以在消息处设置过期时间

          如果两个都设置了,哪个时间短就使用哪个。

          @Test
          public void testTTLMsg() {
              Message message = MessageBuilder
                      .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
                      //设置这条消息的过期时间为5秒
                      .setExpiration("5000")
                      .build();
              rabbitTemplate.convertAndSend("ttl.exchange","ttl",message);
          }

          延迟队列

          通过死信交换机和TTL实现消息的延时消费,配置起来相对麻烦,可以直接用延迟队列实现。

          RabbitMQ的官方也推出了DelayExchange插件,实现延迟队列效果。

          1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
          2. 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列

            (queue)并把消息给它

          3. 队列(queue)再把消息发送给监听它的消费者(customer)

          下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

          将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-

          3.8.4/plugins

          启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

          查看插件:rabbitmq-plugins list    

          RabbitMQ高级特性,第12张

          重启Rabbitmq服务:systemctl restart rabbitmq-server

          交换机指定属性 delayed = " true " ,代码实现如下:

          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue("delay_queue"),
                  //交换机的类型可以是任意类型,只需要设定delayed属性为true即可。
                  exchange = @Exchange(value = "delay_exchange",delayed = "true"),
                  key = "delay"
          ))
          public void listenDelayedQueue(String msg) {
              log.info("消费者listenDelayedQueue接收到消息:{}",msg);
          }

          发送消息,要携带x-delay属性,指定延迟的时间:

          @Test
          public void testDelayedMsg() {
              Message message = MessageBuilder
                      .withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
                      .setHeader("x-delay",10000)//延迟10s
                      .build();
              rabbitTemplate.convertAndSend("delay_exchange","delay",message);
          }

          消息的重复消费问题

          消息的重复消费问题:指一条消息被消费者多次消费,在一些特殊的业务中是不能允许的。

          重复消费场景示例如下:

          RabbitMQ高级特性,第13张

          解决消息重复消费问题:需要在消费端考虑消息的幂等性

          幂等性:指对一个接口的多次调用其结果都是一样的。

          解决方法:让生产者发送每条消息的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 redis 里查一下,之前消费过吗?(或是直接设计一张数据库表,表字段设为唯一约束存这个id,查到就表示消费过了),如果没有消费过,你就处理,然后这个 id 写 redis。如果消费过了,那就不处理了,保证别重复处理相同的消息即可。

网友评论

搜索
最新文章
热门文章
热门标签