Luat系列教程:6、mqtt代码详解

写在前面:
由于本人并未学习过具体原理,所以本文可能会有多处常识性错误,如有发现请留言指出,谢谢!


阅读本文需要具有的技能:
看过该系列前几篇文章或明白前几篇文章内容的
熟悉lua语法,尤其是数组部分
可以明白字符串、字节码之间的区别
可以自己实践操作
对mqtt通讯有基本的了解
用过这东西

官方demo代码

官方代码可以在github(https://github.com/openLuat/Luat_2G_RDA_8955/)的`Luat_2G_RDA_8955/script_LuaTask/demo/mqtt`目录或luatools的`LuaTools 1.x.x\script\script_LuaTask\demo\mqtt`目录找到

如果你能看懂官方例程,那么可以直接去使用,不需要再看本文了

先定义一个假装能用来测试的mqtt需求

客户端订阅主题:
/d/test/#

设备订阅主题:
/s/test/设备的imei值

需求十分简单:

如果你知道mqtt是什么,肯定能明白

建立文件

首先先新建两个文件,用于测试这个工程

main.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
PROJECT = "MQTT-TEST"
VERSION = "1.0.0"

--根据固件判断模块类型
moduleType = string.find(rtos.get_version(),"8955") and 2 or 4

require "log"
LOG_LEVEL = log.LOGLEVEL_TRACE

require "sys"

--每1分钟查询一次GSM信号强度,每1分钟查询一次基站信息
require "net"
net.startQueryAll(60000, 60000)

--加载硬件看门狗功能模块
--如果用的是720 4g模块,请注释掉这两行
require "wdt"
wdt.setup(pio.P0_30, pio.P0_31)

--加载网络指示灯功能模块
require "netLed"
netLed.setup(true,moduleType == 2 and pio.P1_1 or pio.P2_0,moduleType == 2 and nil or pio.P2_1)

require"mqttTest"

--启动系统框架
sys.init(0, 0)
sys.run()

mqttTest.lua

1
2
3
4
5
module(...,package.seeall)

require"misc"
require"mqtt"
--下面代码一会儿写

建立mqtt线程

一般来说,mqtt连接都是异步运行的,何时应该发送数据,何时应该接收数据,这些逻辑应该让mqtt收发的进程自己进行控制

所以我们在mqttTest.lua中添加一个新的线程(看不懂的回去看前几篇文章),文件改成如下(注意改一下测试服务器信息):

mqttTest.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
module(...,package.seeall)

require"misc"
require"mqtt"

--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600

--启动mqtt客户端任务
sys.taskInit(
function()
while true do
--该区域的代码会永久循环运行(除非出现语法错误)
end
end)

进行mqtt连接

一般来说,我们会在模块成功获取基站分配的ip后,才会进行网络的连接操作,所以我们需要使用socket.isReady()函数来判断是否连接网络,然后再进行网络操作

在成功获取ip后,我们才能新建一个mqtt对象,对其进行联网操作,mqtt客户端线程代码改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
else
log.info("mqttTest.mqttClient","connect fail")
--连接失败
end
else
--没连上网,原地等待一秒,一秒后会循环回去重试
sys.wait(1000)
end
end
end)

对连接失败的处理

上述代码只是一个简单的连接服务器的代码,并且连上之后没有进行任何的其他操作

为了增加代码的稳健性,我们可以利用sys.waitUntil()函数,设置五分钟内没有获取到ip就开启飞行模式几秒,再关闭,让模块重新去获取GPRS连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
else
log.info("mqttTest.mqttClient","connect fail")
--连接失败
end
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)

同样,我们也可以给mqttClient:connect(mqttip,mqttport,"tcp")的连接加上错误次数的判断,连接错误超过五次,强制断开socket连接,等待五秒后重试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)

添加发送/接收处理函数,订阅主题

到了这一步,整个的mqtt线程只剩下循环处理接收和发送的数据这一部分和订阅topic部分与demo不同了,我们直接把这两部分加到mqtt线程的代码中吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
--订阅主题
if mqttClient:subscribe({["/s/test/"..imei]=0}) then
--循环处理接收和发送的数据
while true do
if not mqttInMsgProc(mqttClient) then
log.error("mqttTest.mqttInMsgProc error")
break
end
if not mqttOutMsgProc(mqttClient) then
log.error("mqttTest.mqttOutMsgProc error")
break
end
end
end
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)

可以看到,在接收和发送函数不返回false的情况下,接收和发送循环会一直进行下去;只有当两个函数之一返回false时,才会触发break导致退出该接收和发送循环

mqttInMsgProc(mqttClient)函数

这段的代码相对来说比较简单,我们可以直接使用mqttClient:receive(毫秒数)来接收我们的tcp消息。
我们在合适的地方,新建一个mqttInMsgProc(mqttClient)函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
else
break
end
end
return result or data=="timeout"
end

这段代码就是循环获取mqtt消息,如果没获取到,mqttClient:receive(2000)就会返回false,"timeout";如果获取到了,就会返回true,获取到的数据字符串;如果返回了false,不为"timeout",则表示数据处理出错,说明mqtt连接有了什么问题

细心的读者可能看出来了,如果接收函数一直在2秒内有接收到数据,那么这段函数会永远无限循环下去,没办法到达mqttOutMsgProc(mqttClient)函数进行发送数据的操作,所以我们先去讲mqttOutMsgProc(mqttClient)函数的实现过程,再回来改进mqttInMsgProc(mqttClient)函数

mqttOutMsgProc(mqttClient)函数

由于发送函数在mqtt线程中是一个循环的小部分,所以我们要建立一个消息发送的队列:有要发送的发数据时,将数据放到这个队列中;等运行到mqttOutMsgProc(mqttClient)函数时,将队列中的数据一个一个发出去

首先我们要建一个放这种队列的数组,在合适位置声明一下这个数组:

1
2
--数据发送的消息队列
local msgQuene = {}

接着我们构造一个可以往数组里插入数据的函数,table.insert()可以向数组添加数据,所以我们新建一个insertMsg函数:

1
2
3
local function insertMsg(topic,payload,qos,user)
table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end

还记得上面说过的消息接收函数函数会永远无限循环下去的问题吗?我们在合适的地方新建一个判断发送消息队列是否为空的函数:

1
2
3
function waitForSend()
return #msgQuene > 0
end

在数组有数据时,这个函数会返回true,我们可以将mqttInMsgProc(mqttClient)接收到数据后的代码添加一行判断发送队列是否有数据的代码,当检测到发送队列有数据时,就立即退出接收函数,转而去进行发送动作,接收函数最终改为了这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
--如果msgQuene中有等待发送的数据,则立即退出本循环
if waitForSend() then return true end
else
break
end
end
return result or data=="timeout"
end

最后我们终于可以开始写消息发送函数了,整体的函数就是检查队列是否为空,不为空的话就发一条消息并将其从队列中删除,然后重复这一操作,函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
function mqttOutMsgProc(mqttClient)
while #msgQuene>0 do --数组大于零?
local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
if outMsg.user and outMsg.user.cb then --如果存在回调函数
outMsg.user.cb(result,outMsg.user.para)--执行回调函数
end
if not result then return end
end
return true
end

outMsg.user即为消息队列数组中的,消息数组中的,包含了回调函数的,数组(反正挺绕的)

具体就像下面这样用:

1
2
3
4
5
insertMsg("/d/test/123","done",{cb=testcb})

local function testcb()
log.info("test.testcb","test message sent")
end

这样,该条消息发送后就会执行指定的回调函数

完成基本的mqtt线程

经过上述的更改,最终,mqttTest.lua已经实现了连接服务器并自动处理错误的功能,并且预留了消息接收以及向发送队列添加数据的接口,文件的所有代码如下:

mqttTest.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
require"misc"
require"mqtt"

--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600

--数据发送的消息队列
local msgQuene = {}

local function insertMsg(topic,payload,qos,user)
table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end

function waitForSend()
return #msgQuene > 0
end

function mqttOutMsgProc(mqttClient)
while #msgQuene>0 do --数组大于零?
local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
if outMsg.user and outMsg.user.cb then --如果存在回调函数
outMsg.user.cb(result,outMsg.user.para)--执行回调函数
end
if not result then return end
end
return true
end

function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
--如果msgQuene中有等待发送的数据,则立即退出本循环
if waitForSend() then return true end
else
break
end
end
return result or data=="timeout"
end


--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
--订阅主题
if mqttClient:subscribe({["/s/test/"..imei]=0}) then
--循环处理接收和发送的数据
while true do
if not mqttInMsgProc(mqttClient) then
log.error("mqttTest.mqttInMsgProc error")
break
end
if not mqttOutMsgProc(mqttClient) then
log.error("mqttTest.mqttOutMsgProc error")
break
end
end
end
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)

实现协议需求

设备联网后向/d/test/设备的imei值的topic发送payload为当前设备ICCID的字符串

我们只需要在连接成功处添加代码即可,在if mqttClient:subscribe({["/s/test/"..imei]=0}) then所在代码下一行添加:

1
insertMsg("/d/test/"..misc.getImei(),sim.getIccid())

设备收到topic为/s/test/设备的imei值,payload为ok的数据后,再次向/d/test/设备的imei值的topic发送payload为done四个字节的字符串

我们只需要在接收处理消息处添加代码即可,在--TODO:根据需求自行处理data.payload所在代码下一行添加:

1
2
3
if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then
insertMsg("/d/test/"..misc.getImei(),"done")
end

完整代码

经过上面的删删改改,功能以及基本实现了,整个文件的代码如下:

mqttTest.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
require"misc"
require"mqtt"

--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600

--数据发送的消息队列
local msgQuene = {}

local function insertMsg(topic,payload,qos,user)
table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end

function waitForSend()
return #msgQuene > 0
end

function mqttOutMsgProc(mqttClient)
while #msgQuene>0 do --数组大于零?
local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
if outMsg.user and outMsg.user.cb then --如果存在回调函数
outMsg.user.cb(result,outMsg.user.para)--执行回调函数
end
if not result then return end
end
return true
end

function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then
insertMsg("/d/test/"..misc.getImei(),"done")
end
--如果msgQuene中有等待发送的数据,则立即退出本循环
if waitForSend() then return true end
else
break
end
end
return result or data=="timeout"
end


--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
--订阅主题
if mqttClient:subscribe({["/s/test/"..imei]=0}) then
insertMsg("/d/test/"..misc.getImei(),sim.getIccid())
--循环处理接收和发送的数据
while true do
if not mqttInMsgProc(mqttClient) then
log.error("mqttTest.mqttInMsgProc error")
break
end
if not mqttOutMsgProc(mqttClient) then
log.error("mqttTest.mqttOutMsgProc error")
break
end
end
end
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)

验证功能

我没条件测试这玩意儿。。。谁有空帮我测试下的?

上次更新 2021-01-28