44 "context"
55 "errors"
66 "flag"
7+ "strconv"
78
89 "github.com/go-kit/log"
910 "github.com/go-kit/log/level"
@@ -19,13 +20,15 @@ type DataObjTeeConfig struct {
1920 Topic string `yaml:"topic"`
2021 MaxBufferedBytes int `yaml:"max_buffered_bytes"`
2122 PerPartitionRateBytes int `yaml:"per_partition_rate_bytes"`
23+ DebugMetricsEnabled bool `yaml:"debug_metrics_enabled"`
2224}
2325
2426func (c * DataObjTeeConfig ) RegisterFlags (f * flag.FlagSet ) {
2527 f .BoolVar (& c .Enabled , "distributor.dataobj-tee.enabled" , false , "Enable data object tee." )
2628 f .StringVar (& c .Topic , "distributor.dataobj-tee.topic" , "" , "Topic for data object tee." )
2729 f .IntVar (& c .MaxBufferedBytes , "distributor.dataobj-tee.max-buffered-bytes" , 100 << 20 , "Maximum number of bytes to buffer." )
2830 f .IntVar (& c .PerPartitionRateBytes , "distributor.dataobj-tee.per-partition-rate-bytes" , 1024 * 1024 , "The per-tenant partition rate (bytes/sec)." )
31+ f .BoolVar (& c .DebugMetricsEnabled , "distributor.dataobj-tee.debug-metrics-enabled" , false , "Enables optional debug metrics." )
2932}
3033
3134func (c * DataObjTeeConfig ) Validate () error {
@@ -56,6 +59,7 @@ type DataObjTee struct {
5659 // Metrics.
5760 failures prometheus.Counter
5861 total prometheus.Counter
62+ produced * prometheus.CounterVec
5963}
6064
6165// NewDataObjTee returns a new DataObjTee.
@@ -81,6 +85,10 @@ func NewDataObjTee(
8185 Name : "loki_distributor_dataobj_tee_duplicate_streams_total" ,
8286 Help : "Total number of streams duplicated." ,
8387 }),
88+ produced : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
89+ Name : "loki_distributor_dataobj_tee_produced_bytes_total" ,
90+ Help : "Total number of bytes produced to each partition." ,
91+ }, []string {"tenant" , "partition" , "segmentation_key" }),
8492 }, nil
8593}
8694
@@ -139,4 +147,11 @@ func (t *DataObjTee) duplicate(ctx context.Context, tenant string, stream Segmen
139147 level .Error (t .logger ).Log ("msg" , "failed to produce records" , "err" , err )
140148 t .failures .Inc ()
141149 }
150+ if t .cfg .DebugMetricsEnabled {
151+ t .produced .WithLabelValues (
152+ tenant ,
153+ strconv .FormatInt (int64 (partition ), 10 ),
154+ string (stream .SegmentationKey ),
155+ ).Add (float64 (stream .Stream .Size ()))
156+ }
142157}
0 commit comments