Skip to content

Commit c63766a

Browse files
committed
Move mqtt 3.1.1 codec to seperate package.
1 parent d02d139 commit c63766a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+989
-539
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ metric | labels | description
194194
|mqtt_proxy_build_info| branch, goversion, revision, revision|A metric with a constant '1' value labeled by version, revision, branch, and goversion from which mqtt_proxy was built.|
195195
|mqtt_proxy_server_connections_active| |Number of active TCP connections from clients to server.|
196196
|mqtt_proxy_server_connections_total| |Total number of TCP connections from clients to server.|
197-
|mqtt_proxy_handler_requests_total|type|Total number of MQTT requests labeled by package control type. |
198-
|mqtt_proxy_handler_responses_total|type|Total number of MQTT responses labeled by package control type. |
197+
|mqtt_proxy_handler_requests_total| type, version |Total number of MQTT requests labeled by package control type and protocol version. |
198+
|mqtt_proxy_handler_responses_total| type, version |Total number of MQTT responses labeled by package control type and protocol version. |
199199
|mqtt_proxy_publisher_publish_duration_seconds | name, type, qos | Histogram tracking latencies for publish requests. |
200-
|mqtt_proxy_authenticator_login_duration_seconds | name, code, err | Histogram tracking latencies for login requests. |
200+
|mqtt_proxy_authenticator_login_duration_seconds | name, code, err | Histogram tracking latencies for login requests. |

apis/auth.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package apis
33
import (
44
"context"
55

6-
mqttcodec "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec"
6+
mqttproto "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/proto"
77
)
88

99
const (
10-
AuthAccepted = mqttcodec.Accepted
11-
AuthUnauthorized = mqttcodec.RefusedNotAuthorized
10+
AuthAccepted = mqttproto.Accepted
11+
AuthUnauthorized = mqttproto.RefusedNotAuthorized
1212
)
1313

1414
type UserPasswordAuthRequest struct {

pkg/mqtt/codec/encoding.go

Lines changed: 0 additions & 61 deletions
This file was deleted.

pkg/mqtt/codec/packets.go

Lines changed: 44 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,120 +1,66 @@
1-
package mqttcodec
1+
package codec
22

33
import (
44
"bytes"
55
"fmt"
66
"io"
7+
8+
mqttproto "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/proto"
9+
mqtt311 "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/v311"
710
)
811

9-
type ControlPacket interface {
10-
Write(io.Writer) error
11-
Unpack(io.Reader) error
12-
String() string
13-
Type() byte
14-
Name() string
12+
func ReadPacket(r io.Reader, protocolVersion byte) (mqttproto.ControlPacket, error) {
13+
if protocolVersion == 0 {
14+
var (
15+
err error
16+
buf bytes.Buffer
17+
)
18+
versionReader := io.TeeReader(r, &buf)
19+
protocolVersion, err = readConnectVersion(versionReader)
20+
if err != nil {
21+
return nil, err
22+
}
23+
r = io.MultiReader(bytes.NewReader(buf.Bytes()), r)
24+
}
25+
switch protocolVersion {
26+
case mqttproto.MQTT_3_1_1:
27+
return mqtt311.ReadPacket(r)
28+
case mqttproto.MQTT_5:
29+
return nil, mqtt311.NewConnAckError(mqttproto.RefusedUnacceptableProtocolVersion, "mqtt5 is not supported yet")
30+
default:
31+
return nil, mqtt311.NewConnAckError(mqttproto.RefusedUnacceptableProtocolVersion, fmt.Sprintf("unsupported protocol version %v", protocolVersion))
32+
}
1533
}
1634

17-
func ReadPacket(r io.Reader) (ControlPacket, error) {
18-
var fh FixedHeader
35+
func readConnectVersion(r io.Reader) (byte, error) {
36+
// fixed header
37+
var fh mqttproto.FixedHeader
1938
b1 := make([]byte, 1)
2039
_, err := io.ReadFull(r, b1)
2140
if err != nil {
22-
return nil, err
41+
return 0, err
2342
}
24-
25-
err = fh.unpack(b1[0], r)
43+
err = fh.Unpack(b1[0], r)
2644
if err != nil {
27-
return nil, err
45+
return 0, err
2846
}
29-
30-
err = fh.validate()
47+
err = fh.Validate()
3148
if err != nil {
32-
return nil, err
49+
return 0, err
3350
}
34-
35-
cp, err := NewControlPacketWithHeader(fh)
36-
if err != nil {
37-
return nil, err
51+
if fh.MessageType != mqttproto.CONNECT {
52+
return 0, fmt.Errorf("expected CONNECT packet but got type 0x%x", fh.MessageType)
3853
}
39-
40-
packetBytes := make([]byte, fh.RemainingLength)
41-
n, err := io.ReadFull(r, packetBytes)
54+
// variable header
55+
// Protocol Name
56+
_, err = mqttproto.DecodeString(r)
4257
if err != nil {
43-
return nil, err
58+
return 0, err
4459
}
45-
if n != fh.RemainingLength {
46-
return nil, fmt.Errorf("failed to read encoded data, read %d from %d", n, fh.RemainingLength)
47-
}
48-
err = cp.Unpack(bytes.NewBuffer(packetBytes))
49-
return cp, err
50-
}
51-
52-
func NewControlPacket(packetType byte) ControlPacket {
53-
switch packetType {
54-
case CONNECT:
55-
return &ConnectPacket{FixedHeader: FixedHeader{MessageType: CONNECT}}
56-
case CONNACK:
57-
return &ConnackPacket{FixedHeader: FixedHeader{MessageType: CONNACK}}
58-
case PUBLISH:
59-
return &PublishPacket{FixedHeader: FixedHeader{MessageType: PUBLISH}}
60-
case PUBACK:
61-
return &PubackPacket{FixedHeader: FixedHeader{MessageType: PUBACK}}
62-
case PUBREC:
63-
return &PubrecPacket{FixedHeader: FixedHeader{MessageType: PUBREC}}
64-
case PUBREL:
65-
return &PubrelPacket{FixedHeader: FixedHeader{MessageType: PUBREL}}
66-
case PUBCOMP:
67-
return &PubcompPacket{FixedHeader: FixedHeader{MessageType: PUBCOMP}}
68-
case SUBSCRIBE:
69-
return &SubscribePacket{FixedHeader: FixedHeader{MessageType: SUBSCRIBE}}
70-
case SUBACK:
71-
return &SubackPacket{FixedHeader: FixedHeader{MessageType: SUBACK}}
72-
case UNSUBSCRIBE:
73-
return &UnsubscribePacket{FixedHeader: FixedHeader{MessageType: UNSUBSCRIBE}}
74-
case UNSUBACK:
75-
return &UnsubackPacket{FixedHeader: FixedHeader{MessageType: UNSUBACK}}
76-
case PINGREQ:
77-
return &PingreqPacket{FixedHeader: FixedHeader{MessageType: PINGREQ}}
78-
case PINGRESP:
79-
return &PingrespPacket{FixedHeader: FixedHeader{MessageType: PINGRESP}}
80-
case DISCONNECT:
81-
return &DisconnectPacket{FixedHeader: FixedHeader{MessageType: DISCONNECT}}
82-
83-
}
84-
return nil
85-
}
86-
87-
func NewControlPacketWithHeader(fh FixedHeader) (ControlPacket, error) {
88-
switch fh.MessageType {
89-
case CONNECT:
90-
return &ConnectPacket{FixedHeader: fh}, nil
91-
case CONNACK:
92-
return &ConnackPacket{FixedHeader: fh}, nil
93-
case PUBLISH:
94-
return &PublishPacket{FixedHeader: fh}, nil
95-
case PUBACK:
96-
return &PubackPacket{FixedHeader: fh}, nil
97-
case PUBREC:
98-
return &PubrecPacket{FixedHeader: fh}, nil
99-
case PUBREL:
100-
return &PubrelPacket{FixedHeader: fh}, nil
101-
case PUBCOMP:
102-
return &PubcompPacket{FixedHeader: fh}, nil
103-
case SUBSCRIBE:
104-
return &SubscribePacket{FixedHeader: fh}, nil
105-
case SUBACK:
106-
return &SubackPacket{FixedHeader: fh}, nil
107-
case UNSUBSCRIBE:
108-
return &UnsubscribePacket{FixedHeader: fh}, nil
109-
case UNSUBACK:
110-
return &UnsubackPacket{FixedHeader: fh}, nil
111-
case PINGREQ:
112-
return &PingreqPacket{FixedHeader: fh}, nil
113-
case PINGRESP:
114-
return &PingrespPacket{FixedHeader: fh}, nil
115-
case DISCONNECT:
116-
return &DisconnectPacket{FixedHeader: fh}, nil
117-
default:
118-
return nil, fmt.Errorf("unsupported packet type 0x%x", fh.MessageType)
60+
// Protocol Version
61+
version, err := mqttproto.DecodeByte(r)
62+
if err != nil {
63+
return 0, err
11964
}
65+
return version, nil
12066
}

0 commit comments

Comments
 (0)