1. SpringBoot integrates MQTT
Create a project and introduce MQTT dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
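<!-- spring-integration-mqtt dependency (brings in the Eclipse Paho MQTT client). NOTE: the 6.x line targets Spring Framework 6 / Java 17; with Spring Boot 2.3.x choose the matching 5.3.x release instead. -->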
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.1.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
1. yml configuration file
In the application.yml file, define the MQTT connection information.
## MQTT basic connection parameters ##
mqtt:
  host: tcp://192.168.xxx.xxx:1883
  # host: tcp://broker.emqx.io:1883
  userName: admin
  passWord: xxxxxx
  qos: 1
  clientId: ClientId_local # The client ID must be unique among all clients connected to the broker.
  timeout: 10 # Connection timeout, in seconds
  keepalive: 30 # Keep-alive (heartbeat) interval, in seconds
  clearSession: true # Clean-session flag. If set to false, the broker keeps the session after a disconnect: subscriptions are retained and messages published while the client was offline are delivered after it reconnects.
  topic1: A/b/# # Wildcards may only be used when subscribing, not when publishing. "+" matches a single level, "#" matches multiple levels.
  topic2: A/abc
  topic3: ABC
2. MQTT configuration class
Create a MqttConfig configuration class that reads the MQTT connection parameters from the configuration file, then creates a MyMqttClient instance and registers it as a Spring bean.
@Slf4j
@Configuration
public class MqttConfig {
@Value("{mqtt.host}")
public String host;
@Value("{mqtt.username}")
public String username;
@Value("{mqtt.password}")
public String password;
@Value("{mqtt.clientId}")
public String clientId;
@Value("{mqtt.timeout}")
public int timeOut;
@Value("{mqtt.keepalive}")
public int keepAlive;
@Value("{mqtt.clearSession}")
public boolean clearSession;
@Value("{mqtt.topic1}")
public String topic1;
@Value("{mqtt.topic2}")
public String topic2;
@Value("{mqtt.topic3}")
public String topic3;
@Bean//injectionSpring
public MyMqttClient myMqttClient() {
MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession);
for (int i = 0; i < 10; i++) {
try {
myMqttClient.connect();
// You can subscribe to topics here,Recommended to put MqttCallbackExtended.connectCompletein method
//myMqttClient.subscribe("ABC", 1);
return myMqttClient;
} catch (MqttException e) {
log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
return myMqttClient;
}
}
3. MQTT client encapsulation class
Create the MQTT client encapsulation class MyMqttClient. Perform operations on MQTT Broker.
@Slf4j
public class MyMqttClient {
/**
* MQTT Broker Basic connection parameters,username、Password is an optional parameter
*/
private String host;
private String username;
private String password;
private String clientId;
private int timeout;
private int keepalive;
private boolean clearSession;
/**
* MQTT client
*/
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MyMqttClient.client = client;
}
public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) {
this.host = host;
this.username = username;
this.password = password;
this.clientId = clientId;
this.timeout = timeOut;
this.keepalive = keepAlive;
this.clearSession = clearSession;
}
/**
* set up MQTT Broker Basic connection parameters
*
* @param username
* @param password
* @param timeout
* @param keepalive
* @return
*/
public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setCleanSession(clearSession);
options.setAutomaticReconnect(true);
return options;
}
/**
* connect MQTT Broker,get MqttClientconnection object
*/
public void connect() throws MqttException {
if (client == null) {
client = new MqttClient(host, clientId, new MemoryPersistence());
// Set callback
client.setCallback(new MyMqttCallback(MyMqttClient.this));
}
// Connection parameters
MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
if (!client.isConnected()) {
client.connect(mqttConnectOptions);
} else {
client.disconnect();
client.connect(mqttConnectOptions);
}
log.info("== MyMqttClient ==> MQTT connect success");//No exception occurred,The connection is successful
}
/**
* release,defaultqosfor0,Non-endurance
*
* @param pushMessage
* @param topic
*/
public void publish(String pushMessage, String topic) {
publish(pushMessage, topic, 0, false);
}
/**
* make an announcement
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:retain
*/
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("== MyMqttClient ==> topic is not exist");
}
MqttDeliveryToken token;//Delivery:delivery
synchronized (this) {//Notice:This must be synchronized,otherwise,in multithreadingpublishin the case of,Threads will deadlock,For analysis, please see the supplement at the end of the article.
try {
token = mqttTopic.publish(message);//It is also sent to the execution queue,Wait for execution thread to execute,Send messages to message middleware
token.waitForCompletion(1000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* Subscribe to a topic,qosThe default is0
*
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 0);
}
/**
* Subscribe to a topic
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
MyMqttClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
log.info("== MyMqttClient ==> Subscription to topic successful:topic = {}, qos = {}", topic, qos);
}
/**
* Unsubscribe from topic
*
* @param topic Topic name
*/
public void cleanTopic(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
log.error("== MyMqttClient ==> Unsubscription failed!");
}
log.info("== MyMqttClient ==> Unsubscribed topic successfully:topic = {}", topic);
}
}
Description:
- MqttClient: A synchronous client that communicates with the broker using blocking calls.
- MqttClientPersistence: Represents a persistent data store used to hold outbound and inbound messages while they are in flight, so that they can be delivered with the specified QoS.
- MqttConnectOptions: Connection options, used to specify connection parameters. Some common methods are listed below.
- setUserName: Set user name
- setPassword: set password
- setCleanSession: Set whether to clear the session
- setKeepAliveInterval: Set heartbeat interval
- setConnectionTimeout: Set the connection timeout
- setAutomaticReconnect: Set whether to automatically reconnect
4. MqttClient callback class
Create a MqttClient callback class MyMqttCallback.
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
//Manual injection
private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class);
private MyMqttClient myMqttClient;
public MyMqttCallback(MyMqttClient myMqttClient) {
this.myMqttClient = myMqttClient;
}
/**
* MQTT BrokerMethod called when the connection is successful。In this method you can execute Subscribe to topics agreed by the system(Recommended Use)。
* if MQTT BrokerWhen disconnected and then reconnected successfully,The topic also needs to be subscribed again,It is more reasonable to place the resubscription topic in the callback method after the connection is successful.。
*
* @param reconnect
* @param serverURI MQTT Brokerofurl
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
String connectMode = reconnect ? "Reconnection" : "direct connection";
log.info("== MyMqttCallback ==> MQTT connection succeeded,Connection method:{},serverURI:{}", connectMode, serverURI);
//Subscribe to topics
myMqttClient.subscribe(mqttConfig.topic1, 1);
myMqttClient.subscribe(mqttConfig.topic2, 1);
myMqttClient.subscribe(mqttConfig.topic3, 1);
List<String> topicList = new ArrayList<>();
topicList.add(mqttConfig.topic1);
topicList.add(mqttConfig.topic2);
topicList.add(mqttConfig.topic3);
log.info("== MyMqttCallback ==> Connection method:{},Subscription to topic successful,topic:{}", connectMode, topicList);
}
/**
* lost connection,You can reconnect here
* Will only be called once
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("== MyMqttCallback ==> connectionLost Disconnect,5SThen try to reconnect: {}", throwable.getMessage());
long reconnectTimes = 1;
while (true) {
try {
if (MyMqttClient.getClient().isConnected()) {
//Determine that the reconnection has been successful Need to resubscribe to topic Can be found hereifSubscribe to topics inside or connectComplete(inside the method) It's up to you to choose
log.warn("== MyMqttCallback ==> mqtt reconnect success end reconnect Resubscription successful");
return;
}
reconnectTimes += 1;
log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqttreconnect time {}", reconnectTimes, reconnectTimes);
MyMqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("== MyMqttCallback ==> mqttDisconnect exception", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
}
}
}
/**
* message received(subscribeSubscribed topic messages)method called when
*
* @param topic
* @param mqttMessage
* @throws Exception The message obtained later will be executed here
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("== MyMqttCallback ==> messageArrived Receive message topic: {},Receive message content: {}", topic, new String(mqttMessage.getPayload()));
/**
* Process business separately according to subscribed topics。able to passif-elseOr strategy mode to handle different topic messages separately。
*/
//topic1theme
if (topic.equals("ABC")) {
Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
//TODO business processing
//doSomething1(maps);
log.info("== MyMqttCallback ==> messageArrived Receive message topic: {},{}Business processing message content completed", topic, "TodoService1");
}
//topic2theme
if (topic.equals("A/b/1qaz")) {
Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
//TODO business processing
//doSomething2(maps);
log.info("== MyMqttCallback ==> messageArrived Receive message topic: {},{}Business processing message content completed", topic, "TodoService2");
}
}
/**
* message sending(publish)Method to be called on completion
*
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("== MyMqttCallback ==> deliveryComplete Message sent completed,Complete= {}", iMqttDeliveryToken.isComplete());
}
}
MqttCallback class method description:
- connectionLost(Throwable cause): Called when the connection is lost
- messageArrived(String topic, MqttMessage message): Called when a message is received
- deliveryComplete(IMqttDeliveryToken token): Called when message sending is completed
MqttCallbackExtended method description: this interface extends the MqttCallback interface.
- connectComplete(boolean reconnect, String serverURI): Called when the connection to the broker has been established successfully, whether it is the initial connection or a reconnection.
4.1 SpringUtils tool class
/**
 * Static access to the Spring bean factory and application context, for code
 * that is not itself managed by Spring.
 *
 * <p>The static references are filled in by the two callbacks below, so the
 * static methods only work after Spring has invoked them.
 */
@Component
public class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {
    /**
     * Bean factory and application context captured from Spring.
     */
    private static ConfigurableListableBeanFactory beanFactory;
    private static ApplicationContext applicationContext;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name bean name
     * @return the bean registered under the given name, cast to the caller's expected type
     * @throws org.springframework.beans.BeansException if the bean cannot be obtained
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) beanFactory.getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clz required bean type
     * @return the bean of the given type
     * @throws org.springframework.beans.BeansException if the bean cannot be obtained
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        // getBean(Class<T>) already returns T, so no cast is needed.
        return beanFactory.getBean(clz);
    }

    /**
     * Tells whether the bean factory contains a bean with the given name.
     *
     * @param name bean name
     * @return {@code true} if such a bean is present
     */
    public static boolean containsBean(String name) {
        return beanFactory.containsBean(name);
    }

    /**
     * Tells whether the bean registered under the given name is a singleton
     * (as opposed to, for example, a prototype).
     *
     * @param name bean name
     * @return {@code true} if the bean is a singleton
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException if no bean with that name exists
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.isSingleton(name);
    }

    /**
     * Returns the type of the bean registered under the given name.
     *
     * @param name bean name
     * @return the type of the registered bean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException if no bean with that name exists
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getType(name);
    }

    /**
     * Returns the aliases of the bean registered under the given name.
     *
     * @param name bean name
     * @return the aliases of that bean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException if no bean with that name exists
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getAliases(name);
    }

    /**
     * Returns the current AOP proxy from {@code AopContext.currentProxy()}.
     *
     * @param invoker not read; it only fixes the generic return type
     * @return the current AOP proxy, cast to the type of {@code invoker}
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker) {
        return (T) AopContext.currentProxy();
    }

    /**
     * Returns the active profiles of the current environment.
     *
     * @return the active profiles as reported by the environment
     */
    public static String[] getActiveProfiles() {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

    /**
     * Returns the first active profile of the current environment.
     *
     * @return the first active profile, or {@code null} when none is active
     */
    public static String getActiveProfile() {
        final String[] activeProfiles = getActiveProfiles();
        // Guard against an empty array as well as null; indexing [0] on an empty
        // array would throw ArrayIndexOutOfBoundsException.
        if (activeProfiles == null || activeProfiles.length == 0) {
            return null;
        }
        return activeProfiles[0];
    }
}
At this point, Springboot can integrate and operate MQTT Broker through MqttClient.
2. Operation MQTT
We create a MqttService class in the service layer, and the business uses the MqttService class to uniformly operate MqttClient.
1. Customize the message carrier class
Create a MyXxxMqttMsg class here to agree on the carrier class format for sending messages.
/**
 * Carrier object that defines the agreed format of a published MQTT message.
 * Accessors and the other boilerplate members are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class MyXxxMqttMsg implements Serializable {
    private static final long serialVersionUID = -8303548938481407659L;
    /**
     * MD5 value of the message, intended to be the lower-case MD5 of
     * (content + timestamp).
     */
    private String md5;
    /**
     * Message content; defaults to the empty string.
     */
    private String content = "";
    /**
     * Timestamp of the message.
     */
    private Long timestamp;
}
2. MqttService class
1) Interface:
/**
 * Service-layer entry point through which business code subscribes to,
 * unsubscribes from and publishes to MQTT topics.
 */
public interface MqttService {
    /**
     * Subscribes to a topic.
     *
     * @param topic name of the topic to subscribe to
     */
    void addTopic(String topic);
    /**
     * Unsubscribes from a topic.
     *
     * @param topic name of the topic to unsubscribe from
     */
    void removeTopic(String topic);
    /**
     * Publishes message content to a topic.
     *
     * @param msgContent content of the message to publish
     * @param topic      name of the topic to publish to
     */
    void publish(String msgContent, String topic);
}
2) Implementation class:
@Service
public class MqttServiceImpl implements MqttService {
@Autowired
private MyMqttClient myMqttClient;
@Override
public void addTopic(String topic) {
myMqttClient.subscribe(topic);
}
@Override
public void removeTopic(String topic) {
myMqttClient.cleanTopic(topic);
}
@Override
public void publish(String msgContent, String topic) {
//MyXxxMqttMsg changeJson
MyXxxMqttMsg myXxxMqttMsg = new MyXxxMqttMsg();
myXxxMqttMsg.setContent(msgContent);
myXxxMqttMsg.setTimestamp(System.currentTimeMillis());
// TODO Md5value
myXxxMqttMsg.setMd5(UUID.randomUUID().toString());
String msgJson = JSON.toJSONString(myXxxMqttMsg);
//make an announcement
myMqttClient.publish(msgJson, topic);
}
3. Controller class
Create a MyMqttController class to operate MQTT.
/**
 * REST endpoints under {@code /mqtt} for subscribing to, unsubscribing from
 * and publishing to MQTT topics through {@link MqttService}.
 */
@RestController
@RequestMapping("/mqtt")
@Api(value = "MyMqttController", tags = {"MQTTRelated operation interfaces"})
public class MyMqttController {
    @Autowired
    private MqttService mqttService;

    /**
     * Subscribes to a topic.
     *
     * @param topic name of the topic to subscribe to
     */
    @GetMapping("/addTopic")
    @ApiOperation(value = "Add subscription topic interface")
    public void addTopic(String topic) {
        mqttService.addTopic(topic);
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param topic name of the topic to unsubscribe from
     */
    @GetMapping("/removeTopic")
    @ApiOperation(value = "Unsubscribe topic interface")
    public void removeTopic(String topic) {
        mqttService.removeTopic(topic);
    }

    /**
     * Publishes message content to a topic.
     *
     * <p>This endpoint was previously exposed as {@code POST /removeTopic}, a
     * copy-paste slip. {@code /publish} is the intended path; the old path is
     * kept as an alias so existing callers keep working.
     *
     * @param msgContent content of the message to publish
     * @param topic      name of the topic to publish to
     */
    @PostMapping({"/publish", "/removeTopic"})
    @ApiOperation(value = "Publish topic message content interface")
    public void publish(String msgContent, String topic) {
        mqttService.publish(msgContent, topic);
    }
}
Subscription and cancellation of topic operations: MQTTX publishes a topic message.
>