Skip to content

Commit 3d849ac

Browse files
authored
Merge pull request #1659 from ydb-platform/attach-stream-goro-label
refactoring of attach stream goroutine label
2 parents 7ab31e5 + 06d5fae commit 3d849ac

File tree

1 file changed

+28
-32
lines changed

1 file changed

+28
-32
lines changed

internal/query/session_core.go

+28-32
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
)
2727

2828
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
29-
var setGoroutineLabelForAttachStream = os.Getenv("YDB_QUERY_SESSION_ATTACH_STREAM_GOROUTINE_LABEL") == "1"
29+
var markGoroutineWithLabelNodeIDForAttachStream = os.Getenv("YDB_QUERY_SESSION_ATTACH_STREAM_GOROUTINE_LABEL") == "1"
3030

3131
type (
3232
Core interface {
@@ -116,7 +116,6 @@ func WithTrace(t *trace.Query) Option {
116116
}
117117
}
118118

119-
//nolint:funlen
120119
func Open(
121120
ctx context.Context, client Ydb_Query_V1.QueryServiceClient, opts ...Option,
122121
) (_ *sessionCore, finalErr error) {
@@ -159,20 +158,7 @@ func Open(
159158
core.id = response.GetSessionId()
160159
core.nodeID = uint32(response.GetNodeId())
161160

162-
if setGoroutineLabelForAttachStream {
163-
attachRes := make(chan error)
164-
pprof.Do(ctx, pprof.Labels(
165-
"node_id", strconv.Itoa(int(core.NodeID())),
166-
), func(ctx context.Context) {
167-
go func() {
168-
attachRes <- core.attach(ctx)
169-
}()
170-
})
171-
err = <-attachRes
172-
} else {
173-
err = core.attach(ctx)
174-
}
175-
if err != nil {
161+
if err = core.attach(ctx); err != nil {
176162
_ = core.deleteSession(ctx)
177163

178164
return nil, xerrors.WithStackTrace(err)
@@ -199,14 +185,14 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
199185
}
200186
}()
201187

202-
attach, err := core.Client.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
188+
attachStream, err := core.Client.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
203189
SessionId: core.id,
204190
})
205191
if err != nil {
206192
return xerrors.WithStackTrace(err)
207193
}
208194

209-
_, err = attach.Recv()
195+
_, err = attachStream.Recv()
210196
if err != nil {
211197
return xerrors.WithStackTrace(err)
212198
}
@@ -224,24 +210,34 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
224210
return nil
225211
})
226212

227-
go func() {
228-
defer func() {
229-
select {
230-
case <-core.done:
231-
return
232-
default:
233-
close(core.done)
234-
}
235-
}()
213+
if markGoroutineWithLabelNodeIDForAttachStream {
214+
pprof.Do(ctx, pprof.Labels(
215+
"node_id", strconv.Itoa(int(core.NodeID())),
216+
), func(context.Context) {
217+
go core.listenAttachStream(attachStream)
218+
})
219+
} else {
220+
go core.listenAttachStream(attachStream)
221+
}
236222

237-
for core.IsAlive() {
238-
if _, recvErr := attach.Recv(); recvErr != nil {
239-
return
240-
}
223+
return nil
224+
}
225+
226+
func (core *sessionCore) listenAttachStream(attachStream Ydb_Query_V1.QueryService_AttachSessionClient) {
227+
defer func() {
228+
select {
229+
case <-core.done:
230+
return
231+
default:
232+
close(core.done)
241233
}
242234
}()
243235

244-
return nil
236+
for core.IsAlive() {
237+
if _, recvErr := attachStream.Recv(); recvErr != nil {
238+
return
239+
}
240+
}
245241
}
246242

247243
func (core *sessionCore) deleteSession(ctx context.Context) (finalErr error) {

0 commit comments

Comments
 (0)