@@ -58,52 +58,109 @@ type AlertFields struct {
5858}
5959
6060func main () {
61- socketsFolder , err := utils .MkdirJoin (plugins .WorkDir , "sockets" )
62- if err != nil {
63- _ = catcher .Error ("cannot create socket directory" , err , nil )
64- os .Exit (1 )
65- }
61+ // Recover from panics to ensure the main function doesn't terminate
62+ defer func () {
63+ if r := recover (); r != nil {
64+ _ = catcher .Error ("recovered from panic in alerts main function" , nil , map [string ]any {
65+ "panic" : r ,
66+ })
67+ // Restart the main function after a brief delay
68+ time .Sleep (5 * time .Second )
69+ go main ()
70+ }
71+ }()
72+
73+ // Initialize with retry logic instead of exiting
74+ var socketsFolder utils.Folder
75+ var err error
76+ var socketFile string
77+ var unixAddress * net.UnixAddr
78+ var listener * net.UnixListener
79+
80+ // Retry loop for initialization
81+ for {
82+ socketsFolder , err = utils .MkdirJoin (plugins .WorkDir , "sockets" )
83+ if err != nil {
84+ _ = catcher .Error ("cannot create socket directory" , err , nil )
85+ time .Sleep (5 * time .Second )
86+ continue
87+ }
6688
67- socketFile : = socketsFolder .FileJoin ("com.utmstack.alerts_correlation.sock" )
68- _ = os .Remove (socketFile )
89+ socketFile = socketsFolder .FileJoin ("com.utmstack.alerts_correlation.sock" )
90+ _ = os .Remove (socketFile )
6991
70- unixAddress , err := net .ResolveUnixAddr ("unix" , socketFile )
71- if err != nil {
72- _ = catcher .Error ("cannot resolve unix address" , err , nil )
73- os .Exit (1 )
74- }
92+ unixAddress , err = net .ResolveUnixAddr ("unix" , socketFile )
93+ if err != nil {
94+ _ = catcher .Error ("cannot resolve unix address" , err , nil )
95+ time .Sleep (5 * time .Second )
96+ continue
97+ }
7598
76- listener , err := net .ListenUnix ("unix" , unixAddress )
77- if err != nil {
78- _ = catcher .Error ("cannot listen to unix socket" , err , nil )
79- os .Exit (1 )
99+ listener , err = net .ListenUnix ("unix" , unixAddress )
100+ if err != nil {
101+ _ = catcher .Error ("cannot listen to unix socket" , err , nil )
102+ time .Sleep (5 * time .Second )
103+ continue
104+ }
105+
106+ // If we got here, initialization was successful
107+ break
80108 }
81109
82110 grpcServer := grpc .NewServer ()
83111 plugins .RegisterCorrelationServer (grpcServer , & correlationServer {})
84112
85- osUrl := plugins .PluginCfg ("com.utmstack" , false ).Get ("opensearch" ).String ()
86- err = opensearch .Connect ([]string {osUrl })
87- if err != nil {
88- _ = catcher .Error ("cannot connect to OpenSearch" , err , nil )
89- os .Exit (1 )
113+ // Connect to OpenSearch with retry logic
114+ for {
115+ osUrl := plugins .PluginCfg ("com.utmstack" , false ).Get ("opensearch" ).String ()
116+ err = opensearch .Connect ([]string {osUrl })
117+ if err != nil {
118+ _ = catcher .Error ("cannot connect to OpenSearch" , err , nil )
119+ time .Sleep (5 * time .Second )
120+ continue
121+ }
122+ // If we got here, connection was successful
123+ break
90124 }
91125
126+ // Serve with error handling
92127 if err := grpcServer .Serve (listener ); err != nil {
93128 _ = catcher .Error ("cannot serve grpc" , err , nil )
94- os .Exit (1 )
129+ // Instead of exiting, restart the main function
130+ time .Sleep (5 * time .Second )
131+ go main ()
132+ return
95133 }
96134}
97135
98136func (p * correlationServer ) Correlate (_ context.Context ,
99137 alert * plugins.Alert ) (* emptypb.Empty , error ) {
138+ // Recover from panics to ensure the method doesn't terminate
139+ defer func () {
140+ if r := recover (); r != nil {
141+ _ = catcher .Error ("recovered from panic in Correlate method" , nil , map [string ]any {
142+ "panic" : r ,
143+ "alert" : alert .Name ,
144+ })
145+ }
146+ }()
100147
101148 parentId := getPreviousAlertId (alert )
102149
103150 return nil , newAlert (alert , parentId )
104151}
105152
106153func getPreviousAlertId (alert * plugins.Alert ) string {
154+ // Recover from panics to ensure the function doesn't terminate
155+ defer func () {
156+ if r := recover (); r != nil {
157+ _ = catcher .Error ("recovered from panic in getPreviousAlertId" , nil , map [string ]any {
158+ "panic" : r ,
159+ "alert" : alert .Name ,
160+ })
161+ }
162+ }()
163+
107164 if len (alert .DeduplicateBy ) == 0 {
108165 return ""
109166 }
@@ -174,24 +231,53 @@ func getPreviousAlertId(alert *plugins.Alert) string {
174231 Source : & opensearch.Source {Excludes : []string {}},
175232 }
176233
177- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
234+ // Retry logic for search operation
235+ maxRetries := 3
236+ retryDelay := 2 * time .Second
178237
179- defer cancel ()
238+ for retry := 0 ; retry < maxRetries ; retry ++ {
239+ ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
240+ defer cancel ()
180241
181- hits , err := searchQuery .SearchIn (ctx , []string {opensearch .BuildIndexPattern ("v11" , "alert" )})
182- if err != nil {
183- _ = catcher .Error ("cannot search for previous alerts" , err , map [string ]any {"alert" : alert .Name })
184- return ""
185- }
242+ hits , err := searchQuery .SearchIn (ctx , []string {opensearch .BuildIndexPattern ("v11" , "alert" )})
243+ if err == nil {
244+ if hits .Hits .Total .Value != 0 {
245+ return hits .Hits .Hits [0 ].ID
246+ }
247+ return ""
248+ }
186249
187- if hits .Hits .Total .Value != 0 {
188- return hits .Hits .Hits [0 ].ID
250+ _ = catcher .Error ("cannot search for previous alerts, retrying" , err , map [string ]any {
251+ "alert" : alert .Name ,
252+ "retry" : retry + 1 ,
253+ "maxRetries" : maxRetries ,
254+ })
255+
256+ if retry < maxRetries - 1 {
257+ time .Sleep (retryDelay )
258+ // Increase delay for next retry
259+ retryDelay *= 2
260+ }
189261 }
190262
263+ // If we get here, all retries failed
264+ _ = catcher .Error ("all retries failed when searching for previous alerts" , nil , map [string ]any {
265+ "alert" : alert .Name ,
266+ })
191267 return ""
192268}
193269
194270func newAlert (alert * plugins.Alert , parentId string ) error {
271+ // Recover from panics to ensure the function doesn't terminate
272+ defer func () {
273+ if r := recover (); r != nil {
274+ _ = catcher .Error ("recovered from panic in newAlert" , nil , map [string ]any {
275+ "panic" : r ,
276+ "alert" : alert .Name ,
277+ })
278+ }
279+ }()
280+
195281 var severityN int
196282 var severityLabel string
197283 switch alert .Severity {
@@ -239,16 +325,37 @@ func newAlert(alert *plugins.Alert, parentId string) error {
239325 DeduplicatedBy : alert .DeduplicateBy ,
240326 }
241327
242- cancelableContext , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
328+ // Retry logic for indexing operation
329+ maxRetries := 3
330+ retryDelay := 2 * time .Second
243331
244- defer cancel ()
332+ for retry := 0 ; retry < maxRetries ; retry ++ {
333+ cancelableContext , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
334+ defer cancel ()
245335
246- err := opensearch .IndexDoc (cancelableContext , a , opensearch .BuildCurrentIndex ("v11" , "alert" ), alert .Id )
247- if err != nil {
248- return catcher .Error ("cannot index document" , err , map [string ]any {
249- "alert" : alert .Name ,
336+ err := opensearch .IndexDoc (cancelableContext , a , opensearch .BuildCurrentIndex ("v11" , "alert" ), alert .Id )
337+ if err == nil {
338+ return nil
339+ }
340+
341+ _ = catcher .Error ("cannot index document, retrying" , err , map [string ]any {
342+ "alert" : alert .Name ,
343+ "retry" : retry + 1 ,
344+ "maxRetries" : maxRetries ,
250345 })
346+
347+ if retry < maxRetries - 1 {
348+ time .Sleep (retryDelay )
349+ // Increase delay for next retry
350+ retryDelay *= 2
351+ } else {
352+ // If all retries failed, return the error
353+ return catcher .Error ("all retries failed when indexing document" , err , map [string ]any {
354+ "alert" : alert .Name ,
355+ })
356+ }
251357 }
252358
359+ // This should never be reached, but just in case
253360 return nil
254361}
0 commit comments