【版本】
当前版本号v20230208
版本 | 修改说明 |
---|---|
v20230208 | 换掉Mosquitto 和 MQTT.fx,换为 EMQX 和 MQTTX |
v20220420 | 补充DateUtils |
v20220408 | 初始化 |
【实验名称】6.1 安装 EMQX 和 MQTTX
【实验目的】
- 掌握 EMQ 安装、配置和使用
- 掌握 MQTT 客户端 MQTTX 的安装、配置和使用
【实验环境】
- EMQX
- MQTTX
【实验说明】
EMQX 是基于高并发的Erlang/OTP语言平台设计,支持百万级连接和分布式集群,发布订阅模式的开源MQTT消息服务器。完整支持MQTT V3.1/V3.1.1协议规范,扩展支持WebSocket、Stomp、CoAP、MQTT-SN或私有TCP协议。官网地址
MQTTX 是EMQ 开源的一款优雅的跨平台MQTT 5.0 桌面客户端工具,它能运行在macOS, Linux, Windows 上,并且支持MQTT 消息格式转换。官网地址
【实验资源】
链接:https://pan.baidu.com/s/1bleh6m4YFCKaAKA8QHEf8A 提取码:f6hc
复制
【实验效果】
- MQTTX 向订阅主题发布消息,并测试是否能够收到订阅的消息。
【实验步骤】
EMQX 安装
下载
emqx-5.0.14-windows-amd64.tar.gz
,解压到安装目录下,例如d:\emqx
。进入安装目录的
bin
目录下,新建一个文本文件,并命名为0-start-emqx.bat
,作为 EMQX 的启动脚本。
cd d: cd %~dp0 emqx start
复制
- 同样在
bin
目录下,新建一个文本文件,并命名为0-stop-emqx.bat
,作为 EMQX 的启动脚本。
cd d: cd %~dp0 emqx stop
复制
双击脚本
0-start-emqx.bat
启动EMQX
。MQTT 服务器默认监听在tcp://localhost:1883
。输入默认用户名 admin 与默认密码 public,登录进入 Dashboard。
MQTTX 安装
下载并安装
MQTTX-Setup-1.7.3-x64.exe
,启动MQTTX.exe
。启动 MQTTX,新建连接。
输入 MQTT 代理地址、用户名和密码进行连接。
用户名:iotweb 密码:iotweb Host: mqtt://localhost
复制
观察 EMQX 服务器http://localhost:18083/#/connections是否有连接来自于 MQTTX 客户端
使用MQTTX,订阅(subscribe)
/iot/cloud/lock01/receive
主题。使用
MQTTX
,向/iot/cloud/lock01/receive
主题发布(publish)以下消息内容。
hello
复制
- 查看订阅主题收到的消息。
【实验名称】6.2 iot-lock 项目配置 MQTT 客户端
【实验目的】
- 掌握使用 SpringBoot 整合 MQTT 的配置
【实验环境】
- IDEA
- Maven 3.6
- MariaDB 10.4
- JDK 8
- EMQX
- MQTTX
【实验说明】
- 本实验承接实验4.3,在此基础上给智能锁设备端新增 MQTT 配置,实现 MQTT 通讯功能。
- 本实验使用 MQTTX 模拟用户终端,向模拟的物联网云平台的 MQTT 服务器(即 EMQX)发送消息,检查智能锁设备端能否收到订阅的 MQTT 消息。
【实验效果】
- 使用 MQTTX 模拟用户终端想主题发送消息,智能锁设备端能够收到订阅的消息。
【实验步骤】
pom.xml
- pom.xml 新增以下依赖配置,引入 MQTT 依赖包。
<!-- MQTT start --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <!-- MQTT end -->
复制
application.yml
application.yml
配置文件新增以下配置,连接 MQTT 服务器,并实现配置订阅主题和推送主题。在此配置文件中,我们设置了让智能锁设备订阅/iot/cloud/devices/receive
主题。
mqtt: host: tcp://localhost:1883 username: iotweb password: iotweb qos: 1 client-id: lock001 topic: post: /iot/cloud/devices/post post_reply: /iot/cloud/devices/post_reply receive: /iot/cloud/devices/receive receive_reply: /iot/cloud/devices/receive_reply 复制
MqttConfig.java
- 新增一个MQTT 的配置类
MqttConfig
。
package iot.cloud.platform.lock.config; import iot.cloud.platform.lock.utils.ExcptUtil; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Configuration @Slf4j @Data public class MqttConfig { @Value("${mqtt.host}") private String host; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.client-id}") private String clientId; @Value("${mqtt.topic.post}") private String topic_post; @Value("${mqtt.topic.post_reply}") private String topic_post_reply; @Value("${mqtt.topic.receive}") private String topic_receive; @Value("${mqtt.topic.receive_reply}") private String topic_receive_reply; /** * qos * MQTT协议中有三种消息发布服务质量: * QOS 0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 * QOS 1: “至少一次”,确保消息到达,但消息重复可能会发生。 * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大 * */ @Value("${mqtt.qos}") private int qos; @Bean public MqttClient getMqttClient(){ MqttClient client=null; try { final String id= clientId; client = new MqttClient(host, id, new MemoryPersistence()); log.info("创建mqtt客户端ID:"+id); } catch (Exception e) { log.error("创建mqtt客户端异常:"+ ExcptUtil.filterStack(e)); } return client; } @Bean public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); // 是否清除session options.setCleanSession(false); return options; } /** * 初始化已订阅主题 * @return */ @Bean("mqttSubTopics") public Map<String,Integer> getMqttSubTopics(){ Map<String,Integer> mqttSubTopics=new ConcurrentHashMap<>(); mqttSubTopics.put(topic_post_reply,qos); mqttSubTopics.put(topic_receive,qos); return mqttSubTopics; } /** * 初始化已订阅主题对应的回复主题 * @return */ @Bean("mqttTopicReplyMap") public Map<String,String> getMqttTopicReplyMap(){ Map<String,String> topicReplyMap=new ConcurrentHashMap<>(); topicReplyMap.put(topic_post_reply,topic_post); topicReplyMap.put(topic_receive,topic_receive_reply); return topicReplyMap; } }
复制
MqttConsumer.java 和 MqttConsumerCallback.java
- 新增 MQTT 客户端
MqttConsumer
和 MQTT 接收消息的处理接口MqttConsumerCallback
- MqttConsumer.java
package iot.cloud.platform.lock.mqtt; import iot.cloud.platform.lock.service.ConfigService; import iot.cloud.platform.lock.utils.ExcptUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Slf4j @Component public class MqttConsumer implements ApplicationRunner { @Autowired private MqttClient client; @Autowired private MqttConnectOptions options; @Autowired private MqttConsumerCallback callback; @Autowired @Qualifier("mqttSubTopics") private Map<String,Integer> mqttSubTopics; private final Map<String,String> topicReplyMap=new HashMap<>(); @Autowired private ConfigService configService; @Override public void run(ApplicationArguments args) { log.info("初始化并启动mqtt......"); init(); } /** * 连接mqtt服务器 */ private void init() { try { client.setCallback(callback); client.connect(options); } catch (Exception e) { log.info("mqtt连接异常:" + ExcptUtil.filterStack(e)); } } private String[] getSubTopics(){ return mqttSubTopics.keySet().stream().toArray(String[]::new); } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); mqttSubTopics.put(topic,qos); log.info("订阅主题:" + topic); } catch (MqttException e) { log.error(ExcptUtil.filterStack(e)); } } /** * 发布,非持久化 * * qos根据文档设置为1 * * @param topic * @param msg */ public void publish(String topic, String msg) { publish(1, false, topic, msg); } /** * 发布 */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { log.info("topic:" + topic + " 不存在"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); if (!token.isComplete()) { log.info("消息发送成功"); } } catch (MqttPersistenceException e) { log.error(ExcptUtil.filterStack(e)); } catch (MqttException e) { log.error(ExcptUtil.filterStack(e)); } } }
复制
- MqttConsumerCallback.java
package iot.cloud.platform.lock.mqtt; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import iot.cloud.platform.lock.service.PwdService; import iot.cloud.platform.lock.utils.DateUtil; import iot.cloud.platform.lock.utils.ExcptUtil; import iot.cloud.platform.lock.vo.MqttMsg; import iot.cloud.platform.lock.vo.ResMsg; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.HashMap; import java.util.Map; /** * mqtt回调处理类 */ @Component @Slf4j public class MqttConsumerCallback implements MqttCallbackExtended { @Autowired private MqttClient client; @Autowired private MqttConnectOptions options; @Autowired @Qualifier("mqttSubTopics") private Map<String,Integer> mqttSubTopics; @Autowired @Qualifier("mqttTopicReplyMap") private Map<String,String> mqttTopicReplyMap; @Autowired private PwdService pwdService; /** * 断开重连 */ @Override public void connectionLost(Throwable cause) { log.info("MQTT连接断开",cause); try { if (null != client && !client.isConnected()) { client.reconnect(); log.info("尝试重新连接"); } else { client.connect(options); log.info("尝试建立新连接"); } } catch (Exception e) { log.error(ExcptUtil.filterStack(e)); } } /** * 消息处理 */ @Override public void messageArrived(String topic, MqttMessage message) { try { String msg = new String(message.getPayload()); log.info("收到主题[" + topic + "]消息 ->" + msg); ObjectMapper objMapper=new ObjectMapper(); MqttMsg<HashMap> resvMsg=null; try { //ObjectMapper 是 Jackson包用于解析 Json 为java 对象 resvMsg = objMapper.readValue(msg, MqttMsg.class); } catch (JsonProcessingException e) { log.error(ExcptUtil.filterStack(e)); } //TODO:请完善此处代码,完成实验6.3,6.4的功能。 ResMsg returnVal=null; MqttMsg<ResMsg> replyMsg=new MqttMsg(); replyMsg.setEventId(resvMsg.getEventId()); replyMsg.setEventName(resvMsg.getEventName()); replyMsg.setEventTime(resvMsg.getEventTime()); replyMsg.setData(returnVal); String receiveReplyMsg="{}"; try { //把replyMsg对象转换为 JSON 消息 receiveReplyMsg=objMapper.writeValueAsString(replyMsg); log.info("回复MQTT消息:"+receiveReplyMsg); String replyTopic=mqttTopicReplyMap.get(topic); if(replyTopic!=null){ client.publish(replyTopic,receiveReplyMsg.getBytes(StandardCharsets.UTF_8),1,true); } } catch (JsonProcessingException e) { log.error(e.getMessage(),e); } } catch (Exception e) { log.error(ExcptUtil.filterStack(e)); } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } @Override public void connectComplete(boolean b, String s) { mqttSubTopics.forEach((k,v) ->{ try { client.subscribe(k,v); log.info("订阅主题:"+k); } catch (MqttException e) { log.error(ExcptUtil.filterStack(e)); } }); } }
复制
MqttMsg.java
- 新增一个
MqttMsg
类,用于装载 MQTT 消息。
package iot.cloud.platform.lock.vo; import lombok.Data; @Data public class MqttMsg<T>{ protected String eventId; protected String eventName; protected long eventTime; protected T data; }
复制
DateUtil
- 新增日期处理工具类
DateUtil
和异常处理工具类ExcptUtil
- DateUtil.java
package iot.cloud.platform.lock.utils; import org.apache.commons.lang3.StringUtils; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class DateUtil { public static Date parseJsonDateTime(String dtText){ if(StringUtils.isBlank(dtText)){ return null; } SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); try { return format.parse(dtText); } catch (ParseException e) { throw new RuntimeException(e); } } }
复制
- ExcptUtil.java
package iot.cloud.platform.lock.utils; import org.apache.commons.lang3.StringUtils; public class ExcptUtil { private final static String PACKAGE="iot.cloud.platform.lock"; public static String getKeyInfo(Throwable t,String packageName){ String msg=""; if(t!=null){ msg+=t.getClass().getName()+" -> "; } msg+=t.getMessage()+"\n" + filterStack(t,packageName); return msg; } public static String filterStack(Throwable throwable, String packageName) { if (throwable==null) { return ""; } else { StackTraceElement[] stackTraceElements = throwable.getStackTrace(); String stackTraceElementString = stackTraceElements[0].toString()+"\n"; if (StringUtils.isEmpty(packageName)) { return stackTraceElementString; } else { StackTraceElement[] eles = stackTraceElements; int length = Math.min(stackTraceElements.length,10); for(int i = 1; i < length; ++i) { StackTraceElement stackTraceElement = eles[i]; stackTraceElementString += stackTraceElement.toString()+"\n"; if (stackTraceElement.toString().contains(packageName)) { break; } } return stackTraceElementString; } } } public static String filterStack(Throwable throwable) { if (throwable==null) { return ""; } else { StackTraceElement[] stackTraceElements = throwable.getStackTrace(); String stackTraceElementString = stackTraceElements[0].toString()+"\n"; StackTraceElement[] eles = stackTraceElements; int length = Math.min(stackTraceElements.length,10); for(int i = 1; i < length; ++i) { StackTraceElement stackTraceElement = eles[i]; stackTraceElementString += stackTraceElement.toString()+"\n"; if (stackTraceElement.toString().contains(PACKAGE)) { break; } } return stackTraceElementString; } } public static String filterStack(Throwable throwable,Class clazz) { if (throwable==null) { return ""; } else { StackTraceElement[] stackTraceElements = throwable.getStackTrace(); String stackTraceElementString = stackTraceElements[0].toString()+"\n"; StackTraceElement[] eles = stackTraceElements; int length = Math.min(stackTraceElements.length,10); for(int i = 1; i < length; ++i) { StackTraceElement stackTraceElement = eles[i]; stackTraceElementString += stackTraceElement.toString()+"\n"; if (stackTraceElement.toString().contains(clazz.getPackage().getName())) { break; } } return stackTraceElementString; } } }
复制
测试
启动 IoTLockApplication。
参考实验6.1,启动
EMQX
。参考实验6.1,使用
MQTTX
,向/iot/cloud/devices/receive
主题发布(publish)以下消息内容。
{"eventId":"1234","eventName":"sayhello","eventTime":1676083251643}
复制
- 查看
iot-lock
项目的 IDEA 的控制台,查看是否能够收到以下消息。
2023-02-11 10:45:23.116 INFO 23288 --- [T Call: lock001] i.c.p.lock.mqtt.MqttConsumerCallback : 收到主题[/iot/cloud/devices/receive]消息 ->{"eventId":"1234","eventName":"sayhello","eventTime":1676083251643}
复制
【实验名称】6.3 iot-lock 项目实现修改固定密码功能
【实验目的】
- 掌握使用 SpringBoot 整合 MQTT 协议并进行开发
【实验环境】
- IDEA
- Maven 3.6
- MariaDB 10.4
- JDK 8
- EMQX
- MQTTX
【实验说明】
- 此实验承接实验6.2,使用MQTTX模拟一个用户终端,向物联网云平台(EMQX服务器)发送一条修改固定密码的指令,智能锁设备端收到指令,并实现密码的修改。
【实验效果】
- 发送修改密码的消息以后,能够收到回复的 MQTT 消息,并且能够用新的固定密码
654321
解锁。
【实验步骤】
PwdService
- PwdService 新增修改固定密码方法
resetFixedPwd
/** * 更新固定密码 * @param oPwd 原密码 * @param nPwd 新密码 * @return */ ResMsg resetFixedPwd(String oPwd, String nPwd);
复制
PwdServiceImpl
- PwdServiceImpl 新增修改固定密码方法
resetFixedPwd
,并实现以下功能需求。具体代码自行实现。
- (1)新密码必须是数字,而且位数为6-18位。如果新密码不符合要求,返回ResMsg对象属性值如下:
errcode="4002" errmsg="密码必须为数字,长度为6-18"
复制
代码提示:全选以下代码可以查看
public boolean isValidPwd(String pwd) { String tpwd=StringUtils.trim(pwd); boolean result=false; Pattern pattern = Pattern.compile("[0-9]{6,18}");//正则表达式校验 if(pattern.matcher(pwd).matches()){ result=true; } return result; }
- (2)如果新密码满足(1)要求,而且旧密码与数据库中的固定密码匹配,返回ResMsg 对象属性值如下:
errcode="0" errmsg="更新密码成功"
- (3)如果新密码满足(1)要求,如果密码更新不成功,返回ResMsg 对象属性值如下:
errcode="4001" errmsg="更新密码失败,旧密码错误"
重新构建(Build)项目,启动
IoTLockApplication
。启动 MQTT 服务器 EMQX。
使用
MQTTX
订阅主题/iot/cloud/devices/receive_reply
。使用
MQTTX
向主题/iot/cloud/devices/receive
发送以下消息,修改智能锁设备端密码为654321
。
{"eventId":"1234","eventName":"resetFixedPwd","eventTime":1676084750362,"data":{"nPwd":"654321","oPwd":"123456"}}
使用
MQTTX
查看订阅主题/iot/cloud/devices/receive_reply
是否收到智能锁设备端回复的消息。浏览器访问http://localhost:8097,尝试使用新的固定密码
654321
进行解锁,测试是否能够解锁成功。
【实验名称】6.4 iot-lock 项目实现新增临时密码功能(选做)
【实验目的】
- 掌握使用 SpringBoot 整合 MQTT 协议并进行开发
【实验环境】
- IDEA
- Maven 3.6
- MariaDB 10.4
- JDK 8
- EMQX
- MQTTX
【实验说明】
- 此实验承接实验6.3,使用MQTTX 模拟一个用户终端,向物联网云平台( EMQX 服务器)发送一条新增临时密码的指令,智能锁设备端收到指令,实现新增临时密码,并回复用户终端是否新增成功。
【实验效果】
- 发送新增临时密码的消息以后,能够收到回复的 MQTT 消息,并且能够用新的临时密码
075911
解锁。
【实验步骤】
PwdService
PwdService
新增方法,其余代码参考6.3自行实现。
/** * 新增临时密码 * @param pwd 临时密码 * @param expiredTime 过期时间 * @return */ ResMsg addTempPwd(String pwd, Date expiredTime);
- 如果新增成功,返回ResMsg 对象属性值如下:
errcode="0" errmsg="新增密码成功"
- 如果新增失败,返回ResMsg 对象属性值如下:
errcode="4003" errmsg="新增密码失败"
重新构建(Build)项目,启动
IoTLockApplication
。启动 MQTT 服务器 EMQX。
使用
MQTTX
订阅主题/iot/cloud/devices/receive_reply
。使用
MQTTX
向主题/iot/cloud/devices/receive
发送以下消息,新增智能锁设备端临时密码075911
,有效期至2023-7-11 10:00:00
。
{"eventId":"1234","eventName":"addTempPwd","eventTime":1676086292304,"data":{"pwd":"075911","expiredTime":"2023-07-11T10:00:00"}}
- 使用
MQTTX
查看订阅主题/iot/cloud/devices/receive_reply
是否收到智能锁设备端回复的消息。
- 浏览器访问http://localhost:8097,尝试使用新的临时密码
075911
进行解锁,测试是否能够解锁成功。