根据amqp协议、rabbitmq入门、springboot集成rabbitmq 可知,rabbitmq的广播模式关键是使用fanout类型的exchange,fanout exchange会忽略message中的routing-key、queue中的binding-key,发给绑定exchange的全部queue。
创建fanout类型的exchange
import org.springframework.amqp.core.*; @Configuration public class MqConfig { /** * 定义广播交换机 * @return */ @Bean public FanoutExchange fanoutExchange() { final FanoutExchange fanoutExchange = new FanoutExchange("自定义广播类型的交换机名称"); return fanoutExchange; } }
发送
@Autowired private AmqpTemplate amqpTemplate; //发送到订阅数据的exchange中 amqpTemplate.convertAndSend("自定义广播类型的交换机名称", //fanout类型的exchange会忽略routing-key,所以这里的binding key传空字符串 "", message);
消费
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; /** * 将数据发送给队列1 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列1”), exchange = @Exchange(value = "自定义广播类型的交换机名称", type = ExchangeTypes.FANOUT), //fanout类型exchange会忽略binding-key key = "")) public void doSynAddDataToJD(String message) { log.info("广播模式,同步数据给订阅方"); } /** * 将数据发送给队列2 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列2”), exchange = @Exchange(value = "自定义广播类型的交换机名称", type = ExchangeTypes.FANOUT), key = "")) public void doSynAddDataToJD(String message) { log.info("广播模式,同步数据给订阅方"); }
总结
实现发布订阅(广播模式)的关键在于对exchange类型的理解,可参考amqp协议、rabbitmq入门、springboot集成rabbitmq 、AMQP 0-9-1 Model Explained,源码中的类型有如下几种
package org.springframework.amqp.core; /** * Constants for the standard Exchange type names. * * @author Mark Fisher * @author Gary Russell */ public abstract class ExchangeTypes { /** * Direct exchange. * routing key和binding key完全匹配 */ public static final String DIRECT = "direct"; /** * Topic exchange. * binding key可使用通配符来匹配routing key */ public static final String TOPIC = "topic"; /** * Fanout exchange. * 会忽略routing key、binding key,消息发送到绑定exchange的全部queue */ public static final String FANOUT = "fanout"; /** * Headers exchange. * 使用headers中的属性来匹配,有只匹配一项或者全部匹配可选 */ public static final String HEADERS = "headers"; /** * System exchange. * 这个类型,暂时缺乏相关资料。 */ public static final String SYSTEM = "system"; }
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章