Skip to content

Commit 50b5eca

Browse files
committed
Remove labelstore interactions from the interceptor
1 parent a10fb83 commit 50b5eca

File tree

9 files changed

+40
-91
lines changed

9 files changed

+40
-91
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ Main (unreleased)
1919

2020
- update promtail converter to use `file_match` block for `loki.source.file` instead of going through `local.file_match`. (@kalleep)
2121

22+
- remove labelstore interactions from the prometheus interceptor simplifying prometheus pipelines. (@kgeckhart)
23+
2224
### Bugfixes
2325

2426
- `loki.source.api` no longer drops request when relabel rules drops a specific stream. (@kalleep)

internal/component/prometheus/enrich/enrich.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ func New(opts component.Options, args Arguments) (*Component, error) {
100100
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.Registerer, ls)
101101
c.receiver = prometheus.NewInterceptor(
102102
c.fanout,
103-
ls,
104103
prometheus.WithComponentID(c.opts.ID),
105104
prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
106105
if c.exited.Load() {

internal/component/prometheus/enrich/enrich_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ import (
55
"testing"
66
"time"
77

8+
prom "github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/prometheus/model/labels"
10+
"github.com/prometheus/prometheus/storage"
11+
"github.com/stretchr/testify/require"
12+
813
"github.com/grafana/alloy/internal/component"
914
"github.com/grafana/alloy/internal/component/discovery"
1015
"github.com/grafana/alloy/internal/component/prometheus"
1116
"github.com/grafana/alloy/internal/service/labelstore"
1217
"github.com/grafana/alloy/internal/util"
13-
prom "github.com/prometheus/client_golang/prometheus"
14-
"github.com/prometheus/prometheus/model/labels"
15-
"github.com/prometheus/prometheus/storage"
16-
"github.com/stretchr/testify/require"
1718
)
1819

1920
func TestEnricher(t *testing.T) {
@@ -108,9 +109,8 @@ func TestEnricher(t *testing.T) {
108109
}
109110
for _, tt := range tests {
110111
t.Run(tt.name, func(t *testing.T) {
111-
ls := labelstore.New(nil, prom.DefaultRegisterer)
112112
fanout := prometheus.NewInterceptor(
113-
nil, ls,
113+
nil,
114114
prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
115115
for name, value := range tt.expectedLabels {
116116
require.Equal(t, l.Get(name), value)

internal/component/prometheus/interceptor.go

Lines changed: 9 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ import (
88
"github.com/prometheus/prometheus/model/labels"
99
"github.com/prometheus/prometheus/model/metadata"
1010
"github.com/prometheus/prometheus/storage"
11-
"go.uber.org/atomic"
12-
13-
"github.com/grafana/alloy/internal/service/labelstore"
1411
)
1512

1613
// Interceptor is a storage.Appendable which invokes callback functions upon
@@ -26,23 +23,16 @@ type Interceptor struct {
2623
// next is the next appendable to pass in the chain.
2724
next storage.Appendable
2825

29-
ls labelstore.LabelStore
30-
31-
// lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how
32-
// much memory to allocate for the staleness trackers.
33-
lastSeriesCount atomic.Int64
34-
3526
componentID string
3627
}
3728

3829
var _ storage.Appendable = (*Interceptor)(nil)
3930

4031
// NewInterceptor creates a new Interceptor storage.Appendable. Options can be
4132
// provided to NewInterceptor to install custom hooks for different methods.
42-
func NewInterceptor(next storage.Appendable, ls labelstore.LabelStore, opts ...InterceptorOption) *Interceptor {
33+
func NewInterceptor(next storage.Appendable, opts ...InterceptorOption) *Interceptor {
4334
i := &Interceptor{
4435
next: next,
45-
ls: ls,
4636
}
4737
for _, opt := range opts {
4838
opt(i)
@@ -102,27 +92,23 @@ func WithComponentID(id string) InterceptorOption {
10292
}
10393

10494
// Appender satisfies the Appendable interface.
105-
func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
95+
func (i *Interceptor) Appender(ctx context.Context) storage.Appender {
10696
app := &interceptappender{
107-
interceptor: f,
108-
ls: f.ls,
109-
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
97+
interceptor: i,
11098
}
111-
if f.next != nil {
112-
app.child = f.next.Appender(ctx)
99+
if i.next != nil {
100+
app.child = i.next.Appender(ctx)
113101
}
114102
return app
115103
}
116104

117-
func (f *Interceptor) String() string {
118-
return f.componentID + ".receiver"
105+
func (i *Interceptor) String() string {
106+
return i.componentID + ".receiver"
119107
}
120108

121109
type interceptappender struct {
122-
interceptor *Interceptor
123-
child storage.Appender
124-
ls labelstore.LabelStore
125-
stalenessTrackers []labelstore.StalenessTracker
110+
interceptor *Interceptor
111+
child storage.Appender
126112
}
127113

128114
func (a *interceptappender) SetOptions(opts *storage.AppendOptions) {
@@ -135,15 +121,6 @@ var _ storage.Appender = (*interceptappender)(nil)
135121

136122
// Append satisfies the Appender interface.
137123
func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
138-
if ref == 0 {
139-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
140-
}
141-
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
142-
GlobalRefID: uint64(ref),
143-
Labels: l,
144-
Value: v,
145-
})
146-
147124
if a.interceptor.onAppend != nil {
148125
return a.interceptor.onAppend(ref, l, t, v, a.child)
149126
}
@@ -155,8 +132,6 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
155132

156133
// Commit satisfies the Appender interface.
157134
func (a *interceptappender) Commit() error {
158-
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
159-
a.ls.TrackStaleness(a.stalenessTrackers)
160135
if a.child == nil {
161136
return nil
162137
}
@@ -165,8 +140,6 @@ func (a *interceptappender) Commit() error {
165140

166141
// Rollback satisfies the Appender interface.
167142
func (a *interceptappender) Rollback() error {
168-
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
169-
a.ls.TrackStaleness(a.stalenessTrackers)
170143
if a.child == nil {
171144
return nil
172145
}
@@ -180,10 +153,6 @@ func (a *interceptappender) AppendExemplar(
180153
e exemplar.Exemplar,
181154
) (storage.SeriesRef, error) {
182155

183-
if ref == 0 {
184-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
185-
}
186-
187156
if a.interceptor.onAppendExemplar != nil {
188157
return a.interceptor.onAppendExemplar(ref, l, e, a.child)
189158
}
@@ -200,10 +169,6 @@ func (a *interceptappender) UpdateMetadata(
200169
m metadata.Metadata,
201170
) (storage.SeriesRef, error) {
202171

203-
if ref == 0 {
204-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
205-
}
206-
207172
if a.interceptor.onUpdateMetadata != nil {
208173
return a.interceptor.onUpdateMetadata(ref, l, m, a.child)
209174
}
@@ -221,10 +186,6 @@ func (a *interceptappender) AppendHistogram(
221186
fh *histogram.FloatHistogram,
222187
) (storage.SeriesRef, error) {
223188

224-
if ref == 0 {
225-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
226-
}
227-
// TODO histograms are not currently tracked for staleness causing them to be held forever
228189
if a.interceptor.onAppendHistogram != nil {
229190
return a.interceptor.onAppendHistogram(ref, l, t, h, fh, a.child)
230191
}
@@ -240,10 +201,6 @@ func (a *interceptappender) AppendCTZeroSample(
240201
t, ct int64,
241202
) (storage.SeriesRef, error) {
242203

243-
if ref == 0 {
244-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
245-
}
246-
247204
if a.interceptor.onAppendCTZeroSample != nil {
248205
return a.interceptor.onAppendCTZeroSample(ref, l, t, ct, a.child)
249206
}
@@ -261,10 +218,6 @@ func (a *interceptappender) AppendHistogramCTZeroSample(
261218
fh *histogram.FloatHistogram,
262219
) (storage.SeriesRef, error) {
263220

264-
if ref == 0 {
265-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
266-
}
267-
268221
if a.child == nil {
269222
return 0, nil
270223
}

internal/component/prometheus/receive_http/receive_http_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@ import (
1717
"time"
1818

1919
"github.com/golang/snappy"
20-
"github.com/grafana/alloy/internal/component"
21-
fnet "github.com/grafana/alloy/internal/component/common/net"
22-
alloyprom "github.com/grafana/alloy/internal/component/prometheus"
23-
"github.com/grafana/alloy/internal/service/labelstore"
24-
"github.com/grafana/alloy/internal/util"
25-
"github.com/grafana/alloy/syntax/alloytypes"
2620
"github.com/phayes/freeport"
2721
"github.com/prometheus/client_golang/prometheus"
2822
"github.com/prometheus/common/config"
@@ -35,6 +29,13 @@ import (
3529
"github.com/stretchr/testify/require"
3630
"google.golang.org/protobuf/proto"
3731
"google.golang.org/protobuf/protoadapt"
32+
33+
"github.com/grafana/alloy/internal/component"
34+
fnet "github.com/grafana/alloy/internal/component/common/net"
35+
alloyprom "github.com/grafana/alloy/internal/component/prometheus"
36+
"github.com/grafana/alloy/internal/service/labelstore"
37+
"github.com/grafana/alloy/internal/util"
38+
"github.com/grafana/alloy/syntax/alloytypes"
3839
)
3940

4041
// generateTestCertAndKey generates a self-signed certificate and private key for testing
@@ -569,10 +570,8 @@ func testAppendable(actualSamples chan testSample) []storage.Appendable {
569570
return ref, nil
570571
}
571572

572-
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
573573
return []storage.Appendable{alloyprom.NewInterceptor(
574574
nil,
575-
ls,
576575
alloyprom.WithAppendHook(
577576
hookFn))}
578577
}

internal/component/prometheus/relabel/relabel.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ func New(o component.Options, args Arguments) (*Component, error) {
156156
c.fanout = prometheus.NewFanout(args.ForwardTo, o.Registerer, ls)
157157
c.receiver = prometheus.NewInterceptor(
158158
c.fanout,
159-
ls,
160159
prometheus.WithComponentID(c.opts.ID),
161160
prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
162161
if c.exited.Load() {

internal/component/prometheus/relabel/relabel_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestRelabelThroughAppend(t *testing.T) {
3030
lbls := labels.FromStrings("__address__", "localhost")
3131

3232
app := appendable.Appender(t.Context())
33-
relabedRef, err := app.Append(storage.SeriesRef(0), lbls, time.Now().UnixMilli(), 0)
33+
relabedRef, err := app.Append(storage.SeriesRef(1), lbls, time.Now().UnixMilli(), 0)
3434
require.NoError(t, err)
3535
require.NoError(t, app.Commit())
3636

@@ -72,8 +72,7 @@ func TestValidator(t *testing.T) {
7272
}
7373

7474
func TestNil(t *testing.T) {
75-
ls := labelstore.New(nil, prom.DefaultRegisterer)
76-
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
75+
fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
7776
require.True(t, false)
7877
return ref, nil
7978
}))
@@ -138,8 +137,7 @@ func TestMetrics(t *testing.T) {
138137
}
139138

140139
func BenchmarkCache(b *testing.B) {
141-
ls := labelstore.New(nil, prom.DefaultRegisterer)
142-
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
140+
fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
143141
require.True(b, l.Has("new_label"))
144142
return ref, nil
145143
}))
@@ -174,8 +172,7 @@ func BenchmarkCache(b *testing.B) {
174172
}
175173

176174
func generateRelabel(t *testing.T) (storage.Appendable, *Component) {
177-
ls := labelstore.New(nil, prom.DefaultRegisterer)
178-
fanout := prometheus.NewInterceptor(nil, ls)
175+
fanout := prometheus.NewInterceptor(nil)
179176
var appendable storage.Appendable
180177
relabeller, err := New(component.Options{
181178
ID: "1",

internal/component/prometheus/remotewrite/remote_write.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ func New(o component.Options, args Arguments) (*Component, error) {
153153

154154
res.receiver = prometheus.NewInterceptor(
155155
res.storage,
156-
ls,
157156
prometheus.WithComponentID(res.opts.ID),
158157
// In the methods below, conversion is needed because remote_writes assume
159158
// they are responsible for generating ref IDs. This means two

internal/component/prometheus/scrape/scrape.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
353353
unregisterer: unregisterer,
354354
}
355355

356-
interceptor := c.newInterceptor(ls)
356+
interceptor := c.newInterceptor()
357357

358358
scraper, err := scrape.NewManager(
359359
scrapeOptions,
@@ -648,11 +648,12 @@ func (c *Component) populatePromLabels(targets []discovery.Target, jobName strin
648648
return allTargets
649649
}
650650

651-
func (c *Component) newInterceptor(ls labelstore.LabelStore) *prometheus.Interceptor {
651+
// TODO add a test that series creation still works through interceptor + fanout
652+
func (c *Component) newInterceptor() *prometheus.Interceptor {
652653
componentID := livedebugging.ComponentID(c.opts.ID)
653-
return prometheus.NewInterceptor(c.appendable, ls,
654+
return prometheus.NewInterceptor(c.appendable,
654655
prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
655-
_, nextErr := next.Append(globalRef, l, t, v)
656+
newRef, nextErr := next.Append(globalRef, l, t, v)
656657
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
657658
componentID,
658659
livedebugging.PrometheusMetric,
@@ -661,10 +662,10 @@ func (c *Component) newInterceptor(ls labelstore.LabelStore) *prometheus.Interce
661662
return fmt.Sprintf("sample: ts=%d, labels=%s, value=%f", t, l, v)
662663
},
663664
))
664-
return globalRef, nextErr
665+
return newRef, nextErr
665666
}),
666667
prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) {
667-
_, nextErr := next.AppendHistogram(globalRef, l, t, h, fh)
668+
newRef, nextErr := next.AppendHistogram(globalRef, l, t, h, fh)
668669
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
669670
componentID,
670671
livedebugging.PrometheusMetric,
@@ -681,10 +682,10 @@ func (c *Component) newInterceptor(ls labelstore.LabelStore) *prometheus.Interce
681682
return data
682683
},
683684
))
684-
return globalRef, nextErr
685+
return newRef, nextErr
685686
}),
686687
prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
687-
_, nextErr := next.UpdateMetadata(globalRef, l, m)
688+
newRef, nextErr := next.UpdateMetadata(globalRef, l, m)
688689
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
689690
componentID,
690691
livedebugging.PrometheusMetric,
@@ -693,10 +694,10 @@ func (c *Component) newInterceptor(ls labelstore.LabelStore) *prometheus.Interce
693694
return fmt.Sprintf("metadata: labels=%s, type=%q, unit=%q, help=%q", l, m.Type, m.Unit, m.Help)
694695
},
695696
))
696-
return globalRef, nextErr
697+
return newRef, nextErr
697698
}),
698699
prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
699-
_, nextErr := next.AppendExemplar(globalRef, l, e)
700+
newRef, nextErr := next.AppendExemplar(globalRef, l, e)
700701
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
701702
componentID,
702703
livedebugging.PrometheusMetric,
@@ -705,7 +706,7 @@ func (c *Component) newInterceptor(ls labelstore.LabelStore) *prometheus.Interce
705706
return fmt.Sprintf("exemplar: ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value)
706707
},
707708
))
708-
return globalRef, nextErr
709+
return newRef, nextErr
709710
}),
710711
)
711712
}

0 commit comments

Comments
 (0)