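Spring Initialization Order: RabbitMQ Fails to Auto-Declare a Queue
The project uses RabbitMQ and is configured to declare queues, exchanges, and bindings automatically. Testing showed, however, that one queue was never created. Debugging through the Spring and RabbitMQ source code revealed the cause.
RabbitMQ is configured for two environments. The sample code follows.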
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

// RabbitMQProperties is the project's own properties class (host, port, username,
// password, virtualHost); it is not shown in this article.
@Configuration
public class RabbitMqConfiguration {

    /**
     * Configuration for the "one" MQ environment
     */
    @Bean(name = "oneRabbitMQProperties")
    @ConfigurationProperties(prefix = "spring.rabbitmq.one")
    public RabbitMQProperties oneRabbitMQProperties() {
        return new RabbitMQProperties();
    }

    @Primary
    @Bean(name = "oneRabbitConnectionFactory")
    public ConnectionFactory oneRabbitConnectionFactory(
            @Qualifier("oneRabbitMQProperties") RabbitMQProperties rabbitMQProperties) {
        return connectionFactory(rabbitMQProperties, false);
    }

    @Primary
    @Bean("oneRabbitMqAdmin")
    public RabbitAdmin oneRabbitMqAdmin(
            @Qualifier("oneRabbitConnectionFactory") ConnectionFactory oneRabbitConnectionFactory) {
        return new RabbitAdmin(oneRabbitConnectionFactory);
    }

    @Primary
    @Bean(name = "oneRabbitTemplate")
    public RabbitTemplate oneRabbitTemplate(
            @Qualifier("oneRabbitConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Configuration for the "two" MQ environment
     */
    @Bean(name = "twoRabbitMQProperties")
    @ConfigurationProperties(prefix = "spring.rabbitmq.two")
    public RabbitMQProperties twoRabbitMQProperties() {
        return new RabbitMQProperties();
    }

    @Bean(name = "twoRabbitConnectionFactory")
    public ConnectionFactory twoRabbitConnectionFactory(
            @Qualifier("twoRabbitMQProperties") RabbitMQProperties rabbitMQProperties) {
        return connectionFactory(rabbitMQProperties, false);
    }

    @Bean("twoRabbitMqAdmin")
    public RabbitAdmin twoRabbitMqAdmin(
            @Qualifier("twoRabbitConnectionFactory") ConnectionFactory twoRabbitConnectionFactory) {
        return new RabbitAdmin(twoRabbitConnectionFactory);
    }

    @Bean(name = "twoRabbitTemplate")
    public RabbitTemplate twoRabbitTemplate(
            @Qualifier("twoRabbitConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    // Shared helper: builds a caching connection factory from one set of properties.
    private ConnectionFactory connectionFactory(RabbitMQProperties rabbitMQProperties, boolean transaction) {
        CachingConnectionFactory factory = new CachingConnectionFactory(
                rabbitMQProperties.getHost(), rabbitMQProperties.getPort());
        factory.setUsername(rabbitMQProperties.getUsername());
        factory.setPassword(rabbitMQProperties.getPassword());
        factory.setPublisherConfirms(!transaction);
        factory.setPublisherReturns(true);
        factory.setVirtualHost(rabbitMQProperties.getVirtualHost());
        return factory;
    }
}
Configuration that auto-declares the Queue, Exchange, and Binding:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqQueueConfiguration {

    // Declarables for the "one" environment: only oneRabbitMqAdmin should declare them.
    @Bean
    public Queue oneQueue(@Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
        Queue queue = new Queue("one_auto_test");
        queue.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
        return queue;
    }

    @Bean
    public DirectExchange oneExchange(@Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
        DirectExchange exchange = new DirectExchange("one_auto_test");
        exchange.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
        return exchange;
    }

    @Bean
    public Binding oneBinding(Queue oneQueue, DirectExchange oneExchange,
            @Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
        Binding binding = BindingBuilder.bind(oneQueue).to(oneExchange).with("one_auto_test");
        binding.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
        return binding;
    }

    // Declarables for the "two" environment: only twoRabbitMqAdmin should declare them.
    @Bean
    public Queue twoQueue(@Qualifier("twoRabbitMqAdmin") RabbitAdmin twoRabbitMqAdmin) {
        Queue queue = new Queue("two_auto_test");
        queue.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return queue;
    }

    @Bean
    public DirectExchange twoExchange(@Qualifier("twoRabbitMqAdmin") RabbitAdmin twoRabbitMqAdmin) {
        DirectExchange exchange = new DirectExchange("two_auto_test");
        exchange.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return exchange;
    }

    @Bean
    public Binding twoBinding(Queue twoQueue, DirectExchange twoExchange,
            @Qualifier("twoRabbitMqAdmin") RabbitAdmin twoRabbitMqAdmin) {
        Binding binding = BindingBuilder.bind(twoQueue).to(twoExchange).with("two_auto_test");
        binding.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return binding;
    }
}
Running the project shows that the queue, exchange, and binding are not all created correctly:
2023-03-21 17:54:06.860 TRACE 16056 --- [ main] o.s.b.f.s.DefaultListableBeanFactory : Ignoring match to currently created bean 'twoExchange': Error creating bean with name 'twoExchange': Requested bean is currently in creation: Is there an unresolvable circular reference?
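This problem was a real headache. The message is only logged at TRACE level (in Spring 5.0.6 it is logged at DEBUG), and every breakpoint in our configuration code was hit, yet the declarations still never happened. Below I record the steps I took to track it down.
When the queue, exchange, and binding failed to appear, my first suspicion was a mistake in the RabbitMQ configuration. Since beans declared with @Bean in a configuration class should normally lead to the queue being created, there must be a step somewhere in the RabbitMQ integration that fetches the Queue, Exchange, and Binding beans from the IoC container and processes them.
Enter the first key player: RabbitAdmin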
// RabbitAdmin implements InitializingBean; afterPropertiesSet runs after property population.
@Override
public void afterPropertiesSet() {
    synchronized (this.lifecycleMonitor) {
        if (this.running || !this.autoStartup) {
            return;
        }
        if (this.connectionFactory instanceof CachingConnectionFactory &&
                ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
            this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
            return;
        }
        // Prevent stack overflow...
        final AtomicBoolean initializing = new AtomicBoolean(false);
        this.connectionFactory.addConnectionListener(connection -> {
            if (!initializing.compareAndSet(false, true)) {
                // If we are already initializing, we don't need to do it again...
                return;
            }
            try {
                /*
                 * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
                 * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
                 * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
                 * declared for every connection. If anyone has a problem with it: use auto-startup="false".
                 */
                initialize();
            }
            finally {
                initializing.compareAndSet(true, false);
            }
        });
        this.running = true;
    }
}

public void initialize() {
    if (this.applicationContext == null) {
        this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
        return;
    }
    this.logger.debug("Initializing declarations");
    Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
            this.applicationContext.getBeansOfType(Exchange.class).values());
    Collection<Queue> contextQueues = new LinkedList<Queue>(
            this.applicationContext.getBeansOfType(Queue.class).values());
    Collection<Binding> contextBindings = new LinkedList<Binding>(
            this.applicationContext.getBeansOfType(Binding.class).values());

    @SuppressWarnings("rawtypes")
    Collection<Collection> collections = this.declareCollections
            ? this.applicationContext.getBeansOfType(Collection.class, false, false).values()
            : Collections.emptyList();
    for (Collection<?> collection : collections) {
        if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
            for (Object declarable : collection) {
                if (declarable instanceof Exchange) {
                    contextExchanges.add((Exchange) declarable);
                }
                else if (declarable instanceof Queue) {
                    contextQueues.add((Queue) declarable);
                }
                else if (declarable instanceof Binding) {
                    contextBindings.add((Binding) declarable);
                }
            }
        }
    }

    final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
    final Collection<Queue> queues = filterDeclarables(contextQueues);
    final Collection<Binding> bindings = filterDeclarables(contextBindings);

    for (Exchange exchange : exchanges) {
        if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
            this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                    + exchange.getName()
                    + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                    + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                    + "reopening the connection.");
        }
    }

    for (Queue queue : queues) {
        if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
            this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                    + queue.getName()
                    + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                    + queue.isExclusive() + ". "
                    + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                    + "alive, but all messages will be lost.");
        }
    }

    if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
        this.logger.debug("Nothing to declare");
        return;
    }
    this.rabbitTemplate.execute(channel -> {
        declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
        declareQueues(channel, queues.toArray(new Queue[queues.size()]));
        declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
        return null;
    });
    this.logger.debug("Declarations finished");
}
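RabbitAdmin contains the logic that declares queues, exchanges, and bindings. Setting a breakpoint in initialize() shows that the collections it obtains do not contain all of the exchanges, bindings, and queues defined in the context.
So why is the set incomplete? Let's dig into getBeansOfType, focusing on the code below.
This method looks up beans of a given type in the IoC container.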
@Override
@SuppressWarnings("unchecked")
public <T> Map<String, T> getBeansOfType(
        @Nullable Class<T> type, boolean includeNonSingletons, boolean allowEagerInit) throws BeansException {

    String[] beanNames = getBeanNamesForType(type, includeNonSingletons, allowEagerInit);
    Map<String, T> result = CollectionUtils.newLinkedHashMap(beanNames.length);
    for (String beanName : beanNames) {
        try {
            Object beanInstance = getBean(beanName);
            if (!(beanInstance instanceof NullBean)) {
                result.put(beanName, (T) beanInstance);
            }
        }
        catch (BeanCreationException ex) {
            Throwable rootCause = ex.getMostSpecificCause();
            if (rootCause instanceof BeanCurrentlyInCreationException) {
                BeanCreationException bce = (BeanCreationException) rootCause;
                String exBeanName = bce.getBeanName();
                if (exBeanName != null && isCurrentlyInCreation(exBeanName)) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Ignoring match to currently created bean '" + exBeanName + "': " +
                                ex.getMessage());
                    }
                    onSuppressedException(ex);
                    // Ignore: indicates a circular reference when autowiring constructors.
                    // We want to find matches other than the currently created bean itself.
                    continue;
                }
            }
            throw ex;
        }
    }
    return result;
}
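getBeansOfType first resolves the bean names for the requested type, then instantiates and initializes each bean by name through getBean. That second step is where the error occurs: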
2023-03-21 17:54:06.860 TRACE 16056 --- [ main] o.s.b.f.s.DefaultListableBeanFactory : Ignoring match to currently created bean 'twoExchange': Error creating bean with name 'twoExchange': Requested bean is currently in creation: Is there an unresolvable circular reference?
Debugging quickly shows that this exception is thrown from DefaultSingletonBeanRegistry.beforeSingletonCreation.
protected void beforeSingletonCreation(String beanName) {
    if (!this.inCreationCheckExclusions.contains(beanName) && !this.singletonsCurrentlyInCreation.add(beanName)) {
        throw new BeanCurrentlyInCreationException(beanName);
    }
}
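beforeSingletonCreation checks whether the bean is already being created. The exception means that a bean which is still in the middle of creation has been requested a second time.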
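The guard can be illustrated in isolation. The following is a minimal sketch, not Spring's code: the class InCreationGuardDemo and its method names are hypothetical, and IllegalStateException stands in for BeanCurrentlyInCreationException. It relies only on the fact that Set.add returns false when the element is already present.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-alone illustration of an "in creation" guard. */
class InCreationGuardDemo {

    // Names of beans whose creation has started but not yet finished.
    private final Set<String> currentlyInCreation =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Returns normally on first entry; throws if the same name is already being created. */
    void beforeCreation(String beanName) {
        if (!currentlyInCreation.add(beanName)) {
            throw new IllegalStateException("Requested bean is currently in creation: " + beanName);
        }
    }

    /** Marks creation of the bean as finished. */
    void afterCreation(String beanName) {
        currentlyInCreation.remove(beanName);
    }

    public static void main(String[] args) {
        InCreationGuardDemo guard = new InCreationGuardDemo();
        guard.beforeCreation("twoExchange");         // first request: accepted
        try {
            guard.beforeCreation("twoExchange");     // nested request during creation: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        guard.afterCreation("twoExchange");
        guard.beforeCreation("twoExchange");         // accepted again once creation has finished
    }
}
```

A nested request is only rejected while the first one is still running, which is why the problem depends on what happens during a bean's creation rather than on the bean definition itself.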
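At this point the picture is fairly clear: because of the order in which beans are created, some beans are requested again while they are still in creation, so RabbitAdmin cannot obtain all of the queues, exchanges, and bindings and therefore does not declare them.
Next, let's start from the code where Spring begins creating beans: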
DefaultListableBeanFactory.preInstantiateSingletons
public void preInstantiateSingletons() throws BeansException {
    if (logger.isTraceEnabled()) {
        logger.trace("Pre-instantiating singletons in " + this);
    }

    // Iterate over a copy to allow for init methods which in turn register new bean definitions.
    // While this may not be part of the regular factory bootstrap, it does otherwise work fine.
    List<String> beanNames = new ArrayList<>(this.beanDefinitionNames);

    // Trigger initialization of all non-lazy singleton beans...
    for (String beanName : beanNames) {
        RootBeanDefinition bd = getMergedLocalBeanDefinition(beanName);
        if (!bd.isAbstract() && bd.isSingleton() && !bd.isLazyInit()) {
            if (isFactoryBean(beanName)) {
                Object bean = getBean(FACTORY_BEAN_PREFIX + beanName);
                if (bean instanceof FactoryBean) {
                    FactoryBean<?> factory = (FactoryBean<?>) bean;
                    boolean isEagerInit;
                    if (System.getSecurityManager() != null && factory instanceof SmartFactoryBean) {
                        isEagerInit = AccessController.doPrivileged(
                                (PrivilegedAction<Boolean>) ((SmartFactoryBean<?>) factory)::isEagerInit,
                                getAccessControlContext());
                    }
                    else {
                        isEagerInit = (factory instanceof SmartFactoryBean &&
                                ((SmartFactoryBean<?>) factory).isEagerInit());
                    }
                    if (isEagerInit) {
                        getBean(beanName);
                    }
                }
            }
            else {
                getBean(beanName);
            }
        }
    }

    // Trigger post-initialization callback for all applicable beans...
    for (String beanName : beanNames) {
        Object singletonInstance = getSingleton(beanName);
        if (singletonInstance instanceof SmartInitializingSingleton) {
            StartupStep smartInitialize = this.getApplicationStartup().start("spring.beans.smart-initialize")
                    .tag("beanName", beanName);
            SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
            if (System.getSecurityManager() != null) {
                AccessController.doPrivileged((PrivilegedAction ...
                // (the excerpt breaks off here)
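As you can see, Spring collects all the bean names and then creates the beans one by one in a loop.
So that's it: the Queue, Exchange, and Binding beans come before RabbitAdmin in that order, yet the @Bean methods that create them depend on RabbitAdmin. Creating a Queue therefore triggers creation of the RabbitAdmin, and while the RabbitAdmin is initializing it scans for all Queue, Exchange, and Binding beans. Some of those beans are still in creation at that moment, so RabbitAdmin cannot obtain them and never declares them.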
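The order dependence can be reproduced with a toy model that uses only the JDK. This is a rough sketch under stated assumptions, not Spring's implementation: ToyFactory, ToyAdmin, and the bean names are hypothetical, the "admin" scans for queues during its own initialization as described above, and a bean becomes visible to others as an early reference once it has been instantiated.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Toy model of the ordering problem; a simplification, not Spring's real implementation. */
class CreationOrderDemo {

    /** Hypothetical stand-in for RabbitAdmin: records the queues it could see while initializing. */
    static final class ToyAdmin {
        final List<String> seenQueues = new ArrayList<>();
    }

    /** Hypothetical stand-in for the bean factory; beans are created in registration order. */
    static final class ToyFactory {
        private final Map<String, Function<ToyFactory, Object>> definitions = new LinkedHashMap<>();
        private final Map<String, Object> singletons = new LinkedHashMap<>();
        private final Map<String, Object> earlyReferences = new LinkedHashMap<>();
        private final Set<String> inCreation = new HashSet<>();

        void register(String name, Function<ToyFactory, Object> instantiate) {
            definitions.put(name, instantiate);
        }

        Object getBean(String name) {
            if (singletons.containsKey(name)) return singletons.get(name);
            if (earlyReferences.containsKey(name)) return earlyReferences.get(name);
            // Same idea as beforeSingletonCreation: a nested request for a bean in creation fails.
            if (!inCreation.add(name)) {
                throw new IllegalStateException("Bean '" + name + "' is currently in creation");
            }
            try {
                Object bean = definitions.get(name).apply(this);   // may request other beans
                earlyReferences.put(name, bean);                   // instantiated, not yet initialized
                if (bean instanceof ToyAdmin) {
                    // Initialization step of the admin: scan for all queues.
                    ((ToyAdmin) bean).seenQueues.addAll(queuesVisibleNow());
                }
                singletons.put(name, bean);
                return bean;
            } finally {
                earlyReferences.remove(name);
                inCreation.remove(name);
            }
        }

        /** Like getBeansOfType in the article: beans that are still in creation are skipped. */
        List<String> queuesVisibleNow() {
            List<String> visible = new ArrayList<>();
            for (String name : definitions.keySet()) {
                if (!name.endsWith("Queue")) continue;
                try {
                    getBean(name);
                    visible.add(name);
                } catch (IllegalStateException inCreationElsewhere) {
                    // skipped, comparable to the "Ignoring match to currently created bean" log line
                }
            }
            return visible;
        }
    }

    /** Registers the beans in the given order, creates them all, and returns what the admin saw. */
    static List<String> queuesSeenByAdmin(String... registrationOrder) {
        ToyFactory factory = new ToyFactory();
        for (String name : registrationOrder) {
            if (name.equals("admin")) {
                factory.register(name, f -> new ToyAdmin());
            } else {
                // Like the @Bean methods in the article: the queue needs the admin as an argument.
                factory.register(name, f -> { f.getBean("admin"); return "queue:" + name; });
            }
        }
        for (String name : registrationOrder) factory.getBean(name);
        return ((ToyAdmin) factory.getBean("admin")).seenQueues;
    }

    public static void main(String[] args) {
        System.out.println(queuesSeenByAdmin("oneQueue", "twoQueue", "admin")); // [twoQueue]
        System.out.println(queuesSeenByAdmin("admin", "oneQueue", "twoQueue")); // [oneQueue, twoQueue]
    }
}
```

In the first call the queue that triggered creation of the admin is the one the admin misses, which mirrors the single bean named in the TRACE message. In the second call the admin comes first and sees both queues.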
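Fix: inject RabbitAdmin as a field of the configuration class instead of passing it as a parameter of each @Bean method, which avoids the repeated creation. Note that not every project will run into this problem; it depends on the order in which the project's beans are loaded.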
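A minimal sketch of what that change might look like for the "two" environment, assuming the same bean names as in the configuration above (the "one" environment would be changed the same way). The article only states that RabbitAdmin is switched to field injection, so the exact shape of the class here is an assumption.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqQueueConfiguration {

    // Assumption: the admin is injected once into the configuration object
    // instead of being resolved as an argument of every @Bean method.
    @Autowired
    @Qualifier("twoRabbitMqAdmin")
    private RabbitAdmin twoRabbitMqAdmin;

    @Bean
    public Queue twoQueue() {
        Queue queue = new Queue("two_auto_test");
        queue.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return queue;
    }

    @Bean
    public DirectExchange twoExchange() {
        DirectExchange exchange = new DirectExchange("two_auto_test");
        exchange.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return exchange;
    }

    @Bean
    public Binding twoBinding(Queue twoQueue, DirectExchange twoExchange) {
        Binding binding = BindingBuilder.bind(twoQueue).to(twoExchange).with("two_auto_test");
        binding.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return binding;
    }
}
```

With this shape, creating a Queue, Exchange, or Binding bean no longer requests the RabbitAdmin from inside the @Bean method call. Whether that is enough in a given project still depends on its bean loading order, so it is worth confirming against the TRACE output after the change.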