Skip to content

WIP #1669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft

WIP #1669

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
)

type Pool struct {
usages int64
config Config
dialOptions []grpc.DialOption
conns xsync.Map[string, *conn]
done chan struct{}
usages int64
config Config
dialOptions []grpc.DialOption
conns xsync.Map[string, *conn]
done chan struct{}
parkerStoped chan struct{}
}

func (p *Pool) DialTimeout() time.Duration {
Expand Down Expand Up @@ -160,6 +161,7 @@ func (p *Pool) Release(ctx context.Context) (finalErr error) {
)

wg.Add(cap(errCh))

p.conns.Range(func(_ string, c *conn) bool {
go func(c closer.Closer) {
defer wg.Done()
Expand All @@ -170,6 +172,9 @@ func (p *Pool) Release(ctx context.Context) (finalErr error) {

return true
})

<-p.parkerStoped

wg.Wait()
close(errCh)

Expand All @@ -186,8 +191,11 @@ func (p *Pool) Release(ctx context.Context) (finalErr error) {
}

func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
defer close(p.parkerStoped)

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-p.done:
Expand Down Expand Up @@ -216,10 +224,11 @@ func NewPool(ctx context.Context, config Config) *Pool {
defer onDone()

p := &Pool{
usages: 1,
config: config,
dialOptions: config.GrpcDialOptions(),
done: make(chan struct{}),
usages: 1,
config: config,
dialOptions: config.GrpcDialOptions(),
done: make(chan struct{}),
parkerStoped: make(chan struct{}),
}

p.dialOptions = append(p.dialOptions,
Expand Down Expand Up @@ -248,6 +257,8 @@ func NewPool(ctx context.Context, config Config) *Pool {

if ttl := config.ConnectionTTL(); ttl > 0 {
go p.connParker(xcontext.ValueOnly(ctx), ttl, ttl/2) //nolint:gomnd
} else {
close(p.parkerStoped)
}

return p
Expand Down
5 changes: 5 additions & 0 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime"

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.21.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, macOS)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.21.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.21.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / integration (1.21.x, 24.1, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / integration (1.24.x, 24.2, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / integration (1.21.x, 24.3, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / integration (1.21.x, 24.2, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / integration (1.24.x, 24.1, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, windows)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / integration (1.24.x, 24.3, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / experiment (1.24.x, nightly, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:

Check failure on line 5 in internal/query/execute_query.go

View workflow job for this annotation

GitHub Actions / unit (1.24.x, ubuntu)

no required module provides package github.com/ydb-platform/ydb-go-sdk/v3/internal/xruntime; to add it:
"io"
"time"

Expand Down Expand Up @@ -146,6 +147,10 @@
return nil, xerrors.WithStackTrace(err)
}

xruntime.AddCleanup(r, func(cancelStream context.CancelFunc) {
cancelStream()
}, executeCancel)

return r, nil
}

Expand Down
18 changes: 9 additions & 9 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,20 +228,20 @@ func nextPart(stream Ydb_Query_V1.QueryService_ExecuteQueryClient) (
func (r *streamResult) Close(ctx context.Context) (finalErr error) {
defer r.closeOnce()

if r.trace != nil {
onDone := trace.QueryOnResultClose(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*streamResult).Close"),
)
defer func() {
onDone(finalErr)
}()
}

for {
select {
case <-r.closed:
return nil
default:
if r.trace != nil {
onDone := trace.QueryOnResultClose(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*streamResult).Close"),
)
defer func() {
onDone(finalErr)
}()
}

_, err := r.nextPart(ctx)
if err != nil {
if xerrors.Is(err, io.EOF) {
Expand Down
1 change: 0 additions & 1 deletion internal/query/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package query

import (
"context"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
Expand Down
39 changes: 14 additions & 25 deletions internal/query/session_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package query
import (
"context"
"os"
"runtime/pprof"
"strconv"
"sync/atomic"
"time"

Expand Down Expand Up @@ -38,10 +36,11 @@ type (
SetStatus(code Status)
}
sessionCore struct {
cc grpc.ClientConnInterface
Client Ydb_Query_V1.QueryServiceClient
Trace *trace.Query
done chan struct{}
cc grpc.ClientConnInterface
Client Ydb_Query_V1.QueryServiceClient
Trace *trace.Query
done chan struct{}
attachStreamExited chan struct{}

deleteTimeout time.Duration
id string
Expand Down Expand Up @@ -120,9 +119,10 @@ func Open(
ctx context.Context, client Ydb_Query_V1.QueryServiceClient, opts ...Option,
) (_ *sessionCore, finalErr error) {
core := &sessionCore{
Client: client,
Trace: &trace.Query{},
done: make(chan struct{}),
Client: client,
Trace: &trace.Query{},
done: make(chan struct{}),
attachStreamExited: make(chan struct{}),
}

for _, opt := range opts {
Expand Down Expand Up @@ -200,6 +200,8 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
core.closeOnce = xsync.OnceFunc(func(ctx context.Context) error {
defer cancelAttach()

<-core.attachStreamExited

core.SetStatus(StatusClosing)
defer core.SetStatus(StatusClosed)

Expand All @@ -210,31 +212,18 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
return nil
})

if markGoroutineWithLabelNodeIDForAttachStream {
pprof.Do(ctx, pprof.Labels(
"node_id", strconv.Itoa(int(core.NodeID())),
), func(context.Context) {
go core.listenAttachStream(attachStream)
})
} else {
go core.listenAttachStream(attachStream)
}
go core.listenAttachStream(attachStream)

return nil
}

func (core *sessionCore) listenAttachStream(attachStream Ydb_Query_V1.QueryService_AttachSessionClient) {
defer func() {
select {
case <-core.done:
return
default:
close(core.done)
}
close(core.attachStreamExited)
}()

for core.IsAlive() {
if _, recvErr := attachStream.Recv(); recvErr != nil {
if s, recvErr := attachStream.Recv(); recvErr != nil || s.GetStatus() != Ydb.StatusIds_SUCCESS {
return
}
}
Expand Down
39 changes: 17 additions & 22 deletions internal/repeater/repeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

Expand All @@ -29,8 +28,8 @@ type repeater struct {
// Task is a function that must be executed periodically.
task func(context.Context) error

cancel context.CancelFunc
stopped chan struct{}
done chan struct{}
workerStopped chan struct{}

force chan struct{}
clock clockwork.Clock
Expand Down Expand Up @@ -96,16 +95,14 @@ func New(
task func(ctx context.Context) (err error),
opts ...option,
) *repeater {
ctx, cancel := xcontext.WithCancel(ctx)

r := &repeater{
interval: interval,
task: task,
cancel: cancel,
stopped: make(chan struct{}),
force: make(chan struct{}, 1),
clock: clockwork.NewRealClock(),
trace: &trace.Driver{},
interval: interval,
task: task,
done: make(chan struct{}),
workerStopped: make(chan struct{}),
force: make(chan struct{}, 1),
clock: clockwork.NewRealClock(),
trace: &trace.Driver{},
}

for _, opt := range opts {
Expand All @@ -114,17 +111,17 @@ func New(
}
}

go r.worker(ctx, r.clock.NewTicker(interval))
go r.worker(r.clock.NewTicker(interval))

return r
}

func (r *repeater) stop(onCancel func()) {
r.cancel()
close(r.done)
if onCancel != nil {
onCancel()
}
<-r.stopped
<-r.workerStopped
}

// Stop stops to execute its task.
Expand Down Expand Up @@ -162,11 +159,9 @@ func (r *repeater) wakeUp(e Event) (err error) {
return r.task(ctx)
}

func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {
defer func() {
close(r.stopped)
tick.Stop()
}()
func (r *repeater) worker(tick clockwork.Ticker) {
defer close(r.workerStopped)
defer tick.Stop()

// force returns backoff with delays [500ms...32s]
force := backoff.New(
Expand All @@ -187,7 +182,7 @@ func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {
defer force.Stop()

select {
case <-ctx.Done():
case <-r.done:
return EventCancel
case <-tick.Chan():
return EventTick
Expand All @@ -210,7 +205,7 @@ func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {

for {
select {
case <-ctx.Done():
case <-r.done:
return

case <-tick.Chan():
Expand Down
3 changes: 1 addition & 2 deletions internal/xtest/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package xtest

import (
"context"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"runtime/pprof"
"testing"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
)

func Context(t testing.TB) context.Context {
Expand Down
Loading
Loading