@@ -8,17 +8,18 @@ package rangefeed
88import (
99 "context"
1010 "fmt"
11+ "strings"
1112 "time"
1213
1314 "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1415 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1516 "github.com/cockroachdb/cockroach/pkg/settings"
1617 "github.com/cockroachdb/cockroach/pkg/settings/cluster"
17- "github.com/cockroachdb/cockroach/pkg/util/buildutil"
1818 "github.com/cockroachdb/cockroach/pkg/util/retry"
1919 "github.com/cockroachdb/cockroach/pkg/util/stop"
2020 "github.com/cockroachdb/cockroach/pkg/util/syncutil"
2121 "github.com/cockroachdb/errors"
22+ "github.com/cockroachdb/redact"
2223)
2324
2425// ┌─────────────────┐
@@ -104,17 +105,45 @@ const (
104105 // streamOverflowing is the state we are in when the stream has reached its
105106 // limit and is waiting to deliver an error.
106107 streamOverflowing streamState = iota
107- // streamOverflowed means the stream has overflowed and the error has been
108- // placed in the queue.
109- streamOverflowed streamState = iota
108+ // streamErrored means an error has been enqueued for this stream and further
109+ // buffered sends will be ignored. Streams in this state will not be found in
110+ // the status map.
111+ streamErrored streamState = iota
110112)
111113
114+ func (s streamState ) String () string {
115+ switch s {
116+ case streamActive :
117+ return "active"
118+ case streamOverflowing :
119+ return "overflowing"
120+ case streamErrored :
121+ return "errored"
122+ default :
123+ return "unknown"
124+ }
125+ }
126+
127+ func (s streamState ) SafeFormat (w redact.SafePrinter , _ rune ) {
128+ w .Printf ("%s" , redact .SafeString (s .String ()))
129+ }
130+
112131type streamStatus struct {
113132 // queueItems is the number of items for a given stream in the event queue.
114133 queueItems int64
115134 state streamState
116135}
117136
137+ func (s streamStatus ) String () string {
138+ return fmt .Sprintf ("%s [queue_len:%d]" , s .state , s .queueItems )
139+ }
140+
141+ func (s streamStatus ) SafeFormat (w redact.SafePrinter , _ rune ) {
142+ w .Printf ("%s" , redact .SafeString (s .String ()))
143+ }
144+
145+ var errNoSuchStream = errors .New ("stream already encountered an error or has not be added to buffered sender" )
146+
118147func NewBufferedSender (
119148 sender ServerStreamSender , settings * cluster.Settings , bsMetrics * BufferedSenderMetrics ,
120149) * BufferedSender {
@@ -130,8 +159,12 @@ func NewBufferedSender(
130159}
131160
132161// sendBuffered buffers the event before sending it to the underlying gRPC
133- // stream. It does not block. It errors in the case of a stopped sender of if
134- // the registration has exceeded its capacity.
162+ // stream. It does not block.
163+ //
164+ // It errors in the case of a stopped sender of if the registration has exceeded
165+ // its capacity. sendBuffered with rangefeed events for streams that have
166+ // already enqueued an error event or have not been added via addStream will
167+ // return an error.
135168func (bs * BufferedSender ) sendBuffered (
136169 ev * kvpb.MuxRangeFeedEvent , alloc * SharedBudgetAllocation ,
137170) error {
@@ -145,55 +178,24 @@ func (bs *BufferedSender) sendBuffered(
145178 // request. If the stream has hit its limit, we return an error to the
146179 // registration. This error should be the next event that is sent to
147180 // stream.
148- //
149- // NB: We don't error if the stream status is not found as this may be an
150- // event for an already closed stream. Such events are possible while the
151- // registration publishes the catch up scan buffer.
152181 status , ok := bs .queueMu .byStream [ev .StreamID ]
153- if ok {
154- switch status .state {
155- case streamActive :
156- if bs .queueMu .perStreamCapacity > 0 && status .queueItems == bs .queueMu .perStreamCapacity {
157- if ev .Error != nil {
158- // If _this_ event is an error, no use sending another error. This stream
159- // is going down. Admit this error and mark the stream as overflowed.
160- status .state = streamOverflowed
161- } else {
162- // This stream is at capacity, return an error to the registration that it
163- // should send back to us after cleaning up.
164- status .state = streamOverflowing
165- return newRetryErrBufferCapacityExceeded ()
166- }
167- }
168- case streamOverflowing :
169- // The unbufferedRegistration is the only component that sends non-error
170- // events to our stream. In response to the error we return when moving to
171- // stateOverflowing, it should immediately send us an error and mark itself
172- // as disconnected.
173- //
174- // The only unfortunate exception is if we get disconnected while flushing
175- // the catch-up scan buffer. In this case we admit the event and stay in
176- // state overflowing until we actually receive the error.
177- //
178- // TODO(ssd): Given the above exception, we should perhaps just move
179- // directly to streamOverflowed. But, I think instead we want to remove
180- // that exception if possible.
181- if ev .Error != nil {
182- status .state = streamOverflowed
183- }
184- case streamOverflowed :
185- // If we are overflowed, we don't expect any further events because the
186- // registration should have disconnected in response to the error.
187- //
188- // TODO(ssd): Consider adding an assertion here.
189- return nil
190- default :
191- panic (fmt .Sprintf ("unhandled stream state: %v" , status .state ))
182+ if ! ok {
183+ return errNoSuchStream
184+ }
185+ nextState , err := bs .nextPerStreamStateLocked (status , ev )
186+ if nextState == streamErrored {
187+ // We will be admitting this event but no events after this.
188+ assertTrue (err == nil , "expected error event to be admitted" )
189+ delete (bs .queueMu .byStream , ev .StreamID )
190+ } else {
191+ if err == nil {
192+ status .queueItems ++
192193 }
193- // We are admitting this event.
194- status .queueItems ++
194+ status .state = nextState
195195 bs .queueMu .byStream [ev .StreamID ] = status
196-
196+ }
197+ if err != nil {
198+ return err
197199 }
198200
199201 // TODO(wenyihu6): pass an actual context here
@@ -207,8 +209,51 @@ func (bs *BufferedSender) sendBuffered(
207209 return nil
208210}
209211
212+ // nextPerStreamStateLocked returns the next state that should be stored for the
213+ // stream related to the given rangefeed event. If an error is returned, the
214+ // event should not be admitted and the given error should be returned to the
215+ // client.
216+ func (bs * BufferedSender ) nextPerStreamStateLocked (
217+ status streamStatus , ev * kvpb.MuxRangeFeedEvent ,
218+ ) (streamState , error ) {
219+ // An error should always put us into stateErrored, so let's do that first.
220+ if ev .Error != nil {
221+ if status .state == streamErrored {
222+ assumedUnreachable ("unexpected buffered event on stream in state streamErrored" )
223+ }
224+ return streamErrored , nil
225+ }
226+
227+ switch status .state {
228+ case streamActive :
229+ if bs .queueMu .perStreamCapacity > 0 && status .queueItems == bs .queueMu .perStreamCapacity {
230+ // This stream is at capacity, return an error to the registration that it
231+ // should send back to us after cleaning up.
232+ return streamOverflowing , newRetryErrBufferCapacityExceeded ()
233+ }
234+ // Happy path.
235+ return streamActive , nil
236+ case streamOverflowing :
237+ // The only place we do concurrent buffered sends is during catch-up scan
238+ // publishing which may be concurrent with a disconnect. The catch-up scan
239+ // will stop publishing if it receives an error and try to send an error
240+ // back. A disconnect only sends an error. This path exclusively handles
241+ // non-errors.
242+ assumedUnreachable ("unexpected buffered event on stream in state streamOverflowing" )
243+ return streamOverflowing , newRetryErrBufferCapacityExceeded ()
244+ case streamErrored :
245+ // This is unexpected because streamErrored streams are removed from the
246+ // status map and thus should be handled in sendBuffered before this
247+ // function is called.
248+ assumedUnreachable ("unexpected buffered event on stream in state streamErrored" )
249+ return streamErrored , errNoSuchStream
250+ default :
251+ panic (fmt .Sprintf ("unhandled stream state: %v" , status .state ))
252+ }
253+ }
254+
210255// sendUnbuffered sends the event directly to the underlying
211- // ServerStreamSender. It bypasses the buffer and thus may block.
256+ // ServerStreamSender. It bypasses the buffer and thus may block.
212257func (bs * BufferedSender ) sendUnbuffered (ev * kvpb.MuxRangeFeedEvent ) error {
213258 return bs .sender .Send (ev )
214259}
@@ -273,24 +318,10 @@ func (bs *BufferedSender) addStream(streamID int64) {
273318 if _ , ok := bs .queueMu .byStream [streamID ]; ! ok {
274319 bs .queueMu .byStream [streamID ] = streamStatus {}
275320 } else {
276- if buildutil .CrdbTestBuild {
277- panic (fmt .Sprintf ("stream %d already exists in buffered sender" , streamID ))
278- }
321+ assumedUnreachable (fmt .Sprintf ("stream %d already exists in buffered sender" , streamID ))
279322 }
280323}
281324
282- // removeStream removes the per-stream state tracking from the sender.
283- //
284- // TODO(ssd): There may be items still in the queue when removeStream is called.
285- // We'd like to solve this by removing this as a possibility. But this is OK
286- // since we will eventually process the events and the client knows to ignore
287- // them.
288- func (bs * BufferedSender ) removeStream (streamID int64 ) {
289- bs .queueMu .Lock ()
290- defer bs .queueMu .Unlock ()
291- delete (bs .queueMu .byStream , streamID )
292- }
293-
294325// cleanup is called when the sender is stopped. It is expected to free up
295326// buffer queue and no new events should be buffered after this.
296327func (bs * BufferedSender ) cleanup (ctx context.Context ) {
@@ -309,6 +340,18 @@ func (bs *BufferedSender) len() int {
309340 return int (bs .queueMu .buffer .len ())
310341}
311342
343+ func (bs * BufferedSender ) TestingBufferSummary () string {
344+ bs .queueMu .Lock ()
345+ defer bs .queueMu .Unlock ()
346+
347+ summary := & strings.Builder {}
348+ fmt .Fprintf (summary , "buffered sender: queue_len=%d streams=%d" , bs .queueMu .buffer .len (), len (bs .queueMu .byStream ))
349+ for id , stream := range bs .queueMu .byStream {
350+ fmt .Fprintf (summary , "\n %d: %s" , id , stream )
351+ }
352+ return summary .String ()
353+ }
354+
312355// Used for testing only.
313356func (bs * BufferedSender ) waitForEmptyBuffer (ctx context.Context ) error {
314357 opts := retry.Options {
0 commit comments