-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.lib.lua
161 lines (147 loc) · 4.51 KB
/
mqtt.lib.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
local bit = require "bit";
-- Prototype table for stream objects: the methods below are defined on it, and
-- because __index points back at the table itself, any table that gets
-- stream_mt as its metatable resolves those methods through it.
local stream_mt = {};
stream_mt.__index = stream_mt;
--- Take exactly n_bytes bytes from the front of the stream's buffer.
-- Must be called from inside a coroutine: whenever the buffer does not hold
-- enough bytes, the function yields and expects to be resumed with a string
-- containing more input, which is appended to what is already buffered.
-- @param n_bytes number of bytes to return
-- @return a string of exactly n_bytes bytes; the unread remainder is left in
--   self.buffer
function stream_mt:read_bytes(n_bytes)
	local data = self.buffer;
	-- Guard the length lookup: the buffer may not have been initialised yet.
	module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, data and #data or 0);
	if not data then
		module:log("debug", "No data, pausing.");
		data = coroutine.yield();
		module:log("debug", "Have %d bytes of data now (want %d)", #data, n_bytes);
	end
	-- Keep pausing until enough input has accumulated.
	while #data < n_bytes do
		module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes);
		self.buffer = data;
		data = data..coroutine.yield();
		self.buffer = data;
		module:log("debug", "Now we have %d bytes, reading...", #data);
	end
	-- Split off the requested prefix and keep the rest for the next read.
	local result = data:sub(1, n_bytes);
	self.buffer = data:sub(n_bytes+1);
	module:log("debug", "Returning %d bytes (buffer: %d)", #result, #self.buffer);
	return result;
end
--- Read a length-prefixed string from the stream.
-- The prefix is two bytes, most significant byte first, giving the number of
-- bytes that follow.
-- @return the string contents
-- @return the total number of bytes consumed (contents plus the 2-byte prefix)
function stream_mt:read_string()
	local prefix = self:read_bytes(2);
	local high, low = prefix:byte(1, 2);
	local body_len = bit.lshift(high, 8) + low;
	local body = self:read_bytes(body_len);
	return body, body_len + 2;
end
-- Packet type names, indexed by the numeric type code carried in the upper
-- four bits of a packet's first header byte (1 = connect ... 14 = disconnect).
-- The names are used both when parsing (code -> name) and when serializing
-- (name -> code), so they must match the names used elsewhere in this file.
local packet_type_codes = {
	"connect", "connack",
	"publish", "puback", "pubrec", "pubrel", "pubcomp",
	"subscribe", "suback", "unsubscribe", "unsuback",
	"pingreq", "pingresp",
	"disconnect"
};
--- Read and decode one complete packet from the stream.
-- Reads go through read_bytes/read_string, so this must run inside the parser
-- coroutine; it pauses there whenever more input is needed.
-- @return a table with the fields:
--   type    packet type name from packet_type_codes, or nil when the type code
--           is unknown or a connect packet carries an unexpected signature
--   dup, retain  booleans taken from the first header byte
--   qos     number (0-3) taken from the first header byte
--   length  the "remaining length" announced by the header
--   data    any bytes of the packet not consumed by the type-specific parsing
--           (absent when nothing is left)
--   plus, depending on type: version, connect_flags, keepalive_timer (connect);
--   topic, id (publish); id, topics (subscribe). id is the raw 2-byte string.
function stream_mt:read_packet()
	local packet = {};

	-- First header byte: type in bits 7-4, DUP bit 3, QoS bits 2-1, RETAIN bit 0.
	local header = self:read_bytes(1):byte();
	packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)];
	packet.dup = bit.band(header, 0x08) == 0x08;
	packet.qos = bit.rshift(bit.band(header, 0x06), 1);
	packet.retain = bit.band(header, 0x01) == 0x01;

	-- Remaining length: 7 bits per byte, least significant group first; the top
	-- bit of each byte signals that another length byte follows.
	local length, multiplier = 0, 1;
	repeat
		local digit = self:read_bytes(1):byte();
		length = length + bit.band(digit, 0x7f)*multiplier;
		multiplier = multiplier*128;
	until bit.band(digit, 0x80) == 0;
	packet.length = length;

	-- From here on, `length` tracks how many bytes of the packet are still unread.
	if packet.type == "connect" then
		local signature, signature_len = self:read_string();
		-- Account for the signature even when it is rejected, so the trailing
		-- read below consumes exactly the rest of this packet and no more.
		length = length - signature_len;
		if signature ~= "MQIsdp" then
			module:log("warn", "Unexpected packet signature!");
			packet.type = nil; -- Invalid packet
		else
			packet.version = self:read_bytes(1):byte();
			packet.connect_flags = self:read_bytes(1):byte();
			-- The keep-alive timer is a 16-bit value, most significant byte first.
			local keepalive_hi, keepalive_lo = self:read_bytes(2):byte(1, 2);
			packet.keepalive_timer = bit.lshift(keepalive_hi, 8) + keepalive_lo;
			length = length - 4; -- version + flags + 2-byte keep-alive
		end
	elseif packet.type == "publish" then
		local topic, topic_len = self:read_string();
		packet.topic = topic;
		length = length - topic_len;
		-- A message id is only present for QoS 1 and 2.
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
	elseif packet.type == "subscribe" then
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
		-- The rest of the packet is a list of (topic string, QoS byte) pairs.
		local topics = {};
		while length > 0 do
			local topic, len = self:read_string();
			table.insert(topics, topic);
			self:read_bytes(1); -- QoS not used
			length = length - (len+1);
		end
		packet.topics = topics;
	end

	-- Whatever was not parsed above is handed to the caller untouched.
	if length > 0 then
		packet.data = self:read_bytes(length);
	end
	return packet;
end
--- Build the parser for a stream.
-- The returned function wraps a coroutine. Its first call stores the given
-- string as the stream's buffer; every later call appends its argument to the
-- buffer. Each call runs until read_packet produces a packet (which the call
-- returns) or until the coroutine pauses for lack of input.
-- @param self the stream object the parser reads from and buffers into
-- @return the wrapped coroutine function
local function new_parser(self)
	local function parse_loop(initial_data)
		self.buffer = initial_data;
		while true do
			local packet = self:read_packet();
			-- Hand the packet out; resume with whatever is fed in next.
			local more = coroutine.yield(packet);
			module:log("debug", "Parser: %d new bytes", #more);
			self.buffer = (self.buffer or "")..more;
		end
	end
	return coroutine.wrap(parse_loop);
end
--- Push newly received bytes into the stream and collect finished packets.
-- The parser is called first with the new data and then repeatedly with an
-- empty string, until it returns nothing.
-- @param data string of received bytes
-- @return an array of the packet tables returned by the parser (possibly empty)
function stream_mt:feed(data)
	module:log("debug", "Feeding %d bytes", #data);
	local packets = {};
	local chunk = data;
	while true do
		local packet = self.parser(chunk);
		if not packet then
			break;
		end
		module:log("debug", "Received packet");
		packets[#packets+1] = packet;
		-- After the first call there is no new input; just drain the buffer.
		chunk = "";
	end
	module:log("debug", "Returning %d packets", #packets);
	return packets;
end
--- Create a new stream object.
-- The object uses stream_mt as its metatable and carries its own parser in
-- the `parser` field.
-- @return the new stream
local function new_stream()
	local stream = {};
	setmetatable(stream, stream_mt);
	stream.parser = new_parser(stream);
	return stream;
end
--- Serialize a packet table into the byte string to send on the wire.
-- The input table is not modified.
-- @param packet table with the fields:
--   type    packet type name; a name not found in packet_type_codes is encoded
--           as type code 0
--   data    optional payload string
--   topic   (publish) topic name, written before `data` with a 2-byte length
--           prefix; defaults to the empty string
--   topics  (suback) array of topic strings; each is written with a 2-byte
--           length prefix followed by a zero byte, and `data` is ignored
-- @return string: one header byte (type code in the upper four bits, lower four
--   bits zero), the encoded payload length, then the payload
local function serialize_packet(packet)
	-- Look up the numeric code for the type name.
	local type_num = 0;
	for code, name in ipairs(packet_type_codes) do
		if name == packet.type then
			type_num = code;
			break;
		end
	end

	-- Prefix a string with its length as two bytes, most significant first.
	local function length_prefixed(str)
		local len = #str;
		local high = bit.rshift(bit.band(len, 0xff00), 8);
		local low = bit.band(len, 0x00ff);
		return string.char(high, low)..str;
	end

	-- Build the payload in a local so that serializing the same packet table
	-- more than once always yields the same bytes.
	local payload = packet.data or "";
	if packet.type == "publish" then
		payload = length_prefixed(packet.topic or "")..payload;
	elseif packet.type == "suback" then
		local parts = {};
		for _, topic in ipairs(packet.topics or {}) do
			parts[#parts+1] = length_prefixed(topic).."\000";
		end
		payload = table.concat(parts);
	end

	-- Payload length: 7 bits per byte, least significant group first, with the
	-- top bit set on every byte except the last.
	local out = { string.char(bit.lshift(type_num, 4)) };
	local remaining = #payload;
	repeat
		local digit = remaining % 128;
		remaining = math.floor(remaining / 128);
		if remaining > 0 then
			digit = bit.bor(digit, 0x80);
		end
		out[#out+1] = string.char(digit);
	until remaining <= 0;

	out[#out+1] = payload;
	return table.concat(out);
end
-- Public interface of this library: the stream constructor and the packet
-- serializer.
local exports = {};
exports.new_stream = new_stream;
exports.serialize_packet = serialize_packet;
return exports;