Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"//pkg/util/uuid",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)

Expand Down
166 changes: 101 additions & 65 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ package rangefeed
import (
"context"
"fmt"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// ┌─────────────────┐
Expand Down Expand Up @@ -104,17 +105,43 @@ const (
// streamOverflowing is the state we are in when the stream has reached its
// limit and is waiting to deliver an error.
streamOverflowing streamState = iota
// streamOverflowed means the stream has overflowed and the error has been
// placed in the queue.
streamOverflowed streamState = iota
// streamErrored means an error has been enqueued for this stream and further
// buffered sends will be ignored. Streams in this state will not be found in
// the status map.
streamErrored streamState = iota
)

func (s streamState) String() string {
switch s {
case streamActive:
return "active"
case streamOverflowing:
return "overflowing"
case streamErrored:
return "errored"
default:
return "unknown"
}
}

func (s streamState) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("%s", redact.SafeString(s.String()))
}

type streamStatus struct {
// queueItems is the number of items for a given stream in the event queue.
queueItems int64
state streamState
}

func (s streamStatus) String() string {
return fmt.Sprintf("%s [queue_len:%d]", s.state, s.queueItems)
}

func (s streamStatus) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("%s", redact.SafeString(s.String()))
}

func NewBufferedSender(
sender ServerStreamSender, settings *cluster.Settings, bsMetrics *BufferedSenderMetrics,
) *BufferedSender {
Expand Down Expand Up @@ -145,55 +172,28 @@ func (bs *BufferedSender) sendBuffered(
// request. If the stream has hit its limit, we return an error to the
// registration. This error should be the next event that is sent to
// stream.
//
// NB: We don't error if the stream status is not found as this may be an
// event for an already closed stream. Such events are possible while the
// registration publishes the catch up scan buffer.
status, ok := bs.queueMu.byStream[ev.StreamID]
if ok {
switch status.state {
case streamActive:
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
if ev.Error != nil {
// If _this_ event is an error, no use sending another error. This stream
// is going down. Admit this error and mark the stream as overflowed.
status.state = streamOverflowed
} else {
// This stream is at capacity, return an error to the registration that it
// should send back to us after cleaning up.
status.state = streamOverflowing
return newRetryErrBufferCapacityExceeded()
}
}
case streamOverflowing:
// The unbufferedRegistration is the only component that sends non-error
// events to our stream. In response to the error we return when moving to
// stateOverflowing, it should immediately send us an error and mark itself
// as disconnected.
//
// The only unfortunate exception is if we get disconnected while flushing
// the catch-up scan buffer. In this case we admit the event and stay in
// state overflowing until we actually receive the error.
//
// TODO(ssd): Given the above exception, we should perhaps just move
// directly to streamOverflowed. But, I think instead we want to remove
// that exception if possible.
if ev.Error != nil {
status.state = streamOverflowed
}
case streamOverflowed:
// If we are overflowed, we don't expect any further events because the
// registration should have disconnected in response to the error.
//
// TODO(ssd): Consider adding an assertion here.
return nil
default:
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
}
// We are admitting this event.
if !ok {
// We don't error if the stream status is not found as this may be an
// event for an already closed stream. Such events are possible while the
// registration publishes the catch up scan buffer.
//
// The client ignores such events, but we drop it here regardless.
return nil
}
nextState, shouldAdmit, err := bs.nextPerQueueStateLocked(status, ev)
status.state = nextState
if shouldAdmit {
status.queueItems++
}
if nextState == streamErrored {
delete(bs.queueMu.byStream, ev.StreamID)
} else {
bs.queueMu.byStream[ev.StreamID] = status
}

if err != nil || !shouldAdmit {
return err
}

// TODO(wenyihu6): pass an actual context here
Expand All @@ -207,8 +207,46 @@ func (bs *BufferedSender) sendBuffered(
return nil
}

func (bs *BufferedSender) nextPerQueueStateLocked(
status streamStatus, ev *kvpb.MuxRangeFeedEvent,
) (streamState, bool, error) {
// An error should always put us into stateErrored, so let's do that first.
if ev.Error != nil {
if status.state == streamErrored {
assumedUnreachable("unexpected buffered event on stream in state streamErrored")
}
return streamErrored, true, nil
}

switch status.state {
case streamActive:
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
// This stream is at capacity, return an error to the registration that it
// should send back to us after cleaning up.
return streamOverflowing, false, newRetryErrBufferCapacityExceeded()
}
// Happy path.
return streamActive, true, nil
case streamOverflowing:
// The only place we do concurrent buffered sends is during catch-up scan
// publishing which may be concurrent with a disconnect. The catch-up scan
// will stop publishing if it receives an error and try to send an error
// back. A disconnect only sends an error. This path exclusively handles non
// errors.
assumedUnreachable("unexpected buffered event on stream in state streamOverflowing")
return streamOverflowing, false, nil
case streamErrored:
// This is unexpected because streamErrored streams are removed from the
// status map and future sends are ignored.
assumedUnreachable("unexpected buffered event on stream in state streamErrored")
return streamErrored, false, nil
default:
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
}
}

// sendUnbuffered sends the event directly to the underlying
// ServerStreamSender. It bypasses the buffer and thus may block.
// ServerStreamSender. It bypasses the buffer and thus may block.
func (bs *BufferedSender) sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error {
return bs.sender.Send(ev)
}
Expand Down Expand Up @@ -273,24 +311,10 @@ func (bs *BufferedSender) addStream(streamID int64) {
if _, ok := bs.queueMu.byStream[streamID]; !ok {
bs.queueMu.byStream[streamID] = streamStatus{}
} else {
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf("stream %d already exists in buffered sender", streamID))
}
assumedUnreachable(fmt.Sprintf("stream %d already exists in buffered sender", streamID))
}
}

// removeStream removes the per-stream state tracking from the sender.
//
// TODO(ssd): There may be items still in the queue when removeStream is called.
// We'd like to solve this by removing this as a possibility. But this is OK
// since we will eventually process the events and the client knows to ignore
// them.
func (bs *BufferedSender) removeStream(streamID int64) {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
delete(bs.queueMu.byStream, streamID)
}

// cleanup is called when the sender is stopped. It is expected to free up
// buffer queue and no new events should be buffered after this.
func (bs *BufferedSender) cleanup(ctx context.Context) {
Expand All @@ -309,6 +333,18 @@ func (bs *BufferedSender) len() int {
return int(bs.queueMu.buffer.len())
}

func (bs *BufferedSender) TestingBufferSummary() string {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()

summary := &strings.Builder{}
fmt.Fprintf(summary, "buffered sender: queue_len=%d streams=%d", bs.queueMu.buffer.len(), len(bs.queueMu.byStream))
for id, stream := range bs.queueMu.byStream {
fmt.Fprintf(summary, "\n %d: %s", id, stream)
}
return summary.String()
}

// Used for testing only.
func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
opts := retry.Options{
Expand Down
110 changes: 109 additions & 1 deletion pkg/kv/kvserver/rangefeed/buffered_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {

t.Run("basic operation", func(t *testing.T) {
var num atomic.Int32
sm.RegisteringStream(streamID)
sm.AddStream(int64(streamID), &cancelCtxDisconnector{
cancel: func() {
num.Add(1)
Expand All @@ -64,6 +65,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
testServerStream.reset()
})
t.Run("disconnect stream on the same stream is idempotent", func(t *testing.T) {
sm.RegisteringStream(streamID)
sm.AddStream(int64(streamID), &cancelCtxDisconnector{
cancel: func() {
require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
Expand Down Expand Up @@ -113,6 +115,7 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
require.LessOrEqualf(t, activeStreamStart, activeStreamEnd, "test programming error")
if addStream || activeStreamStart == activeStreamEnd {
streamID := activeStreamEnd
sm.RegisteringStream(streamID)
sm.AddStream(streamID, &cancelCtxDisconnector{
cancel: func() {
actualSum.Add(1)
Expand Down Expand Up @@ -186,7 +189,10 @@ func TestBufferedSenderOnOverflow(t *testing.T) {

RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
bs.addStream(streamID)
smMetrics := NewStreamManagerMetrics()
sm := NewStreamManager(bs, smMetrics)
sm.RegisteringStream(streamID)

require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)

val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
Expand All @@ -208,9 +214,111 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
require.NoError(t, bs.sendBuffered(muxEv, nil))
require.Equal(t, queueCap, int64(bs.len()))

t.Logf(bs.TestingBufferSummary())
// Overflow now.
err := bs.sendBuffered(muxEv, nil)
require.EqualError(t, err, newRetryErrBufferCapacityExceeded().Error())
t.Logf(bs.TestingBufferSummary())

// Start the stream manager now, which will start to drain the overflowed stream.
require.NoError(t, sm.Start(ctx, stopper))
defer sm.Stop(ctx)

testServerStream.waitForEventCount(t, int(queueCap))
testServerStream.reset()

// The stream should now be in state overflowing. The next error event will
// remove the stream from our state map. Any subsequent events will be
// dropped.
ev2 := kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{Error: *newErrBufferCapacityExceeded()}}
muxErrEvent := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: ev2, RangeID: 0, StreamID: streamID}

t.Logf(bs.TestingBufferSummary())
require.NoError(t, bs.sendBuffered(muxErrEvent, nil))
require.NoError(t, bs.sendBuffered(muxEv, nil))
t.Logf(bs.TestingBufferSummary())

testServerStream.waitForEvent(t, muxErrEvent)
require.Equal(t, 1, testServerStream.totalEventsSent())

// Once the error event has been delivered to the client, all additional
// requests should be dropped.
//
// We use a second stream "flush" the buffer and assert we don't see these
// events. This assumes buffered sender orders unrelated streams.
muxEv2 := *muxEv
muxEv2.StreamID = 2
sm.RegisteringStream(2)
testServerStream.reset()
require.NoError(t, bs.sendBuffered(muxEv, nil))
require.NoError(t, bs.sendBuffered(muxErrEvent, nil))
require.NoError(t, bs.sendBuffered(&muxEv2, nil))
testServerStream.waitForEvent(t, &muxEv2)
require.Equal(t, 1, testServerStream.totalEventsSent())
}

// TestBufferedSenderOnOverflowWithErrorEvent tests the special case of a stream
// hitting an overflow on an event that is itself an error.
func TestBufferedSenderOnOverflowWithErrorEvent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

stopper := stop.NewStopper()
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
st := cluster.MakeTestingClusterSettings()

queueCap := int64(24)
streamID := int64(1)

RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
smMetrics := NewStreamManagerMetrics()
sm := NewStreamManager(bs, smMetrics)
sm.RegisteringStream(streamID)

require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)

val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
ev1 := new(kvpb.RangeFeedEvent)
ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}

for range queueCap {
require.NoError(t, bs.sendBuffered(muxEv, nil))
}

// Overflow now.
ev2 := kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{Error: *newErrBufferCapacityExceeded()}}
muxErrEvent := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: ev2, RangeID: 0, StreamID: streamID}
// We admit the error event.
require.NoError(t, bs.sendBuffered(muxErrEvent, nil))
t.Logf(bs.TestingBufferSummary())

// Start the stream manager now, which will start to drain the overflowed stream.
require.NoError(t, sm.Start(ctx, stopper))
defer sm.Stop(ctx)

testServerStream.waitForEventCount(t, int(queueCap+1))
// We should see the event we sent to the stream.
require.True(t, testServerStream.hasEvent(muxErrEvent))
testServerStream.reset()

// All additional requests should be dropped, including errors because the
// first error should have moved us to overflowed.
//
// We use a second stream "flush" the buffer and assert we don't see these
// events. This assumes buffered sender orders unrelated streams.
muxEv2 := *muxEv
muxEv2.StreamID = 2
sm.RegisteringStream(2)
testServerStream.reset()
require.NoError(t, bs.sendBuffered(muxEv, nil))
require.NoError(t, bs.sendBuffered(muxErrEvent, nil))
require.NoError(t, bs.sendBuffered(&muxEv2, nil))
testServerStream.waitForEvent(t, &muxEv2)
require.Equal(t, 1, testServerStream.totalEventsSent())
}

// TestBufferedSenderOnOverflowMultiStream tests that BufferedSender and
Expand Down
Loading
Loading