@@ -13,12 +13,14 @@ import { isDeepStrictEqual } from 'util'
13
13
import { User } from './user.js'
14
14
import { createModuleLogger } from 'lib0/logging'
15
15
import toobusy from 'toobusy-js'
16
+ import { promiseWithResolvers } from './utils.js'
16
17
17
18
const logSocketIO = createModuleLogger ( '@y/socket-io/server' )
18
19
const PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-persist-interval' ) || '3000' )
19
20
const MAX_PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-max-persist-interval' ) || '30000' )
20
21
const REVALIDATE_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-server-revalidate-timeout' ) || '60000' )
21
22
const WORKER_DISABLED = env . getConf ( 'y-worker-disabled' ) === 'true'
23
+ const DEFAULT_CLEAR_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-default-clear-timeout' ) || '30000' )
22
24
23
25
process . on ( 'SIGINT' , function ( ) {
24
26
// calling .shutdown allows your process to exit normally
@@ -137,11 +139,17 @@ export class YSocketIO {
137
139
*/
138
140
namespacePersistentMap = new Map ( )
139
141
/**
140
- * @type {Map<string, () => void> }
142
+ * @type {Map<string, { promise: Promise<void>, resolve: () => void } > }
141
143
* @private
142
144
* @readonly
143
145
*/
144
146
awaitingPersistMap = new Map ( )
147
+ /**
148
+ * @type {Map<string, NodeJS.Timeout> }
149
+ * @private
150
+ * @readonly
151
+ */
152
+ awaitingCleanupNamespace = new Map ( )
145
153
146
154
/**
147
155
* YSocketIO constructor.
@@ -213,6 +221,12 @@ export class YSocketIO {
213
221
'index' ,
214
222
redisPrefix
215
223
)
224
+ const prevAwaitCleanup = this . awaitingCleanupNamespace . get ( namespace )
225
+ if ( prevAwaitCleanup ) {
226
+ clearTimeout ( prevAwaitCleanup )
227
+ this . cleanupNamespace ( namespace , stream )
228
+ }
229
+
216
230
if ( ! this . namespaceMap . has ( namespace ) ) {
217
231
this . namespaceMap . set ( namespace , socket . nsp )
218
232
}
@@ -346,13 +360,9 @@ export class YSocketIO {
346
360
if ( ! ns ) continue
347
361
const nsp = this . namespaceMap . get ( ns )
348
362
if ( nsp ?. sockets . size === 0 && stream ) {
349
- this . subscriber . unsubscribe ( stream , this . redisMessageSubscriber )
350
- this . namespaceStreamMap . delete ( ns )
351
- this . streamNamespaceMap . delete ( stream )
352
- this . namespaceMap . delete ( ns )
353
- this . namespaceDocMap . get ( ns ) ?. ydoc . destroy ( )
354
- this . namespaceDocMap . delete ( ns )
355
- this . namespacePersistentMap . delete ( ns )
363
+ this . cleanupNamespace ( ns , stream , DEFAULT_CLEAR_TIMEOUT )
364
+ const doc = this . namespaceDocMap . get ( ns )
365
+ if ( doc ) this . debouncedPersist ( ns , doc . ydoc , true )
356
366
}
357
367
}
358
368
} )
@@ -403,13 +413,7 @@ export class YSocketIO {
403
413
const nsp = this . namespaceMap . get ( namespace )
404
414
if ( ! nsp ) return
405
415
if ( nsp . sockets . size === 0 && this . subscriber ) {
406
- this . subscriber . unsubscribe ( stream , this . redisMessageSubscriber )
407
- this . namespaceStreamMap . delete ( namespace )
408
- this . streamNamespaceMap . delete ( stream )
409
- this . namespaceMap . delete ( namespace )
410
- this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
411
- this . namespaceDocMap . delete ( namespace )
412
- this . namespacePersistentMap . delete ( namespace )
416
+ this . cleanupNamespace ( namespace , stream , DEFAULT_CLEAR_TIMEOUT )
413
417
}
414
418
415
419
/** @type {Uint8Array[] } */
@@ -463,9 +467,9 @@ export class YSocketIO {
463
467
const lastPersistCalledAt = this . namespacePersistentMap . get ( namespace ) ?? 0
464
468
const now = Date . now ( )
465
469
const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL
466
- if ( changed || shouldPersist ) {
470
+ if ( changed || shouldPersist || nsp . sockets . size === 0 ) {
467
471
this . namespacePersistentMap . set ( namespace , now )
468
- this . debouncedPersist ( namespace , doc . ydoc )
472
+ this . debouncedPersist ( namespace , doc . ydoc , nsp . sockets . size === 0 )
469
473
}
470
474
this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
471
475
this . namespaceDocMap . set ( namespace , doc )
@@ -474,47 +478,50 @@ export class YSocketIO {
474
478
/**
475
479
* @param {string } namespace
476
480
* @param {Y.Doc } doc
481
+ * @param {boolean= } immediate
477
482
*/
478
- async debouncedPersist ( namespace , doc ) {
483
+ debouncedPersist ( namespace , doc , immediate = false ) {
479
484
this . debouncedPersistDocMap . set ( namespace , doc )
480
- if ( this . debouncedPersistMap . has ( namespace ) ) return
485
+ if ( this . debouncedPersistMap . has ( namespace ) ) {
486
+ if ( ! immediate ) return
487
+ clearTimeout ( this . debouncedPersistMap . get ( namespace ) || undefined )
488
+ }
489
+ const timeoutInterval = immediate
490
+ ? 0
491
+ : PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
481
492
const timeout = setTimeout (
482
493
async ( ) => {
483
494
try {
484
495
assert ( this . client )
485
496
const doc = this . debouncedPersistDocMap . get ( namespace )
486
497
logSocketIO ( `trying to persist ${ namespace } ` )
487
498
if ( ! doc ) return
488
- /** @type {Promise<void> | null } */
489
- let workerPromise = null
490
499
if ( this . client . persistWorker ) {
491
- workerPromise = new Promise ( ( resolve ) => {
492
- assert ( this . client ?. persistWorker )
493
- this . awaitingPersistMap . set ( namespace , resolve )
494
-
495
- const docState = Y . encodeStateAsUpdateV2 ( doc )
496
- const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
497
- buf . set ( docState )
498
- this . client . persistWorker . postMessage ( {
499
- room : namespace ,
500
- docstate : buf
501
- } )
500
+ /** @type { ReturnType<typeof promiseWithResolvers<void>> } */
501
+ const { promise , resolve } = promiseWithResolvers ( )
502
+ assert ( this . client ?. persistWorker )
503
+ this . awaitingPersistMap . set ( namespace , { promise , resolve } )
504
+
505
+ const docState = Y . encodeStateAsUpdateV2 ( doc )
506
+ const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
507
+ buf . set ( docState )
508
+ this . client . persistWorker . postMessage ( {
509
+ room : namespace ,
510
+ docstate : buf
502
511
} )
503
- if ( workerPromise ) {
504
- await workerPromise
505
- }
512
+ await promise
506
513
} else {
507
514
await this . client . store . persistDoc ( namespace , 'index' , doc )
508
515
}
509
- await this . client . trimRoomStream ( namespace , 'index' , true )
516
+ await this . client . trimRoomStream ( namespace , 'index' )
510
517
} catch ( e ) {
511
518
console . error ( e )
512
519
} finally {
513
520
this . debouncedPersistDocMap . delete ( namespace )
514
521
this . debouncedPersistMap . delete ( namespace )
515
522
}
516
523
} ,
517
- PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
524
+ timeoutInterval
518
525
)
519
526
520
527
this . debouncedPersistMap . set ( namespace , timeout )
@@ -608,7 +615,45 @@ export class YSocketIO {
608
615
registerPersistWorkerResolve ( ) {
609
616
if ( ! this . client ?. persistWorker ) return
610
617
this . client . persistWorker . on ( 'message' , ( { event, room } ) => {
611
- if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. ( )
618
+ if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. resolve ( )
612
619
} )
613
620
}
621
+
622
+ /**
623
+ * @param {string } namespace
624
+ * @param {string } stream
625
+ * @param {number= } removeAfterWait
626
+ */
627
+ cleanupNamespace ( namespace , stream , removeAfterWait ) {
628
+ if ( ! removeAfterWait ) {
629
+ this . awaitingCleanupNamespace . delete ( namespace )
630
+ return this . cleanupNamespaceImpl ( namespace , stream )
631
+ }
632
+ if ( this . awaitingCleanupNamespace . has ( namespace ) ) return
633
+
634
+ const timer = setTimeout ( async ( ) => {
635
+ const awaitingPersist = this . awaitingPersistMap . get ( namespace )
636
+ if ( awaitingPersist ) await awaitingPersist . promise
637
+ this . cleanupNamespaceImpl ( namespace , stream )
638
+ this . awaitingCleanupNamespace . delete ( namespace )
639
+ logSocketIO ( `no active connection, namespace: ${ namespace } cleared` )
640
+ } , removeAfterWait )
641
+ this . awaitingCleanupNamespace . set ( namespace , timer )
642
+ }
643
+
644
+ /**
645
+ * @param {string } namespace
646
+ * @param {string } stream
647
+ * @private
648
+ */
649
+ cleanupNamespaceImpl ( namespace , stream ) {
650
+ this . subscriber ?. unsubscribe ( stream , this . redisMessageSubscriber )
651
+ this . namespaceStreamMap . delete ( namespace )
652
+ this . streamNamespaceMap . delete ( stream )
653
+ this . namespaceMap . delete ( namespace )
654
+ this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
655
+ this . namespaceDocMap . delete ( namespace )
656
+ this . namespacePersistentMap . delete ( namespace )
657
+ this . client ?. trimRoomStream ( namespace , 'index' , true )
658
+ }
614
659
}
0 commit comments