目前公司使用jeepluscloud版本,这个版本没有集成消息队列,这里记录一下,集成的过程;这个框架跟ruoyi的那个微服务版本结构一模一样,所以也可以快速上手。
1.项目结构图:
配置类的东西做成一个公共的模块
rabbitmq模块:
2.核心配置
1.pom类
jeeplus-common org.jeeplus ${revision} 4.0.0 jeeplus-common-rabbitmqjar org.springframework.boot spring-boot-starter-amqporg.jeeplus jeeplus-common-core${project.parent.version}
2.ConditionalOnRabbit
package com.jeeplus.common.rabbit.conditional; import org.springframework.context.annotation.Conditional; import java.lang.annotation.*; /** * 判断系统是否在启用了Rabbit, 未启用的情况下不将Bean注册到系统中 * * 使用场景: 在不使用Rabbit中间件但未去除Rabbit依赖的情况下, 通过配置文件中关闭Rabbit选项, * 同时将这个注解到有`@RabbitListener`标志的类上,让这个对象不注册到Spring容器中, * 从而避免`RabbitMQ`进行无限尝试重连服务器,导致项目一直抛出异常,影响开发和使用。 * * @author xxm * @since 2022/12/12 */ @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Documented @Conditional(OnRabbitEnable.class) public @interface ConditionalOnRabbit { }
3.OnRabbitEnable
package com.jeeplus.common.rabbit.conditional; import com.jeeplus.common.rabbit.configuration.RabbitMqProperties; import org.springframework.boot.context.properties.bind.Binder; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.core.type.AnnotatedTypeMetadata; /** * 判断是否在启用了Rabbit, 用来控制在没启用Rabbit情况下. 不将 @RabbitListener 修饰的监听器注册为Bean, 不然会导致无限尝试重连 * * @author xxm * @since 2022/12/12 */ public class OnRabbitEnable implements Condition { private final String rabbitPropertiesPrefix = "com.jeeplus.common.rabbit"; /** * @param context * @param metadata * @return */ @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { RabbitMqProperties rabbitMqProperties = Binder.get(context.getEnvironment()) .bind(rabbitPropertiesPrefix, RabbitMqProperties.class) .orElse(new RabbitMqProperties()); return rabbitMqProperties.isEnable(); } }
4.BootxRabbitListenerConfigurer
package com.jeeplus.common.rabbit.configuration; import lombok.RequiredArgsConstructor; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; /** * Rabbit 侦听器配置器 * * @author xxm * @since 2021/6/25 */ @Configuration @RequiredArgsConstructor public class BootxRabbitListenerConfigurer implements RabbitListenerConfigurer { private final DefaultMessageHandlerMethodFactory jsonHandlerMethodFactory; @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(jsonHandlerMethodFactory); } }
5.BootxRabbitListenerConfigurer
package com.jeeplus.common.rabbit.configuration; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; /** * 消息队列配置 * * @author xxm * @since 2021/6/25 */ @EnableRabbit @Configuration public class RabbitMqConfigurer { /** * 注册 RabbitTemplate 对象, 使用默认序列化方式 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ObjectMapper objectMapper) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 使用系统同版jackson 序列化配置 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper)); return rabbitTemplate; } /** * 添加默认消息序列化方式, 使用默认序列化方式 */ @Bean public DefaultMessageHandlerMethodFactory jsonHandlerMethodFactory(ObjectMapper objectMapper) { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setObjectMapper(objectMapper); factory.setMessageConverter(converter); return factory; } }
6.RabbitMqConfigurer
package com.jeeplus.common.rabbit.configuration; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; /** * 消息队列配置 * * @author xxm * @since 2021/6/25 */ @EnableRabbit @Configuration public class RabbitMqConfigurer { /** * 注册 RabbitTemplate 对象, 使用默认序列化方式 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ObjectMapper objectMapper) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 使用系统同版jackson 序列化配置 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper)); return rabbitTemplate; } /** * 添加默认消息序列化方式, 使用默认序列化方式 */ @Bean public DefaultMessageHandlerMethodFactory jsonHandlerMethodFactory(ObjectMapper objectMapper) { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setObjectMapper(objectMapper); factory.setMessageConverter(converter); return factory; } }
7.RabbitMqProperties
package com.jeeplus.common.rabbit.configuration; import com.jeeplus.common.rabbit.conditional.ConditionalOnRabbit; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; /** * MQTT配置 * * @author xxm * @since 2022/12/12 */ @Getter @Setter @ConfigurationProperties("com.jeeplus.common.rabbit") public class RabbitMqProperties { /** * 是否开启 RabbitMQ功能, * @see ConditionalOnRabbit 配合此注解使用 */ private boolean enable = false; }
8.RabbitMqCommonAutoConfiguration
package com.jeeplus.common.rabbit; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * RabbitMQ配置 * * @author xxm * @since 2022/5/3 */ @SpringBootApplication public class RabbitMqCommonAutoConfiguration { }
9.org.springframework.boot.autoconfigure.AutoConfiguration.imports
RabbitMqCommonAutoConfiguration
10.spring.factories
## 配置自定义 starter 的自动化配置 org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.jeeplus.common.rabbit.RabbitMqCommonAutoConfiguration
3. nacos配置
哪一个服务模块需要消息队列,就在对应的yml文件中配置 rabbit链接
#rabbitmq rabbitmq: host: localhost port: 5627 username: root password: root123 virtual-host: / publisher-confirm-type: correlated listener: simple: acknowledge-mode: manual
4.服务中调用rabbitmq
建立两个包,配置类和监听类
1.mq模板配置
package com.jeeplus.duxin.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * mq模板 * @author lgn * @date 2023/10/28 10:15 */ @Configuration public class MyRabbitConfig { private RabbitTemplate rabbitTemplate; @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback * * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) * */ // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); } }
2.服务交换机 队列设置
初始化交换机,队列,建立绑定。
package com.jeeplus.duxin.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * 服务交换机 队列设置 * @author lgn * @date 2023/10/28 10:16 */ @Configuration public class MyRabbitMQConfig { /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */ /* *//** * 初始化队列 * 死信队列 * * @return *//*@Bean public Queue orderDelayQueue() { *//* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Maparguments) 属性 *//* HashMap arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release.order"); arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; }*/ /* *//** * 初始化队列 * 普通队列 * * @return *//* @Bean public Queue orderReleaseQueue() { Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; }*/ /* *//** * * TopicExchange * 创建topic类型的交换机 * @return *//* @Bean public Exchange orderEventExchange() { *//* * String name, * boolean durable, * boolean autoDelete, * Map arguments * *//* return new TopicExchange("order-event-exchange", true, false); }*/ /* *//** * 路由和交换机进行绑定 设置路由key * @author lgn * @date 2023/10/28 10:33 * @return Binding *//* @Bean public Binding orderCreateBinding() { *//* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * *//* return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null); }*/ /* @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } *//** * 订单释放直接和库存释放进行绑定 * @return *//* @Bean public Binding orderReleaseOtherBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); }*/ /* *//** * 初始化队列 * 商品秒杀队列 * @return *//* @Bean public Queue orderSecKillOrrderQueue() { Queue queue = new Queue("order.seckill.order.queue", true, false, false); return queue; } @Bean public Binding orderSecKillOrrderQueueBinding() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map arguments Binding binding = new Binding( "order.seckill.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.seckill.order", null); return binding; }*/ /** * BOM模块的交换机 * TopicExchange * 创建topic类型的交换机 * @return */ @Bean public Exchange orderEventExchange() { /* * String name, * boolean durable, * boolean autoDelete, * Map arguments * */ return new TopicExchange("bom-event-exchange", true, false); } /** * 初始化BOM队列 * @return */ @Bean public Queue bomMaintenanceQueue() { Queue queue = new Queue("bom.maintenance.queue", true, false, false); return queue; } /** * bom * 路由和交换机进行绑定 设置路由key * @author lgn * @date 2023/10/28 10:33 * @return Binding */ @Bean public Binding bomCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("bom.maintenance.queue", Binding.DestinationType.QUEUE, "bom-event-exchange", "bom.maintenance.create", null); } /** * 初始化产品存货档案队列 * @return */ @Bean public Queue stockDocQueue() { Queue queue = new Queue("stock.doc.queue", true, false, false); return queue; } /** * 存货档案StockDoc * 路由和交换机进行绑定 设置路由key * @author lgn * @date 2023/10/28 10:33 * @return Binding */ @Bean public Binding docCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("stock.doc.queue", Binding.DestinationType.QUEUE, "bom-event-exchange", "stock.doc.create", null); } /** * 调用C++模块的交换机 * TopicExchange * 创建topic类型的交换机 * @return */ @Bean public Exchange cEventExchange() { /* * String name, * boolean durable, * boolean autoDelete, * Map arguments * */ return new TopicExchange("c-event-exchange", true, false); } /** * 初始化c++生成记录文件队列 * @return */ @Bean public Queue cCreatFileQueue() { Queue queue = new Queue("c.creatfile.queue", true, false, false); return queue; } /** * 初始化c++签名队列 * @return */ @Bean public Queue cDealQueue() { Queue queue = new Queue("c.deal.queue", true, false, false); return queue; } /** * 创建绑定关系 * @author lgn * @date 2023/10/30 9:34 * @return Binding */ @Bean public Binding cCreatFileCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("c.creatfile.queue", Binding.DestinationType.QUEUE, "c-event-exchange", "c.creatFile.create", null); } /** * 创建绑定关系 * @author lgn * @date 2023/10/30 9:34 * @return Binding */ @Bean public Binding cDealBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("c.deal.queue", Binding.DestinationType.QUEUE, "c-event-exchange", "c.deal.create", null); } }
3.监听队列 接收消息
消费方消费消息
package com.jeeplus.duxin.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 监听路由信息 * @author lgn * @date 2023/10/28 10:33 */ @Slf4j @Component //@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送时类型一致 //@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue 里面的消息。 @RabbitListener(queues = "bom.maintenance.queue") public class MQTestListener { //@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 //@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。 @RabbitHandler public void listener(String info,Channel channel, Message message) throws IOException { System.out.println("=============接收消息开始执行:"+info); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } }
5.使用
使用起来也非常方便:
在业务service中直接调用,生产者消息发送。
/** * mqTest * @author lgn * @date 2023/10/28 10:03 * @return Object */ public String mqTest() { //TODO 订单创建成功,发送消息给MQ rabbitTemplate.convertAndSend("bom-event-exchange","bom.maintenance.create","1234"); return null; }
希望对你有用!
猜你喜欢
- 1小时前大数据实验 实验七:Flink初级编程实践
- 1小时前unity 浏览器插件【embedded browser(原zfbrowser)】简单教程,使unity支持web h5页面,附软件下载链接
- 1小时前ImageNet Classification with Deep Convolutional 论文笔记
- 1小时前智能小程序相关名词解释(汇总)
- 1小时前linux搭建LAMP服务
- 1小时前科技助力养老变享老,内蒙古乌兰美康养院与清雷科技达成合作
- 1小时前清华大学操作系统rCore实验-第零章-Lab环境搭建
- 1小时前[Flink] Flink On Yarn(yarn-session.sh)启动错误
- 1小时前软件架构设计的核心:抽象与模型、“战略编程”
- 1小时前留学中介收费情况(留学中介机构收费标准)
网友评论
- 搜索
- 最新文章
- 热门文章