2024_5_13 SpringBoot configures multiple RabbitMQ

Table of contents

  1. Single RabbitMQ configuration

1.1. Import Maven coordinates

1.2. yaml configuration

1.3.java configuration class

1.3.1. Switch configuration

1.3.2. Queue configuration

1.3.3. Binding configuration

1.3.4. Connection configuration

1.4. Producer and consumer operation configuration

1.4.1. Producer operation configuration

1.4.2. Consumer operation configuration

  1. Multiple RabbitMQ configurations

2.1. yaml configuration

2.2.java configuration class

2.3. Producer and consumer operation configuration

2.3.1. Producer operation configuration

2.3.1. Consumer operation configuration

  1. Summary

**Description of requirements: The original SpringBoot project has already configured a RabbitMQ. The current requirement is to configure another RabbitMQ. The effect is that different RabbitMQs are pushed to different queues without interfering with each other and affecting use. **

1. Single RabbitMQ configuration

1.1. Import Maven coordinates

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.4.4</version>
        </dependency>

1.2. yaml configuration

 rabbitmq:
    host: xx.xxx.xxx.xxx
    port: xxxx
    username: xxxx
    password: xxxxxx
    virtual-host: xxxx
    publisher-returns: true
    publisher-confirms: true
    listener:
       simple:
         default-requeue-rejected: true
           retry:
            enabled: false
            max-attempts: 3
            initial-interval: 5000

1.3.java configuration class

1.3.1. Switch configuration

package com.ruoyi.report.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ExchangeConfig {

    public static final String ecoa_exchange = "ecoaExchange";

    /**
     * 1.definitiondirect exchange
     * 2.durable="true" rabbitmqNo need to create a new switch when restarting
     * 3.directSwitches are relatively simple,The matching rule is:If the routing key matches,The message is delivered to the relevant queue
     */
    @Bean
    public DirectExchange ecoaExchange() {
        DirectExchange directExchange = new DirectExchange(ecoa_exchange, true, false);
        return directExchange;
    }

}

1.3.2. Queue configuration

package com.ruoyi.report.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * @ClassName QueueConfig
 * @Description
 * @Author Mr.Huang
 * @Date 2023/9/22 16:26
 * @Version 1.0
 **/
@Component
public class QueueConfig {

    private static final String ecoa_file_upload_queue = "ecoa_file_upload_queue";

    @Bean
    public Queue ecoaFileUploadDispatchQueue() {
        /**
         durable="true" Endurance rabbitmqThere is no need to create a new queue when restarting
         auto-delete Indicates that the message queue will be automatically deleted when it is not in use. The default isfalse
         exclusive  Indicates whether the message queue is only currentlyconnectionTake effect,The default isfalse
         */
        return new Queue(ecoa_file_upload_queue, true, false, false);
    }
}

1.3.3. Binding configuration

package com.ruoyi.report.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * @ClassName BindingConfig
 * @Description
 * @Author Mr.Huang
 * @Date 2023/9/22 16:31
 * @Version 1.0
 **/
@Component
public class BindingConfig {
    @Autowired
    private QueueConfig queueConfig;
    @Autowired
    private ExchangeConfig exchangeConfig;

    public static final String ECOA_file_upload_key = "ecoa_file_upload_key";


    @Bean
    public Binding ecoaFileUploadDispatchBinding() {
        return BindingBuilder.bind(queueConfig.ecoaFileUploadDispatchQueue()).to(exchangeConfig.ecoaExchange()).with(ECOA_file_upload_key);
    }
} 

1.3.4. Connection configuration

package com.ruoyi.report.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitMqConfig
 * @Description
 * @Author Mr.Huang
 * @Date 2023/9/22 16:14
 * @Version 1.0
 **/
@Configuration
public class RabbitMqConfig {
    /**
     * connection factory
     */
    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * customizerabbit templateFor receiving and sending data
     * Message confirmation mechanism and callback can be set up
     *
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }
}

1.4. Producer and consumer operation configuration

1.4.1. Producer operation configuration

package com.ruoyi.report.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.report.config.BindingConfig;
import com.ruoyi.report.config.ExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * @ClassName MessageUtils
 * @Description
 * @Author Mr.Huang
 * @Date 2023/9/22 16:36
 * @Version 1.0
 **/
@Component
public class MessageUtils {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Send a message
     * Send shipping information
     * @param message information
     */
    public void sendMessage(Object message) {
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
        rabbitTemplate.convertAndSend(ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);
    }
}

1.4.2. Consumer operation configuration

package com.ruoyi.report.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @ClassName PrintFeedbackConsumer
 * @Description
 * @Author Mr.Huang
 * @Date 2024/4/30 10:23
 * @Version 1.0
 **/
@Slf4j
@Component
public class PrintFeedbackConsumer {

    @Autowired
    private PrintSendLogService printSendLogService;

    @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
    public void receiveMq(Message message, Channel channel) {
        try {
            String body = new String(message.getBody());
            log.info("accept【PrintResult push】RabbitMQinformation:"+body);
            JSONObject objJson = JSONObject.parseObject(body);
            Thread.sleep(1000);
            PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
            printSendLogService.updatePrintSendLog(printResult);
        }catch (Exception e){
            log.error("",e);
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

2. Multiple RabbitMQ configurations

Maven coordinates are consistent with the single RabbitMQ configuration above

2.1. yaml configuration

 rabbitmq:
    first:
      host: xx.xxx.xxx.xxx
      port: xxxx
      username: xxxx
      password: xxxxxx
      virtual-host: xxxx
      publisher-returns: true
      publisher-confirms: true
      listener:
          simple:
            default-requeue-rejected: true
            retry:
              enabled: false
              max-attempts: 3
              initial-interval: 5000          
    second:
      host: xx.xxx.xxx.xxx
      port: xxxx
      username: xxxx
      password: xxxxxx
      publisher-returns: true
      publisher-confirms: true
      virtual-host: xxxx
      listener:
          simple:
            default-requeue-rejected: true
            retry:
              enabled: false
              max-attempts: 3
              initial-interval: 5000 

2.2.java configuration class

package com.ruoyi.report.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @ClassName RabbitMqConfig
 * @Description
 * @Author Mr.Huang
 * @Date 2023/9/22 16:14
 * @Version 1.0
 **/
@Configuration
public class RabbitMqConfig {
    // FirstMQElectronic drug testing queue andkey
    public static final String ECOA_file_upload_queue = "ecoa_file_upload_queue";
    public static final String ECOA_file_upload_key = "ecoa_file_upload_key";

    // the secondMQDocument printing platform queue andkey
    public static final String print_tms_dispatch_info_queue = "print_tms_dispatch_info_queue";
    public static final String print_4pl_dispatch_info_feedback_queue = "print_4pl_dispatch_info_feedback_queue";
    public static final String print_tms_dispatch_info_key = "print_tms_dispatch_info_key";
    public static final String print_4pl_dispatch_info_feedback_key = "print_4pl_dispatch_info_feedback_key";
    /** Switch name */
    public static final String EXCHANGE = "ecoaExchange";
    public static final String EXCHANGE2 = "tms_exchange";

    /** FirstrabbitMqqueue */
    @Bean(name = "ECOAConnectionFactory")
    @Primary
    public ConnectionFactory ECOAConnectionFactory(@Value("{spring.rabbitmq.first.host}") String host,
                                                  @Value("{spring.rabbitmq.first.port}") int port,
                                                  @Value("{spring.rabbitmq.first.username}") String username,
                                                  @Value("{spring.rabbitmq.first.password}") String password,
                                                  @Value("{spring.rabbitmq.first.virtual-host}") String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    /** the secondrabbitMqqueue */
    @Bean(name = "printConnectionFactory")
    public ConnectionFactory printConnectionFactory(@Value("{spring.rabbitmq.second.host}") String host,
                                                   @Value("{spring.rabbitmq.second.port}") int port,
                                                   @Value("{spring.rabbitmq.second.username}") String username,
                                                   @Value("{spring.rabbitmq.second.password}") String password,
                                                   @Value("{spring.rabbitmq.second.virtual-host}") String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }
    /** FirstrabbitMqOperation template */
    @Bean(name="ECOARabbitTemplate")
    @Primary
    public RabbitTemplate fplRabbitTemplate(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory){
        RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
        return firstRabbitTemplate;
    }
    /** the secondrabbitMqOperation template */
    @Bean(name="printRabbitTemplate")
    public RabbitTemplate tcscRabbitTemplate(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory){
        RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
        return secondRabbitTemplate;
    }
    /** FirstrabbitMqconnection factory */
    @Bean(name="ECOAContainerFactory")
    public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                    @Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMaxConcurrentConsumers(5);
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
    /** the secondrabbitMqconnection factory */
    @Bean(name="printContainerFactory")
    public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                     @Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMaxConcurrentConsumers(5);
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
    /** FirstmqBind queue to switch */
    @Bean
    public String runECOAQueue(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        System.out.println("configuration ECOAQueue ........................");
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        try {
            channel.exchangeDeclare(EXCHANGE, "direct", true, false, null);
            // Document push to electronic drug testing queue
            channel.queueDeclare(ECOA_file_upload_queue, true, false, false, null);
            channel.queueBind(ECOA_file_upload_queue, EXCHANGE, ECOA_file_upload_key);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            return "ECOAQueue";
        }
    }
    /** the secondmqBind queue to switch */
    @Bean
    public String runPrintQueue(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        System.out.println("configuration printQueue ........................");
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        try {
            channel.exchangeDeclare(EXCHANGE2, "direct", true, false, null);
            // Document push document printing platform queue
            channel.queueDeclare(print_tms_dispatch_info_queue, true, false, false, null);
            channel.queueBind(print_tms_dispatch_info_queue, EXCHANGE2, print_tms_dispatch_info_key);
            // Document printing platform feedback queue
            channel.queueDeclare(print_4pl_dispatch_info_feedback_queue,true,false,false,null);
            channel.queueBind(print_4pl_dispatch_info_feedback_queue,EXCHANGE2,print_4pl_dispatch_info_feedback_key);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            return "printQueue";
        }
    }
}

Note: You need to comment out the original MQ: switch, queue, and binding configuration classes, leaving only this configuration file. This configuration file has already bound the corresponding: switch and queue. You just need to pay attention to the queue name. , Don’t bind the wrong switch

2.3. Producer and consumer operation configuration

2.3.1. Producer operation configuration

package com.ruoyi.report.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.report.config.RabbitMqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @ClassName MessageUtils
 * @Description
 * @Author Mr.Huang
 * @Date 2023/9/22 16:36
 * @Version 1.0
 **/
@Component
public class MessageUtils {

    @Resource(name = "ECOARabbitTemplate")
    private RabbitTemplate ECOARabbitTemplate;

    @Resource(name = "printRabbitTemplate")
    private RabbitTemplate printRabbitTemplate;

    /**
     * TowardsECOASend a message
     * Send shipping information
     * @param message information
     */
    public void sendMessage(Object message) {
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
        ECOARabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, msg, correlationId);
    }

    /**
     * TowardsprintSend a message
     * Send dispatch information
     * @param message information
     */
    public void sendPrintMessage(Object message) {
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
        printRabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, msg, correlationId);
    }
} 

2.3.1. Consumer operation configuration

package com.ruoyi.report.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @ClassName PrintFeedbackConsumer
 * @Description
 * @Author Mr.Huang
 * @Date 2024/4/30 10:23
 * @Version 1.0
 **/
@Slf4j
@Component
public class PrintFeedbackConsumer {

    @Autowired
    private PrintSendLogService printSendLogService;

    @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
    public void receiveMq(Message message, Channel channel) {
        try {
            String body = new String(message.getBody());
            log.info("accept【PrintResult push】RabbitMQinformation:"+body);
            JSONObject objJson = JSONObject.parseObject(body);
            Thread.sleep(1000);
            PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
            printSendLogService.updatePrintSendLog(printResult);
        }catch (Exception e){
            log.error("",e);
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

Same operation as a single RabbitMQ consumer, just pay attention to the queue and connection factory to be consumed

3. Summary

**When configuring a single RabbitMQ, you do not need to care about how the underlying connection factory is configured. When the yaml content is filled in, it will automatically configure the connection factory. You only need to bind the switch, queue, and configuration. When you need to configure multiple mqs, you need to manually configure the connection factory yourself. It is not that you can only configure two RabbitMQs. You can configure more in this format. The only thing to note is not to confuse these queues with switches. **