Skip to content

Commit 331ef5c

Browse files
authored
Add GetDX processor (#15)
1 parent a76f0e2 commit 331ef5c

File tree

14 files changed

+519
-160
lines changed

14 files changed

+519
-160
lines changed

README.md

+21-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,27 @@ The following command-line arguments are supported:
2222
* `--http-path` (`string`) — HTTP path on which the webhook events will be expected (defaults to `/`)
2323
* `--secret-token` (`string`) — if specified, this value will be used as a HMAC SHA-256 secret to verify the webhook events
2424

25-
### Example
25+
## GetDX processor
26+
27+
This processor receives, enriches and streams Cirrus CI webhook events to DX's Data Cloud API.
28+
29+
### Usage
30+
31+
```
32+
docker run -it --rm ghcr.io/cirruslabs/cirrus-webhooks-server:latest getdx
33+
```
34+
35+
The following command-line arguments are supported:
36+
37+
* `--dx-instance` (`string`) — DX instance to use when sending webhook events as DX Pipeline events to the Data Cloud API
38+
* `--dx-api-key` (`string`) — API key to use when sending webhook events as DX Pipeline events to the Data Cloud API
39+
* `--http-addr` (`string`) — address on which the HTTP server will listen on (defaults to `:8080`)
40+
* `--http-path` (`string`) — HTTP path on which the webhook events will be expected (defaults to `/`)
41+
* `--secret-token` (`string`) — if specified, this value will be used as a HMAC SHA-256 secret to verify the webhook events
42+
43+
## Example
44+
45+
In this example, we'll receive Cirrus CI webhooks events using the Datadog processor.
2646

2747
The simplest way to try this processor is to use Docker and [ngrok](https://ngrok.com/).
2848

cmd/cws.go

+31-4
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,48 @@ package main
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/cirruslabs/cirrus-webhooks-server/internal/command"
6-
"log"
7+
"github.com/cirruslabs/cirrus-webhooks-server/internal/logginglevel"
8+
"go.uber.org/zap"
79
"os"
810
"os/signal"
911
)
1012

1113
func main() {
14+
if !mainImpl() {
15+
os.Exit(1)
16+
}
17+
}
18+
19+
func mainImpl() bool {
1220
// Set up a signal-interruptible context
1321
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
22+
defer cancel()
23+
24+
// Initialize logger
25+
cfg := zap.NewProductionConfig()
26+
cfg.Level = logginglevel.Level
27+
logger, err := cfg.Build()
28+
if err != nil {
29+
_, _ = fmt.Fprintln(os.Stderr, err)
30+
31+
return false
32+
}
33+
defer func() {
34+
_ = logger.Sync()
35+
}()
36+
37+
// Replace zap.L() and zap.S() to avoid
38+
// propagating the *zap.Logger by hand
39+
zap.ReplaceGlobals(logger)
1440

1541
// Run the command
1642
if err := command.NewRootCmd().ExecuteContext(ctx); err != nil {
17-
cancel()
18-
log.Fatal(err)
43+
logger.Sugar().Error(err)
44+
45+
return false
1946
}
2047

21-
cancel()
48+
return true
2249
}

internal/command/datadog/datadog.go

+29-140
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,47 @@
11
package datadog
22

33
import (
4-
"crypto/hmac"
5-
"crypto/sha256"
6-
"encoding/hex"
74
"encoding/json"
85
"errors"
96
"fmt"
10-
"github.com/brpaz/echozap"
117
payloadpkg "github.com/cirruslabs/cirrus-webhooks-server/internal/command/datadog/payload"
128
"github.com/cirruslabs/cirrus-webhooks-server/internal/datadogsender"
13-
mapset "github.com/deckarep/golang-set/v2"
9+
"github.com/cirruslabs/cirrus-webhooks-server/internal/server"
1410
"github.com/labstack/echo/v4"
1511
"github.com/spf13/cobra"
1612
"go.uber.org/zap"
17-
"io"
18-
"net/http"
19-
"strings"
2013
"time"
2114
)
2215

23-
var debug bool
24-
var httpAddr string
25-
var httpPath string
26-
var eventTypes []string
27-
var secretToken string
2816
var dogstatsdAddr string
2917
var apiKey string
3018
var apiSite string
3119

3220
var (
33-
ErrDatadogFailed = errors.New("failed to stream Cirrus CI events to Datadog")
34-
ErrSignatureVerificationFailed = errors.New("event signature verification failed")
21+
ErrDatadogFailed = errors.New("failed to stream Cirrus CI events to Datadog")
3522
)
3623

3724
func NewCommand() *cobra.Command {
3825
cmd := &cobra.Command{
3926
Use: "datadog",
4027
Short: "Stream Cirrus CI webhook events to Datadog",
41-
RunE: runDatadog,
28+
RunE: run,
4229
}
4330

44-
cmd.PersistentFlags().BoolVar(&debug, "debug", false, "enable debug logging")
45-
cmd.PersistentFlags().StringVar(&httpAddr, "http-addr", ":8080",
46-
"address on which the HTTP server will listen on")
47-
cmd.PersistentFlags().StringVar(&httpPath, "http-path", "/",
48-
"HTTP path on which the webhook events will be expected")
49-
cmd.PersistentFlags().StringSliceVar(&eventTypes, "event-types", []string{},
50-
"comma-separated list of the event types to limit processing to "+
51-
"(for example, --event-types=audit_event or --event-types=build,task")
52-
cmd.PersistentFlags().StringVar(&secretToken, "secret-token", "",
53-
"if specified, this value will be used as a HMAC SHA-256 secret to verify the webhook events")
31+
server.AppendFlags(cmd)
32+
5433
cmd.PersistentFlags().StringVar(&dogstatsdAddr, "dogstatsd-addr", "",
5534
"enables sending webhook events as Datadog events via the DogStatsD protocol to the specified address "+
5635
"(for example, --dogstatsd-addr=127.0.0.1:8125)")
5736
cmd.PersistentFlags().StringVar(&apiKey, "api-key", "",
58-
"Enables sending webhook events as Datadog logs via the Datadog API using the specified API key")
37+
"enables sending webhook events as Datadog logs via the Datadog API using the specified API key")
5938
cmd.PersistentFlags().StringVar(&apiSite, "api-site", "datadoghq.com",
6039
"specifies the Datadog site to use when sending webhook events as Datadog logs via the Datadog API")
6140

6241
return cmd
6342
}
6443

65-
func runDatadog(cmd *cobra.Command, args []string) error {
66-
// Initialize the logger
67-
config := zap.NewProductionConfig()
68-
if debug {
69-
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
70-
}
71-
logger := zap.Must(config.Build()).Sugar()
72-
44+
func run(cmd *cobra.Command, _ []string) error {
7345
// Initialize a Datadog sender
7446
var sender datadogsender.Sender
7547
var err error
@@ -88,99 +60,43 @@ func runDatadog(cmd *cobra.Command, args []string) error {
8860
return err
8961
}
9062

91-
// Convert event types to a set for faster lookup
92-
eventTypesSet := mapset.NewSet[string](eventTypes...)
93-
94-
// Configure HTTP server
95-
e := echo.New()
96-
97-
e.Use(echozap.ZapLogger(logger.Desugar()))
98-
99-
e.POST(httpPath, func(ctx echo.Context) error {
100-
return processWebhookEvent(ctx, logger, sender, eventTypesSet)
101-
})
102-
103-
server := &http.Server{
104-
Addr: httpAddr,
105-
Handler: e,
106-
ReadHeaderTimeout: 10 * time.Second,
107-
}
108-
109-
logger.Infof("starting HTTP server on %s", httpAddr)
110-
111-
httpServerErrCh := make(chan error, 1)
112-
113-
go func() {
114-
httpServerErrCh <- server.ListenAndServe()
115-
}()
116-
117-
select {
118-
case <-cmd.Context().Done():
119-
if err := server.Close(); err != nil {
120-
return err
121-
}
122-
case httpServerErr := <-httpServerErrCh:
123-
return httpServerErr
124-
}
125-
126-
return <-httpServerErrCh
63+
return server.New(func(ctx echo.Context, presentedEventType string, body []byte, logger *zap.SugaredLogger) error {
64+
return processWebhookEvent(ctx, presentedEventType, body, sender, logger)
65+
}, zap.S()).Run(cmd.Context())
12766
}
12867

12968
func processWebhookEvent(
13069
ctx echo.Context,
131-
logger *zap.SugaredLogger,
70+
presentedEventType string,
71+
body []byte,
13272
sender datadogsender.Sender,
133-
eventTypesSet mapset.Set[string],
73+
logger *zap.SugaredLogger,
13474
) error {
135-
// Make sure this is an event we're looking for
136-
presentedEventType := ctx.Request().Header.Get("X-Cirrus-Event")
137-
138-
if eventTypesSet.Cardinality() != 0 && !eventTypesSet.Contains(presentedEventType) {
139-
logger.Debugf("skipping event of type %q because we only process events of types %s",
140-
presentedEventType, strings.Join(eventTypesSet.ToSlice(), ", "))
141-
142-
return ctx.String(http.StatusOK, fmt.Sprintf("skipping event of type %q", presentedEventType))
143-
}
144-
145-
body, err := io.ReadAll(ctx.Request().Body)
146-
if err != nil {
147-
logger.Warnf("failed to read request's body: %v", err)
75+
// Decode the event
76+
var payload payloadpkg.Payload
14877

149-
return ctx.NoContent(http.StatusBadRequest)
78+
switch presentedEventType {
79+
case "audit_event":
80+
payload = &payloadpkg.AuditEvent{}
81+
case "build", "task":
82+
payload = &payloadpkg.BuildOrTask{}
83+
default:
84+
return nil
15085
}
15186

152-
// Verify that this event comes from the Cirrus CI
153-
if err := verifyEvent(ctx, body); err != nil {
154-
logger.Warnf("%v", err)
155-
156-
return ctx.NoContent(http.StatusBadRequest)
87+
if err := json.Unmarshal(body, payload); err != nil {
88+
return fmt.Errorf("failed to enrich Datadog event with tags: "+
89+
"failed to parse the webhook event of type %q as JSON: %v", presentedEventType, err)
15790
}
15891

159-
// Log this event into the Datadog
92+
// Create a new Datadog event and enrich it with tags
16093
evt := &datadogsender.Event{
16194
Title: "Webhook event",
16295
Text: string(body),
16396
Tags: []string{fmt.Sprintf("webhook_event_type:%s", presentedEventType)},
16497
}
16598

166-
// Enrich the event with tags
167-
var payload payloadpkg.Payload
168-
169-
switch presentedEventType {
170-
case "audit_event":
171-
payload = &payloadpkg.AuditEvent{}
172-
case "build", "task":
173-
payload = &payloadpkg.BuildOrTask{}
174-
}
175-
176-
if payload != nil {
177-
if err = json.Unmarshal(body, payload); err != nil {
178-
logger.Warnf("failed to enrich Datadog event with tags: "+
179-
"failed to parse the webhook event of type %q as JSON: %v", presentedEventType, err)
180-
} else {
181-
payload.Enrich(ctx.Request().Header, evt, logger)
182-
}
183-
}
99+
payload.Enrich(ctx.Request().Header, evt, logger)
184100

185101
// Datadog silently discards log events submitted with a
186102
// timestamp that is more than 18 hours in the past, sigh.
@@ -191,37 +107,10 @@ func processWebhookEvent(
191107
"18 hours in the past, it'll likely going to be discarded", presentedEventType)
192108
}
193109

194-
message, err := sender.SendEvent(ctx.Request().Context(), evt)
195-
if err != nil {
110+
// Log this event to Datadog
111+
if err := sender.SendEvent(ctx.Request().Context(), evt); err != nil {
196112
return fmt.Errorf("%w: %v", ErrDatadogFailed, err)
197113
}
198114

199-
return ctx.String(http.StatusCreated, message)
200-
}
201-
202-
func verifyEvent(ctx echo.Context, body []byte) error {
203-
// Nothing to do
204-
if secretToken == "" {
205-
return nil
206-
}
207-
208-
// Calculate the expected signature
209-
hmacSHA256 := hmac.New(sha256.New, []byte(secretToken))
210-
hmacSHA256.Write(body)
211-
expectedSignature := hmacSHA256.Sum(nil)
212-
213-
// Prepare the presented signature
214-
presentedSignatureRaw := ctx.Request().Header.Get("X-Cirrus-Signature")
215-
presentedSignature, err := hex.DecodeString(presentedSignatureRaw)
216-
if err != nil {
217-
return fmt.Errorf("%w: failed to hex-decode the signature %q: %v",
218-
ErrSignatureVerificationFailed, presentedSignatureRaw, err)
219-
}
220-
221-
// Compare signatures
222-
if !hmac.Equal(expectedSignature, presentedSignature) {
223-
return fmt.Errorf("%w: signature is not valid", ErrSignatureVerificationFailed)
224-
}
225-
226115
return nil
227116
}

internal/command/datadog/payload/buildortask.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,25 @@ import (
1010

1111
type BuildOrTask struct {
1212
Build struct {
13-
ID *int64 `json:"id"`
14-
Status *string `json:"status"`
15-
Branch *string `json:"branch"`
16-
PullRequest *int64 `json:"pullRequest"`
17-
User struct {
13+
ID *int64 `json:"id"`
14+
Status *string `json:"status"`
15+
Branch *string `json:"branch"`
16+
PullRequest *int64 `json:"pullRequest"`
17+
ChangeIDInRepo *string `json:"changeIdInRepo"`
18+
User struct {
1819
Username *string `json:"username"`
1920
} `json:"user"`
20-
}
21+
} `json:"build"`
2122
Task struct {
2223
ID *int64 `json:"id"`
2324
Name *string `json:"name"`
2425
Status *string `json:"status"`
26+
StatusTimestamp *int64 `json:"statusTimestamp"`
2527
InstanceType *string `json:"instanceType"`
2628
UniqueLabels []string `json:"uniqueLabels"`
2729
ManualRerunCount *int64 `json:"manualRerunCount"`
28-
}
30+
LocalGroupID *int64 `json:"localGroupId"`
31+
} `json:"task"`
2932

3033
common
3134
}

0 commit comments

Comments
 (0)