Skip to content

Commit 83fb65a

Browse files
committed
Add some high level pipeline tests for prometheus
1 parent 96b4e9d commit 83fb65a

File tree

13 files changed

+517
-216
lines changed

13 files changed

+517
-216
lines changed

internal/component/otelcol/exporter/prometheus/prometheus.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ import (
88
"time"
99

1010
"github.com/go-kit/log"
11+
"github.com/prometheus/prometheus/storage"
12+
1113
"github.com/grafana/alloy/internal/component"
1214
"github.com/grafana/alloy/internal/component/otelcol"
1315
"github.com/grafana/alloy/internal/component/otelcol/exporter/prometheus/internal/convert"
1416
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
1517
"github.com/grafana/alloy/internal/component/prometheus"
1618
"github.com/grafana/alloy/internal/featuregate"
1719
"github.com/grafana/alloy/internal/service/labelstore"
18-
"github.com/prometheus/prometheus/storage"
1920
)
2021

2122
func init() {
@@ -87,7 +88,7 @@ func New(o component.Options, c Arguments) (*Component, error) {
8788
return nil, err
8889
}
8990
ls := service.(labelstore.LabelStore)
90-
fanout := prometheus.NewFanout(nil, o.ID, o.Registerer, ls)
91+
fanout := prometheus.NewFanout(nil, o.Registerer, ls)
9192

9293
converter := convert.New(o.Logger, fanout, convertArgumentsToConvertOptions(c))
9394

internal/component/prometheus/enrich/enrich.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
9797
}
9898
}
9999

100-
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls)
100+
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.Registerer, ls)
101101
c.receiver = prometheus.NewInterceptor(
102102
c.fanout,
103103
ls,

internal/component/prometheus/fanout.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ var _ storage.Appendable = (*Fanout)(nil)
2626
type Fanout struct {
2727
mut sync.RWMutex
2828
// children is where to fan out.
29-
children []storage.Appendable
30-
// ComponentID is what component this belongs to.
31-
componentID string
29+
children []storage.Appendable
3230
writeLatency prometheus.Histogram
3331
samplesCounter prometheus.Counter
3432
ls labelstore.LabelStore
@@ -39,7 +37,7 @@ type Fanout struct {
3937
}
4038

4139
// NewFanout creates a fanout appendable.
42-
func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
40+
func NewFanout(children []storage.Appendable, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
4341
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
4442
Name: "prometheus_fanout_latency",
4543
Help: "Write latency for sending to direct and indirect components",
@@ -55,7 +53,6 @@ func NewFanout(children []storage.Appendable, componentID string, register prome
5553

5654
return &Fanout{
5755
children: children,
58-
componentID: componentID,
5956
writeLatency: wl,
6057
samplesCounter: s,
6158
ls: ls,

internal/component/prometheus/fanout_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313

1414
func TestRollback(t *testing.T) {
1515
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
16-
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
16+
fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
1717
app := fanout.Appender(t.Context())
1818
err := app.Rollback()
1919
require.NoError(t, err)
2020
}
2121

2222
func TestCommit(t *testing.T) {
2323
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
24-
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
24+
fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
2525
app := fanout.Appender(t.Context())
2626
err := app.Commit()
2727
require.NoError(t, err)

internal/component/prometheus/operator/common/crdmanager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (c *crdManager) Run(ctx context.Context) error {
145145
}()
146146

147147
// Start prometheus scrape manager.
148-
alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer, c.ls)
148+
alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.Registerer, c.ls)
149149
opts := &scrape.Options{}
150150
c.scrapeManager, err = scrape.NewManager(opts, slog.New(logging.NewSlogGoKitHandler(c.logger)), nil, alloyAppendable, unregisterer)
151151
if err != nil {
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package prometheus_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/go-kit/log"
12+
promclient "github.com/prometheus/client_golang/prometheus"
13+
"github.com/prometheus/prometheus/model/labels"
14+
"github.com/prometheus/prometheus/storage"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
"golang.org/x/exp/maps"
18+
19+
"github.com/grafana/alloy/internal/component"
20+
"github.com/grafana/alloy/internal/component/prometheus"
21+
"github.com/grafana/alloy/internal/component/prometheus/relabel"
22+
"github.com/grafana/alloy/internal/component/prometheus/remotewrite"
23+
"github.com/grafana/alloy/internal/component/prometheus/scrape"
24+
"github.com/grafana/alloy/internal/runtime/componenttest"
25+
"github.com/grafana/alloy/internal/runtime/logging"
26+
"github.com/grafana/alloy/internal/service/labelstore"
27+
"github.com/grafana/alloy/internal/service/livedebugging"
28+
"github.com/grafana/alloy/internal/static/metrics/wal"
29+
"github.com/grafana/alloy/internal/util"
30+
"github.com/grafana/alloy/internal/util/testappender"
31+
"github.com/grafana/alloy/syntax"
32+
)
33+
34+
// This test simulates a scrape -> remote_write pipeline, without actually scraping
35+
func TestPipeline(t *testing.T) {
36+
pipeline, ls, destination := newDefaultPipeline(t, util.TestLogger(t))
37+
38+
// We need to use a future timestamp since remote_write will ignore any
39+
// sample which is earlier than the time when it started. Adding a minute
40+
// ensures that our samples will never get ignored.
41+
sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()
42+
43+
// Send metrics to our component. These will be written to the WAL and
44+
// subsequently written to our HTTP server.
45+
lset1 := labels.FromStrings("foo", "bar")
46+
ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12)
47+
lset2 := labels.FromStrings("fizz", "buzz")
48+
ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34)
49+
50+
expect := []*testappender.MetricSample{{
51+
Labels: lset1,
52+
Timestamp: sampleTimestamp,
53+
Value: 12,
54+
}, {
55+
Labels: lset2,
56+
Timestamp: sampleTimestamp,
57+
Value: 34,
58+
}}
59+
60+
require.EventuallyWithT(t, func(t *assert.CollectT) {
61+
require.Len(t, destination.CollectedSamples(), 2)
62+
require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples()))
63+
}, 5*time.Second, 10*time.Millisecond, "timed out waiting for metrics to be written to destination")
64+
65+
ref := ls.GetOrAddGlobalRefID(lset1)
66+
require.NotZero(t, ref)
67+
// Append result ref should match the labelstore ref
68+
require.Equal(t, ref, ref1)
69+
localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref)
70+
require.NotZero(t, localRef)
71+
72+
ref = ls.GetOrAddGlobalRefID(lset2)
73+
require.NotZero(t, ref)
74+
// Append result ref should match the labelstore ref
75+
require.Equal(t, ref, ref2)
76+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
77+
require.NotZero(t, localRef)
78+
}
79+
80+
// This test simulates a scrape -> relabel -> remote_write pipeline, without actually scraping
81+
func TestRelabelPipeline(t *testing.T) {
82+
pipeline, ls, destination := newRelabelPipeline(t, util.TestLogger(t))
83+
84+
// We need to use a future timestamp since remote_write will ignore any
85+
// sample which is earlier than the time when it started. Adding a minute
86+
// ensures that our samples will never get ignored.
87+
sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()
88+
89+
// Send metrics to our component. These will be written to the WAL and
90+
// subsequently written to our HTTP server.
91+
lset1 := labels.FromStrings("foo", "bar")
92+
ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12)
93+
lset2 := labels.FromStrings("fizz", "buzz")
94+
ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34)
95+
96+
expect := []*testappender.MetricSample{{
97+
Labels: labels.NewBuilder(lset1).Set("lbl", "foo").Labels(),
98+
Timestamp: sampleTimestamp,
99+
Value: 12,
100+
}, {
101+
Labels: labels.NewBuilder(lset2).Set("lbl", "foo").Labels(),
102+
Timestamp: sampleTimestamp,
103+
Value: 34,
104+
}}
105+
106+
require.EventuallyWithT(t, func(t *assert.CollectT) {
107+
require.Len(t, destination.CollectedSamples(), 2)
108+
require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples()))
109+
}, 1*time.Minute, 100*time.Millisecond, "timed out waiting for metrics to be written to destination")
110+
111+
ref := ls.GetOrAddGlobalRefID(lset1)
112+
require.NotZero(t, ref)
113+
// Append result ref should match the labelstore ref
114+
require.Equal(t, ref, ref1)
115+
// This was relabeled, so we shouldn't have a local ref
116+
localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref)
117+
require.Zero(t, localRef)
118+
119+
ref = ls.GetOrAddGlobalRefID(lset2)
120+
require.NotZero(t, ref)
121+
// Append result ref should match the labelstore ref
122+
require.Equal(t, ref, ref2)
123+
124+
// This was relabeled, so we shouldn't have a local ref
125+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
126+
require.Zero(t, localRef)
127+
128+
lset1Relabeled := labels.NewBuilder(lset1).Set("lbl", "foo").Labels()
129+
ref = ls.GetOrAddGlobalRefID(lset1Relabeled)
130+
require.NotZero(t, ref)
131+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
132+
require.NotZero(t, localRef)
133+
134+
lset2Relabeled := labels.NewBuilder(lset2).Set("lbl", "foo").Labels()
135+
ref = ls.GetOrAddGlobalRefID(lset2Relabeled)
136+
require.NotZero(t, ref)
137+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
138+
require.NotZero(t, localRef)
139+
}
140+
141+
func BenchmarkPipelines(b *testing.B) {
142+
tests := []struct {
143+
name string
144+
pipelineBuilder func(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender)
145+
}{
146+
{"default", newDefaultPipeline},
147+
{"relabel", newRelabelPipeline},
148+
}
149+
150+
numberOfMetrics := []int{2, 10, 100, 1000}
151+
152+
for _, n := range numberOfMetrics {
153+
for _, tt := range tests {
154+
// Don't need care about the labelstore and destination for benchmarks
155+
pipeline, _, _ := tt.pipelineBuilder(b, log.NewNopLogger())
156+
b.Run(fmt.Sprintf("%s/%d-metrics", tt.name, n), func(b *testing.B) {
157+
b.ReportAllocs()
158+
b.ResetTimer()
159+
160+
for b.Loop() {
161+
for i := 0; i < n; i++ {
162+
sendMetric(
163+
b,
164+
pipeline.Appender(b.Context()),
165+
labels.FromStrings(fmt.Sprintf("metric-%d", i), fmt.Sprintf("metric-%d", i)),
166+
time.Now().Add(time.Minute).UnixMilli(),
167+
float64(i),
168+
)
169+
}
170+
}
171+
})
172+
}
173+
}
174+
}
175+
176+
func newDefaultPipeline(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender) {
177+
ls := labelstore.New(logger, promclient.DefaultRegisterer)
178+
rwAppendable, rwDestination := newRemoteWriteComponent(t, logger, ls)
179+
pipelineAppendable := prometheus.NewFanout([]storage.Appendable{rwAppendable}, promclient.DefaultRegisterer, ls)
180+
scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", ls, livedebugging.NewLiveDebugging(), pipelineAppendable)
181+
182+
return scrapeInterceptor, ls, rwDestination
183+
}
184+
185+
func newRelabelPipeline(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender) {
186+
ls := labelstore.New(logger, promclient.DefaultRegisterer)
187+
rwAppendable, rwDestination := newRemoteWriteComponent(t, logger, ls)
188+
relabelAppendable := newRelabelComponent(t, logger, []storage.Appendable{rwAppendable}, ls)
189+
pipelineAppendable := prometheus.NewFanout([]storage.Appendable{relabelAppendable}, promclient.DefaultRegisterer, ls)
190+
scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", ls, livedebugging.NewLiveDebugging(), pipelineAppendable)
191+
192+
return scrapeInterceptor, ls, rwDestination
193+
}
194+
195+
func newRemoteWriteComponent(t testing.TB, logger log.Logger, ls *labelstore.Service) (storage.Appendable, testappender.CollectingAppender) {
196+
walDir := t.TempDir()
197+
198+
walStorage, err := wal.NewStorage(logger, promclient.NewRegistry(), walDir)
199+
require.NoError(t, err)
200+
201+
fanoutLogger := slog.New(
202+
logging.NewSlogGoKitHandler(
203+
log.With(logger, "subcomponent", "fanout"),
204+
),
205+
)
206+
207+
inMemoryAppendable := testappender.ConstantAppendable{Inner: testappender.NewCollectingAppender()}
208+
store := storage.NewFanout(fanoutLogger, walStorage, testStorage{inMemoryAppendable: inMemoryAppendable})
209+
210+
return remotewrite.NewInterceptor("prometheus.remote_write.test", &atomic.Bool{}, livedebugging.NewLiveDebugging(), ls, store), inMemoryAppendable.Inner
211+
}
212+
213+
type testStorage struct {
214+
// Embed Queryable/ChunkQueryable for compatibility, but don't actually implement it.
215+
storage.Queryable
216+
storage.ChunkQueryable
217+
218+
inMemoryAppendable storage.Appendable
219+
}
220+
221+
func (t testStorage) Appender(ctx context.Context) storage.Appender {
222+
return t.inMemoryAppendable.Appender(ctx)
223+
}
224+
225+
func (t testStorage) StartTime() (int64, error) {
226+
return 0, nil
227+
}
228+
229+
func (t testStorage) Close() error {
230+
return nil
231+
}
232+
233+
func newRelabelComponent(t testing.TB, logger log.Logger, forwardTo []storage.Appendable, ls *labelstore.Service) storage.Appendable {
234+
cfg := `forward_to = []
235+
rule {
236+
action = "replace"
237+
target_label = "lbl"
238+
replacement = "foo"
239+
}`
240+
var args relabel.Arguments
241+
require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
242+
args.ForwardTo = forwardTo
243+
244+
tc, err := componenttest.NewControllerFromID(logger, "prometheus.relabel")
245+
require.NoError(t, err)
246+
go func() {
247+
err = tc.Run(componenttest.TestContext(t), args, func(opts component.Options) component.Options {
248+
inner := opts.GetServiceData
249+
opts.GetServiceData = func(name string) (interface{}, error) {
250+
if name == labelstore.ServiceName {
251+
return ls, nil
252+
}
253+
return inner(name)
254+
}
255+
return opts
256+
})
257+
require.NoError(t, err)
258+
}()
259+
require.NoError(t, tc.WaitRunning(5*time.Second))
260+
261+
return tc.Exports().(relabel.Exports).Receiver
262+
}
263+
264+
func sendMetric(t testing.TB, appender storage.Appender, labels labels.Labels, time int64, value float64) uint64 {
265+
ref, err := appender.Append(0, labels, time, value)
266+
require.NoError(t, err)
267+
require.NoError(t, appender.Commit())
268+
269+
return uint64(ref)
270+
}

internal/component/prometheus/receive_http/receive_http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
6565
return nil, err
6666
}
6767
ls := service.(labelstore.LabelStore)
68-
fanout := alloyprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls)
68+
fanout := alloyprom.NewFanout(args.ForwardTo, opts.Registerer, ls)
6969

7070
uncheckedCollector := util.NewUncheckedCollector(nil)
7171
opts.Registerer.MustRegister(uncheckedCollector)

internal/component/prometheus/relabel/relabel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
153153
}
154154
}
155155

156-
c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, ls)
156+
c.fanout = prometheus.NewFanout(args.ForwardTo, o.Registerer, ls)
157157
c.receiver = prometheus.NewInterceptor(
158158
c.fanout,
159159
ls,

0 commit comments

Comments
 (0)