上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

带你深入了解RabbitMQ

guduadmin23小时前

1、消息队列概念

1.1 消息队列是什么

消息队列(Message Queue MQ)是实现应用之间数据通信的一种机制,采用先进先出的数据结构和生产者消费者设计模式实现通信。

带你深入了解RabbitMQ,在这里插入图片描述,第1张

带你深入了解RabbitMQ,在这里插入图片描述,第2张

1.2 消息队列有什么作用

消息队列的优势:

  • 解耦
  • 异步
  • 削峰

    1.2.1 解耦

    实现生产者和消费者的解耦,生产者和消费者不直接调用,也不用关心对方如何处理,代码的维护性提高

    例如:使用openfeign实现服务调用,如果被调用服务的接口发生修改,服务调用方也需要进行修改,服务之间的耦合性较高,不利于开发和维护

    带你深入了解RabbitMQ,在这里插入图片描述,第3张

    1.2.2 异步

    同步调用,服务A调用服务B,必须等待服务B执行完业务,服务A才能执行其它业务

    异步调用,服务A发送消息给消息队列,马上返回完成其它业务,不用等待服务B执行完

    带你深入了解RabbitMQ,在这里插入图片描述,第4张

    1.2.3 削峰

    可以通过控制消息队列的长度来限制请求流量,从而达到限流保护服务器的作用

    消息队列的缺点:

    1. 提高系统的复杂性
    2. 降低系统的可用性

    1.3 主流的消息队列

    主流的MQ:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka

      带你深入了解RabbitMQ,在这里插入图片描述,第5张

      1.4 消息队列的基本概念

      • 生产者

        向消息队列发送消息的服务

      • 消费者

        从消息队列取消息的服务

      • 队列 queue

        存放消息的容器,采用FIFO数据结构

      • 交换机 exchange

        实现消息路由,将消息分发到对应的队列中

      • 消息服务器 Broker

        进行消息通信的软件平台服务器

      • 虚拟主机 virtual host

        类似于namespace,将不同用户的交换机和队列区分开来

      • 连接 connection

        网络连接

      • 通道 channel

        数据通信的通道

        带你深入了解RabbitMQ,在这里插入图片描述,第6张

        2、安装RabbitMQ

        2.1 Linux安装

        1)安装erlang

        wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
        rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
        yum install epel-release
        yum install erlang
        

        2) 安装rabbitmq 目前的最新版本 支持erlang24

        wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.7/rabbitmq-server-3.9.7-1.el7.noarch.rpm
        yum install rabbitmq-server-3.9.7-1.el7.noarch.rpm  
        

        3) 启动rabbitmq

        service rabbitmq-server start
        

        4) 启用管理工具

        rabbitmq-plugins enable rabbitmq_management 
        

        5) 防火墙允许端口

        firewall-cmd --permanent --add-port=15672/tcp
        firewall-cmd --permanent --add-port=5672/tcp
        

        6) 提示不能使用localhost登录,添加远程登录的用户

        rabbitmqctl add_user admin admin
        rabbitmqctl set_user_tags admin administrator
        

        7) 设置开机启动输入下面命令

        chkconfig rabbitmq-server on
        

        2.2 Windows安装

        1)下载erlang和RabbitMQ安装包

        2)安装erlang

        3)安装rabbitmq

        4)打开菜单输入命令,启动管理工具

        带你深入了解RabbitMQ,在这里插入图片描述,第7张

        rabbitmq-plugins enable rabbitmq_management
        

        5)启动rabbitMQ

        net start rabbitmq
        net stop rabbitmq
        

        6)浏览器输入: http://localhost:15672 账号密码都是guest

        带你深入了解RabbitMQ,在这里插入图片描述,第8张

        3、RabbitMQ的基本使用

        3.1 添加用户

        不同的系统可以使用各自的用户登录RabbitMQ,可以在Admin的User页面添加新用户

        带你深入了解RabbitMQ,在这里插入图片描述,第9张

        3.2 添加虚拟主机

        虚拟主机相当于一个独立的MQ服务,有自身的队列、交换机、绑定策略等。

        添加虚拟主机

        带你深入了解RabbitMQ,在这里插入图片描述,第10张

        3.2 添加队列

        不同的消息队列保存不同类型的消息,如支付消息、秒杀消息、数据同步消息等。

        添加队列,需要填写虚拟主机、类型、名称、持久化、自动删除和参数等。

        带你深入了解RabbitMQ,在这里插入图片描述,第11张

        3.3 添加交换机

        生产者将消息发送到交换机Exchange,再由交换机路由到一个或多个队列中;

        交换器的类型有fanout、direct、topic、headers这四种,下篇文章将详细介绍。

        添加交换机

        带你深入了解RabbitMQ,在这里插入图片描述,第12张

        4、RabbitMQ的五种消息模型

        带你深入了解RabbitMQ,在这里插入图片描述,第13张

        RabbitMQ提供了多种消息模型,官网上第6种是RPC不属于常规的消息队列。

        属于消息模型的是前5种:

        1. 简单的一对一模型
        2. 工作队列模型 ,一个生产者将消息分发给多个消费者
        3. 发布/订阅模型 ,生产者发布消息,多个消费者同时收取
        4. 路由模型 ,生产者通过关键字发送消息给特定消费者
        5. 主题模型 ,路由模式基础上,在关键字里加入了通配符

        4.1 一对一模型

        带你深入了解RabbitMQ,在这里插入图片描述,第14张

        最基本的队列模型:

        一个生产者发送消息到一个队列,一个消费者从队列中取消息。

        4.1.1 操作步骤

        1)启动Rabbitmq,在管理页面中创建用户admin

        2)使用admin登录,然后创建虚拟主机myhost

        带你深入了解RabbitMQ,在这里插入图片描述,第15张

        创建队列,配置如下

        带你深入了解RabbitMQ,在这里插入图片描述,第16张

        4.1.2 案例代码

        导入依赖

        
          com.rabbitmq
          amqp-client
          3.4.1
        
        

        开发工具类

        public class MQUtils {
            public static final String QUEUE_NAME = "myqueue01";
            public static final String QUEUE_NAME2 = "myqueue02";
            public static final String EXCHANGE_NAME = "myexchange01";
            public static final String EXCHANGE_NAME2 = "myexchange02";
            public static final String EXCHANGE_NAME3 = "myexchange03";
            /**
             * 获得MQ的连接
             * @return
             * @throws IOException
             */
            public static Connection getConnection() throws IOException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                //配置服务器名、端口、虚拟主机名、登录账号和密码
                connectionFactory.setHost("localhost");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("myhost");
                connectionFactory.setUsername("admin");
                connectionFactory.setPassword("123456");
                return connectionFactory.newConnection();
            }
        }
        

        开发生产者

        /**
         * 生产者,发送简单的消息到队列中
         */
        public class SimpleProducer {
            public static void main(String[] args) throws IOException {
                Connection connection = MQUtils.getConnection();
                //创建通道
                Channel channel = connection.createChannel();
                //定义队列
                channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                String msg = "Hello World!";
                //发布消息到队列
                channel.basicPublish("",MQUtils.QUEUE_NAME,null,msg.getBytes());
                channel.close();
                connection.close();
            }
        }
        

        运行生产者代码,管理页面点进myqueue01,在GetMessages中可以看到消息

        带你深入了解RabbitMQ,在这里插入图片描述,第17张

        开发消费者

        /**
         * 消费者,从队列中读取简单的消息
         */
        public class SimpleConsumer {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                //定义队列
                channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                //创建消费者
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                //消费者消费通道中的消息
                channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
                //读取消息
                while(true){
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    System.out.println(new String(delivery.getBody()));
                }
            }
        }
        

        4.2 工作队列模型

        带你深入了解RabbitMQ,在这里插入图片描述,第18张

        工作队列,生产者将消息分发给多个消费者,如果生产者生产了100条消息,消费者1消费50条,消费者2消费50条。

        4.2.1 案例代码

        开发生产者

        /**
          多对多模式的生产者,会发送多条消息到队列中
         */
        public class WorkProductor {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                for(int i = 0;i < 100;i++){
                    String msg = "Hello-->" + i;
                    channel.basicPublish("",MQUtils.QUEUE_NAME,null, msg.getBytes());
                    System.out.println("send:" + msg);
                    Thread.sleep(10);
                }
                channel.close();
                connection.close();
            }
        }
        

        开发消费者1

        /**
         * 多对多模式的消费者1
         */
        public class WorkConsumer01 {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                //消费者消费通道中的消息
                channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
                while(true){
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
                    Thread.sleep(10);
                }
            }
        }
        

        开发消费者2

        /**
         * 多对多模式的消费者2
         */
        public class WorkConsumer02 {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                //消费者消费通道中的消息
                channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
                while(true){
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
                    Thread.sleep(1000);
                }
            }
        }
        

        生产者发送100个消息,两个消费者分别读取了50条。

        看消息内容,发现队列发送消息采用的是轮询方式,也就是先发给消费者1,再发给消费者2,依次往复。

        4.2.2 能者多劳

        上面案例中有一个问题:消费者处理消息的速度是不一样的,消费者1处理后睡眠10毫秒(Thread.sleep(10)),消费者2是1000毫秒,速度相差100倍,但是最后处理的消息数还是一样的。这样就存在效率问题:处理能力强的消费者得不到更多的消息。

        因为队列默认采用是自动确认机制,消息发过去后就自动确认,队列不清楚每个消息具体什么时间处理完,所以平均分配消息数量。

        实现能者多劳:

        1. channel.basicQos(1);限制队列一次发一个消息给消费者,等消费者有了反馈,再发下一条
        2. channel.basicAck 消费完消息后手动反馈,处理快的消费者就能处理更多消息
        3. basicConsume 中的参数改为false
        /**
        多对多模式的消费者1
        */
        public class WorkConsumer1 {
           public static void main(String[] args) throws IOException, InterruptedException {
               Connection connection = MQUtils.getConnection();
               Channel channel = connection.createChannel();
               channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
               //同一时刻服务器只发送一条消息给消费者
               channel.basicQos(1);
               QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
               //true是自动返回完成状态,false表示手动
               channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
               while(true){
                   QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                   System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
                   Thread.sleep(10);
                   //手动确定返回状态,不写就是自动确认
                   channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
               }
           }
        }
           
        /**
         * 多对多模式的消费者2
        */
        public class WorkConsumer2 {
           public static void main(String[] args) throws IOException, InterruptedException {
               Connection connection = MQUtils.getConnection();
               Channel channel = connection.createChannel();
               channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
               //同一时刻服务器只发送一条消息给消费者
               channel.basicQos(1);
               QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
               //true是自动返回完成状态,false表示手动
               channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
               while(true){
                   QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                   System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
                   Thread.sleep(1000);
                   //手动确定返回状态,不写就是自动确认
                   channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
               }
           }
        }
        

        4.3 发布/订阅模型

        带你深入了解RabbitMQ,在这里插入图片描述,第19张

        发布/订阅模式和Work模式的区别是:Work模式只存在一个队列,多个消费者共同消费一个队列中的消息;而发布订阅模式存在多个队列,不同的消费者可以从各自的队列中处理完全相同的消息。

        4.3.1 操作步骤

        实现步骤:

        1. 创建交换机(Exchange)类型是fanout(扇出)
        2. 交换机需要绑定不同的队列
        3. 不同的消费者从不同的队列中获得消息
        4. 生产者发送消息到交换机
        5. 再由交换机将消息分发到多个队列

        新建队列

        带你深入了解RabbitMQ,在这里插入图片描述,第20张

        新建交换机

        带你深入了解RabbitMQ,在这里插入图片描述,第21张

        点击交换机,在bindings里面绑定两个队列

        带你深入了解RabbitMQ,在这里插入图片描述,第22张

        4.3.2 案例代码

        生产者

        /**
         * 发布和订阅模式的生产者,消息会通过交换机发到队列
         */
        public class PublishProductor {
            public static void main(String[] args) throws IOException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                //声明fanout exchange
                channel.exchangeDeclare(MQUtils.EXCHANGE_NAME,"fanout");
                String msg = "Hello Fanout";
                //发布消息到交换机
                channel.basicPublish(MQUtils.EXCHANGE_NAME,"",null,msg.getBytes());
                System.out.println("send:" + msg);
                channel.close();
                connection.close();
            }
        }
        

        消费者1

        /**
         * 发布订阅模式的消费者1
         * 两个消费者绑定的消息队列不同
         * 通过交换机一个消息能被不同队列的两个消费者同时获取
         * 一个队列可以有多个消费者,队列中的消息只能被一个消费者获取
         */
        public class SubscribeConsumer1 {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                //绑定队列1到交换机
                channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME,"");
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
                while(true){
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    System.out.println("Consumer1 receive :" + new String(delivery.getBody()));
                }
            }
        }
        

        消费者2

        public class SubscribeConsumer2 {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
                //绑定队列2到交换机
                channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME,"");
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
                while(true){
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    System.out.println("Consumer2 receive :" + new String(delivery.getBody()));
                }
            }
        }
        

        4.4 路由模型

        带你深入了解RabbitMQ,在这里插入图片描述,第23张

        路由模式的消息队列可以给队列绑定不同的key,生产者发送消息时,给消息设置不同的key,这样交换机在分发消息时,可以让消息路由到key匹配的队列中。

        可以想象上图是一个日志处理系统,C1可以处理error日志消息,C2可以处理info\error\warining类型的日志消息,使用路由模式就很容易实现了。

        4.3.1 操作步骤

        新建direct类型的交换机

        带你深入了解RabbitMQ,在这里插入图片描述,第24张

        4.3.2 案例代码

        生产者,给myqueue01绑定了key:error,myqueue02绑定了key:debug,然后发送了key:error的消息

        /**
          路由模式的生产者,发布消息会有特定的Key,消息会被绑定特定Key的消费者获取
         */
        public class RouteProductor {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                //声明交换机类型为direct
                channel.exchangeDeclare(MQUtils.EXCHANGE_NAME2,"direct");
                String msg = "Hello-->Route";
                //绑定队列1到交换机,指定了Key为error
                channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME2,"error");
                //绑定队列2到交换机,指定了Key为debug
                channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME2,"debug");
                //error是一个指定的Key
                channel.basicPublish(MQUtils.EXCHANGE_NAME2,"error",null,msg.getBytes());
                System.out.println("send:" + msg);
                channel.close();
                connection.close();
            }
        }
        

        消费者1

        /**
         * 路由模式的消费者1
         * 可以指定Key,消费特定的消息
         */
        public class RouteConsumer1 {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
                while(true){
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    System.out.println("RouteConsumer1 receive :" + new String(delivery.getBody()));
                }
            }
        }
        

        消费者2

        /**
         * 路由模式的消费者2
         * 可以指定Key,消费特定的消息
         */
        public class RouteConsumer2 {
            public static void main(String[] args) throws IOException, InterruptedException {
                Connection connection = MQUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
                while(true){
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    System.out.println("RouteConsumer2 receive :" + new String(delivery.getBody()));
                }
            }
        }
        

        4.5 主题模型

        带你深入了解RabbitMQ,在这里插入图片描述,第25张

        主题模式和路由模式差不多,在key中可以加入通配符:

        • * 匹配任意一个单词 com.* ----> com.hopu com.blb com.baidu
        • # 匹配.号隔开的0个或多个单词 com.# —> com.hopu.net com.hopu com.163.xxx.xxx.xxx

          4.3.1 案例代码

          生产者代码

          /**
            主题模式的生产者
           */
          public class TopicProductor {
              public static void main(String[] args) throws IOException, InterruptedException {
                  Connection connection = MQUtils.getConnection();
                  Channel channel = connection.createChannel();
                  //声明交换机类型为topic
                  channel.exchangeDeclare(MQUtils.EXCHANGE_NAME3,"topic");
                  //绑定队列到交换机,最后指定了Key
                  channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME3,"xray.#");
                  //绑定队列到交换机,最后指定了Key
                  channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME3,"*.*.cn");
                  String msg = "Hello-->Topic";
                  channel.basicPublish(MQUtils.EXCHANGE_NAME3,"rabbit.com.cn",null,msg.getBytes());
                  System.out.println("send:" + msg);
                  channel.close();
                  connection.close();
              }
          }
          

          消费者1

          /**
           * 主题模式的消费者1 ,类似路由模式,可以使用通配符对Key进行筛选
           *   #匹配1个或多个单词,*匹配一个单词
           */
          public class TopicConsumer1 {
              public static void main(String[] args) throws IOException, InterruptedException {
                  Connection connection = MQUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
                  QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                  channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
                  while(true){
                      QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                      System.out.println("TopicConsumer1 receive :" + new String(delivery.getBody()));
                  }
              }
          }
          

          消费者2

          /**
           * 主题模式的消费者2
           */
          public class TopicConsumer2 {
              public static void main(String[] args) throws IOException, InterruptedException {
                  Connection connection = MQUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
                  QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                  channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
                  while(true){
                      QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                      System.out.println("TopicConsumer2 receive :" + new String(delivery.getBody()));
                  }
              }
          }
          

          5、SpringBoot整合RabbitMQ

          1)创建两个SpringBoot项目,一个作为生产者,一个作为消费者

          生产者会发送两种消息:保存课程(更新和添加),删除课程

          消费者监听两个队列:保存课程队列和删除课程队列

          2)给生产者和消费者服务添加依赖

           
               org.springframework.boot
               spring-boot-starter-amqp
           
          

          3) 给生产者和消费者服务添加配置

          spring:
          	rabbitmq:
                host: localhost
                port: 5672
                username: root
                password: 123456
                virtual-host: test
          

          4)生产者的配置,用于生成消息队列和交换机

          /**
           * RabbitMQ的配置
           */
          @Configuration
          public class RabbitMQConfig {
              public static final String QUEUE_COURSE_SAVE = "queue.course.save";
              public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
              public static final String KEY_COURSE_SAVE = "key.course.save";
              public static final String KEY_COURSE_REMOVE = "key.course.remove";
              public static final String COURSE_EXCHANGE = "edu.course.exchange";
              @Bean
              public Queue queueCourseSave() {
                  return new Queue(QUEUE_COURSE_SAVE);
              }
              @Bean
              public Queue queueCourseRemove() {
                  return new Queue(QUEUE_COURSE_REMOVE);
              }
              @Bean
              public TopicExchange topicExchange() {
                  return new TopicExchange(COURSE_EXCHANGE);
              }
              @Bean
              public Binding bindCourseSave() {
                  return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
              }
              @Bean
              public Binding bindCourseRemove() {
                  return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
              }
          }
          

          5) 生产者发送消息的核心代码

              @Resource
              private RabbitTemplate rabbitTemplate;
              @GetMapping("/course/sendSaveMessage")
              public ResponseResult sendSaveMessage() {
                  //发送消息给队列 1 交换机 2 路由器 3 数据
                  rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE, RabbitMQConfig.COURSE_SAVE_KEY, "添加课程");
                  return ResponseResult.ok("ok");
              }
              @GetMapping("/course/sendRemoveMessage")
              public ResponseResult sendRemoveMessage() {
                  //发送消息给队列 1 交换机 2 路由器 3 数据
                  rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE, RabbitMQConfig.COURSE_REMOVE_KEY, 99L);
                  return ResponseResult.ok("ok");
              }
          

          6)消费者添加监听器

          import com.blb.common.config.RabbitMQConfig;
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.stereotype.Component;
          /**
           * 课程消息队列监听器
           */
          @Slf4j
          @Component
          public class CourseMQListener {
              @RabbitListener(queues = {RabbitMQConfig.COURSE_SAVE_QUEUE})
              public void handleSaveCourseMessage(String message) {
                  log.info("接收到保存课程消息:{}", message);
              }
              @RabbitListener(queues = {RabbitMQConfig.COURSE_REMOVE_QUEUE})
              public void handleRemoveCourseMessage(Long id) {
                  log.info("接收到删除课程消息:{}", id);
              }
          

          注意

          • 如果RabbitMQ的服务管理后台未发现我们用代码配置的队列和交换机,可能是未被Spring扫描到,请在消费者和生产者的启动类上添加扫描包的配置(com.blb.common.config包下就是RabbitMQ配置类,放在该包下是为了方便其它服务公用这一配置类)

            生产者

            @EnableDiscoveryClient
            @SpringBootApplication(scanBasePackages = {"com.blb.common.config", "com.blb.educourseservice"})
            public class EduCourseServiceApplication {
               public static void main(String[] args) {
                   SpringApplication.run(EduCourseServiceApplication.class, args);
               }
            }
            

            消费者

            @EnableFeignClients(basePackages = "com.blb.edusearchservice.client")
            @EnableDiscoveryClient
            @SpringBootApplication(scanBasePackages = {"com.blb.common.config", "com.blb.edusearchservice"})
            public class EduSearchSearchApplication {
               public static void main(String[] args) {
                   SpringApplication.run(EduSearchSearchApplication.class, args);
               }
            }
            

            6 总结

            一、消息队列是分布式系统的重要组件,起到的作用有:

            1. 解耦,生产者和消费者不需要知道对方的具体接口
            2. 异步,生产者发送完消息直接结束,不需要等待消费者执行完,效率高
            3. 削峰,控制高峰期消息的数量,降低服务器压力

            二、RabbitMQ的消息模型有:

            1. 一对一,一个生产者一个队列一个消费者,一个发一个收

            2. 一对多,一个生产者一个队列多个消费者,多个消费者共享一个队列中的消息

            3. 发布订阅模式

              由交换机绑定多个队列,消息分发到多个队列,每个消费者消费自己的队列中的消息

            4. 路由模式

              在发布订阅模式的基础上,加入路由键,消息通过键路由到不同的队列

            5. 主题模式

              在路由模式基础上,键中加入通配符,实现更加灵活的匹配

网友评论

搜索
最新文章
热门文章
热门标签