上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

RabbitMQ 的基本概念

guduadmin11天前

一 MQ 的基本概念

1 MQ概述

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。消息队列就是所谓的存放消息的队列。 消息队列解决的不是存放消息的队列的⽬的,解决的是通信问题。

  • 传统方式,系统之间直接调用 (http协议 httpclient/openFeign)

    RabbitMQ 的基本概念,第1张

    • 中间件

      RabbitMQ 的基本概念,第2张

      2 MQ 的优势

      异步、 解耦、 削峰

      1 应用解耦

      系统的耦合性越高,容错性就越低,可维护性就越低。以购物为例子

      RabbitMQ 的基本概念,第3张

      使用 MQ 使得应用间解耦,提升容错性和可维护性。

      RabbitMQ 的基本概念,第4张

      2 异步提速

      一个下单操作耗时:20 + 300 + 300 + 300 = 920ms,用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

      RabbitMQ 的基本概念,第5张

      用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

      RabbitMQ 的基本概念,第6张

      3 削峰填谷

      RabbitMQ 的基本概念,第7张

      RabbitMQ 的基本概念,第8张

      使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。使用MQ后,可以提高系统稳定性。

      RabbitMQ 的基本概念,第9张

      3 MQ 的劣势

      1 系统可用性降低

      系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

      2系统复杂度提高

      MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息不被丢失等情况?

      4 常见的 MQ 产品

      RabbitMQ 的基本概念,第10张

      二 RabbitMQ安装

      1 上传软件

      erlang­18.3­1.el7.centos.x86_64.rpm

      socat­1.7.3.2­5.el7.lux.x86_64.rpm

      rabbitmq­server­3.6.5­1.noarch.rpm

      2 安装Erlang

      rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

      3 安装RabbitMQ

      rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps

      rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

      4 开启管理界面及配置

      rabbitmq-plugins enable rabbitmq_management

      5 启动

      service rabbitmq-server start # 启动服务

      service rabbitmq-server stop # 停止服务

      service rabbitmq-server restart # 重启服务

      6 登录

      需要关闭防火墙, 远程服务器开启15672和5672开启

      http://192.168.56.140:15672/

      如果登录报错, 这是因为rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问

      vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

      RabbitMQ 的基本概念,第11张

      删除loopback_users 中的 <<"guest">>

      RabbitMQ 的基本概念,第12张

      云服务器记得开放15672端口

      默认账号和密码都是 guest

      三 界面介绍和操作

      1 添加用户

      RabbitMQ 的基本概念,第13张

      # 角色说明: 
      	1、 超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 
      	2、 监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情 况,磁盘使用情况等) 
      	3、 策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红 框标识的部分)。 
      	4、 普通管理者(management) 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 
      	5、 其他 无法登陆管理控制台,通常就是普通的生产者和消费者。

      2 创建虚拟机

      1 点击图中的Virtual Hosts

      2 创建虚拟机路径,记得要带 / 

      RabbitMQ 的基本概念,第14张

      3 将虚拟机分配给用户

      RabbitMQ 的基本概念,第15张

      四 RabbitMQ概念

      1 架构图

      RabbitMQ 的基本概念,第16张

      2 相关概念

      Publisher - ⽣产者:发布消息到RabbitMQ中的Exchange

      Consumer - 消费者:监听RabbitMQ中的Queue中的消息

      Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker,也就是我们的RabbitMQ服务器

      Virtual host:出于多租户和安全因素设计的,在RabbitMQ中可以创建出多个虚拟消息服务器VirtualHost。

      Connection:publisher/consumer 和 broker 之间的 TCP 连接

      channel-信道: 网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务 , 信道有特定的功能,比如创建交换机,创建队列。

      Exchange - 交换机:和⽣产者建⽴连接并接收⽣产者的消息 ,并且不能保存消息。

      Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进⾏交互 ,队列是可以保存消息的。

      Routes - 路由:交换机以什么样的策略将消息发布到Queue。生产者发消息的时候,可以给消息贴一个标签,为了让指定的消费者接收消息。

      • 结构解读:

        首先安装好的RabbitMQ就是一个Broker,如果我们想将MQ给多个用户使用并且互不影响,那我们就需要将MQ通过虚拟化的方式分割成多个提供MQ的服务,也就是Virtual host,每个Virtual host都有独立的路径,并且和用户绑定。这样我们就可以在自己的世界里发消息了。

        • 通信解读:一条消息到底是怎么从生产者到了消费者的?

          • 首先生产者通过连接的方式连接到MQ的一个虚拟机,需要知道MQ的ip,端口,虚拟机路径,用户名和密码,准备好了以后就可以建立连接了TCP 连接Connection连接,

          • 但是建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的,所以我们使用信道changel的方式发送和接受消息。

          • 消息进入MQ的第一站是Exchange交换机,交换机的作用:① 接收生产者发送的消息 ②和队列绑定。交换机是不保存信息的。生产者发消息的时候可以指定一个路由键,路由键可以理解为就是给消息贴了一个标签(做标记作用,消费者接收消息的时候有用)

          • 消息进入第二站queue,消费者要接收消息,需要一直监听着queue,那么消费者在监听queue的时候需要先指定队列要和那个交换机绑定,绑定的时候也需要指定路由键,如果发消息时的路由键和接收消息时候路由键一样,那么这个消息就会进入到这个队列。

          • 最后消费者就拿到消息了。需要说明的一点,所有的交换机和队列创建的时候都是需要起名字的。

          3 RabbitMQ的通讯

          官网介绍:RabbitMQ Tutorials — RabbitMQ

          RabbitMQ 的基本概念,第17张

          RabbitMQ 的基本概念,第18张

          主题

          五 案例解释

          新建maven工程,Spring整合MQ。因为MQ中有很多概念在boot中是体会不到的,boot屏蔽了很多概念。

          1 简单队列模式

          RabbitMQ 的基本概念,第19张

          1 代码

          生产者和消费者都导入maven依赖

          
              com.rabbitmq
              amqp-client
              5.10.0
          
          • 生产者代码,记得最后需要关闭资源。

            package com.xinzhi.product;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            public class MyProduct {
                //队列名
                private static final String QUEUE_NAME = "my_queue";
                public static void main(String[] args) throws IOException, TimeoutException {
                    // 1.创建连接
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    // 2.设置连接地址
                    connectionFactory.setHost("192.168.32.11");
                    // 3.设置端口号:
                    connectionFactory.setPort(5672);
                    // 4.设置账号和密码
                    connectionFactory.setUsername("laohan123");
                    connectionFactory.setPassword("laohan123");
                    // 5.设置VirtualHost
                    connectionFactory.setVirtualHost("/laohan");
                    Connection connection = connectionFactory.newConnection();
                    // 6.获取信道
                    Channel channel = connection.createChannel();
                    // 7.创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
                    // 7.1第一个参数:队列名称
                    // 7.2第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
                    // 7.3第三个参数:该队列是否是私有的
                    // 7.4第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
                    // 7.5队列的其他参数, 一般都是null
                    channel.queueDeclare("my_queue", false, false, false, null);
                    String message = "欣知大数据";
                    //四个参数
                    //exchange 交换机,如果使⽤了"",表示使⽤了默认交换机,默认交换机会隐式绑定到队列,
                    //routingKey路由键:如果使⽤了默认交换机,那么路由键就可以用队列名来代替。
                    //props header信息,一般设置null
                    //最后一个参数是要传递的消息字节数组
                    channel.basicPublish("",  //使⽤默认交换机
                            "my_queue",    //因为⽤了默认交换机,于是参数就是队列名称
                            null,
                            message.getBytes()        消息内容
                    );
                    channel.close();
                    connection.close();
                    System.out.println("发送成功");
                }
            }
            
          • 消费者代码

            package com.xinzhi;
            import com.rabbitmq.client.*;
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            public class MyConsumer {
                public static void main(String[] args) throws IOException, TimeoutException {
                    // 1.创建连接
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    // 2.设置连接地址
                    connectionFactory.setHost("192.168.32.11");
                    // 3.设置端口号:
                    connectionFactory.setPort(5672);
                    // 4.设置账号和密码
                    connectionFactory.setUsername("laohan123");
                    connectionFactory.setPassword("laohan123");
                    // 5.设置VirtualHost
                    connectionFactory.setVirtualHost("/laohan");
                    Connection connection = connectionFactory.newConnection();
                    // 6.获取信道
                    Channel channel = connection.createChannel();
                    // 7.声明队列
                    channel.queueDeclare("my_queue", false, false, false, null);
                    // 8.创建消费者
                    Consumer consumer = new DefaultConsumer(channel) {
                        // consumerTag 消息的唯一标识,一般用来确认消息是否被消费
                        // envelope  封装了mq的基本方法
                        // properties 封装了mq的基本属性
                        // body       监听到的消息
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            System.out.println(new String(body));
                        }
                    };
                    // 9.消费者监听某个队列  autoAck自动签收
                    channel.basicConsume("my_queue", false, consumer);
                }
            }
            RabbitMQ 的基本概念,第20张

            2 代码解读:

            envelope:单词 信封的意思,在这里是封装了MQ的一些基本方法

            - getDeliveryTag() 获取此参数信封中包含的交货标签
            - isRedeliver()    如果这是在 ack 失败后是否重新投递
            - getExchange()
            - getRoutingKey()

            3 流程解读

            这是RabbitMQ最简单的工作方式

            • 生产者声明好队列,然后把信息给了MQ默认的交换机,交换机将信息发给队列

            • 消费者也声明好队列,然后监听队列获取信息

              4 抽出工具类

              因为生产者和消费者都是相同的获取信道的方式

              public static Connection getConnection(){
                      // 1.创建连接
                      ConnectionFactory connectionFactory = new ConnectionFactory();
                      // 2.设置连接地址
                      connectionFactory.setHost("192.168.56.140");
                      // 3.设置端口号:
                      connectionFactory.setPort(5672);
                      // 4.设置账号和密码
                      connectionFactory.setUsername("laohan123");
                      connectionFactory.setPassword("laohan123");
                      // 5.设置VirtualHost
                      connectionFactory.setVirtualHost("/laohan");
                      Connection connection = null;
                      try {
                          connection = connectionFactory.newConnection();
                      } catch (IOException e) {
                          e.printStackTrace();
                      } catch (TimeoutException e) {
                          e.printStackTrace();
                      }
                      return connection;
                  }

              2 work queue

              RabbitMQ 的基本概念,第21张

              队列模式: 能者多劳模式

              1 代码

              • 生产者

                package com.xinzhi.work.product;
                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.xinzhi.utils.RabbitUtil;
                import java.io.IOException;
                import java.util.concurrent.TimeoutException;
                public class MyProduct {
                    public static void main(String[] args) throws IOException, TimeoutException {
                        //1 获取连接和信道
                        Connection connection = RabbitUtil.getConnection();
                        Channel channel = connection.createChannel();
                        //2 声明队列
                        channel.queueDeclare("work_queue", true, false, false, null);
                        //3 发消息(消息先到了默认交换机,交换机和队列绑定了,所以信息也会直接到了queue)
                        for (int i = 1; i <101 ; i++) {
                            String message = "xinzhi"+i;
                            channel.basicPublish("","work_queue",null,message.getBytes());
                        }
                        //4 提示和释放资源
                        System.out.println("发送成功");
                        channel.close();
                        connection.close();
                    }
                }
                
              • 消费者 将下面的代码再复制两份MyConsumer1,MyConsumer2,等待时间设置成100,500

                package com.xinzhi.work.consumer;
                import com.rabbitmq.client.*;
                import com.xinzhi.utils.RabbitUtil;
                import java.io.IOException;
                public class MyConsumer1 {
                    public static void main(String[] args) throws IOException {
                        //1 获取连接和信道
                        Connection connection = RabbitUtil.getConnection();
                        Channel channel = connection.createChannel();
                        //2 声明队列
                        channel.queueDeclare("work_queue", true, false, false, null);
                        //3 声明消费者一次只接受一条消息
                        channel.basicQos(1);
                        // 4 声明消费者
                        Consumer consumer = new DefaultConsumer(channel) {
                            @Override
                            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                try {
                                    Thread.sleep(10);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println("消费标签是" + consumerTag + "消息体是" + new String(body));
                                // 消息消费成功以后的唯一标识
                                System.out.println(envelope.getDeliveryTag());
                                // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            }
                        };
                        channel.basicConsume("work_queue", consumer);
                    }
                }
                

                2 代码解读

                在简单模式的基础上添加了多个消费者,每个消费者添加了等待时间。

                生产者一次往队列里投放多条消息,消费者根据能力来消费这里面的所有消息,性能强的消费的消息多,所以是能者多劳

                3 订阅发布

                平分秋色

                RabbitMQ 的基本概念,第22张

                交换机类型 fanout

                发布订阅,这次使用了交换机,之前的两种方式都是没有显式的声明使用交换机,之前其实用的系统默认的交换机。

                这次使用了交换机,但是 没有使用路由键。只要和交换机绑定了的对了都可以接受到消息,也就是上图两个队列中可以收到相同的消息。

                1 代码

                • 生产者

                  package com.xinzhi.fanout;
                  import com.rabbitmq.client.BuiltinExchangeType;
                  import com.rabbitmq.client.Channel;
                  import com.rabbitmq.client.Connection;
                  import com.xinzhi.utils.RabbitUtil;
                  import java.io.IOException;
                  import java.util.concurrent.TimeoutException;
                  public class Product {
                      public static void main(String[] args) throws IOException, TimeoutException {
                          //1 获取连接和信道
                          Connection connection = RabbitUtil.getConnection();
                          Channel channel = connection.createChannel();
                          //2 声明交换机和类型
                          channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
                          //3 将信息发给交换机
                          for (int i = 1; i <101 ; i++) {
                              String message = "laohan"+i;
                              channel.basicPublish("fanout_exchange","",null,message.getBytes());
                          }
                          System.out.println("success");
                          channel.close();
                          connection.createChannel();
                      }
                  }
                  
                • 消费者1

                  package com.xinzhi.fanout;
                  import com.rabbitmq.client.*;
                  import com.xinzhi.utils.RabbitUtil;
                  import java.io.IOException;
                  public class MyConsumer1 {
                      public static void main(String[] args) throws IOException {
                          //1 获取连接和信道
                          Connection connection = RabbitUtil.getConnection();
                          Channel channel = connection.createChannel();
                          //2 声明队列
                          channel.queueDeclare("fanout_queue1", true, false, false, null);
                          //3 声明交换机
                          channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
                          //4 交换机和队列绑定
                          channel.queueBind("fanout_queue1", "fanout_exchange", "", null);
                          // 5 声明消费者
                          Consumer consumer = new DefaultConsumer(channel) {
                              @Override
                              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                  System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                                  // 消息消费成功以后的唯一标识
                                  System.out.println(envelope.getDeliveryTag());
                                  // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                                  channel.basicAck(envelope.getDeliveryTag(), false);
                              }
                          };
                          channel.basicConsume("fanout_queue1", consumer);
                      }
                  }
                  
                • 消费者2

                  package com.xinzhi.fanout;
                  import com.rabbitmq.client.*;
                  import com.xinzhi.utils.RabbitUtil;
                  import java.io.IOException;
                  public class MyConsumer2 {
                      public static void main(String[] args) throws IOException {
                          //1 获取连接和信道
                          Connection connection = RabbitUtil.getConnection();
                          Channel channel = connection.createChannel();
                          //2 声明队列
                          channel.queueDeclare("fanout_queue2", true, false, false, null);
                          //3 声明交换机
                          channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
                          //4 交换机和队列绑定
                          channel.queueBind("fanout_queue2", "fanout_exchange", "", null);
                          // 5 声明消费者
                          Consumer consumer = new DefaultConsumer(channel) {
                              @Override
                              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                  System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                                  // 消息消费成功以后的唯一标识
                                  System.out.println(envelope.getDeliveryTag());
                                  // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                                  channel.basicAck(envelope.getDeliveryTag(), false);
                              }
                          };
                          channel.basicConsume("fanout_queue2", consumer);
                      }
                  }
                  

                  4 .路由 routing

                  暗送秋波

                  1 概念

                  交换机direct

                  • 在⽣产者发送消息时指明routing-key

                  • 在消费者声明队列和交换机的绑定关系时,指明routing-key

                    RabbitMQ 的基本概念,第23张

                    • 解决的问题是:

                      • 因为交换机和两个队列都绑定了,但是为了给队列里发送的消息不一样,也就是区分给那个队列发什么样 的消息,就有了routing key的概念。发消息的时候指定一下路由键,接收消息的时候队列要和交换机绑定,这时候也需要指定路由键,如果这两次的路由键一样,那么这个消息就放着这个队列里面

                      2 代码

                      • 生产者

                        package com.xinzhi.direct;
                        import com.rabbitmq.client.BuiltinExchangeType;
                        import com.rabbitmq.client.Channel;
                        import com.rabbitmq.client.Connection;
                        import com.xinzhi.utils.RabbitUtil;
                        import java.io.IOException;
                        import java.util.concurrent.TimeoutException;
                        public class Product {
                            public static void main(String[] args) throws IOException, TimeoutException {
                                //1 获取连接和信道
                                Connection connection = RabbitUtil.getConnection();
                                Channel channel = connection.createChannel();
                                //2 声明交换机和类型,并且持久化
                                channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);
                                //3 将信息发给交换机,并且指定路由键
                                String message1 = "laohan1";
                                String message2 = "laohan2";
                                channel.basicPublish("direct_exchange","han",null,message1.getBytes());
                                channel.basicPublish("direct_exchange","man",null,message2.getBytes());
                                System.out.println("success");
                                channel.close();
                                connection.close();
                            }
                        }
                        
                      • 消费者1

                        package com.xinzhi.direct;
                        import com.rabbitmq.client.*;
                        import com.xinzhi.utils.RabbitUtil;
                        import java.io.IOException;
                        public class MyConsumer1 {
                            public static void main(String[] args) throws IOException {
                                //1 获取连接和信道
                                Connection connection = RabbitUtil.getConnection();
                                Channel channel = connection.createChannel();
                                //2 声明队列
                                channel.queueDeclare("direct_queue1", true, false, false, null);
                                //3 声明交换机
                                channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);
                                //4 交换机和队列绑定
                                channel.queueBind("direct_queue1", "direct_exchange", "han");
                                // 5 声明消费者
                                Consumer consumer = new DefaultConsumer(channel) {
                                    @Override
                                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                        System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                                        // 消息消费成功以后的唯一标识
                                        System.out.println(envelope.getDeliveryTag());
                                        // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                                        channel.basicAck(envelope.getDeliveryTag(), false);
                                    }
                                };
                                channel.basicConsume("direct_queue1", consumer);
                            }
                        }
                        
                      • 消费者2

                        package com.xinzhi.direct;
                        import com.rabbitmq.client.*;
                        import com.xinzhi.utils.RabbitUtil;
                        import java.io.IOException;
                        public class MyConsumer2 {
                            public static void main(String[] args) throws IOException {
                                //1 获取连接和信道
                                Connection connection = RabbitUtil.getConnection();
                                Channel channel = connection.createChannel();
                                //2 声明队列
                                channel.queueDeclare("direct_queue2", true, false, false, null);
                                //3 声明交换机
                                channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);
                                //4 交换机和队列绑定
                                channel.queueBind("direct_queue2", "direct_exchange", "man");
                                // 5 声明消费者
                                Consumer consumer = new DefaultConsumer(channel) {
                                    @Override
                                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                        System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                                        // 消息消费成功以后的唯一标识
                                        System.out.println(envelope.getDeliveryTag());
                                        // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                                        channel.basicAck(envelope.getDeliveryTag(), false);
                                    }
                                };
                                channel.basicConsume("direct_queue2", consumer);
                            }
                        }

                        5 通配符模式

                        你的心思我要

                        1 概念

                        交换机是 topic

                        • 因为路由模式里是精确匹配,比较局限,使用通配符方式,通配符,提⾼了匹配的范围,扩展业务。

                        • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

                        • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert。

                          2 代码

                          • 生产者

                            package com.xinzhi.topic;
                            import com.rabbitmq.client.BuiltinExchangeType;
                            import com.rabbitmq.client.Channel;
                            import com.rabbitmq.client.Connection;
                            import com.xinzhi.utils.RabbitUtil;
                            import java.io.IOException;
                            import java.util.concurrent.TimeoutException;
                            public class Product {
                                public static void main(String[] args) throws IOException, TimeoutException {
                                    //1 获取连接和信道
                                    Connection connection = RabbitUtil.getConnection();
                                    Channel channel = connection.createChannel();
                                    //2 声明交换机和类型,并且持久化
                                    channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);
                                    //3 将信息发给交换机,并且指定路由键
                                    String message1 = "laohanxueit";
                                    channel.basicPublish("topic_exchange","xinzhi.15",null,message1.getBytes());
                                    System.out.println("success");
                                    channel.close();
                                    connection.close();
                                }
                            }
                            
                          • 消费者

                            package com.xinzhi.topic;
                            import com.rabbitmq.client.*;
                            import com.xinzhi.utils.RabbitUtil;
                            import java.io.IOException;
                            public class MyConsumer {
                                public static void main(String[] args) throws IOException {
                                    //1 获取连接和信道
                                    Connection connection = RabbitUtil.getConnection();
                                    Channel channel = connection.createChannel();
                                    //2 声明队列
                                    channel.queueDeclare("topic_queue", true, false, false, null);
                                    //3 声明交换机
                                    channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);
                                    //4 交换机和队列绑定
                                    channel.queueBind("topic_queue", "topic_exchange", "xinzhi.#");
                                    // 5 声明消费者
                                    Consumer consumer = new DefaultConsumer(channel) {
                                        @Override
                                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                            System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                                            // 消息消费成功以后的唯一标识
                                            System.out.println(envelope.getDeliveryTag());
                                            // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                                            channel.basicAck(envelope.getDeliveryTag(), false);
                                        }
                                    };
                                    channel.basicConsume("topic_queue", consumer);
                                }
                            }

                            六 SpringBoot整合

                            1 发布订阅

                            1 新建boot项目

                            2 导入依赖

                            
                                org.springframework.boot
                                spring-boot-starter-amqp
                            

                            3 配置文件

                            server:
                              port: 8099
                            spring:
                              rabbitmq:
                                host: 192.168.56.140
                                port: 5672
                                username: laohan123
                                password: laohan123
                                virtual-host: /laohan

                            4 配置类

                            import org.springframework.amqp.core.*;
                            import org.springframework.beans.factory.annotation.Qualifier;
                            import org.springframework.context.annotation.Bean;
                            import org.springframework.context.annotation.Configuration;
                            @Configuration
                            public class RabbitConfig {
                                public static final String EXCHANGE_NAME = "fanout_exchage";
                                public static final String QUEUE_NAME = "fanout_queue";
                                @Bean("queue")
                                public Queue queue(){
                            //        return new Queue(QUEUE_NAME, true, false, false);
                                    return QueueBuilder.durable(QUEUE_NAME).build();
                                }
                                @Bean("exchange")
                                public Exchange exchange(){
                            //        return new FanoutExchange(EXCHANGE_NAME, true, false);
                                    return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
                                }
                                @Bean
                                public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") Exchange exchange){
                                    return BindingBuilder.bind(queue).to(exchange).with("").noargs();
                                }
                            }

                            5监听类

                            import com.rabbitmq.client.Channel;
                            import org.springframework.amqp.core.Message;
                            import org.springframework.amqp.rabbit.annotation.RabbitListener;
                            import org.springframework.stereotype.Component;
                            import java.io.IOException;
                            @Component
                            public class RabbitListen {
                            @RabbitListener(queues = {RabbitConfig.QUEUE_NAME})
                            public void listener(String body,Message message, Channel channel) throws IOException {
                                    long msgTag = message.getMessageProperties().getDeliveryTag();
                                    System.out.println("msgTag==>"+msgTag);
                                    System.out.println("message==>"+message);
                                    System.out.println("body==>"+body);
                                }
                            }

                            6 测试类发送消息

                            @Autowired
                            private RabbitTemplate rabbitTemplate;
                            @Test
                            void contextLoads() {
                                rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"","老韩学it");
                            }

                            2 topic

                            • 在发布订阅的基础上修改交换机名称和路由绑定就可以了

                              package com.xinzhi.config;
                              import org.springframework.amqp.core.*;
                              import org.springframework.beans.factory.annotation.Qualifier;
                              import org.springframework.context.annotation.Bean;
                              import org.springframework.context.annotation.Configuration;
                              @Configuration
                              public class RabbitConfig {
                              //    public static final String EXCHANGE_NAME = "fanout_exchage";
                                  public static final String EXCHANGE_NAME = "topic_exchange";
                              //    public static final String QUEUE_NAME = "fanout_queue";
                                  public static final String QUEUE_NAME = "topic_queue";
                                  @Bean("queue")
                                  public Queue queue(){
                              //        return new Queue(QUEUE_NAME, true, false, false);
                                      return QueueBuilder.durable(QUEUE_NAME).build();
                                  }
                                  @Bean("exchange")
                                  public Exchange exchange(){
                              //        return new FanoutExchange(EXCHANGE_NAME, true, false);
                              //        return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
                                      return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
                                  }
                                  @Bean
                                  public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") Exchange exchange){
                                      return BindingBuilder.bind(queue).to(exchange).with("xinzhi.#").noargs();
                                  }
                              }
                              
                            • 发送消息验证

                               @Autowired
                                  private RabbitTemplate rabbitTemplate;
                                  @Test
                                  void contextLoads() {
                                      rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"xinzhi.15","老韩学it");
                                  }

                              七 消息的可靠性投递

                              1 什么是消息的可靠性投递

                              • 保证消息一定能发到消息队列中

                              • 细节

                                • 保证mq节点成功接受消息

                                • 消息发送端需要接受到mq服务端接收到消息的确认应答

                                • 完善的消息补偿机制,发送失败的消息可以再感知并二次处理

                              • RabbitMQ消息投递路径

                                • 生产者-->交换机-->队列-->消费者

                                • 通过两个点的控制,保证消息的可靠性投递

                                  • 生产者到交换机 confirmCallback

                                  • 交换机到队列 returnCallbakc

                              • 建议

                                • 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,RabbitMQ的整体效率变低,吞吐量下降严重,不是非常重要的消息不建议用消息确认机制

                                2 confirmCallback

                                • 机制:

                                  生产者投递消息以后,如果Broker收到消息以后,会给生产者一个ACK,生产者通过ACK可以确认这条消息是否成功发送到Broker。

                                • 开启confirmCallback

                                  spring.rabbitmq.publisher-confirm-type: correlated

                                  RabbitMQ 的基本概念,第24张
                                • 发送代码

                                  @Test
                                  void confirm(){
                                      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                                          /**
                                               * 消息到交换机的确认
                                               * @param correlationData  配置信息
                                               * @param ack              交换机确认  true消息接受成功  false消息接受失败
                                               * @param cause             消息发送失败原因
                                               */
                                          @Override
                                          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                                              System.out.println("ConfirmCallback==========>");
                                              System.out.println("correlationData==========>"+correlationData);
                                              System.out.println("ack==========>"+ack);
                                              System.out.println("cause==========>"+cause);
                                              if(ack){
                                                  System.out.println("发送成功");
                                                  // 更新数据库  成功
                                              }else {
                                                  System.out.println("发送失败,日志或数据库纪录");
                                                  // 更新数据库  失败
                                              }
                                          }
                                      });
                                      rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"xinzhi.15","老韩学it");
                                  }

                                  RabbitMQ 的基本概念,第25张

                                  • 模拟失败场景,修改发送时候交换机名称

                                    RabbitMQ 的基本概念,第26张

                                    2 returnCallback

                                    • return机制保证消息在rabbitmq中能够成功的投递到队列⾥

                                    • 两种模式:

                                      • 交换机到队列不成功,则丢弃消息(默认)

                                      • 交换机到队列不成功,返回生产者,触发returnCallback

                                    • 开启returnCallback,交换机到队列的可靠性投递

                                      spring.rabbitmq.publisher-returns=true

                                    • 修改投递到队列失败的策略

                                      spring.rabbitmq.template.mandatory=true

                                    • 发送消息验证.

                                       @Test
                                      void returnCallback(){
                                          rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                                              @Override
                                              public void returnedMessage(ReturnedMessage returned) {
                                                  int code = returned.getReplyCode();
                                                  System.out.println("code==>"+code);
                                                  System.out.println("returned==>"+returned);
                                              }
                                          });
                                          rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"xinzhi.15","老韩学it");
                                      }

                                      • 发送消息以后,没有任何提示,我们修改路由键

                                        RabbitMQ 的基本概念,第27张

                                        八 消息确认

                                        1 背景

                                        保证消息从队列到消费者的过程。

                                        2 ACK介绍

                                        • 消费者从RabbitMQ中获取消息并且处理完成以后,反馈给RabbitMQ,RabbitMQ收到确认消息以后才能把消息从队列中删除

                                        • 消费者在处理消息的时候出现了网络不稳定、服务器异常等情况,那么就不会有ACK反馈,RabbitMQ认为这个消息没有正常消费,就将这个消息放回队列里面

                                        • 只有当消费者正确发送ack以后,RabbitMQ才会把消息从队列中删除

                                        • 消息的ack确认机制默认是打开的,消息如果未被进行ack的消息确认机制,这条消息将被锁定

                                          3 确认方式

                                          • 自动

                                          • 手动manual

                                            spring.rabbitmq.listener.simple.acknowledge-mode=manual

                                            RabbitMQ 的基本概念,第28张

                                            • 发送消息,并且开启监听模式,虽然消息被消费了,但是因为开启了手动确认模式配置,但是代码里没有手动确认所以队列里的消息不会删除

                                              RabbitMQ 的基本概念,第29张

                                              • 代码中开启确认机制

                                                 channel.basicAck(msgTag,false);
                                                • 消息拒绝

                                                  // false 一次拒绝一条   true 重新回到队列
                                                  channel.basicNack(msgTag,false,true);

                                                  结果就会看到控制台一直接受消息,因为对列有消息就会被监听到,监听以后拒绝了又放到队列里面,然后 又监听...

                                                  • DeliveryTag

                                                    表示消息投递的序号,每次消费消息或者消息重新投递以后,DeliveryTag都会+1

                                                  • basicReject

                                                    也是消息拒绝的,一次只能拒绝一条消息,也可以设置是否重新回如队列

网友评论

搜索
最新文章
热门文章
热门标签