上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

RabbitMQ 的七种消息传递形式

guduadmin51天前

文章目录

  • 一、RabbitMQ 架构简介
    • 二、准备工作
    • 三、消息收发
      • 1. Hello World
      • 2. Work queues
      • 3. Publish/Subscrite
        • 3.1. Direct
        • 3.2. Fanout
        • 3.3. Topic
        • 3.4. Header
        • 4. Routing
        • 5. Topics

          大部分情况下,我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ,因此本文我也主要从这两个方面来和大家分享 RabbitMQ 的用法。

          一、RabbitMQ 架构简介

          一图胜千言,如下:

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第1张

          这张图中涉及到如下一些概念:

          生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。

          交换机(Exchange):和生产者建立连接并接收生产者的消息。

          消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。

          队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。

          路由(Routes):交换机转发消息到队列的规则。

          二、准备工作

          大家知道,RabbitMQ 是 AMQP 阵营里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖,如下:

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第2张

          项目创建成功后,在 application.properties 中配置 RabbitMQ 的基本连接信息,如下:

          spring.rabbitmq.host=localhost
          spring.rabbitmq.username=guest
          spring.rabbitmq.password=guest
          spring.rabbitmq.port=5672
          

          接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。

          RabbitMQ 官网介绍了如下几种消息分发的形式:

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第3张

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第4张

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第5张

          这里我主要和大家介绍前六种消息收发方式。

          三、消息收发

          1. Hello World

          咦?这个咋没有交换机?这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第6张

          来看看代码实现:

          先来看看队列的定义:

          @Configuration
          public class HelloWorldConfig {
              public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";
              @Bean
              Queue queue1() {
                  return new Queue(HELLO_WORLD_QUEUE_NAME);
              }
          }
          

          再来看看消息消费者的定义:

          @Component
          public class HelloWorldConsumer {
              @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
              public void receive(String msg) {
                  System.out.println("msg = " + msg);
              }
          }
          

          消息发送:

          @SpringBootTest
          class RabbitmqdemoApplicationTests {
              @Autowired
              RabbitTemplate rabbitTemplate;
              @Test
              void contextLoads() {
                  rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
              }
          }
          

          这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息routing key相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。

          2. Work queues

          这种情况是这样的:

          一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第7张

          一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。

          先来看并发能力的配置,如下:

          @Component
          public class HelloWorldConsumer {
              @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
              public void receive(String msg) {
                  System.out.println("receive = " + msg);
              }
              @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
              public void receive2(String msg) {
                  System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());
              }
          }
          

          可以看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去消费消息。

          启动项目,在 RabbitMQ 后台也可以看到一共有 11 个消费者。

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第8张

          此时,如果生产者发送 10 条消息,就会一下都被消费掉。

          消息发送方式如下:

          @SpringBootTest
          class RabbitmqdemoApplicationTests {
              @Autowired
              RabbitTemplate rabbitTemplate;
              @Test
              void contextLoads() {
                  for (int i = 0; i < 10; i++) {
                      rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
                  }
              }
          }
          

          消息消费日志如下:

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第9张

          可以看到,消息都被第一个消费者消费了。但是小伙伴们需要注意,事情并不总是这样(多试几次就可以看到差异),消息也有可能被第一个消费者消费(只是由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)。

          当然消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式如下:

          spring.rabbitmq.listener.simple.acknowledge-mode=manual
          

          消费代码如下:

          @Component
          public class HelloWorldConsumer {
              @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
              public void receive(Message message,Channel channel) throws IOException {
                  System.out.println("receive="+message.getPayload());
                  channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
              }
              @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")
              public void receive2(Message message, Channel channel) throws IOException {
                  System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());
                  channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
              }
          }
          

          此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息。

          这就是 Work queues 这种情况。

          3. Publish/Subscrite

          再来看发布订阅模式,这种情况是这样:

          一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:

          RabbitMQ 的七种消息传递形式,在这里插入图片描述,第10张

          这种情况下,我们有四种交换机可供选择,分别是:

          Direct

          Fanout

          Topic

          Header

          我分别来给大家举一个简单例子看下。

          3.1. Direct

          DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。DirectExchange 的配置如下:

          @Configuration
          public class RabbitDirectConfig {
              public final static String DIRECTNAME = "javaboy-direct";
              @Bean
              Queue queue() {
                  return new Queue("hello-queue");
              }
              @Bean
          DirectExchange directExchange() {
                  return new DirectExchange(DIRECTNAME, true, false);
              }
              @Bean
              Binding binding() {
                  return BindingBuilder.bind(queue())
                          .to(directExchange()).with("direct");
              }
          }
          
          • 首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。
          • 创建一个Binding对象将Exchange和Queue绑定在一起。
          • DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。

            再来看看消费者:

            @Component
            public class DirectReceiver {
                @RabbitListener(queues = "hello-queue")
                public void handler1(String msg) {
                    System.out.println("DirectReceiver:" + msg);
                }
            }
            

            通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送,如下:

            @RunWith(SpringRunner.class)
            @SpringBootTest
            public class RabbitmqApplicationTests {
                @Autowired
                RabbitTemplate rabbitTemplate;
                @Test
                public void directTest() {
                    rabbitTemplate.convertAndSend("hello-queue", "hello direct!");
                }
            }
            

            最终执行结果如下:

            RabbitMQ 的七种消息传递形式,在这里插入图片描述,第11张

            3.2. Fanout

            FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

            @Configuration
            public class RabbitFanoutConfig {
                public final static String FANOUTNAME = "sang-fanout";
                @Bean
                FanoutExchange fanoutExchange() {
                    return new FanoutExchange(FANOUTNAME, true, false);
                }
                @Bean
                Queue queueOne() {
                    return new Queue("queue-one");
                }
                @Bean
                Queue queueTwo() {
                    return new Queue("queue-two");
                }
                @Bean
                Binding bindingOne() {
                    return BindingBuilder.bind(queueOne()).to(fanoutExchange());
                }
                @Bean
                Binding bindingTwo() {
                    return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
                }
            }
            

            在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创建两个消费者,如下:

            @Component
            public class FanoutReceiver {
                @RabbitListener(queues = "queue-one")
                public void handler1(String message) {
                    System.out.println("FanoutReceiver:handler1:" + message);
                }
                @RabbitListener(queues = "queue-two")
                public void handler2(String message) {
                    System.out.println("FanoutReceiver:handler2:" + message);
                }
            }
            

            两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:

            @RunWith(SpringRunner.class)
            @SpringBootTest
            public class RabbitmqApplicationTests {
                @Autowired
                RabbitTemplate rabbitTemplate;
                @Test
                public void fanoutTest() {
                    rabbitTemplate
                    .convertAndSend(RabbitFanoutConfig.FANOUTNAME, 
                            null, "hello fanout!");
                }
            }
            

            注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。

            最终执行日志如下:

            RabbitMQ 的七种消息传递形式,在这里插入图片描述,第12张

            3.3. Topic

            TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。TopicExchange 配置如下:

            @Configuration
            public class RabbitTopicConfig {
                public final static String TOPICNAME = "sang-topic";
                @Bean
                TopicExchange topicExchange() {
                    return new TopicExchange(TOPICNAME, true, false);
                }
                @Bean
                Queue xiaomi() {
                    return new Queue("xiaomi");
                }
                @Bean
                Queue huawei() {
                    return new Queue("huawei");
                }
                @Bean
                Queue phone() {
                    return new Queue("phone");
                }
                @Bean
                Binding xiaomiBinding() {
                    return BindingBuilder.bind(xiaomi()).to(topicExchange())
                            .with("xiaomi.#");
                }
                @Bean
                Binding huaweiBinding() {
                    return BindingBuilder.bind(huawei()).to(topicExchange())
                            .with("huawei.#");
                }
                @Bean
                Binding phoneBinding() {
                    return BindingBuilder.bind(phone()).to(topicExchange())
                            .with("#.phone.#");
                }
            }
            
            • 首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,第二个 Queue 用来存储和 “huawei” 有关的消息,第三个 Queue 用来存储和 “phone” 有关的消息。
            • 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。

              接下来针对三个 Queue 创建三个消费者,如下:

              @Component
              public class TopicReceiver {
                  @RabbitListener(queues = "phone")
                  public void handler1(String message) {
                      System.out.println("PhoneReceiver:" + message);
                  }
                  @RabbitListener(queues = "xiaomi")
                  public void handler2(String message) {
                      System.out.println("XiaoMiReceiver:"+message);
                  }
                  @RabbitListener(queues = "huawei")
                  public void handler3(String message) {
                      System.out.println("HuaWeiReceiver:"+message);
                  }
              }
              

              然后在单元测试中进行消息的发送,如下:

              @RunWith(SpringRunner.class)
              @SpringBootTest
              public class RabbitmqApplicationTests {
                  @Autowired
                  RabbitTemplate rabbitTemplate;
                  @Test
                  public void topicTest() {
                      rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
              "xiaomi.news","小米新闻..");
                      rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
              "huawei.news","华为新闻..");
                      rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
              "xiaomi.phone","小米手机..");
                      rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
              "huawei.phone","华为手机..");
                      rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
              "phone.news","手机新闻..");
                  }
              }
              

              根据 RabbitTopicConfig 中的配置,第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,第二条消息将被路由到名为 “huawei” 的 Queue 上,第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,最后一条消息则将被路由到名为 “phone” 的 Queue 上。

              3.4. Header

              HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:

              @Configuration
              public class RabbitHeaderConfig {
                  public final static String HEADERNAME = "javaboy-header";
                  @Bean
                  HeadersExchange headersExchange() {
                      return new HeadersExchange(HEADERNAME, true, false);
                  }
                  @Bean
                  Queue queueName() {
                      return new Queue("name-queue");
                  }
                  @Bean
                  Queue queueAge() {
                      return new Queue("age-queue");
                  }
                  @Bean
                  Binding bindingName() {
                      Map map = new HashMap<>();
                      map.put("name", "sang");
                      return BindingBuilder.bind(queueName())
                              .to(headersExchange()).whereAny(map).match();
                  }
                  @Bean
                  Binding bindingAge() {
                      return BindingBuilder.bind(queueAge())
                              .to(headersExchange()).where("age").exists();
                  }
              }
              

              这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上。

              接下来创建两个消息消费者:

              @Component
              public class HeaderReceiver {
                  @RabbitListener(queues = "name-queue")
                  public void handler1(byte[] msg) {
                      System.out.println("HeaderReceiver:name:"
                              + new String(msg, 0, msg.length));
                  }
                  @RabbitListener(queues = "age-queue")
                  public void handler2(byte[] msg) {
                      System.out.println("HeaderReceiver:age:"
                              + new String(msg, 0, msg.length));
                  }
              }
              

              注意这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也和 routingkey 无关,如下:

              @RunWith(SpringRunner.class)
              @SpringBootTest
              public class RabbitmqApplicationTests {
                  @Autowired
                  RabbitTemplate rabbitTemplate;
                  @Test
                  public void headerTest() {
                      Message nameMsg = MessageBuilder
                              .withBody("hello header! name-queue".getBytes())
                              .setHeader("name", "sang").build();
                      Message ageMsg = MessageBuilder
                              .withBody("hello header! age-queue".getBytes())
                              .setHeader("age", "99").build();
                      rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
                      rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
                  }
              }
              

              这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去。

              最终执行效果如下:

              RabbitMQ 的七种消息传递形式,在这里插入图片描述,第13张

              4. Routing

              这种情况是这样:

              一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。

              如下图:

              RabbitMQ 的七种消息传递形式,在这里插入图片描述,第14张

              这个就是按照 routing key 去路由消息,我这里就不再举例子了,大家可以参考 3.3.1 小结。

              5. Topics

              这种情况是这样:

              一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写。

              如下图:

              RabbitMQ 的七种消息传递形式,在这里插入图片描述,第15张

              这个我也就不举例啦,前面 3.3.3 小节已经举过例子了,不再赘述。

网友评论

搜索
最新文章
热门文章
热门标签