Skip to content

Commit f1f351f

Browse files
committed
Fix handling of messages that overflow the buffer limit
1 parent 47ba357 commit f1f351f

File tree

1 file changed

+49
-15
lines changed

1 file changed

+49
-15
lines changed

packages/grpc-js/src/retrying-call.ts

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ interface WriteBufferEntry {
135135
* state.
136136
*/
137137
callback?: WriteCallback;
138+
/**
139+
* Indicates whether the message is allocated in the buffer tracker. Ignored
140+
* if entryType is not MESSAGE. Should be the return value of
141+
* bufferTracker.allocate.
142+
*/
143+
allocated: boolean;
138144
}
139145

140146
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
@@ -216,6 +222,22 @@ export class RetryingCall implements Call {
216222
}
217223
}
218224

225+
private maybefreeMessageBufferEntry(messageIndex: number) {
226+
if (this.state !== 'COMMITTED') {
227+
return;
228+
}
229+
const bufferEntry = this.writeBuffer[messageIndex];
230+
if (bufferEntry.entryType === 'MESSAGE') {
231+
if (bufferEntry.allocated) {
232+
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
233+
}
234+
this.writeBuffer[messageIndex] = {
235+
entryType: 'FREED',
236+
allocated: false
237+
};
238+
}
239+
}
240+
219241
private commitCall(index: number) {
220242
if (this.state === 'COMMITTED') {
221243
return;
@@ -237,13 +259,7 @@ export class RetryingCall implements Call {
237259
this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
238260
}
239261
for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) {
240-
const bufferEntry = this.writeBuffer[messageIndex];
241-
if (bufferEntry.entryType === 'MESSAGE') {
242-
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
243-
this.writeBuffer[messageIndex] = {
244-
entryType: 'FREED'
245-
};
246-
}
262+
this.maybefreeMessageBufferEntry(messageIndex);
247263
}
248264
}
249265

@@ -513,6 +529,15 @@ export class RetryingCall implements Call {
513529
this.maybeStartHedgingTimer();
514530
}
515531

532+
private handleChildWriteCompleted(childIndex: number) {
533+
const childCall = this.underlyingCalls[childIndex];
534+
const messageIndex = childCall.nextMessageToSend;
535+
this.writeBuffer[messageIndex].callback?.();
536+
this.maybefreeMessageBufferEntry(messageIndex);
537+
childCall.nextMessageToSend += 1;
538+
this.sendNextChildMessage(childIndex);
539+
}
540+
516541
private sendNextChildMessage(childIndex: number) {
517542
const childCall = this.underlyingCalls[childIndex];
518543
if (childCall.state === 'COMPLETED') {
@@ -525,8 +550,7 @@ export class RetryingCall implements Call {
525550
childCall.call.sendMessageWithContext({
526551
callback: (error) => {
527552
// Ignore error
528-
childCall.nextMessageToSend += 1;
529-
this.sendNextChildMessage(childIndex);
553+
this.handleChildWriteCompleted(childIndex);
530554
}
531555
}, bufferEntry.message!.message);
532556
break;
@@ -550,25 +574,34 @@ export class RetryingCall implements Call {
550574
const messageIndex = this.writeBuffer.length;
551575
const bufferEntry: WriteBufferEntry = {
552576
entryType: 'MESSAGE',
553-
message: writeObj
577+
message: writeObj,
578+
allocated: this.bufferTracker.allocate(message.length, this.callNumber)
554579
};
555580
this.writeBuffer[messageIndex] = bufferEntry;
556-
if (this.bufferTracker.allocate(message.length, this.callNumber)) {
581+
if (bufferEntry.allocated) {
557582
context.callback?.();
558583
for (const [callIndex, call] of this.underlyingCalls.entries()) {
559584
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
560585
call.call.sendMessageWithContext({
561586
callback: (error) => {
562587
// Ignore error
563-
call.nextMessageToSend += 1;
564-
this.sendNextChildMessage(callIndex);
588+
this.handleChildWriteCompleted(callIndex);
565589
}
566590
}, message);
567591
}
568592
}
569593
} else {
570594
this.commitCallWithMostMessages();
571-
bufferEntry.callback = context.callback;
595+
const call = this.underlyingCalls[this.committedCallIndex!];
596+
bufferEntry.callback = context.callback;
597+
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
598+
call.call.sendMessageWithContext({
599+
callback: (error) => {
600+
// Ignore error
601+
this.handleChildWriteCompleted(this.committedCallIndex!);
602+
}
603+
}, message);
604+
}
572605
}
573606
}
574607
startRead(): void {
@@ -584,7 +617,8 @@ export class RetryingCall implements Call {
584617
this.trace('halfClose called');
585618
const halfCloseIndex = this.writeBuffer.length;
586619
this.writeBuffer[halfCloseIndex] = {
587-
entryType: 'HALF_CLOSE'
620+
entryType: 'HALF_CLOSE',
621+
allocated: false
588622
};
589623
for (const call of this.underlyingCalls) {
590624
if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {

0 commit comments

Comments
 (0)