Skip to content
This repository was archived by the owner on May 29, 2024. It is now read-only.

Commit 999087e

Browse files
committed
Publish SNS event on new alert
1 parent d2bcad1 commit 999087e

File tree

11 files changed

+234
-13
lines changed

11 files changed

+234
-13
lines changed

config.env.template

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ P0_PAGERDUTY_ALERT_EVENTS_URL=
3232
P1_PAGERDUTY_INTEGRATION_KEY=
3333
P1_PAGERDUTY_ALERT_EVENTS_URL=
3434

35+
SNS_TOPIC_ARN=
36+
3537
# Metrics configurations
3638
METRICS_HOST=localhost
3739
METRICS_PORT=7300

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/Microsoft/go-winio v0.6.1 // indirect
2626
github.com/VictoriaMetrics/fastcache v1.10.0 // indirect
2727
github.com/ajg/form v1.5.1 // indirect
28+
github.com/aws/aws-sdk-go v1.50.3 // indirect
2829
github.com/benbjohnson/clock v1.3.5 // indirect
2930
github.com/beorn7/perks v1.0.1 // indirect
3031
github.com/bits-and-blooms/bitset v1.7.0 // indirect
@@ -104,6 +105,7 @@ require (
104105
github.com/jbenet/goprocess v0.1.4 // indirect
105106
github.com/jinzhu/inflection v1.0.0 // indirect
106107
github.com/jinzhu/now v1.1.5 // indirect
108+
github.com/jmespath/go-jmespath v0.4.0 // indirect
107109
github.com/klauspost/compress v1.16.7 // indirect
108110
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
109111
github.com/koron/go-ssdp v0.0.4 // indirect

go.sum

+5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKS
3232
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
3333
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
3434
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
35+
github.com/aws/aws-sdk-go v1.50.3 h1:NnXC/ukOakZbBwQcwAzkAXYEB4SbWboP9TFx9vvhIrE=
36+
github.com/aws/aws-sdk-go v1.50.3/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
3537
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
3638
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
3739
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@@ -437,6 +439,9 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD
437439
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
438440
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
439441
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
442+
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
443+
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
444+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
440445
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
441446
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
442447
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=

internal/alert/manager.go

+32-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Config struct {
2727
RoutingCfgPath string
2828
PagerdutyAlertEventsURL string
2929
RoutingParams *core.AlertRoutingParams
30+
SNSConfig *client.SNSConfig
3031
}
3132

3233
// alertManager ... Alert manager implementation
@@ -39,28 +40,30 @@ type alertManager struct {
3940
interpolator *Interpolator
4041
cdHandler CoolDownHandler
4142
cm RoutingDirectory
43+
sns client.SNSClient
4244

4345
logger *zap.Logger
4446
metrics metrics.Metricer
4547
alertTransit chan core.Alert
4648
}
4749

4850
// NewManager ... Instantiates a new alert manager
49-
func NewManager(ctx context.Context, cfg *Config, cm RoutingDirectory) Manager {
51+
func NewManager(ctx context.Context, cfg *Config, cm RoutingDirectory, sns client.SNSClient) Manager {
5052
// NOTE - Consider constructing dependencies in higher level
5153
// abstraction and passing them in
5254

5355
ctx, cancel := context.WithCancel(ctx)
5456

57+
// NOTE - Consider adding support for additional sns configurations
5558
am := &alertManager{
56-
ctx: ctx,
57-
cdHandler: NewCoolDownHandler(),
58-
cfg: cfg,
59-
cm: cm,
60-
59+
ctx: ctx,
60+
cdHandler: NewCoolDownHandler(),
61+
cfg: cfg,
62+
cm: cm,
6163
cancel: cancel,
6264
interpolator: new(Interpolator),
6365
store: NewStore(),
66+
sns: sns,
6467
alertTransit: make(chan core.Alert),
6568
metrics: metrics.WithContext(ctx),
6669
logger: logging.WithContext(ctx),
@@ -142,6 +145,24 @@ func (am *alertManager) handlePagerDutyPost(alert core.Alert) error {
142145
return nil
143146
}
144147

148+
func (am *alertManager) handleSNSPublish(alert core.Alert, policy *core.AlertPolicy) error {
149+
event := &client.AlertEventTrigger{
150+
Message: am.interpolator.SlackMessage(alert, policy.Msg),
151+
DedupKey: alert.PathID,
152+
Severity: alert.Sev,
153+
}
154+
155+
resp, err := am.sns.PostEvent(am.ctx, event)
156+
if err != nil {
157+
return err
158+
}
159+
160+
if resp.Status != core.SuccessStatus {
161+
return fmt.Errorf("client %s could not post to sns: %s", am.sns.GetName(), resp.Message)
162+
}
163+
return nil
164+
}
165+
145166
// EventLoop ... Event loop for alert manager subsystem
146167
func (am *alertManager) EventLoop() error {
147168
ticker := time.NewTicker(time.Second * 1)
@@ -202,6 +223,11 @@ func (am *alertManager) HandleAlert(alert core.Alert, policy *core.AlertPolicy)
202223
if err := am.handlePagerDutyPost(alert); err != nil {
203224
am.logger.Error("could not post to pagerduty", zap.Error(err))
204225
}
226+
227+
if err := am.handleSNSPublish(alert, policy); err != nil {
228+
am.logger.Error("could not publish to sns", zap.Error(err))
229+
}
230+
205231
}
206232

207233
// Shutdown ... Shuts down the alert manager subsystem

internal/alert/manager_test.go

+28-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ func TestEventLoop(t *testing.T) {
3939
description: "Test low sev alert sends to slack",
4040
test: func(t *testing.T) {
4141
cm := alert.NewRoutingDirectory(cfg.AlertConfig)
42-
am := alert.NewManager(ctx, cfg.AlertConfig, cm)
42+
sns := mocks.NewMockSNSClient(c)
43+
44+
am := alert.NewManager(ctx, cfg.AlertConfig, cm, sns)
4345

4446
go func() {
4547
_ = am.EventLoop()
@@ -76,6 +78,12 @@ func TestEventLoop(t *testing.T) {
7678
}, nil).Times(1)
7779
}
7880

81+
sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return(
82+
&client.AlertAPIResponse{
83+
Message: "test",
84+
Status: core.SuccessStatus,
85+
}, nil).AnyTimes()
86+
7987
ingress <- alert
8088
time.Sleep(1 * time.Second)
8189
id := core.NewUUID()
@@ -93,7 +101,8 @@ func TestEventLoop(t *testing.T) {
93101
description: "Test medium sev alert sends to just PagerDuty",
94102
test: func(t *testing.T) {
95103
cm := alert.NewRoutingDirectory(cfg.AlertConfig)
96-
am := alert.NewManager(ctx, cfg.AlertConfig, cm)
104+
sns := mocks.NewMockSNSClient(c)
105+
am := alert.NewManager(ctx, cfg.AlertConfig, cm, sns)
97106

98107
go func() {
99108
_ = am.EventLoop()
@@ -130,6 +139,12 @@ func TestEventLoop(t *testing.T) {
130139
}, nil).Times(1)
131140
}
132141

142+
sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return(
143+
&client.AlertAPIResponse{
144+
Message: "test",
145+
Status: core.SuccessStatus,
146+
}, nil).AnyTimes()
147+
133148
ingress <- alert
134149
time.Sleep(1 * time.Second)
135150
id := core.UUID{}
@@ -147,7 +162,8 @@ func TestEventLoop(t *testing.T) {
147162
description: "Test high sev alert sends to both slack and PagerDuty",
148163
test: func(t *testing.T) {
149164
cm := alert.NewRoutingDirectory(cfg.AlertConfig)
150-
am := alert.NewManager(ctx, cfg.AlertConfig, cm)
165+
sns := mocks.NewMockSNSClient(c)
166+
am := alert.NewManager(ctx, cfg.AlertConfig, cm, sns)
151167

152168
go func() {
153169
_ = am.EventLoop()
@@ -181,7 +197,7 @@ func TestEventLoop(t *testing.T) {
181197
&client.AlertAPIResponse{
182198
Message: "test",
183199
Status: core.SuccessStatus,
184-
}, nil).Times(1)
200+
}, nil)
185201
}
186202

187203
for _, cli := range cm.GetSlackClients(core.HIGH) {
@@ -191,8 +207,15 @@ func TestEventLoop(t *testing.T) {
191207
&client.AlertAPIResponse{
192208
Message: "test",
193209
Status: core.SuccessStatus,
194-
}, nil).Times(1)
210+
}, nil)
195211
}
212+
213+
sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return(
214+
&client.AlertAPIResponse{
215+
Message: "test",
216+
Status: core.SuccessStatus,
217+
}, nil).AnyTimes()
218+
196219
ingress <- alert
197220
time.Sleep(1 * time.Second)
198221
id := core.UUID{}

internal/app/init.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ func InitializeAlerting(ctx context.Context, cfg *config.Config) (alert.Manager,
7272
}
7373

7474
clientMap := alert.NewRoutingDirectory(cfg.AlertConfig)
75+
snsClient := client.NewSNSClient(cfg.AlertConfig.SNSConfig, "pessimism")
7576

76-
return alert.NewManager(ctx, cfg.AlertConfig, clientMap), nil
77+
return alert.NewManager(ctx, cfg.AlertConfig, clientMap, snsClient), nil
7778
}
7879

7980
// InitializeETL ... Performs dependency injection to build etl struct

internal/client/sns.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
//go:generate mockgen -package mocks --destination=../mocks/mock_sns.go . SNSClient
2+
3+
package client
4+
5+
import (
6+
"context"
7+
"github.com/aws/aws-sdk-go/aws"
8+
"github.com/aws/aws-sdk-go/aws/session"
9+
"github.com/aws/aws-sdk-go/service/sns"
10+
"github.com/base-org/pessimism/internal/core"
11+
"github.com/base-org/pessimism/internal/logging"
12+
"go.uber.org/zap"
13+
"os"
14+
)
15+
16+
// SNSClient ... An interface for SNS clients to implement
17+
type SNSClient interface {
18+
AlertClient
19+
}
20+
21+
// SNSConfig ... Configuration for SNS client
22+
type SNSConfig struct {
23+
TopicArn string
24+
}
25+
26+
type snsClient struct {
27+
svc *sns.SNS
28+
name string
29+
topicArn string
30+
}
31+
32+
// NewSNSClient ... Initializer
33+
func NewSNSClient(cfg *SNSConfig, name string) SNSClient {
34+
35+
if cfg.TopicArn == "" {
36+
logging.NoContext().Warn("No SNS topic ARN provided")
37+
}
38+
39+
logging.NoContext().Debug("AWS Region", zap.String("region", os.Getenv("AWS_REGION")))
40+
41+
// Initialize a session that the SDK will use
42+
sess, err := session.NewSession()
43+
if err != nil {
44+
logging.NoContext().Error("Failed to create SNS session", zap.Error(err))
45+
return nil
46+
}
47+
48+
return &snsClient{
49+
svc: sns.New(sess),
50+
topicArn: cfg.TopicArn,
51+
name: name,
52+
}
53+
}
54+
55+
// PostEvent ... Posts an event to an SNS topic ARN
56+
func (sc snsClient) PostEvent(ctx context.Context, event *AlertEventTrigger) (*AlertAPIResponse, error) {
57+
// Publish a message to the topic
58+
result, err := sc.svc.Publish(&sns.PublishInput{
59+
MessageAttributes: getAttributesFromEvent(event),
60+
Message: &event.Message,
61+
TopicArn: &sc.topicArn,
62+
})
63+
if err != nil {
64+
return &AlertAPIResponse{
65+
Status: core.FailureStatus,
66+
Message: err.Error(),
67+
}, err
68+
}
69+
70+
return &AlertAPIResponse{
71+
Status: core.SuccessStatus,
72+
Message: *result.MessageId,
73+
}, nil
74+
}
75+
76+
// getAttributesFromEvent ... Helper method to get attributes from an AlertEventTrigger
77+
func getAttributesFromEvent(event *AlertEventTrigger) map[string]*sns.MessageAttributeValue {
78+
return map[string]*sns.MessageAttributeValue{
79+
"severity": {
80+
DataType: aws.String("String"),
81+
StringValue: aws.String(event.Severity.String()),
82+
},
83+
"dedup_key": {
84+
DataType: aws.String("String"),
85+
StringValue: aws.String(event.DedupKey.String()),
86+
},
87+
}
88+
}
89+
90+
func (sc snsClient) GetName() string {
91+
return sc.name
92+
}

internal/client/sns_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package client

internal/config/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ func NewConfig(fileName core.FilePath) *Config {
5858
RoutingCfgPath: getEnvStrWithDefault("ALERT_ROUTE_CFG_PATH", "alerts-routing.yaml"),
5959
PagerdutyAlertEventsURL: getEnvStrWithDefault("PAGERDUTY_ALERT_EVENTS_URL", ""),
6060
RoutingParams: nil, // This is populated after the config is created (see IngestAlertConfig)
61+
SNSConfig: &client.SNSConfig{
62+
TopicArn: getEnvStrWithDefault("SNS_TOPIC_ARN", ""),
63+
},
6164
},
6265

6366
ClientConfig: &client.Config{

internal/metrics/metrics.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,9 @@ func (m *Metrics) RecordAlertGenerated(alert core.Alert, dest core.AlertDestinat
247247
net := alert.PathID.Network().String()
248248
h := alert.HT.String()
249249
sev := alert.Sev.String()
250+
path := alert.PathID.String()
250251

251-
m.AlertsGenerated.WithLabelValues(net, h, sev, dest.String(), clientName).Inc()
252+
m.AlertsGenerated.WithLabelValues(net, h, path, sev, dest.String(), clientName).Inc()
252253
}
253254

254255
func (m *Metrics) RecordNodeError(n core.Network) {

0 commit comments

Comments
 (0)