上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

guduadmin211月前

目录

  • 引出
  • 点对点(simple)
  • Work queues 一对多
  • 发布订阅/fanout模式
    • 以登陆验证码为例
    • pom文件导包
    • application.yml文件
    • rabbitmq的配置
    • 生产者生成验证码,发送给交换机
    • 消费者消费验证码
    • topic模式
      • 配置类增加配置
      • 生产者发送信息
      • 进行发送
      • 控制台查看
      • rabbitmq回调确认
        • 配置类
        • 验证生产者发送是否成功
        • 延迟队列(死信)设计
          • java代码步骤
            • 创建正常+死信队列
            • 配置类+常量
            • 生产者到正常队列
            • 消费者进行延迟消费
            • 延迟队列插件安装
              • 访问官网
              • 进入rabbitmq docker容器
              • 上传到linux服务器
              • 拷贝插件到容器中
              • 进入容器安装插件
              • 打开管理页面
              • 总结

                引出


                1.rabbitmq队列方式的梳理,点对点,一对多;

                2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;

                3.topic模式,根据规则决定发送给哪个队列;

                4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;

                5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;

                点对点(simple)

                点对对方式传输

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第1张

                Work queues 一对多

                1个生产者多个消费者

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第2张

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第3张

                发布订阅/fanout模式

                生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到。

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第4张

                以登陆验证码为例

                pom文件导包

                
                        
                            org.springframework.boot
                            spring-boot-starter-mail
                        
                        
                        
                            com.aliyun
                            aliyun-java-sdk-core
                            4.5.3
                        
                
                        
                            org.springframework.boot
                            spring-boot-starter-amqp
                        
                

                application.yml文件

                server:
                  port: 9099
                spring:
                  # 模块的名字
                  application:
                    name: user-auth
                  # 邮箱的配置
                  mail:
                    host: smtp.qq.com
                    port: 587
                    username: xxxx
                    password: xxxxx
                  # rabbitmq的配置
                  rabbitmq:
                    host: 192.168.111.130
                    port: 5672
                    username: admin
                    password: 123
                logging:
                  level:
                    com.tianju.auth: debug
                

                rabbitmq的配置

                需要用到的常量

                package com.tianju.auth.util;
                /**
                 * rabbitmq的常量
                 */
                public interface RabbitMqConstants {
                    String MQ_MAIL_QUEUE="mq_email_queue";
                    String MQ_PHONE_QUEUE="mq_phone_queue";
                    String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";
                    // 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
                    boolean durable = true;
                    boolean exclusive = false;
                    boolean autoDelete = false;
                }
                

                RabbitMqConfig.java配置

                邮箱队列,电话队列,交换机;

                邮箱绑定交换机,电话绑定交换机;

                创建队列参数说明:

                参数说明
                name字符串值,queue的名称。
                durable布尔值,表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 queue 是否会恢复/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。
                exclusive布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。
                autoDelete布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。

                不指定 durable、exclusive 和 autoDelete 时,默认为 truefalsefalse 。表示持久化、非排它、不用自动删除。

                创建交换机参数说明

                参数说明
                name字符串值,exchange 的名称。
                durable布尔值,表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 exchange 是否会恢复/仍存在。
                autoDelete布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。

                不指定 durable 和 autoDelete 时,默认为 truefalse 。表示持久化、不用自动删除

                package com.tianju.auth.config;
                import com.tianju.auth.util.RabbitMqConstants;
                import org.springframework.amqp.core.Binding;
                import org.springframework.amqp.core.BindingBuilder;
                import org.springframework.amqp.core.FanoutExchange;
                import org.springframework.amqp.core.Queue;
                import org.springframework.context.annotation.Bean;
                import org.springframework.context.annotation.Configuration;
                @Configuration
                public class RabbitMqConfig {
                    @Bean // 邮箱的队列
                    public Queue mailQueue(){
                        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean // 电话的队列
                    public Queue phoneQueue(){
                        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean // 交换机
                    public FanoutExchange fanoutExchange(){
                        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean
                    public Binding mailBinding(){
                        return BindingBuilder.bind(mailQueue())
                                .to(fanoutExchange());
                    }
                    @Bean
                    public Binding phoneBinding(){
                        return BindingBuilder.bind(phoneQueue())
                                .to(fanoutExchange());
                    }
                }
                

                生产者生成验证码,发送给交换机

                接口

                package com.tianju.auth.service;
                public interface IUserService {
                    /**
                     * 生产者生成信息发送给交换机
                     * @param msg 信息,这里是验证码
                     */
                    void sendCode(String msg);
                }
                

                实现

                package com.tianju.auth.service.impl;
                import com.tianju.auth.service.IUserService;
                import com.tianju.auth.util.RabbitMqConstants;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.stereotype.Service;
                import java.util.ArrayList;
                import java.util.List;
                @Service
                @Slf4j
                public class UserServiceImpl implements IUserService {
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Override
                    public void sendCode(String msg) {
                        rabbitTemplate.convertAndSend(
                                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                                "routingkey.fanout",
                                msg);
                        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
                    }
                }
                

                测试类生成验证码,发给交换机

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第5张

                package com.tianju.auth.service.impl;
                import cn.hutool.core.lang.Snowflake;
                import com.tianju.auth.service.IUserService;
                import org.junit.Test;
                import org.junit.runner.RunWith;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.boot.test.context.SpringBootTest;
                import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
                @SpringBootTest
                @RunWith(SpringJUnit4ClassRunner.class)
                public class UserServiceImplTest {
                    @Autowired
                    private IUserService userService;
                    @Test
                    public void sendCode() {
                        String code = new Snowflake().nextIdStr().substring(0, 6);
                        System.out.println(code);
                        userService.sendCode(code);
                    }
                }
                

                消费者消费验证码

                package com.tianju.auth.consumer;
                import com.tianju.auth.service.IEmailService;
                import com.tianju.auth.util.RabbitMqConstants;
                import com.tianju.auth.util.SMSUtil;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.rabbit.annotation.RabbitListener;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.stereotype.Service;
                @Slf4j
                @Service
                public class UserConsumer {
                    @Autowired
                    private IEmailService emailService;
                    @RabbitListener(queues = RabbitMqConstants.MQ_MAIL_QUEUE)
                    public void emailConsumer(String msg){
                        log.debug("[email消费者:]消费{}",msg);
                        emailService.sendEmail("xxxx@qq.com", "登陆验证码", msg);
                    }
                    @RabbitListener(queues = RabbitMqConstants.MQ_PHONE_QUEUE)
                    public void phoneConsumer(String msg){
                        log.debug("[phone消费者:]消费{}",msg);
                        SMSUtil.send("xxxx", msg);
                    }
                }
                

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第6张

                topic模式

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第7张

                例如: routingkey: my.orange.rabbit —-> Q1,Q2

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第8张

                配置类增加配置

                package com.tianju.auth.util;
                /**
                 * rabbitmq的常量
                 */
                public interface RabbitMqConstants {
                    String MQ_MAIL_QUEUE="mq_email_queue";
                    String MQ_PHONE_QUEUE="mq_phone_queue";
                    String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";
                    
                    String MQ_TOPIC_EXCHANGE="mq_topic_exchange";
                    String MQ_TOPIC_QUEUE_A = "mq_topic_queue_a";
                    String MQ_TOPIC_QUEUE_B = "mq_topic_queue_b";
                    // 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
                    boolean durable = true;
                    boolean exclusive = false;
                    boolean autoDelete = false;
                }
                
                package com.tianju.auth.config;
                import com.tianju.auth.util.RabbitMqConstants;
                import org.springframework.amqp.core.*;
                import org.springframework.context.annotation.Bean;
                import org.springframework.context.annotation.Configuration;
                @Configuration
                public class RabbitMqConfig {
                    @Bean // 邮箱的队列
                    public Queue mailQueue(){
                        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean // 电话的队列
                    public Queue phoneQueue(){
                        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean // 交换机
                    public FanoutExchange fanoutExchange(){
                        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean
                    public Binding mailBinding(){
                        return BindingBuilder.bind(mailQueue())
                                .to(fanoutExchange());
                    }
                    @Bean
                    public Binding phoneBinding(){
                        return BindingBuilder.bind(phoneQueue())
                                .to(fanoutExchange());
                    }
                    @Bean // A队列
                    public Queue topicAQueue(){
                        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_A,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete);
                    }
                    /**
                     * topic模式相关配置
                     */
                    @Bean // B队列
                    public Queue topicBQueue(){
                        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_B,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean // topic的交换机
                    public TopicExchange topicMyExchange(){
                        return new TopicExchange(RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean
                    public Binding topicAQueueBinding(){
                        return BindingBuilder
                                .bind(topicAQueue())
                                .to(topicMyExchange())
                                .with("topic.xxx"); // 规则 topic.xxx
                    }
                    @Bean
                    public Binding topicBQueueBinding(){
                        return BindingBuilder
                                .bind(topicBQueue())
                                .to(topicMyExchange())
                                .with("topic.*"); // 规则 topic.xxx
                    }
                }
                

                生产者发送信息

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第9张

                    /**
                     * topic模式下,生产者发送信息给交换机,可以决定给哪个队列发信息
                     * @param msg 发送的信息
                     * @param routingKey 类似正则表达式,决定给谁发
                     *                   .with("topic.xxx"); // 规则 topic.xxx ---- A队列
                     *                   .with("topic.*"); // 规则 topic.xxx   ---- B队列
                     *                   在配置类中,如上所述配置,则如果输入的routingKey为 topic.xxx则给A和B发;
                     *                                      如果输入的routingKey为 topic.yyy 则 只给B队列发;
                     */
                    void sendMsg(String msg,String routingKey);
                

                实现

                package com.tianju.auth.service.impl;
                import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
                import com.tianju.auth.entity.UserPrivs;
                import com.tianju.auth.mapper.UserMapper;
                import com.tianju.auth.service.IUserService;
                import com.tianju.auth.util.RabbitMqConstants;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.stereotype.Service;
                import java.util.ArrayList;
                import java.util.List;
                @Service
                @Slf4j
                public class UserServiceImpl implements IUserService {
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Override
                    public void sendCode(String msg) {
                        rabbitTemplate.convertAndSend(
                                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                                "routingkey.fanout",
                                msg);
                        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
                    }
                    @Override
                    public void sendMsg(String msg,String routingKey) {
                        rabbitTemplate.convertAndSend(
                                RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                                routingKey, // "topic.yyy",此时只有B队列有信息
                                msg);
                        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
                    }
                }
                

                进行发送

                package com.tianju.auth.service.impl;
                import cn.hutool.core.lang.Snowflake;
                import com.tianju.auth.service.IUserService;
                import org.junit.Test;
                import org.junit.runner.RunWith;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.boot.test.context.SpringBootTest;
                import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
                @SpringBootTest
                @RunWith(SpringJUnit4ClassRunner.class)
                public class UserServiceImplTest {
                    @Autowired
                    private IUserService userService;
                    @Test
                    public void sendCode() {
                        String code = new Snowflake().nextIdStr().substring(0, 6);
                        System.out.println(code);
                        userService.sendCode(code);
                    }
                    @Test
                    public void sendTopic() {
                        String code = new Snowflake().nextIdStr().substring(0, 6);
                        System.out.println(code);
                        userService.sendMsg(code,"topic.yyy");
                    }
                }
                

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第10张

                控制台查看

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第11张

                rabbitmq回调确认

                配置类

                spring:
                  # rabbitmq的配置
                  rabbitmq:
                    host: 192.168.111.130
                    port: 5672
                    username: admin
                    password: 123
                    # 确认收到
                    publisher-confirm-type: correlated
                    publisher-returns: true
                

                验证生产者发送是否成功

                使用RabbitTemplate的回调方法。

                先设置

                • setConfirmCallback
                • setReturnsCallback

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第12张

                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Override
                    public void sendCode(String msg) {
                        rabbitTemplate.convertAndSend(
                                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                                "routingkey.fanout",
                                msg);
                        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
                    }
                    @Override
                    public void sendMsg(String msg,String routingKey) {
                        // 如果发到交换机,看一下有没有反馈
                        rabbitTemplate.setConfirmCallback((c,ack,message)->{
                            log.debug("***** setConfirmCallback:ack--{}", ack); // 是否发送到交换机
                            log.debug("***** setConfirmCallback:c-->{}",c);
                            // channel error; protocol method: #method(reply-code=404,
                            // reply-text=NOT_FOUND - no exchange 'aaaa' in vhost '/', class-id=60, method-id=40)
                            log.debug("***** setConfirmCallback:m-->{}",message);
                            if (ack){
                                log.debug("[生产者:] 发送信息到交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
                            }else {
                                log.debug(message);
                            }
                        });
                        rabbitTemplate.setReturnsCallback(r->{
                            log.debug("返回文字{}", r.getReplyText());
                            log.debug("返回code{}", r.getReplyCode());
                            log.debug("返回Exchange{}", r.getExchange());
                            log.debug("返回RoutingKey{}", r.getRoutingKey());
                        });
                        rabbitTemplate.convertAndSend(
                                RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                //                "aaaa",// 失败的情况
                                routingKey, // "topic.yyy",此时只有B队列有信息
                                msg);
                        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
                    }
                

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第13张

                rabbitTemplate.setConfirmCallback((c,ack,message)->{
                    log.debug("******* setConfirmCallback:ack->{}",ack);
                    log.debug("******* setConfirmCallback:c->{}",c);
                    log.debug("******* setConfirmCallback:chanel->{}",message);
                    if(ack){
                        log.debug("[生产者]发送信息到达交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
                    }else {
                        log.debug(message);
                    }
                });
                rabbitTemplate.setReturnsCallback(r->{
                    log.debug("返回文字:{}",r.getReplyText());
                    log.debug("返回code:{}",r.getReplyCode());
                    log.debug("返回Exchange:{}",r.getExchange());
                    log.debug("返回RoutingKey:{}",r.getRoutingKey());
                });
                rabbitTemplate.convertAndSend(
                        RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                        "abc.xxx",
                        msg
                );
                
                    @Test
                    public void sendTopic() {
                        String code = new Snowflake().nextIdStr().substring(0, 6);
                        System.out.println(code);
                        userService.sendMsg(code,"topic.rrr");
                    }
                

                延迟队列(死信)设计

                Documentation: Table of Contents — RabbitMQ

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第14张

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第15张

                java代码步骤

                创建正常+死信队列

                package com.tianju.mq.config;
                import com.tianju.mq.constants.RabbitMqConstants;
                import org.springframework.amqp.core.Binding;
                import org.springframework.amqp.core.BindingBuilder;
                import org.springframework.amqp.core.DirectExchange;
                import org.springframework.amqp.core.Queue;
                import org.springframework.context.annotation.Bean;
                import org.springframework.context.annotation.Configuration;
                import java.util.HashMap;
                import java.util.Map;
                @Configuration
                public class RabbitMqConfig {
                    @Bean
                    public DirectExchange normalExchange(){
                        return new DirectExchange(RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.autoDelete);
                    }
                    @Bean
                    public Queue normalQueue(){
                        Map map = new HashMap<>(2);
                        map.put("x-dead-letter-exchange",RabbitMqConstants.MQ_DELAY_EXCHANGE);
                        map.put("x-dead-letter-routing-key",RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
                        return new Queue(
                                RabbitMqConstants.MQ_NORMAL_QUEUE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete,
                                map);
                    }
                    @Bean
                    public Binding normalBinding(){
                        return BindingBuilder.bind(normalQueue())
                                .to(normalExchange())
                                .with(RabbitMqConstants.MQ_NORMAL_ROUTING_KEY);
                    }
                    //------------------死信队列设计--------------------------
                    /**
                     * 死信(延迟)队列
                     * @return
                     */
                    @Bean
                    public Queue delayQueue(){
                        return new Queue(RabbitMqConstants.MQ_DELAY_QUEUE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.exclusive,
                                RabbitMqConstants.autoDelete);
                    }
                    /**
                     * 死信交换机
                     * @return
                     */
                    @Bean
                    public DirectExchange delayExchange(){
                        return new DirectExchange(RabbitMqConstants.MQ_DELAY_EXCHANGE,
                                RabbitMqConstants.durable,
                                RabbitMqConstants.autoDelete);
                    }
                    /**
                     * 死信交换机队列绑定
                     * @return
                     */
                    @Bean
                    public Binding delayBinding(){
                        return BindingBuilder.bind(delayQueue())
                                .to(delayExchange())
                                .with(RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
                    }
                }
                

                配置类+常量

                package com.tianju.mq.constants;
                public interface RabbitMqConstants {
                    String MQ_DELAY_QUEUE = "mq_delay_queue"; // 延迟队列,死信队列
                    String MQ_DELAY_EXCHANGE = "mq_delay_exchange"; // 死信交换机
                    String MQ_DELAY_ROUTING_KEY = "mq_delay_routing_key"; // 死信路由
                    // 正常的队列,交换机,路由
                    String MQ_NORMAL_QUEUE = "mq_normal_queue";
                    String MQ_NORMAL_EXCHANGE = "mq_normal_exchange";
                    String MQ_NORMAL_ROUTING_KEY = "mq_normal_routing_key";
                    // 参数
                    boolean durable = true;
                    boolean exclusive = false;
                    boolean autoDelete = false;
                }
                
                server:
                  port: 9099
                spring:
                  # 邮箱的配置
                  mail:
                    host: smtp.qq.com
                    port: 587
                    username: xxxxx.com
                    password: xxxxx
                  # rabbitmq的配置
                  rabbitmq:
                    host: 192.168.111.130
                    port: 5672
                    username: admin
                    password: 123
                    # 确认收到
                    publisher-confirm-type: correlated
                    publisher-returns: true
                logging:
                  level:
                    com.tianju.mq: debug
                

                生产者到正常队列

                package com.tianju.mq.service;
                public interface IUserService {
                    /**
                     * 延迟队列的生产者
                     * @param msg 发送的信息
                     * @param delayTime 延迟的时间,毫秒
                     */
                    void sendDelay(String msg,int delayTime);
                }
                
                package com.tianju.mq.service.impl;
                import com.tianju.mq.constants.RabbitMqConstants;
                import com.tianju.mq.service.IUserService;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.core.MessageProperties;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.stereotype.Service;
                import java.util.Date;
                @Service
                @Slf4j
                public class UserServiceImpl implements IUserService {
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Override
                    public void sendDelay(String msg, int delayTime) {
                        rabbitTemplate.convertAndSend(
                                RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                                RabbitMqConstants.MQ_NORMAL_ROUTING_KEY,
                                msg,
                                process->{
                                    process.getMessageProperties().setExpiration(String.valueOf(delayTime));
                                    return process;
                                }
                        );
                        log.debug("[生产者:]发送消息:{},时间{},延迟{}秒",msg,new Date(),delayTime/1000);
                    }
                }
                

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第16张

                消费者进行延迟消费

                package com.tianju.mq.consumer;
                import com.tianju.mq.constants.RabbitMqConstants;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.rabbit.annotation.RabbitListener;
                import org.springframework.stereotype.Service;
                import java.util.Date;
                @Service
                @Slf4j
                public class UserConsumer {
                    @RabbitListener(queues = RabbitMqConstants.MQ_DELAY_QUEUE)
                    public void delayConsume(String msg){
                        log.debug("[消费者消费信息:{},时间:{}",msg,new Date());
                    }
                }
                

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第17张

                延迟队列插件安装

                访问官网

                Community Plugins — RabbitMQ

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第18张

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第19张

                进入rabbitmq docker容器

                [root@localhost ~]# docker exec -it rabbitmq bash
                

                查询插件列表是否存在延迟插件

                root@6d2342d51b11:/plugins# rabbitmq-plugins list
                Listing plugins with pattern ".*" ...
                 Configured: E = explicitly enabled; e = implicitly enabled
                 | Status: * = running on rabbit@6d2342d51b11
                 |/
                [  ] rabbitmq_amqp1_0                  3.9.11
                [  ] rabbitmq_auth_backend_cache       3.9.11
                [  ] rabbitmq_auth_backend_http        3.9.11
                [  ] rabbitmq_auth_backend_ldap        3.9.11
                [  ] rabbitmq_auth_backend_oauth2      3.9.11
                [  ] rabbitmq_auth_mechanism_ssl       3.9.11
                [  ] rabbitmq_consistent_hash_exchange 3.9.11
                [  ] rabbitmq_event_exchange           3.9.11
                [  ] rabbitmq_federation               3.9.11
                [  ] rabbitmq_federation_management    3.9.11
                [  ] rabbitmq_jms_topic_exchange       3.9.11
                [E*] rabbitmq_management               3.9.11
                [e*] rabbitmq_management_agent         3.9.11
                [  ] rabbitmq_mqtt                     3.9.11
                [  ] rabbitmq_peer_discovery_aws       3.9.11
                [  ] rabbitmq_peer_discovery_common    3.9.11
                [  ] rabbitmq_peer_discovery_consul    3.9.11
                [  ] rabbitmq_peer_discovery_etcd      3.9.11
                [  ] rabbitmq_peer_discovery_k8s       3.9.11
                [E*] rabbitmq_prometheus               3.9.11
                [  ] rabbitmq_random_exchange          3.9.11
                [  ] rabbitmq_recent_history_exchange  3.9.11
                [  ] rabbitmq_sharding                 3.9.11
                [  ] rabbitmq_shovel                   3.9.11
                [  ] rabbitmq_shovel_management        3.9.11
                [  ] rabbitmq_stomp                    3.9.11
                [  ] rabbitmq_stream                   3.9.11
                [  ] rabbitmq_stream_management        3.9.11
                [  ] rabbitmq_top                      3.9.11
                [  ] rabbitmq_tracing                  3.9.11
                [  ] rabbitmq_trust_store              3.9.11
                [e*] rabbitmq_web_dispatch             3.9.11
                [  ] rabbitmq_web_mqtt                 3.9.11
                [  ] rabbitmq_web_mqtt_examples        3.9.11
                [  ] rabbitmq_web_stomp                3.9.11
                [  ] rabbitmq_web_stomp_examples       3.9.11
                

                下载支持3.9.x的插件

                https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases?after=rabbitmq_v3_6_12

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第20张

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第21张

                退出容器:

                root@6d2342d51b11:/plugins# exit
                exit
                

                上传到linux服务器

                在/usr/local/software/下创建文件夹rabbitmq/plugins

                [root@localhost software]# mkdir -p rabbitmq/plugins
                

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第22张

                拷贝插件到容器中

                [root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
                

                进入容器安装插件

                [root@localhost plugins]# docker  exec -it rabbitmq bash
                root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
                

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第23张

                打开管理页面

                进入Exchange页面,下拉Type看是否已经安装成功。

                RabbitMQ基础(2)——发布订阅fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,在这里插入图片描述,第24张


                总结

                1.rabbitmq队列方式的梳理,点对点,一对多;

                2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;

                3.topic模式,根据规则决定发送给哪个队列;

                4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;

                5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;

网友评论

搜索
最新文章
热门文章
热门标签
 
 解梦七星彩号码  梦见假钱50元人民币  做梦梦见好多蛇