Part 6 - 物联网云平台消息机制

2022-03-31
10分钟阅读时长

【版本】

当前版本号v20230208

版本修改说明
v20230208换掉Mosquitto 和 MQTT.fx,换为 EMQX 和 MQTTX
v20220420补充DateUtils
v20220408初始化

【实验名称】6.1 安装 EMQX 和 MQTTX

【实验目的】

  • 掌握 EMQ 安装、配置和使用
  • 掌握 MQTT 客户端 MQTTX 的安装、配置和使用

【实验环境】

  • EMQX
  • MQTTX

【实验说明】

  1. EMQX 是基于高并发的Erlang/OTP语言平台设计,支持百万级连接和分布式集群,发布订阅模式的开源MQTT消息服务器。完整支持MQTT V3.1/V3.1.1协议规范,扩展支持WebSocket、Stomp、CoAP、MQTT-SN或私有TCP协议。官网地址

  2. MQTTX 是EMQ 开源的一款优雅的跨平台MQTT 5.0 桌面客户端工具,它能运行在macOS, Linux, Windows 上,并且支持MQTT 消息格式转换。官网地址

【实验资源】

链接:https://pan.baidu.com/s/1bleh6m4YFCKaAKA8QHEf8A 
提取码:f6hc

【实验效果】

  1. MQTTX 向订阅主题发布消息,并测试是否能够收到订阅的消息。

【实验步骤】

EMQX 安装

  1. 下载emqx-5.0.14-windows-amd64.tar.gz,解压到安装目录下,例如d:\emqx

  2. 进入安装目录的bin目录下,新建一个文本文件,并命名为0-start-emqx.bat,作为 EMQX 的启动脚本。

cd d:
cd %~dp0
emqx start
  1. 同样在bin目录下,新建一个文本文件,并命名为0-stop-emqx.bat,作为 EMQX 的启动脚本。
cd d:
cd %~dp0
emqx stop
  1. 双击脚本0-start-emqx.bat启动EMQX。MQTT 服务器默认监听在tcp://localhost:1883

  2. 访问 EMQX 的管理控制台(EMQX Dashboard)

  3. 输入默认用户名 admin 与默认密码 public,登录进入 Dashboard。

MQTTX 安装

  1. 下载并安装MQTTX-Setup-1.7.3-x64.exe,启动MQTTX.exe

  2. 启动 MQTTX,新建连接。

  3. 输入 MQTT 代理地址、用户名和密码进行连接。

用户名:iotweb
密码:iotweb
Host: mqtt://localhost

  1. 观察 EMQX 服务器http://localhost:18083/#/connections是否有连接来自于 MQTTX 客户端

  2. 使用MQTTX,订阅(subscribe)/iot/cloud/lock01/receive主题。

  3. 使用MQTTX,向/iot/cloud/lock01/receive主题发布(publish)以下消息内容。

hello

  1. 查看订阅主题收到的消息。

【实验名称】6.2 iot-lock 项目配置 MQTT 客户端

【实验目的】

  • 掌握使用 SpringBoot 整合 MQTT 的配置

【实验环境】

  • IDEA
  • Maven 3.6
  • MariaDB 10.4
  • JDK 8
  • EMQX
  • MQTTX

【实验说明】

  1. 本实验承接实验4.3,在此基础上给智能锁设备端新增 MQTT 配置,实现 MQTT 通讯功能。
  2. 本实验使用 MQTTX 模拟用户终端,向模拟的物联网云平台的 MQTT 服务器(即 EMQX)发送消息,检查智能锁设备端能否收到订阅的 MQTT 消息。

【实验效果】

  1. 使用 MQTTX 模拟用户终端想主题发送消息,智能锁设备端能够收到订阅的消息。

【实验步骤】

pom.xml

  1. pom.xml 新增以下依赖配置,引入 MQTT 依赖包。
    <!-- MQTT start -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <!-- MQTT end -->

application.yml

  1. application.yml 配置文件新增以下配置,连接 MQTT 服务器,并实现配置订阅主题和推送主题。在此配置文件中,我们设置了让智能锁设备订阅/iot/cloud/devices/receive主题。
mqtt:
  host: tcp://localhost:1883
  username: iotweb
  password: iotweb
  qos: 1
  client-id: lock001
  topic:
    post: /iot/cloud/devices/post
    post_reply: /iot/cloud/devices/post_reply
    receive: /iot/cloud/devices/receive
    receive_reply: /iot/cloud/devices/receive_reply

MqttConfig.java

  1. 新增一个MQTT 的配置类MqttConfig
package iot.cloud.platform.lock.config;

import iot.cloud.platform.lock.utils.ExcptUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Configuration
@Slf4j
@Data
public class MqttConfig {

    @Value("${mqtt.host}")
    private String host;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.client-id}")
    private String clientId;
    @Value("${mqtt.topic.post}")
    private String topic_post;
    @Value("${mqtt.topic.post_reply}")
    private String topic_post_reply;
    @Value("${mqtt.topic.receive}")
    private String topic_receive;
    @Value("${mqtt.topic.receive_reply}")
    private String topic_receive_reply;
    /**
     *  qos
     *  MQTT协议中有三种消息发布服务质量:
     *  QOS 0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
     *  QOS 1: “至少一次”,确保消息到达,但消息重复可能会发生。
     *  QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
     *
     */
    @Value("${mqtt.qos}")
    private int qos;
    @Bean
    public MqttClient getMqttClient(){
        MqttClient client=null;
        try {
            final String id= clientId;
            client = new MqttClient(host, id, new MemoryPersistence());
            log.info("创建mqtt客户端ID:"+id);
        } catch (Exception e) {
            log.error("创建mqtt客户端异常:"+ ExcptUtil.filterStack(e));
        }
        return client;
    }

    @Bean
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // 是否清除session
        options.setCleanSession(false);
        return options;
    }

    /**
     * 初始化已订阅主题
     * @return
     */
    @Bean("mqttSubTopics")
    public Map<String,Integer> getMqttSubTopics(){
        Map<String,Integer> mqttSubTopics=new ConcurrentHashMap<>();
        mqttSubTopics.put(topic_post_reply,qos);
        mqttSubTopics.put(topic_receive,qos);
        return mqttSubTopics;
    }

    /**
     * 初始化已订阅主题对应的回复主题
     * @return
     */
    @Bean("mqttTopicReplyMap")
    public Map<String,String> getMqttTopicReplyMap(){
        Map<String,String> topicReplyMap=new ConcurrentHashMap<>();
        topicReplyMap.put(topic_post_reply,topic_post);
        topicReplyMap.put(topic_receive,topic_receive_reply);
        return topicReplyMap;
    }
}

MqttConsumer.java 和 MqttConsumerCallback.java

  1. 新增 MQTT 客户端MqttConsumer和 MQTT 接收消息的处理接口MqttConsumerCallback
  • MqttConsumer.java
package iot.cloud.platform.lock.mqtt;

import iot.cloud.platform.lock.service.ConfigService;
import iot.cloud.platform.lock.utils.ExcptUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
public class MqttConsumer implements ApplicationRunner {
    @Autowired
    private MqttClient client;

    @Autowired
    private MqttConnectOptions options;

    @Autowired
    private MqttConsumerCallback callback;

    @Autowired
    @Qualifier("mqttSubTopics")
    private Map<String,Integer> mqttSubTopics;

    private final Map<String,String> topicReplyMap=new HashMap<>();

    @Autowired
    private ConfigService configService;

    @Override
    public void run(ApplicationArguments args) {
        log.info("初始化并启动mqtt......");
        init();
    }

    /**
     * 连接mqtt服务器
     */
    private void init() {
        try {
            client.setCallback(callback);
            client.connect(options);
        } catch (Exception e) {
            log.info("mqtt连接异常:" + ExcptUtil.filterStack(e));
        }
    }

    private String[] getSubTopics(){
        return mqttSubTopics.keySet().stream().toArray(String[]::new);
    }



    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
            mqttSubTopics.put(topic,qos);
            log.info("订阅主题:" + topic);
        } catch (MqttException e) {
            log.error(ExcptUtil.filterStack(e));
        }
    }

    /**
     * 发布,非持久化
     *
     *  qos根据文档设置为1
     *
     * @param topic
     * @param msg
     */
    public void publish(String topic, String msg) {
        publish(1, false, topic, msg);
    }

    /**
     * 发布
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            log.info("topic:" + topic + " 不存在");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();

            if (!token.isComplete()) {
                log.info("消息发送成功");
            }
        } catch (MqttPersistenceException e) {
            log.error(ExcptUtil.filterStack(e));
        } catch (MqttException e) {
            log.error(ExcptUtil.filterStack(e));
        }
    }
}
  • MqttConsumerCallback.java
package iot.cloud.platform.lock.mqtt;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import iot.cloud.platform.lock.service.PwdService;
import iot.cloud.platform.lock.utils.DateUtil;
import iot.cloud.platform.lock.utils.ExcptUtil;
import iot.cloud.platform.lock.vo.MqttMsg;
import iot.cloud.platform.lock.vo.ResMsg;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * mqtt回调处理类
 */

@Component
@Slf4j
public class MqttConsumerCallback implements MqttCallbackExtended {

    @Autowired
    private MqttClient client;
    @Autowired
    private MqttConnectOptions options;
    @Autowired
    @Qualifier("mqttSubTopics")
    private Map<String,Integer> mqttSubTopics;
	
    @Autowired
    @Qualifier("mqttTopicReplyMap")
    private Map<String,String> mqttTopicReplyMap;
	
    @Autowired
    private PwdService pwdService;
    /**
     * 断开重连
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.info("MQTT连接断开",cause);
        try {
            if (null != client && !client.isConnected()) {
                client.reconnect();
                log.info("尝试重新连接");
            } else {
                client.connect(options);
                log.info("尝试建立新连接");
            }
        } catch (Exception e) {
            log.error(ExcptUtil.filterStack(e));
        }
    }

    /**
     * 消息处理
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String msg = new String(message.getPayload());
            log.info("收到主题[" + topic + "]消息 ->" + msg);
            ObjectMapper objMapper=new ObjectMapper();
            MqttMsg<HashMap> resvMsg=null;
            try {
                //ObjectMapper 是 Jackson包用于解析 Json 为java 对象
                resvMsg = objMapper.readValue(msg, MqttMsg.class);
            } catch (JsonProcessingException e) {
                log.error(ExcptUtil.filterStack(e));
            }

            //TODO:请完善此处代码,完成实验6.3,6.4的功能。
            ResMsg returnVal=null;
			
            MqttMsg<ResMsg> replyMsg=new MqttMsg();
            replyMsg.setEventId(resvMsg.getEventId());
            replyMsg.setEventName(resvMsg.getEventName());
            replyMsg.setEventTime(resvMsg.getEventTime());
            replyMsg.setData(returnVal);
            String receiveReplyMsg="{}";
            try {
                //把replyMsg对象转换为 JSON 消息
                receiveReplyMsg=objMapper.writeValueAsString(replyMsg);
                log.info("回复MQTT消息:"+receiveReplyMsg);
				String replyTopic=mqttTopicReplyMap.get(topic);
                if(replyTopic!=null){
                    client.publish(replyTopic,receiveReplyMsg.getBytes(StandardCharsets.UTF_8),1,true);
                }
            } catch (JsonProcessingException e) {
                log.error(e.getMessage(),e);
            }
        } catch (Exception e) {
            log.error(ExcptUtil.filterStack(e));
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {
        mqttSubTopics.forEach((k,v) ->{
            try {
                client.subscribe(k,v);
                log.info("订阅主题:"+k);
            } catch (MqttException e) {
                log.error(ExcptUtil.filterStack(e));
            }
        });
    }
}

MqttMsg.java

  1. 新增一个MqttMsg类,用于装载 MQTT 消息。
package iot.cloud.platform.lock.vo;

import lombok.Data;

@Data
public class MqttMsg<T>{
  protected String eventId;
  protected String eventName;
  protected long eventTime;
  protected T data;
}

DateUtil

  1. 新增日期处理工具类DateUtil 和异常处理工具类ExcptUtil
  • DateUtil.java
package iot.cloud.platform.lock.utils;

import org.apache.commons.lang3.StringUtils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {

  public static Date parseJsonDateTime(String dtText){
    if(StringUtils.isBlank(dtText)){
      return null;
    }
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
    try {
      return format.parse(dtText);
    } catch (ParseException e) {
      throw new RuntimeException(e);
    }
  }
}
  • ExcptUtil.java
package iot.cloud.platform.lock.utils;

import org.apache.commons.lang3.StringUtils;

public class ExcptUtil {

    private final static String PACKAGE="iot.cloud.platform.lock";

    public static String getKeyInfo(Throwable t,String packageName){
        String msg="";
        if(t!=null){
            msg+=t.getClass().getName()+" -> ";
        }
        msg+=t.getMessage()+"\n"
                + filterStack(t,packageName);
        return msg;
    }

    public static String filterStack(Throwable throwable, String packageName) {
        if (throwable==null) {
            return "";
        } else {
            StackTraceElement[] stackTraceElements = throwable.getStackTrace();
            String stackTraceElementString = stackTraceElements[0].toString()+"\n";
            if (StringUtils.isEmpty(packageName)) {
                return stackTraceElementString;
            } else {
                StackTraceElement[] eles = stackTraceElements;
                int length = Math.min(stackTraceElements.length,10);

                for(int i = 1; i < length; ++i) {
                    StackTraceElement stackTraceElement = eles[i];
                    stackTraceElementString += stackTraceElement.toString()+"\n";
                    if (stackTraceElement.toString().contains(packageName)) {
                        break;
                    }
                }

                return stackTraceElementString;
            }
        }
    }
    public static String filterStack(Throwable throwable) {
        if (throwable==null) {
            return "";
        } else {
            StackTraceElement[] stackTraceElements = throwable.getStackTrace();
            String stackTraceElementString = stackTraceElements[0].toString()+"\n";
            StackTraceElement[] eles = stackTraceElements;
            int length = Math.min(stackTraceElements.length,10);

            for(int i = 1; i < length; ++i) {
                StackTraceElement stackTraceElement = eles[i];
                stackTraceElementString += stackTraceElement.toString()+"\n";
                if (stackTraceElement.toString().contains(PACKAGE)) {
                    break;
                }
            }
            return stackTraceElementString;
        }
    }

    public static String filterStack(Throwable throwable,Class clazz) {
        if (throwable==null) {
            return "";
        } else {
            StackTraceElement[] stackTraceElements = throwable.getStackTrace();
            String stackTraceElementString = stackTraceElements[0].toString()+"\n";
            StackTraceElement[] eles = stackTraceElements;
            int length = Math.min(stackTraceElements.length,10);

            for(int i = 1; i < length; ++i) {
                StackTraceElement stackTraceElement = eles[i];
                stackTraceElementString += stackTraceElement.toString()+"\n";
                if (stackTraceElement.toString().contains(clazz.getPackage().getName())) {
                    break;
                }
            }
            return stackTraceElementString;
        }
    }
}

测试

  1. 启动 IoTLockApplication。

  2. 参考实验6.1,启动EMQX

  3. 参考实验6.1,使用MQTTX,向/iot/cloud/devices/receive主题发布(publish)以下消息内容。

{"eventId":"1234","eventName":"sayhello","eventTime":1676083251643}
  1. 查看iot-lock项目的 IDEA 的控制台,查看是否能够收到以下消息。
2023-02-11 10:45:23.116  INFO 23288 --- [T Call: lock001] i.c.p.lock.mqtt.MqttConsumerCallback     : 收到主题[/iot/cloud/devices/receive]消息 ->{"eventId":"1234","eventName":"sayhello","eventTime":1676083251643}

【实验名称】6.3 iot-lock 项目实现修改固定密码功能

【实验目的】

  • 掌握使用 SpringBoot 整合 MQTT 协议并进行开发

【实验环境】

  • IDEA
  • Maven 3.6
  • MariaDB 10.4
  • JDK 8
  • EMQX
  • MQTTX

【实验说明】

  1. 此实验承接实验6.2,使用MQTTX模拟一个用户终端,向物联网云平台(EMQX服务器)发送一条修改固定密码的指令,智能锁设备端收到指令,并实现密码的修改。

【实验效果】

  1. 发送修改密码的消息以后,能够收到回复的 MQTT 消息,并且能够用新的固定密码654321解锁。

【实验步骤】

PwdService

  1. PwdService 新增修改固定密码方法resetFixedPwd
/**
   * 更新固定密码
   * @param oPwd 原密码
   * @param nPwd 新密码
   * @return
   */
  ResMsg resetFixedPwd(String oPwd, String nPwd);

PwdServiceImpl

  1. PwdServiceImpl 新增修改固定密码方法resetFixedPwd,并实现以下功能需求。具体代码自行实现。
  • (1)新密码必须是数字,而且位数为6-18位。如果新密码不符合要求,返回ResMsg对象属性值如下:
errcode="4002"
errmsg="密码必须为数字,长度为6-18"

代码提示:全选以下代码可以查看

public boolean isValidPwd(String pwd) {
    String tpwd=StringUtils.trim(pwd);
    boolean result=false;
    Pattern pattern = Pattern.compile("[0-9]{6,18}");//正则表达式校验
    if(pattern.matcher(pwd).matches()){
      result=true;
    }
    return result;
  }
  • (2)如果新密码满足(1)要求,而且旧密码与数据库中的固定密码匹配,返回ResMsg 对象属性值如下:
errcode="0"
errmsg="更新密码成功"
  • (3)如果新密码满足(1)要求,如果密码更新不成功,返回ResMsg 对象属性值如下:
errcode="4001"
errmsg="更新密码失败,旧密码错误"
  1. 重新构建(Build)项目,启动IoTLockApplication

  2. 启动 MQTT 服务器 EMQX。

  3. 使用MQTTX订阅主题/iot/cloud/devices/receive_reply

  4. 使用MQTTX向主题/iot/cloud/devices/receive发送以下消息,修改智能锁设备端密码为654321

{"eventId":"1234","eventName":"resetFixedPwd","eventTime":1676084750362,"data":{"nPwd":"654321","oPwd":"123456"}}
  1. 使用MQTTX查看订阅主题/iot/cloud/devices/receive_reply是否收到智能锁设备端回复的消息。

  2. 浏览器访问http://localhost:8097,尝试使用新的固定密码654321进行解锁,测试是否能够解锁成功。

【实验名称】6.4 iot-lock 项目实现新增临时密码功能(选做)

【实验目的】

  • 掌握使用 SpringBoot 整合 MQTT 协议并进行开发

【实验环境】

  • IDEA
  • Maven 3.6
  • MariaDB 10.4
  • JDK 8
  • EMQX
  • MQTTX

【实验说明】

  1. 此实验承接实验6.3,使用MQTTX 模拟一个用户终端,向物联网云平台( EMQX 服务器)发送一条新增临时密码的指令,智能锁设备端收到指令,实现新增临时密码,并回复用户终端是否新增成功。

【实验效果】

  1. 发送新增临时密码的消息以后,能够收到回复的 MQTT 消息,并且能够用新的临时密码075911解锁。

【实验步骤】

PwdService

  1. PwdService新增方法,其余代码参考6.3自行实现。
/**
   * 新增临时密码
   * @param pwd 临时密码
   * @param expiredTime 过期时间
   * @return
   */
  ResMsg addTempPwd(String pwd, Date expiredTime);
  • 如果新增成功,返回ResMsg 对象属性值如下:
errcode="0"
errmsg="新增密码成功"
  • 如果新增失败,返回ResMsg 对象属性值如下:
errcode="4003"
errmsg="新增密码失败"
  1. 重新构建(Build)项目,启动IoTLockApplication

  2. 启动 MQTT 服务器 EMQX。

  3. 使用MQTTX订阅主题/iot/cloud/devices/receive_reply

  4. 使用MQTTX向主题/iot/cloud/devices/receive发送以下消息,新增智能锁设备端临时密码075911,有效期至2023-7-11 10:00:00

{"eventId":"1234","eventName":"addTempPwd","eventTime":1676086292304,"data":{"pwd":"075911","expiredTime":"2023-07-11T10:00:00"}}
  1. 使用MQTTX查看订阅主题/iot/cloud/devices/receive_reply是否收到智能锁设备端回复的消息。

  1. 浏览器访问http://localhost:8097,尝试使用新的临时密码075911进行解锁,测试是否能够解锁成功。