Problem Description
A colleague mentioned that one project cannot create new queues automatically: every time a queue is added, it has to be created by hand in the RabbitMQ management console. This happens only in the production environment; the test environment does not have the problem.
When I heard that this kind of thing was still going on, how could I stand it? In the spirit of "if I don't go to hell, who will?", I started investigating. Along the way I also learned more about how Spring Boot creates RabbitMQ queues automatically.
Find the cause
Normally, when a consumer declares a queue and an exchange through the @RabbitListener annotation, a queue that does not yet exist is created automatically, and the binding between the queue and the exchange is declared as well. This project uses the annotation in the same way, so why was the queue not created?
First, by observing the startup log, we found the following error message fragment:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test_queue' in vhost 'test_vhost', class-id=50, method-id=10)
The error message makes it clear that the channel was closed abnormally because the queue does not exist in the vhost. The official RabbitMQ website describes this channel error as well.
[Image: the RabbitMQ official website's description of channel error messages]
But this is only the consequence of the queue not having been created, so I kept looking for the reason it was not created automatically. Further down the log there was another error message:
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'auto_delete' for exchange 'dome_exchange' in vhost 'test_vhost': received 'false' but current is 'true', class-id=40, method-id=10)
Because this error message is not directly related to the queue in question, my colleagues had ignored it. But when I saw the channel.close keyword, I wondered: could this error have closed the channel, so that the queue was never created? With this question in mind, I kept troubleshooting.
View source code
First I confirmed the cause of the error: checking dome_exchange in the RabbitMQ management console showed that its auto_delete parameter is indeed true.
[Image: exchange parameters as shown in the RabbitMQ management console]
consumer code
The consumer code does not set the auto_delete parameter, and the @Exchange annotation shows that autoDelete defaults to false:
/**
 * @return true if the exchange is to be declared as auto-delete.
 */
String autoDelete() default "false";
As a result, the exchange that the application declares at startup is inconsistent with the parameters of the exchange that already exists on the broker, which produces the 406 error. The explanation of the 406 error in the official website screenshot above confirms this. As for how the inconsistency came about, I later asked my colleague: he had most likely created the exchange manually in the management console and not paid attention to these parameters. So operations need to follow a standard procedure, otherwise you are digging a hole for yourself.
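To make the 406 concrete, here is a minimal sketch of the rule the broker applies when an exchange is redeclared. It is a toy in-memory model written for illustration, not RabbitMQ or Spring code: the ExchangeRegistry class and PreconditionFailedException are hypothetical names, and only the auto_delete flag is compared.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of exchange declaration (hypothetical; not RabbitMQ client code). */
final class ExchangeRegistry {

    /** Stands in for the broker closing the channel with reply-code 406. */
    static final class PreconditionFailedException extends RuntimeException {
        PreconditionFailedException(String message) {
            super(message);
        }
    }

    // exchange name -> auto_delete flag of the exchange that already exists
    private final Map<String, Boolean> autoDeleteByName = new HashMap<>();

    /** Creates the exchange if absent; rejects a redeclaration whose flag differs. */
    void declare(String name, boolean autoDelete) {
        Boolean current = autoDeleteByName.putIfAbsent(name, autoDelete);
        if (current != null && current != autoDelete) {
            throw new PreconditionFailedException(
                    "PRECONDITION_FAILED - inequivalent arg 'auto_delete' for exchange '" + name
                            + "': received '" + autoDelete + "' but current is '" + current + "'");
        }
    }

    public static void main(String[] args) {
        ExchangeRegistry broker = new ExchangeRegistry();
        broker.declare("dome_exchange", true);      // created by hand with auto_delete = true
        try {
            broker.declare("dome_exchange", false); // what the application declares by default
        } catch (PreconditionFailedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real consumer, the same mismatch should go away either by setting autoDelete = "true" on the @Exchange annotation so that it matches the existing exchange, or by deleting the hand-made exchange and letting the application recreate it with its own defaults.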
Having found the cause of the 406 error, I went on to investigate why the queue was not created automatically. In the @RabbitListener annotation source code, the queues attribute is documented as follows (the wording may differ between versions):
/**
 * The queues for this listener.
 * The entries can be 'queue name', 'property-placeholder keys' or 'expressions'.
 * Expression must be resolved to the queue name or {@code Queue} object.
 * The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with
 * a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the application
 * context.
 * Mutually exclusive with {@link #bindings()} and {@link #queuesToDeclare()}.
 * @return the queue names or expressions (SpEL) to listen to from target
 * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
 */
String[] queues() default {};
In other words, either the queue must already exist, or it must be defined elsewhere as a bean, with a RabbitAdmin present in the application context.
Reading the RabbitAdmin source code turns up some key information. The class implements the InitializingBean interface, whose afterPropertiesSet() method runs once the bean has been initialized. RabbitAdmin's override of afterPropertiesSet() registers a connection listener that calls the initialize() method when a connection is opened. The source code of initialize() is as follows:
/**
 * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
 * (but unnecessary) to call this method more than once.
 */
@Override // NOSONAR complexity
public void initialize() {
    if (this.applicationContext == null) {
        this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
        return;
    }
    this.logger.debug("Initializing declarations");
    Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
            this.applicationContext.getBeansOfType(Exchange.class).values());
    Collection<Queue> contextQueues = new LinkedList<Queue>(
            this.applicationContext.getBeansOfType(Queue.class).values());
    Collection<Binding> contextBindings = new LinkedList<Binding>(
            this.applicationContext.getBeansOfType(Binding.class).values());
    Collection<DeclarableCustomizer> customizers =
            this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
    processDeclarables(contextExchanges, contextQueues, contextBindings);
    final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
    final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
    final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);
    for (Exchange exchange : exchanges) {
        if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
            this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                    + exchange.getName()
                    + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                    + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                    + "reopening the connection.");
        }
    }
    for (Queue queue : queues) {
        if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
            this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                    + queue.getName()
                    + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                    + queue.isExclusive() + ". "
                    + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                    + "alive, but all messages will be lost.");
        }
    }
    if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
        this.logger.debug("Nothing to declare");
        return;
    }
    this.rabbitTemplate.execute(channel -> {
        declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
        declareQueues(channel, queues.toArray(new Queue[queues.size()]));
        declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
        return null;
    });
    this.logger.debug("Declarations finished");
}
You can see that initialize() obtains all Exchange, Queue and Binding beans from the Spring container and then, in declareExchanges(), declareQueues() and declareBindings(), creates the exchanges, queues and bindings that do not exist yet. This also explains why the queue is created automatically once the corresponding Exchange, Queue and Binding beans have been created for the consumer.
When declareExchanges() throws an exception because of the 406 error, the declareQueues() and declareBindings() calls that follow it are never executed, so the new queue is not created automatically. At this point we have found both the reason the queue was not created and the source code responsible for automatically creating exchanges and queues. The test environment shows no error because the exchange parameters there are consistent.
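The effect of that exception can be sketched in plain Java. This is a simplified stand-in for the callback passed to rabbitTemplate.execute(), not the RabbitAdmin code itself: DeclarationSequence and its methods are hypothetical names, and the 406 is simulated with an IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the declare-exchanges / declare-queues / declare-bindings sequence. */
final class DeclarationSequence {

    /**
     * Runs the three steps in order and returns the names of the steps that completed.
     * When exchangeMismatch is true, the first step throws, like the 406 in the log.
     */
    static List<String> run(boolean exchangeMismatch) {
        List<String> completed = new ArrayList<>();
        try {
            declareExchanges(exchangeMismatch);
            completed.add("exchanges");
            completed.add("queues");   // stands in for declareQueues(...)
            completed.add("bindings"); // stands in for declareBindings(...)
        } catch (IllegalStateException e) {
            // The exception leaves the sequence here, so the later steps never ran.
        }
        return completed;
    }

    /** Simulated exchange declaration that fails when the parameters do not match. */
    private static void declareExchanges(boolean mismatch) {
        if (mismatch) {
            throw new IllegalStateException("406 PRECONDITION_FAILED");
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [exchanges, queues, bindings]
        System.out.println(run(true));  // prints []
    }
}
```

The second call mirrors the production environment: one inconsistent exchange is enough to keep every queue and binding in the same pass from being declared.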
How RabbitAdmin is automatically created
Thinking about it further, I realized that the project's code never creates a RabbitAdmin bean, so when is it registered in the container?
Tracing the source code further shows that RabbitAdmin is created in the RabbitAutoConfiguration class and registered in the container:
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}
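The @ConditionalOnProperty line means the bean is created unless spring.rabbitmq.dynamic is set to false. Below is a minimal sketch of that check, assuming a plain map of properties. DynamicPropertyCheck is a hypothetical name, and the other conditions on the bean (@ConditionalOnSingleCandidate, @ConditionalOnMissingBean) as well as Spring's relaxed property binding are left out.

```java
import java.util.Map;

/** Toy model of the property condition on the amqpAdmin bean (not Spring Boot code). */
final class DynamicPropertyCheck {

    /**
     * Mirrors @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic",
     * matchIfMissing = true): a missing property matches, and so does any value
     * other than "false".
     */
    static boolean adminBeanEnabled(Map<String, String> properties) {
        String value = properties.get("spring.rabbitmq.dynamic");
        if (value == null) {
            return true; // matchIfMissing = true
        }
        return !"false".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        System.out.println(adminBeanEnabled(Map.of()));                                   // prints true
        System.out.println(adminBeanEnabled(Map.of("spring.rabbitmq.dynamic", "false"))); // prints false
    }
}
```

This is why the project gets a RabbitAdmin without configuring anything: the property is absent, so the condition matches by default.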
RabbitAutoConfiguration is Spring Boot's auto-configuration class for RabbitMQ. Its annotations show that it takes effect when the RabbitTemplate and Channel classes are on the classpath:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
RabbitTemplate is the helper class that wraps RabbitMQ operations. The project uses it, so the class is on the classpath and RabbitAutoConfiguration takes effect. With that, the overall process by which RabbitMQ queues are created automatically is pieced together.