2024_5_13 SpringBoot configures multiple RabbitMQ
Table of contents
- Single RabbitMQ configuration
1.1. Import Maven coordinates
1.2. yaml configuration
1.3.java configuration class
1.3.1. Switch configuration
1.3.2. Queue configuration
1.3.3. Binding configuration
1.3.4. Connection configuration
1.4. Producer and consumer operation configuration
1.4.1. Producer operation configuration
1.4.2. Consumer operation configuration
- Multiple RabbitMQ configurations
2.1. yaml configuration
2.2.java configuration class
2.3. Producer and consumer operation configuration
2.3.1. Producer operation configuration
2.3.1. Consumer operation configuration
- Summary
**Description of requirements: The original SpringBoot project has already configured a RabbitMQ. The current requirement is to configure another RabbitMQ. The effect is that different RabbitMQs are pushed to different queues without interfering with each other and affecting use. **
1. Single RabbitMQ configuration
1.1. Import Maven coordinates
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.4</version>
</dependency>
1.2. yaml configuration
rabbitmq:
host: xx.xxx.xxx.xxx
port: xxxx
username: xxxx
password: xxxxxx
virtual-host: xxxx
publisher-returns: true
publisher-confirms: true
listener:
simple:
default-requeue-rejected: true
retry:
enabled: false
max-attempts: 3
initial-interval: 5000
1.3.java configuration class
1.3.1. Switch configuration
package com.ruoyi.report.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ExchangeConfig {
public static final String ecoa_exchange = "ecoaExchange";
/**
* 1.definitiondirect exchange
* 2.durable="true" rabbitmqNo need to create a new switch when restarting
* 3.directSwitches are relatively simple,The matching rule is:If the routing key matches,The message is delivered to the relevant queue
*/
@Bean
public DirectExchange ecoaExchange() {
DirectExchange directExchange = new DirectExchange(ecoa_exchange, true, false);
return directExchange;
}
}
1.3.2. Queue configuration
package com.ruoyi.report.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @ClassName QueueConfig
* @Description
* @Author Mr.Huang
* @Date 2023/9/22 16:26
* @Version 1.0
**/
@Component
public class QueueConfig {
private static final String ecoa_file_upload_queue = "ecoa_file_upload_queue";
@Bean
public Queue ecoaFileUploadDispatchQueue() {
/**
durable="true" Endurance rabbitmqThere is no need to create a new queue when restarting
auto-delete Indicates that the message queue will be automatically deleted when it is not in use. The default isfalse
exclusive Indicates whether the message queue is only currentlyconnectionTake effect,The default isfalse
*/
return new Queue(ecoa_file_upload_queue, true, false, false);
}
}
1.3.3. Binding configuration
package com.ruoyi.report.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @ClassName BindingConfig
* @Description
* @Author Mr.Huang
* @Date 2023/9/22 16:31
* @Version 1.0
**/
@Component
public class BindingConfig {
@Autowired
private QueueConfig queueConfig;
@Autowired
private ExchangeConfig exchangeConfig;
public static final String ECOA_file_upload_key = "ecoa_file_upload_key";
@Bean
public Binding ecoaFileUploadDispatchBinding() {
return BindingBuilder.bind(queueConfig.ecoaFileUploadDispatchQueue()).to(exchangeConfig.ecoaExchange()).with(ECOA_file_upload_key);
}
}
1.3.4. Connection configuration
package com.ruoyi.report.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName RabbitMqConfig
* @Description
* @Author Mr.Huang
* @Date 2023/9/22 16:14
* @Version 1.0
**/
@Configuration
public class RabbitMqConfig {
/**
* connection factory
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* customizerabbit templateFor receiving and sending data
* Message confirmation mechanism and callback can be set up
*
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
}
1.4. Producer and consumer operation configuration
1.4.1. Producer operation configuration
package com.ruoyi.report.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.report.config.BindingConfig;
import com.ruoyi.report.config.ExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @ClassName MessageUtils
* @Description
* @Author Mr.Huang
* @Date 2023/9/22 16:36
* @Version 1.0
**/
@Component
public class MessageUtils {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* Send a message
* Send shipping information
* @param message information
*/
public void sendMessage(Object message) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
rabbitTemplate.convertAndSend(ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);
}
}
1.4.2. Consumer operation configuration
package com.ruoyi.report.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName PrintFeedbackConsumer
* @Description
* @Author Mr.Huang
* @Date 2024/4/30 10:23
* @Version 1.0
**/
@Slf4j
@Component
public class PrintFeedbackConsumer {
@Autowired
private PrintSendLogService printSendLogService;
@RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
public void receiveMq(Message message, Channel channel) {
try {
String body = new String(message.getBody());
log.info("accept【PrintResult push】RabbitMQinformation:"+body);
JSONObject objJson = JSONObject.parseObject(body);
Thread.sleep(1000);
PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
printSendLogService.updatePrintSendLog(printResult);
}catch (Exception e){
log.error("",e);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
2. Multiple RabbitMQ configurations
Maven coordinates are consistent with the single RabbitMQ configuration above
2.1. yaml configuration
rabbitmq:
first:
host: xx.xxx.xxx.xxx
port: xxxx
username: xxxx
password: xxxxxx
virtual-host: xxxx
publisher-returns: true
publisher-confirms: true
listener:
simple:
default-requeue-rejected: true
retry:
enabled: false
max-attempts: 3
initial-interval: 5000
second:
host: xx.xxx.xxx.xxx
port: xxxx
username: xxxx
password: xxxxxx
publisher-returns: true
publisher-confirms: true
virtual-host: xxxx
listener:
simple:
default-requeue-rejected: true
retry:
enabled: false
max-attempts: 3
initial-interval: 5000
2.2.java configuration class
package com.ruoyi.report.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* @ClassName RabbitMqConfig
* @Description
* @Author Mr.Huang
* @Date 2023/9/22 16:14
* @Version 1.0
**/
@Configuration
public class RabbitMqConfig {
// FirstMQElectronic drug testing queue andkey
public static final String ECOA_file_upload_queue = "ecoa_file_upload_queue";
public static final String ECOA_file_upload_key = "ecoa_file_upload_key";
// the secondMQDocument printing platform queue andkey
public static final String print_tms_dispatch_info_queue = "print_tms_dispatch_info_queue";
public static final String print_4pl_dispatch_info_feedback_queue = "print_4pl_dispatch_info_feedback_queue";
public static final String print_tms_dispatch_info_key = "print_tms_dispatch_info_key";
public static final String print_4pl_dispatch_info_feedback_key = "print_4pl_dispatch_info_feedback_key";
/** Switch name */
public static final String EXCHANGE = "ecoaExchange";
public static final String EXCHANGE2 = "tms_exchange";
/** FirstrabbitMqqueue */
@Bean(name = "ECOAConnectionFactory")
@Primary
public ConnectionFactory ECOAConnectionFactory(@Value("{spring.rabbitmq.first.host}") String host,
@Value("{spring.rabbitmq.first.port}") int port,
@Value("{spring.rabbitmq.first.username}") String username,
@Value("{spring.rabbitmq.first.password}") String password,
@Value("{spring.rabbitmq.first.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
/** the secondrabbitMqqueue */
@Bean(name = "printConnectionFactory")
public ConnectionFactory printConnectionFactory(@Value("{spring.rabbitmq.second.host}") String host,
@Value("{spring.rabbitmq.second.port}") int port,
@Value("{spring.rabbitmq.second.username}") String username,
@Value("{spring.rabbitmq.second.password}") String password,
@Value("{spring.rabbitmq.second.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
/** FirstrabbitMqOperation template */
@Bean(name="ECOARabbitTemplate")
@Primary
public RabbitTemplate fplRabbitTemplate(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory){
RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
return firstRabbitTemplate;
}
/** the secondrabbitMqOperation template */
@Bean(name="printRabbitTemplate")
public RabbitTemplate tcscRabbitTemplate(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory){
RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
return secondRabbitTemplate;
}
/** FirstrabbitMqconnection factory */
@Bean(name="ECOAContainerFactory")
public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMaxConcurrentConsumers(5);
factory.setConcurrentConsumers(1);
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
/** the secondrabbitMqconnection factory */
@Bean(name="printContainerFactory")
public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMaxConcurrentConsumers(5);
factory.setConcurrentConsumers(1);
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
/** FirstmqBind queue to switch */
@Bean
public String runECOAQueue(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
System.out.println("configuration ECOAQueue ........................");
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false);
try {
channel.exchangeDeclare(EXCHANGE, "direct", true, false, null);
// Document push to electronic drug testing queue
channel.queueDeclare(ECOA_file_upload_queue, true, false, false, null);
channel.queueBind(ECOA_file_upload_queue, EXCHANGE, ECOA_file_upload_key);
} catch (Exception e) {
e.printStackTrace();
} finally {
return "ECOAQueue";
}
}
/** the secondmqBind queue to switch */
@Bean
public String runPrintQueue(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
System.out.println("configuration printQueue ........................");
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false);
try {
channel.exchangeDeclare(EXCHANGE2, "direct", true, false, null);
// Document push document printing platform queue
channel.queueDeclare(print_tms_dispatch_info_queue, true, false, false, null);
channel.queueBind(print_tms_dispatch_info_queue, EXCHANGE2, print_tms_dispatch_info_key);
// Document printing platform feedback queue
channel.queueDeclare(print_4pl_dispatch_info_feedback_queue,true,false,false,null);
channel.queueBind(print_4pl_dispatch_info_feedback_queue,EXCHANGE2,print_4pl_dispatch_info_feedback_key);
} catch (Exception e) {
e.printStackTrace();
} finally {
return "printQueue";
}
}
}
Note: You need to comment out the original MQ: switch, queue, and binding configuration classes, leaving only this configuration file. This configuration file has already bound the corresponding: switch and queue. You just need to pay attention to the queue name. , Don’t bind the wrong switch
2.3. Producer and consumer operation configuration
2.3.1. Producer operation configuration
package com.ruoyi.report.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.report.config.RabbitMqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @ClassName MessageUtils
* @Description
* @Author Mr.Huang
* @Date 2023/9/22 16:36
* @Version 1.0
**/
@Component
public class MessageUtils {
@Resource(name = "ECOARabbitTemplate")
private RabbitTemplate ECOARabbitTemplate;
@Resource(name = "printRabbitTemplate")
private RabbitTemplate printRabbitTemplate;
/**
* TowardsECOASend a message
* Send shipping information
* @param message information
*/
public void sendMessage(Object message) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
ECOARabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, msg, correlationId);
}
/**
* TowardsprintSend a message
* Send dispatch information
* @param message information
*/
public void sendPrintMessage(Object message) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
printRabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, msg, correlationId);
}
}
2.3.1. Consumer operation configuration
package com.ruoyi.report.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName PrintFeedbackConsumer
* @Description
* @Author Mr.Huang
* @Date 2024/4/30 10:23
* @Version 1.0
**/
@Slf4j
@Component
public class PrintFeedbackConsumer {
@Autowired
private PrintSendLogService printSendLogService;
@RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
public void receiveMq(Message message, Channel channel) {
try {
String body = new String(message.getBody());
log.info("accept【PrintResult push】RabbitMQinformation:"+body);
JSONObject objJson = JSONObject.parseObject(body);
Thread.sleep(1000);
PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
printSendLogService.updatePrintSendLog(printResult);
}catch (Exception e){
log.error("",e);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
Same operation as a single RabbitMQ consumer, just pay attention to the queue and connection factory to be consumed
3. Summary
**When configuring a single RabbitMQ, you do not need to care about how the underlying connection factory is configured. When the yaml content is filled in, it will automatically configure the connection factory. You only need to bind the switch, queue, and configuration. When you need to configure multiple mqs, you need to manually configure the connection factory yourself. It is not that you can only configure two RabbitMQs. You can configure more in this format. The only thing to note is not to confuse these queues with switches. **