From 4ac9594f0776e59ff6ebfd21b671eb0aaca9435e Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 20 Nov 2025 10:34:29 +0100 Subject: [PATCH 1/3] feat: Query-level bucket rate limiting --- pkg/querier/http.go | 21 +++ pkg/querier/limits/definitions.go | 2 + pkg/storage/bucket/object_client_adapter.go | 25 ++- pkg/storage/bucket/rate_limited_reader.go | 115 +++++++++++++ .../bucket/rate_limited_reader_test.go | 159 ++++++++++++++++++ pkg/validation/limits.go | 61 ++++--- 6 files changed, 361 insertions(+), 22 deletions(-) create mode 100644 pkg/storage/bucket/rate_limited_reader.go create mode 100644 pkg/storage/bucket/rate_limited_reader_test.go diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 5413167df2314..69e60daaea746 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -36,6 +36,7 @@ import ( querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" "github.com/grafana/loki/v3/pkg/querier/pattern" "github.com/grafana/loki/v3/pkg/querier/queryrange" + "github.com/grafana/loki/v3/pkg/storage/bucket" index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/tracing" "github.com/grafana/loki/v3/pkg/util/constants" @@ -92,6 +93,16 @@ func (q *QuerierAPI) RangeQueryHandler(ctx context.Context, req *queryrange.Loki return result, err } + // Add bandwidth limit to context for this query if configured + tenantID, err := tenant.TenantID(ctx) + if err == nil { + rateLimit := q.limits.QueryBucketGetObjectRateLimit(ctx, tenantID) + if rateLimit > 0 { + burstLimit := q.limits.QueryBucketGetObjectRateLimitBurst(ctx, tenantID) + ctx = bucket.WithQueryBandwidthLimit(ctx, rateLimit, burstLimit) + } + } + if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(q.cfg, params.Start(), params.End()) { query := q.engineV2.Query(params) result, err = query.Exec(ctx) @@ -133,6 +144,16 @@ func (q *QuerierAPI) InstantQueryHandler(ctx context.Context, req *queryrange.Lo return logqlmodel.Result{}, err } + // Add bandwidth limit to context for this query if configured + tenantID, err := tenant.TenantID(ctx) + if err == nil { + rateLimit := q.limits.QueryBucketGetObjectRateLimit(ctx, tenantID) + if rateLimit > 0 { + burstLimit := q.limits.QueryBucketGetObjectRateLimitBurst(ctx, tenantID) + ctx = bucket.WithQueryBandwidthLimit(ctx, rateLimit, burstLimit) + } + } + if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(q.cfg, params.Start(), params.End()) { query := q.engineV2.Query(params) result, err := query.Exec(ctx) diff --git a/pkg/querier/limits/definitions.go b/pkg/querier/limits/definitions.go index 64108e5aed501..f672e31531b44 100644 --- a/pkg/querier/limits/definitions.go +++ b/pkg/querier/limits/definitions.go @@ -21,4 +21,6 @@ type Limits interface { MaxStreamsMatchersPerQuery(context.Context, string) int MaxConcurrentTailRequests(context.Context, string) int MaxEntriesLimitPerQuery(context.Context, string) int + QueryBucketGetObjectRateLimit(context.Context, string) int64 + QueryBucketGetObjectRateLimitBurst(context.Context, string) int64 } diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go index 9930f176163e3..4715df52a0784 100644 --- a/pkg/storage/bucket/object_client_adapter.go +++ b/pkg/storage/bucket/object_client_adapter.go @@ -128,11 +128,34 @@ func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) ( level.Warn(o.logger).Log("msg", "failed to get size of object", "err", err) } + // Wrap with rate-limited reader if query has a bandwidth limit + if limiter 
:= getQueryRateLimiter(ctx); limiter != nil { + reader = &rateLimitedReader{ + ReadCloser: reader, + limiter: limiter, + ctx: ctx, + } + } + return reader, size, err } func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { - return o.hedgedBucket.GetRange(ctx, objectKey, offset, length) + reader, err := o.hedgedBucket.GetRange(ctx, objectKey, offset, length) + if err != nil { + return nil, err + } + + // Wrap with rate-limited reader if query has a bandwidth limit + if limiter := getQueryRateLimiter(ctx); limiter != nil { + reader = &rateLimitedReader{ + ReadCloser: reader, + limiter: limiter, + ctx: ctx, + } + } + + return reader, nil } // List objects with given prefix. diff --git a/pkg/storage/bucket/rate_limited_reader.go b/pkg/storage/bucket/rate_limited_reader.go new file mode 100644 index 0000000000000..bdb1f35aba593 --- /dev/null +++ b/pkg/storage/bucket/rate_limited_reader.go @@ -0,0 +1,115 @@ +package bucket + +import ( + "context" + "io" + + "golang.org/x/time/rate" +) + +// minReadSize is the minimum chunk size for reading data. +// This ensures we read in reasonable-sized batches rather than very small ones. +const minReadSize = 512 + +type rateLimiterKey struct{} + +// queryRateLimiter holds a rate limiter for a query. +type queryRateLimiter struct { + limiter *rate.Limiter +} + +// WithQueryBandwidthLimit adds a bandwidth limit to the context for this query. +// All GetObject calls within this query will share the same rate limiter. +// bytesPerSecond is the maximum bytes per second for this query. +// burstBytes is the maximum burst bytes allowed. +// If bytesPerSecond is 0 or negative, rate limiting is disabled. +func WithQueryBandwidthLimit(ctx context.Context, bytesPerSecond int64, burstBytes int64) context.Context { + if bytesPerSecond <= 0 { + return ctx + } + + // Set burst to rate if not specified or invalid + burst := int(bytesPerSecond) + if burstBytes > 0 { + burst = int(burstBytes) + } + + // Create a limiter with the specified rate and burst + limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), burst) + + return context.WithValue(ctx, rateLimiterKey{}, &queryRateLimiter{ + limiter: limiter, + }) +} + +// getQueryRateLimiter extracts the rate limiter from context. +// Returns nil if no rate limiter is configured. +func getQueryRateLimiter(ctx context.Context) *rate.Limiter { + rl, ok := ctx.Value(rateLimiterKey{}).(*queryRateLimiter) + if !ok || rl == nil { + return nil + } + return rl.limiter +} + +// rateLimitedReader wraps an io.ReadCloser and limits the read rate using a shared limiter. +type rateLimitedReader struct { + io.ReadCloser + limiter *rate.Limiter + ctx context.Context +} + +// Read reads data from the underlying reader while respecting the rate limit. +// It reads in batches that don't exceed the burst size, waiting for rate limiter +// approval before each read to ensure we don't read ahead of the rate limit. 
+func (r *rateLimitedReader) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + burst := r.limiter.Burst() + if burst <= 0 { + // This should never happen with limiters created by WithQueryBandwidthLimit + // but handle it defensively - if burst is invalid, we can't rate limit + return r.ReadCloser.Read(p) + } + + // Read in batches, waiting for rate limiter approval before each batch + totalRead := 0 + for totalRead < len(p) { + remaining := len(p) - totalRead + // Use minReadSize as the read size, but don't exceed burst or remaining buffer + readSize := minReadSize + if readSize > remaining { + readSize = remaining + } + if readSize > burst { + readSize = burst + } + + // Wait for rate limiter approval before reading this batch + if err := r.limiter.WaitN(r.ctx, readSize); err != nil { + if totalRead > 0 { + // Return what we've read so far + return totalRead, nil + } + return 0, err + } + + // Read from underlying reader (up to the approved read size) + batch := p[totalRead : totalRead+readSize] + read, err := r.ReadCloser.Read(batch) + totalRead += read + + if err != nil { + return totalRead, err + } + + // If we read less than requested, we've hit EOF or the reader is done + if read < readSize { + return totalRead, err + } + } + + return totalRead, nil +} diff --git a/pkg/storage/bucket/rate_limited_reader_test.go b/pkg/storage/bucket/rate_limited_reader_test.go new file mode 100644 index 0000000000000..22f1f99d47c32 --- /dev/null +++ b/pkg/storage/bucket/rate_limited_reader_test.go @@ -0,0 +1,159 @@ +package bucket + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRateLimitedReader(t *testing.T) { + tests := []struct { + name string + dataSize int + rateLimit int64 // bytes per second, 0 means unlimited + burstLimit int64 // burst bytes, 0 means use rate limit + expectThrottle bool + minDuration time.Duration // minimum expected duration if throttled + }{ + { + name: "no rate limit configured", + dataSize: 1000, + rateLimit: 0, + burstLimit: 0, + expectThrottle: false, + }, + { + name: "rate limit higher than data size", + dataSize: 1000, + rateLimit: 100000, // 100KB/s, much higher than data + burstLimit: 0, + expectThrottle: false, // Burst allows immediate read + }, + { + name: "rate limit throttles read", + dataSize: 1000, // 1KB + rateLimit: 100, // 100 bytes/s + burstLimit: 0, // Defaults to rate limit + expectThrottle: true, + // First 100 bytes are immediate (burst), remaining 900 bytes at 100 bytes/s = 9 seconds + minDuration: 8 * time.Second, + }, + { + name: "rate limit with custom burst", + dataSize: 1000, // 1KB + rateLimit: 100, // 100 bytes/s + burstLimit: 200, // 200 bytes burst + expectThrottle: true, + // First 200 bytes are immediate (burst), remaining 800 bytes at 100 bytes/s = 8 seconds + minDuration: 7 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test data + data := make([]byte, tt.dataSize) + for i := range data { + data[i] = byte(i % 256) + } + + // Create reader + reader := io.NopCloser(bytes.NewReader(data)) + + // Configure context with rate limit (or not) + ctx := context.Background() + if tt.rateLimit > 0 { + ctx = WithQueryBandwidthLimit(ctx, tt.rateLimit, tt.burstLimit) + } + + // Create rate-limited reader if limiter exists + var rlReader io.ReadCloser = reader + if limiter := getQueryRateLimiter(ctx); limiter != nil { + rlReader = &rateLimitedReader{ + ReadCloser: reader, + limiter: limiter, + 
ctx: ctx, + } + } + + // Read data and measure time + start := time.Now() + buf := make([]byte, tt.dataSize) + n, err := rlReader.Read(buf) + elapsed := time.Since(start) + + require.NoError(t, err) + require.Equal(t, tt.dataSize, n) + require.Equal(t, data, buf) + + // Verify rate limit was respected + if tt.expectThrottle { + require.GreaterOrEqual(t, elapsed, tt.minDuration, + "read should be throttled and take at least %v (read %d bytes at %d bytes/s)", tt.minDuration, tt.dataSize, tt.rateLimit) + } + }) + } +} + +func TestRateLimitedReader_MultipleReadersShareLimiter(t *testing.T) { + // Test that multiple readers sharing the same limiter respect the total rate limit + dataSize := 500 // 500 bytes per reader + rateLimit := int64(200) // 200 bytes/s total + numReaders := 2 + + // Create test data + data := make([]byte, dataSize) + for i := range data { + data[i] = byte(i % 256) + } + + // Configure context with rate limit + ctx := WithQueryBandwidthLimit(context.Background(), rateLimit, 0) + limiter := getQueryRateLimiter(ctx) + require.NotNil(t, limiter) + + // Create multiple readers sharing the same limiter + readers := make([]io.ReadCloser, numReaders) + for i := 0; i < numReaders; i++ { + reader := io.NopCloser(bytes.NewReader(data)) + readers[i] = &rateLimitedReader{ + ReadCloser: reader, + limiter: limiter, + ctx: ctx, + } + } + + // Read from all readers concurrently + start := time.Now() + done := make(chan bool, numReaders) + buffers := make([][]byte, numReaders) + + for i := 0; i < numReaders; i++ { + buffers[i] = make([]byte, dataSize) + go func(idx int) { + _, err := readers[idx].Read(buffers[idx]) + require.NoError(t, err) + done <- true + }(i) + } + + // Wait for all reads to complete + for i := 0; i < numReaders; i++ { + <-done + } + elapsed := time.Since(start) + + // Verify all data was read correctly + for i := 0; i < numReaders; i++ { + require.Equal(t, data, buffers[i]) + } + + // Total bytes read: 2 * 500 = 1000 bytes + // At 200 bytes/s with 200 byte burst: first 200 bytes immediate, remaining 800 bytes at 200 bytes/s = 4 seconds + require.GreaterOrEqual(t, elapsed, 3*time.Second, + "concurrent reads should share rate limit and be throttled (read %d bytes total at %d bytes/s)", dataSize*numReaders, rateLimit) +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 263a86cdff0b2..ce96677760c29 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -125,27 +125,29 @@ type Limits struct { PerStreamRateLimitBurst flagext.ByteSize `yaml:"per_stream_rate_limit_burst" json:"per_stream_rate_limit_burst"` // Querier enforced limits. 
- MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` - MaxQuerySeries int `yaml:"max_query_series" json:"max_query_series"` - MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` - MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` - MaxQueryRange model.Duration `yaml:"max_query_range" json:"max_query_range"` - MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` - TSDBMaxQueryParallelism int `yaml:"tsdb_max_query_parallelism" json:"tsdb_max_query_parallelism"` - TSDBMaxBytesPerShard flagext.ByteSize `yaml:"tsdb_max_bytes_per_shard" json:"tsdb_max_bytes_per_shard"` - TSDBShardingStrategy string `yaml:"tsdb_sharding_strategy" json:"tsdb_sharding_strategy"` - TSDBPrecomputeChunks bool `yaml:"tsdb_precompute_chunks" json:"tsdb_precompute_chunks"` - CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` - MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query" json:"max_streams_matchers_per_query"` - MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` - MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` - MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` - MaxMetadataCacheFreshness model.Duration `yaml:"max_metadata_cache_freshness" json:"max_metadata_cache_freshness"` - MaxStatsCacheFreshness model.Duration `yaml:"max_stats_cache_freshness" json:"max_stats_cache_freshness"` - MaxQueriersPerTenant uint `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` - MaxQueryCapacity float64 `yaml:"max_query_capacity" json:"max_query_capacity"` - QueryReadyIndexNumDays int `yaml:"query_ready_index_num_days" json:"query_ready_index_num_days"` - QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"` + MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` + MaxQuerySeries int `yaml:"max_query_series" json:"max_query_series"` + MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` + MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` + MaxQueryRange model.Duration `yaml:"max_query_range" json:"max_query_range"` + MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` + TSDBMaxQueryParallelism int `yaml:"tsdb_max_query_parallelism" json:"tsdb_max_query_parallelism"` + TSDBMaxBytesPerShard flagext.ByteSize `yaml:"tsdb_max_bytes_per_shard" json:"tsdb_max_bytes_per_shard"` + TSDBShardingStrategy string `yaml:"tsdb_sharding_strategy" json:"tsdb_sharding_strategy"` + TSDBPrecomputeChunks bool `yaml:"tsdb_precompute_chunks" json:"tsdb_precompute_chunks"` + CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` + MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query" json:"max_streams_matchers_per_query"` + MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` + MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` + MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` + MaxMetadataCacheFreshness model.Duration `yaml:"max_metadata_cache_freshness" json:"max_metadata_cache_freshness"` + MaxStatsCacheFreshness model.Duration `yaml:"max_stats_cache_freshness" 
json:"max_stats_cache_freshness"` + MaxQueriersPerTenant uint `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` + MaxQueryCapacity float64 `yaml:"max_query_capacity" json:"max_query_capacity"` + QueryReadyIndexNumDays int `yaml:"query_ready_index_num_days" json:"query_ready_index_num_days"` + QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"` + QueryBucketGetObjectRateLimit flagext.ByteSize `yaml:"query_bucket_get_object_rate_limit" json:"query_bucket_get_object_rate_limit"` + QueryBucketGetObjectRateLimitBurst flagext.ByteSize `yaml:"query_bucket_get_object_rate_limit_burst" json:"query_bucket_get_object_rate_limit_burst"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` @@ -366,6 +368,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") + _ = l.QueryBucketGetObjectRateLimit.Set("0") + f.Var(&l.QueryBucketGetObjectRateLimit, "querier.query-bucket-get-object-rate-limit", "Maximum bytes per second for bucket GetObject operations during a query. 0 means unlimited. Also expressible in human readable forms (1MB, 256KB, etc).") + _ = l.QueryBucketGetObjectRateLimitBurst.Set("0") + f.Var(&l.QueryBucketGetObjectRateLimitBurst, "querier.query-bucket-get-object-rate-limit-burst", "Maximum burst bytes for bucket GetObject operations during a query. 0 means unlimited. Also expressible in human readable forms (1MB, 256KB, etc).") + _ = l.MaxQueryLength.Set("721h") f.Var(&l.MaxQueryLength, "store.max-query-length", "The limit to length of chunk store queries. 0 to disable.") f.IntVar(&l.MaxQuerySeries, "querier.max-query-series", 500, "Limit the maximum of unique series that is returned by a metric query. When the limit is reached an error is returned.") @@ -941,6 +948,18 @@ func (o *Overrides) MaxEntriesLimitPerQuery(_ context.Context, userID string) in return o.getOverridesForUser(userID).MaxEntriesLimitPerQuery } +// QueryBucketGetObjectRateLimit returns the maximum bytes per second for bucket GetObject operations during a query. +// Returns 0 if unlimited. +func (o *Overrides) QueryBucketGetObjectRateLimit(_ context.Context, userID string) int64 { + return int64(o.getOverridesForUser(userID).QueryBucketGetObjectRateLimit.Val()) +} + +// QueryBucketGetObjectRateLimitBurst returns the maximum burst bytes for bucket GetObject operations during a query. +// Returns 0 if unlimited. 
+func (o *Overrides) QueryBucketGetObjectRateLimitBurst(_ context.Context, userID string) int64 { + return int64(o.getOverridesForUser(userID).QueryBucketGetObjectRateLimitBurst.Val()) +} + func (o *Overrides) QueryTimeout(_ context.Context, userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).QueryTimeout) } From 6a5cf7a4f8352378c863443e1c2cf4d6ac80d06a Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 20 Nov 2025 12:28:20 +0100 Subject: [PATCH 2/3] Log rate limits --- pkg/storage/bucket/object_client_adapter.go | 12 +-- pkg/storage/bucket/rate_limited_reader.go | 87 ++++++++++++++++--- .../bucket/rate_limited_reader_test.go | 12 +-- 3 files changed, 77 insertions(+), 34 deletions(-) diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go index 4715df52a0784..4a76c5d7abc9b 100644 --- a/pkg/storage/bucket/object_client_adapter.go +++ b/pkg/storage/bucket/object_client_adapter.go @@ -130,11 +130,7 @@ func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) ( // Wrap with rate-limited reader if query has a bandwidth limit if limiter := getQueryRateLimiter(ctx); limiter != nil { - reader = &rateLimitedReader{ - ReadCloser: reader, - limiter: limiter, - ctx: ctx, - } + reader = newRateLimitedReader(ctx, reader, o.logger) } return reader, size, err @@ -148,11 +144,7 @@ func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey stri // Wrap with rate-limited reader if query has a bandwidth limit if limiter := getQueryRateLimiter(ctx); limiter != nil { - reader = &rateLimitedReader{ - ReadCloser: reader, - limiter: limiter, - ctx: ctx, - } + reader = newRateLimitedReader(ctx, reader, o.logger) } return reader, nil diff --git a/pkg/storage/bucket/rate_limited_reader.go b/pkg/storage/bucket/rate_limited_reader.go index bdb1f35aba593..12735e961b3f6 100644 --- a/pkg/storage/bucket/rate_limited_reader.go +++ b/pkg/storage/bucket/rate_limited_reader.go @@ -2,9 +2,16 @@ package bucket import ( "context" + "fmt" "io" + "time" + "github.com/dustin/go-humanize" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "golang.org/x/time/rate" + + util_log "github.com/grafana/loki/v3/pkg/util/log" ) // minReadSize is the minimum chunk size for reading data. @@ -57,6 +64,16 @@ type rateLimitedReader struct { io.ReadCloser limiter *rate.Limiter ctx context.Context + logger log.Logger +} + +func newRateLimitedReader(ctx context.Context, readCloser io.ReadCloser, logger log.Logger) *rateLimitedReader { + return &rateLimitedReader{ + ReadCloser: readCloser, + limiter: getQueryRateLimiter(ctx), + ctx: ctx, + logger: logger, + } } // Read reads data from the underlying reader while respecting the rate limit. 
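Note on the hunk below: the Read loop switches from limiter.WaitN to ReserveN so the reader can measure how long each batch was throttled (feeding the deferred debug log) while still honouring context cancellation. A minimal, self-contained sketch of that x/time/rate reservation pattern follows; the helper name and the example numbers are illustrative, not code from this series.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// waitWithStats is an illustrative helper (not in the patch): reserve n tokens,
// observe the delay about to be paid, honour context cancellation while sleeping,
// and return unused tokens to the bucket if the wait is aborted.
func waitWithStats(ctx context.Context, lim *rate.Limiter, n int) (time.Duration, error) {
	res := lim.ReserveN(time.Now(), n)
	if !res.OK() {
		// n exceeds the limiter's burst; callers must read in smaller batches.
		return 0, fmt.Errorf("cannot reserve %d tokens with burst %d", n, lim.Burst())
	}
	delay := res.Delay()
	if delay <= 0 {
		return 0, nil
	}
	timer := time.NewTimer(delay)
	defer timer.Stop()
	select {
	case <-timer.C:
		return delay, nil
	case <-ctx.Done():
		res.Cancel() // hand the unused tokens back so other readers can use them
		return delay, ctx.Err()
	}
}

func main() {
	lim := rate.NewLimiter(100, 100)                       // 100 tokens/s, burst of 100
	_, _ = waitWithStats(context.Background(), lim, 100)   // consumes the burst, no delay
	d, err := waitWithStats(context.Background(), lim, 50) // throttled for roughly 0.5s
	fmt.Println(d, err)
}
```

The second call reports a delay of about half a second because the first call drained the burst and tokens refill at 100 per second.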
@@ -74,26 +91,68 @@ func (r *rateLimitedReader) Read(p []byte) (n int, err error) { return r.ReadCloser.Read(p) } - // Read in batches, waiting for rate limiter approval before each batch + // Cap the read size to the minimum read size and the burst + minReadSize := min(minReadSize, burst) totalRead := 0 - for totalRead < len(p) { - remaining := len(p) - totalRead - // Use minReadSize as the read size, but don't exceed burst or remaining buffer - readSize := minReadSize - if readSize > remaining { - readSize = remaining - } - if readSize > burst { - readSize = burst + + // Other logging stats + var ( + rateLimitedCount int + totalWaitTime time.Duration + maxWaitTime time.Duration + ) + + // Defer logging to ensure it happens on all exit paths + defer func() { + if rateLimitedCount > 0 && r.logger != nil { + logger := util_log.WithContext(r.ctx, r.logger) + level.Debug(logger).Log( + "msg", "query rate limited during bucket read operation", + "rateLimitedCount", rateLimitedCount, + "totalWaitTime", totalWaitTime.String(), + "maxWaitTime", maxWaitTime.String(), + "readBufferSize", humanize.Bytes(uint64(len(p))), + "readBytes", humanize.Bytes(uint64(totalRead)), + "remainingBytes", humanize.Bytes(uint64(len(p)-totalRead)), + "err", err, + ) } + }() - // Wait for rate limiter approval before reading this batch - if err := r.limiter.WaitN(r.ctx, readSize); err != nil { + for totalRead < len(p) { + remaining := len(p) - totalRead + // Use minReadSize but cap to the remaining + readSize := min(minReadSize, remaining) + + // Reserve rate limiter tokens for this batch read + reservation := r.limiter.ReserveN(time.Now(), readSize) + if !reservation.OK() { + // Reservation failed (e.g., readSize > burst), return error + // This should not happen in practice since we cap readSize to burst if totalRead > 0 { - // Return what we've read so far return totalRead, nil } - return 0, err + return 0, fmt.Errorf("rate limited reader: reservation failed. 
readSize (%d) > burst: (%d)?", readSize, burst) + } + + // If we need to wait, record the logging stats and wait for the delay + if delay := reservation.Delay(); delay > 0 { + rateLimitedCount++ + totalWaitTime += delay + maxWaitTime = max(maxWaitTime, delay) + + timer := time.NewTimer(delay) + select { + case <-timer.C: + // Delay completed, proceed + case <-r.ctx.Done(): + timer.Stop() + reservation.Cancel() + if totalRead > 0 { + return totalRead, nil + } + return 0, r.ctx.Err() + } } // Read from underlying reader (up to the approved read size) diff --git a/pkg/storage/bucket/rate_limited_reader_test.go b/pkg/storage/bucket/rate_limited_reader_test.go index 22f1f99d47c32..a5e21399593ae 100644 --- a/pkg/storage/bucket/rate_limited_reader_test.go +++ b/pkg/storage/bucket/rate_limited_reader_test.go @@ -73,11 +73,7 @@ func TestRateLimitedReader(t *testing.T) { // Create rate-limited reader if limiter exists var rlReader io.ReadCloser = reader if limiter := getQueryRateLimiter(ctx); limiter != nil { - rlReader = &rateLimitedReader{ - ReadCloser: reader, - limiter: limiter, - ctx: ctx, - } + rlReader = newRateLimitedReader(ctx, reader, nil) } // Read data and measure time @@ -120,11 +116,7 @@ func TestRateLimitedReader_MultipleReadersShareLimiter(t *testing.T) { readers := make([]io.ReadCloser, numReaders) for i := 0; i < numReaders; i++ { reader := io.NopCloser(bytes.NewReader(data)) - readers[i] = &rateLimitedReader{ - ReadCloser: reader, - limiter: limiter, - ctx: ctx, - } + readers[i] = newRateLimitedReader(ctx, reader, nil) } // Read from all readers concurrently From ee4c0410ec3508f4b9dc6139e44fe7b865e0bee2 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 20 Nov 2025 13:03:00 +0100 Subject: [PATCH 3/3] Increase min read size --- docs/sources/shared/configuration.md | 10 ++++++++++ pkg/querier/testutil/limits.go | 8 ++++++++ pkg/storage/bucket/rate_limited_reader.go | 5 ++++- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 1df42f5f7e14b..28a8ebdecfcae 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -4316,6 +4316,16 @@ discover_generic_fields: # CLI flag: -querier.query-timeout [query_timeout: | default = 1m] +# Maximum bytes per second for bucket GetObject operations during a query. 0 +# means unlimited. Also expressible in human readable forms (1MB, 256KB, etc). +# CLI flag: -querier.query-bucket-get-object-rate-limit +[query_bucket_get_object_rate_limit: | default = 0B] + +# Maximum burst bytes for bucket GetObject operations during a query. 0 means +# unlimited. Also expressible in human readable forms (1MB, 256KB, etc). +# CLI flag: -querier.query-bucket-get-object-rate-limit-burst +[query_bucket_get_object_rate_limit_burst: | default = 0B] + # Split queries by a time interval and execute in parallel. The value 0 disables # splitting by time. This also determines how cache keys are chosen when result # caching is enabled. 
diff --git a/pkg/querier/testutil/limits.go b/pkg/querier/testutil/limits.go index 04dcddffa4b20..379fd4e1d021c 100644 --- a/pkg/querier/testutil/limits.go +++ b/pkg/querier/testutil/limits.go @@ -99,3 +99,11 @@ func (m *MockLimits) DebugEngineStreams(_ string) bool { func (m *MockLimits) DebugEngineTasks(_ string) bool { return false } + +func (m *MockLimits) QueryBucketGetObjectRateLimit(_ context.Context, _ string) int64 { + return 0 // 0 means unlimited +} + +func (m *MockLimits) QueryBucketGetObjectRateLimitBurst(_ context.Context, _ string) int64 { + return 0 // 0 means unlimited +} diff --git a/pkg/storage/bucket/rate_limited_reader.go b/pkg/storage/bucket/rate_limited_reader.go index 12735e961b3f6..f42f47894b9c6 100644 --- a/pkg/storage/bucket/rate_limited_reader.go +++ b/pkg/storage/bucket/rate_limited_reader.go @@ -16,7 +16,10 @@ import ( // minReadSize is the minimum chunk size for reading data. // This ensures we read in reasonable-sized batches rather than very small ones. -const minReadSize = 512 +// For typical 1-2MB objects (most of our chunks), 64KB provides a good balance between efficiency +// and rate limiting responsiveness. +// E.g. for 2MB object, 64KB read size is 32 reads, which is reasonable. +const minReadSize = 64 * 1024 // 64KB type rateLimiterKey struct{}
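Taken together, the series attaches one token-bucket limiter per query via the request context and has the object client adapter wrap every GetObject/GetObjectRange reader with it, so all reads issued for that query share a single bandwidth budget. Below is a minimal, self-contained sketch of that technique; the type name, chunk size, rate, and payload size are illustrative choices, not values from the patch.

```go
// Sketch of the throttling model: a shared token-bucket limiter, and a wrapping
// reader that reserves tokens for each chunk before reading, so concurrent
// readers split the same bandwidth budget.
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"golang.org/x/time/rate"
)

type throttledReader struct {
	io.Reader
	limiter *rate.Limiter
	ctx     context.Context
}

func (r *throttledReader) Read(p []byte) (int, error) {
	const chunk = 64 * 1024 // same order as the patch's 64KB minReadSize
	total := 0
	for total < len(p) {
		n := min(chunk, len(p)-total, r.limiter.Burst())
		// Block until the shared limiter grants n bytes worth of tokens.
		if err := r.limiter.WaitN(r.ctx, n); err != nil {
			return total, err
		}
		read, err := r.Reader.Read(p[total : total+n])
		total += read
		if err != nil || read < n {
			return total, err
		}
	}
	return total, nil
}

func main() {
	const bps = 256 << 10 // 256 KiB/s, burst equal to the rate
	limiter := rate.NewLimiter(rate.Limit(bps), bps)

	payload := bytes.Repeat([]byte{'x'}, 1<<20) // 1 MiB
	r := &throttledReader{Reader: bytes.NewReader(payload), limiter: limiter, ctx: context.Background()}

	start := time.Now()
	n, err := io.Copy(io.Discard, r)
	// Expect roughly (1 MiB - burst) / 256 KiB/s of throttling, i.e. about 3s.
	fmt.Printf("read %d bytes in %s (err=%v)\n", n, time.Since(start), err)
}
```

Running it copies the 1 MiB payload in roughly three seconds, the time the 256 KiB/s budget needs once the initial burst is spent; wrapping several readers around the same limiter, as the tests in this series do, divides that budget between them.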