Skip to content

Commit acf8f91

Browse files
committed
rabbitmq
1 parent bc9f08d commit acf8f91

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+12020
-8
lines changed

README.md

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ MQTT Proxy allows MQTT clients to send messages to other messaging systems
2121
* [x] [Amazon SQS](https://aws.amazon.com/sqs/)
2222
* [x] [Amazon SNS](https://aws.amazon.com/sns/)
2323
* [ ] [Amazon Kinesis](https://aws.amazon.com/kinesis/)
24+
* [ ] [RabbitMQ](https://www.rabbitmq.com/)
2425
* [ ] Others
2526
* Authentication
2627
* [x] Noop
@@ -157,7 +158,7 @@ prerequisites
157158
3. publish
158159
159160
```
160-
mosquitto_pub -L mqtt://localhost:1883/dummy -m "test qos 0" --repeat 1 -q 2
161+
mosquitto_pub -L mqtt://localhost:1883/dummy -m "test qos 2" --repeat 1 -q 2
161162
```
162163
163164
### SNS publisher
@@ -178,7 +179,36 @@ prerequisites
178179
3. publish
179180
180181
```
181-
mosquitto_pub -L mqtt://localhost:1883/dummy -m "test qos 0" --repeat 1 -q 2
182+
mosquitto_pub -L mqtt://localhost:1883/dummy -m "test qos 2" --repeat 1 -q 2
183+
```
184+
185+
186+
### RabbitMQ publisher
187+
1. Start rabbitmq and create test queue
188+
189+
```
190+
cd scripts/rabbitmq
191+
docker-compose up
192+
curl -i -u user:bitnami -H "content-type:application/json" -XPUT -d'{"durable":true}' http://localhost:15672/api/queues/%2f/test
193+
```
194+
195+
196+
2. Build and start MQTT Proxy
197+
198+
```
199+
make build
200+
./mqtt-proxy server \
201+
--mqtt.publisher.name=rabbitmq \
202+
--mqtt.publisher.rabbitmq.username=user \
203+
--mqtt.publisher.rabbitmq.password=bitnami \
204+
--mqtt.publisher.rabbitmq.default-queue=test \
205+
--mqtt.publisher.rabbitmq.confirms.exactly-once=true
206+
```
207+
208+
3. publish
209+
210+
```
211+
mosquitto_pub -L mqtt://localhost:1883/dummy -m "test qos 0" --repeat 1 -q 0
182212
```
183213
184214
### plain authenticator

cmd/server.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
pubinst "github.com/grepplabs/mqtt-proxy/pkg/publisher/instrument"
1717
pubkafka "github.com/grepplabs/mqtt-proxy/pkg/publisher/kafka"
1818
pubnoop "github.com/grepplabs/mqtt-proxy/pkg/publisher/noop"
19+
pubrabbitmq "github.com/grepplabs/mqtt-proxy/pkg/publisher/rabbitmq"
1920
pubsns "github.com/grepplabs/mqtt-proxy/pkg/publisher/sns"
2021
pubsqs "github.com/grepplabs/mqtt-proxy/pkg/publisher/sqs"
2122
httpserver "github.com/grepplabs/mqtt-proxy/pkg/server/http"
@@ -131,6 +132,27 @@ func runServer(
131132
if err != nil {
132133
return fmt.Errorf("setup sns publisher: %w", err)
133134
}
135+
case config.PublisherRabbitMQ:
136+
publisher, err = pubrabbitmq.New(logger, registry,
137+
pubrabbitmq.WithScheme(cfg.MQTT.Publisher.RabbitMQ.Scheme),
138+
pubrabbitmq.WithHost(cfg.MQTT.Publisher.RabbitMQ.Host),
139+
pubrabbitmq.WithPort(cfg.MQTT.Publisher.RabbitMQ.Port),
140+
pubrabbitmq.WithUsername(cfg.MQTT.Publisher.RabbitMQ.Username),
141+
pubrabbitmq.WithPassword(cfg.MQTT.Publisher.RabbitMQ.Password),
142+
pubrabbitmq.WithVHost(cfg.MQTT.Publisher.RabbitMQ.VHost),
143+
pubrabbitmq.WithExchange(cfg.MQTT.Publisher.RabbitMQ.Exchange),
144+
pubrabbitmq.WithConnectionTimeout(cfg.MQTT.Publisher.RabbitMQ.ConnectionTimeout),
145+
pubrabbitmq.WithRequestTimeout(cfg.MQTT.Publisher.RabbitMQ.RequestTimeout),
146+
pubrabbitmq.WithQueueMappings(cfg.MQTT.Publisher.RabbitMQ.QueueMappings),
147+
pubrabbitmq.WithDefaultQueue(cfg.MQTT.Publisher.RabbitMQ.DefaultQueue),
148+
pubrabbitmq.WithMessageFormat(cfg.MQTT.Publisher.MessageFormat),
149+
pubrabbitmq.WithPublisherConfirmsAtLeastOnce(cfg.MQTT.Publisher.RabbitMQ.PublisherConfirms.AtLeastOnce),
150+
pubrabbitmq.WithPublisherConfirmsAtMostOnce(cfg.MQTT.Publisher.RabbitMQ.PublisherConfirms.AtMostOnce),
151+
pubrabbitmq.WithPublisherConfirmsExactlyOnce(cfg.MQTT.Publisher.RabbitMQ.PublisherConfirms.ExactlyOnce),
152+
)
153+
if err != nil {
154+
return fmt.Errorf("setup rabbitmq publisher: %w", err)
155+
}
134156
default:
135157
return fmt.Errorf("unknown publisher %s", cfg.MQTT.Publisher.Name)
136158
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/oklog/run v1.1.0
1616
github.com/prometheus/client_golang v1.13.1
1717
github.com/prometheus/common v0.37.0
18+
github.com/rabbitmq/amqp091-go v1.8.0
1819
github.com/stretchr/testify v1.8.2
1920
github.com/sykesm/zap-logfmt v0.0.4
2021
go.uber.org/atomic v1.10.0
@@ -38,6 +39,8 @@ require (
3839
github.com/go-playground/locales v0.14.0 // indirect
3940
github.com/go-playground/universal-translator v0.18.0 // indirect
4041
github.com/golang/protobuf v1.5.3 // indirect
42+
github.com/hashicorp/errwrap v1.1.0 // indirect
43+
github.com/hashicorp/go-multierror v1.1.1 // indirect
4144
github.com/leodido/go-urn v1.2.1 // indirect
4245
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
4346
github.com/pmezard/go-difflib v1.0.0 // indirect

go.sum

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
201201
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
202202
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
203203
github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8=
204+
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
205+
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
206+
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
207+
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
208+
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
204209
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
205210
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
206211
github.com/heetch/avro v0.3.1/go.mod h1:4xn38Oz/+hiEUTpbVfGVLfvOg0yKLlRP7Q9+gJJILgA=
@@ -292,6 +297,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
292297
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
293298
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
294299
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
300+
github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM=
301+
github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
295302
github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ=
296303
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
297304
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -334,7 +341,8 @@ go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
334341
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
335342
go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
336343
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
337-
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
344+
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
345+
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
338346
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
339347
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
340348
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=

pkg/config/config.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"fmt"
5+
"os"
56
"regexp"
67
"strings"
78
"time"
@@ -13,10 +14,11 @@ import (
1314

1415
// publisher names
1516
const (
16-
PublisherNoop = "noop"
17-
PublisherKafka = "kafka"
18-
PublisherSQS = "sqs"
19-
PublisherSNS = "sns"
17+
PublisherNoop = "noop"
18+
PublisherKafka = "kafka"
19+
PublisherSQS = "sqs"
20+
PublisherSNS = "sns"
21+
PublisherRabbitMQ = "rabbitmq"
2022
)
2123

2224
// authenticator names
@@ -103,6 +105,24 @@ type Server struct {
103105
DefaultTopicARN string `default:"" help:"Default topic ARN for MQTT publish messages."`
104106
TopicARNMappings TopicMappings `placeholder:"TOPIC_ARN=REGEX" help:"Comma separated list of topic ARNs to MQTT topic mappings."`
105107
} `embed:"" prefix:"sns."`
108+
RabbitMQ struct {
109+
Scheme string `default:"${RabbitMQSchemeDefault}" enum:"${RabbitMQSchemeEnum}" help:"Rabbitmq URI scheme. One of: [${RabbitMQSchemeEnum}]"`
110+
Host string `default:"localhost" help:"Rabbitmq host."`
111+
Port int `default:"5672" help:"Rabbitmq port." validate:"gte=1"`
112+
Username string `default:"" help:"Rabbitmq username."`
113+
Password string `default:"${RabbitMQPassword}" help:"Rabbitmq password."`
114+
VHost string `default:"/" help:"Rabbitmq vhost."`
115+
ConnectionTimeout time.Duration `default:"10s" help:"Rabbitmq connection timeout." validate:"gte=0"`
116+
RequestTimeout time.Duration `default:"3s" help:"Rabbitmq request timeout." validate:"gte=0"`
117+
Exchange string `default:"" help:"Rabbitmq exchange."`
118+
DefaultQueue string `default:"" help:"Default rabbitmq queue for MQTT publish messages."`
119+
QueueMappings TopicMappings `placeholder:"QUEUE=REGEX" help:"Comma separated list of rabbitmq queue to MQTT topic mappings."`
120+
PublisherConfirms struct {
121+
AtMostOnce bool `default:"false" help:"Publisher confirms for AT_MOST_ONCE QoS."`
122+
AtLeastOnce bool `default:"false" help:"Publisher confirms for AT_LEAST_ONCE QoS."`
123+
ExactlyOnce bool `default:"false" help:"Publisher confirms for EXACTLY_ONCE QoS."`
124+
} `embed:"" prefix:"confirms."`
125+
} `embed:"" prefix:"rabbitmq."`
106126
} `embed:"" prefix:"publisher."`
107127
} `embed:"" prefix:"mqtt."`
108128
}
@@ -116,9 +136,12 @@ func ServerVars() kong.Vars {
116136
"AuthDefault": AuthNoop,
117137
"AuthEnum": strings.Join([]string{AuthNoop, AuthPlain}, ", "),
118138
"PublisherDefault": PublisherNoop,
119-
"PublisherEnum": strings.Join([]string{PublisherNoop, PublisherKafka, PublisherSQS, PublisherSNS}, ", "),
139+
"PublisherEnum": strings.Join([]string{PublisherNoop, PublisherKafka, PublisherSQS, PublisherSNS, PublisherRabbitMQ}, ", "),
120140
"MessageFormatDefault": MessageFormatPlain,
121141
"MessageFormatEnum": strings.Join([]string{MessageFormatPlain, MessageFormatBase64, MessageFormatJson}, ", "),
142+
"RabbitMQSchemeDefault": "amqp",
143+
"RabbitMQSchemeEnum": strings.Join([]string{"amqp", "amqps"}, ", "),
144+
"RabbitMQPassword": os.Getenv("MQTT_PUBLISHER_RABBITMQ_PASSWORD"),
122145
}
123146
}
124147

pkg/config/config_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func TestConfigValidation(t *testing.T) {
108108
s.MQTT.ListenAddress = "localhost:1883"
109109
s.MQTT.Publisher.Name = PublisherNoop
110110
s.MQTT.Publisher.Kafka.Workers = 1
111+
s.MQTT.Publisher.RabbitMQ.Port = 5672
111112
return s
112113
},
113114
},
@@ -120,6 +121,7 @@ func TestConfigValidation(t *testing.T) {
120121
s.MQTT.Publisher.Name = PublisherKafka
121122
s.MQTT.Publisher.Kafka.BootstrapServers = "localhost:9092"
122123
s.MQTT.Publisher.Kafka.Workers = 1
124+
s.MQTT.Publisher.RabbitMQ.Port = 5672
123125
return s
124126
},
125127
},
@@ -140,6 +142,7 @@ func TestConfigValidation(t *testing.T) {
140142
s.MQTT.Publisher.Kafka.BootstrapServers = "localhost:9092"
141143
s.MQTT.Publisher.Kafka.GracePeriod = 10 * time.Second
142144
s.MQTT.Publisher.Kafka.Workers = 10
145+
s.MQTT.Publisher.RabbitMQ.Port = 5672
143146
return s
144147
},
145148
},

0 commit comments

Comments
 (0)