diff --git a/package.json b/package.json index 1dc0e30..6ca60c7 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "test-db": "docker-compose -f ./docker-compose.test.yaml up", "dist": "tsup", "lint": "standard && tsc", + "lint:fix": "standard --fix && tsc", "test": "dotenvx run --env-file=.env -- node tests/index.js", "test-inspect": "dotenvx run --env-file=.env -- node --inspect-brk tests/index.js", "preversion": "npm run lint && npm run dist", @@ -65,6 +66,7 @@ "redis": "^4.6.12", "socket.io": "^4.7.5", "socket.io-client": "^4.8.0", + "toobusy-js": "^0.5.1", "y-protocols": "^1.0.6", "yjs": "^13.6.18" }, @@ -80,6 +82,7 @@ "@dotenvx/dotenvx": "^1.14.0", "@redis/client": "^1.6.0", "@types/node": "^20.11.5", + "@types/toobusy-js": "^0.5.4", "@types/ws": "^8.5.10", "concurrently": "^8.2.2", "standard": "^17.1.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 72e83e1..13f8261 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -20,6 +20,9 @@ importers: socket.io-client: specifier: ^4.8.0 version: 4.8.0 + toobusy-js: + specifier: ^0.5.1 + version: 0.5.1 y-protocols: specifier: ^1.0.6 version: 1.0.6(yjs@13.6.18) @@ -43,6 +46,9 @@ importers: '@types/node': specifier: ^20.11.5 version: 20.12.11 + '@types/toobusy-js': + specifier: ^0.5.4 + version: 0.5.4 '@types/ws': specifier: ^8.5.10 version: 8.5.10 @@ -424,6 +430,9 @@ packages: '@types/node@20.12.11': resolution: {integrity: sha512-vDg9PZ/zi+Nqp6boSOT7plNuthRugEKixDv5sFTIpkE89MmNtEArAShI4mxuX2+UrLEe9pxC1vm2cjm9YlWbJw==} + '@types/toobusy-js@0.5.4': + resolution: {integrity: sha512-hsKMbYiaL3ZWx7B3FYyN0rEJexw7I1HgKbNToX3ZZJv6373to954wlA7zrXR3/XoVwZnFwWqFguBs91sNzJGKQ==} + '@types/ws@8.5.10': resolution: {integrity: sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==} @@ -1808,6 +1817,10 @@ packages: resolution: {integrity: sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==} engines: {node: '>=8.0'} + toobusy-js@0.5.1: + resolution: {integrity: sha512-GiCux/c8G2TV0FTDgtxnXOxmSAndaI/9b1YxT14CqyeBDtTZAcJLx9KlXT3qECi8D0XCc78T4sN/7gWtjRyCaA==} + engines: {node: '>=0.9.1'} + tr46@1.0.1: resolution: {integrity: sha512-dTpowEjclQ7Kgx5SdBkqRzVhERQXov8/l9Ft9dVM9fmg0W0KQSVaXX9T4i6twCPNtYiZM53lpSSUAwJbFPOHxA==} @@ -2278,6 +2291,8 @@ snapshots: dependencies: undici-types: 5.26.5 + '@types/toobusy-js@0.5.4': {} + '@types/ws@8.5.10': dependencies: '@types/node': 20.12.11 @@ -3916,6 +3931,8 @@ snapshots: dependencies: is-number: 7.0.0 + toobusy-js@0.5.1: {} + tr46@1.0.1: dependencies: punycode: 2.3.1 diff --git a/src/api.js b/src/api.js index 693793c..64860fd 100644 --- a/src/api.js +++ b/src/api.js @@ -19,6 +19,7 @@ let ydocUpdateCallback = env.getConf('ydoc-update-callback') if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') { ydocUpdateCallback += '/' } +const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' /** * @param {string} a @@ -117,20 +118,27 @@ export class Api { this.redisWorkerGroupName = this.prefix + ':worker' this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset` this._destroyed = false + /** @type {import('worker_threads').Worker | null} */ + this.persistWorker = null + + const addScript = WORKER_DISABLED + ? 'redis.call("XADD", KEYS[1], "*", "m", ARGV[1])' + : ` + if redis.call("EXISTS", KEYS[1]) == 0 then + redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) + elseif redis.call("XLEN", KEYS[1]) > 100 then + redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) + end + redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) + ` + this.redis = redis.createClient({ url, // scripting: https://github.com/redis/node-redis/#lua-scripts scripts: { addMessage: redis.defineScript({ NUMBER_OF_KEYS: 1, - SCRIPT: ` - if redis.call("EXISTS", KEYS[1]) == 0 then - redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) - elseif redis.call("XLEN", KEYS[1]) > 100 then - redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) - end - redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) - `, + SCRIPT: addScript, /** * @param {string} key * @param {Buffer} message @@ -265,6 +273,35 @@ export class Api { } } + /** + * @param {string} room + * @param {string} docid + */ + async getRedisLastId (room, docid) { + const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) + const docMessages = ms.get(room)?.get(docid) || null + return docMessages?.lastId.toString() || '0' + } + + /** + * @param {string} room + * @param {string} docid + * @param {boolean} [remove=false] + */ + async trimRoomStream (room, docid, remove = false) { + const roomName = computeRedisRoomStreamName(room, docid, this.prefix) + const redisLastId = await this.getRedisLastId(room, docid) + const lastId = number.parseInt(redisLastId.split('-')[0]) + if (remove) { + await this.redis.del(roomName) + } else { + await this.redis.multi() + .xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime) + .xDelIfEmpty(roomName) + .exec() + } + } + /** * @param {Object} opts * @param {number} [opts.blockTime] diff --git a/src/index.js b/src/index.js index 29e80d4..8c183d9 100644 --- a/src/index.js +++ b/src/index.js @@ -3,4 +3,5 @@ export * from './server.js' export * from './storage.js' export * from './api.js' export * from './subscriber.js' +export * from './persist-worker-thread.js' export * from './y-socket-io/index.js' diff --git a/src/persist-worker-thread.js b/src/persist-worker-thread.js new file mode 100644 index 0000000..4e588be --- /dev/null +++ b/src/persist-worker-thread.js @@ -0,0 +1,44 @@ +import * as Y from 'yjs' +import * as logging from 'lib0/logging' +import { isMainThread, parentPort } from 'worker_threads' + +export class PersistWorkerThread { + /** + * @private + * @readonly + */ + log = logging.createModuleLogger('@y/persist-worker-thread') + + /** + * @param {import('./storage.js').AbstractStorage} store + */ + constructor (store) { + if (isMainThread) { + this.log('persist worker cannot run on main thread') + return + } + this.store = store + parentPort?.on('message', this.persist) + } + + /** + * @param {{ room: string, docstate: SharedArrayBuffer }} props + */ + persist = async ({ room, docstate }) => { + this.log(`persisting ${room} in worker`) + const state = new Uint8Array(docstate) + const doc = new Y.Doc() + Y.applyUpdateV2(doc, state) + await this.store?.persistDoc(room, 'index', doc) + doc.destroy() + parentPort?.postMessage({ event: 'persisted', room }) + } +} + +/** + * @param {import('./storage.js').AbstractStorage} store + */ +export function createPersistWorkerThread (store) { + if (isMainThread) throw new Error('cannot create persist worker in main thread') + return new PersistWorkerThread(store) +} diff --git a/src/socketio.js b/src/socketio.js index e288f3a..0d26540 100644 --- a/src/socketio.js +++ b/src/socketio.js @@ -37,9 +37,10 @@ class YSocketIOServer { * @param {string} [conf.redisPrefix] * @param {string} [conf.redisUrl] * @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate + * @param {import('worker_threads').Worker=} [conf.persistWorker] */ -export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix }) => { +export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => { const app = new YSocketIO(io, { authenticate }) - const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix }) + const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker }) return new YSocketIOServer(app, client, subscriber) } diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index ebeacde..c1a1d11 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -351,6 +351,8 @@ export class SocketIOProvider extends Observable { * @readonly */ onSocketDisconnection = (event) => { + if (event === 'io server disconnect') this.socket.connect() + this.emit('connection-close', [event, this]) this.synced = false AwarenessProtocol.removeAwarenessStates( diff --git a/src/y-socket-io/utils.js b/src/y-socket-io/utils.js new file mode 100644 index 0000000..42481de --- /dev/null +++ b/src/y-socket-io/utils.js @@ -0,0 +1,17 @@ +/** + * Basically Promise.withResolvers() + * @template T + * @see https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers + */ +export function promiseWithResolvers () { + /** @type {(value: T | PromiseLike) => void} */ + let res = () => {} + /** @type {(reason?: Error) => void} */ + let rej = () => {} + /** @type {Promise} */ + const promise = new Promise((resolve, reject) => { + res = resolve + rej = reject + }) + return { promise, resolve: res, reject: rej } +} diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 590facf..8a50f55 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -4,10 +4,29 @@ import * as promise from 'lib0/promise' import * as encoding from 'lib0/encoding' import * as decoding from 'lib0/decoding' import { assert } from 'lib0/testing' -import { User } from './user.js' import * as api from '../api.js' import * as protocol from '../protocol.js' +import * as number from 'lib0/number' +import * as env from 'lib0/environment' import { createSubscriber } from '../subscriber.js' +import { isDeepStrictEqual } from 'util' +import { User } from './user.js' +import { createModuleLogger } from 'lib0/logging' +import toobusy from 'toobusy-js' +import { promiseWithResolvers } from './utils.js' + +const logSocketIO = createModuleLogger('@y/socket-io/server') +const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') +const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max-persist-interval') || '30000') +const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000') +const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' +const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000') + +process.on('SIGINT', function () { + // calling .shutdown allows your process to exit normally + toobusy.shutdown() + process.exit() +}) /** * @typedef {import('socket.io').Namespace} Namespace @@ -89,6 +108,48 @@ export class YSocketIO { * @readonly */ namespaceMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + namespaceDocMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + socketUserCache = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + debouncedPersistMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + debouncedPersistDocMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + namespacePersistentMap = new Map() + /** + * @type {Map, resolve: () => void }>} + * @private + * @readonly + */ + awaitingPersistMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + awaitingCleanupNamespace = new Map() /** * YSocketIO constructor. @@ -109,39 +170,67 @@ export class YSocketIO { * * It also starts socket connection listeners. * @param {import('../storage.js').AbstractStorage} store - * @param {{ redisPrefix?: string, redisUrl?: string }=} opts + * @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }=} opts * @public */ - async initialize (store, { redisUrl, redisPrefix = 'y' } = {}) { + async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) { const [client, subscriber] = await promise.all([ api.createApiClient(store, { redisUrl, redisPrefix }), createSubscriber(store, { redisUrl, redisPrefix }) ]) this.client = client this.subscriber = subscriber + if (persistWorker) { + this.client.persistWorker = persistWorker + this.registerPersistWorkerResolve() + } this.nsp = this.io.of(/^\/yjs\|.*$/) this.nsp.use(async (socket, next) => { - if (this.configuration.authenticate == null) return next() - const user = await this.configuration.authenticate(socket) - if (user) { - socket.user = new User(this.getNamespaceString(socket.nsp), user.userid) - return next() - } else return next(new Error('Unauthorized')) + if (this.configuration.authenticate === null) return next() + const userCache = this.socketUserCache.get(socket) + const namespace = this.getNamespaceString(socket.nsp) + if (!userCache || Date.now() - userCache.validatedAt > REVALIDATE_TIMEOUT) { + this.socketUserCache.delete(socket) + const user = await this.configuration.authenticate(socket) + if (!user) return next(new Error('Unauthorized')) + this.socketUserCache.set(socket, { user, validatedAt: Date.now() }) + socket.user = new User(namespace, user.userid) + } else { + socket.user = new User(namespace, userCache.user.userid) + } + + if (socket.user) return next() + else return next(new Error('Unauthorized')) }) this.nsp.on('connection', async (socket) => { assert(this.client) assert(this.subscriber) + const namespace = this.getNamespaceString(socket.nsp) + if (toobusy()) { + logSocketIO(`warning server too busy, rejecting connection: ${namespace}`) + // wait a bit to prevent client reconnect too fast + await promise.wait(100) + socket.send('server too busy, please try again latter') + socket.disconnect(true) + return + } if (!socket.user) throw new Error('user does not exist in socket') - const namespace = this.getNamespaceString(socket.nsp) + logSocketIO(`new connection in namespace: ${namespace}`) const stream = api.computeRedisRoomStreamName( namespace, 'index', redisPrefix ) + const prevAwaitCleanup = this.awaitingCleanupNamespace.get(namespace) + if (prevAwaitCleanup) { + clearTimeout(prevAwaitCleanup) + this.cleanupNamespace(namespace, stream) + } + if (!this.namespaceMap.has(namespace)) { this.namespaceMap.set(namespace, socket.nsp) } @@ -156,17 +245,21 @@ export class YSocketIO { this.initSyncListeners(socket) this.initAwarenessListeners(socket) this.initSocketListeners(socket) + ;(async () => { + assert(this.client) + assert(socket.user) + const doc = (WORKER_DISABLED && this.namespaceDocMap.get(namespace)) || (await this.client.getDoc(namespace, 'index')) + if (WORKER_DISABLED) this.namespaceDocMap.set(namespace, doc) - const doc = await this.client.getDoc(namespace, 'index') - - if ( - api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) - ) { - // our subscription is newer than the content that we received from the api - // need to renew subscription id and make sure that we catch the latest content. - this.subscriber.ensureSubId(stream, doc.redisLastId) - } - this.startSynchronization(socket, doc) + if ( + api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) + ) { + // our subscription is newer than the content that we received from the api + // need to renew subscription id and make sure that we catch the latest content. + this.subscriber?.ensureSubId(stream, doc.redisLastId) + } + this.startSynchronization(socket, doc) + })() }) return { client, subscriber } @@ -200,22 +293,29 @@ export class YSocketIO { syncStep2 ) => { assert(this.client) - const doc = await this.client.getDoc( - this.getNamespaceString(socket.nsp), - 'index' - ) + const namespace = this.getNamespaceString(socket.nsp) + const doc = (WORKER_DISABLED && this.namespaceDocMap.get(namespace)) || (await this.client.getDoc(namespace, 'index')) + if (WORKER_DISABLED) this.namespaceDocMap.set(namespace, doc) + assert(doc) syncStep2(Y.encodeStateAsUpdate(doc.ydoc, stateVector)) } ) + /** @type {unknown} */ + let prevMsg = null socket.on('sync-update', (/** @type {ArrayBuffer} */ update) => { + if (isDeepStrictEqual(update, prevMsg)) return assert(this.client) + const namespace = this.getNamespaceString(socket.nsp) const message = Buffer.from(update.slice(0, update.byteLength)) - this.client.addMessage( - this.getNamespaceString(socket.nsp), - 'index', - Buffer.from(this.toRedis('sync-update', message)) - ).catch(console.error) + this.client + .addMessage( + namespace, + 'index', + Buffer.from(this.toRedis('sync-update', message)) + ) + .catch(console.error) + prevMsg = update }) } @@ -232,14 +332,19 @@ export class YSocketIO { * @readonly */ initAwarenessListeners = (socket) => { + /** @type {unknown} */ + const prevMsg = null socket.on('awareness-update', (/** @type {ArrayBuffer} */ update) => { + if (isDeepStrictEqual(update, prevMsg)) return assert(this.client) const message = Buffer.from(update.slice(0, update.byteLength)) - this.client.addMessage( - this.getNamespaceString(socket.nsp), - 'index', - Buffer.from(this.toRedis('awareness-update', new Uint8Array(message))) - ).catch(console.error) + this.client + .addMessage( + this.getNamespaceString(socket.nsp), + 'index', + Buffer.from(this.toRedis('awareness-update', new Uint8Array(message))) + ) + .catch(console.error) }) } @@ -253,17 +358,27 @@ export class YSocketIO { socket.on('disconnect', async () => { assert(this.subscriber) if (!socket.user) return - for (const ns of socket.user.subs) { - const stream = this.namespaceStreamMap.get(ns) + this.socketUserCache.delete(socket) + for (const stream of socket.user.subs) { + const ns = this.streamNamespaceMap.get(stream) + if (!ns) continue const nsp = this.namespaceMap.get(ns) if (nsp?.sockets.size === 0 && stream) { - this.subscriber.unsubscribe(stream, this.redisMessageSubscriber) - this.namespaceStreamMap.delete(ns) - this.streamNamespaceMap.delete(stream) - this.namespaceMap.delete(ns) + this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT) + const doc = this.namespaceDocMap.get(ns) + if (doc) this.debouncedPersist(ns, doc.ydoc, true) } } }) + socket.onAnyOutgoing(async (ev) => { + if (ev !== 'reload') return + if (!WORKER_DISABLED) return + const namespace = this.getNamespaceString(socket.nsp) + logSocketIO(`reload event triggered, updating namespace doc in: ${namespace}`) + assert(this.client) + const doc = await this.client.getDoc(namespace, 'index') + this.namespaceDocMap.set(namespace, doc) + }) } /** @@ -280,11 +395,13 @@ export class YSocketIO { (/** @type {Uint8Array} */ update) => { assert(this.client) const message = Buffer.from(update.slice(0, update.byteLength)) - this.client.addMessage( - this.getNamespaceString(socket.nsp), - 'index', - Buffer.from(this.toRedis('sync-step-2', message)) - ).catch(console.error) + this.client + .addMessage( + this.getNamespaceString(socket.nsp), + 'index', + Buffer.from(this.toRedis('sync-step-2', message)) + ) + .catch(console.error) } ) if (doc.awareness.states.size > 0) { @@ -303,16 +420,13 @@ export class YSocketIO { * @param {string} stream * @param {Array} messages */ - redisMessageSubscriber = (stream, messages) => { + redisMessageSubscriber = async (stream, messages) => { const namespace = this.streamNamespaceMap.get(stream) if (!namespace) return const nsp = this.namespaceMap.get(namespace) if (!nsp) return if (nsp.sockets.size === 0 && this.subscriber) { - this.subscriber.unsubscribe(stream, this.redisMessageSubscriber) - this.namespaceStreamMap.delete(namespace) - this.streamNamespaceMap.delete(stream) - this.namespaceMap.delete(namespace) + this.cleanupNamespace(namespace, stream, DEFAULT_CLEAR_TIMEOUT) } /** @type {Uint8Array[]} */ @@ -334,6 +448,96 @@ export class YSocketIO { if (msg.length === 0) continue nsp.emit('awareness-update', msg) } + + if (!WORKER_DISABLED) return + + let changed = false + const existDoc = this.namespaceDocMap.get(namespace) + if (existDoc) { + existDoc.ydoc.on('afterTransaction', (tr) => { + changed = tr.changed.size > 0 + }) + Y.transact(existDoc.ydoc, () => { + for (const msg of updates) { + if (msg.length === 0) continue + Y.applyUpdate(existDoc.ydoc, msg) + } + for (const msg of awareness) { + if (msg.length === 0) continue + AwarenessProtocol.applyAwarenessUpdate(existDoc.awareness, msg, null) + } + }) + } + + assert(this.client) + let doc = existDoc + if (!existDoc) { + const getDoc = await this.client.getDoc(namespace, 'index') + doc = getDoc + changed = getDoc.changed + } + assert(doc) + const lastPersistCalledAt = this.namespacePersistentMap.get(namespace) ?? 0 + const now = Date.now() + const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL + if (changed || shouldPersist || nsp.sockets.size === 0) { + this.namespacePersistentMap.set(namespace, now) + this.debouncedPersist(namespace, doc.ydoc, nsp.sockets.size === 0) + } + this.namespaceDocMap.get(namespace)?.ydoc.destroy() + this.namespaceDocMap.set(namespace, doc) + } + + /** + * @param {string} namespace + * @param {Y.Doc} doc + * @param {boolean=} immediate + */ + debouncedPersist (namespace, doc, immediate = false) { + this.debouncedPersistDocMap.set(namespace, doc) + if (this.debouncedPersistMap.has(namespace)) { + if (!immediate) return + clearTimeout(this.debouncedPersistMap.get(namespace) || undefined) + } + const timeoutInterval = immediate + ? 0 + : PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL + const timeout = setTimeout( + async () => { + try { + assert(this.client) + const doc = this.debouncedPersistDocMap.get(namespace) + logSocketIO(`trying to persist ${namespace}`) + if (!doc) return + if (this.client.persistWorker) { + /** @type {ReturnType>} */ + const { promise, resolve } = promiseWithResolvers() + assert(this.client?.persistWorker) + this.awaitingPersistMap.set(namespace, { promise, resolve }) + + const docState = Y.encodeStateAsUpdateV2(doc) + const buf = new Uint8Array(new SharedArrayBuffer(docState.length)) + buf.set(docState) + this.client.persistWorker.postMessage({ + room: namespace, + docstate: buf + }) + await promise + } else { + await this.client.store.persistDoc(namespace, 'index', doc) + } + await this.client.trimRoomStream(namespace, 'index') + } catch (e) { + console.error(e) + } finally { + this.debouncedPersistDocMap.delete(namespace) + this.debouncedPersistMap.delete(namespace) + } + }, + timeoutInterval + ) + + this.debouncedPersistMap.set(namespace, timeout) } /** @@ -420,4 +624,49 @@ export class YSocketIO { console.error(e) } } + + registerPersistWorkerResolve () { + if (!this.client?.persistWorker) return + this.client.persistWorker.on('message', ({ event, room }) => { + if (event === 'persisted') this.awaitingPersistMap.get(room)?.resolve() + }) + } + + /** + * @param {string} namespace + * @param {string} stream + * @param {number=} removeAfterWait + */ + cleanupNamespace (namespace, stream, removeAfterWait) { + if (!removeAfterWait) { + this.awaitingCleanupNamespace.delete(namespace) + return this.cleanupNamespaceImpl(namespace, stream) + } + if (this.awaitingCleanupNamespace.has(namespace)) return + + const timer = setTimeout(async () => { + const awaitingPersist = this.awaitingPersistMap.get(namespace) + if (awaitingPersist) await awaitingPersist.promise + this.cleanupNamespaceImpl(namespace, stream) + this.awaitingCleanupNamespace.delete(namespace) + logSocketIO(`no active connection, namespace: ${namespace} cleared`) + }, removeAfterWait) + this.awaitingCleanupNamespace.set(namespace, timer) + } + + /** + * @param {string} namespace + * @param {string} stream + * @private + */ + cleanupNamespaceImpl (namespace, stream) { + this.subscriber?.unsubscribe(stream, this.redisMessageSubscriber) + this.namespaceStreamMap.delete(namespace) + this.streamNamespaceMap.delete(stream) + this.namespaceMap.delete(namespace) + this.namespaceDocMap.get(namespace)?.ydoc.destroy() + this.namespaceDocMap.delete(namespace) + this.namespacePersistentMap.delete(namespace) + this.client?.trimRoomStream(namespace, 'index', true) + } }