@@ -5,14 +5,19 @@ import (
55 "fmt"
66 "time"
77
8+ "github.com/android-sms-gateway/server/internal/sms-gateway/cache"
89 "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
9- "github.com/capcom6/go-helpers /cache"
10- "github.com/capcom6/go-helpers/maps "
10+ cacheImpl "github.com/android-sms-gateway/server/pkg /cache"
11+ "github.com/samber/lo "
1112
12- "go.uber.org/fx"
1313 "go.uber.org/zap"
1414)
1515
16+ const (
17+ cachePrefixEvents = "events:"
18+ cachePrefixBlacklist = "blacklist:"
19+ )
20+
1621type Config struct {
1722 Mode Mode
1823
@@ -22,53 +27,51 @@ type Config struct {
2227 Timeout time.Duration
2328}
2429
25- type Params struct {
26- fx.In
27-
28- Config Config
29-
30- Client client
31- Metrics * metrics
32-
33- Logger * zap.Logger
34- }
35-
3630type Service struct {
3731 config Config
3832
39- client client
40- metrics * metrics
41-
42- cache * cache.Cache [eventWrapper ]
43- blacklist * cache.Cache [struct {}]
33+ client client
34+ events cache.Cache
35+ blacklist cache.Cache
4436
45- logger * zap.Logger
37+ metrics * metrics
38+ logger * zap.Logger
4639}
4740
48- func New (params Params ) * Service {
49- if params .Config .Timeout == 0 {
50- params .Config .Timeout = time .Second
41+ func New (
42+ config Config ,
43+ client client ,
44+ cacheFactory cache.Factory ,
45+ metrics * metrics ,
46+ logger * zap.Logger ,
47+ ) (* Service , error ) {
48+ events , err := cacheFactory .New (cachePrefixEvents )
49+ if err != nil {
50+ return nil , fmt .Errorf ("can't create events cache: %w" , err )
5151 }
52- if params .Config .Debounce < 5 * time .Second {
53- params .Config .Debounce = 5 * time .Second
52+
53+ blacklist , err := cacheFactory .New (cachePrefixBlacklist )
54+ if err != nil {
55+ return nil , fmt .Errorf ("can't create blacklist cache: %w" , err )
5456 }
5557
56- return & Service {
57- config : params . Config ,
58+ config . Timeout = max ( config . Timeout , time . Second )
59+ config . Debounce = max ( config . Debounce , 5 * time . Second )
5860
59- client : params . Client ,
60- metrics : params . Metrics ,
61+ return & Service {
62+ config : config ,
6163
62- cache : cache.New [eventWrapper ](cache.Config {}),
63- blacklist : cache.New [struct {}](cache.Config {
64- TTL : blacklistTimeout ,
65- }),
64+ client : client ,
65+ events : events ,
66+ blacklist : blacklist ,
6667
67- logger : params .Logger ,
68- }
68+ metrics : metrics ,
69+ logger : logger ,
70+ }, nil
6971}
7072
71- // Run runs the service with the provided context if a debounce is set.
73+ // Run starts a ticker that triggers the sendAll function every debounce interval.
74+ // It runs indefinitely until the provided context is canceled.
7275func (s * Service ) Run (ctx context.Context ) {
7376 ticker := time .NewTicker (s .config .Debounce )
7477 defer ticker .Stop ()
@@ -85,19 +88,28 @@ func (s *Service) Run(ctx context.Context) {
8588
8689// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
8790func (s * Service ) Enqueue (token string , event types.Event ) error {
88- if _ , err := s .blacklist .Get (token ); err == nil {
91+ ctx , cancel := context .WithTimeout (context .Background (), s .config .Timeout )
92+ defer cancel ()
93+
94+ if _ , err := s .blacklist .Get (ctx , token ); err == nil {
8995 s .metrics .IncBlacklist (BlacklistOperationSkipped )
9096 s .logger .Debug ("Skipping blacklisted token" , zap .String ("token" , token ))
9197 return nil
9298 }
9399
94100 wrapper := eventWrapper {
95- token : token ,
96- event : & event ,
97- retries : 0 ,
101+ Token : token ,
102+ Event : event ,
103+ Retries : 0 ,
104+ }
105+ wrapperData , err := wrapper .serialize ()
106+ if err != nil {
107+ s .metrics .IncError (1 )
108+ return fmt .Errorf ("can't serialize event wrapper: %w" , err )
98109 }
99110
100- if err := s .cache .Set (token , wrapper ); err != nil {
111+ if err := s .events .Set (ctx , wrapper .key (), wrapperData ); err != nil {
112+ s .metrics .IncError (1 )
101113 return fmt .Errorf ("can't add message to cache: %w" , err )
102114 }
103115
@@ -108,20 +120,43 @@ func (s *Service) Enqueue(token string, event types.Event) error {
108120
109121// sendAll sends messages to all targets from the cache after initializing the service.
110122func (s * Service ) sendAll (ctx context.Context ) {
111- targets := s .cache .Drain ()
112- if len (targets ) == 0 {
123+ rawEvents , err := s .events .Drain (ctx )
124+ if err != nil {
125+ s .logger .Error ("Can't drain cache" , zap .Error (err ))
113126 return
114127 }
115128
116- messages := maps .MapValues (targets , func (w eventWrapper ) types.Event {
117- return * w .event
118- })
129+ if len (rawEvents ) == 0 {
130+ return
131+ }
132+
133+ wrappers := lo .MapEntries (
134+ rawEvents ,
135+ func (key string , value []byte ) (string , * eventWrapper ) {
136+ wrapper := new (eventWrapper )
137+ if err := wrapper .deserialize (value ); err != nil {
138+ s .metrics .IncError (1 )
139+ s .logger .Error ("Failed to deserialize event wrapper" , zap .String ("key" , key ), zap .Binary ("value" , value ), zap .Error (err ))
140+ return "" , nil
141+ }
142+
143+ return wrapper .Token , wrapper
144+ },
145+ )
146+ delete (wrappers , "" )
147+
148+ messages := lo .MapValues (
149+ wrappers ,
150+ func (value * eventWrapper , key string ) Event {
151+ return value .Event
152+ },
153+ )
119154
120155 s .logger .Info ("Sending messages" , zap .Int ("count" , len (messages )))
121- ctx , cancel := context .WithTimeout (ctx , s .config .Timeout )
156+ sendCtx , cancel := context .WithTimeout (ctx , s .config .Timeout )
122157 defer cancel ()
123158
124- errs , err := s .client .Send (ctx , messages )
159+ errs , err := s .client .Send (sendCtx , messages )
125160 if len (errs ) == 0 && err == nil {
126161 s .logger .Info ("Messages sent successfully" , zap .Int ("count" , len (messages )))
127162 return
@@ -138,11 +173,11 @@ func (s *Service) sendAll(ctx context.Context) {
138173 for token , sendErr := range errs {
139174 s .logger .Error ("Can't send message" , zap .Error (sendErr ), zap .String ("token" , token ))
140175
141- wrapper := targets [token ]
142- wrapper .retries ++
176+ wrapper := wrappers [token ]
177+ wrapper .Retries ++
143178
144- if wrapper .retries >= maxRetries {
145- if err := s .blacklist .Set (token , struct {}{} ); err != nil {
179+ if wrapper .Retries >= maxRetries {
180+ if err := s .blacklist .Set (ctx , token , [] byte {}, cacheImpl . WithTTL ( blacklistTimeout ) ); err != nil {
146181 s .logger .Warn ("Can't add to blacklist" , zap .String ("token" , token ), zap .Error (err ))
147182 }
148183
@@ -154,8 +189,16 @@ func (s *Service) sendAll(ctx context.Context) {
154189 continue
155190 }
156191
157- if setErr := s .cache .SetOrFail (token , wrapper ); setErr != nil {
158- s .logger .Info ("Can't set message to cache" , zap .Error (setErr ))
192+ wrapperData , err := wrapper .serialize ()
193+ if err != nil {
194+ s .metrics .IncError (1 )
195+ s .logger .Error ("Can't serialize event wrapper" , zap .Error (err ))
196+ continue
197+ }
198+
199+ if setErr := s .events .SetOrFail (ctx , wrapper .key (), wrapperData ); setErr != nil {
200+ s .logger .Warn ("Can't set message to cache" , zap .Error (setErr ))
201+ continue
159202 }
160203
161204 s .metrics .IncRetry (RetryOutcomeRetried )
0 commit comments