-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpacket.go
154 lines (122 loc) · 3.13 KB
/
packet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
Package mq provides a mqtt-v5.0 protocol implementation
The specification is found at
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
*/
package mq
import (
"encoding"
"fmt"
"io"
)
// ReadPacket reads one packet from the reader. Returns a io.EOF or
// Malformed error on failure.
func ReadPacket(r io.Reader) (ControlPacket, error) {
var fh fixedHeader
if _, err := fh.ReadFrom(r); err != nil {
return nil, fmt.Errorf("ReadPacket: %w", err)
}
return fh.ReadRemaining(r)
}
// Dump writes all packet fields to the given writer, including empty
// value ones.
func Dump(w io.Writer, p Packet) {
if p, ok := p.(interface{ dump(io.Writer) }); ok {
p.dump(w)
}
}
// Packet and ControlPacket can be used interchangebly.
type Packet = ControlPacket
type ControlPacket interface {
// Write the packet in wireformat to a writer
io.WriterTo
// Unmarshal wireformat
encoding.BinaryUnmarshaler
// Return a short readable string suitable for logging
fmt.Stringer
}
// HasPacketID is implemented by packets carrying a packet ID.
type HasPacketID interface {
PacketID() uint16
}
// HasReason is implemented by packets carrying a reason code.
type HasReason interface {
ReasonCode() ReasonCode
}
// HasWellFormed is implemented by packets that implement WellFormed.
type HasWellFormed interface {
WellFormed() *Malformed
}
type fixedHeader struct {
fixed bits
remainingLen vbint
}
// ReadFrom reads the fixed byte and the remaining length, use
// ReadRemaining for the rest.
//
// Note: ReasonString for splitting this up is so we can compare
// performance as pahos Unpack works on the remaining only.
func (f *fixedHeader) ReadFrom(r io.Reader) (int64, error) {
n, err := f.fixed.ReadFrom(r)
if err != nil {
return n, err
}
m, err := f.remainingLen.ReadFrom(r)
return n + m, err
}
// ReadRemaining reads the reamining data and converts to a control
// packet.
func (f *fixedHeader) ReadRemaining(r io.Reader) (ControlPacket, error) {
var p ControlPacket
switch byte(f.fixed) & 0b1111_0000 {
case PUBLISH:
p = &Publish{fixed: f.fixed}
case PUBREL:
p = &PubRel{fixed: f.fixed}
case PUBCOMP:
p = &PubComp{fixed: f.fixed}
case PUBREC:
p = &PubRec{fixed: f.fixed}
case PUBACK:
p = &PubAck{fixed: f.fixed}
case CONNECT:
p = &Connect{fixed: f.fixed}
case CONNACK:
p = &ConnAck{fixed: f.fixed}
case SUBSCRIBE:
p = &Subscribe{fixed: f.fixed}
case UNSUBSCRIBE:
p = &Unsubscribe{fixed: f.fixed}
case SUBACK:
p = &SubAck{fixed: f.fixed}
case UNSUBACK:
p = &UnsubAck{fixed: f.fixed}
case PINGREQ:
p = &PingReq{fixed: f.fixed}
case PINGRESP:
p = &PingResp{fixed: f.fixed}
case DISCONNECT:
p = &Disconnect{fixed: f.fixed}
case AUTH:
p = &Auth{fixed: f.fixed}
default:
p = &Undefined{}
}
if f.remainingLen == 0 {
return p, nil
}
data := make([]byte, int(f.remainingLen))
if _, err := r.Read(data); err != nil {
return nil, fmt.Errorf(
"%s ReadRemaining: %w",
firstByte(f.fixed).String(), err,
)
}
if err := p.UnmarshalBinary(data); err != nil {
return nil, fmt.Errorf(
"%s %v UnmarshalBinary: %w",
firstByte(f.fixed).String(), f.remainingLen, err,
)
}
return p, nil
}