Centos installs RabbitMQ, JavaSpring sends RabbitMQ delayed messages, and JavaSpring consumes RabbitMQ messages

1. Version description

erlang and rabbitmq release notes
https://www.rabbitmq.com/which-erlang.html
Confirm the mq version that needs to be installed and the corresponding erlang version.

2. Download the installation file

RabbitMQ download address:
https://packagecloud.io/rabbitmq/rabbitmq-server

Erlang download address:
https://packagecloud.io/rabbitmq/erlang

RabbitMQ delayed message plug-in download
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

Download the file as shown in the figure

Insert image description here

3. Installation steps

3.1. Check whether erlang and rabbitmq have been installed. If found, they need to be deleted.

 rpm -qa | grep rabbitmq-server
    rpm -qa | grep erlang
    # delete
    yum -y remove rabbitmq-server.noarch 

3.2, Install erlang locally

 yum localinstall erlang-23.2.7-2.el7.x86_64.rpm
    # Query the installed version
    erl -version
    # Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version xxx 

3.3, Install rabbitmq locally

 yum localinstall rabbitmq-server-3.9.0-1.el7.noarch.rpm 
 # start uprabbitmq
    systemctl start rabbitmq-server

    # Checkrabbitmqstate
    systemctl status rabbitmq-server

    # set uprabbitmqService starts automatically at boot
    systemctl enable rabbitmq-server

    # closurerabbitmqServe
    systemctl stop rabbitmq-server

    # RestartrabbitmqServe
    systemctl restart rabbitmq-server 

3.4, mq port open:

 firewall-cmd --zone=public --add-port=5672/tcp --permanent
    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --reload
    firewall-cmd --zone=public --list-ports 

3.5, install mq management interface

 # Enable admin interface plug-in
    rabbitmq-plugins enable rabbitmq_management

    curl http://localhost:15672 you can openwebManagement page

    # rabbitmqThere is a default account passwordguest,But this situation is limited to this machinelocalhostto visit,So you need to add a remote login user

    # Add user
    rabbitmqctl add_user username password

    rabbitmqctl add_user admin 123456

    # Set user role,Assign operation permissions
    rabbitmqctl set_user_tags username Role

    rabbitmqctl set_user_tags admin administrator

    # Add resource permissions to users(Grant all access to the virtual machine root node)
    rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

    rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

    # There are four types of roles:
    # administrator:Can log in to the console、View all information、And torabbitmqManage
    # monToring:monitor;Log in to the console,View all information
    # policymaker:strategist;Log in to the console to specify policies
    # managment:Ordinary administrator;Login control

    # change Password
    rabbitmqctl change_ password username New Password

    # delete users
    rabbitmqctl delete_user username

    # View user list
    rabbitmqctl list_users 

3.6, Delay message plug-in installation:

 # Copy the plug-in package first to     /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins
    cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins/
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    #Restartmq 
    systemctl restart rabbitmq-server
    rabbitmq-plugins list 

3.7, login test

Access address: ip:15672 Account password: admin 123456
Login interface

Find the switch exchange and see if there is a delayed message type.
Insert image description here

Then you can write code to connect and send messages.

4. Java code

4.1, pom introduction:

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency> 

4.2, configuration class:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

} 

4.3, message definition configuration class:

 import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class OrderRabbitMQConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    //================================Order delay=================================
    @Bean
    CustomExchange order_pay_delay_exchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("order_pay_delay_exchange", "x-delayed-message", true, false, args);
    }
    @Bean
    public Queue order_pay_delay_queue() {
        Queue queue = new Queue("order_pay_delay_queue", true, false, false);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding order_pay_delay_binding() {
        return BindingBuilder.bind(order_pay_delay_queue())
                .to(order_pay_delay_exchange()).with("order_pay_delay_routing").noargs();
    }

    //================================Order payment notification======================================
    @Bean
    public DirectExchange order_pay_notify_exchange() {
        return new DirectExchange("order_pay_notify_exchange", true, false);
    }
    @Bean
    public Queue order_pay_notify_direct_queue() {
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("x-max-priority", 5);
        Queue queue = new Queue("order_pay_notify_queue", true, false, false, argsMap);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding ctc_bidding_auction_pay_notify_binding() {
        return BindingBuilder.bind(order_pay_notify_direct_queue())
                .to(order_pay_notify_exchange()).with("order_pay_notify_routing");
    }
} 

4.4, message sending class:

 import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitMQSendUtils {

    private static RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQSendUtils(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Order payment delay notification、sendMQinformation
     */
    public static void sendPayDelayMessage(PayOrderNotifyDto dto, final Integer delayTimes) {
        //Send message to delay queue
        String msg = JSONUtil.toJsonStr(dto);
        log.info("Order payment delay notification、sendMQinformation: {}, delayTimes={}", msg, delayTimes);
        rabbitTemplate.convertAndSend("order_pay_delay_exchange", "order_pay_delay_routing", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //Set the delay millisecond value for the message
                message.getMessageProperties().setDelay(delayTimes);
                return message;
            }
        });
    }

    /**
     * Order payment notification,sendMQinformation
     */
    public static void sendPayNotifyMsg(PayOrderNotifyDto dto) {
        log.info("Order payment notification,sendMQinformation: {}", dto);
        rabbitTemplate.convertAndSend("order_pay_notify_exchange", "order_pay_notify_routing", JSONUtil.toJsonStr(dto));
    }
} 

4.5, message monitoring consumer category:

 import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * MQConsumption monitoring
 */
@Slf4j
@Component
public class OrderMQListener {
    /**
     * Order delay notification information
     */
    @RabbitListener(queues = {"order_pay_delay_queue"})
    public void payDelayNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("【Consumption】Order delay notification MQ Message content: {}, Message={}", msg, message);
            //Payment order changed to timeout and not paid》Cancel
            PayOrderNotifyDto dto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);

        } catch (Exception e) {
            log.error("Order delay notification Message consumption failed:", e);
        }
    }
    /**
     * Order payment notification information
     */
    @RabbitListener(queues = {"order_pay_notify_queue"})
    public void payNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("Order payment notification MQ Message content:{}, {}", msg, message);
            PayOrderNotifyDto payOrderNotifyDto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
        } catch (Exception e) {
            log.error("Order payment notification Message consumption failed:", e);
        }
    }

}