上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

服务异步通讯之 SpringAMQP【微服务】

guduadmin503月前

文章目录

    • 一、初识 MQ
      • 1. 同步通讯
      • 2. 异步通讯
      • 3. MQ 常见框架
      • 二、RabbitMQ 入门
        • 1. 概述和安装
        • 2. 常见消息模型
        • 3. 基础模型练习
        • 三、SpringAMQP
          • 1. 简单队列模型
          • 2. 工作队列模型
          • 3. 发布订阅模型
            • 3.1 Fanout Exchange
            • 3.2 Direct Exchange
            • 3.3 Topic Exchange

              一、初识 MQ

              1. 同步通讯

              同步和异步的区别:

              ① 同步通讯类似于打电话,是一对一的同时发生的通讯,因此它的时效性更好(一步走完才能走下一步);

              ② 异步通讯类似于发短信,给一个人发的同时还可以给别人发,支持多线操作,但是由于通讯不同步,所以它的时效性差(多步可以同时走)。

              微服务间基于 Feign 的调用就属于同步调用,比如支付服务内部一般都会调用订单服务,这个时候它就必须要等待对方的回应才能进行后面的操作,显然这个过程是实时性的,但也存在一定的问题。

              同步通讯劣势:

              ① 耦合度高。每次加入新的需求,都要修改原来的代码;

              ② 性能下降。调用者需要等待服务提供者的响应,响应时间等于每次调用的时间之和,如果调用链过长会直接导致性能下降;

              ③ 资源浪费。调用链中的每个服务在等待响应的过程中,不能释放所占用的资源。 A 服务调用 B 服务,在 B 未执行完之前,A 一直干等着,它占用的资源也不会释放;

              ④ 级联失败。如果服务提供者出现问题,所有的调用方也会跟着出现问题。 A 服务里面同时调用了 B 服务和 C 服务,B 服务出故障导致 A 所占用的资源一直得不到释放,一次、两次、无数次这样的请求最终导致 A 的资源被耗尽,而 A 服务里面还有 C,C 是正常的,但由于 A 的资源已被耗尽了,导致 A 调用 C 的业务时也会出现问题。

              2. 异步通讯

              异步调用常见的实现就是事件驱动模式。

              异步调用引入一个 Broker 事件代理者,当服务消费者接收到前端请求后,就会发布一个事件给 Broker,然后由 Broker 广播,将事件通知给所有的提供者,让这些提供者各自开始行动。

              同步通讯中消费者会在代码中依次调用提供者,而异步通讯中的消费者不再负责调用提供者,它只管发布事件到 Broker,由提供者自行订阅事件就可以了。

              异步通讯优势:

              ① 解除了服务与服务之间的耦合。消费者不再负责调用提供者,只管发布事件到 Broker 就可以了, 所以一旦有新的业务出现,直接去订阅事件就可以了;

              ② 性能提升,吞吐量提高。消费者发布完事件后立马就会响应给客户,至于提供者什么时候运行完我们并不在意, 只要最终能做完就行了,所以业务的总耗时就为消费者的请求时间和事件的发布时间之和;

              ③ 不存在资源浪费。 什么是资源浪费?就是占着资源却不用。以前消费者会占着资源等待提供者运行完毕,现在服务之间已经彻底解耦了,又怎么会浪费;

              ④ 解决级联失败问题。服务之间没有强依赖,不必担心级联失败问题, 即使某一个提供者挂掉了,也不会影响消费者;

              ⑤ 流量削峰。 当大量的用户请求过来的时候,消费者会发布大量的事件给 Broker,因为提供者处理业务的能力有限,所以 Broker 在这里会对事件做一个缓冲,具体的提供者根据自己的能力处理业务,压力全给到 Broker 就可以了,最终对微服务起到一个保护的作用。

              异步通讯劣势:

              ① 依赖于 Broker 的可靠性、安全性和吞吐能力;

              ② 架构复杂,业务没有明显的流程线,不好追踪管理。

              异步通讯只是通知提供者去干,至于什么时候干完谁都不知道。实际上大多数情况下都会使用同步通讯,因为提供者返回的结果消费者立马要用,我们更追求的是时效性,而对并发并没有很高的要求!

              3. MQ 常见框架

              MQ 即消息队列,也就是事件驱动架构中的 Broker。

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第1张

              比较:

              ① RabbitMQ 支持多种协议,这意味着它支持的功能更多,其中 AMQP 可以实现跨语言通讯,而 RocketMQ 和 Kafka是自定义的协议;

              ② RabbitMQ、RocketMQ 和 Kafka 都支持主从集群,可用性较高;

              ③ Kafka的吞吐量非常高,但是高吞吐量带来的牺牲就是消息的延迟性和不可靠性,容易出现消息丢失的情况。

              Kafka 适用于海量数据的传输,但是对数据的安全要求不高的情况(比如日志信息),RabbitMQ 和 RocketMQ 的稳定性更强,更适合对稳定性要求较高的场景(比如业务通信),如果要做自定义协议,我们可以选择 RocketMQ,因为 RockerMQ 是基于 Java 语言开发的,且支持自定义协议。

              二、RabbitMQ 入门

              1. 概述和安装

              RabbitMQ 的整体结构如下:

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第2张

              消息的发布者将来会把消息发送到 exchange 交换机,交换机负责将消息路由到具体的 queue 队列,队列负责暂存消息,消息的接收者就会从队列中拿取并处理消息。每个用户会有自己的虚拟主机,各个虚拟主机之间是相互隔离的,这样可以避免干扰!

              ① 将本地的压缩文件上传至 Linux,并加载为一个镜像

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第3张

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第4张

              ② 创建并运行 MQ 容器,15672 是 RibbitMQ 控制台的端口,5672 是消息通讯的端口(发消息、收消息)

              docker run \
              -e RABBITMQ_DEFAULT_USER=zxe \
              -e RABBITMQ_DEFAULT_PASS=856724bb \
              --name mq \
              --hostname mq1 \
              -p 15672:15672 \
              -p 5672:5672 \
              -d \
              rabbitmq:3-management
              

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第5张

              ③ 浏览器访问一下 RabbitMQ 的控制台,根据刚才设置的用户名和密码登录

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第6张

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第7张

              每个模块的含义:

              ① Overview(总览):主要可以看到 RabbitMQ 的一些节点信息;

              ② Connections(连接):将来消息的发布者和接收者都应该与 RabbitMQ 建立连接;

              ③ Channels(通道):是操作 MQ 的工具,建立连接之后必须创建一个 Channel 通道,提供者和消费者才能基于 Channel 完成消息的发送或接收;

              ④ Exchanges(交换机):消息的路由器,负责将消息路由到对面;

              ⑤ Queues(队列):用来做消息存储;

              ⑥ Admin(管理):管理当前用户信息;

              ⑦ virtual host(虚拟主机):是对 queue、exchange 等资源的逻辑分组。

              2. 常见消息模型

              RabbitMQ 官方文档:rabbitmq.com

              查看官方文档可以看到官方给出的很多 Demo 案例,但是只有前五个和消息的接收发送有关。

              其中,前两个是基于队列发送的,也就是说消息的发布者和订阅者之间直接通过队列连接,没有交换机。而后三个属于发布订阅模式,有交换机,根据交换机的类型分为广播、路由和主题三种。

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第8张

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第9张

              3. 基础模型练习

              官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:消息发布者、消息队列和订阅者。

              ① 发布消息

              发布消息基本步骤:创建连接工厂 → 设置连接参数 → 建立连接 → 创建通道 → 创建队列 → 发送消息 → 关闭通道和连接。

              import com.rabbitmq.client.Channel;
              import com.rabbitmq.client.Connection;
              import com.rabbitmq.client.ConnectionFactory;
              import org.junit.Test;
              import java.io.IOException;
              import java.util.concurrent.TimeoutException;
              public class PublisherTest {
                  @Test
                  public void testSendMessage() throws IOException, TimeoutException {
                      // 1.创建连接工厂
                      ConnectionFactory factory = new ConnectionFactory();
                      // 2.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
                      factory.setHost("192.168.149.101");
                      factory.setPort(5672);
                      factory.setVirtualHost("/");
                      factory.setUsername("zxe");
                      factory.setPassword("123321");
                      // 3.建立连接
                      Connection connection = factory.newConnection();
                      // 4.创建通道Channel
                      Channel channel = connection.createChannel();
                      // 5.创建队列
                      String queueName = "simple.queue";
                      channel.queueDeclare(queueName, false, false, false, null);
                      // 6.发送消息
                      String message = "hello, rabbitmq!";
                      channel.basicPublish("", queueName, null, message.getBytes());
                      System.out.println("发送消息成功:【" + message + "】");
                      // 7.关闭通道和连接
                      channel.close();
                      connection.close();
                  }
              }
              

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第10张

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第11张

              ② 订阅消息

              订阅消息基本步骤:创建连接工厂 → 设置连接参数 → 建立连接 → 创建通道 → 创建队列 → 订阅消息 → 处理消息。

              import com.rabbitmq.client.*;
              import java.io.IOException;
              import java.util.concurrent.TimeoutException;
              public class ConsumerTest {
                  public static void main(String[] args) throws IOException, TimeoutException {
                      // 1.建立连接
                      ConnectionFactory factory = new ConnectionFactory();
                      // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
                      factory.setHost("192.168.149.101");
                      factory.setPort(5672);
                      factory.setVirtualHost("/");
                      factory.setUsername("zxe");
                      factory.setPassword("123321");
                      // 1.2.建立连接
                      Connection connection = factory.newConnection();
                      // 2.创建通道Channel
                      Channel channel = connection.createChannel();
                      // 3.创建队列
                      String queueName = "simple.queue";
                      channel.queueDeclare(queueName, false, false, false, null);
                      // 4.订阅消息
                      channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                          @Override
                          public void handleDelivery(String consumerTag, Envelope envelope,
                                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                              // 5.处理消息,队列一有消息函数就会执行,body就是消息体
                              String message = new String(body);
                              System.out.println("接收到消息:【" + message + "】");
                          }
                      });
                      System.out.println("等待接收消息。。。。");
                  }
              }
              

              发布者已经创建了队列,订阅者为什么还要创建?这是因为它们的先后执行顺序是不确定的,以防万一订阅者在发布者之前执行,队列不存在的问题,其实最终也不会创建两个队列,而是有则不创建,没有就创建!

              从 MQ 上接收消息也需要时间,所以函数外的代码先执行,函数内的代码后执行,这就是异步。

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第12张

              消息一旦被接收,就会立马从队列中消失。

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第13张

              三、SpringAMQP

              1. 简单队列模型

              上面案例中发布消息和订阅消息的步骤十分繁琐吧?接下来的 SpringAMQP 就是用来简化这些步骤的,它是一种模板协议,与语言和平台无关,利用 SpringAMQP 提供好的模板工具直接可以发送和接收消息,其实底层封装的还是 Rabbit 的客户端。

              ① 父工程中引入 AMQP 依赖

              
              
                  org.springframework.boot
                  spring-boot-starter-amqp
              
              

              ② 编写 publisher 和 consumer 的 application.yml,添加连接信息

              spring:
                rabbitmq:
                  host: 192.168.150.101 #主机名
                  port: 5672 #端口
                  virtual-host: / #虚拟主机
                  username: zxe #用户名
                  password: 123321 #密码
              

              ③ SpringAMQP 方式并不会主动创建队列,需要我们提前去控制台创建 simple.queue 队列

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第14张

              ④ 在 publisher 中编写测试方法,向 simple.queue 队列发送消息

              主机名、端口等信息都写在配置文件里面了,代码中只需要传入队列名和所需发送的消息即可。

              import org.junit.Test;
              import org.junit.runner.RunWith;
              import org.springframework.amqp.rabbit.core.RabbitTemplate;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.boot.test.context.SpringBootTest;
              import org.springframework.test.context.junit4.SpringRunner;
              @RunWith(SpringRunner.class)
              @SpringBootTest
              public class SpringAmqpTest {
                  @Autowired
                  private RabbitTemplate rabbitTemplate;
                  @Test
                  public void testSendMessage2SimpleQueue() {
                      String queueName = "simple.queue";
                      String message = "hello, spring amqp!";
                      //发送消息
                      rabbitTemplate.convertAndSend(queueName, message);
                  }
              }
              

              ⑤ 发布一个消息,去 RabbitMQ 控制台看一下

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第15张

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第16张

              ⑥ 在 consumer 服务中新建一个类,用于编写消费逻辑

              主机名、端口等信息都写在配置文件里面了,代码中只需要指定监听的队列名和接下来的行为方法就可以了,Spring 会自动将监听到的消息以参数的形式传递给行为方法。

              import org.springframework.amqp.rabbit.annotation.RabbitListener;
              import org.springframework.stereotype.Component;
              @Component
              public class SpringRabbitListener {
                  @RabbitListener(queues = "simple.queue")
                  public void listenSimpleQueue(String msg) {
                      System.out.println("消息者已接收到simple.queue的消息:【" + msg + "】");
                  }
              }
              

              ⑦ 运行 Consumer 的启动类,同时去看一个 RabbitMQ 控制台,发现队列中的消息条数变成了 0,说明消息已经被成功消费了

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第17张

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第18张

              2. 工作队列模型

              以前我们发布者发送过来的消息都由一个消费者去处理,但是当发布者发送的消息很多时,一个消费者就可能处理不过来了,处理不过来的消息全部堆积在队列中,长期下去,队列肯定会爆满,最终造成数据丢失。

              解决办法就是我们可以安排多个消费者,让它们共同处理队列中的消息。

              消费者之间是一种合作的关系,一条消息如果被某一消费者接收了,它就会立马从队列中消失,所以不会出现重复消费的问题。

              Work Queue(工作队列),可以提高消息的处理速度,避免队列消息堆积。

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第19张

              模拟 WorkQueue,基本思路如下:

              ① 在 publisher 服务中定义测试方法,每秒产生 50 条消息,发送到 simple.queue;

              ② 在 consumer 服务中定义两个消息监听者,都监听 simple.queue 队列;

              ③ 消费者 1 每秒处理 50 条消息,消费者 2 每秒处理 10 条消息。

              ① 编写 publish 代码,for 循环模拟发送 50 条消息,每发送一次我们让线程休眠 20 毫秒,这样发完 50 条消息就是 1 秒

              @Test
              public void testSendMessage2WorkQueue() throws InterruptedException {
                  String queueName = "simple.queue";
                  String message = "hello, message_";
                  for (int i = 1; i <= 50; i++) {
                      rabbitTemplate.convertAndSend(queueName, message + i);
                      Thread.sleep(20);
                  }
              }
              

              ② 编写 consumer 代码,两个方法模拟两个消费者,同样设置休眠时间模拟消费者的处理速度

              @RabbitListener(queues = "simple.queue")
              public void listenWorkQueue1(String msg) throws InterruptedException {
                  System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
                  Thread.sleep(20);
              }
              @RabbitListener(queues = "simple.queue")
              public void listenWorkQueue2(String msg) throws InterruptedException {
                  //err打印出红色字体,比较容易区分
                  System.err.println("消费者2接收到消息:【" + msg + "】" + LocalTime.now());
                  Thread.sleep(200);
              }
              

              ③ 运行之后我们发现,消费者并没有在一秒内将消息处理完,且消费者1 只处理偶数消息,消费者2 只处理奇数消息,且消费者1 提前就处理完了,消费者2 晚了好久才把消息处理完

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第20张

              这是因为我们 RabbitMQ 的预取机制所导致的。当消息进入队列之后,我们的消费者并不是立马去消费这些消息,而是先根据轮询的方式把消息平均分配给所有的消费者,这就是预取。

              它不管每个消费者的消费能力如何,一律平均分配,这就出现了消费者1 处理完毕,消费者2 还在继续的问题。

              ④ 所以接下来,可以去配置文件里设置消费预取限制,设置 preFetch 值可以控制预期消息的上限

              spring:
                rabbitmq:
                  host: 192.168.149.100 #主机名
                  port: 5672 #端口
                  virtual-host: / #虚拟主机
                  username: zxe #用户名
                  password: 856724bb #密码
                  listener:
                    simple:
                      prefetch: 1 #每次只能获取一条消息,处理完成后才能获取下一个消息
              

              ⑤ 配置完之后重启服务,果然在一秒内完成消费

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第21张

              3. 发布订阅模型

              发布订阅模式与之前案例的区别就是,它允许将同一消息发送给多个消费者(这里的多个消费者指的是多个不同的服务),实现方式是加入了 exchange 交换机。

              发布者将消息发送给交换机,交换机负责将消息路由到不同的队列,即一个消息同时被多个消费者消费。

              注意交换机只负责消息的路由,而不是存储,如果路由失败则消息丢失!

              3.1 Fanout Exchange

              Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的 queue,因此称为广播模式。

              一次发布,多个消费者都能接受。

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第22张

              ① 声明队列、交换机,并将队列绑定到交换机上

              import org.springframework.amqp.core.Binding;
              import org.springframework.amqp.core.BindingBuilder;
              import org.springframework.amqp.core.FanoutExchange;
              import org.springframework.amqp.core.Queue;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;
              @Configuration
              public class FanoutConfig {
                  //声明交换机
                  @Bean
                  public FanoutExchange fanoutExchange() {
                      return new FanoutExchange("zxe.fanout");
                  }
                  //声明队列
                  @Bean
                  public Queue fanoutQueue1() {
                      return new Queue("fanout.queue1");
                  }
                  @Bean
                  public Queue fanoutQueue2() {
                      return new Queue("fanout.queue2");
                  }
                  //将队列绑定到交换机,参数为上面声明的方法名
                  @Bean
                  public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
                      return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
                  }
                  @Bean
                  public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
                      return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
                  }
              }
              

              ② 运行项目,测试以下

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第23张

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第24张

              ② 编写消费者监听代码,分别监听两个队列的消息

              @RabbitListener(queues = "fanout.queue1")
              public void listenFanoutQueue1(String msg) {
                  System.out.println("消费者已接收到fanout.queue1的消息:【" + msg + "】");
              }
              @RabbitListener(queues = "fanout.queue2")
              public void listenFanoutQueue(String msg) {
                  System.out.println("消费者已接收到fanout.queue2的消息:【" + msg + "】");
              }
              

              ③ 编写发布者代码,将消息直接发送到交换机

              @Test
              public void testSendFanoutExchange() {
                  //交换机名称
                  String exchangeName = "zxe.fanout";
                  //消息
                  String message = "hello, every one!";
                  //发送消息
                  rabbitTemplate.convertAndSend(exchangeName, "", message);
              }
              

              ④ 启动项目,发布消息,并消费

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第25张

              步骤总结:

              ① 首先声明交换机和队列,并将队列绑定到交换机;

              ② 对于发布者,需要指定交换机名称和所要发布的消息;

              ③ 对于消费者,直接指定想要监听的队列名和行为方法即可。

              3.2 Direct Exchange

              Direct Exchange 会将接收到的消息根据规则路由到指定的 queue,因此称为路由模式。

              原理:

              ① 每一个队列都会设置一个 BindingKey;

              ② 发布者发送消息时,会指定消息的 RoutingKey;

              ③ 交换机负责将消息路由到 BindingKey 和 RoutingKey 一致的队列。

              BindingKey 和 RoutingKey 相当于对暗号,暗号相同则路由。

              一个队列可以指定多个 BindingKey,如果多个队列的 BindingKey 都可以与 RoutingKey 匹配,那交换机就会将消息发送给多个队列。

              ① 用 RabbitListener 注解声明交换机、队列和 BindingKey

              //用RabbitListener注解声明交换机、队列和 BindingKey
              @RabbitListener(bindings = @QueueBinding(
                      value = @Queue(name = "direct.queue1"),
                      exchange = @Exchange("zxe.direct"),
                      key = {"red", "blue"}
              ))
              public void listenDirectQueue1(String msg) {
                  System.out.println("消费者已接收到direct.queue1的消息:【" + msg + "】");
              }
              @RabbitListener(bindings = @QueueBinding(
                      value = @Queue(name = "direct.queue2"),
                      exchange = @Exchange("zxe.direct"),
                      key = {"red", "yellow"}
              ))
              public void listenDirectQueue2(String msg) {
                  System.out.println("消费者已接收到direct.queue2的消息:【" + msg + "】");
              }
              

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第26张

              ② 编写发布者代码,指定交换机名,要发布的消息,以及 RoutingKey

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第27张

              ③ 启动项目,可以看到队列1 接收到消息,因为它里面包含 blue

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第28张

              3.3 Topic Exchange

              Topic Exchange 与 Direct Exchange 类似,区别在于 Topic Exchange 的 routingKey 必须是由多个单词组成的列表,并以 . 分隔,比如 china.news,china.weather,所以它又称为话题模式。

              Queue 与 Exchange 指定 BindingKey 时可以使用通配符。

              #:代指 0 个或多个单词

              *:代指一个单词

              比如中国相关的一切,可以这样表示:china.*

              所有的新闻,可以这样表示:#.news

              以前 bindingKey 需要绑定 news、weather、food 等多个 key,现在只需要一个 china.* 就搞定了,只要是 china 的我都要!

              ① 用 @RabbitListener 声明队列、交换机和 BindingKey,这里使用通配符来指定 key

              @RabbitListener(bindings = @QueueBinding(
                          value = @Queue(name = "topic.queue1"),
                          exchange = @Exchange(name = "zxe.topic", type = ExchangeTypes.TOPIC),
                          key = "china.*"
                  ))
                  public void listenTopicQueue1(String msg) {
                      System.out.println("消费者已接收到topic.queue1的消息:【" + msg + "】");
                  }
                  @RabbitListener(bindings = @QueueBinding(
                          value = @Queue(name = "topic.queue2"),
                          exchange = @Exchange(name = "zxe.topic", type = ExchangeTypes.TOPIC),
                          key = "#.news"
                  ))
                  public void listenTopicQueue2(String msg) {
                      System.out.println("消费者已接收到topic.queue2的消息:【" + msg + "】");
                  }
              

              ② 编写发布者代码,指定 RoutingKey

              @Test
                  public void testSendTopicExchange() {
                      String exchangeName = "zxe.topic";
                      String message = "hello, china!";
                      rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
                  }
              

              ③ 运行一下

              服务异步通讯之 SpringAMQP【微服务】,在这里插入图片描述,第29张

网友评论

搜索
最新文章
热门文章
热门标签
 
 周公解梦梦见蛇追我  梦见老虎什么预兆  孕妇梦见大象