Skip to content

Commit 5bacbd5

Browse files
author
Alexander Félix
committed
initial release
1 parent 76c4ed5 commit 5bacbd5

13 files changed

+870
-3
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
*.out
1313

1414
# Dependency directories (remove the comment below to include it)
15-
# vendor/
15+
vendor/

Makefile

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
.PHONY: start-nats
2+
## Starts nats-streaming server
3+
start-nats:
4+
@docker-compose up --no-start nats
5+
docker-compose start nats
6+
7+
.PHONY: stop-nats
8+
## Stops the nats-streaming server
9+
stop-nats:
10+
docker-compose stop nats
11+
12+
.PHONY: test-nonats
13+
## runs go test, expects nats to be running
14+
test-nonats:
15+
@go clean -testcache
16+
@which gotest || go get -u github.com/rakyll/gotest
17+
@gotest -p 1 -v -mod=vendor $$(go list ./... | grep -v /vendor/)
18+
19+
.PHONY: test
20+
## runs go test, starts and stops nats
21+
test: vendor start-nats
22+
@bash -c "trap 'trap - SIGINT SIGTERM ERR; docker-compose stop nats; exit 1' SIGINT SIGTERM ERR; $(MAKE) test-nonats"
23+
@$(MAKE) stop-nats
24+
25+
####################
26+
# Helpers and misc #
27+
####################
28+
29+
# COLORS
30+
GREEN := $(shell tput -Txterm setaf 2)
31+
YELLOW := $(shell tput -Txterm setaf 3)
32+
WHITE := $(shell tput -Txterm setaf 7)
33+
RESET := $(shell tput -Txterm sgr0)
34+
35+
.PHONY: tidy
36+
## Runs go mod tidy
37+
tidy:
38+
@go mod tidy
39+
40+
.PHONY: vendor
41+
## Updates vendored deps
42+
vendor:
43+
@echo "updating vendored deps..."
44+
@go mod vendor
45+
@echo "done!"
46+
47+
.PHONY: help
48+
# Help target stolen from this comment: https://gist.github.com/prwhite/8168133#gistcomment-2278355
49+
## Show help
50+
help:
51+
@echo 'Usage:'
52+
@echo ' ${YELLOW}make${RESET} ${GREEN}<target>${RESET}'
53+
@echo ''
54+
@echo 'Targets:'
55+
@awk '/^[a-zA-Z\-\_0-9]+:/ { \
56+
helpMessage = match(lastLine, /^## (.*)/); \
57+
if (helpMessage) { \
58+
helpCommand = substr($$1, 0, index($$1, ":")-1); \
59+
helpMessage = substr(lastLine, RSTART + 3, RLENGTH); \
60+
printf " ${YELLOW}%-$(TARGET_MAX_CHAR_NUM)s${RESET} ${GREEN}%s${RESET}\n", helpCommand, helpMessage; \
61+
} \
62+
} \
63+
{ lastLine = $$0 }' $(MAKEFILE_LIST)

README.md

-2
Original file line numberDiff line numberDiff line change
@@ -1,2 +0,0 @@
1-
# stanclient
2-
Golang nats-streaming stan client

client.go

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package stanclient
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"math/rand"
7+
"os"
8+
"regexp"
9+
"runtime"
10+
"strconv"
11+
"sync"
12+
"time"
13+
14+
"github.com/avast/retry-go"
15+
"github.com/nats-io/stan.go"
16+
)
17+
18+
var errNotEnabled = errors.New("Client not configued to be enabled")
19+
20+
// Client wrapper for stan connection
21+
type Client struct {
22+
lock *sync.Mutex
23+
conn stan.Conn
24+
subscriptions map[string]stan.Subscription
25+
Logger ClientLogger
26+
reconnectFunc func() error
27+
clientID string
28+
Config
29+
}
30+
31+
// New returns a connected eventclient or error
32+
func New(config Config, logger ClientLogger, wrapID bool, reconnectFunc func() error) (*Client, error) {
33+
lgr := logger
34+
if logger == nil {
35+
lgr = &EmptyLogger{}
36+
}
37+
38+
clientID := config.ClientID
39+
if wrapID {
40+
clientID = wrapClientID(clientID)
41+
}
42+
client := &Client{
43+
Config: config,
44+
subscriptions: make(map[string]stan.Subscription),
45+
Logger: lgr,
46+
clientID: clientID,
47+
reconnectFunc: reconnectFunc,
48+
lock: &sync.Mutex{},
49+
}
50+
if config.Enabled {
51+
if err := client.connect(); err != nil {
52+
return nil, fmt.Errorf("failed to connect to %s %w", config.NatsStreamingURL, err)
53+
}
54+
}
55+
56+
return client, nil
57+
}
58+
59+
// connect to nats-streaming via stan
60+
func (c *Client) connect() error {
61+
c.lock.Lock()
62+
defer c.lock.Unlock()
63+
64+
if c.conn != nil {
65+
return nil
66+
}
67+
68+
if !c.Enabled {
69+
return errNotEnabled
70+
}
71+
72+
err := retry.Do(
73+
func() error {
74+
var retryErr error
75+
c.conn, retryErr = stan.Connect(c.ClusterID, c.clientID, stan.NatsURL(c.NatsStreamingURL),
76+
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
77+
c.Logger.Info("Connection lost to nats-streaming server")
78+
if c.reconnectFunc == nil {
79+
c.Logger.Fatal(fmt.Sprintf("Connection lost, reason: %v", reason))
80+
}
81+
82+
c.conn = nil
83+
err := retry.Do(
84+
func() error {
85+
if err := c.connect(); err != nil {
86+
return err
87+
}
88+
89+
c.Logger.Info("Successfully reconnected to nats-streaming server")
90+
return nil
91+
},
92+
retry.OnRetry(func(n uint, err error) {
93+
c.Logger.Info(fmt.Sprintf("Reconnection try #%d failed with: %s", n, err))
94+
}),
95+
retry.Delay(time.Duration(c.ReconnectRetry.Delay)*time.Second),
96+
retry.DelayType(retry.FixedDelay),
97+
retry.Attempts(c.ReconnectRetry.Attempts),
98+
)
99+
100+
if err != nil {
101+
c.Logger.Fatal(fmt.Sprintf("All attempts to reconnect to the streaming server failed %s", err.Error()))
102+
}
103+
104+
if err := c.reconnectFunc(); err != nil {
105+
c.Logger.Fatal(fmt.Sprintf("Reconnection func failed %s", err.Error()))
106+
}
107+
}))
108+
109+
if retryErr != nil {
110+
return fmt.Errorf("can't connect make sure a NATS Streaming Server is running at: %s %w", c.NatsStreamingURL, retryErr)
111+
}
112+
113+
return nil
114+
},
115+
retry.OnRetry(func(n uint, err error) {
116+
c.Logger.Info(fmt.Sprintf("Connect() retry failed with retry-number: %d %s", n, err.Error()))
117+
}),
118+
retry.Delay(time.Duration(c.ConnectRetry.Delay)*time.Second),
119+
retry.DelayType(retry.FixedDelay),
120+
retry.Attempts(c.ConnectRetry.Attempts),
121+
)
122+
123+
if err != nil {
124+
return fmt.Errorf("all retries failed to connect to %s: %w", c.NatsStreamingURL, err)
125+
}
126+
c.Logger.Info(fmt.Sprintf("Connected to %s clusterID: [%s] clientID: [%s]", c.NatsStreamingURL, c.ClusterID, c.clientID))
127+
return nil
128+
}
129+
130+
// Close closes the connection
131+
func (c *Client) Close() error {
132+
if c.conn != nil {
133+
return c.conn.Close()
134+
}
135+
return nil
136+
}
137+
138+
// Subscribe an incoming subscriber to clients connection
139+
func (c *Client) Subscribe(subscriber Subscriber, opts ...stan.SubscriptionOption) error {
140+
if err := c.connect(); err != nil {
141+
if errors.Is(err, errNotEnabled) {
142+
return nil
143+
}
144+
return fmt.Errorf("failed to connect %w", err)
145+
}
146+
147+
s, err := c.conn.Subscribe(
148+
subscriber.Subject(),
149+
subscriber.MsgHandler(),
150+
opts...,
151+
)
152+
if err != nil {
153+
return fmt.Errorf("error subscribing to '%s' on cluster '%s': %w", subscriber.Subject(), c.ClusterID, err)
154+
}
155+
c.subscriptions[subscriber.Subject()+"-"+subscriber.Name()] = s
156+
return nil
157+
}
158+
159+
// QueueSubscribe an incoming subscriber to clients connection
160+
func (c *Client) QueueSubscribe(subscriber Subscriber, queueGroup string, opts ...stan.SubscriptionOption) error {
161+
if err := c.connect(); err != nil {
162+
if errors.Is(err, errNotEnabled) {
163+
return nil
164+
}
165+
return fmt.Errorf("failed to connect %w", err)
166+
}
167+
168+
s, err := c.conn.QueueSubscribe(
169+
subscriber.Subject(),
170+
queueGroup,
171+
subscriber.MsgHandler(),
172+
opts...,
173+
)
174+
if err != nil {
175+
return fmt.Errorf("error queue subscribing to '%s' on cluster '%s': %w", subscriber.Subject(), c.ClusterID, err)
176+
}
177+
c.subscriptions[subscriber.Subject()+"-"+queueGroup+"-"+subscriber.Name()] = s
178+
return nil
179+
}
180+
181+
// Unsubscribe <subscriber> or special case <all> for all subscriptions. Will leave the map key with nil value for unsubscribed.
182+
func (c *Client) Unsubscribe(subscriber string) error {
183+
if err := c.connect(); err != nil {
184+
if errors.Is(err, errNotEnabled) {
185+
return nil
186+
}
187+
return fmt.Errorf("failed to reconnect %w", err)
188+
}
189+
190+
switch subscriber {
191+
case "all":
192+
for subscriber, subscription := range c.subscriptions {
193+
if subscription == nil {
194+
// already unsubscribed
195+
continue
196+
}
197+
if err := subscription.Unsubscribe(); err != nil {
198+
return fmt.Errorf("unsubscribe all failed %w", err)
199+
}
200+
c.subscriptions[subscriber] = nil
201+
c.Logger.Info(fmt.Sprintf("successfully unsubscribed subscriber %s", subscriber))
202+
}
203+
default:
204+
sub, ok := c.subscriptions[subscriber]
205+
if !ok || sub == nil {
206+
return fmt.Errorf("could not find subscription %s amongst the current subscriptions", subscriber)
207+
}
208+
if err := sub.Unsubscribe(); err != nil {
209+
return fmt.Errorf("unsubscribe %s failed %w", subscriber, err)
210+
}
211+
c.subscriptions[subscriber] = nil
212+
213+
c.Logger.Info(fmt.Sprintf("successfully unsubscribed subscriber %s", subscriber))
214+
}
215+
216+
return nil
217+
}
218+
219+
// Subscriptions return a list of current subscribtions
220+
func (c *Client) Subscriptions() []string {
221+
result := []string{}
222+
for subscription, subscriber := range c.subscriptions {
223+
if subscriber != nil {
224+
result = append(result, subscription)
225+
}
226+
}
227+
return result
228+
}
229+
230+
// wrapClientID prefixes the provided client id with the host name, to make it unique.
231+
func wrapClientID(clientID string) string {
232+
hostname, err := os.Hostname()
233+
if err != nil {
234+
hostname = "<error>"
235+
}
236+
237+
// Replace all non-alphanumeric chars with something benign that will work in the client id.
238+
re := regexp.MustCompile("[^a-zA-Z0-9]+")
239+
hostname = re.ReplaceAllString(hostname, "-")
240+
241+
// Special case for running locally, add random
242+
if runtime.GOOS == "darwin" {
243+
hostname += strconv.Itoa(rand.Intn(100))
244+
return fmt.Sprintf("%s-%s", clientID, hostname)
245+
}
246+
return fmt.Sprintf("%s-%s", clientID, hostname)
247+
}

0 commit comments

Comments
 (0)