Skip to content

Commit b75c704

Browse files
committed
[push] support multiple events per token per batch
1 parent 11ed142 commit b75c704

File tree

6 files changed

+81
-66
lines changed

6 files changed

+81
-66
lines changed

internal/sms-gateway/modules/push/fcm/client.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,25 @@ func (c *Client) Open(ctx context.Context) error {
5252
return nil
5353
}
5454

55-
func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) {
56-
errs := make(map[string]error, len(messages))
57-
for address, payload := range messages {
58-
eventMap, err := eventToMap(payload)
55+
func (c *Client) Send(ctx context.Context, messages []types.Message) ([]error, error) {
56+
errs := make([]error, len(messages))
57+
58+
for i, message := range messages {
59+
data, err := eventToMap(message.Event)
5960
if err != nil {
60-
errs[address] = fmt.Errorf("can't marshal event: %w", err)
61+
errs[i] = fmt.Errorf("can't marshal event: %w", err)
6162
continue
6263
}
6364

6465
_, err = c.client.Send(ctx, &messaging.Message{
65-
Data: eventMap,
66+
Data: data,
6667
Android: &messaging.AndroidConfig{
6768
Priority: "high",
6869
},
69-
Token: address,
70+
Token: message.Token,
7071
})
71-
7272
if err != nil {
73-
errs[address] = fmt.Errorf("can't send message to %s: %w", address, err)
73+
errs[i] = fmt.Errorf("can't send message: %w", err)
7474
}
7575
}
7676

internal/sms-gateway/modules/push/metrics.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,6 @@ import (
55
"github.com/prometheus/client_golang/prometheus/promauto"
66
)
77

8-
type RetryOutcome string
9-
10-
const (
11-
RetryOutcomeRetried RetryOutcome = "retried"
12-
RetryOutcomeMaxAttempts RetryOutcome = "max_attempts"
13-
)
14-
158
type BlacklistOperation string
169

1710
const (
@@ -21,7 +14,7 @@ const (
2114

2215
type metrics struct {
2316
enqueuedCounter *prometheus.CounterVec
24-
retriesCounter *prometheus.CounterVec
17+
retriesCounter prometheus.Counter
2518
blacklistCounter *prometheus.CounterVec
2619
errorsCounter *prometheus.CounterVec
2720
}
@@ -35,12 +28,12 @@ func newMetrics() *metrics {
3528
Help: "Total number of messages enqueued",
3629
}, []string{"event"}),
3730

38-
retriesCounter: promauto.NewCounterVec(prometheus.CounterOpts{
31+
retriesCounter: promauto.NewCounter(prometheus.CounterOpts{
3932
Namespace: "sms",
4033
Subsystem: "push",
4134
Name: "retries_total",
4235
Help: "Total retry attempts",
43-
}, []string{"outcome"}),
36+
}),
4437

4538
blacklistCounter: promauto.NewCounterVec(prometheus.CounterOpts{
4639
Namespace: "sms",
@@ -62,8 +55,8 @@ func (m *metrics) IncEnqueued(event string) {
6255
m.enqueuedCounter.WithLabelValues(event).Inc()
6356
}
6457

65-
func (m *metrics) IncRetry(outcome RetryOutcome) {
66-
m.retriesCounter.WithLabelValues(string(outcome)).Inc()
58+
func (m *metrics) IncRetry() {
59+
m.retriesCounter.Inc()
6760
}
6861

6962
func (m *metrics) IncBlacklist(operation BlacklistOperation) {

internal/sms-gateway/modules/push/service.go

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -130,77 +130,90 @@ func (s *Service) sendAll(ctx context.Context) {
130130
return
131131
}
132132

133-
wrappers := lo.MapEntries(
134-
rawEvents,
135-
func(key string, value []byte) (string, *eventWrapper) {
133+
wrappers := lo.FilterMap(
134+
lo.Values(rawEvents),
135+
func(value []byte, _ int) (*eventWrapper, bool) {
136136
wrapper := new(eventWrapper)
137137
if err := wrapper.deserialize(value); err != nil {
138138
s.metrics.IncError(1)
139-
s.logger.Error("Failed to deserialize event wrapper", zap.String("key", key), zap.Binary("value", value), zap.Error(err))
140-
return "", nil
139+
s.logger.Error("Failed to deserialize event wrapper", zap.Binary("value", value), zap.Error(err))
140+
return nil, false
141141
}
142142

143-
return wrapper.Token, wrapper
143+
return wrapper, true
144144
},
145145
)
146-
delete(wrappers, "")
147146

148-
messages := lo.MapValues(
147+
messages := lo.Map(
149148
wrappers,
150-
func(value *eventWrapper, key string) Event {
151-
return value.Event
149+
func(wrapper *eventWrapper, _ int) types.Message {
150+
return types.Message{
151+
Token: wrapper.Token,
152+
Event: wrapper.Event,
153+
}
152154
},
153155
)
154156

155-
s.logger.Info("Sending messages", zap.Int("count", len(messages)))
157+
totalMessages := len(messages)
158+
if totalMessages == 0 {
159+
return
160+
}
161+
162+
s.logger.Info("sending messages", zap.Int("total", totalMessages))
163+
156164
sendCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
157165
defer cancel()
158-
159166
errs, err := s.client.Send(sendCtx, messages)
160167
if len(errs) == 0 && err == nil {
161-
s.logger.Info("Messages sent successfully", zap.Int("count", len(messages)))
168+
s.logger.Info("messages sent successfully", zap.Int("total", totalMessages))
162169
return
163170
}
164171

165172
if err != nil {
166-
s.metrics.IncError(len(messages))
167-
s.logger.Error("Can't send messages", zap.Error(err))
173+
s.metrics.IncError(totalMessages)
174+
s.logger.Error("failed to send messages", zap.Int("total", totalMessages), zap.Error(err))
168175
return
169176
}
170177

171-
s.metrics.IncError(len(errs))
178+
totalErrors := lo.CountBy(errs, func(err error) bool { return err != nil })
179+
s.metrics.IncError(totalErrors)
180+
181+
for i, err := range errs {
182+
if err == nil {
183+
continue
184+
}
172185

173-
for token, sendErr := range errs {
174-
s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token))
186+
wrapper := wrappers[i]
187+
token := wrapper.Token
175188

176-
wrapper := wrappers[token]
177189
wrapper.Retries++
178190

179191
if wrapper.Retries >= maxRetries {
180192
if err := s.blacklist.Set(ctx, token, []byte{}, cacheImpl.WithTTL(blacklistTimeout)); err != nil {
181-
s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err))
193+
s.logger.Warn("failed to blacklist", zap.String("token", token), zap.Error(err))
194+
continue
182195
}
183196

184197
s.metrics.IncBlacklist(BlacklistOperationAdded)
185-
s.metrics.IncRetry(RetryOutcomeMaxAttempts)
186-
s.logger.Warn("Retries exceeded, blacklisting token",
198+
s.logger.Warn("retries exceeded, blacklisting token",
187199
zap.String("token", token),
188-
zap.Duration("ttl", blacklistTimeout))
200+
zap.Duration("ttl", blacklistTimeout),
201+
)
189202
continue
190203
}
191204

192205
wrapperData, err := wrapper.serialize()
193206
if err != nil {
194207
s.metrics.IncError(1)
195-
s.logger.Error("Can't serialize event wrapper", zap.Error(err))
208+
s.logger.Error("failed to serialize event wrapper", zap.Error(err))
196209
continue
197210
}
198211

199212
if setErr := s.events.SetOrFail(ctx, wrapper.key(), wrapperData); setErr != nil {
200-
s.logger.Warn("Can't set message to cache", zap.Error(setErr))
213+
s.logger.Warn("failed to set message to cache", zap.String("key", wrapper.key()), zap.Error(setErr))
201214
continue
202215
}
203216

204-
s.metrics.IncRetry(RetryOutcomeRetried)
217+
s.metrics.IncRetry()
205218
}
206219
}

internal/sms-gateway/modules/push/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type Event = types.Event
1818

1919
type client interface {
2020
Open(ctx context.Context) error
21-
Send(ctx context.Context, messages map[string]Event) (map[string]error, error)
21+
Send(ctx context.Context, messages []types.Message) ([]error, error)
2222
Close(ctx context.Context) error
2323
}
2424

internal/sms-gateway/modules/push/types/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ import (
44
"github.com/android-sms-gateway/client-go/smsgateway"
55
)
66

7+
type Message struct {
8+
Token string
9+
Event Event
10+
}
11+
712
type Event struct {
813
Type smsgateway.PushEventType `json:"type"`
914
Data map[string]string `json:"data"`

internal/sms-gateway/modules/push/upstream/client.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
"github.com/android-sms-gateway/client-go/smsgateway"
1313
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
14-
"github.com/capcom6/go-helpers/maps"
14+
"github.com/samber/lo"
1515
)
1616

1717
const BASE_URL = "https://api.sms-gate.app/upstream/v1"
@@ -42,18 +42,19 @@ func (c *Client) Open(ctx context.Context) error {
4242
return nil
4343
}
4444

45-
func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) {
46-
payload := make(smsgateway.UpstreamPushRequest, 0, len(messages))
45+
func (c *Client) Send(ctx context.Context, messages []types.Message) ([]error, error) {
46+
payload := lo.Map(
47+
messages,
48+
func(item types.Message, _ int) smsgateway.PushNotification {
49+
return smsgateway.PushNotification{
50+
Token: item.Token,
51+
Event: item.Event.Type,
52+
Data: item.Event.Data,
53+
}
54+
},
55+
)
4756

48-
for address, data := range messages {
49-
payload = append(payload, smsgateway.PushNotification{
50-
Token: address,
51-
Event: data.Type,
52-
Data: data.Data,
53-
})
54-
}
55-
56-
payloadBytes, err := json.Marshal(payload)
57+
payloadBytes, err := json.Marshal(smsgateway.UpstreamPushRequest(payload))
5758

5859
if err != nil {
5960
return nil, fmt.Errorf("can't marshal payload: %w", err)
@@ -65,11 +66,11 @@ func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map
6566
}
6667

6768
req.Header.Set("Content-Type", "application/json")
68-
req.Header.Set("User-Agent", "android-sms-gateway/1.x (server; golang)")
69+
req.Header.Set("User-Agent", "sms-gate/1.x (server; golang)")
6970

7071
resp, err := c.client.Do(req)
7172
if err != nil {
72-
return c.mapErrors(messages, fmt.Errorf("can't send request: %w", err)), nil
73+
return nil, fmt.Errorf("can't send request: %w", err)
7374
}
7475

7576
defer func() {
@@ -84,10 +85,13 @@ func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map
8485
return nil, nil
8586
}
8687

87-
func (c *Client) mapErrors(messages map[string]types.Event, err error) map[string]error {
88-
return maps.MapValues(messages, func(e types.Event) error {
89-
return err
90-
})
88+
func (c *Client) mapErrors(messages []types.Message, err error) []error {
89+
return lo.Map(
90+
messages,
91+
func(_ types.Message, _ int) error {
92+
return err
93+
},
94+
)
9195
}
9296

9397
func (c *Client) Close(ctx context.Context) error {

0 commit comments

Comments
 (0)