Skip to content

Commit 83ed927

Browse files
feat: write to dataobj partitions based on segmentation key
1 parent 75034c1 commit 83ed927

File tree

7 files changed

+89
-52
lines changed

7 files changed

+89
-52
lines changed

pkg/distributor/dataobj_tee.go

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/go-kit/log"
99
"github.com/go-kit/log/level"
10-
"github.com/grafana/dskit/ring"
1110
"github.com/prometheus/client_golang/prometheus"
1211
"github.com/prometheus/client_golang/prometheus/promauto"
1312
"github.com/twmb/franz-go/pkg/kgo"
@@ -16,31 +15,43 @@ import (
1615
)
1716

1817
// DataObjTeeConfig configures the data object tee that duplicates incoming
// streams to the data object Kafka topic.
type DataObjTeeConfig struct {
	// Enabled turns the tee on; all other fields are ignored when false.
	Enabled bool `yaml:"enabled"`
	// Topic is the Kafka topic streams are duplicated to. Required when enabled.
	Topic string `yaml:"topic"`
	// MaxBufferedBytes caps the number of bytes buffered for produce.
	MaxBufferedBytes int `yaml:"max_buffered_bytes"`
	// PerPartitionRateBytes is the per-tenant partition rate in bytes/sec.
	PerPartitionRateBytes int `yaml:"per_partition_rate_bytes"`
}
2323

2424
// RegisterFlags registers the data object tee flags with f.
func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&c.Enabled, "distributor.dataobj-tee.enabled", false, "Enable data object tee.")
	f.StringVar(&c.Topic, "distributor.dataobj-tee.topic", "", "Topic for data object tee.")
	// 100<<20 = 100 MiB default buffer.
	f.IntVar(&c.MaxBufferedBytes, "distributor.dataobj-tee.max-buffered-bytes", 100<<20, "Maximum number of bytes to buffer.")
	// 1<<20 = 1 MiB/sec default per-partition rate.
	f.IntVar(&c.PerPartitionRateBytes, "distributor.dataobj-tee.per-partition-rate-bytes", 1<<20, "The per-tenant partition rate (bytes/sec).")
}
2930

3031
func (c *DataObjTeeConfig) Validate() error {
31-
if c.Enabled && c.Topic == "" {
32+
if !c.Enabled {
33+
return nil
34+
}
35+
if c.Topic == "" {
3236
return errors.New("the topic is required")
3337
}
38+
if c.MaxBufferedBytes < 0 {
39+
return errors.New("max buffered bytes cannot be negative")
40+
}
41+
if c.PerPartitionRateBytes < 0 {
42+
return errors.New("per partition rate bytes cannot be negative")
43+
}
3444
return nil
3545
}
3646

3747
// DataObjTee is a tee that duplicates streams to the data object topic.
3848
// It is a temporary solution while we work on segmentation keys.
3949
type DataObjTee struct {
40-
cfg *DataObjTeeConfig
41-
client *kgo.Client
42-
ringReader ring.PartitionRingReader
43-
logger log.Logger
50+
cfg *DataObjTeeConfig
51+
limitsClient *ingestLimits
52+
kafkaClient *kgo.Client
53+
resolver *SegmentationPartitionResolver
54+
logger log.Logger
4455

4556
// Metrics.
4657
failures prometheus.Counter
@@ -50,16 +61,18 @@ type DataObjTee struct {
5061
// NewDataObjTee returns a new DataObjTee.
5162
func NewDataObjTee(
5263
cfg *DataObjTeeConfig,
53-
client *kgo.Client,
54-
ringReader ring.PartitionRingReader,
64+
resolver *SegmentationPartitionResolver,
65+
limitsClient *ingestLimits,
66+
kafkaClient *kgo.Client,
5567
logger log.Logger,
5668
reg prometheus.Registerer,
5769
) (*DataObjTee, error) {
5870
return &DataObjTee{
59-
cfg: cfg,
60-
client: client,
61-
ringReader: ringReader,
62-
logger: logger,
71+
cfg: cfg,
72+
resolver: resolver,
73+
kafkaClient: kafkaClient,
74+
limitsClient: limitsClient,
75+
logger: logger,
6376
failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{
6477
Name: "loki_distributor_dataobj_tee_duplicate_stream_failures_total",
6578
Help: "Total number of streams that could not be duplicated.",
@@ -71,18 +84,47 @@ func NewDataObjTee(
7184
}, nil
7285
}
7386

87+
// A SegmentedStream is a KeyedStream with a segmentation key.
88+
type SegmentedStream struct {
89+
KeyedStream
90+
SegmentationKey SegmentationKey
91+
}
92+
7493
// Duplicate implements the [Tee] interface.
75-
func (t *DataObjTee) Duplicate(_ context.Context, tenant string, streams []KeyedStream) {
76-
for _, s := range streams {
77-
go t.duplicate(tenant, s)
94+
func (t *DataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream) {
95+
segmentationKeyStreams := make([]SegmentedStream, 0, len(streams))
96+
for _, stream := range streams {
97+
segmentationKey, err := GetSegmentationKey(stream)
98+
if err != nil {
99+
level.Error(t.logger).Log("msg", "failed to get segmentation key", "err", err)
100+
t.failures.Inc()
101+
return
102+
}
103+
segmentationKeyStreams = append(segmentationKeyStreams, SegmentedStream{
104+
KeyedStream: stream,
105+
SegmentationKey: segmentationKey,
106+
})
107+
}
108+
rates, err := t.limitsClient.UpdateRates(ctx, tenant, segmentationKeyStreams)
109+
if err != nil {
110+
level.Error(t.logger).Log("msg", "failed to update rates", "err", err)
111+
}
112+
// fastRates is a temporary lookup table that lets us find the rate
113+
// for a segmentation key in constant time.
114+
fastRates := make(map[uint64]uint64, len(rates))
115+
for _, rate := range rates {
116+
fastRates[rate.StreamHash] = rate.Rate
117+
}
118+
for _, s := range segmentationKeyStreams {
119+
go t.duplicate(ctx, tenant, s, fastRates[s.SegmentationKey.Sum64()])
78120
}
79121
}
80122

81-
func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
123+
func (t *DataObjTee) duplicate(ctx context.Context, tenant string, stream SegmentedStream, rateBytes uint64) {
82124
t.total.Inc()
83-
partition, err := t.ringReader.PartitionRing().ActivePartitionForKey(stream.HashKey)
125+
partition, err := t.resolver.Resolve(ctx, stream.SegmentationKey, rateBytes)
84126
if err != nil {
85-
level.Error(t.logger).Log("msg", "failed to get partition", "err", err)
127+
level.Error(t.logger).Log("msg", "failed to resolve partition", "err", err)
86128
t.failures.Inc()
87129
return
88130
}
@@ -92,7 +134,7 @@ func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
92134
t.failures.Inc()
93135
return
94136
}
95-
results := t.client.ProduceSync(context.TODO(), records...)
137+
results := t.kafkaClient.ProduceSync(ctx, records...)
96138
if err := results.FirstErr(); err != nil {
97139
level.Error(t.logger).Log("msg", "failed to produce records", "err", err)
98140
t.failures.Inc()

pkg/distributor/distributor.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ func New(
283283
return nil, fmt.Errorf("partition ring is required for kafka writes")
284284
}
285285

286+
ingestLimits := newIngestLimits(limitsFrontendClient, registerer)
287+
286288
var kafkaWriter KafkaProducer
287289
if cfg.KafkaEnabled {
288290
kafkaClient, err := kafka_client.NewWriterClient("distributor", cfg.KafkaConfig, 20, logger, registerer)
@@ -295,10 +297,17 @@ func New(
295297
)
296298

297299
if cfg.DataObjTeeConfig.Enabled {
300+
resolver := NewSegmentationPartitionResolver(
301+
uint64(cfg.DataObjTeeConfig.PerPartitionRateBytes),
302+
dataObjConsumerPartitionRing,
303+
registerer,
304+
logger,
305+
)
298306
dataObjTee, err := NewDataObjTee(
299307
&cfg.DataObjTeeConfig,
308+
resolver,
309+
ingestLimits,
300310
kafkaClient,
301-
dataObjConsumerPartitionRing,
302311
logger,
303312
registerer,
304313
)
@@ -379,7 +388,7 @@ func New(
379388
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
380389
kafkaWriter: kafkaWriter,
381390
partitionRing: partitionRing,
382-
ingestLimits: newIngestLimits(limitsFrontendClient, registerer),
391+
ingestLimits: ingestLimits,
383392
numMetadataPartitions: numMetadataPartitions,
384393
}
385394

pkg/distributor/ingest_limits.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
194194
// UpdateRates updates the rates for the streams and returns a slice of the
195195
// updated rates for all streams. Any streams that could not have rates updated
196196
// have a rate of zero.
197-
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) {
197+
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []SegmentedStream) ([]*proto.UpdateRatesResult, error) {
198198
l.requests.WithLabelValues("UpdateRates").Inc()
199199
req, err := newUpdateRatesRequest(tenant, streams)
200200
if err != nil {
@@ -209,7 +209,7 @@ func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams [
209209
return resp.Results, nil
210210
}
211211

212-
func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) {
212+
func newUpdateRatesRequest(tenant string, streams []SegmentedStream) (*proto.UpdateRatesRequest, error) {
213213
// The distributor sends the hashes of all streams in the request to the
214214
// limits-frontend. The limits-frontend is responsible for deciding if
215215
// the request would exceed the tenants limits, and if so, which streams
@@ -218,7 +218,7 @@ func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateR
218218
for _, stream := range streams {
219219
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
220220
streamMetadata = append(streamMetadata, &proto.StreamMetadata{
221-
StreamHash: stream.HashKeyNoShard,
221+
StreamHash: stream.SegmentationKey.Sum64(),
222222
TotalSize: entriesSize + structuredMetadataSize,
223223
IngestionPolicy: stream.Policy,
224224
})

pkg/distributor/ingest_limits_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
302302
tests := []struct {
303303
name string
304304
tenant string
305-
streams []KeyedStream
305+
streams []SegmentedStream
306306
expectedRequest *proto.UpdateRatesRequest
307307
response *proto.UpdateRatesResponse
308308
responseErr error
@@ -311,7 +311,7 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
311311
}{{
312312
name: "error should be returned if rates cannot be updated",
313313
tenant: "test",
314-
streams: []KeyedStream{{
314+
streams: []SegmentedStream{{
315315
HashKeyNoShard: 1,
316316
}},
317317
expectedRequest: &proto.UpdateRatesRequest{
@@ -325,7 +325,7 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
325325
}, {
326326
name: "updates rates",
327327
tenant: "test",
328-
streams: []KeyedStream{{
328+
streams: []SegmentedStream{{
329329
HashKeyNoShard: 1,
330330
}},
331331
expectedRequest: &proto.UpdateRatesRequest{

pkg/limits/frontend/frontend.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,7 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRe
137137
return &proto.ExceedsLimitsResponse{Results: results}, nil
138138
}
139139

140-
func (f *Frontend) UpdateRates(
141-
ctx context.Context,
142-
req *proto.UpdateRatesRequest,
143-
) (*proto.UpdateRatesResponse, error) {
140+
func (f *Frontend) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
144141
results := make([]*proto.UpdateRatesResult, 0, len(req.Streams))
145142
resps, err := f.limitsClient.UpdateRates(ctx, req)
146143
if err != nil {

pkg/limits/frontend/ring.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,7 @@ func newRingLimitsClient(
7373
}
7474

7575
// ExceedsLimits implements the [exceedsLimitsGatherer] interface.
76-
func (r *ringLimitsClient) ExceedsLimits(
77-
ctx context.Context,
78-
req *proto.ExceedsLimitsRequest,
79-
) ([]*proto.ExceedsLimitsResponse, error) {
76+
func (r *ringLimitsClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
8077
if len(req.Streams) == 0 {
8178
return nil, nil
8279
}
@@ -101,10 +98,7 @@ func (r *ringLimitsClient) ExceedsLimits(
10198
}
10299

103100
// UpdateRates implements the [exceedsLimitsGatherer] interface.
104-
func (r *ringLimitsClient) UpdateRates(
105-
ctx context.Context,
106-
req *proto.UpdateRatesRequest,
107-
) ([]*proto.UpdateRatesResponse, error) {
101+
func (r *ringLimitsClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) ([]*proto.UpdateRatesResponse, error) {
108102
if len(req.Streams) == 0 {
109103
return nil, nil
110104
}
@@ -238,12 +232,7 @@ type doRPCsFunc func(
238232

239233
// exhaustAllZones queries all zones, one at a time, until either all streams
240234
// have been answered or all zones have been exhausted.
241-
func (r *ringLimitsClient) exhaustAllZones(
242-
ctx context.Context,
243-
tenant string,
244-
streams []*proto.StreamMetadata,
245-
doRPCs doRPCsFunc,
246-
) ([]*proto.StreamMetadata, error) {
235+
func (r *ringLimitsClient) exhaustAllZones(ctx context.Context, tenant string, streams []*proto.StreamMetadata, doRPCs doRPCsFunc) ([]*proto.StreamMetadata, error) {
247236
zonesIter, err := r.allZones(ctx)
248237
if err != nil {
249238
return nil, err

pkg/limits/frontend/ring_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"github.com/grafana/loki/v3/pkg/limits/proto"
1616
)
1717

18-
func TestRingGatherer_ExceedsLimits(t *testing.T) {
18+
func TestRingLimitsClient_ExceedsLimits(t *testing.T) {
1919
tests := []struct {
2020
name string
2121
request *proto.ExceedsLimitsRequest
@@ -442,7 +442,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
442442
}
443443
}
444444

445-
func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
445+
func TestRingLimitsClient_GetZoneAwarePartitionConsumers(t *testing.T) {
446446
tests := []struct {
447447
name string
448448
instances []ring.InstanceDesc
@@ -603,7 +603,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
603603
}
604604
}
605605

606-
func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
606+
func TestRingLimitsClient_GetPartitionConsumers(t *testing.T) {
607607
tests := []struct {
608608
name string
609609
// Instances contains the complete set of instances that should be mocked.
@@ -740,7 +740,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
740740
}
741741
}
742742

743-
func TestRingStreamUsageGatherer_GetPartitionConsumers_Caching(t *testing.T) {
743+
func TestRingLimitsClient_GetPartitionConsumers_Caching(t *testing.T) {
744744
// Set up the mock clients.
745745
req0 := proto.GetAssignedPartitionsResponse{
746746
AssignedPartitions: map[int32]int64{

0 commit comments

Comments
 (0)