@@ -127,6 +127,9 @@ import { headers as createHeaders } from "@nats-io/nats-core";
127
127
import type { MsgHdrs } from "@nats-io/nats-core" ;
128
128
import type { ValueType } from "@cocalc/nats/types" ;
129
129
import { isConnected , waitUntilConnected } from "@cocalc/nats/util" ;
130
+ import { ENFORCE_LIMITS_THROTTLE_MS } from "./stream" ;
131
+ import { asyncDebounce } from "@cocalc/util/async-utils" ;
132
+ import { waitUntilReady } from "@cocalc/nats/tiered-storage/client" ;
130
133
131
134
const PUBLISH_TIMEOUT = 15000 ;
132
135
@@ -143,10 +146,6 @@ const CONNECTION_CHECK_INTERVAL = 5000;
143
146
// which is convenient for consistency. This is not consistent with NATS's
144
147
// own KV store limit naming.
145
148
146
- // Significant throttling is VERY, VERY important, since purging old messages frequently
147
- // seems to put a very significant load on NATS!
148
- const ENFORCE_LIMITS_THROTTLE_MS = process . env . COCALC_TEST_MODE ? 100 : 30000 ;
149
-
150
149
export interface KVLimits {
151
150
// How many keys may be in the KV store. Oldest keys will be removed
152
151
// if the key-value store exceeds this size. -1 for unlimited.
@@ -231,6 +230,7 @@ export class GeneralKV<T = any> extends EventEmitter {
231
230
if ( this . all != null ) {
232
231
return ;
233
232
}
233
+ await waitUntilReady ( this . name ) ;
234
234
const kvm = new Kvm ( this . env . nc ) ;
235
235
await waitUntilConnected ( ) ;
236
236
this . kv = await kvm . create ( this . name , {
@@ -355,6 +355,9 @@ export class GeneralKV<T = any> extends EventEmitter {
355
355
} ;
356
356
357
357
private restartWatch = () => {
  // Stopping the current watch ends the
  // "for await (const x of this.watch) {" loop in startWatch; that in turn
  // leads to a fresh watch being started, assuming the object isn't closed.
  // If there is no watch at the moment, this does nothing.
  if (this.watch != null) {
    this.watch.stop();
  }
};
360
363
@@ -499,7 +502,16 @@ export class GeneralKV<T = any> extends EventEmitter {
499
502
} ;
500
503
501
504
private monitorWatch = async ( ) => {
502
- this . env . nc . on ?.( "reconnect" , this . restartWatch ) ;
505
+ if ( this . env . nc . on != null ) {
506
+ this . env . nc . on ( "reconnect" , this . restartWatch ) ;
507
+ this . env . nc . on ( "status" , ( { type } ) => {
508
+ if ( type == "reconnect" ) {
509
+ this . ensureWatchIsValid ( ) ;
510
+ }
511
+ } ) ;
512
+ } else {
513
+ this . checkWatchOnReconnect ( ) ;
514
+ }
503
515
while ( this . revisions != null ) {
504
516
if ( ! ( await isConnected ( ) ) ) {
505
517
await waitUntilConnected ( ) ;
@@ -511,6 +523,52 @@ export class GeneralKV<T = any> extends EventEmitter {
511
523
}
512
524
} ;
513
525
526
// Check whether the current watch is still usable and, if it is not,
// restart it.  Wrapped in asyncDebounce (3000, trailing edge only), so a
// burst of calls is presumably collapsed into one check -- confirm against
// asyncDebounce's contract.
private ensureWatchIsValid = asyncDebounce(
  async () => {
    // Wait for the connection, then pause (presumably 2000 ms) before
    // probing the watch.
    await waitUntilConnected();
    await delay(2000);
    const stillValid = await this.isWatchStillValid();
    if (stillValid) {
      return;
    }
    if (this.kv == null) {
      // no kv store (e.g., not initialized or already torn down), so there
      // is nothing to restart.
      return;
    }
    console.log(`nats kv: ${this.name} -- watch not valid, so recreating`);
    await this.restartWatch();
  },
  3000,
  { leading: false, trailing: true },
);
542
+
543
// Resolves to true if the current watch answers an info() request, and to
// false if there is no kv store, no watch, or the info() request fails.
private isWatchStillValid = async () => {
  await waitUntilConnected();
  const haveWatch = this.kv != null && this.watch != null;
  if (!haveWatch) {
    return false;
  }
  let valid = true;
  try {
    // _data is an internal field of the watch object (presumably the
    // underlying consumer -- TODO confirm); a rejected info() call is
    // treated as "the watch is no longer valid".
    await this.watch._data.info();
  } catch (err) {
    console.log(`nats: watch info error -- ${err} `);
    valid = false;
  }
  return valid;
};
556
+
557
// Fallback used by monitorWatch when the connection has no `on` method:
// iterate the connection's async status stream and re-validate the watch
// after every "reconnect" event.  Loops for as long as this.kv is set.
//
// monitorWatch starts this without awaiting it, so no error may escape
// from here -- it would surface as an unhandled promise rejection and
// silently end monitoring.
private checkWatchOnReconnect = async () => {
  while (this.kv != null) {
    try {
      for await (const { type } of await this.env.nc.status()) {
        if (type == "reconnect") {
          await this.ensureWatchIsValid();
        }
      }
    } catch (err) {
      // Don't swallow the failure silently; record why monitoring was
      // interrupted before backing off.
      console.log(
        `nats kv: ${this.name} -- error monitoring connection status -- ${err}`,
      );
      await delay(15000);
      if (this.kv == null) {
        // closed while we were backing off; nothing left to validate.
        break;
      }
      try {
        await this.ensureWatchIsValid();
      } catch (err2) {
        // Keep the monitoring loop alive; the next iteration retries.
        console.log(
          `nats kv: ${this.name} -- error validating watch -- ${err2}`,
        );
      }
    }
  }
};
571
+
514
572
close = ( ) => {
515
573
if ( this . revisions == null ) {
516
574
// already closed
0 commit comments