diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 1df42f5f7e14b..b7b9ddc0453f5 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -3188,6 +3188,14 @@ dataobj_tee:
   # Maximum number of bytes to buffer.
   # CLI flag: -distributor.dataobj-tee.max-buffered-bytes
   [max_buffered_bytes: <int> | default = 104857600]
+
+  # The per-tenant partition rate (bytes/sec).
+  # CLI flag: -distributor.dataobj-tee.per-partition-rate-bytes
+  [per_partition_rate_bytes: <int> | default = 1048576]
+
+  # Enables optional debug metrics.
+  # CLI flag: -distributor.dataobj-tee.debug-metrics-enabled
+  [debug_metrics_enabled: <boolean> | default = false]
 ```
 
 ### etcd
diff --git a/pkg/distributor/dataobj_tee.go b/pkg/distributor/dataobj_tee.go
index 17710b4cefaa1..34f8ba2bc8efd 100644
--- a/pkg/distributor/dataobj_tee.go
+++ b/pkg/distributor/dataobj_tee.go
@@ -4,10 +4,10 @@ import (
 	"context"
 	"errors"
 	"flag"
+	"strconv"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/grafana/dskit/ring"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/twmb/franz-go/pkg/kgo"
@@ -16,50 +16,70 @@ import (
 )
 
 type DataObjTeeConfig struct {
-	Enabled          bool   `yaml:"enabled"`
-	Topic            string `yaml:"topic"`
-	MaxBufferedBytes int    `yaml:"max_buffered_bytes"`
+	Enabled               bool   `yaml:"enabled"`
+	Topic                 string `yaml:"topic"`
+	MaxBufferedBytes      int    `yaml:"max_buffered_bytes"`
+	PerPartitionRateBytes int    `yaml:"per_partition_rate_bytes"`
+	DebugMetricsEnabled   bool   `yaml:"debug_metrics_enabled"`
 }
 
 func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&c.Enabled, "distributor.dataobj-tee.enabled", false, "Enable data object tee.")
 	f.StringVar(&c.Topic, "distributor.dataobj-tee.topic", "", "Topic for data object tee.")
 	f.IntVar(&c.MaxBufferedBytes, "distributor.dataobj-tee.max-buffered-bytes", 100<<20, "Maximum number of bytes to buffer.")
+	f.IntVar(&c.PerPartitionRateBytes, "distributor.dataobj-tee.per-partition-rate-bytes", 1024*1024, "The per-tenant partition rate (bytes/sec).")
+	f.BoolVar(&c.DebugMetricsEnabled, "distributor.dataobj-tee.debug-metrics-enabled", false, "Enables optional debug metrics.")
 }
 
 func (c *DataObjTeeConfig) Validate() error {
-	if c.Enabled && c.Topic == "" {
+	if !c.Enabled {
+		return nil
+	}
+	if c.Topic == "" {
 		return errors.New("the topic is required")
 	}
+	if c.MaxBufferedBytes < 0 {
+		return errors.New("max buffered bytes cannot be negative")
+	}
+	if c.PerPartitionRateBytes < 0 {
+		return errors.New("per partition rate bytes cannot be negative")
+	}
 	return nil
 }
 
 // DataObjTee is a tee that duplicates streams to the data object topic.
 // It is a temporary solution while we work on segmentation keys.
 type DataObjTee struct {
-	cfg        *DataObjTeeConfig
-	client     *kgo.Client
-	ringReader ring.PartitionRingReader
-	logger     log.Logger
+	cfg          *DataObjTeeConfig
+	limitsClient *ingestLimits
+	kafkaClient  *kgo.Client
+	resolver     *SegmentationPartitionResolver
+	logger       log.Logger
 
 	// Metrics.
 	failures prometheus.Counter
 	total    prometheus.Counter
+
+	// High cardinality metrics which are only emitted when debug metrics
+	// are enabled.
+	produced *prometheus.CounterVec
 }
 
 // NewDataObjTee returns a new DataObjTee.
func NewDataObjTee( cfg *DataObjTeeConfig, - client *kgo.Client, - ringReader ring.PartitionRingReader, + resolver *SegmentationPartitionResolver, + limitsClient *ingestLimits, + kafkaClient *kgo.Client, logger log.Logger, reg prometheus.Registerer, ) (*DataObjTee, error) { return &DataObjTee{ - cfg: cfg, - client: client, - ringReader: ringReader, - logger: logger, + cfg: cfg, + resolver: resolver, + kafkaClient: kafkaClient, + limitsClient: limitsClient, + logger: logger, failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "loki_distributor_dataobj_tee_duplicate_stream_failures_total", Help: "Total number of streams that could not be duplicated.", @@ -68,21 +88,54 @@ func NewDataObjTee( Name: "loki_distributor_dataobj_tee_duplicate_streams_total", Help: "Total number of streams duplicated.", }), + produced: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_distributor_dataobj_tee_produced_bytes_total", + Help: "Total number of bytes produced to each partition.", + }, []string{"tenant", "partition", "segmentation_key"}), }, nil } +// A SegmentedStream is a KeyedStream with a segmentation key. +type SegmentedStream struct { + KeyedStream + SegmentationKey SegmentationKey +} + // Duplicate implements the [Tee] interface. -func (t *DataObjTee) Duplicate(_ context.Context, tenant string, streams []KeyedStream) { - for _, s := range streams { - go t.duplicate(tenant, s) +func (t *DataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream) { + segmentationKeyStreams := make([]SegmentedStream, 0, len(streams)) + for _, stream := range streams { + segmentationKey, err := GetSegmentationKey(stream) + if err != nil { + level.Error(t.logger).Log("msg", "failed to get segmentation key", "err", err) + t.failures.Inc() + return + } + segmentationKeyStreams = append(segmentationKeyStreams, SegmentedStream{ + KeyedStream: stream, + SegmentationKey: segmentationKey, + }) + } + rates, err := t.limitsClient.UpdateRates(ctx, tenant, segmentationKeyStreams) + if err != nil { + level.Error(t.logger).Log("msg", "failed to update rates", "err", err) + } + // fastRates is a temporary lookup table that lets us find the rate + // for a segmentation key in constant time. + fastRates := make(map[uint64]uint64, len(rates)) + for _, rate := range rates { + fastRates[rate.StreamHash] = rate.Rate + } + for _, s := range segmentationKeyStreams { + go t.duplicate(ctx, tenant, s, fastRates[s.SegmentationKey.Sum64()]) } } -func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) { +func (t *DataObjTee) duplicate(ctx context.Context, tenant string, stream SegmentedStream, rateBytes uint64) { t.total.Inc() - partition, err := t.ringReader.PartitionRing().ActivePartitionForKey(stream.HashKey) + partition, err := t.resolver.Resolve(ctx, stream.SegmentationKey, rateBytes) if err != nil { - level.Error(t.logger).Log("msg", "failed to get partition", "err", err) + level.Error(t.logger).Log("msg", "failed to resolve partition", "err", err) t.failures.Inc() return } @@ -92,9 +145,16 @@ func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) { t.failures.Inc() return } - results := t.client.ProduceSync(context.TODO(), records...) + results := t.kafkaClient.ProduceSync(ctx, records...) 
if err := results.FirstErr(); err != nil { level.Error(t.logger).Log("msg", "failed to produce records", "err", err) t.failures.Inc() } + if t.cfg.DebugMetricsEnabled { + t.produced.WithLabelValues( + tenant, + strconv.FormatInt(int64(partition), 10), + string(stream.SegmentationKey), + ).Add(float64(stream.Stream.Size())) + } } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d0f990fad72b1..ff68a4a95e1ef 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -283,6 +283,8 @@ func New( return nil, fmt.Errorf("partition ring is required for kafka writes") } + ingestLimits := newIngestLimits(limitsFrontendClient, registerer) + var kafkaWriter KafkaProducer if cfg.KafkaEnabled { kafkaClient, err := kafka_client.NewWriterClient("distributor", cfg.KafkaConfig, 20, logger, registerer) @@ -295,10 +297,17 @@ func New( ) if cfg.DataObjTeeConfig.Enabled { + resolver := NewSegmentationPartitionResolver( + uint64(cfg.DataObjTeeConfig.PerPartitionRateBytes), + dataObjConsumerPartitionRing, + registerer, + logger, + ) dataObjTee, err := NewDataObjTee( &cfg.DataObjTeeConfig, + resolver, + ingestLimits, kafkaClient, - dataObjConsumerPartitionRing, logger, registerer, ) @@ -379,7 +388,7 @@ func New( writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"), kafkaWriter: kafkaWriter, partitionRing: partitionRing, - ingestLimits: newIngestLimits(limitsFrontendClient, registerer), + ingestLimits: ingestLimits, numMetadataPartitions: numMetadataPartitions, } diff --git a/pkg/distributor/ingest_limits.go b/pkg/distributor/ingest_limits.go index 4e6febcb91b3a..e1683a7f75ef7 100644 --- a/pkg/distributor/ingest_limits.go +++ b/pkg/distributor/ingest_limits.go @@ -194,7 +194,7 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee // UpdateRates updates the rates for the streams and returns a slice of the // updated rates for all streams. Any streams that could not have rates updated // have a rate of zero. -func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) { +func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []SegmentedStream) ([]*proto.UpdateRatesResult, error) { l.requests.WithLabelValues("UpdateRates").Inc() req, err := newUpdateRatesRequest(tenant, streams) if err != nil { @@ -209,7 +209,7 @@ func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams [ return resp.Results, nil } -func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) { +func newUpdateRatesRequest(tenant string, streams []SegmentedStream) (*proto.UpdateRatesRequest, error) { // The distributor sends the hashes of all streams in the request to the // limits-frontend. 
The limits-frontend is responsible for deciding if // the request would exceed the tenants limits, and if so, which streams @@ -218,7 +218,7 @@ func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateR for _, stream := range streams { entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream) streamMetadata = append(streamMetadata, &proto.StreamMetadata{ - StreamHash: stream.HashKeyNoShard, + StreamHash: stream.SegmentationKey.Sum64(), TotalSize: entriesSize + structuredMetadataSize, IngestionPolicy: stream.Policy, }) diff --git a/pkg/distributor/ingest_limits_test.go b/pkg/distributor/ingest_limits_test.go index 9786d3ce82cd0..00b61305c5a8d 100644 --- a/pkg/distributor/ingest_limits_test.go +++ b/pkg/distributor/ingest_limits_test.go @@ -302,7 +302,7 @@ func TestIngestLimits_UpdateRates(t *testing.T) { tests := []struct { name string tenant string - streams []KeyedStream + streams []SegmentedStream expectedRequest *proto.UpdateRatesRequest response *proto.UpdateRatesResponse responseErr error @@ -311,37 +311,31 @@ func TestIngestLimits_UpdateRates(t *testing.T) { }{{ name: "error should be returned if rates cannot be updated", tenant: "test", - streams: []KeyedStream{{ - HashKeyNoShard: 1, + streams: []SegmentedStream{{ + SegmentationKey: "test", }}, - expectedRequest: &proto.UpdateRatesRequest{ - Tenant: "test", - Streams: []*proto.StreamMetadata{{ - StreamHash: 1, - }}, - }, responseErr: errors.New("failed to update rates"), expectedErr: "failed to update rates", }, { name: "updates rates", tenant: "test", - streams: []KeyedStream{{ - HashKeyNoShard: 1, + streams: []SegmentedStream{{ + SegmentationKey: "test", }}, expectedRequest: &proto.UpdateRatesRequest{ Tenant: "test", Streams: []*proto.StreamMetadata{{ - StreamHash: 1, + StreamHash: 0xb5fb79e24c92922f, }}, }, response: &proto.UpdateRatesResponse{ Results: []*proto.UpdateRatesResult{{ - StreamHash: 1, + StreamHash: 0xb5fb79e24c92922f, Rate: 1024, }}, }, expectedResult: []*proto.UpdateRatesResult{{ - StreamHash: 1, + StreamHash: 0xb5fb79e24c92922f, Rate: 1024, }}, }} diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index d9f5d4d859bb0..c337a7f4374cc 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -137,10 +137,7 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRe return &proto.ExceedsLimitsResponse{Results: results}, nil } -func (f *Frontend) UpdateRates( - ctx context.Context, - req *proto.UpdateRatesRequest, -) (*proto.UpdateRatesResponse, error) { +func (f *Frontend) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) { results := make([]*proto.UpdateRatesResult, 0, len(req.Streams)) resps, err := f.limitsClient.UpdateRates(ctx, req) if err != nil { diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go index 5c83ad08f7c8e..459f9d10a80a8 100644 --- a/pkg/limits/frontend/ring.go +++ b/pkg/limits/frontend/ring.go @@ -73,10 +73,7 @@ func newRingLimitsClient( } // ExceedsLimits implements the [exceedsLimitsGatherer] interface. 
-func (r *ringLimitsClient) ExceedsLimits( - ctx context.Context, - req *proto.ExceedsLimitsRequest, -) ([]*proto.ExceedsLimitsResponse, error) { +func (r *ringLimitsClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) { if len(req.Streams) == 0 { return nil, nil } @@ -101,10 +98,7 @@ func (r *ringLimitsClient) ExceedsLimits( } // UpdateRates implements the [exceedsLimitsGatherer] interface. -func (r *ringLimitsClient) UpdateRates( - ctx context.Context, - req *proto.UpdateRatesRequest, -) ([]*proto.UpdateRatesResponse, error) { +func (r *ringLimitsClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) ([]*proto.UpdateRatesResponse, error) { if len(req.Streams) == 0 { return nil, nil } @@ -238,12 +232,7 @@ type doRPCsFunc func( // exhaustAllZones queries all zones, one at a time, until either all streams // have been answered or all zones have been exhausted. -func (r *ringLimitsClient) exhaustAllZones( - ctx context.Context, - tenant string, - streams []*proto.StreamMetadata, - doRPCs doRPCsFunc, -) ([]*proto.StreamMetadata, error) { +func (r *ringLimitsClient) exhaustAllZones(ctx context.Context, tenant string, streams []*proto.StreamMetadata, doRPCs doRPCsFunc) ([]*proto.StreamMetadata, error) { zonesIter, err := r.allZones(ctx) if err != nil { return nil, err diff --git a/pkg/limits/frontend/ring_test.go b/pkg/limits/frontend/ring_test.go index c9fa33b225f11..3df0241240352 100644 --- a/pkg/limits/frontend/ring_test.go +++ b/pkg/limits/frontend/ring_test.go @@ -15,7 +15,7 @@ import ( "github.com/grafana/loki/v3/pkg/limits/proto" ) -func TestRingGatherer_ExceedsLimits(t *testing.T) { +func TestRingLimitsClient_ExceedsLimits(t *testing.T) { tests := []struct { name string request *proto.ExceedsLimitsRequest @@ -442,7 +442,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) { } } -func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) { +func TestRingLimitsClient_GetZoneAwarePartitionConsumers(t *testing.T) { tests := []struct { name string instances []ring.InstanceDesc @@ -603,7 +603,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) { } } -func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { +func TestRingLimitsClient_GetPartitionConsumers(t *testing.T) { tests := []struct { name string // Instances contains the complete set of instances that should be mocked. @@ -740,7 +740,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { } } -func TestRingStreamUsageGatherer_GetPartitionConsumers_Caching(t *testing.T) { +func TestRingLimitsClient_GetPartitionConsumers_Caching(t *testing.T) { // Set up the mock clients. req0 := proto.GetAssignedPartitionsResponse{ AssignedPartitions: map[int32]int64{
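Read together, the configuration hunk at the top of this patch yields the following minimal `dataobj_tee` block. This is an illustrative sketch, not part of the change: the topic name is a placeholder, and the numeric values are simply the flag defaults spelled out.

```yaml
dataobj_tee:
  # The tee is off by default; Validate() only checks the remaining
  # fields once it is enabled.
  enabled: true
  # Required when enabled: an empty topic fails validation.
  topic: dataobj-tee-topic # placeholder name
  # Flag defaults: 100 MiB buffer, 1 MiB/sec per partition.
  max_buffered_bytes: 104857600
  per_partition_rate_bytes: 1048576
  # Opt in to the high-cardinality produced-bytes counter.
  debug_metrics_enabled: true
```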
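The fan-out in `Duplicate` builds `fastRates` so that each stream's per-goroutine call to `duplicate` can fetch its rate in constant time. Below is a standalone sketch of that lookup, with a hypothetical local `rateResult` type standing in for the patch's `proto.UpdateRatesResult` and the hash literal borrowed from the updated test.

```go
package main

import "fmt"

// rateResult mirrors the shape of proto.UpdateRatesResult in the patch:
// the 64-bit hash of a segmentation key and its updated rate (bytes/sec).
type rateResult struct {
	StreamHash uint64
	Rate       uint64
}

func main() {
	// Results as returned by UpdateRates, keyed by segmentation-key hash.
	rates := []rateResult{{StreamHash: 0xb5fb79e24c92922f, Rate: 1024}}

	// Build the lookup table once so the per-stream fan-out reads rates
	// in constant time instead of scanning the results slice per stream.
	fastRates := make(map[uint64]uint64, len(rates))
	for _, r := range rates {
		fastRates[r.StreamHash] = r.Rate
	}

	// A hash that is missing from the map yields Go's zero value, which
	// matches the documented behavior that streams whose rates could not
	// be updated have a rate of zero.
	fmt.Println(fastRates[0xb5fb79e24c92922f]) // 1024
	fmt.Println(fastRates[0x1])                // 0
}
```

Falling back to the zero value is also what lets `Duplicate` proceed when `UpdateRates` fails outright: the error is logged, `rates` stays empty, and every stream is still resolved to a partition with a rate of zero rather than being dropped.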