【版本】
当前版本号v20230208
版本 | 修改说明 |
---|---|
v20230208 | 换掉Mosquitto 和 MQTT.fx,换为 EMQX 和 MQTTX |
v20220420 | 补充DateUtils |
v20220408 | 初始化 |
【实验名称】6.1 安装 EMQX 和 MQTTX
【实验目的】
- 掌握 EMQ 安装、配置和使用
- 掌握 MQTT 客户端 MQTTX 的安装、配置和使用
【实验环境】
- EMQX
- MQTTX
【实验说明】
EMQX 是基于高并发的Erlang/OTP语言平台设计,支持百万级连接和分布式集群,发布订阅模式的开源MQTT消息服务器。完整支持MQTT V3.1/V3.1.1协议规范,扩展支持WebSocket、Stomp、CoAP、MQTT-SN或私有TCP协议。官网地址
MQTTX 是EMQ 开源的一款优雅的跨平台MQTT 5.0 桌面客户端工具,它能运行在macOS, Linux, Windows 上,并且支持MQTT 消息格式转换。官网地址
【实验资源】
链接:https://pan.baidu.com/s/1bleh6m4YFCKaAKA8QHEf8A
提取码:f6hc
【实验效果】
- MQTTX 向订阅主题发布消息,并测试是否能够收到订阅的消息。
【实验步骤】
EMQX 安装
下载
emqx-5.0.14-windows-amd64.tar.gz
,解压到安装目录下,例如d:\emqx
。进入安装目录的
bin
目录下,新建一个文本文件,并命名为0-start-emqx.bat
,作为 EMQX 的启动脚本。
cd d:
cd %~dp0
emqx start
- 同样在
bin
目录下,新建一个文本文件,并命名为0-stop-emqx.bat
,作为 EMQX 的启动脚本。
cd d:
cd %~dp0
emqx stop
双击脚本
0-start-emqx.bat
启动EMQX
。MQTT 服务器默认监听在tcp://localhost:1883
。输入默认用户名 admin 与默认密码 public,登录进入 Dashboard。
MQTTX 安装
下载并安装
MQTTX-Setup-1.7.3-x64.exe
,启动MQTTX.exe
。启动 MQTTX,新建连接。
输入 MQTT 代理地址、用户名和密码进行连接。
用户名:iotweb
密码:iotweb
Host: mqtt://localhost
观察 EMQX 服务器http://localhost:18083/#/connections是否有连接来自于 MQTTX 客户端
使用MQTTX,订阅(subscribe)
/iot/cloud/lock01/receive
主题。使用
MQTTX
,向/iot/cloud/lock01/receive
主题发布(publish)以下消息内容。
hello
- 查看订阅主题收到的消息。
【实验名称】6.2 iot-lock 项目配置 MQTT 客户端
【实验目的】
- 掌握使用 SpringBoot 整合 MQTT 的配置
【实验环境】
- IDEA
- Maven 3.6
- MariaDB 10.4
- JDK 8
- EMQX
- MQTTX
【实验说明】
- 本实验承接实验4.3,在此基础上给智能锁设备端新增 MQTT 配置,实现 MQTT 通讯功能。
- 本实验使用 MQTTX 模拟用户终端,向模拟的物联网云平台的 MQTT 服务器(即 EMQX)发送消息,检查智能锁设备端能否收到订阅的 MQTT 消息。
【实验效果】
- 使用 MQTTX 模拟用户终端想主题发送消息,智能锁设备端能够收到订阅的消息。
【实验步骤】
pom.xml
- pom.xml 新增以下依赖配置,引入 MQTT 依赖包。
<!-- MQTT start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- MQTT end -->
application.yml
application.yml
配置文件新增以下配置,连接 MQTT 服务器,并实现配置订阅主题和推送主题。在此配置文件中,我们设置了让智能锁设备订阅/iot/cloud/devices/receive
主题。
mqtt:
host: tcp://localhost:1883
username: iotweb
password: iotweb
qos: 1
client-id: lock001
topic:
post: /iot/cloud/devices/post
post_reply: /iot/cloud/devices/post_reply
receive: /iot/cloud/devices/receive
receive_reply: /iot/cloud/devices/receive_reply
MqttConfig.java
- 新增一个MQTT 的配置类
MqttConfig
。
package iot.cloud.platform.lock.config;
import iot.cloud.platform.lock.utils.ExcptUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Configuration
@Slf4j
@Data
public class MqttConfig {
@Value("${mqtt.host}")
private String host;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.topic.post}")
private String topic_post;
@Value("${mqtt.topic.post_reply}")
private String topic_post_reply;
@Value("${mqtt.topic.receive}")
private String topic_receive;
@Value("${mqtt.topic.receive_reply}")
private String topic_receive_reply;
/**
* qos
* MQTT协议中有三种消息发布服务质量:
* QOS 0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
* QOS 1: “至少一次”,确保消息到达,但消息重复可能会发生。
* QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
*
*/
@Value("${mqtt.qos}")
private int qos;
@Bean
public MqttClient getMqttClient(){
MqttClient client=null;
try {
final String id= clientId;
client = new MqttClient(host, id, new MemoryPersistence());
log.info("创建mqtt客户端ID:"+id);
} catch (Exception e) {
log.error("创建mqtt客户端异常:"+ ExcptUtil.filterStack(e));
}
return client;
}
@Bean
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
// 是否清除session
options.setCleanSession(false);
return options;
}
/**
* 初始化已订阅主题
* @return
*/
@Bean("mqttSubTopics")
public Map<String,Integer> getMqttSubTopics(){
Map<String,Integer> mqttSubTopics=new ConcurrentHashMap<>();
mqttSubTopics.put(topic_post_reply,qos);
mqttSubTopics.put(topic_receive,qos);
return mqttSubTopics;
}
/**
* 初始化已订阅主题对应的回复主题
* @return
*/
@Bean("mqttTopicReplyMap")
public Map<String,String> getMqttTopicReplyMap(){
Map<String,String> topicReplyMap=new ConcurrentHashMap<>();
topicReplyMap.put(topic_post_reply,topic_post);
topicReplyMap.put(topic_receive,topic_receive_reply);
return topicReplyMap;
}
}
MqttConsumer.java 和 MqttConsumerCallback.java
- 新增 MQTT 客户端
MqttConsumer
和 MQTT 接收消息的处理接口MqttConsumerCallback
- MqttConsumer.java
package iot.cloud.platform.lock.mqtt;
import iot.cloud.platform.lock.service.ConfigService;
import iot.cloud.platform.lock.utils.ExcptUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Component
public class MqttConsumer implements ApplicationRunner {
@Autowired
private MqttClient client;
@Autowired
private MqttConnectOptions options;
@Autowired
private MqttConsumerCallback callback;
@Autowired
@Qualifier("mqttSubTopics")
private Map<String,Integer> mqttSubTopics;
private final Map<String,String> topicReplyMap=new HashMap<>();
@Autowired
private ConfigService configService;
@Override
public void run(ApplicationArguments args) {
log.info("初始化并启动mqtt......");
init();
}
/**
* 连接mqtt服务器
*/
private void init() {
try {
client.setCallback(callback);
client.connect(options);
} catch (Exception e) {
log.info("mqtt连接异常:" + ExcptUtil.filterStack(e));
}
}
private String[] getSubTopics(){
return mqttSubTopics.keySet().stream().toArray(String[]::new);
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
client.subscribe(topic, qos);
mqttSubTopics.put(topic,qos);
log.info("订阅主题:" + topic);
} catch (MqttException e) {
log.error(ExcptUtil.filterStack(e));
}
}
/**
* 发布,非持久化
*
* qos根据文档设置为1
*
* @param topic
* @param msg
*/
public void publish(String topic, String msg) {
publish(1, false, topic, msg);
}
/**
* 发布
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = client.getTopic(topic);
if (null == mTopic) {
log.info("topic:" + topic + " 不存在");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
if (!token.isComplete()) {
log.info("消息发送成功");
}
} catch (MqttPersistenceException e) {
log.error(ExcptUtil.filterStack(e));
} catch (MqttException e) {
log.error(ExcptUtil.filterStack(e));
}
}
}
- MqttConsumerCallback.java
package iot.cloud.platform.lock.mqtt;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import iot.cloud.platform.lock.service.PwdService;
import iot.cloud.platform.lock.utils.DateUtil;
import iot.cloud.platform.lock.utils.ExcptUtil;
import iot.cloud.platform.lock.vo.MqttMsg;
import iot.cloud.platform.lock.vo.ResMsg;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* mqtt回调处理类
*/
@Component
@Slf4j
public class MqttConsumerCallback implements MqttCallbackExtended {
@Autowired
private MqttClient client;
@Autowired
private MqttConnectOptions options;
@Autowired
@Qualifier("mqttSubTopics")
private Map<String,Integer> mqttSubTopics;
@Autowired
@Qualifier("mqttTopicReplyMap")
private Map<String,String> mqttTopicReplyMap;
@Autowired
private PwdService pwdService;
/**
* 断开重连
*/
@Override
public void connectionLost(Throwable cause) {
log.info("MQTT连接断开",cause);
try {
if (null != client && !client.isConnected()) {
client.reconnect();
log.info("尝试重新连接");
} else {
client.connect(options);
log.info("尝试建立新连接");
}
} catch (Exception e) {
log.error(ExcptUtil.filterStack(e));
}
}
/**
* 消息处理
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String msg = new String(message.getPayload());
log.info("收到主题[" + topic + "]消息 ->" + msg);
ObjectMapper objMapper=new ObjectMapper();
MqttMsg<HashMap> resvMsg=null;
try {
//ObjectMapper 是 Jackson包用于解析 Json 为java 对象
resvMsg = objMapper.readValue(msg, MqttMsg.class);
} catch (JsonProcessingException e) {
log.error(ExcptUtil.filterStack(e));
}
//TODO:请完善此处代码,完成实验6.3,6.4的功能。
ResMsg returnVal=null;
MqttMsg<ResMsg> replyMsg=new MqttMsg();
replyMsg.setEventId(resvMsg.getEventId());
replyMsg.setEventName(resvMsg.getEventName());
replyMsg.setEventTime(resvMsg.getEventTime());
replyMsg.setData(returnVal);
String receiveReplyMsg="{}";
try {
//把replyMsg对象转换为 JSON 消息
receiveReplyMsg=objMapper.writeValueAsString(replyMsg);
log.info("回复MQTT消息:"+receiveReplyMsg);
String replyTopic=mqttTopicReplyMap.get(topic);
if(replyTopic!=null){
client.publish(replyTopic,receiveReplyMsg.getBytes(StandardCharsets.UTF_8),1,true);
}
} catch (JsonProcessingException e) {
log.error(e.getMessage(),e);
}
} catch (Exception e) {
log.error(ExcptUtil.filterStack(e));
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
@Override
public void connectComplete(boolean b, String s) {
mqttSubTopics.forEach((k,v) ->{
try {
client.subscribe(k,v);
log.info("订阅主题:"+k);
} catch (MqttException e) {
log.error(ExcptUtil.filterStack(e));
}
});
}
}
MqttMsg.java
- 新增一个
MqttMsg
类,用于装载 MQTT 消息。
package iot.cloud.platform.lock.vo;
import lombok.Data;
@Data
public class MqttMsg<T>{
protected String eventId;
protected String eventName;
protected long eventTime;
protected T data;
}
DateUtil
- 新增日期处理工具类
DateUtil
和异常处理工具类ExcptUtil
- DateUtil.java
package iot.cloud.platform.lock.utils;
import org.apache.commons.lang3.StringUtils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtil {
public static Date parseJsonDateTime(String dtText){
if(StringUtils.isBlank(dtText)){
return null;
}
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
try {
return format.parse(dtText);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}
- ExcptUtil.java
package iot.cloud.platform.lock.utils;
import org.apache.commons.lang3.StringUtils;
public class ExcptUtil {
private final static String PACKAGE="iot.cloud.platform.lock";
public static String getKeyInfo(Throwable t,String packageName){
String msg="";
if(t!=null){
msg+=t.getClass().getName()+" -> ";
}
msg+=t.getMessage()+"\n"
+ filterStack(t,packageName);
return msg;
}
public static String filterStack(Throwable throwable, String packageName) {
if (throwable==null) {
return "";
} else {
StackTraceElement[] stackTraceElements = throwable.getStackTrace();
String stackTraceElementString = stackTraceElements[0].toString()+"\n";
if (StringUtils.isEmpty(packageName)) {
return stackTraceElementString;
} else {
StackTraceElement[] eles = stackTraceElements;
int length = Math.min(stackTraceElements.length,10);
for(int i = 1; i < length; ++i) {
StackTraceElement stackTraceElement = eles[i];
stackTraceElementString += stackTraceElement.toString()+"\n";
if (stackTraceElement.toString().contains(packageName)) {
break;
}
}
return stackTraceElementString;
}
}
}
public static String filterStack(Throwable throwable) {
if (throwable==null) {
return "";
} else {
StackTraceElement[] stackTraceElements = throwable.getStackTrace();
String stackTraceElementString = stackTraceElements[0].toString()+"\n";
StackTraceElement[] eles = stackTraceElements;
int length = Math.min(stackTraceElements.length,10);
for(int i = 1; i < length; ++i) {
StackTraceElement stackTraceElement = eles[i];
stackTraceElementString += stackTraceElement.toString()+"\n";
if (stackTraceElement.toString().contains(PACKAGE)) {
break;
}
}
return stackTraceElementString;
}
}
public static String filterStack(Throwable throwable,Class clazz) {
if (throwable==null) {
return "";
} else {
StackTraceElement[] stackTraceElements = throwable.getStackTrace();
String stackTraceElementString = stackTraceElements[0].toString()+"\n";
StackTraceElement[] eles = stackTraceElements;
int length = Math.min(stackTraceElements.length,10);
for(int i = 1; i < length; ++i) {
StackTraceElement stackTraceElement = eles[i];
stackTraceElementString += stackTraceElement.toString()+"\n";
if (stackTraceElement.toString().contains(clazz.getPackage().getName())) {
break;
}
}
return stackTraceElementString;
}
}
}
测试
启动 IoTLockApplication。
参考实验6.1,启动
EMQX
。参考实验6.1,使用
MQTTX
,向/iot/cloud/devices/receive
主题发布(publish)以下消息内容。
{"eventId":"1234","eventName":"sayhello","eventTime":1676083251643}
- 查看
iot-lock
项目的 IDEA 的控制台,查看是否能够收到以下消息。
2023-02-11 10:45:23.116 INFO 23288 --- [T Call: lock001] i.c.p.lock.mqtt.MqttConsumerCallback : 收到主题[/iot/cloud/devices/receive]消息 ->{"eventId":"1234","eventName":"sayhello","eventTime":1676083251643}
【实验名称】6.3 iot-lock 项目实现修改固定密码功能
【实验目的】
- 掌握使用 SpringBoot 整合 MQTT 协议并进行开发
【实验环境】
- IDEA
- Maven 3.6
- MariaDB 10.4
- JDK 8
- EMQX
- MQTTX
【实验说明】
- 此实验承接实验6.2,使用MQTTX模拟一个用户终端,向物联网云平台(EMQX服务器)发送一条修改固定密码的指令,智能锁设备端收到指令,并实现密码的修改。
【实验效果】
- 发送修改密码的消息以后,能够收到回复的 MQTT 消息,并且能够用新的固定密码
654321
解锁。
【实验步骤】
PwdService
- PwdService 新增修改固定密码方法
resetFixedPwd
/**
* 更新固定密码
* @param oPwd 原密码
* @param nPwd 新密码
* @return
*/
ResMsg resetFixedPwd(String oPwd, String nPwd);
PwdServiceImpl
- PwdServiceImpl 新增修改固定密码方法
resetFixedPwd
,并实现以下功能需求。具体代码自行实现。
- (1)新密码必须是数字,而且位数为6-18位。如果新密码不符合要求,返回ResMsg对象属性值如下:
errcode="4002"
errmsg="密码必须为数字,长度为6-18"
代码提示:全选以下代码可以查看
public boolean isValidPwd(String pwd) { String tpwd=StringUtils.trim(pwd); boolean result=false; Pattern pattern = Pattern.compile("[0-9]{6,18}");//正则表达式校验 if(pattern.matcher(pwd).matches()){ result=true; } return result; }
- (2)如果新密码满足(1)要求,而且旧密码与数据库中的固定密码匹配,返回ResMsg 对象属性值如下:
errcode="0"
errmsg="更新密码成功"
- (3)如果新密码满足(1)要求,如果密码更新不成功,返回ResMsg 对象属性值如下:
errcode="4001"
errmsg="更新密码失败,旧密码错误"
重新构建(Build)项目,启动
IoTLockApplication
。启动 MQTT 服务器 EMQX。
使用
MQTTX
订阅主题/iot/cloud/devices/receive_reply
。使用
MQTTX
向主题/iot/cloud/devices/receive
发送以下消息,修改智能锁设备端密码为654321
。
{"eventId":"1234","eventName":"resetFixedPwd","eventTime":1676084750362,"data":{"nPwd":"654321","oPwd":"123456"}}
使用
MQTTX
查看订阅主题/iot/cloud/devices/receive_reply
是否收到智能锁设备端回复的消息。浏览器访问http://localhost:8097,尝试使用新的固定密码
654321
进行解锁,测试是否能够解锁成功。
【实验名称】6.4 iot-lock 项目实现新增临时密码功能(选做)
【实验目的】
- 掌握使用 SpringBoot 整合 MQTT 协议并进行开发
【实验环境】
- IDEA
- Maven 3.6
- MariaDB 10.4
- JDK 8
- EMQX
- MQTTX
【实验说明】
- 此实验承接实验6.3,使用MQTTX 模拟一个用户终端,向物联网云平台( EMQX 服务器)发送一条新增临时密码的指令,智能锁设备端收到指令,实现新增临时密码,并回复用户终端是否新增成功。
【实验效果】
- 发送新增临时密码的消息以后,能够收到回复的 MQTT 消息,并且能够用新的临时密码
075911
解锁。
【实验步骤】
PwdService
PwdService
新增方法,其余代码参考6.3自行实现。
/**
* 新增临时密码
* @param pwd 临时密码
* @param expiredTime 过期时间
* @return
*/
ResMsg addTempPwd(String pwd, Date expiredTime);
- 如果新增成功,返回ResMsg 对象属性值如下:
errcode="0"
errmsg="新增密码成功"
- 如果新增失败,返回ResMsg 对象属性值如下:
errcode="4003"
errmsg="新增密码失败"
重新构建(Build)项目,启动
IoTLockApplication
。启动 MQTT 服务器 EMQX。
使用
MQTTX
订阅主题/iot/cloud/devices/receive_reply
。使用
MQTTX
向主题/iot/cloud/devices/receive
发送以下消息,新增智能锁设备端临时密码075911
,有效期至2023-7-11 10:00:00
。
{"eventId":"1234","eventName":"addTempPwd","eventTime":1676086292304,"data":{"pwd":"075911","expiredTime":"2023-07-11T10:00:00"}}
- 使用
MQTTX
查看订阅主题/iot/cloud/devices/receive_reply
是否收到智能锁设备端回复的消息。
- 浏览器访问http://localhost:8097,尝试使用新的临时密码
075911
进行解锁,测试是否能够解锁成功。