1. Version description
erlang and rabbitmq release notes
https://www.rabbitmq.com/which-erlang.html
Confirm the mq version that needs to be installed and the corresponding erlang version.
2. Download the installation file
RabbitMQ download address:
https://packagecloud.io/rabbitmq/rabbitmq-server
Erlang download address:
https://packagecloud.io/rabbitmq/erlang
RabbitMQ delayed message plug-in download
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Download the file as shown in the figure
3. Installation steps
3.1. Check whether erlang and rabbitmq have been installed. If found, they need to be deleted.
rpm -qa | grep rabbitmq-server
rpm -qa | grep erlang
# delete
yum -y remove rabbitmq-server.noarch
3.2, Install erlang locally
yum localinstall erlang-23.2.7-2.el7.x86_64.rpm
# Query the installed version
erl -version
# Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version xxx
3.3, Install rabbitmq locally
yum localinstall rabbitmq-server-3.9.0-1.el7.noarch.rpm
# start uprabbitmq
systemctl start rabbitmq-server
# Checkrabbitmqstate
systemctl status rabbitmq-server
# set uprabbitmqService starts automatically at boot
systemctl enable rabbitmq-server
# closurerabbitmqServe
systemctl stop rabbitmq-server
# RestartrabbitmqServe
systemctl restart rabbitmq-server
3.4, mq port open:
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
firewall-cmd --zone=public --list-ports
3.5, install mq management interface
# Enable admin interface plug-in
rabbitmq-plugins enable rabbitmq_management
curl http://localhost:15672 you can openwebManagement page
# rabbitmqThere is a default account passwordguest,But this situation is limited to this machinelocalhostto visit,So you need to add a remote login user
# Add user
rabbitmqctl add_user username password
rabbitmqctl add_user admin 123456
# Set user role,Assign operation permissions
rabbitmqctl set_user_tags username Role
rabbitmqctl set_user_tags admin administrator
# Add resource permissions to users(Grant all access to the virtual machine root node)
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# There are four types of roles:
# administrator:Can log in to the console、View all information、And torabbitmqManage
# monToring:monitor;Log in to the console,View all information
# policymaker:strategist;Log in to the console to specify policies
# managment:Ordinary administrator;Login control
# change Password
rabbitmqctl change_ password username New Password
# delete users
rabbitmqctl delete_user username
# View user list
rabbitmqctl list_users
3.6, Delay message plug-in installation:
# Copy the plug-in package first to /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins
cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#Restartmq
systemctl restart rabbitmq-server
rabbitmq-plugins list
3.7, login test
Access address: ip:15672 Account password: admin 123456
Find the switch exchange and see if there is a delayed message type.
Then you can write code to connect and send messages.
4. Java code
4.1, pom introduction:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2, configuration class:
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
4.3, message definition configuration class:
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class OrderRabbitMQConfig {
@Autowired
private RabbitAdmin rabbitAdmin;
//================================Order delay=================================
@Bean
CustomExchange order_pay_delay_exchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("order_pay_delay_exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue order_pay_delay_queue() {
Queue queue = new Queue("order_pay_delay_queue", true, false, false);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
public Binding order_pay_delay_binding() {
return BindingBuilder.bind(order_pay_delay_queue())
.to(order_pay_delay_exchange()).with("order_pay_delay_routing").noargs();
}
//================================Order payment notification======================================
@Bean
public DirectExchange order_pay_notify_exchange() {
return new DirectExchange("order_pay_notify_exchange", true, false);
}
@Bean
public Queue order_pay_notify_direct_queue() {
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-max-priority", 5);
Queue queue = new Queue("order_pay_notify_queue", true, false, false, argsMap);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
public Binding ctc_bidding_auction_pay_notify_binding() {
return BindingBuilder.bind(order_pay_notify_direct_queue())
.to(order_pay_notify_exchange()).with("order_pay_notify_routing");
}
}
4.4, message sending class:
import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitMQSendUtils {
private static RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMQSendUtils(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* Order payment delay notification、sendMQinformation
*/
public static void sendPayDelayMessage(PayOrderNotifyDto dto, final Integer delayTimes) {
//Send message to delay queue
String msg = JSONUtil.toJsonStr(dto);
log.info("Order payment delay notification、sendMQinformation: {}, delayTimes={}", msg, delayTimes);
rabbitTemplate.convertAndSend("order_pay_delay_exchange", "order_pay_delay_routing", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//Set the delay millisecond value for the message
message.getMessageProperties().setDelay(delayTimes);
return message;
}
});
}
/**
* Order payment notification,sendMQinformation
*/
public static void sendPayNotifyMsg(PayOrderNotifyDto dto) {
log.info("Order payment notification,sendMQinformation: {}", dto);
rabbitTemplate.convertAndSend("order_pay_notify_exchange", "order_pay_notify_routing", JSONUtil.toJsonStr(dto));
}
}
4.5, message monitoring consumer category:
import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* MQConsumption monitoring
*/
@Slf4j
@Component
public class OrderMQListener {
/**
* Order delay notification information
*/
@RabbitListener(queues = {"order_pay_delay_queue"})
public void payDelayNotify(Message message) {
try {
String msg = new String(message.getBody());
log.info("【Consumption】Order delay notification MQ Message content: {}, Message={}", msg, message);
//Payment order changed to timeout and not paid》Cancel
PayOrderNotifyDto dto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
} catch (Exception e) {
log.error("Order delay notification Message consumption failed:", e);
}
}
/**
* Order payment notification information
*/
@RabbitListener(queues = {"order_pay_notify_queue"})
public void payNotify(Message message) {
try {
String msg = new String(message.getBody());
log.info("Order payment notification MQ Message content:{}, {}", msg, message);
PayOrderNotifyDto payOrderNotifyDto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
} catch (Exception e) {
log.error("Order payment notification Message consumption failed:", e);
}
}
}