14. Springboot integrates RabbitMQ

Table of contents

1 Introduction

  1. What is RabbitMQ?

  2. Install RabbitMQ

  3. Springboot integrates RabbitMQ

4.1. Add dependencies

4.2. Add configuration

4.3. Add controller as producer

4.4. Set producer message confirmation CallBack

4.5. Add Consumer as a consumer

4.6. Start the program and access


1 Introduction

Message Queue (MQ for short) is an asynchronous messaging middleware that decouples communication between applications. Applications can send messages to queues without knowing who will receive them. Receiving applications can retrieve messages from the queue without knowing who sent them. Message queue is an important middleware that helps asynchronous, reliable, and scalable communication between applications. Common message queue middleware include ActiveMQ, RabbitMQ, Kafka… Today we will introduce RabbitMQ.

2. What is RabbitMQ?

RabbitMQ is an open source message queue server that implements the AMQP (Advanced Message Queuing Protocol) standard. AMQP is an application layer protocol designed for message-oriented middleware. Clients and message middleware based on this protocol can transmit messages and are not restricted by products, development languages, etc.

AMQP: Advanced Message Queue, Advanced Message Queuing Protocol. It is an open standard for application layer protocols and is designed for message-oriented middleware. Clients and message middleware based on this protocol can transmit messages and are not restricted by products, development languages, etc.

Key features of RabbitMQ include:

  • High performance: RabbitMQ is capable of handling large volumes of messages and provides low-latency performance.
  • Reliability: RabbitMQ provides persistent message storage to ensure that messages will not be lost.
  • Scalability: RabbitMQ can easily scale to meet growing demand.
  • Flexibility: RabbitMQ supports multiple programming languages ​​and clients and provides rich functionality and configuration options.

Common application scenarios of RabbitMQ include:

  • Distributed systems: RabbitMQ can be used for asynchronous communication in distributed systems.
  • Asynchronous processing: RabbitMQ can be used to process tasks asynchronously to improve system performance and efficiency.
  • Message queue: RabbitMQ can be used to implement message queues, such as task queues, publish/subscribe queues, etc.
  • Message notifications: RabbitMQ can be used to send message notifications, such as emails or SMS messages.

3. Install RabbitMQ

RabbitMQ is an open source implementation of AMQP developed in Erlang language. Therefore, you need to install the Erlang environment before installing RabbitMQ.

Erlang download address: Downloads – Erlang/OTP

RabbitMQ download address: Installing RabbitMQ | RabbitMQ

Install Erlang first, and then install RabbitMQ. The installation project is relatively simple, and the next step is a no-brainer.

After installing RabbitMQ, open the cmd window and enter the sbin of the RabbitMQ installation directory. My directory is:

D:\RabbitMQ Server\rabbitmq_server-3.13.0\sbin

Then enter the following command to install the plug-in:

rabbitmq-plugins enable rabbitmq_management

The following prompt indicates that the installation was successful.

To verify whether RabbitMQ is installed successfully, enter the following command:

rabbitmqctl status

At this time, you can see the RabbitMQ management page by directly accessing http://127.0.0.1:15672. The default port of RabbitMQ is 15672, and the default management page account and password are guest.

After logging in, you can see an initial management interface:

4. Springboot integrates RabbitMQ

4.1. Add dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.3</version>
       <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>springboot-rabbitmq</description>
    <properties>
       <java.version>17</java.version>
    </properties>
    <dependencies>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-devtools</artifactId>
          <scope>runtime</scope>
          <optional>true</optional>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-configuration-processor</artifactId>
          <optional>true</optional>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
          <groupId>cn.hutool</groupId>
          <artifactId>hutool-all</artifactId>
          <version>5.8.24</version>
       </dependency>
    </dependencies>

    <build>
       <plugins>
          <plugin>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
       </plugins>
    </build>
</project>

4.2. Add configuration

# rabbitmqConnection configuration information
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Ensure messages are returned if not received by the queue
spring.rabbitmq.publisher-returns=true
# After the message is successfully published to the exchange, the callback method will be triggered.
spring.rabbitmq.publisher-confirm-type=correlated

4.3. Add controller as producer

Create a new controller for sending messages.

package com.example.springbootrabbitmq.controller;

import com.example.springbootrabbitmq.config.MqProducerCallBack;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("push/message")
public class PushMessageController {

    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MqProducerCallBack mqProducerCallBack;


    @GetMapping("test")
    public String sendMessage() {
        // correlationData:There is only one inside the object id Attributes,Used to indicate the uniqueness of the current message。
        CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
        // Message confirmation and return callback
        rabbitTemplate.setConfirmCallback(mqProducerCallBack);
        rabbitTemplate.setReturnsCallback(mqProducerCallBack);
        // message sending
        rabbitTemplate.convertAndSend("my-queue", "hello world", message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);
        return "publisher success...";
    }
}

4.4. Set producer message confirmation CallBack

package com.example.springbootrabbitmq.config;

import cn.hutool.json.JSONUtil;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MqProducerCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    /**
     * correlationData:There is only one inside the object id Attributes,Used to indicate the uniqueness of the current message。
     * ack:Message delivered tobroker status,truesuccess,falsefail。
     * cause:Reason for delivery failure。
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.err.println("informationID=" + correlationData.getId() + "Delivery failed,Reason for failure:" + cause);
        } else {
            System.out.println("Message delivery received confirmation,correlationData=" + correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("Return message result:" + JSONUtil.toJsonStr(returnedMessage));
    }

}

4.5. Add Consumer as a consumer

package com.example.springbootrabbitmq.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class PushMessageConsumer {


    /**
     * basicAck:Indicates successful confirmation,After using this receipt method,The message will berabbitmq broker delete。
     * void basicAck(long deliveryTag, boolean multiple)
     * deliveryTag:Represents the message delivery sequence number,Every time a message is consumed or the message is re-delivered,,deliveryTagwill increase。In manual message confirmation mode,We can specifydeliveryTagnews proceedingack、nack、rejectWait for operations。
     * multiple:Whether to confirm in batches,The value is true will be a one-time ackAll messages less than the current deliveryTag news。
     * */
    @RabbitListener(queuesToDeclare = @Queue(value = "my-queue"))
    @RabbitHandler
    public void consume(String msg, Channel channel, Message message) throws IOException {
        try {
            System.out.println("Consumer receives message:" + msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("deliveryTag:" + message.getMessageProperties().getDeliveryTag());
            System.out.println("redelivered:" + message.getMessageProperties().getRedelivered());
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.err.println("The message has been repeated and failed to be processed,Decline to receive again!");
                /**
                 * reject message,requeue=false Indicates that he will not rejoin the team,If a dead letter queue is configured, enter the dead letter queue.
                 * basicReject:reject message,andbasicNackThe difference is that batch operations cannot be performed,Other usages are very similar。
                 * deliveryTag:Represents the message delivery sequence number。
                 * requeue:The value is true The message will be requeued。
                 */
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("The message is about to be returned to the queue again for processing!");
                /**
                 * requeueWhether to return to the queue,trueRe-enlist
                 * deliveryTag:Represents the message delivery sequence number。
                 * multiple:Whether to confirm in batches。
                 * requeue:The value is true The message will be requeued。
                 */
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }

} 

4.6. Start the program and access

Browser access: http://localhost:8080/push/message/test simulates messages for push.

Checking the console, we found that the consumer printed consumption information normally.

Open the RabbitMQ management console and you can find our message queue my-queue information.

You can view the filling of the message queue, message delivery status, etc.