8 changes: 8 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3188,6 +3188,14 @@ dataobj_tee:
# Maximum number of bytes to buffer.
# CLI flag: -distributor.dataobj-tee.max-buffered-bytes
[max_buffered_bytes: <int> | default = 104857600]

# The per-tenant partition rate (bytes/sec).
# CLI flag: -distributor.dataobj-tee.per-partition-rate-bytes
[per_partition_rate_bytes: <int> | default = 1048576]

# Enables optional debug metrics.
# CLI flag: -distributor.dataobj-tee.debug-metrics-enabled
[debug_metrics_enabled: <boolean> | default = false]
```
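
For a quick sense of how these options are wired up, here is a minimal sketch that registers the new CLI flags and validates the resulting config. The topic value is made up for illustration, and the import path simply follows the repository layout (`pkg/distributor`); treat this as a sketch rather than documented usage.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/v3/pkg/distributor"
)

func main() {
	// Register the data object tee flags on a fresh FlagSet and parse
	// example values (the topic name below is illustrative only).
	var cfg distributor.DataObjTeeConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	_ = fs.Parse([]string{
		"-distributor.dataobj-tee.enabled=true",
		"-distributor.dataobj-tee.topic=loki.dataobj",
		"-distributor.dataobj-tee.per-partition-rate-bytes=2097152", // 2 MiB/s per partition
		"-distributor.dataobj-tee.debug-metrics-enabled=true",
	})

	// Validate passes once a topic is set and the byte sizes are non-negative.
	fmt.Println(cfg.Validate()) // <nil>
}
```

Note that with the tee disabled, `Validate` returns nil even if other fields are invalid, which matches the early return added in `pkg/distributor/dataobj_tee.go` below.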

### etcd
104 changes: 82 additions & 22 deletions pkg/distributor/dataobj_tee.go
@@ -4,10 +4,10 @@ import (
"context"
"errors"
"flag"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
@@ -16,50 +16,70 @@ import (
)

type DataObjTeeConfig struct {
Enabled bool `yaml:"enabled"`
Topic string `yaml:"topic"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
Enabled bool `yaml:"enabled"`
Topic string `yaml:"topic"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
PerPartitionRateBytes int `yaml:"per_partition_rate_bytes"`
DebugMetricsEnabled bool `yaml:"debug_metrics_enabled"`
}

func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&c.Enabled, "distributor.dataobj-tee.enabled", false, "Enable data object tee.")
f.StringVar(&c.Topic, "distributor.dataobj-tee.topic", "", "Topic for data object tee.")
f.IntVar(&c.MaxBufferedBytes, "distributor.dataobj-tee.max-buffered-bytes", 100<<20, "Maximum number of bytes to buffer.")
f.IntVar(&c.PerPartitionRateBytes, "distributor.dataobj-tee.per-partition-rate-bytes", 1024*1024, "The per-tenant partition rate (bytes/sec).")
f.BoolVar(&c.DebugMetricsEnabled, "distributor.dataobj-tee.debug-metrics-enabled", false, "Enables optional debug metrics.")
}

func (c *DataObjTeeConfig) Validate() error {
if c.Enabled && c.Topic == "" {
if !c.Enabled {
return nil
}
if c.Topic == "" {
return errors.New("the topic is required")
}
if c.MaxBufferedBytes < 0 {
return errors.New("max buffered bytes cannot be negative")
}
if c.PerPartitionRateBytes < 0 {
return errors.New("per partition rate bytes cannot be negative")
}
return nil
}

// DataObjTee is a tee that duplicates streams to the data object topic.
// It is a temporary solution while we work on segmentation keys.
type DataObjTee struct {
cfg *DataObjTeeConfig
client *kgo.Client
ringReader ring.PartitionRingReader
logger log.Logger
cfg *DataObjTeeConfig
limitsClient *ingestLimits
kafkaClient *kgo.Client
resolver *SegmentationPartitionResolver
logger log.Logger

// Metrics.
failures prometheus.Counter
total prometheus.Counter

// High cardinality metrics which are only emitted when debug metrics
// are enabled.
produced *prometheus.CounterVec
}

// NewDataObjTee returns a new DataObjTee.
func NewDataObjTee(
cfg *DataObjTeeConfig,
client *kgo.Client,
ringReader ring.PartitionRingReader,
resolver *SegmentationPartitionResolver,
limitsClient *ingestLimits,
kafkaClient *kgo.Client,
logger log.Logger,
reg prometheus.Registerer,
) (*DataObjTee, error) {
return &DataObjTee{
cfg: cfg,
client: client,
ringReader: ringReader,
logger: logger,
cfg: cfg,
resolver: resolver,
kafkaClient: kafkaClient,
limitsClient: limitsClient,
logger: logger,
failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "loki_distributor_dataobj_tee_duplicate_stream_failures_total",
Help: "Total number of streams that could not be duplicated.",
@@ -68,21 +88,54 @@ func NewDataObjTee(
Name: "loki_distributor_dataobj_tee_duplicate_streams_total",
Help: "Total number of streams duplicated.",
}),
produced: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "loki_distributor_dataobj_tee_produced_bytes_total",
Help: "Total number of bytes produced to each partition.",
}, []string{"tenant", "partition", "segmentation_key"}),
}, nil
}

// A SegmentedStream is a KeyedStream with a segmentation key.
type SegmentedStream struct {
KeyedStream
SegmentationKey SegmentationKey
}
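
Neither `SegmentationPartitionResolver` nor `GetSegmentationKey` appears in this diff, so the contract below is inferred from the call sites in `Duplicate` and `duplicate`; in particular the `int32` partition type is an assumption based on Kafka's partition numbering, not something this PR states.

```go
// Inferred sketch of the resolver dependency used below; the real
// SegmentationPartitionResolver is defined elsewhere in pkg/distributor
// and may differ in shape.
type partitionResolver interface {
	// Resolve maps a segmentation key and its current ingest rate
	// (bytes/sec) to the Kafka partition that should receive the stream.
	Resolve(ctx context.Context, key SegmentationKey, rateBytes uint64) (int32, error)
}
```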

// Duplicate implements the [Tee] interface.
func (t *DataObjTee) Duplicate(_ context.Context, tenant string, streams []KeyedStream) {
for _, s := range streams {
go t.duplicate(tenant, s)
func (t *DataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream) {
segmentationKeyStreams := make([]SegmentedStream, 0, len(streams))
for _, stream := range streams {
segmentationKey, err := GetSegmentationKey(stream)
if err != nil {
level.Error(t.logger).Log("msg", "failed to get segmentation key", "err", err)
t.failures.Inc()
return
}
segmentationKeyStreams = append(segmentationKeyStreams, SegmentedStream{
KeyedStream: stream,
SegmentationKey: segmentationKey,
})
}
rates, err := t.limitsClient.UpdateRates(ctx, tenant, segmentationKeyStreams)
if err != nil {
level.Error(t.logger).Log("msg", "failed to update rates", "err", err)
}
// fastRates is a temporary lookup table that lets us find the rate
// for a segmentation key in constant time.
fastRates := make(map[uint64]uint64, len(rates))
for _, rate := range rates {
fastRates[rate.StreamHash] = rate.Rate
}
for _, s := range segmentationKeyStreams {
go t.duplicate(ctx, tenant, s, fastRates[s.SegmentationKey.Sum64()])
}
}
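
The `fastRates` map works because both sides key on the same value: `newUpdateRatesRequest` (in `pkg/distributor/ingest_limits.go` later in this diff) fills `StreamHash` from `SegmentationKey.Sum64()`, and `Duplicate` looks the rate back up with the same hash. An equivalent linear-scan helper, written out only to make that pairing explicit (the function name is mine, not the PR's):

```go
// rateFor returns the updated rate for a stream's segmentation key, or zero
// when no result matches, consistent with the "rate of zero" fallback that
// UpdateRates documents. Illustrative only; Duplicate uses a map instead.
func rateFor(results []*proto.UpdateRatesResult, s SegmentedStream) uint64 {
	want := s.SegmentationKey.Sum64()
	for _, r := range results {
		if r.StreamHash == want {
			return r.Rate
		}
	}
	return 0
}
```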

func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
func (t *DataObjTee) duplicate(ctx context.Context, tenant string, stream SegmentedStream, rateBytes uint64) {
t.total.Inc()
partition, err := t.ringReader.PartitionRing().ActivePartitionForKey(stream.HashKey)
partition, err := t.resolver.Resolve(ctx, stream.SegmentationKey, rateBytes)
if err != nil {
level.Error(t.logger).Log("msg", "failed to get partition", "err", err)
level.Error(t.logger).Log("msg", "failed to resolve partition", "err", err)
t.failures.Inc()
return
}
@@ -92,9 +145,16 @@ func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
t.failures.Inc()
return
}
results := t.client.ProduceSync(context.TODO(), records...)
results := t.kafkaClient.ProduceSync(ctx, records...)
if err := results.FirstErr(); err != nil {
level.Error(t.logger).Log("msg", "failed to produce records", "err", err)
t.failures.Inc()
}
if t.cfg.DebugMetricsEnabled {
t.produced.WithLabelValues(
tenant,
strconv.FormatInt(int64(partition), 10),
string(stream.SegmentationKey),
).Add(float64(stream.Stream.Size()))
}
}
13 changes: 11 additions & 2 deletions pkg/distributor/distributor.go
@@ -283,6 +283,8 @@ func New(
return nil, fmt.Errorf("partition ring is required for kafka writes")
}

ingestLimits := newIngestLimits(limitsFrontendClient, registerer)

var kafkaWriter KafkaProducer
if cfg.KafkaEnabled {
kafkaClient, err := kafka_client.NewWriterClient("distributor", cfg.KafkaConfig, 20, logger, registerer)
@@ -295,10 +297,17 @@
)

if cfg.DataObjTeeConfig.Enabled {
resolver := NewSegmentationPartitionResolver(
uint64(cfg.DataObjTeeConfig.PerPartitionRateBytes),
dataObjConsumerPartitionRing,
registerer,
logger,
)
dataObjTee, err := NewDataObjTee(
&cfg.DataObjTeeConfig,
resolver,
ingestLimits,
kafkaClient,
dataObjConsumerPartitionRing,
logger,
registerer,
)
@@ -379,7 +388,7 @@ func New(
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
kafkaWriter: kafkaWriter,
partitionRing: partitionRing,
ingestLimits: newIngestLimits(limitsFrontendClient, registerer),
ingestLimits: ingestLimits,
numMetadataPartitions: numMetadataPartitions,
}

6 changes: 3 additions & 3 deletions pkg/distributor/ingest_limits.go
@@ -194,7 +194,7 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
// UpdateRates updates the rates for the streams and returns a slice of the
// updated rates for all streams. Any streams that could not have rates updated
// have a rate of zero.
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) {
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []SegmentedStream) ([]*proto.UpdateRatesResult, error) {
l.requests.WithLabelValues("UpdateRates").Inc()
req, err := newUpdateRatesRequest(tenant, streams)
if err != nil {
@@ -209,7 +209,7 @@ func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams [
return resp.Results, nil
}

func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) {
func newUpdateRatesRequest(tenant string, streams []SegmentedStream) (*proto.UpdateRatesRequest, error) {
// The distributor sends the hashes of all streams in the request to the
// limits-frontend. The limits-frontend is responsible for deciding if
// the request would exceed the tenant's limits, and if so, which streams
@@ -218,7 +218,7 @@
for _, stream := range streams {
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
streamMetadata = append(streamMetadata, &proto.StreamMetadata{
StreamHash: stream.HashKeyNoShard,
StreamHash: stream.SegmentationKey.Sum64(),
TotalSize: entriesSize + structuredMetadataSize,
IngestionPolicy: stream.Policy,
})
22 changes: 8 additions & 14 deletions pkg/distributor/ingest_limits_test.go
@@ -302,7 +302,7 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
tests := []struct {
name string
tenant string
streams []KeyedStream
streams []SegmentedStream
expectedRequest *proto.UpdateRatesRequest
response *proto.UpdateRatesResponse
responseErr error
@@ -311,37 +311,31 @@
}{{
name: "error should be returned if rates cannot be updated",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
streams: []SegmentedStream{{
SegmentationKey: "test",
}},
expectedRequest: &proto.UpdateRatesRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 1,
}},
},
responseErr: errors.New("failed to update rates"),
expectedErr: "failed to update rates",
}, {
name: "updates rates",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
streams: []SegmentedStream{{
SegmentationKey: "test",
}},
expectedRequest: &proto.UpdateRatesRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 1,
StreamHash: 0xb5fb79e24c92922f,
}},
},
response: &proto.UpdateRatesResponse{
Results: []*proto.UpdateRatesResult{{
StreamHash: 1,
StreamHash: 0xb5fb79e24c92922f,
Rate: 1024,
}},
},
expectedResult: []*proto.UpdateRatesResult{{
StreamHash: 1,
StreamHash: 0xb5fb79e24c92922f,
Rate: 1024,
}},
}}
5 changes: 1 addition & 4 deletions pkg/limits/frontend/frontend.go
@@ -137,10 +137,7 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRe
return &proto.ExceedsLimitsResponse{Results: results}, nil
}

func (f *Frontend) UpdateRates(
ctx context.Context,
req *proto.UpdateRatesRequest,
) (*proto.UpdateRatesResponse, error) {
func (f *Frontend) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
results := make([]*proto.UpdateRatesResult, 0, len(req.Streams))
resps, err := f.limitsClient.UpdateRates(ctx, req)
if err != nil {
17 changes: 3 additions & 14 deletions pkg/limits/frontend/ring.go
@@ -73,10 +73,7 @@ func newRingLimitsClient(
}

// ExceedsLimits implements the [exceedsLimitsGatherer] interface.
func (r *ringLimitsClient) ExceedsLimits(
ctx context.Context,
req *proto.ExceedsLimitsRequest,
) ([]*proto.ExceedsLimitsResponse, error) {
func (r *ringLimitsClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
if len(req.Streams) == 0 {
return nil, nil
}
@@ -101,10 +98,7 @@ func (r *ringLimitsClient) ExceedsLimits(
}

// UpdateRates implements the [exceedsLimitsGatherer] interface.
func (r *ringLimitsClient) UpdateRates(
ctx context.Context,
req *proto.UpdateRatesRequest,
) ([]*proto.UpdateRatesResponse, error) {
func (r *ringLimitsClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) ([]*proto.UpdateRatesResponse, error) {
if len(req.Streams) == 0 {
return nil, nil
}
@@ -238,12 +232,7 @@ type doRPCsFunc func(

// exhaustAllZones queries all zones, one at a time, until either all streams
// have been answered or all zones have been exhausted.
func (r *ringLimitsClient) exhaustAllZones(
ctx context.Context,
tenant string,
streams []*proto.StreamMetadata,
doRPCs doRPCsFunc,
) ([]*proto.StreamMetadata, error) {
func (r *ringLimitsClient) exhaustAllZones(ctx context.Context, tenant string, streams []*proto.StreamMetadata, doRPCs doRPCsFunc) ([]*proto.StreamMetadata, error) {
zonesIter, err := r.allZones(ctx)
if err != nil {
return nil, err
8 changes: 4 additions & 4 deletions pkg/limits/frontend/ring_test.go
@@ -15,7 +15,7 @@ import (
"github.com/grafana/loki/v3/pkg/limits/proto"
)

func TestRingGatherer_ExceedsLimits(t *testing.T) {
func TestRingLimitsClient_ExceedsLimits(t *testing.T) {
tests := []struct {
name string
request *proto.ExceedsLimitsRequest
@@ -442,7 +442,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
}
}

func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
func TestRingLimitsClient_GetZoneAwarePartitionConsumers(t *testing.T) {
tests := []struct {
name string
instances []ring.InstanceDesc
@@ -603,7 +603,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
}
}

func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
func TestRingLimitsClient_GetPartitionConsumers(t *testing.T) {
tests := []struct {
name string
// Instances contains the complete set of instances that should be mocked.
@@ -740,7 +740,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
}
}

func TestRingStreamUsageGatherer_GetPartitionConsumers_Caching(t *testing.T) {
func TestRingLimitsClient_GetPartitionConsumers_Caching(t *testing.T) {
// Set up the mock clients.
req0 := proto.GetAssignedPartitionsResponse{
AssignedPartitions: map[int32]int64{