Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Main (unreleased)

- Add `meta_cache_address` to `beyla.ebpf` component. (@skl)

- Remove labelstore interactions from the prometheus interceptor simplifying prometheus pipelines. (@kgeckhart)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change really need a changelog entry? It is more like an internal refinment that doesn't impact users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷 it's an optimization on a highly used pipeline


### Bugfixes

- `loki.source.api` no longer drops request when relabel rules drops a specific stream. (@kalleep)
Expand Down Expand Up @@ -98,7 +100,6 @@ v1.12.0-rc.0
- The `otelcol.processor.servicegraph` component now supports defining the maximum number of buckets for generated exponential histograms.
- See the upstream [core][https://github.com/open-telemetry/opentelemetry-collector/blob/v0.139.0/CHANGELOG.md] and [contrib][https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.139.0/CHANGELOG.md] changelogs for more details.


### Enhancements

- Add per-application rate limiting with the `strategy` attribute in the `faro.receiver` component, to prevent one application from consuming the rate limit quota of others. (@hhertout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/exporter/prometheus/internal/convert"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/prometheus/prometheus/storage"
)

func init() {
Expand Down Expand Up @@ -87,7 +88,7 @@ func New(o component.Options, c Arguments) (*Component, error) {
return nil, err
}
ls := service.(labelstore.LabelStore)
fanout := prometheus.NewFanout(nil, o.ID, o.Registerer, ls)
fanout := prometheus.NewFanout(nil, o.Registerer, ls)

converter := convert.New(o.Logger, fanout, convertArgumentsToConvertOptions(c))

Expand Down
3 changes: 1 addition & 2 deletions internal/component/prometheus/enrich/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,9 @@ func New(opts component.Options, args Arguments) (*Component, error) {
}
}

c.fanout = prometheus.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls)
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.Registerer, ls)
c.receiver = prometheus.NewInterceptor(
c.fanout,
ls,
prometheus.WithComponentID(c.opts.ID),
prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
if c.exited.Load() {
Expand Down
12 changes: 6 additions & 6 deletions internal/component/prometheus/enrich/enrich_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"testing"
"time"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/discovery"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/util"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
)

func TestEnricher(t *testing.T) {
Expand Down Expand Up @@ -108,9 +109,8 @@ func TestEnricher(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(
nil, ls,
nil,
prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
for name, value := range tt.expectedLabels {
require.Equal(t, l.Get(name), value)
Expand Down
8 changes: 3 additions & 5 deletions internal/component/prometheus/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (
var _ storage.Appendable = (*Fanout)(nil)

// Fanout supports the default Alloy style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
// It also maintains the responsibility of assigning global ref IDs to a series via the label store.
type Fanout struct {
mut sync.RWMutex
// children is where to fan out.
children []storage.Appendable
// ComponentID is what component this belongs to.
componentID string
children []storage.Appendable
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
ls labelstore.LabelStore
Expand All @@ -39,7 +38,7 @@ type Fanout struct {
}

// NewFanout creates a fanout appendable.
func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
func NewFanout(children []storage.Appendable, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_fanout_latency",
Help: "Write latency for sending to direct and indirect components",
Expand All @@ -55,7 +54,6 @@ func NewFanout(children []storage.Appendable, componentID string, register prome

return &Fanout{
children: children,
componentID: componentID,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by: This wasn't referenced anywhere

writeLatency: wl,
samplesCounter: s,
ls: ls,
Expand Down
4 changes: 2 additions & 2 deletions internal/component/prometheus/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (

func TestRollback(t *testing.T) {
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
app := fanout.Appender(t.Context())
err := app.Rollback()
require.NoError(t, err)
}

func TestCommit(t *testing.T) {
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
app := fanout.Appender(t.Context())
err := app.Commit()
require.NoError(t, err)
Expand Down
65 changes: 9 additions & 56 deletions internal/component/prometheus/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/service/labelstore"
)

// Interceptor is a storage.Appendable which invokes callback functions upon
Expand All @@ -26,23 +23,16 @@ type Interceptor struct {
// next is the next appendable to pass in the chain.
next storage.Appendable

ls labelstore.LabelStore

// lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how
// much memory to allocate for the staleness trackers.
lastSeriesCount atomic.Int64

componentID string
}

var _ storage.Appendable = (*Interceptor)(nil)

// NewInterceptor creates a new Interceptor storage.Appendable. Options can be
// provided to NewInterceptor to install custom hooks for different methods.
func NewInterceptor(next storage.Appendable, ls labelstore.LabelStore, opts ...InterceptorOption) *Interceptor {
func NewInterceptor(next storage.Appendable, opts ...InterceptorOption) *Interceptor {
i := &Interceptor{
next: next,
ls: ls,
}
for _, opt := range opts {
opt(i)
Expand Down Expand Up @@ -102,27 +92,23 @@ func WithComponentID(id string) InterceptorOption {
}

// Appender satisfies the Appendable interface.
func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
func (i *Interceptor) Appender(ctx context.Context) storage.Appender {
app := &interceptappender{
interceptor: f,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
interceptor: i,
}
if f.next != nil {
app.child = f.next.Appender(ctx)
if i.next != nil {
app.child = i.next.Appender(ctx)
}
return app
}

func (f *Interceptor) String() string {
return f.componentID + ".receiver"
func (i *Interceptor) String() string {
return i.componentID + ".receiver"
}

type interceptappender struct {
interceptor *Interceptor
child storage.Appender
ls labelstore.LabelStore
stalenessTrackers []labelstore.StalenessTracker
interceptor *Interceptor
child storage.Appender
}

func (a *interceptappender) SetOptions(opts *storage.AppendOptions) {
Expand All @@ -135,15 +121,6 @@ var _ storage.Appender = (*interceptappender)(nil)

// Append satisfies the Appender interface.
func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
GlobalRefID: uint64(ref),
Labels: l,
Value: v,
})

if a.interceptor.onAppend != nil {
return a.interceptor.onAppend(ref, l, t, v, a.child)
}
Expand All @@ -155,8 +132,6 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int

// Commit satisfies the Appender interface.
func (a *interceptappender) Commit() error {
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
}
Expand All @@ -165,8 +140,6 @@ func (a *interceptappender) Commit() error {

// Rollback satisfies the Appender interface.
func (a *interceptappender) Rollback() error {
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
}
Expand All @@ -180,10 +153,6 @@ func (a *interceptappender) AppendExemplar(
e exemplar.Exemplar,
) (storage.SeriesRef, error) {

if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if a.interceptor.onAppendExemplar != nil {
return a.interceptor.onAppendExemplar(ref, l, e, a.child)
}
Expand All @@ -200,10 +169,6 @@ func (a *interceptappender) UpdateMetadata(
m metadata.Metadata,
) (storage.SeriesRef, error) {

if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if a.interceptor.onUpdateMetadata != nil {
return a.interceptor.onUpdateMetadata(ref, l, m, a.child)
}
Expand All @@ -221,10 +186,6 @@ func (a *interceptappender) AppendHistogram(
fh *histogram.FloatHistogram,
) (storage.SeriesRef, error) {

if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
// TODO histograms are not currently tracked for staleness causing them to be held forever
if a.interceptor.onAppendHistogram != nil {
return a.interceptor.onAppendHistogram(ref, l, t, h, fh, a.child)
}
Expand All @@ -240,10 +201,6 @@ func (a *interceptappender) AppendCTZeroSample(
t, ct int64,
) (storage.SeriesRef, error) {

if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if a.interceptor.onAppendCTZeroSample != nil {
return a.interceptor.onAppendCTZeroSample(ref, l, t, ct, a.child)
}
Expand All @@ -261,10 +218,6 @@ func (a *interceptappender) AppendHistogramCTZeroSample(
fh *histogram.FloatHistogram,
) (storage.SeriesRef, error) {

if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if a.child == nil {
return 0, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *crdManager) Run(ctx context.Context) error {
}()

// Start prometheus scrape manager.
alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer, c.ls)
alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.Registerer, c.ls)
opts := &scrape.Options{}
c.scrapeManager, err = scrape.NewManager(opts, slog.New(logging.NewSlogGoKitHandler(c.logger)), nil, alloyAppendable, unregisterer)
if err != nil {
Expand Down
Loading
Loading