springBoot combines paho.client.mqtt to implement monitoring
Monitor and obtain the data uploaded by the device to the mqtt server
The IoT device sends data to a certain topic on the mqtt server. It is necessary to monitor the topic to obtain the data of the IoT device.
1.Introduce dependencies
Set various mqtt connection parameters and inject the client into the container
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//Defined into the corresponding spring container
@Configuration
public class MqttConfig {
private static final String MQTT_BROKER_URI = "tcp://127.0.0.1:8181";
private static final String CLIENT_ID = "Subscribe to corresponding information" + System.currentTimeMillis();
//@bean The return value of the method Inject into the corresponding container,accomplishIMqttClientContainer management
@Bean
public IMqttClient mqttClient() throws Exception {
IMqttClient client = new MqttClient(MQTT_BROKER_URI, CLIENT_ID);
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(false);
options.setKeepAliveInterval(320); //correspond320Heartbeat per second
options.setConnectionTimeout(10);
client.connect(options);
return client;
}
}
Inherit listener and implement the corresponding listening class
package com.example.nxserver.mqttService;
import com.alibaba.fastjson2.JSONObject;
import com.example.nxserver.entity.SmStaticTb;
import com.example.nxserver.service.SmStaticTbService;
import jakarta.annotation.Resource;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
@Component
public class MqttMessageListener implements IMqttMessageListener {
@Resource
SmStaticTbService smStaticTbService;
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// This handles the logic when the message arrives
System.out.println("Message arrived. Topic: " + topic + " Message: " + new String(message.getPayload()));
String messageStr = `Insert code snippet here`new String(message.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageStr);
// create DateTimeFormatter Example,and define the conversion format
System.out.println("newjsonObjectfor" + jsonObject);
if (jsonObject.containsKey("data")) {
smStaticTbService.receiveData(jsonObject, messageStr);
} else if (jsonObject.containsKey("Alarm")) {
smStaticTbService.receiveAlarm(jsonObject, messageStr);
}
}
}
Subscribe to a topic to monitor and set it to start automatically at boot.
@Component
public class MqttSubscriber {
@Resource
private IMqttClient mqttClient;
@Resource
MqttMessageListener mqttMessageListener;
//Start automatically when the container is initialized
@PostConstruct
public void subscribeToTopic() {
try {
mqttClient.subscribe(“acrel/adw300/+”, 1, mqttMessageListener);
System.out.println(“Subscription successful”);
} catch (Exception e) {
e.printStackTrace();
}
}
}