Using Spring Boot with the Eclipse Paho MQTT client (org.eclipse.paho.client.mqttv3) to listen for device messages

Goal: listen for and retrieve the data that devices upload to the MQTT server

An IoT device publishes its data to a topic on the MQTT server (broker). To obtain the device's data, the application must subscribe to that topic and handle the messages that arrive on it.

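1. Add the dependencies (the code below uses the Eclipse Paho MQTT v3 client, `org.eclipse.paho.client.mqttv3`, and fastjson2)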

Configure the MQTT connection parameters and register the client as a bean in the Spring container

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that creates the MQTT client used by the application and
 * registers it in the Spring container.
 *
 * <p>NOTE(review): {@code CLIENT_ID} embeds {@link System#currentTimeMillis()}, so it is
 * different on every application start, while the connection is opened with
 * {@code cleanSession=false}. A persistent session requested under one client ID is
 * presumably never resumed after a restart — confirm whether a stable client ID is wanted.
 */
@Configuration
public class MqttConfig {

    private static final String MQTT_BROKER_URI = "tcp://127.0.0.1:8181";
    private static final String CLIENT_ID = "Subscribe to corresponding information" + System.currentTimeMillis();

    /**
     * Creates the MQTT client, connects it to the broker and exposes it as a Spring bean.
     *
     * @return a client that has already been connected to {@code MQTT_BROKER_URI}
     * @throws Exception if the client cannot be created or the connection attempt fails;
     *     the exception propagates to the container that is creating the bean
     */
    @Bean
    public IMqttClient mqttClient() throws Exception {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        // Give up on a connection attempt after 10 seconds.
        connectOptions.setConnectionTimeout(10);
        // Keep-alive interval of 320 seconds (an interval, not a rate).
        connectOptions.setKeepAliveInterval(320);
        // Ask the broker for a persistent (non-clean) session.
        connectOptions.setCleanSession(false);
        // Let Paho re-establish the connection by itself when it is lost.
        connectOptions.setAutomaticReconnect(true);

        IMqttClient mqtt = new MqttClient(MQTT_BROKER_URI, CLIENT_ID);
        mqtt.connect(connectOptions);
        return mqtt;
    }

}

Implement the `IMqttMessageListener` interface to handle the messages that arrive

package com.example.nxserver.mqttService;

import com.alibaba.fastjson2.JSONObject;
import com.example.nxserver.entity.SmStaticTb;
import com.example.nxserver.service.SmStaticTbService;
import jakarta.annotation.Resource;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener implements IMqttMessageListener {
@Resource
SmStaticTbService smStaticTbService;

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // This handles the logic when the message arrives
    System.out.println("Message arrived. Topic: " + topic + " Message: " + new String(message.getPayload()));
    String messageStr = `Insert code snippet here`new String(message.getPayload());
    JSONObject jsonObject = JSONObject.parseObject(messageStr);
    // create DateTimeFormatter Example,and define the conversion format
    System.out.println("newjsonObjectfor" + jsonObject);
    if (jsonObject.containsKey("data")) {
        smStaticTbService.receiveData(jsonObject, messageStr);
    } else if (jsonObject.containsKey("Alarm")) {
        smStaticTbService.receiveAlarm(jsonObject, messageStr);
    }

} 

}

Subscribe to the topic to listen on; the subscription is made automatically when the application starts.

@Component
public class MqttSubscriber {

@Resource
private IMqttClient mqttClient;
@Resource
MqttMessageListener mqttMessageListener; 

//Start automatically when the container is initialized
@PostConstruct
public void subscribeToTopic() {
try {
mqttClient.subscribe(“acrel/adw300/+”, 1, mqttMessageListener);
System.out.println(“Subscription successful”);
} catch (Exception e) {
e.printStackTrace();
}
}
}