Skip to content

Commit 3a24f5d

Browse files
feat: write to dataobj partitions based on segmentation key (#19946)
1 parent 8394909 commit 3a24f5d

File tree

8 files changed

+120
-63
lines changed

8 files changed

+120
-63
lines changed

docs/sources/shared/configuration.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3188,6 +3188,14 @@ dataobj_tee:
31883188
# Maximum number of bytes to buffer.
31893189
# CLI flag: -distributor.dataobj-tee.max-buffered-bytes
31903190
[max_buffered_bytes: <int> | default = 104857600]
3191+
3192+
# The per-tenant partition rate (bytes/sec).
3193+
# CLI flag: -distributor.dataobj-tee.per-partition-rate-bytes
3194+
[per_partition_rate_bytes: <int> | default = 1048576]
3195+
3196+
# Enables optional debug metrics.
3197+
# CLI flag: -distributor.dataobj-tee.debug-metrics-enabled
3198+
[debug_metrics_enabled: <boolean> | default = false]
31913199
```
31923200

31933201
### etcd

pkg/distributor/dataobj_tee.go

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import (
44
"context"
55
"errors"
66
"flag"
7+
"strconv"
78

89
"github.com/go-kit/log"
910
"github.com/go-kit/log/level"
10-
"github.com/grafana/dskit/ring"
1111
"github.com/prometheus/client_golang/prometheus"
1212
"github.com/prometheus/client_golang/prometheus/promauto"
1313
"github.com/twmb/franz-go/pkg/kgo"
@@ -16,50 +16,70 @@ import (
1616
)
1717

1818
type DataObjTeeConfig struct {
19-
Enabled bool `yaml:"enabled"`
20-
Topic string `yaml:"topic"`
21-
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
19+
Enabled bool `yaml:"enabled"`
20+
Topic string `yaml:"topic"`
21+
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
22+
PerPartitionRateBytes int `yaml:"per_partition_rate_bytes"`
23+
DebugMetricsEnabled bool `yaml:"debug_metrics_enabled"`
2224
}
2325

2426
func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet) {
2527
f.BoolVar(&c.Enabled, "distributor.dataobj-tee.enabled", false, "Enable data object tee.")
2628
f.StringVar(&c.Topic, "distributor.dataobj-tee.topic", "", "Topic for data object tee.")
2729
f.IntVar(&c.MaxBufferedBytes, "distributor.dataobj-tee.max-buffered-bytes", 100<<20, "Maximum number of bytes to buffer.")
30+
f.IntVar(&c.PerPartitionRateBytes, "distributor.dataobj-tee.per-partition-rate-bytes", 1024*1024, "The per-tenant partition rate (bytes/sec).")
31+
f.BoolVar(&c.DebugMetricsEnabled, "distributor.dataobj-tee.debug-metrics-enabled", false, "Enables optional debug metrics.")
2832
}
2933

3034
func (c *DataObjTeeConfig) Validate() error {
31-
if c.Enabled && c.Topic == "" {
35+
if !c.Enabled {
36+
return nil
37+
}
38+
if c.Topic == "" {
3239
return errors.New("the topic is required")
3340
}
41+
if c.MaxBufferedBytes < 0 {
42+
return errors.New("max buffered bytes cannot be negative")
43+
}
44+
if c.PerPartitionRateBytes < 0 {
45+
return errors.New("per partition rate bytes cannot be negative")
46+
}
3447
return nil
3548
}
3649

3750
// DataObjTee is a tee that duplicates streams to the data object topic.
3851
// It is a temporary solution while we work on segmentation keys.
3952
type DataObjTee struct {
40-
cfg *DataObjTeeConfig
41-
client *kgo.Client
42-
ringReader ring.PartitionRingReader
43-
logger log.Logger
53+
cfg *DataObjTeeConfig
54+
limitsClient *ingestLimits
55+
kafkaClient *kgo.Client
56+
resolver *SegmentationPartitionResolver
57+
logger log.Logger
4458

4559
// Metrics.
4660
failures prometheus.Counter
4761
total prometheus.Counter
62+
63+
// High cardinality metrics which are only emitted when debug metrics
64+
// are enabled.
65+
produced *prometheus.CounterVec
4866
}
4967

5068
// NewDataObjTee returns a new DataObjTee.
5169
func NewDataObjTee(
5270
cfg *DataObjTeeConfig,
53-
client *kgo.Client,
54-
ringReader ring.PartitionRingReader,
71+
resolver *SegmentationPartitionResolver,
72+
limitsClient *ingestLimits,
73+
kafkaClient *kgo.Client,
5574
logger log.Logger,
5675
reg prometheus.Registerer,
5776
) (*DataObjTee, error) {
5877
return &DataObjTee{
59-
cfg: cfg,
60-
client: client,
61-
ringReader: ringReader,
62-
logger: logger,
78+
cfg: cfg,
79+
resolver: resolver,
80+
kafkaClient: kafkaClient,
81+
limitsClient: limitsClient,
82+
logger: logger,
6383
failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{
6484
Name: "loki_distributor_dataobj_tee_duplicate_stream_failures_total",
6585
Help: "Total number of streams that could not be duplicated.",
@@ -68,21 +88,54 @@ func NewDataObjTee(
6888
Name: "loki_distributor_dataobj_tee_duplicate_streams_total",
6989
Help: "Total number of streams duplicated.",
7090
}),
91+
produced: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
92+
Name: "loki_distributor_dataobj_tee_produced_bytes_total",
93+
Help: "Total number of bytes produced to each partition.",
94+
}, []string{"tenant", "partition", "segmentation_key"}),
7195
}, nil
7296
}
7397

98+
// A SegmentedStream is a KeyedStream with a segmentation key.
99+
type SegmentedStream struct {
100+
KeyedStream
101+
SegmentationKey SegmentationKey
102+
}
103+
74104
// Duplicate implements the [Tee] interface.
75-
func (t *DataObjTee) Duplicate(_ context.Context, tenant string, streams []KeyedStream) {
76-
for _, s := range streams {
77-
go t.duplicate(tenant, s)
105+
func (t *DataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream) {
106+
segmentationKeyStreams := make([]SegmentedStream, 0, len(streams))
107+
for _, stream := range streams {
108+
segmentationKey, err := GetSegmentationKey(stream)
109+
if err != nil {
110+
level.Error(t.logger).Log("msg", "failed to get segmentation key", "err", err)
111+
t.failures.Inc()
112+
return
113+
}
114+
segmentationKeyStreams = append(segmentationKeyStreams, SegmentedStream{
115+
KeyedStream: stream,
116+
SegmentationKey: segmentationKey,
117+
})
118+
}
119+
rates, err := t.limitsClient.UpdateRates(ctx, tenant, segmentationKeyStreams)
120+
if err != nil {
121+
level.Error(t.logger).Log("msg", "failed to update rates", "err", err)
122+
}
123+
// fastRates is a temporary lookup table that lets us find the rate
124+
// for a segmentation key in constant time.
125+
fastRates := make(map[uint64]uint64, len(rates))
126+
for _, rate := range rates {
127+
fastRates[rate.StreamHash] = rate.Rate
128+
}
129+
for _, s := range segmentationKeyStreams {
130+
go t.duplicate(ctx, tenant, s, fastRates[s.SegmentationKey.Sum64()])
78131
}
79132
}
80133

81-
func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
134+
func (t *DataObjTee) duplicate(ctx context.Context, tenant string, stream SegmentedStream, rateBytes uint64) {
82135
t.total.Inc()
83-
partition, err := t.ringReader.PartitionRing().ActivePartitionForKey(stream.HashKey)
136+
partition, err := t.resolver.Resolve(ctx, stream.SegmentationKey, rateBytes)
84137
if err != nil {
85-
level.Error(t.logger).Log("msg", "failed to get partition", "err", err)
138+
level.Error(t.logger).Log("msg", "failed to resolve partition", "err", err)
86139
t.failures.Inc()
87140
return
88141
}
@@ -92,9 +145,16 @@ func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
92145
t.failures.Inc()
93146
return
94147
}
95-
results := t.client.ProduceSync(context.TODO(), records...)
148+
results := t.kafkaClient.ProduceSync(ctx, records...)
96149
if err := results.FirstErr(); err != nil {
97150
level.Error(t.logger).Log("msg", "failed to produce records", "err", err)
98151
t.failures.Inc()
99152
}
153+
if t.cfg.DebugMetricsEnabled {
154+
t.produced.WithLabelValues(
155+
tenant,
156+
strconv.FormatInt(int64(partition), 10),
157+
string(stream.SegmentationKey),
158+
).Add(float64(stream.Stream.Size()))
159+
}
100160
}

pkg/distributor/distributor.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ func New(
283283
return nil, fmt.Errorf("partition ring is required for kafka writes")
284284
}
285285

286+
ingestLimits := newIngestLimits(limitsFrontendClient, registerer)
287+
286288
var kafkaWriter KafkaProducer
287289
if cfg.KafkaEnabled {
288290
kafkaClient, err := kafka_client.NewWriterClient("distributor", cfg.KafkaConfig, 20, logger, registerer)
@@ -295,10 +297,17 @@ func New(
295297
)
296298

297299
if cfg.DataObjTeeConfig.Enabled {
300+
resolver := NewSegmentationPartitionResolver(
301+
uint64(cfg.DataObjTeeConfig.PerPartitionRateBytes),
302+
dataObjConsumerPartitionRing,
303+
registerer,
304+
logger,
305+
)
298306
dataObjTee, err := NewDataObjTee(
299307
&cfg.DataObjTeeConfig,
308+
resolver,
309+
ingestLimits,
300310
kafkaClient,
301-
dataObjConsumerPartitionRing,
302311
logger,
303312
registerer,
304313
)
@@ -379,7 +388,7 @@ func New(
379388
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
380389
kafkaWriter: kafkaWriter,
381390
partitionRing: partitionRing,
382-
ingestLimits: newIngestLimits(limitsFrontendClient, registerer),
391+
ingestLimits: ingestLimits,
383392
numMetadataPartitions: numMetadataPartitions,
384393
}
385394

pkg/distributor/ingest_limits.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
194194
// UpdateRates updates the rates for the streams and returns a slice of the
195195
// updated rates for all streams. Any streams that could not have rates updated
196196
// have a rate of zero.
197-
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) {
197+
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []SegmentedStream) ([]*proto.UpdateRatesResult, error) {
198198
l.requests.WithLabelValues("UpdateRates").Inc()
199199
req, err := newUpdateRatesRequest(tenant, streams)
200200
if err != nil {
@@ -209,7 +209,7 @@ func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams [
209209
return resp.Results, nil
210210
}
211211

212-
func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) {
212+
func newUpdateRatesRequest(tenant string, streams []SegmentedStream) (*proto.UpdateRatesRequest, error) {
213213
// The distributor sends the hashes of all streams in the request to the
214214
// limits-frontend. The limits-frontend is responsible for deciding if
215215
// the request would exceed the tenants limits, and if so, which streams
@@ -218,7 +218,7 @@ func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateR
218218
for _, stream := range streams {
219219
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
220220
streamMetadata = append(streamMetadata, &proto.StreamMetadata{
221-
StreamHash: stream.HashKeyNoShard,
221+
StreamHash: stream.SegmentationKey.Sum64(),
222222
TotalSize: entriesSize + structuredMetadataSize,
223223
IngestionPolicy: stream.Policy,
224224
})

pkg/distributor/ingest_limits_test.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
302302
tests := []struct {
303303
name string
304304
tenant string
305-
streams []KeyedStream
305+
streams []SegmentedStream
306306
expectedRequest *proto.UpdateRatesRequest
307307
response *proto.UpdateRatesResponse
308308
responseErr error
@@ -311,37 +311,31 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
311311
}{{
312312
name: "error should be returned if rates cannot be updated",
313313
tenant: "test",
314-
streams: []KeyedStream{{
315-
HashKeyNoShard: 1,
314+
streams: []SegmentedStream{{
315+
SegmentationKey: "test",
316316
}},
317-
expectedRequest: &proto.UpdateRatesRequest{
318-
Tenant: "test",
319-
Streams: []*proto.StreamMetadata{{
320-
StreamHash: 1,
321-
}},
322-
},
323317
responseErr: errors.New("failed to update rates"),
324318
expectedErr: "failed to update rates",
325319
}, {
326320
name: "updates rates",
327321
tenant: "test",
328-
streams: []KeyedStream{{
329-
HashKeyNoShard: 1,
322+
streams: []SegmentedStream{{
323+
SegmentationKey: "test",
330324
}},
331325
expectedRequest: &proto.UpdateRatesRequest{
332326
Tenant: "test",
333327
Streams: []*proto.StreamMetadata{{
334-
StreamHash: 1,
328+
StreamHash: 0xb5fb79e24c92922f,
335329
}},
336330
},
337331
response: &proto.UpdateRatesResponse{
338332
Results: []*proto.UpdateRatesResult{{
339-
StreamHash: 1,
333+
StreamHash: 0xb5fb79e24c92922f,
340334
Rate: 1024,
341335
}},
342336
},
343337
expectedResult: []*proto.UpdateRatesResult{{
344-
StreamHash: 1,
338+
StreamHash: 0xb5fb79e24c92922f,
345339
Rate: 1024,
346340
}},
347341
}}

pkg/limits/frontend/frontend.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,7 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRe
137137
return &proto.ExceedsLimitsResponse{Results: results}, nil
138138
}
139139

140-
func (f *Frontend) UpdateRates(
141-
ctx context.Context,
142-
req *proto.UpdateRatesRequest,
143-
) (*proto.UpdateRatesResponse, error) {
140+
func (f *Frontend) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
144141
results := make([]*proto.UpdateRatesResult, 0, len(req.Streams))
145142
resps, err := f.limitsClient.UpdateRates(ctx, req)
146143
if err != nil {

pkg/limits/frontend/ring.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,7 @@ func newRingLimitsClient(
7373
}
7474

7575
// ExceedsLimits implements the [exceedsLimitsGatherer] interface.
76-
func (r *ringLimitsClient) ExceedsLimits(
77-
ctx context.Context,
78-
req *proto.ExceedsLimitsRequest,
79-
) ([]*proto.ExceedsLimitsResponse, error) {
76+
func (r *ringLimitsClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
8077
if len(req.Streams) == 0 {
8178
return nil, nil
8279
}
@@ -101,10 +98,7 @@ func (r *ringLimitsClient) ExceedsLimits(
10198
}
10299

103100
// UpdateRates implements the [exceedsLimitsGatherer] interface.
104-
func (r *ringLimitsClient) UpdateRates(
105-
ctx context.Context,
106-
req *proto.UpdateRatesRequest,
107-
) ([]*proto.UpdateRatesResponse, error) {
101+
func (r *ringLimitsClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) ([]*proto.UpdateRatesResponse, error) {
108102
if len(req.Streams) == 0 {
109103
return nil, nil
110104
}
@@ -238,12 +232,7 @@ type doRPCsFunc func(
238232

239233
// exhaustAllZones queries all zones, one at a time, until either all streams
240234
// have been answered or all zones have been exhausted.
241-
func (r *ringLimitsClient) exhaustAllZones(
242-
ctx context.Context,
243-
tenant string,
244-
streams []*proto.StreamMetadata,
245-
doRPCs doRPCsFunc,
246-
) ([]*proto.StreamMetadata, error) {
235+
func (r *ringLimitsClient) exhaustAllZones(ctx context.Context, tenant string, streams []*proto.StreamMetadata, doRPCs doRPCsFunc) ([]*proto.StreamMetadata, error) {
247236
zonesIter, err := r.allZones(ctx)
248237
if err != nil {
249238
return nil, err

pkg/limits/frontend/ring_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"github.com/grafana/loki/v3/pkg/limits/proto"
1616
)
1717

18-
func TestRingGatherer_ExceedsLimits(t *testing.T) {
18+
func TestRingLimitsClient_ExceedsLimits(t *testing.T) {
1919
tests := []struct {
2020
name string
2121
request *proto.ExceedsLimitsRequest
@@ -442,7 +442,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
442442
}
443443
}
444444

445-
func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
445+
func TestRingLimitsClient_GetZoneAwarePartitionConsumers(t *testing.T) {
446446
tests := []struct {
447447
name string
448448
instances []ring.InstanceDesc
@@ -603,7 +603,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
603603
}
604604
}
605605

606-
func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
606+
func TestRingLimitsClient_GetPartitionConsumers(t *testing.T) {
607607
tests := []struct {
608608
name string
609609
// Instances contains the complete set of instances that should be mocked.
@@ -740,7 +740,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
740740
}
741741
}
742742

743-
func TestRingStreamUsageGatherer_GetPartitionConsumers_Caching(t *testing.T) {
743+
func TestRingLimitsClient_GetPartitionConsumers_Caching(t *testing.T) {
744744
// Set up the mock clients.
745745
req0 := proto.GetAssignedPartitionsResponse{
746746
AssignedPartitions: map[int32]int64{

0 commit comments

Comments
 (0)