Skip to content
7 changes: 7 additions & 0 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ export class LeaseManager extends EventEmitter {
* Adds a message to the inventory, kicking off the deadline extender if it
* isn't already running.
*
* @fires LeaseManager#full
*
* @param {Message} message The message.
* @private
*/
Expand Down Expand Up @@ -141,6 +143,10 @@ export class LeaseManager extends EventEmitter {
}
/**
* Removes ALL messages from inventory, and returns the ones removed.
*
* @fires LeaseManager#free
* @fires LeaseManager#empty
*
* @private
*/
clear(): Message[] {
Expand Down Expand Up @@ -197,6 +203,7 @@ export class LeaseManager extends EventEmitter {
* messages are left over.
*
* @fires LeaseManager#free
* @fires LeaseManager#empty
*
* @param {Message} message The message to remove.
* @private
Expand Down
9 changes: 5 additions & 4 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ export class Message implements tracing.MessageWithAttributes {
* close() function.
* @property {SubscriberCloseBehavior} [options.behavior] The behavior of the close operation.
* - NackImmediately: Sends nacks for all messages held by the client library, and
* wait for them to send.
* wait for them to send. (default to match old behavior)
* - WaitForProcessing: Continues normal ack/nack and leasing processes until close
* to the timeout, then switches to NackImmediately behavior to close down.
* Use {@link SubscriberCloseBehaviors} for enum values.
Expand Down Expand Up @@ -970,9 +970,10 @@ export class Subscriber extends EventEmitter {

const options = this._options.closeOptions;

// If no behavior is specified, default to Wait.
// If no behavior is specified, default to Nack. This most closely matches
// the old behavior.
const behavior =
options?.behavior ?? SubscriberCloseBehaviors.WaitForProcessing;
options?.behavior ?? SubscriberCloseBehaviors.NackImmediately;

// The timeout can't realistically be longer than the longest time we're willing
// to lease messages.
Expand Down Expand Up @@ -1000,7 +1001,7 @@ export class Subscriber extends EventEmitter {
const shutdownStart = Date.now();
if (
behavior === SubscriberCloseBehaviors.WaitForProcessing &&
!this._inventory.isEmpty
!this._inventory.isEmpty()
) {
const waitTimeout = timeout.subtract(FINAL_NACK_TIMEOUT);

Expand Down
2 changes: 1 addition & 1 deletion test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class FakeLeaseManager extends EventEmitter {
remove(message: s.Message): void {}

_isEmpty = true;
get isEmpty() {
isEmpty(): boolean {
return this._isEmpty;
}
}
Expand Down
Loading