Skip to content

fix(valkey): Make error checking configurable and fix inconsistency #3297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions contrib/redis/rueidis/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package rueidis

import (
"github.com/redis/rueidis"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

type config struct {
rawCommand bool
serviceName string
errCheck func(err error) bool
}

// Option represents an option that can be used to create or wrap a client.
Expand All @@ -23,6 +25,9 @@ func defaultConfig() *config {
// Do not include the raw command by default since it could contain sensitive data.
rawCommand: internal.BoolEnv("DD_TRACE_REDIS_RAW_COMMAND", false),
serviceName: namingschema.ServiceName(defaultServiceName),
errCheck: func(err error) bool {
return err != nil && !rueidis.IsRedisNil(err)
},
}
}

Expand All @@ -39,3 +44,11 @@ func WithServiceName(name string) Option {
cfg.serviceName = name
}
}

// WithErrorCheck specifies a function fn which determines whether the passed
// error should be marked as an error.
func WithErrorCheck(fn func(err error) bool) Option {
return func(cfg *config) {
cfg.errCheck = fn
}
}
26 changes: 13 additions & 13 deletions contrib/redis/rueidis/rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,21 @@ func (c *client) startSpan(ctx context.Context, cmd command) (tracer.Span, conte

func (c *client) finishSpan(span tracer.Span, err error) {
var opts []tracer.FinishOption
if err != nil && !rueidis.IsRedisNil(err) {
if c.cfg.errCheck(err) {
opts = append(opts, tracer.WithError(err))
}
span.Finish(opts...)
}

func (c *client) firstError(s []rueidis.RedisResult) error {
for _, result := range s {
if err := result.Error(); c.cfg.errCheck(err) {
return err
}
}
return nil
}

func (c *client) B() rueidis.Builder {
return c.client.B()
}
Expand All @@ -117,14 +126,14 @@ func (c *client) Do(ctx context.Context, cmd rueidis.Completed) rueidis.RedisRes
span, ctx := c.startSpan(ctx, processCommand(&cmd))
resp := c.client.Do(ctx, cmd)
setClientCacheTags(span, resp)
span.Finish(tracer.WithError(resp.Error()))
c.finishSpan(span, resp.Error())
return resp
}

func (c *client) DoMulti(ctx context.Context, multi ...rueidis.Completed) []rueidis.RedisResult {
span, ctx := c.startSpan(ctx, processCommandMulti(multi))
resp := c.client.DoMulti(ctx, multi...)
c.finishSpan(span, firstError(resp))
c.finishSpan(span, c.firstError(resp))
return resp
}

Expand All @@ -150,7 +159,7 @@ func (c *client) DoCache(ctx context.Context, cmd rueidis.Cacheable, ttl time.Du
func (c *client) DoMultiCache(ctx context.Context, multi ...rueidis.CacheableTTL) []rueidis.RedisResult {
span, ctx := c.startSpan(ctx, processCommandMultiCache(multi))
resp := c.client.DoMultiCache(ctx, multi...)
c.finishSpan(span, firstError(resp))
c.finishSpan(span, c.firstError(resp))
return resp
}

Expand Down Expand Up @@ -268,15 +277,6 @@ func multiCommand(cmds []command) command {
}
}

func firstError(s []rueidis.RedisResult) error {
for _, result := range s {
if err := result.Error(); err != nil && !rueidis.IsRedisNil(err) {
return err
}
}
return nil
}

func setClientCacheTags(s tracer.Span, result rueidis.RedisResult) {
s.SetTag(ext.RedisClientCacheHit, result.IsCacheHit())
s.SetTag(ext.RedisClientCacheTTL, result.CacheTTL())
Expand Down
49 changes: 49 additions & 0 deletions contrib/redis/rueidis/rueidis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package rueidis

import (
"context"
"errors"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -253,6 +254,54 @@ func TestNewClient(t *testing.T) {
},
wantServiceName: "global-service",
},
{
name: "Test SET command with canceled context and custom error check",
opts: []Option{
WithErrorCheck(func(err error) bool {
return err != nil && !rueidis.IsRedisNil(err) && !errors.Is(err, context.Canceled)
}),
},
runTest: func(t *testing.T, ctx context.Context, client rueidis.Client) {
ctx, cancel := context.WithCancel(ctx)
cancel()
require.Error(t, client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error())
},
assertSpans: func(t *testing.T, spans []mocktracer.Span) {
require.Len(t, spans, 1)

span := spans[0]
assert.Equal(t, "SET", span.Tag(ext.ResourceName))
assert.Nil(t, span.Tag(ext.RedisRawCommand))
assert.Equal(t, false, span.Tag(ext.RedisClientCacheHit))
assert.Less(t, span.Tag(ext.RedisClientCacheTTL), int64(0))
assert.Less(t, span.Tag(ext.RedisClientCachePXAT), int64(0))
assert.Less(t, span.Tag(ext.RedisClientCachePTTL), int64(0))
assert.Nil(t, span.Tag(ext.Error))
},
wantServiceName: "global-service",
},
{
name: "Test redis nil not attached to span",
opts: []Option{
WithRawCommand(true),
},
runTest: func(t *testing.T, ctx context.Context, client rueidis.Client) {
require.Error(t, client.Do(ctx, client.B().Get().Key("404").Build()).Error())
},
assertSpans: func(t *testing.T, spans []mocktracer.Span) {
require.Len(t, spans, 1)

span := spans[0]
assert.Equal(t, "GET", span.Tag(ext.ResourceName))
assert.Equal(t, "GET 404", span.Tag(ext.RedisRawCommand))
assert.Equal(t, false, span.Tag(ext.RedisClientCacheHit))
assert.Less(t, span.Tag(ext.RedisClientCacheTTL), int64(0))
assert.Less(t, span.Tag(ext.RedisClientCachePXAT), int64(0))
assert.Less(t, span.Tag(ext.RedisClientCachePTTL), int64(0))
assert.Nil(t, span.Tag(ext.Error))
},
wantServiceName: "global-service",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions contrib/valkey-go/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package valkey

import (
"github.com/valkey-io/valkey-go"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

type config struct {
rawCommand bool
serviceName string
errCheck func(err error) bool
}

// Option represents an option that can be used to create or wrap a client.
Expand All @@ -23,6 +25,9 @@ func defaultConfig() *config {
// Do not include the raw command by default since it could contain sensitive data.
rawCommand: internal.BoolEnv("DD_TRACE_VALKEY_RAW_COMMAND", false),
serviceName: namingschema.ServiceName(defaultServiceName),
errCheck: func(err error) bool {
return err != nil && !valkey.IsValkeyNil(err)
},
}
}

Expand All @@ -40,3 +45,11 @@ func WithServiceName(name string) Option {
cfg.serviceName = name
}
}

// WithErrorCheck specifies a function fn which determines whether the passed
// error should be marked as an error.
func WithErrorCheck(fn func(err error) bool) Option {
return func(cfg *config) {
cfg.errCheck = fn
}
}
26 changes: 13 additions & 13 deletions contrib/valkey-go/valkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ func (c *client) Do(ctx context.Context, cmd valkey.Completed) valkey.ValkeyResu
span, ctx := c.startSpan(ctx, processCommand(&cmd))
resp := c.client.Do(ctx, cmd)
setClientCacheTags(span, resp)
span.Finish(tracer.WithError(resp.Error()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A valkey nil error will get attached to the span here currently.

c.finishSpan(span, resp.Error())
return resp
}

func (c *client) DoMulti(ctx context.Context, multi ...valkey.Completed) []valkey.ValkeyResult {
span, ctx := c.startSpan(ctx, processCommandMulti(multi))
resp := c.client.DoMulti(ctx, multi...)
c.finishSpan(span, firstError(resp))
c.finishSpan(span, c.firstError(resp))
return resp
}

Expand All @@ -109,7 +109,7 @@ func (c *client) DoCache(ctx context.Context, cmd valkey.Cacheable, ttl time.Dur
func (c *client) DoMultiCache(ctx context.Context, multi ...valkey.CacheableTTL) []valkey.ValkeyResult {
span, ctx := c.startSpan(ctx, processCommandMultiCache(multi))
resp := c.client.DoMultiCache(ctx, multi...)
c.finishSpan(span, firstError(resp))
c.finishSpan(span, c.firstError(resp))
return resp
}

Expand Down Expand Up @@ -211,12 +211,21 @@ func (c *client) startSpan(ctx context.Context, cmd command) (tracer.Span, conte

func (c *client) finishSpan(span tracer.Span, err error) {
var opts []tracer.FinishOption
if err != nil && !valkey.IsValkeyNil(err) {
if c.cfg.errCheck(err) {
opts = append(opts, tracer.WithError(err))
}
span.Finish(opts...)
}

func (c *client) firstError(s []valkey.ValkeyResult) error {
for _, result := range s {
if err := result.Error(); c.cfg.errCheck(err) {
return err
}
}
return nil
}

type commander interface {
Commands() []string
}
Expand Down Expand Up @@ -271,15 +280,6 @@ func multiCommand(cmds []command) command {
}
}

func firstError(s []valkey.ValkeyResult) error {
for _, result := range s {
if err := result.Error(); err != nil && !valkey.IsValkeyNil(err) {
return err
}
}
return nil
}

func setClientCacheTags(s tracer.Span, result valkey.ValkeyResult) {
s.SetTag(ext.ValkeyClientCacheHit, result.IsCacheHit())
s.SetTag(ext.ValkeyClientCacheTTL, result.CacheTTL())
Expand Down
49 changes: 49 additions & 0 deletions contrib/valkey-go/valkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package valkey

import (
"context"
"errors"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -260,6 +261,54 @@ func TestNewClient(t *testing.T) {
},
wantServiceName: "global-service",
},
{
name: "Test SET command with canceled context and custom error check",
opts: []Option{
WithErrorCheck(func(err error) bool {
return err != nil && !valkey.IsValkeyNil(err) && !errors.Is(err, context.Canceled)
}),
},
runTest: func(t *testing.T, ctx context.Context, client valkey.Client) {
ctx, cancel := context.WithCancel(ctx)
cancel()
require.Error(t, client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error())
},
assertSpans: func(t *testing.T, spans []mocktracer.Span) {
require.Len(t, spans, 1)

span := spans[0]
assert.Equal(t, "SET", span.Tag(ext.ResourceName))
assert.Nil(t, span.Tag(ext.ValkeyRawCommand))
assert.Equal(t, false, span.Tag(ext.ValkeyClientCacheHit))
assert.Less(t, span.Tag(ext.ValkeyClientCacheTTL), int64(0))
assert.Less(t, span.Tag(ext.ValkeyClientCachePXAT), int64(0))
assert.Less(t, span.Tag(ext.ValkeyClientCachePTTL), int64(0))
assert.Nil(t, span.Tag(ext.Error))
},
wantServiceName: "global-service",
},
{
name: "Test valkey nil not attached to span",
opts: []Option{
WithRawCommand(true),
},
runTest: func(t *testing.T, ctx context.Context, client valkey.Client) {
require.Error(t, client.Do(ctx, client.B().Get().Key("404").Build()).Error())
},
assertSpans: func(t *testing.T, spans []mocktracer.Span) {
require.Len(t, spans, 1)

span := spans[0]
assert.Equal(t, "GET", span.Tag(ext.ResourceName))
assert.Equal(t, "GET 404", span.Tag(ext.ValkeyRawCommand))
assert.Equal(t, false, span.Tag(ext.ValkeyClientCacheHit))
assert.Less(t, span.Tag(ext.ValkeyClientCacheTTL), int64(0))
assert.Less(t, span.Tag(ext.ValkeyClientCachePXAT), int64(0))
assert.Less(t, span.Tag(ext.ValkeyClientCachePTTL), int64(0))
assert.Nil(t, span.Tag(ext.Error))
},
wantServiceName: "global-service",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading