Skip to content

Commit 94eb9ca

Browse files
committed
Add context to topic logs
1 parent d58c87c commit 94eb9ca

29 files changed

+785
-203
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields
2+
13
## v3.105.2
24
* Improved the `ydb.WithSessionPoolSessionUsageLimit` option for allow `time.Duration` as argument type for limit max session time to live since create time
35

go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/jonboulle/clockwork v0.3.0
99
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77
1010
go.uber.org/goleak v1.3.0
11+
go.uber.org/zap v1.27.0
1112
golang.org/x/net v0.33.0
1213
golang.org/x/sync v0.10.0
1314
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
@@ -18,15 +19,15 @@ require (
1819
// requires for tests only
1920
require (
2021
github.com/rekby/fixenv v0.6.1
21-
github.com/stretchr/testify v1.8.0
22+
github.com/stretchr/testify v1.8.1
2223
go.uber.org/mock v0.4.0
2324
)
2425

2526
require (
2627
github.com/davecgh/go-spew v1.1.1 // indirect
2728
github.com/golang/protobuf v1.5.3 // indirect
28-
github.com/kr/text v0.2.0 // indirect
2929
github.com/pmezard/go-difflib v1.0.0 // indirect
30+
go.uber.org/multierr v1.11.0 // indirect
3031
golang.org/x/sys v0.28.0 // indirect
3132
golang.org/x/text v0.21.0 // indirect
3233
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect

go.sum

+7-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
1111
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1212
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1313
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
14-
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
1514
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1615
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1716
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -68,18 +67,24 @@ github.com/rekby/fixenv v0.6.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+X
6867
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
6968
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
7069
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
70+
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
7171
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
7272
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
7373
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
74-
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
7574
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
75+
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
76+
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
7677
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q=
7778
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
7879
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
7980
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
8081
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
8182
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
8283
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
84+
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
85+
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
86+
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
87+
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
8388
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
8489
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
8590
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=

internal/grpcwrapper/rawtopic/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (c *Client) StreamWrite(
126126
Stream: protoResp,
127127
Tracer: tracer,
128128
InternalStreamID: uuid.New().String(),
129+
LogContext: &ctxStreamLifeTime,
129130
}, nil
130131
}
131132

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rawtopicwriter
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"reflect"
@@ -34,6 +35,7 @@ type StreamWriter struct {
3435
readMessagesCount int
3536
writtenMessagesCount int
3637
sessionID string
38+
LogContext *context.Context
3739
}
3840

3941
//nolint:funlen
@@ -50,7 +52,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
5052
defer func() {
5153
// defer needs for set good session id on first init response before trace the message
5254
trace.TopicOnWriterReceiveGRPCMessage(
53-
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
55+
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
5456
)
5557
}()
5658
if sendErr != nil {
@@ -139,7 +141,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
139141

140142
err = w.Stream.Send(&protoMsg)
141143
w.writtenMessagesCount++
142-
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
144+
trace.TopicOnWriterSentGRPCMessage(
145+
w.Tracer,
146+
w.LogContext,
147+
w.InternalStreamID,
148+
w.sessionID,
149+
w.writtenMessagesCount,
150+
&protoMsg,
151+
err,
152+
)
143153
if err != nil {
144154
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
145155
}

internal/topic/topicclientinternal/client.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ func (c *Client) StartReader(
322322
if err != nil {
323323
return nil, err
324324
}
325-
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)
325+
326+
internalReader.TopicOnReaderStart(consumer, err)
326327

327328
return topicreader.NewReader(internalReader), nil
328329
}
@@ -365,15 +366,8 @@ func (c *Client) createWriterConfig(
365366
topicPath string,
366367
opts []topicoptions.WriterOption,
367368
) topicwriterinternal.WriterReconnectorConfig {
368-
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
369-
topicwriterinternal.RawTopicWriterStream,
370-
error,
371-
) {
372-
return c.rawClient.StreamWrite(ctx, tracer)
373-
}
374-
375369
options := []topicoptions.WriterOption{
376-
topicwriterinternal.WithConnectFunc(connector),
370+
topicwriterinternal.WithRawClient(&c.rawClient),
377371
topicwriterinternal.WithTopic(topicPath),
378372
topicwriterinternal.WithCommonConfig(c.cfg.Common),
379373
topicwriterinternal.WithTrace(c.cfg.Trace),

internal/topic/topicreadercommon/committer.go

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {
148148

149149
onDone := trace.TopicOnReaderSendCommitMessage(
150150
c.tracer,
151+
&ctx,
151152
&commits,
152153
)
153154
err := c.send(commits.ToRawMessage())

internal/topic/topicreaderinternal/batched_stream_reader_interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ type batchedStreamReader interface {
1515
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
1616
CloseWithError(ctx context.Context, err error) error
1717
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
18+
TopicOnReaderStart(consumer string, err error)
1819
}

internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go

+36
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicreaderinternal/reader.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ type Reader struct {
3939
readerID int64
4040
}
4141

42+
func (r *Reader) TopicOnReaderStart(consumer string, err error) {
43+
r.reader.TopicOnReaderStart(consumer, err)
44+
}
45+
4246
type ReadMessageBatchOptions struct {
4347
batcherGetOptions
4448
}
@@ -93,14 +97,17 @@ func NewReader(
9397
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
9498
}
9599

100+
reader := newReaderReconnector(
101+
cfg.BaseContext,
102+
readerID,
103+
readerConnector,
104+
cfg.OperationTimeout(),
105+
cfg.RetrySettings,
106+
cfg.Trace,
107+
)
108+
96109
res := Reader{
97-
reader: newReaderReconnector(
98-
readerID,
99-
readerConnector,
100-
cfg.OperationTimeout(),
101-
cfg.RetrySettings,
102-
cfg.Trace,
103-
),
110+
reader: reader,
104111
defaultBatchConfig: cfg.DefaultBatchConfig,
105112
tracer: cfg.Trace,
106113
readerID: readerID,

0 commit comments

Comments
 (0)