上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

rabbitMQ发布确认-交换机不存在或者无法抵达队列的缓存处理

guduadmin14小时前

rabbitMQ在发送消息时,会出现交换机不存在(交换机名字写错等消息),这种情况如何会退给生产者重新处理?【交换机层】

生产者发送消息时,消息未送达到指定的队列,如何消息回退?

核心:对类RabbitTemplate.ConfirmCallback 和RabbitTemplate.ReturnCallback的重写。

RabbitTemplate.ConfirmCallback:交换机在收到消息或者没收到消息时会被触发

RabbitTemplate.ReturnCallback:消息进入交换机,不能达到指定目的地时被出发。

开启交换机确认
开启消息不可达回退

配置文件不开启 这两项

spring:
  rabbitmq:
#    交换机进行确认消息
    publisher-confirm-type: correlated
#   交换机不可以路由消息时 消息回退
    publisher-returns: true
配置类声明
package com.esint.configs;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 发布确认
 *
 */
@Configuration
public class ConfirmConfig {
    //交换机
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    //队列
    public static final String CONFIRM_QUEUE = "confirm.queue";
    //routing-key
    public static final String CONFIRM_ROUTING_KEY = "key1";
    //声明 交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE);
    }
    //声明 队列
    @Bean("confrimQueue")
    public Queue confrimQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }
    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confrimQueue") Queue confrimQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return  BindingBuilder.bind(confrimQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

消费者:

package com.esint.controller;
import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //普通发送模式 无是否发送成功回调
        CorrelationData correlationData = new CorrelationData("101");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message);
        log.info("发送消息为:{}",message);
    }
}

消费者:

package com.esint.consumer;
import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class Consumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
    public void receiveConfrimMessage(Message message){
        log.info("接收到的消息为:" + new String(message.getBody()));
    }
}
核心修改的重写的类:
package com.esint.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
/**
 *  注入:本类为实现了RabbitTemplate的内部类,所以在RabbitTemplate发送消息的时候不会调用到我们自己的实现,所以需要把这个类在注入到RabbitTemplate中。
 */
   @Autowired
   private RabbitTemplate rabbitTemplate;
   @PostConstruct
   public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
   }
    /**
     * RabbitTemplate.ConfirmCallback  是在【生产者】发送【交换机】 交换机的感知回应调去方法
     *
     * 交换机确认回调方法
     * 1.交换机接收消息成功
     *   参数1  correlationData保存了回调消息ID和相关信息
     *   参数2  交换机收到消息 true
     *   参数3  失败原因 为 null
     * 2.交换机接受消息失败
     *   参数1  correlationData保存了回调消息ID和相关信息
     *   参数2  交换机收到消息 false
     *   参数3  失败原因
     * @param correlationData  来源于生产者 所以在发消息时 需要带有这个属性
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("交换机确认收到 ID:{}" ,id);
        }else {
            log.info("交换机未收到ID:{}的消息,原因:{}",id,cause);
            //这里实现发送交换机失败的存储逻辑
        }
    }
    /**
     * 回退消息
     * 在消息传递过程不可达目标地时 返还给生产者  只有消息不可达,才会执行这个方法
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息{} 被交换机{} 退回,原因:{} 路由:{}",new String(message.getBody()),exchange,replyText,routingKey);
        //这里实现发送消息不到达的逻辑 发送消息无法被逻辑 默认就会被交换机丢掉 这里重写后 可以在这里处理存储
    }
}

故意发送一个错误路由时:

rabbitMQ发布确认-交换机不存在或者无法抵达队列的缓存处理,在这里插入图片描述,第1张

消息能发出 交换机有确认 消息可以被回退

网友评论

搜索
最新文章
热门文章
热门标签