Skip to content

Commit 647e1c6

Browse files
authored
feat: impl worker health check & recreation (#28)
1 parent f3b9f1a commit 647e1c6

File tree

4 files changed

+100
-15
lines changed

4 files changed

+100
-15
lines changed

src/api.js

-2
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ export class Api {
122122
this.redisWorkerGroupName = this.prefix + ':worker'
123123
this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
124124
this._destroyed = false
125-
/** @type {import('worker_threads').Worker | null} */
126-
this.persistWorker = null
127125

128126
const addScript = WORKER_DISABLED
129127
? `

src/persist-worker-thread.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ export class PersistWorkerThread {
1818
return
1919
}
2020
this.store = store
21-
parentPort?.on('message', this.persist)
21+
parentPort?.postMessage({ event: 'ready' })
22+
parentPort?.on('message', ({ event, ...rest }) => {
23+
if (event === 'ping') parentPort?.postMessage({ event: 'pong' })
24+
else this.persist(rest)
25+
})
2226
}
2327

2428
/**

src/socketio.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ class YSocketIOServer {
3737
* @param {string} [conf.redisPrefix]
3838
* @param {string} [conf.redisUrl]
3939
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
40-
* @param {import('worker_threads').Worker=} [conf.persistWorker]
40+
* @param {() => import('worker_threads').Worker=} [conf.getPersistWorker]
4141
* @param {boolean} [conf.enableAwareness]
4242
*/
4343
export const registerYSocketIOServer = async (io, store, {
4444
authenticate,
4545
redisUrl,
4646
redisPrefix,
47-
persistWorker,
47+
getPersistWorker,
4848
enableAwareness = true
4949
}) => {
5050
const app = new YSocketIO(io, { authenticate, enableAwareness })
51-
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
51+
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, getPersistWorker })
5252
return new YSocketIOServer(app, client, subscriber)
5353
}

src/y-socket-io/y-socket-io.js

+92-9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max
2121
const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000')
2222
const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
2323
const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000')
24+
const WORKER_HEALTH_CHECK_INTERVAL = number.parseInt(env.getConf('y-socket-io-worker-health-check-interval') || '5000')
2425

2526
process.on('SIGINT', function () {
2627
// calling .shutdown allows your process to exit normally
@@ -146,6 +147,26 @@ export class YSocketIO {
146147
* @readonly
147148
*/
148149
awaitingCleanupNamespace = new Map()
150+
/**
151+
* @type {boolean}
152+
* @private
153+
*/
154+
workerReady = false
155+
/**
156+
* @type {number | null}
157+
* @private
158+
*/
159+
workerLastHeartbeat = null
160+
/**
161+
* @type {{ promise: Promise<boolean>, resolve: (result: boolean) => void } | null}
162+
* @private
163+
*/
164+
workerHeartbeatContext = null
165+
/**
166+
* @type {NodeJS.Timeout | null}
167+
* @private
168+
*/
169+
persistWorkerHealthCheckTimeout = null
149170

150171
/**
151172
* YSocketIO constructor.
@@ -169,20 +190,22 @@ export class YSocketIO {
169190
*
170191
* It also starts socket connection listeners.
171192
* @param {import('../storage.js').AbstractStorage} store
172-
* @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }=} opts
193+
* @param {{ redisPrefix?: string, redisUrl?: string, getPersistWorker?: () => import('worker_threads').Worker }=} opts
173194
* @public
174195
*/
175-
async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) {
196+
async initialize (store, { redisUrl, redisPrefix = 'y', getPersistWorker } = {}) {
176197
const { enableAwareness } = this.configuration
177198
const [client, subscriber] = await promise.all([
178199
api.createApiClient(store, { redisUrl, redisPrefix, enableAwareness }),
179200
createSubscriber(store, { redisUrl, redisPrefix, enableAwareness })
180201
])
181202
this.client = client
182203
this.subscriber = subscriber
183-
if (persistWorker) {
184-
this.client.persistWorker = persistWorker
204+
if (getPersistWorker) {
205+
this.getPersistWorker = getPersistWorker
206+
this.persistWorker = getPersistWorker()
185207
this.registerPersistWorkerResolve()
208+
this.registerPersistWorkerHealthCheck()
186209
}
187210

188211
this.nsp = this.io.of(/^\/yjs\|.*$/)
@@ -518,16 +541,15 @@ export class YSocketIO {
518541
const doc = this.namespaceDocMap.get(namespace)?.ydoc
519542
logSocketIO(`trying to persist ${namespace}`)
520543
if (!doc) return
521-
if (this.client.persistWorker) {
544+
if (this.persistWorker && this.workerReady) {
522545
/** @type {ReturnType<typeof promiseWithResolvers<void>>} */
523546
const { promise, resolve } = promiseWithResolvers()
524-
assert(this.client?.persistWorker)
525547
this.awaitingPersistMap.set(namespace, { promise, resolve })
526548

527549
const docState = Y.encodeStateAsUpdateV2(doc)
528550
const buf = new Uint8Array(new SharedArrayBuffer(docState.length))
529551
buf.set(docState)
530-
this.client.persistWorker.postMessage({
552+
this.persistWorker.postMessage({
531553
room: namespace,
532554
docstate: buf
533555
})
@@ -627,6 +649,9 @@ export class YSocketIO {
627649

628650
destroy () {
629651
try {
652+
if (this.persistWorkerHealthCheckTimeout) {
653+
clearInterval(this.persistWorkerHealthCheckTimeout)
654+
}
630655
this.subscriber?.destroy()
631656
return this.client?.destroy()
632657
} catch (e) {
@@ -635,9 +660,13 @@ export class YSocketIO {
635660
}
636661

637662
registerPersistWorkerResolve () {
638-
if (!this.client?.persistWorker) return
639-
this.client.persistWorker.on('message', ({ event, room }) => {
663+
if (!this.persistWorker) return
664+
this.persistWorker.on('message', ({ event, room }) => {
640665
if (event === 'persisted') this.awaitingPersistMap.get(room)?.resolve()
666+
if (event === 'pong' && this.workerHeartbeatContext) {
667+
this.workerHeartbeatContext.resolve(true)
668+
}
669+
this.workerReady = true
641670
})
642671
}
643672

@@ -677,4 +706,58 @@ export class YSocketIO {
677706
this.namespaceDocMap.delete(namespace)
678707
this.namespacePersistentMap.delete(namespace)
679708
}
709+
710+
async waitUntilWorkerReady () {
711+
if (!this.persistWorker || this.workerReady) return
712+
/** @type {ReturnType<typeof promiseWithResolvers<void>>} */
713+
const { promise, resolve } = promiseWithResolvers()
714+
const timer = setInterval(() => {
715+
if (!this.workerReady) return
716+
clearInterval(timer)
717+
resolve()
718+
}, 100)
719+
await promise
720+
}
721+
722+
registerPersistWorkerHealthCheck () {
723+
this.persistWorkerHealthCheckTimeout = setTimeout(async () => {
724+
const workerHealth = await this.workerHealthCheck()
725+
if (!workerHealth) {
726+
logSocketIO('worker thread is unhealthy, recreating')
727+
assert(this.getPersistWorker)
728+
this.workerReady = false
729+
await this.persistWorker?.removeAllListeners().terminate()
730+
this.persistWorker = this.getPersistWorker()
731+
this.registerPersistWorkerResolve()
732+
await this.waitUntilWorkerReady()
733+
}
734+
this.registerPersistWorkerHealthCheck()
735+
}, WORKER_HEALTH_CHECK_INTERVAL)
736+
}
737+
738+
async workerHealthCheck () {
739+
if (!this.persistWorker || this.workerHeartbeatContext) return null
740+
if (
741+
this.workerLastHeartbeat &&
742+
Date.now() - this.workerLastHeartbeat < WORKER_HEALTH_CHECK_INTERVAL * 2
743+
) {
744+
return true
745+
}
746+
747+
/** @type {ReturnType<typeof promiseWithResolvers<boolean>>} */
748+
const { promise: heartbeatPromise, resolve } = promiseWithResolvers()
749+
this.workerHeartbeatContext = { promise: heartbeatPromise, resolve }
750+
const now = performance.now()
751+
this.persistWorker.postMessage({ event: 'ping' })
752+
const health = await Promise.race([
753+
heartbeatPromise,
754+
promise.wait(3000).then(() => false)
755+
])
756+
this.workerHeartbeatContext = null
757+
if (health) {
758+
logSocketIO(`worker health check: responded in ${performance.now() - now}ms`)
759+
this.workerLastHeartbeat = Date.now()
760+
}
761+
return health
762+
}
680763
}

0 commit comments

Comments
 (0)