SpringBoot integrates MQTT (MqttClient)

1. SpringBoot integrates MQTT

Create a project and introduce MQTT dependencies:

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.12.RELEASE</version>
        </dependency>

        <!-- spring-integration-mqtt dependency -->
        <!-- NOTE(review): spring-integration-mqtt 6.x is presumably built against Spring Framework 6 / Spring Boot 3;
             confirm it is compatible with the Spring Boot 2.3.12.RELEASE starter declared above. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>6.1.2</version>
        </dependency>

        <!-- Lombok: provides the @Slf4j and @Data annotations used by the classes below -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>
        <!-- fastjson: JSON serialization of the MQTT message payloads -->
        <!-- NOTE(review): this is an old fastjson 1.x release; check it against current security advisories before use. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency> 

1. yml configuration file

In the application.yml file, define the MQTT connection information.

## MQTT basic connection parameters ##
mqtt:
  host: tcp://192.168.xxx.xxx:1883
  #  host: tcp://broker.emqx.io:1883
  # NOTE(review): MqttConfig reads these two values as mqtt.username / mqtt.password;
  # the key casing differs here, so confirm the placeholders actually resolve.
  userName: admin
  passWord: xxxxxx
  qos: 1
  clientId: ClientId_local # The client id must be unique per connected client.
  timeout: 10 # Connection timeout (passed to setConnectionTimeout; presumably seconds - confirm)
  keepalive: 30 # Keep-alive interval (passed to setKeepAliveInterval; presumably seconds - confirm)
  clearSession: true   # Whether to clear the session. When set to false, the original session is reused after a disconnect/reconnect: subscribed topics are kept and messages published while offline can still be received.
  topic1: A/b/#  # Wildcard topics can only be used for subscribing, not for publishing. "+" is the single-level wildcard, "#" is the multi-level wildcard.
  topic2: A/abc
  topic3: ABC 

2. MQTT configuration class

Create a MqttConfig configuration class that reads the MQTT connection parameters from the configuration file, creates a MyMqttClient instance and registers it as a Spring bean.

@Slf4j
@Configuration
public class MqttConfig {

    @Value("{mqtt.host}")
    public String host;
    @Value("{mqtt.username}")
    public String username;
    @Value("{mqtt.password}")
    public String password;
    @Value("{mqtt.clientId}")
    public String clientId;
    @Value("{mqtt.timeout}")
    public int timeOut;
    @Value("{mqtt.keepalive}")
    public int keepAlive;

    @Value("{mqtt.clearSession}")
    public boolean clearSession;
    @Value("{mqtt.topic1}")
    public String topic1;
    @Value("{mqtt.topic2}")
    public String topic2;
    @Value("{mqtt.topic3}")
    public String topic3;

    @Bean//injectionSpring
    public MyMqttClient myMqttClient() {
        MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession);
        for (int i = 0; i < 10; i++) {
            try {
                myMqttClient.connect();
                // You can subscribe to topics here,Recommended to put MqttCallbackExtended.connectCompletein method
                //myMqttClient.subscribe("ABC", 1);
                return myMqttClient;
            } catch (MqttException e) {
                log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return myMqttClient;
    }

} 

3. MQTT client encapsulation class

Create the MQTT client encapsulation class MyMqttClient. Perform operations on MQTT Broker.

@Slf4j
public class MyMqttClient {

    /**
     * MQTT Broker Basic connection parameters,username、Password is an optional parameter
     */
    private String host;
    private String username;
    private String password;
    private String clientId;
    private int timeout;
    private int keepalive;
    private boolean clearSession;

    /**
     * MQTT client
     */
    private static MqttClient client;

    public static MqttClient getClient() {
        return client;
    }

    public static void setClient(MqttClient client) {
        MyMqttClient.client = client;
    }

    public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) {
        this.host = host;
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.timeout = timeOut;
        this.keepalive = keepAlive;
        this.clearSession = clearSession;
    }

    /**
     * set up MQTT Broker Basic connection parameters
     *
     * @param username
     * @param password
     * @param timeout
     * @param keepalive
     * @return
     */
    public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setCleanSession(clearSession);
        options.setAutomaticReconnect(true);
        return options;
    }

    /**
     * connect MQTT Broker,get MqttClientconnection object
     */
    public void connect() throws MqttException {
        if (client == null) {
            client = new MqttClient(host, clientId, new MemoryPersistence());
            // Set callback
            client.setCallback(new MyMqttCallback(MyMqttClient.this));
        }
        // Connection parameters
        MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
        if (!client.isConnected()) {
            client.connect(mqttConnectOptions);
        } else {
            client.disconnect();
            client.connect(mqttConnectOptions);
        }
        log.info("== MyMqttClient ==> MQTT connect success");//No exception occurred,The connection is successful
    }

    /**
     * release,defaultqosfor0,Non-endurance
     *
     * @param pushMessage
     * @param topic
     */
    public void publish(String pushMessage, String topic) {
        publish(pushMessage, topic, 0, false);
    }

    /**
     * make an announcement
     *
     * @param pushMessage
     * @param topic
     * @param qos
     * @param retained:retain
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttMessage message = new MqttMessage();
        message.setPayload(pushMessage.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
        if (null == mqttTopic) {
            log.error("== MyMqttClient ==> topic is not exist");
        }
        MqttDeliveryToken token;//Delivery:delivery
        synchronized (this) {//Notice:This must be synchronized,otherwise,in multithreadingpublishin the case of,Threads will deadlock,For analysis, please see the supplement at the end of the article.
            try {
                token = mqttTopic.publish(message);//It is also sent to the execution queue,Wait for execution thread to execute,Send messages to message middleware
                token.waitForCompletion(1000L);
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Subscribe to a topic,qosThe default is0
     *
     * @param topic
     */
    public void subscribe(String topic) {
        subscribe(topic, 0);
    }

    /**
     * Subscribe to a topic
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            MyMqttClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        log.info("== MyMqttClient ==> Subscription to topic successful:topic = {}, qos = {}", topic, qos);
    }

    /**
     * Unsubscribe from topic
     *
     * @param topic Topic name
     */
    public void cleanTopic(String topic) {
        if (client != null && client.isConnected()) {
            try {
                client.unsubscribe(topic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            log.error("== MyMqttClient ==> Unsubscription failed!");
        }
        log.info("== MyMqttClient ==> Unsubscribed topic successfully:topic = {}", topic);
    }

} 

Description:

  • MqttClient: Call the client synchronously and communicate using blocking methods.
  • MqttClientPersistence: Represents a persistent data store used to hold outbound and inbound messages while they are in flight, so that they can be delivered at the specified QoS.
  • MqttConnectOptions: Connection options, used to specify connection parameters. Some common methods are listed below.
  • setUserName: Set user name
  • setPassword: set password
  • setCleanSession: Set whether to clear the session
  • setKeepAliveInterval: Set heartbeat interval
  • setConnectionTimeout: Set the connection timeout
  • setAutomaticReconnect: Set whether to automatically reconnect

4. MqttClient callback class

Create a MqttClient callback class MyMqttCallback.

@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {

    //Manual injection
    private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class);

    private MyMqttClient myMqttClient;

    public MyMqttCallback(MyMqttClient myMqttClient) {
        this.myMqttClient = myMqttClient;
    }

    /**
     * MQTT BrokerMethod called when the connection is successful。In this method you can execute Subscribe to topics agreed by the system(Recommended Use)。
     * if MQTT BrokerWhen disconnected and then reconnected successfully,The topic also needs to be subscribed again,It is more reasonable to place the resubscription topic in the callback method after the connection is successful.。
     *
     * @param reconnect
     * @param serverURI MQTT Brokerofurl
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        String connectMode = reconnect ? "Reconnection" : "direct connection";
        log.info("== MyMqttCallback ==> MQTT connection succeeded,Connection method:{},serverURI:{}", connectMode, serverURI);
        //Subscribe to topics
        myMqttClient.subscribe(mqttConfig.topic1, 1);
        myMqttClient.subscribe(mqttConfig.topic2, 1);
        myMqttClient.subscribe(mqttConfig.topic3, 1);

        List<String> topicList = new ArrayList<>();
        topicList.add(mqttConfig.topic1);
        topicList.add(mqttConfig.topic2);
        topicList.add(mqttConfig.topic3);
        log.info("== MyMqttCallback ==> Connection method:{},Subscription to topic successful,topic:{}", connectMode, topicList);
    }

    /**
     * lost connection,You can reconnect here
     * Will only be called once
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("== MyMqttCallback ==> connectionLost Disconnect,5SThen try to reconnect: {}", throwable.getMessage());
        long reconnectTimes = 1;
        while (true) {
            try {
                if (MyMqttClient.getClient().isConnected()) {
                    //Determine that the reconnection has been successful  Need to resubscribe to topic Can be found hereifSubscribe to topics inside  or connectComplete(inside the method)  It's up to you to choose
                    log.warn("== MyMqttCallback ==> mqtt reconnect success end  reconnect  Resubscription successful");
                    return;
                }
                reconnectTimes += 1;
                log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again...  mqttreconnect time {}", reconnectTimes, reconnectTimes);
                MyMqttClient.getClient().reconnect();
            } catch (MqttException e) {
                log.error("== MyMqttCallback ==> mqttDisconnect exception", e);
            }
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
            }
        }
    }

    /**
     * message received(subscribeSubscribed topic messages)method called when
     *
     * @param topic
     * @param mqttMessage
     * @throws Exception The message obtained later will be executed here
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("== MyMqttCallback ==> messageArrived Receive message topic: {},Receive message content: {}", topic, new String(mqttMessage.getPayload()));
        /**
         * Process business separately according to subscribed topics。able to passif-elseOr strategy mode to handle different topic messages separately。
         */
        //topic1theme
        if (topic.equals("ABC")) {
            Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
            //TODO business processing
            //doSomething1(maps);
            log.info("== MyMqttCallback ==> messageArrived Receive message topic: {},{}Business processing message content completed", topic, "TodoService1");
        }
        //topic2theme
        if (topic.equals("A/b/1qaz")) {
            Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
            //TODO business processing
            //doSomething2(maps);
            log.info("== MyMqttCallback ==> messageArrived Receive message topic: {},{}Business processing message content completed", topic, "TodoService2");
        }
    }

    /**
     * message sending(publish)Method to be called on completion
     *
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("== MyMqttCallback ==> deliveryComplete Message sent completed,Complete= {}", iMqttDeliveryToken.isComplete());
    }

} 

MqttCallback class method description:

  • connectionLost(Throwable cause): Called when the connection is lost
  • messageArrived(String topic, MqttMessage message): Called when a message is received
  • deliveryComplete(IMqttDeliveryToken token): Called when message sending is completed

MqttCallbackExtended method description (this interface extends the MqttCallback interface):

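  • connectComplete(boolean reconnect, String serverURI): Called when the connection to the broker completes successfully; reconnect is true when the connection is the result of an automatic reconnect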

4.1 SpringUtils tool class

/**
 * Static access to the Spring bean factory and application context, for code
 * that is not itself managed by Spring.
 *
 * <p>The static references are populated by Spring through the two callback
 * interfaces; calling the accessors before that happens fails with a
 * {@link NullPointerException}.
 */
@Component
public class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {
    /**
     * Bean factory captured in {@link #postProcessBeanFactory}.
     */
    private static ConfigurableListableBeanFactory beanFactory;

    /**
     * Application context captured in {@link #setApplicationContext}.
     */
    private static ApplicationContext applicationContext;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name bean name
     * @return the bean registered under the given name, cast to the caller's expected type
     * @throws org.springframework.beans.BeansException if the bean cannot be obtained
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) beanFactory.getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clz required bean type
     * @return the bean of the required type
     * @throws org.springframework.beans.BeansException if the bean cannot be obtained
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        // getBean(Class<T>) is already typed, so no cast is needed.
        return beanFactory.getBean(clz);
    }

    /**
     * Tells whether the bean factory contains a bean with the given name.
     *
     * @param name bean name
     * @return {@code true} if such a bean is present
     */
    public static boolean containsBean(String name) {
        return beanFactory.containsBean(name);
    }

    /**
     * Tells whether the bean registered under the given name is a singleton
     * (as opposed to a prototype).
     *
     * @param name bean name
     * @return {@code true} if the bean is a singleton
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException if no bean with that name exists
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.isSingleton(name);
    }

    /**
     * Returns the type of the bean registered under the given name.
     *
     * @param name bean name
     * @return the type of the registered bean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException if no bean with that name exists
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getType(name);
    }

    /**
     * Returns the aliases defined for the given bean name.
     *
     * @param name bean name
     * @return the aliases of the bean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException if no bean with that name exists
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getAliases(name);
    }

    /**
     * Returns the current AOP proxy.
     *
     * <p>The argument is only used to infer the return type; the proxy itself is
     * taken from {@link AopContext#currentProxy()}.
     *
     * @param invoker object whose type determines the return type
     * @return the current AOP proxy, cast to the type of {@code invoker}
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker) {
        return (T) AopContext.currentProxy();
    }

    /**
     * Returns the active profiles of the current environment.
     *
     * @return the active profiles, as returned by the environment
     */
    public static String[] getActiveProfiles() {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

    /**
     * Returns the first active profile of the current environment.
     *
     * @return the first active profile, or {@code null} when no profile is active
     */
    public static String getActiveProfile() {
        final String[] activeProfiles = getActiveProfiles();
        // The environment reports "no profile" as an empty array, so check the length
        // as well; indexing [0] on it would throw ArrayIndexOutOfBoundsException.
        if (activeProfiles == null || activeProfiles.length == 0) {
            return null;
        }
        return activeProfiles[0];
    }

}

At this point, Springboot can integrate and operate MQTT Broker through MqttClient.

2. Operation MQTT

We create a MqttService class in the service layer, and the business uses the MqttService class to uniformly operate MqttClient.

1. Customize the message carrier class

Create a MyXxxMqttMsg class here to agree on the carrier class format for sending messages.

/**
 * Carrier object for messages published over MQTT; it is serialized to JSON
 * before being sent.
 */
@Data
public class MyXxxMqttMsg implements Serializable {

    private static final long serialVersionUID = -8303548938481407659L;

    /**
     * MD5 value, intended to be the lowercase MD5 of (content + timestamp).
     */
    private String md5;

    /**
     * Message content; defaults to the empty string.
     */
    private String content = "";

    /**
     * Timestamp of the message.
     */
    private Long timestamp;

} 

2. MqttService class

1) Interface:

/**
 * Service-layer entry point for MQTT operations: subscribing, unsubscribing
 * and publishing.
 */
public interface MqttService {

    /**
     * Subscribes to a topic.
     *
     * @param topic topic name
     */
    void addTopic(String topic);

    /**
     * Unsubscribes from a topic.
     *
     * @param topic topic name
     */
    void removeTopic(String topic);

    /**
     * Publishes message content to a topic.
     *
     * @param msgContent message content to publish
     * @param topic      topic name to publish to
     */
    void publish(String msgContent, String topic);

} 

2) Implementation class:

@Service
public class MqttServiceImpl implements MqttService {

    @Autowired
    private MyMqttClient myMqttClient;

    @Override
    public void addTopic(String topic) {
        myMqttClient.subscribe(topic);
    }

    @Override
    public void removeTopic(String topic) {
        myMqttClient.cleanTopic(topic);
    }

    @Override
    public void publish(String msgContent, String topic) {
        //MyXxxMqttMsg changeJson
        MyXxxMqttMsg myXxxMqttMsg = new MyXxxMqttMsg();
        myXxxMqttMsg.setContent(msgContent);
        myXxxMqttMsg.setTimestamp(System.currentTimeMillis());
        // TODO Md5value
        myXxxMqttMsg.setMd5(UUID.randomUUID().toString());
        String msgJson = JSON.toJSONString(myXxxMqttMsg);

        //make an announcement
        myMqttClient.publish(msgJson, topic);
    } 

3. Controller class

Create a MyMqttController class to operate MQTT.

/**
 * REST endpoints under {@code /mqtt} for subscribing, unsubscribing and publishing.
 */
@RestController
@RequestMapping("/mqtt")
@Api(value = "MyMqttController", tags = {"MQTTRelated operation interfaces"})
public class MyMqttController {
    @Autowired
    private MqttService mqttService;

    /**
     * Subscribes to a topic.
     *
     * @param topic topic name
     */
    @GetMapping("/addTopic")
    @ApiOperation(value = "Add subscription topic interface")
    public void addTopic(String topic) {
        mqttService.addTopic(topic);
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param topic topic name
     */
    @GetMapping("/removeTopic")
    @ApiOperation(value = "Unsubscribe topic interface")
    public void removeTopic(String topic) {
        mqttService.removeTopic(topic);
    }

    /**
     * Publishes message content to a topic.
     *
     * <p>The endpoint was originally exposed as {@code POST /removeTopic} (a copy-paste
     * of the unsubscribe mapping); that path is kept as an alias so existing callers
     * keep working, while {@code POST /publish} is the descriptive route.
     *
     * @param msgContent message content to publish
     * @param topic      topic name to publish to
     */
    @PostMapping({"/publish", "/removeTopic"})
    @ApiOperation(value = "Publish topic message content interface")
    public void publish(String msgContent, String topic) {
        mqttService.publish(msgContent, topic);
    }

}

Subscription and cancellation of topic operations: MQTTX publishes a topic message.

>