Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions pkg/cache/index/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func NewIndexedClient(
Client: client,
name: cacheName,
cacheProvider: cacheProvider,
indexName: "index_" + cacheName,
cancel: cancel,
forceFlush: make(chan bool),
hasFlushed: make(chan bool, 1),
Expand Down Expand Up @@ -127,6 +128,7 @@ type IndexedClient struct {
// internal index configuration
name string `msg:"-"`
cacheProvider string `msg:"-"`
indexName string `msg:"-"`
options atomic.Value `msg:"-"`
ico IndexedClientOptions `msg:"-"`
lastWrite atomicx.Time `msg:"-"`
Expand Down Expand Up @@ -181,7 +183,7 @@ func (idx *IndexedClient) updateIndex(cacheKey string, size int64, la, lw, e tim
cacheSize = atomic.AddInt64(&idx.CacheSize, obj.Size)
count = atomic.AddInt64(&idx.ObjectCount, 1)
}
metrics.ObserveCacheSizeChange(idx.name, idx.cacheProvider, cacheSize, count)
metrics.ObserveCacheSizeChange(idx.indexName, idx.cacheProvider, cacheSize, count)
idx.lastWrite.Store(time.Now())
idx.Objects.Store(cacheKey, obj)
}
Expand All @@ -194,6 +196,7 @@ func (idx *IndexedClient) StoreReference(cacheKey string, data cache.ReferenceOb
if !ok {
return ErrInvalidCacheBackend
}
start := time.Now()
if err := mc.StoreReference(cacheKey, data, ttl); err != nil {
return err
}
Expand All @@ -203,6 +206,7 @@ func (idx *IndexedClient) StoreReference(cacheKey string, data cache.ReferenceOb
expiry = now.Add(ttl)
}
idx.updateIndex(cacheKey, int64(data.Size()), now, now, expiry)
metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "setDirect", "none", float64(data.Size()), time.Since(start))
return nil
}

Expand All @@ -224,11 +228,13 @@ func (idx *IndexedClient) Store(cacheKey string, byteData []byte, ttl time.Durat
expiry = now.Add(ttl)
obj.Expiration.Store(expiry)
}
start := time.Now()
// store the object in the cache
if err := idx.Client.Store(cacheKey, obj.ToBytes(), ttl); err != nil {
return err
}
idx.updateIndex(cacheKey, obj.Size, now, now, expiry)
metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "set", "none", float64(len(byteData)), time.Since(start))
return nil
}

Expand All @@ -250,23 +256,44 @@ func (idx *IndexedClient) RetrieveReference(cacheKey string) (any, status.Lookup
if !ok {
return nil, status.LookupStatusError, ErrInvalidCacheBackend
}
idx.updateAccessTime(cacheKey)
return mc.RetrieveReference(cacheKey)
start := time.Now()
v, s, err := mc.RetrieveReference(cacheKey)
if err == nil && s == status.LookupStatusHit {
idx.updateAccessTime(cacheKey)
}
if ro, ok := v.(cache.ReferenceObject); ok {
metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "getDirect", s.String(), float64(ro.Size()), time.Since(start))
}
return v, s, err
}

// Retrieve implements the cache.Client interface, looking up the object and updating the index last access time
func (idx *IndexedClient) Retrieve(cacheKey string) ([]byte, status.LookupStatus, error) {
if cacheKey == IndexKey {
return nil, status.LookupStatusError, ErrIndexInvalidCacheKey
}
data, s, err := idx.Client.Retrieve(cacheKey)
start := time.Now()
var (
data []byte
s = status.LookupStatusHit
err error
o *Object
)
defer func() {
size := float64(0)
if o != nil {
size = float64(len(o.Value))
}
metrics.ObserveCacheOperation(idx.indexName, idx.cacheProvider, "get", s.String(), size, time.Since(start))
}()
data, s, err = idx.Client.Retrieve(cacheKey)
if err != nil {
return nil, s, err
}
if s != status.LookupStatusHit {
return nil, s, err
}
o, err := ObjectFromBytes(data)
o, err = ObjectFromBytes(data)
if err != nil {
return nil, status.LookupStatusError, err
}
Expand All @@ -276,19 +303,23 @@ func (idx *IndexedClient) Retrieve(cacheKey string) ([]byte, status.LookupStatus

// Remove implements the cache.Client interface and removes the object from the cache and index
func (idx *IndexedClient) Remove(cacheKeys ...string) error {
start := time.Now()
var totalBytes float64
// remove the objects from the index
for _, key := range cacheKeys {
if o, ok := idx.Objects.Load(key); ok {
obj := o.(*Object)
totalBytes += float64(obj.Size)
size := atomic.AddInt64(&idx.CacheSize, -obj.Size)
count := atomic.AddInt64(&idx.ObjectCount, -1)
metrics.ObserveCacheOperation(idx.name, idx.cacheProvider, "del", "none", float64(obj.Size))
idx.Objects.Delete(key)
metrics.ObserveCacheSizeChange(idx.name, idx.cacheProvider, size, count)
metrics.ObserveCacheSizeChange(idx.indexName, idx.cacheProvider, size, count)
}
}
idx.lastWrite.Store(time.Now())
return idx.Client.Remove(cacheKeys...)
err := idx.Client.Remove(cacheKeys...)
metrics.ObserveCacheDel(idx.indexName, idx.cacheProvider, totalBytes, time.Since(start))
return err
}

// Stop the indexed cache, flush its state, and close the underlying cache
Expand Down Expand Up @@ -398,7 +429,7 @@ func (idx *IndexedClient) reap() {
})

if len(removals) > 0 {
metrics.ObserveCacheEvent(idx.name, idx.cacheProvider, "eviction", "ttl")
metrics.ObserveCacheEvent(idx.indexName, idx.cacheProvider, "eviction", "ttl")
if err := idx.Remove(removals...); err != nil {
logger.Error("reap remove error", logging.Pairs{"cacheName": idx.name, "error": err})
}
Expand All @@ -410,7 +441,7 @@ func (idx *IndexedClient) reap() {

evictionType, removals := reap(cacheSize, objectCount, remainders, *opts)
if len(removals) > 0 {
metrics.ObserveCacheEvent(idx.name, idx.cacheProvider, "eviction", evictionType)
metrics.ObserveCacheEvent(idx.indexName, idx.cacheProvider, "eviction", evictionType)
if err := idx.Remove(removals...); err != nil {
logger.Error("reap remove error", logging.Pairs{"cacheName": idx.name, "error": err})
}
Expand Down
42 changes: 26 additions & 16 deletions pkg/cache/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,48 +57,54 @@ type Manager struct {
}

func (cm *Manager) StoreReference(cacheKey string, data cache.ReferenceObject, ttl time.Duration) error {
metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "setDirect", "none", float64(data.Size()))
logger.Debug("cache store", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
return cm.Client.(cache.MemoryCache).StoreReference(cacheKey, data, ttl)
start := time.Now()
err := cm.Client.(cache.MemoryCache).StoreReference(cacheKey, data, ttl)
metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "setDirect", "none", float64(data.Size()), time.Since(start))
return err
}

func (cm *Manager) Store(cacheKey string, byteData []byte, ttl time.Duration) error {
metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "set", "none", float64(len(byteData)))
logger.Debug("cache store", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
return cm.Client.Store(cacheKey, byteData, ttl)
start := time.Now()
err := cm.Client.Store(cacheKey, byteData, ttl)
metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "set", "none", float64(len(byteData)), time.Since(start))
return err
}

func (cm *Manager) observeRetrieval(cacheKey string, size int, s status.LookupStatus, err error) {
func (cm *Manager) observeRetrieval(cacheKey string, size int, s status.LookupStatus, err error, elapsed time.Duration) {
switch {
case errors.Is(err, cache.ErrKNF) || s == status.LookupStatusKeyMiss:
logger.Debug("cache miss", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
metrics.ObserveCacheMiss(cm.config.Name, cm.config.Provider)
case err != nil:
metrics.ObserveCacheMiss(cm.config.Name, cm.config.Provider, elapsed)
case status.IsSuccessful(s):
logger.Debug("cache retrieve", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "get", "hit", float64(size), elapsed)
default:
logger.Debug("cache retrieve failed", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
metrics.ObserveCacheEvent(cm.config.Name, cm.config.Provider, "error", "failed to retrieve cache entry")
case s == status.LookupStatusHit:
logger.Debug("cache retrieve", logging.Pairs{"key": cacheKey, "provider": cm.config.Provider})
metrics.ObserveCacheOperation(cm.config.Name, cm.config.Provider, "get", "hit", float64(size))
}
}

func (cm *Manager) RetrieveReference(cacheKey string) (any, status.LookupStatus, error) {
start := time.Now()
v, s, err := cm.Client.(cache.MemoryCache).RetrieveReference(cacheKey)
elapsed := time.Since(start)
if ro, ok := v.(cache.ReferenceObject); ok {
cm.observeRetrieval(cacheKey, ro.Size(), s, err)
cm.observeRetrieval(cacheKey, ro.Size(), s, err, elapsed)
}
return v, s, err
}

type retrieveResult struct {
Data any
Data []byte
Status status.LookupStatus
}

func (cm *Manager) Retrieve(cacheKey string) ([]byte, status.LookupStatus, error) {
start := time.Now()
val, err, shared := cm.sf.Do(cacheKey, func() (any, error) {
b, s, err := cm.Client.Retrieve(cacheKey)
cm.observeRetrieval(cacheKey, len(b), s, err)
return &retrieveResult{
Data: b,
Status: s,
Expand All @@ -113,13 +119,17 @@ func (cm *Manager) Retrieve(cacheKey string) ([]byte, status.LookupStatus, error
s = status.LookupStatusProxyError
}
}
return rr.Data.([]byte), s, err
cm.observeRetrieval(cacheKey, len(rr.Data), s, err, time.Since(start))
return rr.Data, s, err
}

func (cm *Manager) Remove(cacheKeys ...string) error {
metrics.ObserveCacheDel(cm.config.Name, cm.config.Provider, float64(len(cacheKeys)-1))
logger.Debug("cache remove", logging.Pairs{"keys": cacheKeys, "provider": cm.config.Provider})
return cm.Client.Remove(cacheKeys...)
start := time.Now()
err := cm.Client.Remove(cacheKeys...)
// key count, not bytes: the manager doesn't track object sizes; the index layer reports byte counts
metrics.ObserveCacheDel(cm.config.Name, cm.config.Provider, float64(len(cacheKeys)), time.Since(start))
return err
}

func (cm *Manager) Connect() error {
Expand Down
15 changes: 8 additions & 7 deletions pkg/cache/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package metrics

import (
"fmt"
"time"

"github.com/trickstercache/trickster/v2/pkg/observability/metrics"
)

// ObserveCacheMiss records a Cache Miss event
func ObserveCacheMiss(cacheName, cacheProvider string) {
ObserveCacheOperation(cacheName, cacheProvider, "get", "miss", 0)
func ObserveCacheMiss(cacheName, cacheProvider string, elapsed time.Duration) {
ObserveCacheOperation(cacheName, cacheProvider, "get", "miss", 0, elapsed)
}

// ObserveCacheDel records a cache deletion event
func ObserveCacheDel(cache, cacheProvider string, count float64) {
ObserveCacheOperation(cache, cacheProvider, "del", "none", count)
func ObserveCacheDel(cache, cacheProvider string, count float64, elapsed time.Duration) {
ObserveCacheOperation(cache, cacheProvider, "del", "none", count, elapsed)
}

// CacheError returns an empty cache object and the formatted error
Expand All @@ -38,9 +39,9 @@ func CacheError(cacheKey, cacheName, cacheProvider string, msg string) ([]byte,
return nil, fmt.Errorf(msg, cacheKey)
}

// ObserveCacheOperation increments counters as cache operations occur
func ObserveCacheOperation(cache, cacheProvider, operation, status string, bytes float64) {
metrics.CacheObjectOperations.WithLabelValues(cache, cacheProvider, operation, status).Inc()
// ObserveCacheOperation records cache operations with timing and byte counts
func ObserveCacheOperation(cache, cacheProvider, operation, status string, bytes float64, elapsed time.Duration) {
metrics.CacheObjectOperationDuration.WithLabelValues(cache, cacheProvider, operation, status).Observe(elapsed.Seconds())
if bytes > 0 {
metrics.CacheByteOperations.WithLabelValues(cache, cacheProvider, operation, status).Add(bytes)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/cache/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package metrics

import (
"testing"
"time"
)

var testCacheKey, testCacheName, testCacheProvider string
Expand All @@ -29,12 +30,12 @@ func init() {
}

func TestObserveCacheMiss(t *testing.T) {
ObserveCacheMiss(testCacheName, testCacheProvider)
ObserveCacheMiss(testCacheName, testCacheProvider, time.Millisecond)
}

// ObserveCacheDel records a cache deletion event
func TestObserveCacheDel(t *testing.T) {
ObserveCacheDel(testCacheName, testCacheProvider, 0)
ObserveCacheDel(testCacheName, testCacheProvider, 0, time.Millisecond)
}

func TestCacheError(t *testing.T) {
Expand All @@ -45,8 +46,8 @@ func TestCacheError(t *testing.T) {
}

func TestObserveCacheOperation(t *testing.T) {
ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 0)
ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 1)
ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 0, time.Millisecond)
ObserveCacheOperation(testCacheName, testCacheProvider, "set", "ok", 1, time.Millisecond)
}

func TestObserveCacheEvent(t *testing.T) {
Expand Down
61 changes: 55 additions & 6 deletions pkg/observability/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/trickstercache/trickster/v2/pkg/backends/providers"
)

Expand Down Expand Up @@ -181,13 +182,13 @@ var (
[]string{"backend_name", "provider", "method", "status", "http_status", "path"},
)

// CacheObjectOperations is a Counter of operations (in # of objects) performed on a Trickster cache
CacheObjectOperations = prometheus.NewCounterVec(
prometheus.CounterOpts{
CacheObjectOperationDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricNamespace,
Subsystem: cacheSubsystem,
Name: "operation_objects_total",
Help: "Count (in # of objects) of operations performed on a Trickster cache.",
Name: "operation_duration_seconds",
Help: "Time required in seconds to perform an operation on a Trickster cache.",
Buckets: defaultBuckets,
},
[]string{"cache_name", "provider", "operation", "status"},
)
Expand Down Expand Up @@ -319,6 +320,45 @@ var (
)
)

type histogramCounterRenamed struct {
desc *prometheus.Desc
histogram *prometheus.HistogramVec
}

func (hcr *histogramCounterRenamed) Describe(ch chan<- *prometheus.Desc) {
ch <- hcr.desc
}

func (hcr *histogramCounterRenamed) Collect(ch chan<- prometheus.Metric) {
histCh := make(chan prometheus.Metric, 128)
hcr.histogram.Collect(histCh)
close(histCh)
for m := range histCh {
var dm dto.Metric
if err := m.Write(&dm); err != nil {
continue
}
h := dm.GetHistogram()
if h == nil {
continue
}
labelValues := make([]string, 0, len(dm.GetLabel()))
for _, lp := range dm.GetLabel() {
labelValues = append(labelValues, lp.GetValue())
}
cm, err := prometheus.NewConstMetric(
hcr.desc,
prometheus.CounterValue,
float64(h.GetSampleCount()),
labelValues...,
)
if err != nil {
continue
}
ch <- cm
}
}

func init() {
// Register Metrics
prometheus.MustRegister(FrontendRequestStatus)
Expand All @@ -333,7 +373,16 @@ func init() {
prometheus.MustRegister(ProxyConnectionAccepted)
prometheus.MustRegister(ProxyConnectionClosed)
prometheus.MustRegister(ProxyConnectionFailed)
prometheus.MustRegister(CacheObjectOperations)
prometheus.MustRegister(CacheObjectOperationDuration)
prometheus.MustRegister(&histogramCounterRenamed{
desc: prometheus.NewDesc(
"trickster_cache_operation_objects_total",
"Count (in # of objects) of operations performed on a Trickster cache.",
[]string{"cache_name", "operation", "provider", "status"},
nil,
),
histogram: CacheObjectOperationDuration,
})
prometheus.MustRegister(CacheByteOperations)
prometheus.MustRegister(CacheEvents)
prometheus.MustRegister(CacheObjects)
Expand Down
Loading