Skip to content

Commit 9b187d6

Browse files
chore: add debug metrics
1 parent 4efba63 commit 9b187d6

File tree

4 files changed

+32
-27
lines changed

4 files changed

+32
-27
lines changed

docs/sources/shared/configuration.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3188,6 +3188,14 @@ dataobj_tee:
31883188
# Maximum number of bytes to buffer.
31893189
# CLI flag: -distributor.dataobj-tee.max-buffered-bytes
31903190
[max_buffered_bytes: <int> | default = 104857600]
3191+
3192+
# The per-tenant partition rate (bytes/sec).
3193+
# CLI flag: -distributor.dataobj-tee.per-partition-rate-bytes
3194+
[per_partition_rate_bytes: <int> | default = 1048576]
3195+
3196+
# Enables optional debug metrics.
3197+
# CLI flag: -distributor.dataobj-tee.debug-metrics-enabled
3198+
[debug_metrics_enabled: <boolean> | default = false]
31913199
```
31923200

31933201
### etcd

pkg/distributor/dataobj_tee.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"flag"
7+
"strconv"
78

89
"github.com/go-kit/log"
910
"github.com/go-kit/log/level"
@@ -19,13 +20,15 @@ type DataObjTeeConfig struct {
1920
Topic string `yaml:"topic"`
2021
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
2122
PerPartitionRateBytes int `yaml:"per_partition_rate_bytes"`
23+
DebugMetricsEnabled bool `yaml:"debug_metrics_enabled"`
2224
}
2325

2426
func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet) {
2527
f.BoolVar(&c.Enabled, "distributor.dataobj-tee.enabled", false, "Enable data object tee.")
2628
f.StringVar(&c.Topic, "distributor.dataobj-tee.topic", "", "Topic for data object tee.")
2729
f.IntVar(&c.MaxBufferedBytes, "distributor.dataobj-tee.max-buffered-bytes", 100<<20, "Maximum number of bytes to buffer.")
2830
f.IntVar(&c.PerPartitionRateBytes, "distributor.dataobj-tee.per-partition-rate-bytes", 1024*1024, "The per-tenant partition rate (bytes/sec).")
31+
f.BoolVar(&c.DebugMetricsEnabled, "distributor.dataobj-tee.debug-metrics-enabled", false, "Enables optional debug metrics.")
2932
}
3033

3134
func (c *DataObjTeeConfig) Validate() error {
@@ -56,6 +59,10 @@ type DataObjTee struct {
5659
// Metrics.
5760
failures prometheus.Counter
5861
total prometheus.Counter
62+
63+
// High cardinality metrics which are only emitted when debug metrics
64+
// are enabled.
65+
produced *prometheus.CounterVec
5966
}
6067

6168
// NewDataObjTee returns a new DataObjTee.
@@ -81,6 +88,10 @@ func NewDataObjTee(
8188
Name: "loki_distributor_dataobj_tee_duplicate_streams_total",
8289
Help: "Total number of streams duplicated.",
8390
}),
91+
produced: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
92+
Name: "loki_distributor_dataobj_tee_produced_bytes_total",
93+
Help: "Total number of bytes produced to each partition.",
94+
}, []string{"tenant", "partition", "segmentation_key"}),
8495
}, nil
8596
}
8697

@@ -139,4 +150,11 @@ func (t *DataObjTee) duplicate(ctx context.Context, tenant string, stream Segmen
139150
level.Error(t.logger).Log("msg", "failed to produce records", "err", err)
140151
t.failures.Inc()
141152
}
153+
if t.cfg.DebugMetricsEnabled {
154+
t.produced.WithLabelValues(
155+
tenant,
156+
strconv.FormatInt(int64(partition), 10),
157+
string(stream.SegmentationKey),
158+
).Add(float64(stream.Stream.Size()))
159+
}
142160
}

pkg/distributor/ingest_limits_test.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -312,36 +312,30 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
312312
name: "error should be returned if rates cannot be updated",
313313
tenant: "test",
314314
streams: []SegmentedStream{{
315-
HashKeyNoShard: 1,
315+
SegmentationKey: "test",
316316
}},
317-
expectedRequest: &proto.UpdateRatesRequest{
318-
Tenant: "test",
319-
Streams: []*proto.StreamMetadata{{
320-
StreamHash: 1,
321-
}},
322-
},
323317
responseErr: errors.New("failed to update rates"),
324318
expectedErr: "failed to update rates",
325319
}, {
326320
name: "updates rates",
327321
tenant: "test",
328322
streams: []SegmentedStream{{
329-
HashKeyNoShard: 1,
323+
SegmentationKey: "test",
330324
}},
331325
expectedRequest: &proto.UpdateRatesRequest{
332326
Tenant: "test",
333327
Streams: []*proto.StreamMetadata{{
334-
StreamHash: 1,
328+
StreamHash: 0xb5fb79e24c92922f,
335329
}},
336330
},
337331
response: &proto.UpdateRatesResponse{
338332
Results: []*proto.UpdateRatesResult{{
339-
StreamHash: 1,
333+
StreamHash: 0xb5fb79e24c92922f,
340334
Rate: 1024,
341335
}},
342336
},
343337
expectedResult: []*proto.UpdateRatesResult{{
344-
StreamHash: 1,
338+
StreamHash: 0xb5fb79e24c92922f,
345339
Rate: 1024,
346340
}},
347341
}}

pkg/distributor/segment.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,12 @@ import (
1818
// distribute load while preserving stream locality for tenants.
1919
type SegmentationKey string
2020

21-
<<<<<<< HEAD
2221
// Sum64 returns a 64 bit, non-cryptographic hash of the key.
23-
=======
24-
// Sum64 returns a 64 bit, non-crytogrpahic hash of the key.
25-
>>>>>>> 5ef092522a (feat: add segmentation keys and resolver)
2622
func (key SegmentationKey) Sum64() uint64 {
2723
h := fnv.New64a()
2824
// Use a reserved word here to avoid any possible hash conflicts with
29-
// streams.
30-
<<<<<<< HEAD
25+
// streams
3126
h.Write([]byte("__loki_segmentation_key__"))
32-
=======
33-
h.Write([]byte("__loki__segmentation_key__"))
34-
>>>>>>> 5ef092522a (feat: add segmentation keys and resolver)
3527
h.Write([]byte(key))
3628
return h.Sum64()
3729
}
@@ -52,19 +44,12 @@ func GetSegmentationKey(stream KeyedStream) (SegmentationKey, error) {
5244
type SegmentationPartitionResolver struct {
5345
perPartitionRateBytes uint64
5446
ringReader ring.PartitionRingReader
55-
<<<<<<< HEAD
5647
logger log.Logger
5748

5849
// Metrics.
5950
failed prometheus.Counter
6051
randomlySharded prometheus.Counter
6152
total prometheus.Counter
62-
=======
63-
fallback prometheus.Counter
64-
failed prometheus.Counter
65-
total prometheus.Counter
66-
logger log.Logger
67-
>>>>>>> 5ef092522a (feat: add segmentation keys and resolver)
6853
}
6954

7055
// NewSegmentationPartitionResolver returns a new SegmentationPartitionResolver.

0 commit comments

Comments
 (0)