diff --git a/pkg/cache/index/client.go b/pkg/cache/index/client.go
index b254c9f7d..5dbca92db 100644
--- a/pkg/cache/index/client.go
+++ b/pkg/cache/index/client.go
@@ -62,6 +62,7 @@ func NewIndexedClient(
 		Client:        client,
 		name:          cacheName,
 		cacheProvider: cacheProvider,
+		indexName:     "index_" + cacheName,
 		cancel:        cancel,
 		forceFlush:    make(chan bool),
 		hasFlushed:    make(chan bool, 1),
@@ -127,6 +128,7 @@ type IndexedClient struct {
 	// internal index configuration
 	name          string               `msg:"-"`
 	cacheProvider string               `msg:"-"`
+	indexName     string               `msg:"-"`
 	options       atomic.Value         `msg:"-"`
 	ico           IndexedClientOptions `msg:"-"`
 	lastWrite     atomicx.Time         `msg:"-"`
@@ -181,7 +183,7 @@ func (idx *IndexedClient) updateIndex(cacheKey string, size int64, la, lw, e tim
 		cacheSize = atomic.AddInt64(&idx.CacheSize, obj.Size)
 		count = atomic.AddInt64(&idx.ObjectCount, 1)
 	}
-	metrics.ObserveCacheSizeChange(idx.name, idx.cacheProvider, cacheSize, count)
+	metrics.ObserveCacheSizeChange(idx.indexName, idx.cacheProvider, cacheSize, count)
 	idx.lastWrite.Store(time.Now())
 	idx.Objects.Store(cacheKey, obj)
 }
@@ -194,6 +196,7 @@ func (idx *IndexedClient) StoreReference(cacheKey string, data cache.ReferenceOb
 	if !ok {
 		return ErrInvalidCacheBackend
 	}
+	start := time.Now()
 	if err := mc.StoreReference(cacheKey, data, ttl); err != nil {
 		return err
 	}
@@ -203,6 +206,7 @@ func (idx *IndexedClient) StoreReference(cacheKey string, data cache.ReferenceOb
 		expiry = now.Add(ttl)
 	}
 	idx.updateIndex(cacheKey, int64(data.Size()), now, now, expiry)
+	metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "setDirect", "none", float64(data.Size()), time.Since(start))
 	return nil
 }
 
@@ -224,11 +228,13 @@ func (idx *IndexedClient) Store(cacheKey string, byteData []byte, ttl time.Durat
 		expiry = now.Add(ttl)
 		obj.Expiration.Store(expiry)
 	}
+	start := time.Now()
 	// store the object in the cache
 	if err := idx.Client.Store(cacheKey, obj.ToBytes(), ttl); err != nil {
 		return err
 	}
 	idx.updateIndex(cacheKey, obj.Size, now, now, expiry)
+	metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "set", "none", float64(len(byteData)), time.Since(start))
 	return nil
 }
 
@@ -250,8 +256,19 @@ func (idx *IndexedClient) RetrieveReference(cacheKey string) (any, status.Lookup
 	if !ok {
 		return nil, status.LookupStatusError, ErrInvalidCacheBackend
 	}
-	idx.updateAccessTime(cacheKey)
-	return mc.RetrieveReference(cacheKey)
+	start := time.Now()
+	v, s, err := mc.RetrieveReference(cacheKey)
+	if err == nil && s == status.LookupStatusHit {
+		idx.updateAccessTime(cacheKey)
+	}
+	// observe every lookup, not only those that yield a ReferenceObject, so
+	// lookups returning no object (e.g. misses) are timed too, with size 0
+	var size float64
+	if ro, ok := v.(cache.ReferenceObject); ok {
+		size = float64(ro.Size())
+	}
+	metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "getDirect", s.String(), size, time.Since(start))
+	return v, s, err
 }
 
 // Retrieve implements the cache.Client interface, looking up the object and updating the index last access time
@@ -259,14 +276,30 @@ func (idx *IndexedClient) Retrieve(cacheKey string) ([]byte, status.LookupStatus
 	if cacheKey == IndexKey {
 		return nil, status.LookupStatusError, ErrIndexInvalidCacheKey
 	}
-	data, s, err := idx.Client.Retrieve(cacheKey)
+	start := time.Now()
+	var (
+		data []byte
+		s    = status.LookupStatusHit
+		err  error
+		o    *Object
+	)
+	defer func() {
+		size := float64(0)
+		if o != nil {
+			size = float64(len(o.Value))
+		}
+		metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "get", s.String(), size, time.Since(start))
+	}()
+	data, s, err = idx.Client.Retrieve(cacheKey)
 	if err != nil {
 		return nil, s, err
 	}
 	if s != status.LookupStatusHit {
 		return nil, s, err
 	}
-	o, err := ObjectFromBytes(data)
+	o, err = ObjectFromBytes(data)
 	if err != nil {
-		return nil, status.LookupStatusError, err
+		// keep s in sync with the returned status so the deferred observation is labeled as an error, not a hit
+		s = status.LookupStatusError
+		return nil, s, err
 	}
@@ -276,19 +309,23 @@ func (idx *IndexedClient) Retrieve(cacheKey string) ([]byte, status.LookupStatus
 
 // Remove implements the cache.Client interface and removes the object from the cache and index
 func (idx *IndexedClient) Remove(cacheKeys ...string) error {
+	start := time.Now()
+	var totalBytes float64
 	// remove the objects from the index
 	for _, key := range cacheKeys {
 		if o, ok := idx.Objects.Load(key); ok {
 			obj := o.(*Object)
+			totalBytes += float64(obj.Size)
 			size := atomic.AddInt64(&idx.CacheSize, -obj.Size)
 			count := atomic.AddInt64(&idx.ObjectCount, -1)
-			metrics.ObserveCacheOperation(idx.name, idx.cacheProvider, "del", "none", float64(obj.Size))
 			idx.Objects.Delete(key)
-			metrics.ObserveCacheSizeChange(idx.name, idx.cacheProvider, size, count)
+			metrics.ObserveCacheSizeChange(idx.indexName, idx.cacheProvider, size, count)
 		}
 	}
 	idx.lastWrite.Store(time.Now())
-	return idx.Client.Remove(cacheKeys...)
+	err := idx.Client.Remove(cacheKeys...)
+	metrics.ObserveCacheDel(idx.indexName, idx.cacheProvider, totalBytes, time.Since(start))
+	return err
 }
 
 // Stop the indexed cache, flush its state, and close the underlying cache
@@ -398,7 +435,7 @@ func (idx *IndexedClient) reap() {
 	})
 
 	if len(removals) > 0 {
-		metrics.ObserveCacheEvent(idx.name, idx.cacheProvider, "eviction", "ttl")
+		metrics.ObserveCacheEvent(idx.indexName, idx.cacheProvider, "eviction", "ttl")
 		if err := idx.Remove(removals...); err != nil {
 			logger.Error("reap remove error", logging.Pairs{"cacheName": idx.name, "error": err})
 		}
@@ -410,7 +447,7 @@ func (idx *IndexedClient) reap() {
 	evictionType, removals := reap(cacheSize, objectCount, remainders, *opts)
 
 	if len(removals) > 0 {
-		metrics.ObserveCacheEvent(idx.name, idx.cacheProvider, "eviction", evictionType)
+		metrics.ObserveCacheEvent(idx.indexName, idx.cacheProvider, "eviction", evictionType)
 		if err := idx.Remove(removals...); err != nil {
 			logger.Error("reap remove error", logging.Pairs{"cacheName": idx.name, "error": err})
 		}
diff --git a/pkg/cache/manager/manager.go b/pkg/cache/manager/manager.go
index 79b77152d..f497c3a5b 100644
--- a/pkg/cache/manager/manager.go
+++ b/pkg/cache/manager/manager.go
@@ -57,48 +57,57 @@ type Manager struct {
 }
 
 func (cm *Manager) StoreReference(cacheKey string, data cache.ReferenceObject, ttl time.Duration) error {
-	metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "setDirect", "none", float64(data.Size()))
 	logger.Debug("cache store", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
-	return cm.Client.(cache.MemoryCache).StoreReference(cacheKey, data, ttl)
+	start := time.Now()
+	err := cm.Client.(cache.MemoryCache).StoreReference(cacheKey, data, ttl)
+	metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "setDirect", "none", float64(data.Size()), time.Since(start))
+	return err
 }
 
 func (cm *Manager) Store(cacheKey string, byteData []byte, ttl time.Duration) error {
-	metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "set", "none", float64(len(byteData)))
 	logger.Debug("cache store", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
-	return cm.Client.Store(cacheKey, byteData, ttl)
+	start := time.Now()
+	err := cm.Client.Store(cacheKey, byteData, ttl)
+	metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "set", "none", float64(len(byteData)), time.Since(start))
+	return err
 }
 
-func (cm *Manager) observeRetrieval(cacheKey string, size int, s status.LookupStatus, err error) {
+func (cm *Manager) observeRetrieval(cacheKey string, size int, s status.LookupStatus, err error, elapsed time.Duration) {
 	switch {
 	case errors.Is(err, cache.ErrKNF) || s == status.LookupStatusKeyMiss:
 		logger.Debug("cache miss", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
-		metrics.ObserveCacheMiss(cm.config.Name, cm.config.Provider)
-	case err != nil:
+		metrics.ObserveCacheMiss(cm.config.Name, cm.config.Provider, elapsed)
+	case status.IsSuccessful(s):
+		logger.Debug("cache retrieve", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
+		metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "get", "hit", float64(size), elapsed)
+	default:
 		logger.Debug("cache retrieve failed", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
 		metrics.ObserveCacheEvent(cm.config.Name, cm.config.Provider, "error", "failed to retrieve cache entry")
-	case s == status.LookupStatusHit:
-		logger.Debug("cache retrieve", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
-		metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "get", "hit", float64(size))
 	}
 }
 
 func (cm *Manager) RetrieveReference(cacheKey string) (any, status.LookupStatus, error) {
+	start := time.Now()
 	v, s, err := cm.Client.(cache.MemoryCache).RetrieveReference(cacheKey)
+	elapsed := time.Since(start)
+	// observe lookups that yield no ReferenceObject too (e.g. misses), with size 0
+	var size int
 	if ro, ok := v.(cache.ReferenceObject); ok {
-		cm.observeRetrieval(cacheKey, ro.Size(), s, err)
+		size = ro.Size()
 	}
+	cm.observeRetrieval(cacheKey, size, s, err, elapsed)
 	return v, s, err
 }
 
 type retrieveResult struct {
-	Data   any
+	Data   []byte
 	Status status.LookupStatus
 }
 
 func (cm *Manager) Retrieve(cacheKey string) ([]byte, status.LookupStatus, error) {
+	start := time.Now()
 	val, err, shared := cm.sf.Do(cacheKey, func() (any, error) {
 		b, s, err := cm.Client.Retrieve(cacheKey)
-		cm.observeRetrieval(cacheKey, len(b), s, err)
 		return &retrieveResult{
 			Data:   b,
 			Status: s,
@@ -113,13 +122,17 @@ func (cm *Manager) Retrieve(cacheKey string) ([]byte, status.LookupStatus, error
 			s = status.LookupStatusProxyError
 		}
 	}
-	return rr.Data.([]byte), s, err
+	cm.observeRetrieval(cacheKey, len(rr.Data), s, err, time.Since(start))
+	return rr.Data, s, err
 }
 
 func (cm *Manager) Remove(cacheKeys ...string) error {
-	metrics.ObserveCacheDel(cm.config.Name, cm.config.Provider, float64(len(cacheKeys)-1))
 	logger.Debug("cache remove", logging.Pairs{"keys": cacheKeys, "provider": cm.config.Provider})
-	return cm.Client.Remove(cacheKeys...)
+	start := time.Now()
+	err := cm.Client.Remove(cacheKeys...)
+	// key count, not bytes: the manager doesn't track object sizes; the index layer reports byte counts
+	metrics.ObserveCacheDel(cm.config.Name, cm.config.Provider, float64(len(cacheKeys)), time.Since(start))
+	return err
 }
 
 func (cm *Manager) Connect() error {
diff --git a/pkg/cache/metrics/metrics.go b/pkg/cache/metrics/metrics.go
index bb8e865a3..6cda26c4b 100644
--- a/pkg/cache/metrics/metrics.go
+++ b/pkg/cache/metrics/metrics.go
@@ -18,18 +18,19 @@ package metrics
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/trickstercache/trickster/v2/pkg/observability/metrics"
 )
 
 // ObserveCacheMiss records a Cache Miss event
-func ObserveCacheMiss(cacheName, cacheProvider string) {
-	ObserveCacheOperation(cacheName, cacheProvider, "get", "miss", 0)
+func ObserveCacheMiss(cacheName, cacheProvider string, elapsed time.Duration) {
+	ObserveCacheOperation(cacheName, cacheProvider, "get", "miss", 0, elapsed)
 }
 
 // ObserveCacheDel records a cache deletion event
-func ObserveCacheDel(cache, cacheProvider string, count float64) {
-	ObserveCacheOperation(cache, cacheProvider, "del", "none", count)
+func ObserveCacheDel(cache, cacheProvider string, count float64, elapsed time.Duration) {
+	ObserveCacheOperation(cache, cacheProvider, "del", "none", count, elapsed)
 }
 
 // CacheError returns an empty cache object and the formatted error
@@ -38,9 +39,9 @@ func CacheError(cacheKey, cacheName, cacheProvider string, msg string) ([]byte,
 	return nil, fmt.Errorf(msg, cacheKey)
 }
 
-// ObserveCacheOperation increments counters as cache operations occur
-func ObserveCacheOperation(cache, cacheProvider, operation, status string, bytes float64) {
-	metrics.CacheObjectOperations.WithLabelValues(cache, cacheProvider, operation, status).Inc()
+// ObserveCacheOperation records cache operations with timing and byte counts
+func ObserveCacheOperation(cache, cacheProvider, operation, status string, bytes float64, elapsed time.Duration) {
+	metrics.CacheObjectOperationDuration.WithLabelValues(cache, cacheProvider, operation, status).Observe(elapsed.Seconds())
 	if bytes > 0 {
 		metrics.CacheByteOperations.WithLabelValues(cache, cacheProvider, operation, status).Add(bytes)
 	}
diff --git a/pkg/cache/metrics/metrics_test.go b/pkg/cache/metrics/metrics_test.go
index 4bc4a4e23..7ecbaf1e0 100644
--- a/pkg/cache/metrics/metrics_test.go
+++ b/pkg/cache/metrics/metrics_test.go
@@ -18,6 +18,7 @@ package metrics
 
 import (
 	"testing"
+	"time"
 )
 
 var testCacheKey, testCacheName, testCacheProvider string
@@ -29,12 +30,12 @@ func init() {
 }
 
 func TestObserveCacheMiss(t *testing.T) {
-	ObserveCacheMiss(testCacheName, testCacheProvider)
+	ObserveCacheMiss(testCacheName, testCacheProvider, time.Millisecond)
 }
 
 // ObserveCacheDel records a cache deletion event
 func TestObserveCacheDel(t *testing.T) {
-	ObserveCacheDel(testCacheName, testCacheProvider, 0)
+	ObserveCacheDel(testCacheName, testCacheProvider, 0, time.Millisecond)
 }
 
 func TestCacheError(t *testing.T) {
@@ -45,8 +46,8 @@ func TestCacheError(t *testing.T) {
 }
 
 func TestObserveCacheOperation(t *testing.T) {
-	ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 0)
-	ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 1)
+	ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 0, time.Millisecond)
+	ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 1, time.Millisecond)
 }
 
 func TestObserveCacheEvent(t *testing.T) {
diff --git a/pkg/observability/metrics/metrics.go b/pkg/observability/metrics/metrics.go
index 043d77744..cb3b3bc03 100644
--- a/pkg/observability/metrics/metrics.go
+++ b/pkg/observability/metrics/metrics.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	dto "github.com/prometheus/client_model/go"
 	"github.com/trickstercache/trickster/v2/pkg/backends/providers"
 )
 
@@ -181,13 +182,14 @@ var (
 		[]string{"backend_name", "provider", "method", "status", "http_status", "path"},
 	)
 
-	// CacheObjectOperations is a Counter of operations (in # of objects) performed on a Trickster cache
-	CacheObjectOperations = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
+	// CacheObjectOperationDuration is a Histogram of the time required to perform operations on a Trickster cache
+	CacheObjectOperationDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
 			Namespace: metricNamespace,
 			Subsystem: cacheSubsystem,
-			Name:      "operation_objects_total",
-			Help:      "Count (in # of objects) of operations performed on a Trickster cache.",
+			Name:      "operation_duration_seconds",
+			Help:      "Time required in seconds to perform an operation on a Trickster cache.",
+			Buckets:   defaultBuckets,
 		},
 		[]string{"cache_name", "provider", "operation", "status"},
 	)
@@ -319,6 +321,57 @@ var (
 	)
 )
 
+// histogramCounterRenamed is a prometheus.Collector that re-exports the sample
+// count of every series in a HistogramVec as a counter described by desc, so
+// the object-operations counter stays available alongside the histogram
+type histogramCounterRenamed struct {
+	desc      *prometheus.Desc
+	histogram *prometheus.HistogramVec
+}
+
+// Describe implements prometheus.Collector
+func (hcr *histogramCounterRenamed) Describe(ch chan<- *prometheus.Desc) {
+	ch <- hcr.desc
+}
+
+// Collect implements prometheus.Collector, emitting one counter per histogram
+// series whose value is that series' sample count
+func (hcr *histogramCounterRenamed) Collect(ch chan<- prometheus.Metric) {
+	// drain the histogram concurrently; collecting into a fixed-size buffer
+	// first would block forever once the vec holds more series than the buffer
+	histCh := make(chan prometheus.Metric)
+	go func() {
+		hcr.histogram.Collect(histCh)
+		close(histCh)
+	}()
+	for m := range histCh {
+		var dm dto.Metric
+		if err := m.Write(&dm); err != nil {
+			continue
+		}
+		h := dm.GetHistogram()
+		if h == nil {
+			continue
+		}
+		// NOTE(review): relies on label pairs being written sorted by label name,
+		// so desc must declare its variable labels alphabetically — confirm on upgrade
+		labelValues := make([]string, 0, len(dm.GetLabel()))
+		for _, lp := range dm.GetLabel() {
+			labelValues = append(labelValues, lp.GetValue())
+		}
+		cm, err := prometheus.NewConstMetric(
+			hcr.desc,
+			prometheus.CounterValue,
+			float64(h.GetSampleCount()),
+			labelValues...,
+		)
+		if err != nil {
+			continue
+		}
+		ch <- cm
+	}
+}
+
 func init() {
 	// Register Metrics
 	prometheus.MustRegister(FrontendRequestStatus)
@@ -333,7 +386,17 @@ func init() {
 	prometheus.MustRegister(ProxyConnectionAccepted)
 	prometheus.MustRegister(ProxyConnectionClosed)
 	prometheus.MustRegister(ProxyConnectionFailed)
-	prometheus.MustRegister(CacheObjectOperations)
+	prometheus.MustRegister(CacheObjectOperationDuration)
+	prometheus.MustRegister(&histogramCounterRenamed{
+		desc: prometheus.NewDesc(
+			"trickster_cache_operation_objects_total",
+			"Count (in # of objects) of operations performed on a Trickster cache.",
+			// alphabetical, matching the order Collect reads label values from each series
+			[]string{"cache_name", "operation", "provider", "status"},
+			nil,
+		),
+		histogram: CacheObjectOperationDuration,
+	})
 	prometheus.MustRegister(CacheByteOperations)
 	prometheus.MustRegister(CacheEvents)
 	prometheus.MustRegister(CacheObjects)