上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

RabbitMQ--消息模型

guduadmin34小时前

可参考RabbitMQ官网:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

RabbitMQ--消息模型,第1张

 1、简单队列模式

简单队列模式的模型图:

RabbitMQ--消息模型,第2张

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue

  • queue:消息队列,负责接受并缓存消息

  • consumer:订阅队列,处理队列中的消息

    导入Demo,包括三部分:

            链接:https://pan.baidu.com/s/1KjLjpkgyF6a73_cxJYwnYg?pwd=d3ae 

            提取码:d3ae 

            --来自百度网盘超级会员V6的分享

            RabbitMQ--消息模型,第3张

            mq-demo:父工程,管理项目依赖

            publisher:消息的发送者

            consumer:消息的消费者

    1.1.publisher实现

           思路:

            ①建立连接        ②创建Channel        ③声明队列         ④发送消息        ⑥关闭连接和channel

    代码实现:

    package cn.itcast.mq.helloworld;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.150.101");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("itcast");
            factory.setPassword("123321");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
            // 4.发送消息
            String message = "hello, rabbitmq!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:【" + message + "】");
            // 5.关闭通道和连接
            channel.close();
            connection.close();
        }
    }

    1.2.consumer实现

    代码思路:

    • 建立连接

    • 创建Channel

    • 声明队列

    • 订阅消息

      代码实现:

      package cn.itcast.mq.helloworld;
      import com.rabbitmq.client.*;
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      public class ConsumerTest {
          public static void main(String[] args) throws IOException, TimeoutException {
              // 1.建立连接
              ConnectionFactory factory = new ConnectionFactory();
              // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
              factory.setHost("192.168.150.101");
              factory.setPort(5672);
              factory.setVirtualHost("/");
              factory.setUsername("itcast");
              factory.setPassword("123321");
              // 1.2.建立连接
              Connection connection = factory.newConnection();
              // 2.创建通道Channel
              Channel channel = connection.createChannel();
              // 3.创建队列
              String queueName = "simple.queue";
              channel.queueDeclare(queueName, false, false, false, null);
              // 4.订阅消息
              channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                                             AMQP.BasicProperties properties, byte[] body) throws IOException {
                      // 5.处理消息
                      String message = new String(body);
                      System.out.println("接收到消息:【" + message + "】");
                  }
              });
              System.out.println("等待接收消息。。。。");
          }
      }

      2.SpringAMQP

      SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

      SpringAmqp的官方地址:Spring AMQP

       RabbitMQ--消息模型,第4张

       

      SpringAMQP提供了三个功能:

      • 自动声明队列、交换机及其绑定关系

      • 基于注解的监听器模式,异步接收消息

      • 封装了RabbitTemplate工具,用于发送消息

        3.Basic Queue 简单队列模型

        在父工程mq-demo中引入依赖

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

        3.1.消息发送

        首先配置MQ地址,在publisher服务的application.yml中添加配置:

        spring:
          rabbitmq:
            host: 192.168.150.101 # 主机名
            port: 5672 # 端口
            virtual-host: / # 虚拟主机
            username: itcast # 用户名
            password: 123321 # 密码

        然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

        package cn.itcast.mq.spring;
        import org.junit.Test;
        import org.junit.runner.RunWith;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.boot.test.context.SpringBootTest;
        import org.springframework.test.context.junit4.SpringRunner;
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class SpringAmqpTest {
            @Autowired
            private RabbitTemplate rabbitTemplate;
            @Test
            public void testSimpleQueue() {
                // 队列名称
                String queueName = "simple.queue";
                // 消息
                String message = "hello, spring amqp!";
                // 发送消息
                rabbitTemplate.convertAndSend(queueName, message);
            }
        }

        3.2.消息接收

        首先配置MQ地址,在consumer服务的application.yml中添加配置:

        spring:
          rabbitmq:
            host: 192.168.150.101 # 主机名
            port: 5672 # 端口
            virtual-host: / # 虚拟主机
            username: itcast # 用户名
            password: 123321 # 密码

        然后在consumer服务的cn.itcast.mq.listener包中新建一个类SpringRabbitListener,代码如下:

        package cn.itcast.mq.listener;
        import org.springframework.amqp.rabbit.annotation.RabbitListener;
        import org.springframework.stereotype.Component;
        @Component
        public class SpringRabbitListener {
            @RabbitListener(queues = "simple.queue")
            public void listenSimpleQueueMessage(String msg) throws InterruptedException {
                System.out.println("spring 消费者接收到消息:【" + msg + "】");
            }
        }

        3.3.测试

        启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息

        4.WorkQueue

        Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

        RabbitMQ--消息模型,第5张

        当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

        此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

        4.1.消息发送

        这次我们循环发送,模拟大量消息堆积现象。

        在publisher服务中的SpringAmqpTest类中添加一个测试方法:

        /**
             * workQueue
             * 向队列中不停发送消息,模拟消息堆积。
             */
        @Test
        public void testWorkQueue() throws InterruptedException {
            // 队列名称
            String queueName = "simple.queue";
            // 消息
            String message = "hello, message_";
            for (int i = 0; i < 50; i++) {
                // 发送消息
                rabbitTemplate.convertAndSend(queueName, message + i);
                Thread.sleep(20);
            }
        }

        4.2.消息接收

        要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

        @RabbitListener(queues = "simple.queue")
        public void listenWorkQueue1(String msg) throws InterruptedException {
            System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
            Thread.sleep(20);
        }
        @RabbitListener(queues = "simple.queue")
        public void listenWorkQueue2(String msg) throws InterruptedException {
            System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
            Thread.sleep(200);
        }

        注意到这个消费者sleep了1000秒,模拟任务耗时。

        4.3.测试

        启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。

        可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。

        也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

        4.4.能者多劳

        在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

        spring:
          rabbitmq:
            listener:
              simple:
                prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

        5.发布/订阅

        发布订阅的模型如图:

        RabbitMQ--消息模型,第6张

        可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

        • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

        • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

          • Fanout:广播,将消息交给所有绑定到交换机的队列

          • Direct:定向,把消息交给符合指定routing key 的队列

          • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

        • Consumer:消费者,与以前一样,订阅队列,没有变化

        • Queue:消息队列也与以前一样,接收消息、缓存消息。

          Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

          5.1.Fanout

          Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

           RabbitMQ--消息模型,第7张

           

          在广播模式下,消息发送流程是这样的:

          • 1) 可以有多个队列

          • 2) 每个队列都要绑定到Exchange(交换机)

          • 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

          • 4) 交换机把消息发送给绑定过的所有队列

          • 5) 订阅队列的消费者都能拿到消息

            我们的计划是这样的:

            • 创建一个交换机 itcast.fanout,类型是Fanout

            • 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout

              RabbitMQ--消息模型,第8张

               Spring提供了一个接口Exchange,来表示所有不同类型的交换机:

              RabbitMQ--消息模型,第9张

               在consumer中创建一个类,声明队列和交换机:

              package cn.itcast.mq.config;
              import org.springframework.amqp.core.Binding;
              import org.springframework.amqp.core.BindingBuilder;
              import org.springframework.amqp.core.FanoutExchange;
              import org.springframework.amqp.core.Queue;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;
              @Configuration
              public class FanoutConfig {
                  /**
                   * 声明交换机
                   * @return Fanout类型交换机
                   */
                  @Bean
                  public FanoutExchange fanoutExchange(){
                      return new FanoutExchange("itcast.fanout");
                  }
                  /**
                   * 第1个队列
                   */
                  @Bean
                  public Queue fanoutQueue1(){
                      return new Queue("fanout.queue1");
                  }
                  /**
                   * 绑定队列和交换机
                   */
                  @Bean
                  public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
                      return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
                  }
                  /**
                   * 第2个队列
                   */
                  @Bean
                  public Queue fanoutQueue2(){
                      return new Queue("fanout.queue2");
                  }
                  /**
                   * 绑定队列和交换机
                   */
                  @Bean
                  public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
                      return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
                  }
              }

              5.1.1.消息发送

              在publisher服务的SpringAmqpTest类中添加测试方法:

              @Test
              public void testFanoutExchange() {
                  // 队列名称
                  String exchangeName = "itcast.fanout";
                  // 消息
                  String message = "hello, everyone!";
                  rabbitTemplate.convertAndSend(exchangeName, "", message);
              }

              5.1.2.消息接收

              在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

              @RabbitListener(queues = "fanout.queue1")
              public void listenFanoutQueue1(String msg) {
                  System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
              }
              @RabbitListener(queues = "fanout.queue2")
              public void listenFanoutQueue2(String msg) {
                  System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
              }

              5.2.Direct

              在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

              RabbitMQ--消息模型,第10张

              在Direct模型下:

              • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

              • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。

              • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

                案例需求如下:

                1. 利用@RabbitListener声明Exchange、Queue、RoutingKey

                2. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

                3. 在publisher中编写测试方法,向itcast. direct发送消息

                RabbitMQ--消息模型,第11张

                5.2.1.基于注解声明队列和交换机

                基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

                在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

                @RabbitListener(bindings = @QueueBinding(
                    value = @Queue(name = "direct.queue1"),
                    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
                    key = {"red", "blue"}
                ))
                public void listenDirectQueue1(String msg){
                    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
                }
                @RabbitListener(bindings = @QueueBinding(
                    value = @Queue(name = "direct.queue2"),
                    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
                    key = {"red", "yellow"}
                ))
                public void listenDirectQueue2(String msg){
                    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
                }

                5.2.2.消息发送

                在publisher服务的SpringAmqpTest类中添加测试方法:

                @Test
                public void testSendDirectExchange() {
                    // 交换机名称
                    String exchangeName = "itcast.direct";
                    // 消息
                    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
                    // 发送消息
                    rabbitTemplate.convertAndSend(exchangeName, "red", message);
                }

                5.3.Topic

                5.3.1.说明

                Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

                Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

                通配符规则:

                #:匹配一个或多个词

                *:匹配不多不少恰好1个词

                举例:

                 item.#:能够匹配item.spu.insert 或者 item.spu

                 item.*:只能匹配item.spu

                图示:

                RabbitMQ--消息模型,第12张

                解释:

                • Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather

                • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news

                  案例需求:

                  实现思路如下:

                  1. 并利用@RabbitListener声明Exchange、Queue、RoutingKey

                  2. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

                  3. 在publisher中编写测试方法,向itcast. topic发送消息

                  RabbitMQ--消息模型,第13张

                  5.3.2.消息发送

                  在publisher服务的SpringAmqpTest类中添加测试方法:

                  /**
                       * topicExchange
                       */
                  @Test
                  public void testSendTopicExchange() {
                      // 交换机名称
                      String exchangeName = "itcast.topic";
                      // 消息
                      String message = "喜报!孙悟空大战哥斯拉,胜!";
                      // 发送消息
                      rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
                  }

                  5.3.3.消息接收

                  在consumer服务的SpringRabbitListener中添加方法:

                  @RabbitListener(bindings = @QueueBinding(
                      value = @Queue(name = "topic.queue1"),
                      exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
                      key = "china.#"
                  ))
                  public void listenTopicQueue1(String msg){
                      System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
                  }
                  @RabbitListener(bindings = @QueueBinding(
                      value = @Queue(name = "topic.queue2"),
                      exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
                      key = "#.news"
                  ))
                  public void listenTopicQueue2(String msg){
                      System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
                  }

                  喜欢的话,宝子们点个关注或赞吧!

                  RabbitMQ--消息模型,第14张 

网友评论

搜索
最新文章
热门文章
热门标签