From f17f15aae0cc60ab55395f216723f89092058162 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 29 Oct 2025 12:33:02 +0000 Subject: [PATCH 1/2] rangefeed: drop messages for overflowed streams This PR fixes some bugs discovered when trying to increase the unit test coverage of the buffered sender: - A stream would never actually move into the "overflowing" state. Rather, it would stay in the "active" state and keep returning a buffer capacity error until the registration sent the error back to us, at which point it would go straight to "overflowed". - This was papering over another bug in which the overflowing state could produce an invalid stream of events. Here, I add test coverage for the expected state transitions and fix these bugs by: - Dropping buffered messages for streams not in the stream map. This works because the stream will be in the stream map from the point of the RegisteringStream call and is only removed when the stream finally sends the error. Thus any message for a stream not in the map represents either (1) a message sent to a StreamID before RegisteringStream has been called or (2) a message sent to a StreamID after an Error. (1) would be a programming error and (2) would be messages that are dropped server-side anyway. - Re-organizing the code such that it is harder to miss saving the stream status back to the status map. Note that any potential bug here only existed on master and thus no release note is needed. 
Epic: CRDB-55217 Release note: None --- pkg/kv/kvserver/rangefeed/BUILD.bazel | 1 + pkg/kv/kvserver/rangefeed/buffered_sender.go | 171 ++++++++++++------ .../rangefeed/buffered_sender_test.go | 109 ++++++++++- .../rangefeed/unbuffered_registration.go | 7 + 4 files changed, 235 insertions(+), 53 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 82f73c5d5bb5..9ad1874fbb8d 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -59,6 +59,7 @@ go_library( "//pkg/util/uuid", "@com_github_cockroachdb_crlib//crtime", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender.go b/pkg/kv/kvserver/rangefeed/buffered_sender.go index b63b418fc157..07a222fd2158 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender.go @@ -8,17 +8,18 @@ package rangefeed import ( "context" "fmt" + "strings" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" ) // ┌─────────────────┐ @@ -109,12 +110,37 @@ const ( streamOverflowed streamState = iota ) +func (s streamState) String() string { + switch s { + case streamActive: + return "active" + case streamOverflowing: + return "overflowing" + case streamOverflowed: + return "overflowed" + default: + return "unknown" + } +} + +func (s streamState) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("%s", redact.SafeString(s.String())) +} + type streamStatus struct { // 
queueItems is the number of items for a given stream in the event queue. queueItems int64 state streamState } +func (s streamStatus) String() string { + return fmt.Sprintf("%s [queue_len:%d]", s.state, s.queueItems) +} + +func (s streamStatus) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("%s", redact.SafeString(s.String())) +} + func NewBufferedSender( sender ServerStreamSender, settings *cluster.Settings, bsMetrics *BufferedSenderMetrics, ) *BufferedSender { @@ -145,55 +171,23 @@ func (bs *BufferedSender) sendBuffered( // request. If the stream has hit its limit, we return an error to the // registration. This error should be the next event that is sent to // stream. - // - // NB: We don't error if the stream status is not found as this may be an - // event for an already closed stream. Such events are possible while the - // registration publishes the catch up scan buffer. status, ok := bs.queueMu.byStream[ev.StreamID] - if ok { - switch status.state { - case streamActive: - if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity { - if ev.Error != nil { - // If _this_ event is an error, no use sending another error. This stream - // is going down. Admit this error and mark the stream as overflowed. - status.state = streamOverflowed - } else { - // This stream is at capacity, return an error to the registration that it - // should send back to us after cleaning up. - status.state = streamOverflowing - return newRetryErrBufferCapacityExceeded() - } - } - case streamOverflowing: - // The unbufferedRegistration is the only component that sends non-error - // events to our stream. In response to the error we return when moving to - // stateOverflowing, it should immediately send us an error and mark itself - // as disconnected. - // - // The only unfortunate exception is if we get disconnected while flushing - // the catch-up scan buffer. 
In this case we admit the event and stay in - // state overflowing until we actually receive the error. - // - // TODO(ssd): Given the above exception, we should perhaps just move - // directly to streamOverflowed. But, I think instead we want to remove - // that exception if possible. - if ev.Error != nil { - status.state = streamOverflowed - } - case streamOverflowed: - // If we are overflowed, we don't expect any further events because the - // registration should have disconnected in response to the error. - // - // TODO(ssd): Consider adding an assertion here. - return nil - default: - panic(fmt.Sprintf("unhandled stream state: %v", status.state)) - } - // We are admitting this event. + if !ok { + // We don't error if the stream status is not found as this may be an + // event for an already closed stream. Such events are possible while the + // registration publishes the catch up scan buffer. + // + // The client ignores such events, but we drop it here regardless. + return nil + } + nextState, shouldAdmit, err := bs.nextPerQueueStateLocked(status, ev) + status.state = nextState + if shouldAdmit { status.queueItems++ - bs.queueMu.byStream[ev.StreamID] = status - + } + bs.queueMu.byStream[ev.StreamID] = status + if err != nil || !shouldAdmit { + return err } // TODO(wenyihu6): pass an actual context here @@ -207,8 +201,71 @@ func (bs *BufferedSender) sendBuffered( return nil } +func (bs *BufferedSender) nextPerQueueStateLocked( + status streamStatus, ev *kvpb.MuxRangeFeedEvent, +) (streamState, bool, error) { + switch status.state { + case streamActive: + if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity { + if ev.Error != nil { + // If _this_ event is an error, no use sending another error. This stream + // is going down. Admit this error and mark the stream as overflowed. 
+ return streamOverflowed, true, nil + } else { + // This stream is at capacity, return an error to the registration that it + // should send back to us after cleaning up. + return streamOverflowing, false, newRetryErrBufferCapacityExceeded() + } + } + // Happy path. + return streamActive, true, nil + case streamOverflowing: + // The unbufferedRegistration is the only component that sends non-error + // events to our stream. In response to the error we return when moving to + // stateOverflowing, it should immediately send us an error and mark itself + // as disconnected. + // + // The only unfortunate exception is if we get disconnected while flushing + // the catch-up scan buffer. In this case we admit the event and stay in + // state overflowing until we actually receive the error. + // + // TODO(ssd): Given the above exception, we should perhaps just move + // directly to streamOverflowed. But, I think instead we want to remove + // that exception if possible. + if ev.Error != nil { + return streamOverflowed, true, nil + } else { + // Drop everything but error events. We didn't admit the event that put + // us into the overflowing state so we don't want to admit any later + // non-error events. + return streamOverflowing, false, nil + } + case streamOverflowed: + // TODO(ssd): It would be nice to be able to put this assertion here: + // + // assumedUnreachable("event on overflowed stream") + // + // But, it isn't yet possible because of the following: + // + // 1. Unbuffered Registration is publishing the catch-up buffer while not + // holding its lock. + // + // 2. An another error comes in and that error moves us to the overflowed + // state. + // + // 3. The unbuffered registration publishes another message from the + // catch-up buffer. + // + // One way to fix this is to remove the stream from the stream status map + // when we admit an error. We leave that to a future PR. 
+ return streamOverflowed, false, nil + default: + panic(fmt.Sprintf("unhandled stream state: %v", status.state)) + } +} + // sendUnbuffered sends the event directly to the underlying -// ServerStreamSender. It bypasses the buffer and thus may block. +// ServerStreamSender. It bypasses the buffer and thus may block. func (bs *BufferedSender) sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error { return bs.sender.Send(ev) } @@ -273,9 +330,7 @@ func (bs *BufferedSender) addStream(streamID int64) { if _, ok := bs.queueMu.byStream[streamID]; !ok { bs.queueMu.byStream[streamID] = streamStatus{} } else { - if buildutil.CrdbTestBuild { - panic(fmt.Sprintf("stream %d already exists in buffered sender", streamID)) - } + assumedUnreachable(fmt.Sprintf("stream %d already exists in buffered sender", streamID)) } } @@ -309,6 +364,18 @@ func (bs *BufferedSender) len() int { return int(bs.queueMu.buffer.len()) } +func (bs *BufferedSender) TestingBufferSummary() string { + bs.queueMu.Lock() + defer bs.queueMu.Unlock() + + summary := &strings.Builder{} + fmt.Fprintf(summary, "buffered sender: queue_len=%d streams=%d", bs.queueMu.buffer.len(), len(bs.queueMu.byStream)) + for id, stream := range bs.queueMu.byStream { + fmt.Fprintf(summary, "\n %d: %s", id, stream) + } + return summary.String() +} + // Used for testing only. 
func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error { opts := retry.Options{ diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go index 50ff548b9263..14d72d638ee3 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go @@ -48,6 +48,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) { t.Run("basic operation", func(t *testing.T) { var num atomic.Int32 + sm.RegisteringStream(streamID) sm.AddStream(int64(streamID), &cancelCtxDisconnector{ cancel: func() { num.Add(1) @@ -64,6 +65,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) { testServerStream.reset() }) t.Run("disconnect stream on the same stream is idempotent", func(t *testing.T) { + sm.RegisteringStream(streamID) sm.AddStream(int64(streamID), &cancelCtxDisconnector{ cancel: func() { require.NoError(t, sm.sender.sendBuffered(errEvent, nil)) @@ -113,6 +115,7 @@ func TestBufferedSenderChaosWithStop(t *testing.T) { require.LessOrEqualf(t, activeStreamStart, activeStreamEnd, "test programming error") if addStream || activeStreamStart == activeStreamEnd { streamID := activeStreamEnd + sm.RegisteringStream(streamID) sm.AddStream(streamID, &cancelCtxDisconnector{ cancel: func() { actualSum.Add(1) @@ -186,7 +189,10 @@ func TestBufferedSenderOnOverflow(t *testing.T) { RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap) bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics()) - bs.addStream(streamID) + smMetrics := NewStreamManagerMetrics() + sm := NewStreamManager(bs, smMetrics) + sm.RegisteringStream(streamID) + require.Equal(t, queueCap, bs.queueMu.perStreamCapacity) val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} @@ -209,8 +215,109 @@ func TestBufferedSenderOnOverflow(t *testing.T) { require.Equal(t, queueCap, int64(bs.len())) // Overflow now. 
+ t.Logf(bs.TestingBufferSummary()) err := bs.sendBuffered(muxEv, nil) require.EqualError(t, err, newRetryErrBufferCapacityExceeded().Error()) + t.Logf(bs.TestingBufferSummary()) + + // Start the stream manager now, which will start to drain the overflowed stream. + require.NoError(t, sm.Start(ctx, stopper)) + defer sm.Stop(ctx) + + testServerStream.waitForEventCount(t, int(queueCap)) + testServerStream.reset() + + // The stream should now be in state overflowing. Any non-error events are + // silently dropped, but the next error event is delivered to the client. + ev2 := kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{Error: *newErrBufferCapacityExceeded()}} + muxErrEvent := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: ev2, RangeID: 0, StreamID: streamID} + + t.Logf(bs.TestingBufferSummary()) + require.NoError(t, bs.sendBuffered(muxEv, nil)) + require.NoError(t, bs.sendBuffered(muxErrEvent, nil)) + t.Logf(bs.TestingBufferSummary()) + + testServerStream.waitForEvent(t, muxErrEvent) + require.Equal(t, 1, testServerStream.totalEventsSent()) + + // Once the error event has been delivered to the client, all additional + // requests should be dropped. + // + // We use a second stream to "flush" the buffer and assert we don't see these + // events. This assumes buffered sender orders unrelated streams. + muxEv2 := *muxEv + muxEv2.StreamID = 2 + sm.RegisteringStream(2) + testServerStream.reset() + require.NoError(t, bs.sendBuffered(muxEv, nil)) + require.NoError(t, bs.sendBuffered(muxErrEvent, nil)) + require.NoError(t, bs.sendBuffered(&muxEv2, nil)) + testServerStream.waitForEvent(t, &muxEv2) + require.Equal(t, 1, testServerStream.totalEventsSent()) +} + +// TestBufferedSenderOnOverflowWithErrorEvent tests the special case of a stream +// hitting an overflow on an event that is itself an error. 
+func TestBufferedSenderOnOverflowWithErrorEvent(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + testServerStream := newTestServerStream() + st := cluster.MakeTestingClusterSettings() + + queueCap := int64(24) + streamID := int64(1) + + RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap) + bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics()) + smMetrics := NewStreamManagerMetrics() + sm := NewStreamManager(bs, smMetrics) + sm.RegisteringStream(streamID) + + require.Equal(t, queueCap, bs.queueMu.perStreamCapacity) + + val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} + ev1 := new(kvpb.RangeFeedEvent) + ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1}) + muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID} + + for range queueCap { + require.NoError(t, bs.sendBuffered(muxEv, nil)) + } + + // Overflow now. + ev2 := kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{Error: *newErrBufferCapacityExceeded()}} + muxErrEvent := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: ev2, RangeID: 0, StreamID: streamID} + // We admit the error event. + require.NoError(t, bs.sendBuffered(muxErrEvent, nil)) + t.Logf(bs.TestingBufferSummary()) + + // Start the stream manager now, which will start to drain the overflowed stream. + require.NoError(t, sm.Start(ctx, stopper)) + defer sm.Stop(ctx) + + testServerStream.waitForEventCount(t, int(queueCap+1)) + // We should see the event we sent to the stream. + require.True(t, testServerStream.hasEvent(muxErrEvent)) + testServerStream.reset() + + // All additional requests should be dropped, including errors, because the + // first error should have moved us to overflowed. + // + // We use a second stream to "flush" the buffer and assert we don't see these + // events. 
This assumes buffered sender orders unrelated streams. + muxEv2 := *muxEv + muxEv2.StreamID = 2 + sm.RegisteringStream(2) + testServerStream.reset() + require.NoError(t, bs.sendBuffered(muxEv, nil)) + require.NoError(t, bs.sendBuffered(muxErrEvent, nil)) + require.NoError(t, bs.sendBuffered(&muxEv2, nil)) + testServerStream.waitForEvent(t, &muxEv2) + require.Equal(t, 1, testServerStream.totalEventsSent()) } // TestBufferedSenderOnOverflowMultiStream tests that BufferedSender and diff --git a/pkg/kv/kvserver/rangefeed/unbuffered_registration.go b/pkg/kv/kvserver/rangefeed/unbuffered_registration.go index 5db40ea77077..51a78060a76b 100644 --- a/pkg/kv/kvserver/rangefeed/unbuffered_registration.go +++ b/pkg/kv/kvserver/rangefeed/unbuffered_registration.go @@ -7,6 +7,7 @@ package rangefeed import ( "context" + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -446,3 +447,9 @@ func assertTrue(cond bool, msg string) { panic(msg) } } + +func assumedUnreachable(msg string) { + if buildutil.CrdbTestBuild { + panic(fmt.Sprintf("assumed unreachable code reached: %s", msg)) + } +} From 239a0773e7e37f57d11073f4c90b9d61f359ce70 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 29 Oct 2025 14:30:13 +0000 Subject: [PATCH 2/2] rangefeed: reject all buffered sends after error This changes the flow control in the buffered sender to allow us to immediately start dropping buffered sends after the first error is enqueued. This simplifies the code, makes it easier to write assertions about message ordering, and avoids cases where we have unaccounted for memory used by messages that were in the queue after an error. 
Epic: CRDB-55217 Release note: None --- pkg/kv/kvserver/rangefeed/buffered_sender.go | 101 ++++++------------ .../rangefeed/buffered_sender_test.go | 9 +- pkg/kv/kvserver/rangefeed/stream_manager.go | 30 +++--- .../kvserver/rangefeed/unbuffered_sender.go | 3 - 4 files changed, 53 insertions(+), 90 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender.go b/pkg/kv/kvserver/rangefeed/buffered_sender.go index 07a222fd2158..bb6385a584ef 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender.go @@ -105,9 +105,10 @@ const ( // streamOverflowing is the state we are in when the stream has reached its // limit and is waiting to deliver an error. streamOverflowing streamState = iota - // streamOverflowed means the stream has overflowed and the error has been - // placed in the queue. - streamOverflowed streamState = iota + // streamErrored means an error has been enqueued for this stream and further + // buffered sends will be ignored. Streams in this state will not be found in + // the status map. + streamErrored streamState = iota ) func (s streamState) String() string { @@ -116,8 +117,8 @@ func (s streamState) String() string { return "active" case streamOverflowing: return "overflowing" - case streamOverflowed: - return "overflowed" + case streamErrored: + return "errored" default: return "unknown" } @@ -185,7 +186,12 @@ func (bs *BufferedSender) sendBuffered( if shouldAdmit { status.queueItems++ } - bs.queueMu.byStream[ev.StreamID] = status + if nextState == streamErrored { + delete(bs.queueMu.byStream, ev.StreamID) + } else { + bs.queueMu.byStream[ev.StreamID] = status + } + if err != nil || !shouldAdmit { return err } @@ -204,61 +210,36 @@ func (bs *BufferedSender) sendBuffered( func (bs *BufferedSender) nextPerQueueStateLocked( status streamStatus, ev *kvpb.MuxRangeFeedEvent, ) (streamState, bool, error) { + // An error should always put us into streamErrored, so let's do that first. 
+ if ev.Error != nil { + if status.state == streamErrored { + assumedUnreachable("unexpected buffered event on stream in state streamErrored") + } + return streamErrored, true, nil + } + switch status.state { case streamActive: if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity { - if ev.Error != nil { - // If _this_ event is an error, no use sending another error. This stream - // is going down. Admit this error and mark the stream as overflowed. - return streamOverflowed, true, nil - } else { - // This stream is at capacity, return an error to the registration that it - // should send back to us after cleaning up. - return streamOverflowing, false, newRetryErrBufferCapacityExceeded() - } + // This stream is at capacity, return an error to the registration that it + // should send back to us after cleaning up. + return streamOverflowing, false, newRetryErrBufferCapacityExceeded() } // Happy path. return streamActive, true, nil case streamOverflowing: - // The unbufferedRegistration is the only component that sends non-error - // events to our stream. In response to the error we return when moving to - // stateOverflowing, it should immediately send us an error and mark itself - // as disconnected. - // - // The only unfortunate exception is if we get disconnected while flushing - // the catch-up scan buffer. In this case we admit the event and stay in - // state overflowing until we actually receive the error. - // - // TODO(ssd): Given the above exception, we should perhaps just move - // directly to streamOverflowed. But, I think instead we want to remove - // that exception if possible. - if ev.Error != nil { - return streamOverflowed, true, nil - } else { - // Drop everything but error events. We didn't admit the event that put - // us into the overflowing state so we don't want to admit any later - // non-error events. 
- return streamOverflowing, false, nil - } - case streamOverflowed: - // TODO(ssd): It would be nice to be able to put this assertion here: - // - // assumedUnreachable("event on overflowed stream") - // - // But, it isn't yet possible because of the following: - // - // 1. Unbuffered Registration is publishing the catch-up buffer while not - // holding its lock. - // - // 2. An another error comes in and that error moves us to the overflowed - // state. - // - // 3. The unbuffered registration publishes another message from the - // catch-up buffer. - // - // One way to fix this is to remove the stream from the stream status map - // when we admit an error. We leave that to a future PR. - return streamOverflowed, false, nil + // The only place we do concurrent buffered sends is during catch-up scan + // publishing which may be concurrent with a disconnect. The catch-up scan + // will stop publishing if it receives an error and try to send an error + // back. A disconnect only sends an error. This path exclusively handles non + // errors. + assumedUnreachable("unexpected buffered event on stream in state streamOverflowing") + return streamOverflowing, false, nil + case streamErrored: + // This is unexpected because streamErrored streams are removed from the + // status map and future sends are ignored. + assumedUnreachable("unexpected buffered event on stream in state streamErrored") + return streamErrored, false, nil default: panic(fmt.Sprintf("unhandled stream state: %v", status.state)) } @@ -334,18 +315,6 @@ func (bs *BufferedSender) addStream(streamID int64) { } } -// removeStream removes the per-stream state tracking from the sender. -// -// TODO(ssd): There may be items still in the queue when removeStream is called. -// We'd like to solve this by removing this as a possibility. But this is OK -// since we will eventually process the events and the client knows to ignore -// them. 
-func (bs *BufferedSender) removeStream(streamID int64) { - bs.queueMu.Lock() - defer bs.queueMu.Unlock() - delete(bs.queueMu.byStream, streamID) -} - // cleanup is called when the sender is stopped. It is expected to free up // buffer queue and no new events should be buffered after this. func (bs *BufferedSender) cleanup(ctx context.Context) { diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go index 14d72d638ee3..91a8dd2b5334 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go @@ -214,8 +214,8 @@ func TestBufferedSenderOnOverflow(t *testing.T) { require.NoError(t, bs.sendBuffered(muxEv, nil)) require.Equal(t, queueCap, int64(bs.len())) - // Overflow now. t.Logf(bs.TestingBufferSummary()) + // Overflow now. err := bs.sendBuffered(muxEv, nil) require.EqualError(t, err, newRetryErrBufferCapacityExceeded().Error()) t.Logf(bs.TestingBufferSummary()) @@ -227,14 +227,15 @@ func TestBufferedSenderOnOverflow(t *testing.T) { testServerStream.waitForEventCount(t, int(queueCap)) testServerStream.reset() - // The stream should now be in state overflowing. Any non-error events are - // silently dropped, but the next error event is delivered to the client. + // The stream should now be in state overflowing. The next error event will + // remove the stream from our state map. Any subsequent events will be + // dropped. 
ev2 := kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{Error: *newErrBufferCapacityExceeded()}} muxErrEvent := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: ev2, RangeID: 0, StreamID: streamID} t.Logf(bs.TestingBufferSummary()) - require.NoError(t, bs.sendBuffered(muxEv, nil)) require.NoError(t, bs.sendBuffered(muxErrEvent, nil)) + require.NoError(t, bs.sendBuffered(muxEv, nil)) t.Logf(bs.TestingBufferSummary()) testServerStream.waitForEvent(t, muxErrEvent) diff --git a/pkg/kv/kvserver/rangefeed/stream_manager.go b/pkg/kv/kvserver/rangefeed/stream_manager.go index 5e761c2c62ff..a3f5d57396bd 100644 --- a/pkg/kv/kvserver/rangefeed/stream_manager.go +++ b/pkg/kv/kvserver/rangefeed/stream_manager.go @@ -76,14 +76,13 @@ type sender interface { // all streams in StreamManager. run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error - // TODO(ssd): These two methods call into question whether StreamManager and - // sender can really be separate. We might consider combining the two for - // simplicity. + // TODO(ssd): This method calls into question whether StreamManager and sender + // can really be separate. We might consider combining the two for simplicity. // - // addStream is called when an individual stream is being added. + // addStream is called when an individual stream is being added. The sender is + // free to remove the stream once an error has been sent/enqueued to the given + // streamID. addStream(streamID int64) - // removeStream is called when an individual stream is being removed. - removeStream(streamID int64) // cleanup is called when the sender is stopped. It is expected to clean up // any resources used by the sender. @@ -118,17 +117,14 @@ func (sm *StreamManager) NewStream(streamID int64, rangeID roachpb.RangeID) (sin // streamID to avoid metrics inaccuracy when the error is sent before the stream // is added to the StreamManager. 
func (sm *StreamManager) OnError(streamID int64) { - func() { - sm.streams.Lock() - defer sm.streams.Unlock() - if d, ok := sm.streams.m[streamID]; ok { - assertTrue(d.IsDisconnected(), "OnError called on connected registration") - d.Unregister() - delete(sm.streams.m, streamID) - sm.metrics.ActiveMuxRangeFeed.Dec(1) - } - }() - sm.sender.removeStream(streamID) + sm.streams.Lock() + defer sm.streams.Unlock() + if d, ok := sm.streams.m[streamID]; ok { + assertTrue(d.IsDisconnected(), "OnError called on connected registration") + d.Unregister() + delete(sm.streams.m, streamID) + sm.metrics.ActiveMuxRangeFeed.Dec(1) + } } // DisconnectStream disconnects the stream with the given streamID. diff --git a/pkg/kv/kvserver/rangefeed/unbuffered_sender.go b/pkg/kv/kvserver/rangefeed/unbuffered_sender.go index 3899a035e531..1086cb91e33f 100644 --- a/pkg/kv/kvserver/rangefeed/unbuffered_sender.go +++ b/pkg/kv/kvserver/rangefeed/unbuffered_sender.go @@ -135,9 +135,6 @@ func (ubs *UnbufferedSender) sendUnbuffered(event *kvpb.MuxRangeFeedEvent) error // addStream implements sender. func (ubs *UnbufferedSender) addStream(int64) {} -// removeStream implements sender. -func (ubs *UnbufferedSender) removeStream(int64) {} - // cleanup implements sender. func (ubs *UnbufferedSender) cleanup(context.Context) {}