rabbitMQ在发送消息时,会出现交换机不存在(交换机名字写错等消息),这种情况如何会退给生产者重新处理?【交换机层】
生产者发送消息时,消息未送达到指定的队列,如何消息回退?
核心:对类RabbitTemplate.ConfirmCallback 和RabbitTemplate.ReturnCallback的重写。
RabbitTemplate.ConfirmCallback:交换机在收到消息或者没收到消息时会被触发
RabbitTemplate.ReturnCallback:消息进入交换机,不能达到指定目的地时被出发。
开启交换机确认
开启消息不可达回退
配置文件不开启 这两项
spring: rabbitmq: # 交换机进行确认消息 publisher-confirm-type: correlated # 交换机不可以路由消息时 消息回退 publisher-returns: true
配置类声明
package com.esint.configs; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 发布确认 * */ @Configuration public class ConfirmConfig { //交换机 public static final String CONFIRM_EXCHANGE = "confirm.exchange"; //队列 public static final String CONFIRM_QUEUE = "confirm.queue"; //routing-key public static final String CONFIRM_ROUTING_KEY = "key1"; //声明 交换机 @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE); } //声明 队列 @Bean("confrimQueue") public Queue confrimQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE).build(); } //绑定 @Bean public Binding queueBindingExchange(@Qualifier("confrimQueue") Queue confrimQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confrimQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
消费者:
package com.esint.controller; import com.esint.configs.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; //发消息 @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ //普通发送模式 无是否发送成功回调 CorrelationData correlationData = new CorrelationData("101"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message); log.info("发送消息为:{}",message); } }
消费者:
package com.esint.consumer; import com.esint.configs.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class Consumer { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE) public void receiveConfrimMessage(Message message){ log.info("接收到的消息为:" + new String(message.getBody())); } }
核心修改的重写的类:
package com.esint.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{ /** * 注入:本类为实现了RabbitTemplate的内部类,所以在RabbitTemplate发送消息的时候不会调用到我们自己的实现,所以需要把这个类在注入到RabbitTemplate中。 */ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * RabbitTemplate.ConfirmCallback 是在【生产者】发送【交换机】 交换机的感知回应调去方法 * * 交换机确认回调方法 * 1.交换机接收消息成功 * 参数1 correlationData保存了回调消息ID和相关信息 * 参数2 交换机收到消息 true * 参数3 失败原因 为 null * 2.交换机接受消息失败 * 参数1 correlationData保存了回调消息ID和相关信息 * 参数2 交换机收到消息 false * 参数3 失败原因 * @param correlationData 来源于生产者 所以在发消息时 需要带有这个属性 * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if(ack){ log.info("交换机确认收到 ID:{}" ,id); }else { log.info("交换机未收到ID:{}的消息,原因:{}",id,cause); //这里实现发送交换机失败的存储逻辑 } } /** * 回退消息 * 在消息传递过程不可达目标地时 返还给生产者 只有消息不可达,才会执行这个方法 * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息{} 被交换机{} 退回,原因:{} 路由:{}",new String(message.getBody()),exchange,replyText,routingKey); //这里实现发送消息不到达的逻辑 发送消息无法被逻辑 默认就会被交换机丢掉 这里重写后 可以在这里处理存储 } }
故意发送一个错误路由时:
消息能发出 交换机有确认 消息可以被回退
猜你喜欢
- 11天前(郭富城热舞劲歌演唱会)郭富城年度压轴《新濠尊属系列郭富城梦幻舞林演唱会2023》
- 11天前(甘州区文化旅游局)2025甘津文旅资源对接推介会在兰州举办
- 11天前(2025年“文化和自然遗产日”广东主会场活动举办)2025年“文化和自然遗产日”广东主会场活动举办
- 11天前(云南滇陇工程咨询有限公司)陇滇携手谋发展 文旅合作谱新篇
- 11天前(纳米比亚旅游报价)纳米比亚旅游局2024年中国推介会圆满落幕
- 11天前(澳涞坞是什么)从最美山庄到世界舞台:澳涞山庄见证世界十佳旅居城市评选
- 11天前(希尔顿集团2021年筹建的酒店)希尔顿集团两大重点项目亮相第四届上海旅游投资促进大会
- 11天前(筑格集团有限公司)洲际酒店集团旗下筑格酒店品牌正式亮相大中华区
- 11天前(泸沽湖大酒店地址)泸沽湖岚岳酒店盛大开业|以摩梭文化为魂,打造高端度假新地标
- 11天前(我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章