diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e55a1a3a5..0dc97dd082 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,8 @@ Main (unreleased)
 
 - Add `meta_cache_address` to `beyla.ebpf` component. (@skl)
 
+- Remove labelstore interactions from the prometheus interceptor, simplifying prometheus pipelines. (@kgeckhart)
+
 ### Bugfixes
 
 - `loki.source.api` no longer drops request when relabel rules drops a specific stream. (@kalleep)
@@ -98,7 +100,6 @@ v1.12.0-rc.0
 
 - The `otelcol.processor.servicegraph` component now supports defining the maximum number of buckets for generated exponential histograms.
 - See the upstream [core][https://github.com/open-telemetry/opentelemetry-collector/blob/v0.139.0/CHANGELOG.md] and [contrib][https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.139.0/CHANGELOG.md] changelogs for more details.
-
 ### Enhancements
 
 - Add per-application rate limiting with the `strategy` attribute in the `faro.receiver` component, to prevent one application from consuming the rate limit quota of others. (@hhertout)
diff --git a/internal/component/otelcol/exporter/prometheus/prometheus.go b/internal/component/otelcol/exporter/prometheus/prometheus.go
index c838afd499..64158da953 100644
--- a/internal/component/otelcol/exporter/prometheus/prometheus.go
+++ b/internal/component/otelcol/exporter/prometheus/prometheus.go
@@ -8,6 +8,8 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/prometheus/storage"
+
 	"github.com/grafana/alloy/internal/component"
 	"github.com/grafana/alloy/internal/component/otelcol"
 	"github.com/grafana/alloy/internal/component/otelcol/exporter/prometheus/internal/convert"
@@ -15,7 +17,6 @@ import (
 	"github.com/grafana/alloy/internal/component/prometheus"
 	"github.com/grafana/alloy/internal/featuregate"
 	"github.com/grafana/alloy/internal/service/labelstore"
-	"github.com/prometheus/prometheus/storage"
 )
 
 func init() {
@@ -87,7 +88,7 @@ func New(o component.Options, c Arguments) (*Component, error) {
 		return nil, err
 	}
 	ls := service.(labelstore.LabelStore)
-	fanout := prometheus.NewFanout(nil, o.ID, o.Registerer, ls)
+	fanout := prometheus.NewFanout(nil, o.Registerer, ls)
 
 	converter := convert.New(o.Logger, fanout, convertArgumentsToConvertOptions(c))
 
diff --git a/internal/component/prometheus/enrich/enrich.go b/internal/component/prometheus/enrich/enrich.go
index 2128ac544b..a947fe1590 100644
--- a/internal/component/prometheus/enrich/enrich.go
+++ b/internal/component/prometheus/enrich/enrich.go
@@ -97,10 +97,9 @@ func New(opts component.Options, args Arguments) (*Component, error) {
 		}
 	}
 
-	c.fanout = prometheus.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls)
+	c.fanout = prometheus.NewFanout(args.ForwardTo, opts.Registerer, ls)
 	c.receiver = prometheus.NewInterceptor(
 		c.fanout,
-		ls,
 		prometheus.WithComponentID(c.opts.ID),
 		prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
 			if c.exited.Load() {
diff --git a/internal/component/prometheus/enrich/enrich_test.go b/internal/component/prometheus/enrich/enrich_test.go
index ba75d2cde2..6a5934b373 100644
--- a/internal/component/prometheus/enrich/enrich_test.go
+++ b/internal/component/prometheus/enrich/enrich_test.go
@@ -5,15 +5,16 @@ import (
 	"testing"
 	"time"
 
+	prom "github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/stretchr/testify/require"
+
"github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/prometheus" "github.com/grafana/alloy/internal/service/labelstore" "github.com/grafana/alloy/internal/util" - prom "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" ) func TestEnricher(t *testing.T) { @@ -108,9 +109,8 @@ func TestEnricher(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ls := labelstore.New(nil, prom.DefaultRegisterer) fanout := prometheus.NewInterceptor( - nil, ls, + nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { for name, value := range tt.expectedLabels { require.Equal(t, l.Get(name), value) diff --git a/internal/component/prometheus/fanout.go b/internal/component/prometheus/fanout.go index bcb0e60437..414292dc49 100644 --- a/internal/component/prometheus/fanout.go +++ b/internal/component/prometheus/fanout.go @@ -23,12 +23,11 @@ import ( var _ storage.Appendable = (*Fanout)(nil) // Fanout supports the default Alloy style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. +// It also maintains the responsibility of assigning global ref IDs to a series via the label store. type Fanout struct { mut sync.RWMutex // children is where to fan out. - children []storage.Appendable - // ComponentID is what component this belongs to. - componentID string + children []storage.Appendable writeLatency prometheus.Histogram samplesCounter prometheus.Counter ls labelstore.LabelStore @@ -39,7 +38,7 @@ type Fanout struct { } // NewFanout creates a fanout appendable. 
-func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
+func NewFanout(children []storage.Appendable, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
 	wl := prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "prometheus_fanout_latency",
 		Help: "Write latency for sending to direct and indirect components",
@@ -55,7 +54,6 @@ func NewFanout(children []storage.Appendable, componentID string, register prome
 
 	return &Fanout{
 		children:       children,
-		componentID:    componentID,
 		writeLatency:   wl,
 		samplesCounter: s,
 		ls:             ls,
diff --git a/internal/component/prometheus/fanout_test.go b/internal/component/prometheus/fanout_test.go
index e9ab46a7aa..1428e46fef 100644
--- a/internal/component/prometheus/fanout_test.go
+++ b/internal/component/prometheus/fanout_test.go
@@ -13,7 +13,7 @@ import (
 
 func TestRollback(t *testing.T) {
 	ls := labelstore.New(nil, prometheus.DefaultRegisterer)
-	fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
+	fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
 	app := fanout.Appender(t.Context())
 	err := app.Rollback()
 	require.NoError(t, err)
@@ -21,7 +21,7 @@ func TestRollback(t *testing.T) {
 
 func TestCommit(t *testing.T) {
 	ls := labelstore.New(nil, prometheus.DefaultRegisterer)
-	fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
+	fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
 	app := fanout.Appender(t.Context())
 	err := app.Commit()
 	require.NoError(t, err)
diff --git a/internal/component/prometheus/interceptor.go b/internal/component/prometheus/interceptor.go
index 7994f3530d..8c57f706ff 100644
--- a/internal/component/prometheus/interceptor.go
+++ b/internal/component/prometheus/interceptor.go
@@ -8,9 +8,6 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
 	"github.com/prometheus/prometheus/storage"
-	"go.uber.org/atomic"
-
-	"github.com/grafana/alloy/internal/service/labelstore"
 )
 
 // Interceptor is a storage.Appendable which invokes callback functions upon
@@ -26,12 +23,6 @@ type Interceptor struct {
 	// next is the next appendable to pass in the chain.
 	next storage.Appendable
 
-	ls labelstore.LabelStore
-
-	// lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how
-	// much memory to allocate for the staleness trackers.
-	lastSeriesCount atomic.Int64
-
 	componentID string
 }
 
@@ -39,10 +30,9 @@
 var _ storage.Appendable = (*Interceptor)(nil)
 
 // NewInterceptor creates a new Interceptor storage.Appendable. Options can be
// provided to NewInterceptor to install custom hooks for different methods.
-func NewInterceptor(next storage.Appendable, ls labelstore.LabelStore, opts ...InterceptorOption) *Interceptor {
+func NewInterceptor(next storage.Appendable, opts ...InterceptorOption) *Interceptor {
 	i := &Interceptor{
 		next: next,
-		ls:   ls,
 	}
 	for _, opt := range opts {
 		opt(i)
@@ -102,27 +92,23 @@
 }
 
 // Appender satisfies the Appendable interface.
-func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
+func (i *Interceptor) Appender(ctx context.Context) storage.Appender {
 	app := &interceptappender{
-		interceptor:       f,
-		ls:                f.ls,
-		stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
+		interceptor: i,
 	}
 
-	if f.next != nil {
-		app.child = f.next.Appender(ctx)
+	if i.next != nil {
+		app.child = i.next.Appender(ctx)
 	}
 	return app
 }
 
-func (f *Interceptor) String() string {
-	return f.componentID + ".receiver"
+func (i *Interceptor) String() string {
+	return i.componentID + ".receiver"
 }
 
 type interceptappender struct {
-	interceptor       *Interceptor
-	child             storage.Appender
-	ls                labelstore.LabelStore
-	stalenessTrackers []labelstore.StalenessTracker
+	interceptor *Interceptor
+	child       storage.Appender
 }
 
 func (a *interceptappender) SetOptions(opts *storage.AppendOptions) {
@@ -135,15 +121,6 @@ var _ storage.Appender = (*interceptappender)(nil)
 
 // Append satisfies the Appender interface.
 func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
-	if ref == 0 {
-		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
-	}
-	a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
-		GlobalRefID: uint64(ref),
-		Labels:      l,
-		Value:       v,
-	})
-
 	if a.interceptor.onAppend != nil {
 		return a.interceptor.onAppend(ref, l, t, v, a.child)
 	}
@@ -155,8 +132,6 @@
 
 // Commit satisfies the Appender interface.
 func (a *interceptappender) Commit() error {
-	a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
-	a.ls.TrackStaleness(a.stalenessTrackers)
 	if a.child == nil {
 		return nil
 	}
@@ -165,8 +140,6 @@
 
 // Rollback satisfies the Appender interface.
 func (a *interceptappender) Rollback() error {
-	a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
-	a.ls.TrackStaleness(a.stalenessTrackers)
 	if a.child == nil {
 		return nil
 	}
@@ -180,10 +153,6 @@
 	e exemplar.Exemplar,
 ) (storage.SeriesRef, error) {
-	if ref == 0 {
-		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
-	}
-
 	if a.interceptor.onAppendExemplar != nil {
 		return a.interceptor.onAppendExemplar(ref, l, e, a.child)
 	}
@@ -200,10 +169,6 @@
 	m metadata.Metadata,
 ) (storage.SeriesRef, error) {
-	if ref == 0 {
-		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
-	}
-
 	if a.interceptor.onUpdateMetadata != nil {
 		return a.interceptor.onUpdateMetadata(ref, l, m, a.child)
 	}
@@ -221,10 +186,6 @@
 	fh *histogram.FloatHistogram,
 ) (storage.SeriesRef, error) {
-	if ref == 0 {
-		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
-	}
-
 	// TODO histograms are not currently tracked for staleness causing them to be held forever
 	if a.interceptor.onAppendHistogram != nil {
 		return a.interceptor.onAppendHistogram(ref, l, t, h, fh, a.child)
 	}
@@ -240,10 +201,6 @@
 	t, ct int64,
 ) (storage.SeriesRef, error) {
-	if ref == 0 {
-		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
-	}
-
 	if a.interceptor.onAppendCTZeroSample != nil {
 		return a.interceptor.onAppendCTZeroSample(ref, l, t, ct, a.child)
 	}
@@ -261,10 +218,6 @@
 	fh *histogram.FloatHistogram,
 ) (storage.SeriesRef, error) {
-	if ref == 0 {
-		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
-	}
-
 	if a.child == nil {
 		return 0, nil
 	}
diff --git a/internal/component/prometheus/operator/common/crdmanager.go b/internal/component/prometheus/operator/common/crdmanager.go
index b5948edf9e..ce4e7f7aa3 100644
--- a/internal/component/prometheus/operator/common/crdmanager.go
+++ b/internal/component/prometheus/operator/common/crdmanager.go
@@ -145,7 +145,7 @@ func (c *crdManager) Run(ctx context.Context) error {
 	}()
 
 	// Start prometheus scrape manager.
-	alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer, c.ls)
+	alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.Registerer, c.ls)
 	opts := &scrape.Options{}
 	c.scrapeManager, err = scrape.NewManager(opts, slog.New(logging.NewSlogGoKitHandler(c.logger)), nil, alloyAppendable, unregisterer)
 	if err != nil {
diff --git a/internal/component/prometheus/pipeline_test.go b/internal/component/prometheus/pipeline_test.go
new file mode 100644
index 0000000000..1a050ad409
--- /dev/null
+++ b/internal/component/prometheus/pipeline_test.go
@@ -0,0 +1,270 @@
+package prometheus_test
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	promclient "github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+	"golang.org/x/exp/maps"
+
+	"github.com/grafana/alloy/internal/component"
+	"github.com/grafana/alloy/internal/component/prometheus"
+	"github.com/grafana/alloy/internal/component/prometheus/relabel"
+	"github.com/grafana/alloy/internal/component/prometheus/remotewrite"
+	"github.com/grafana/alloy/internal/component/prometheus/scrape"
+	"github.com/grafana/alloy/internal/runtime/componenttest"
+	"github.com/grafana/alloy/internal/runtime/logging"
+	"github.com/grafana/alloy/internal/service/labelstore"
+	"github.com/grafana/alloy/internal/service/livedebugging"
+	"github.com/grafana/alloy/internal/static/metrics/wal"
+	"github.com/grafana/alloy/internal/util"
+	"github.com/grafana/alloy/internal/util/testappender"
+	"github.com/grafana/alloy/syntax"
+)
+
+// This test simulates a scrape -> remote_write pipeline, without actually scraping
+func TestPipeline(t *testing.T) {
+	pipeline, ls, destination := newDefaultPipeline(t, util.TestLogger(t))
+
+	// We need to use a future timestamp since remote_write will ignore any
+	// sample which is earlier than the time when it started. Adding a minute
+	// ensures that our samples will never get ignored.
+	sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()
+
+	// Send metrics to our component. These will be written to the WAL and
+	// subsequently forwarded to our in-memory destination.
+	lset1 := labels.FromStrings("foo", "bar")
+	ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12)
+	lset2 := labels.FromStrings("fizz", "buzz")
+	ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34)
+
+	expect := []*testappender.MetricSample{{
+		Labels:    lset1,
+		Timestamp: sampleTimestamp,
+		Value:     12,
+	}, {
+		Labels:    lset2,
+		Timestamp: sampleTimestamp,
+		Value:     34,
+	}}
+
+	require.EventuallyWithT(t, func(t *assert.CollectT) {
+		require.Len(t, destination.CollectedSamples(), 2)
+		require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples()))
+	}, 5*time.Second, 10*time.Millisecond, "timed out waiting for metrics to be written to destination")
+
+	ref := ls.GetOrAddGlobalRefID(lset1)
+	require.NotZero(t, ref)
+	// Append result ref should match the labelstore ref
+	require.Equal(t, ref, ref1)
+	localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref)
+	require.NotZero(t, localRef)
+
+	ref = ls.GetOrAddGlobalRefID(lset2)
+	require.NotZero(t, ref)
+	// Append result ref should match the labelstore ref
+	require.Equal(t, ref, ref2)
+	localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
+	require.NotZero(t, localRef)
+}
+
+// This test simulates a scrape -> relabel -> remote_write pipeline, without actually scraping
+func TestRelabelPipeline(t *testing.T) {
+	pipeline, ls, destination := newRelabelPipeline(t, util.TestLogger(t))
+
+	// We need to use a future timestamp since remote_write will ignore any
+	// sample which is earlier than the time when it started. Adding a minute
+	// ensures that our samples will never get ignored.
+	sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()
+
+	// Send metrics to our component. These will be written to the WAL and
+	// subsequently forwarded to our in-memory destination.
+	lset1 := labels.FromStrings("foo", "bar")
+	ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12)
+	lset2 := labels.FromStrings("fizz", "buzz")
+	ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34)
+
+	expect := []*testappender.MetricSample{{
+		Labels:    labels.NewBuilder(lset1).Set("lbl", "foo").Labels(),
+		Timestamp: sampleTimestamp,
+		Value:     12,
+	}, {
+		Labels:    labels.NewBuilder(lset2).Set("lbl", "foo").Labels(),
+		Timestamp: sampleTimestamp,
+		Value:     34,
+	}}
+
+	require.EventuallyWithT(t, func(t *assert.CollectT) {
+		require.Len(t, destination.CollectedSamples(), 2)
+		require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples()))
+	}, 1*time.Minute, 100*time.Millisecond, "timed out waiting for metrics to be written to destination")
+
+	ref := ls.GetOrAddGlobalRefID(lset1)
+	require.NotZero(t, ref)
+	// Append result ref should match the labelstore ref
+	require.Equal(t, ref, ref1)
+	// This was relabeled, so we shouldn't have a local ref
+	localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref)
+	require.Zero(t, localRef)
+
+	ref = ls.GetOrAddGlobalRefID(lset2)
+	require.NotZero(t, ref)
+	// Append result ref should match the labelstore ref
+	require.Equal(t, ref, ref2)
+
+	// This was relabeled, so we shouldn't have a local ref
+	localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
+	require.Zero(t, localRef)
+
+	lset1Relabeled := labels.NewBuilder(lset1).Set("lbl", "foo").Labels()
+	ref = ls.GetOrAddGlobalRefID(lset1Relabeled)
+	require.NotZero(t, ref)
+	localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
+	require.NotZero(t, localRef)
+
+	lset2Relabeled := labels.NewBuilder(lset2).Set("lbl", "foo").Labels()
+	ref = ls.GetOrAddGlobalRefID(lset2Relabeled)
+	require.NotZero(t, ref)
+	localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
+	require.NotZero(t, localRef)
+}
+
+func BenchmarkPipelines(b *testing.B) {
+	tests := []struct {
+		name            string
+		pipelineBuilder func(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender)
+	}{
+		{"default", newDefaultPipeline},
+		{"relabel", newRelabelPipeline},
+	}
+
+	numberOfMetrics := []int{2, 10, 100, 1000}
+
+	for _, n := range numberOfMetrics {
+		for _, tt := range tests {
+			// We don't need to care about the labelstore and destination for benchmarks
+			pipeline, _, _ := tt.pipelineBuilder(b, log.NewNopLogger())
+			b.Run(fmt.Sprintf("%s/%d-metrics", tt.name, n), func(b *testing.B) {
+				b.ReportAllocs()
+				b.ResetTimer()
+
+				for b.Loop() {
+					for i := 0; i < n; i++ {
+						sendMetric(
+							b,
+							pipeline.Appender(b.Context()),
+							labels.FromStrings(fmt.Sprintf("metric-%d", i), fmt.Sprintf("metric-%d", i)),
+							time.Now().Add(time.Minute).UnixMilli(),
+							float64(i),
+						)
+					}
+				}
+			})
+		}
+	}
+}
+
+func newDefaultPipeline(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender) {
+	ls := labelstore.New(logger, promclient.DefaultRegisterer)
+	rwAppendable, rwDestination := newRemoteWriteComponent(t, logger, ls)
+	pipelineAppendable := prometheus.NewFanout([]storage.Appendable{rwAppendable}, promclient.DefaultRegisterer, ls)
+	scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", livedebugging.NewLiveDebugging(), pipelineAppendable)
+
+	return scrapeInterceptor, ls, rwDestination
+}
+
+func newRelabelPipeline(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender) {
+	ls := labelstore.New(logger, promclient.DefaultRegisterer)
+	rwAppendable, rwDestination := newRemoteWriteComponent(t, logger, ls)
+	relabelAppendable := newRelabelComponent(t, logger, []storage.Appendable{rwAppendable}, ls)
+	pipelineAppendable := prometheus.NewFanout([]storage.Appendable{relabelAppendable}, promclient.DefaultRegisterer, ls)
+	scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", livedebugging.NewLiveDebugging(), pipelineAppendable)
+
+	return scrapeInterceptor, ls, rwDestination
+}
+
+func newRemoteWriteComponent(t testing.TB, logger log.Logger, ls *labelstore.Service) (storage.Appendable, testappender.CollectingAppender) {
+	walDir := t.TempDir()
+
+	walStorage, err := wal.NewStorage(logger, promclient.NewRegistry(), walDir)
+	require.NoError(t, err)
+
+	fanoutLogger := slog.New(
+		logging.NewSlogGoKitHandler(
+			log.With(logger, "subcomponent", "fanout"),
+		),
+	)
+
+	inMemoryAppendable := testappender.ConstantAppendable{Inner: testappender.NewCollectingAppender()}
+	store := storage.NewFanout(fanoutLogger, walStorage, testStorage{inMemoryAppendable: inMemoryAppendable})
+
+	return remotewrite.NewInterceptor("prometheus.remote_write.test", &atomic.Bool{}, livedebugging.NewLiveDebugging(), ls, store), inMemoryAppendable.Inner
+}
+
+type testStorage struct {
+	// Embed Queryable/ChunkQueryable for compatibility, but don't actually implement it.
+	storage.Queryable
+	storage.ChunkQueryable
+
+	inMemoryAppendable storage.Appendable
+}
+
+func (t testStorage) Appender(ctx context.Context) storage.Appender {
+	return t.inMemoryAppendable.Appender(ctx)
+}
+
+func (t testStorage) StartTime() (int64, error) {
+	return 0, nil
+}
+
+func (t testStorage) Close() error {
+	return nil
+}
+
+func newRelabelComponent(t testing.TB, logger log.Logger, forwardTo []storage.Appendable, ls *labelstore.Service) storage.Appendable {
+	cfg := `forward_to = []
+	rule {
+		action       = "replace"
+		target_label = "lbl"
+		replacement  = "foo"
+	}`
+	var args relabel.Arguments
+	require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
+	args.ForwardTo = forwardTo
+
+	tc, err := componenttest.NewControllerFromID(logger, "prometheus.relabel")
+	require.NoError(t, err)
+	go func() {
+		err = tc.Run(componenttest.TestContext(t), args, func(opts component.Options) component.Options {
+			inner := opts.GetServiceData
+			opts.GetServiceData = func(name string) (interface{}, error) {
+				if name == labelstore.ServiceName {
+					return ls, nil
+				}
+				return inner(name)
+			}
+			return opts
+		})
+		require.NoError(t, err)
+	}()
+	require.NoError(t, tc.WaitRunning(5*time.Second))
+
+	return tc.Exports().(relabel.Exports).Receiver
+}
+
+func sendMetric(t testing.TB, appender storage.Appender, labels labels.Labels, time int64, value float64) uint64 {
+	ref, err := appender.Append(0, labels, time, value)
+	require.NoError(t, err)
+	require.NoError(t, appender.Commit())
+
+	return uint64(ref)
+}
diff --git a/internal/component/prometheus/receive_http/receive_http.go b/internal/component/prometheus/receive_http/receive_http.go
index 5d6c14232f..e737eab65b 100644
--- a/internal/component/prometheus/receive_http/receive_http.go
+++ b/internal/component/prometheus/receive_http/receive_http.go
@@ -65,7 +65,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
 		return nil, err
 	}
 	ls := service.(labelstore.LabelStore)
-	fanout := alloyprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls)
+	fanout := alloyprom.NewFanout(args.ForwardTo, opts.Registerer, ls)
 
 	uncheckedCollector := util.NewUncheckedCollector(nil)
 	opts.Registerer.MustRegister(uncheckedCollector)
diff --git a/internal/component/prometheus/receive_http/receive_http_test.go b/internal/component/prometheus/receive_http/receive_http_test.go
index 4907eed9ab..5866852b4e 100644
--- a/internal/component/prometheus/receive_http/receive_http_test.go
+++ b/internal/component/prometheus/receive_http/receive_http_test.go
@@ -17,12 +17,6 @@ import (
 	"time"
 
 	"github.com/golang/snappy"
-	"github.com/grafana/alloy/internal/component"
-	fnet "github.com/grafana/alloy/internal/component/common/net"
-	alloyprom "github.com/grafana/alloy/internal/component/prometheus"
-	"github.com/grafana/alloy/internal/service/labelstore"
-	"github.com/grafana/alloy/internal/util"
-	"github.com/grafana/alloy/syntax/alloytypes"
 	"github.com/phayes/freeport"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/config"
@@ -35,6 +29,13 @@ import (
 	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/protoadapt"
+
+	"github.com/grafana/alloy/internal/component"
+	fnet "github.com/grafana/alloy/internal/component/common/net"
+	alloyprom "github.com/grafana/alloy/internal/component/prometheus"
+	"github.com/grafana/alloy/internal/service/labelstore"
+	"github.com/grafana/alloy/internal/util"
+	"github.com/grafana/alloy/syntax/alloytypes"
 )
 
 // generateTestCertAndKey generates a self-signed certificate and private key for testing
@@ -569,10 +570,8 @@ func testAppendable(actualSamples chan testSample) []storage.Appendable {
 		return ref, nil
 	}
 
-	ls := labelstore.New(nil, prometheus.DefaultRegisterer)
 	return []storage.Appendable{alloyprom.NewInterceptor(
 		nil,
-		ls,
 		alloyprom.WithAppendHook(
 			hookFn))}
 }
diff --git a/internal/component/prometheus/relabel/relabel.go b/internal/component/prometheus/relabel/relabel.go
index 10cb285374..d71d4247ab 100644
--- a/internal/component/prometheus/relabel/relabel.go
+++ b/internal/component/prometheus/relabel/relabel.go
@@ -153,10 +153,9 @@ func New(o component.Options, args Arguments) (*Component, error) {
 		}
 	}
 
-	c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, ls)
+	c.fanout = prometheus.NewFanout(args.ForwardTo, o.Registerer, ls)
 	c.receiver = prometheus.NewInterceptor(
 		c.fanout,
-		ls,
 		prometheus.WithComponentID(c.opts.ID),
 		prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
 			if c.exited.Load() {
diff --git a/internal/component/prometheus/relabel/relabel_test.go b/internal/component/prometheus/relabel/relabel_test.go
index d1145cbb5b..b82845ba2c 100644
--- a/internal/component/prometheus/relabel/relabel_test.go
+++ b/internal/component/prometheus/relabel/relabel_test.go
@@ -25,32 +25,8 @@ import (
 	"github.com/grafana/alloy/syntax"
 )
 
-func TestRelabelThroughAppend(t *testing.T) {
-	appendable, relabeller := generateRelabel(t)
-	lbls := labels.FromStrings("__address__", "localhost")
-
-	app := appendable.Appender(t.Context())
-	relabedRef, err := app.Append(storage.SeriesRef(0), lbls, time.Now().UnixMilli(), 0)
-	require.NoError(t, err)
-	require.NoError(t, app.Commit())
-
-	require.True(t, relabeller.cache.Len() == 1)
-	// Get the first entry since we only have one we can get oldest
-	ref, cachedLbls, _ := relabeller.cache.GetOldest()
-
-	// We shouldn't have allowed a zero ref to be cached
-	require.NotEqual(t, storage.SeriesRef(0), ref)
-	require.NotEqual(t, lbls, cachedLbls)
-
-	// We should have added a new ref after relabeling
-	require.NotEqual(t, storage.SeriesRef(0), relabedRef)
-
-	// That ref should not be the cached ref
-	require.NotEqual(t, relabedRef, ref)
-}
-
 func TestUpdateReset(t *testing.T) {
-	_, relabeller := generateRelabel(t)
+	relabeller := generateRelabel(t)
 	lbls := labels.FromStrings("__address__", "localhost")
 	relabeller.relabel(storage.SeriesRef(1), 0, lbls)
 	require.True(t, relabeller.cache.Len() == 1)
@@ -72,8 +48,7 @@ func TestValidator(t *testing.T) {
 }
 
 func TestNil(t *testing.T) {
-	ls := labelstore.New(nil, prom.DefaultRegisterer)
-	fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
+	fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
 		require.True(t, false)
 		return ref, nil
 	}))
@@ -102,7 +77,7 @@ func TestNil(t *testing.T) {
 }
 
 func TestLRU(t *testing.T) {
-	_, relabeller := generateRelabel(t)
+	relabeller := generateRelabel(t)
 
 	for i := 0; i < 600_000; i++ {
 		lbls := labels.FromStrings("__address__", "localhost", "inc", strconv.Itoa(i))
@@ -112,7 +87,7 @@ func TestLRU(t *testing.T) {
 }
 
 func TestLRUNaN(t *testing.T) {
-	_, relabeller := generateRelabel(t)
+	relabeller := generateRelabel(t)
 	lbls := labels.FromStrings("__address__", "localhost")
 	ref := storage.SeriesRef(1)
 	relabeller.relabel(ref, 0, lbls)
@@ -127,7 +102,7 @@ func TestLRUNaN(t *testing.T) {
 }
 
 func TestMetrics(t *testing.T) {
-	_, relabeller := generateRelabel(t)
+	relabeller := generateRelabel(t)
 	lbls := labels.FromStrings("__address__", "localhost")
 	relabeller.relabel(storage.SeriesRef(1), 0, lbls)
 
@@ -138,8 +113,7 @@ func TestMetrics(t *testing.T) {
 }
 
 func BenchmarkCache(b *testing.B) {
-	ls := labelstore.New(nil, prom.DefaultRegisterer)
-	fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
+	fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
 		require.True(b, l.Has("new_label"))
 		return ref, nil
 	}))
@@ -173,17 +147,12 @@ func BenchmarkCache(b *testing.B) {
 	app.Commit()
 }
 
-func generateRelabel(t *testing.T) (storage.Appendable, *Component) {
-	ls := labelstore.New(nil, prom.DefaultRegisterer)
-	fanout := prometheus.NewInterceptor(nil, ls)
-	var appendable storage.Appendable
+func generateRelabel(t *testing.T) *Component {
+	fanout := prometheus.NewInterceptor(nil)
 	relabeller, err := New(component.Options{
-		ID:     "1",
-		Logger: util.TestAlloyLogger(t),
-		OnStateChange: func(e component.Exports) {
-			newE := e.(Exports)
-			appendable = newE.Receiver
-		},
+		ID:            "1",
+		Logger:        util.TestAlloyLogger(t),
+		OnStateChange: func(e component.Exports) {},
 		Registerer:     prom.NewRegistry(),
 		GetServiceData: getServiceData,
 	}, Arguments{
@@ -201,7 +170,7 @@
 	})
 	require.NotNil(t, relabeller)
 	require.NoError(t, err)
-	return appendable, relabeller
+	return relabeller
 }
 
 func TestRuleGetter(t *testing.T) {
diff --git a/internal/component/prometheus/remotewrite/interceptor.go b/internal/component/prometheus/remotewrite/interceptor.go
new file mode 100644
index 0000000000..dbd2ae0ce0
--- /dev/null
+++ b/internal/component/prometheus/remotewrite/interceptor.go
@@ -0,0 +1,147 @@
+package remotewrite
+
+import (
+	"fmt"
+
+	"github.com/prometheus/prometheus/model/exemplar"
+	"github.com/prometheus/prometheus/model/histogram"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/metadata"
+	"github.com/prometheus/prometheus/storage"
+	"go.uber.org/atomic"
+
+	"github.com/grafana/alloy/internal/component/prometheus"
+	"github.com/grafana/alloy/internal/service/labelstore"
+	"github.com/grafana/alloy/internal/service/livedebugging"
+)
+
+func NewInterceptor(componentID string, exited *atomic.Bool, debugDataPublisher livedebugging.DebugDataPublisher, ls labelstore.LabelStore, store storage.Storage) *prometheus.Interceptor {
+	liveDebuggingComponentID := livedebugging.ComponentID(componentID)
+
+	handleLocalLink := func(globalRef uint64, l labels.Labels, cachedLocalRef uint64, newLocalRef uint64) {
+		// We had a local ref that was still valid; nothing to do
+		if cachedLocalRef != 0 && cachedLocalRef == newLocalRef {
+			return
+		}
+
+		// There are some unique scenarios where an append can end with no error but the returned localRef is zero (e.g. duplicate exemplars).
+		// We don't want to update a valid link to an invalid link
+		if cachedLocalRef != 0 && newLocalRef == 0 {
+			return
+		}
+
+		// This should never happen in a proper appender chain. Since we cannot enforce it, we are extra defensive.
+		if globalRef == 0 {
+			globalRef = ls.GetOrAddGlobalRefID(l)
+		}
+
+		if cachedLocalRef == 0 {
+			ls.AddLocalLink(componentID, globalRef, newLocalRef)
+		} else {
+			ls.ReplaceLocalLink(componentID, globalRef, cachedLocalRef, newLocalRef)
+		}
+	}
+
+	return prometheus.NewInterceptor(
+		store,
+		prometheus.WithComponentID(componentID),
+		// In the methods below, conversion is needed because remote_writes assume
+		// they are responsible for generating ref IDs. This means two
+		// remote_writes may return the same ref ID for two different series. We
+		// treat the remote_write ID as a "local ID" and translate it to a "global
+		// ID" to ensure Alloy compatibility.
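+		//
+		// For example (hypothetical ref values): the label store might assign
+		// global ref 7 to a series while this component's underlying appender
+		// returns local ref 3 for it; handleLocalLink records the 7 -> 3 link so
+		// subsequent appends can translate the global ref back into this
+		// component's local ref.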
+
+		prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
+			if exited.Load() {
+				return 0, fmt.Errorf("%s has exited", componentID)
+			}
+
+			localRef := ls.GetLocalRefID(componentID, uint64(globalRef))
+			newLocalRef, nextErr := next.Append(storage.SeriesRef(localRef), l, t, v)
+			if nextErr == nil {
+				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
+			}
+
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				liveDebuggingComponentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					return fmt.Sprintf("sample: ts=%d, labels=%s, value=%f", t, l, v)
+				},
+			))
+			return globalRef, nextErr
+		}),
+		prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) {
+			if exited.Load() {
+				return 0, fmt.Errorf("%s has exited", componentID)
+			}
+
+			localRef := ls.GetLocalRefID(componentID, uint64(globalRef))
+			newLocalRef, nextErr := next.AppendHistogram(storage.SeriesRef(localRef), l, t, h, fh)
+			if nextErr == nil {
+				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
+			}
+
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				liveDebuggingComponentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					var data string
+					if h != nil {
+						data = fmt.Sprintf("histogram: ts=%d, labels=%s, value=%s", t, l, h.String())
+					} else if fh != nil {
+						data = fmt.Sprintf("float_histogram: ts=%d, labels=%s, value=%s", t, l, fh.String())
+					} else {
+						data = fmt.Sprintf("histogram_with_no_value: ts=%d, labels=%s", t, l)
+					}
+					return data
+				},
+			))
+			return globalRef, nextErr
+		}),
+		prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
+			if exited.Load() {
+				return 0, fmt.Errorf("%s has exited", componentID)
+			}
+
+			localRef := ls.GetLocalRefID(componentID, uint64(globalRef))
+			newLocalRef, nextErr := next.UpdateMetadata(storage.SeriesRef(localRef), l, m)
+			if nextErr == nil {
+				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
+			}
+
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				liveDebuggingComponentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					return fmt.Sprintf("metadata: labels=%s, type=%q, unit=%q, help=%q", l, m.Type, m.Unit, m.Help)
+				},
+			))
+			return globalRef, nextErr
+		}),
+		prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
+			if exited.Load() {
+				return 0, fmt.Errorf("%s has exited", componentID)
+			}
+
+			localRef := ls.GetLocalRefID(componentID, uint64(globalRef))
+			newLocalRef, nextErr := next.AppendExemplar(storage.SeriesRef(localRef), l, e)
+			if nextErr == nil {
+				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
+			}
+
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				liveDebuggingComponentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					return fmt.Sprintf("exemplar: ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value)
+				},
+			))
+			return globalRef, nextErr
+		}),
+	)
+}
diff --git a/internal/component/prometheus/remotewrite/remote_write.go b/internal/component/prometheus/remotewrite/remote_write.go
index 236a1ba1b2..fe8f52fb38 100644
--- a/internal/component/prometheus/remotewrite/remote_write.go
+++ b/internal/component/prometheus/remotewrite/remote_write.go
@@ -11,10 +11,6 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
-	"github.com/prometheus/prometheus/model/exemplar"
-	"github.com/prometheus/prometheus/model/histogram"
-	"github.com/prometheus/prometheus/model/labels"
-	"github.com/prometheus/prometheus/model/metadata"
 	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/remote"
@@ -125,135 +121,8 @@ func New(o component.Options, args Arguments) (*Component, error) {
 		storage:            storage.NewFanout(fanoutLogger, walStorage, remoteStore),
 		debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
 	}
-	componentID := livedebugging.ComponentID(res.opts.ID)
-
-	handleLocalLink := func(globalRef uint64, l labels.Labels, cachedLocalRef uint64, newLocalRef uint64) {
-		// We had a local ref that was still valid nothing to do
-		if cachedLocalRef != 0 && cachedLocalRef == newLocalRef {
-			return
-		}
-
-		// There are some unique scenarios that can have an append end with no error but the returned localRef is zero (duplicate exemplars).
-		// We don't want to update a valid link to an invalid link
-		if cachedLocalRef != 0 && newLocalRef == 0 {
-			return
-		}
-
-		// This should never happen in a proper appender chain. Since we cannot enforce it, we are extra defensive.
-		if globalRef == 0 {
-			globalRef = ls.GetOrAddGlobalRefID(l)
-		}
-
-		if cachedLocalRef == 0 {
-			ls.AddLocalLink(res.opts.ID, globalRef, newLocalRef)
-		} else {
-			ls.ReplaceLocalLink(res.opts.ID, globalRef, cachedLocalRef, newLocalRef)
-		}
-	}
-
-	res.receiver = prometheus.NewInterceptor(
-		res.storage,
-		ls,
-		prometheus.WithComponentID(res.opts.ID),
-		// In the methods below, conversion is needed because remote_writes assume
-		// they are responsible for generating ref IDs. This means two
-		// remote_writes may return the same ref ID for two different series. We
-		// treat the remote_write ID as a "local ID" and translate it to a "global
-		// ID" to ensure Alloy compatibility.
-
-		prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
-			if res.exited.Load() {
-				return 0, fmt.Errorf("%s has exited", o.ID)
-			}
-
-			localRef := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
-			newLocalRef, nextErr := next.Append(storage.SeriesRef(localRef), l, t, v)
-			if nextErr == nil {
-				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
-			}
-
-			res.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					return fmt.Sprintf("sample: ts=%d, labels=%s, value=%f", t, l, v)
-				},
-			))
-			return globalRef, nextErr
-		}),
-		prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) {
-			if res.exited.Load() {
-				return 0, fmt.Errorf("%s has exited", o.ID)
-			}
-
-			localRef := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
-			newLocalRef, nextErr := next.AppendHistogram(storage.SeriesRef(localRef), l, t, h, fh)
-			if nextErr == nil {
-				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
-			}
-
-			res.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					var data string
-					if h != nil {
-						data = fmt.Sprintf("histogram: ts=%d, labels=%s, value=%s", t, l, h.String())
-					} else if fh != nil {
-						data = fmt.Sprintf("float_histogram: ts=%d, labels=%s, value=%s", t, l, fh.String())
-					} else {
-						data = fmt.Sprintf("histogram_with_no_value: ts=%d, labels=%s", t, l)
-					}
-					return data
-				},
-			))
-			return globalRef, nextErr
-		}),
-		prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
-			if res.exited.Load() {
-				return 0, fmt.Errorf("%s has exited", o.ID)
-			}
-
-			localRef := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
-			newLocalRef, nextErr := next.UpdateMetadata(storage.SeriesRef(localRef), l, m)
-			if nextErr == nil {
-				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
-			}
-
-			res.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					return fmt.Sprintf("metadata: labels=%s, type=%q, unit=%q, help=%q", l, m.Type, m.Unit, m.Help)
-				},
-			))
-			return globalRef, nextErr
-		}),
-		prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
-			if res.exited.Load() {
-				return 0, fmt.Errorf("%s has exited", o.ID)
-			}
-
-			localRef := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
-			newLocalRef, nextErr := next.AppendExemplar(storage.SeriesRef(localRef), l, e)
-			if nextErr == nil {
-				handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef))
-			}
-
-			res.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					return fmt.Sprintf("exemplar: ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value)
-				},
-			))
-			return globalRef, nextErr
-		}),
-	)
+	res.receiver = NewInterceptor(o.ID, &res.exited, res.debugDataPublisher, ls, res.storage)
 
 	// Immediately export the receiver which remains the same for the component
 	// lifetime.
diff --git a/internal/component/prometheus/remotewrite/remote_write_test.go b/internal/component/prometheus/remotewrite/remote_write_test.go
index 0cb846892e..e7ca57a231 100644
--- a/internal/component/prometheus/remotewrite/remote_write_test.go
+++ b/internal/component/prometheus/remotewrite/remote_write_test.go
@@ -7,14 +7,15 @@ import (
 	"testing"
 	"time"
 
-	"github.com/grafana/alloy/internal/component/prometheus/remotewrite"
-	"github.com/grafana/alloy/internal/runtime/componenttest"
-	"github.com/grafana/alloy/internal/util"
-	"github.com/grafana/alloy/syntax"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/storage/remote"
 	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/alloy/internal/component/prometheus/remotewrite"
+	"github.com/grafana/alloy/internal/runtime/componenttest"
+	"github.com/grafana/alloy/internal/util"
+	"github.com/grafana/alloy/syntax"
 )
 
 // Test is an integration-level test which ensures that metrics can get sent to
@@ -40,7 +41,9 @@ func Test(t *testing.T) {
 			remote_timeout = "100ms"
 
 			queue_config {
-				batch_send_deadline = "100ms"
+				// This will guarantee that we get both samples we expect, or the test times out
+				max_samples_per_send = 2
+				batch_send_deadline  = "1m"
 			}
 		}
 	`, srv.URL))
@@ -221,6 +224,7 @@ func sendMetric(
 }
 
 func testArgsForConfig(t *testing.T, cfg string) remotewrite.Arguments {
+	t.Helper()
 	var args remotewrite.Arguments
 	require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
 	return args
diff --git a/internal/component/prometheus/scrape/interceptor.go b/internal/component/prometheus/scrape/interceptor.go
new file mode 100644
index 0000000000..f99ed36fed
--- /dev/null
+++ b/internal/component/prometheus/scrape/interceptor.go
@@ -0,0 +1,77 @@
+package scrape
+
+import (
+	"fmt"
+
+	"github.com/prometheus/prometheus/model/exemplar"
+	"github.com/prometheus/prometheus/model/histogram"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/metadata"
+	"github.com/prometheus/prometheus/storage"
+
+	"github.com/grafana/alloy/internal/component/prometheus"
+	"github.com/grafana/alloy/internal/service/livedebugging"
+)
+
+// NewInterceptor creates a new Prometheus storage.Appendable interceptor that proxies calls to the provided appendable,
+// publishing live debugging data via the provided debugDataPublisher when live debugging is active.
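+//
+// A typical wiring (mirroring pipeline_test.go; fanout stands for whatever
+// storage.Appendable the component forwards to) is:
+//
+//	interceptor := NewInterceptor("prometheus.scrape.test", livedebugging.NewLiveDebugging(), fanout)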
+func NewInterceptor(componentID livedebugging.ComponentID, debugDataPublisher livedebugging.DebugDataPublisher, appendable storage.Appendable) *prometheus.Interceptor {
+	return prometheus.NewInterceptor(appendable,
+		prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
+			newRef, nextErr := next.Append(globalRef, l, t, v)
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				componentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					return fmt.Sprintf("sample: ts=%d, labels=%s, value=%f", t, l, v)
+				},
+			))
+			return newRef, nextErr
+		}),
+		prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) {
+			newRef, nextErr := next.AppendHistogram(globalRef, l, t, h, fh)
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				componentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					var data string
+					if h != nil {
+						data = fmt.Sprintf("histogram: ts=%d, labels=%s, value=%s", t, l, h.String())
+					} else if fh != nil {
+						data = fmt.Sprintf("float_histogram: ts=%d, labels=%s, value=%s", t, l, fh.String())
+					} else {
+						data = fmt.Sprintf("histogram_with_no_value: ts=%d, labels=%s", t, l)
+					}
+					return data
+				},
+			))
+			return newRef, nextErr
+		}),
+		prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
+			newRef, nextErr := next.UpdateMetadata(globalRef, l, m)
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				componentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					return fmt.Sprintf("metadata: labels=%s, type=%q, unit=%q, help=%q", l, m.Type, m.Unit, m.Help)
+				},
+			))
+			return newRef, nextErr
+		}),
+		prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
+			newRef, nextErr := next.AppendExemplar(globalRef, l, e)
+			debugDataPublisher.PublishIfActive(livedebugging.NewData(
+				componentID,
+				livedebugging.PrometheusMetric,
+				1,
+				func() string {
+					return fmt.Sprintf("exemplar: ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value)
+				},
+			))
+			return newRef, nextErr
+		}),
+	)
+}
diff --git a/internal/component/prometheus/scrape/scrape.go b/internal/component/prometheus/scrape/scrape.go
index 54c2ef540d..902f56fc57 100644
--- a/internal/component/prometheus/scrape/scrape.go
+++ b/internal/component/prometheus/scrape/scrape.go
@@ -16,10 +16,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
-	"github.com/prometheus/prometheus/model/exemplar"
-	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/prometheus/prometheus/model/metadata"
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/storage"
 	promlogging "github.com/prometheus/prometheus/util/logging"
@@ -310,7 +307,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
 		return nil, fmt.Errorf("honor_metadata is an experimental feature, and must be enabled by setting the stability.level flag to experimental")
 	}
 
-	alloyAppendable := prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, ls)
+	alloyAppendable := prometheus.NewFanout(args.ForwardTo, o.Registerer, ls)
 
 	scrapeOptions := &scrape.Options{
 		// NOTE: This is not Update()-able.
 		ExtraMetrics: args.ExtraMetrics,
@@ -353,7 +350,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
 		unregisterer: unregisterer,
 	}
 
-	interceptor := c.newInterceptor(ls)
+	interceptor := NewInterceptor(livedebugging.ComponentID(o.ID), c.debugDataPublisher, alloyAppendable)
 
 	scraper, err := scrape.NewManager(
 		scrapeOptions,
@@ -648,66 +645,4 @@ func (c *Component) populatePromLabels(targets []discovery.Target, jobName strin
 	return allTargets
 }
 
-func (c *Component) newInterceptor(ls labelstore.LabelStore) *prometheus.Interceptor {
-	componentID := livedebugging.ComponentID(c.opts.ID)
-	return prometheus.NewInterceptor(c.appendable, ls,
-		prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
-			_, nextErr := next.Append(globalRef, l, t, v)
-			c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					return fmt.Sprintf("sample: ts=%d, labels=%s, value=%f", t, l, v)
-				},
-			))
-			return globalRef, nextErr
-		}),
-		prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) {
-			_, nextErr := next.AppendHistogram(globalRef, l, t, h, fh)
-			c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					var data string
-					if h != nil {
-						data = fmt.Sprintf("histogram: ts=%d, labels=%s, value=%s", t, l, h.String())
-					} else if fh != nil {
-						data = fmt.Sprintf("float_histogram: ts=%d, labels=%s, value=%s", t, l, fh.String())
-					} else {
-						data = fmt.Sprintf("histogram_with_no_value: ts=%d, labels=%s", t, l)
-					}
-					return data
-				},
-			))
-			return globalRef, nextErr
-		}),
-		prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
-			_, nextErr := next.UpdateMetadata(globalRef, l, m)
-			c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					return fmt.Sprintf("metadata: labels=%s, type=%q, unit=%q, help=%q", l, m.Type, m.Unit, m.Help)
-				},
-			))
-			return globalRef, nextErr
-		}),
-		prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
-			_, nextErr := next.AppendExemplar(globalRef, l, e)
-			c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
-				componentID,
-				livedebugging.PrometheusMetric,
-				1,
-				func() string {
-					return fmt.Sprintf("exemplar: ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value)
-				},
-			))
-			return globalRef, nextErr
-		}),
-	)
-}
-
 func (c *Component) LiveDebugging() {}
diff --git a/internal/runtime/componenttest/componenttest.go b/internal/runtime/componenttest/componenttest.go
index 0df4c20c75..d72dd89048 100644
--- a/internal/runtime/componenttest/componenttest.go
+++ b/internal/runtime/componenttest/componenttest.go
@@ -118,7 +118,7 @@ func (c *Controller) Exports() component.Exports {
 // until ctx is canceled, the component exits, or if there was an error.
 //
 // Run may only be called once per Controller.
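+//
+// Optional optsModifiers are applied to the component.Options before the
+// component is built; tests can use this to swap in custom service data
+// (pipeline_test.go injects a shared labelstore this way).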
-func (c *Controller) Run(ctx context.Context, args component.Arguments) error {
+func (c *Controller) Run(ctx context.Context, args component.Arguments, optsModifiers ...func(opts component.Options) component.Options) error {
 	dataPath, err := os.MkdirTemp("", "controller-*")
 	if err != nil {
 		return err
 	}
@@ -127,7 +127,7 @@ func (c *Controller) Run(ctx context.Context, args component.Arguments) error {
 		_ = os.RemoveAll(dataPath)
 	}()
 
-	run, err := c.buildComponent(dataPath, args)
+	run, err := c.buildComponent(dataPath, args, optsModifiers...)
 	if err != nil {
 		c.onRun.Do(func() {
@@ -157,7 +157,7 @@ func (c *Controller) Run(ctx context.Context, args component.Arguments) error {
 	return err
 }
 
-func (c *Controller) buildComponent(dataPath string, args component.Arguments) (component.Component, error) {
+func (c *Controller) buildComponent(dataPath string, args component.Arguments, optsModifiers ...func(opts component.Options) component.Options) (component.Component, error) {
 	c.innerMut.Lock()
 	defer c.innerMut.Unlock()
 
@@ -189,6 +189,10 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) (
 		},
 	}
 
+	for _, mod := range optsModifiers {
+		opts = mod(opts)
+	}
+
 	inner, err := c.reg.Build(opts, args)
 	if err != nil {
 		return nil, err