@@ -6,11 +6,23 @@ import * as decoding from 'lib0/decoding'
6
6
import { assert } from 'lib0/testing'
7
7
import * as api from '../api.js'
8
8
import * as protocol from '../protocol.js'
9
+ import * as number from 'lib0/number'
10
+ import * as env from 'lib0/environment'
9
11
import { createSubscriber } from '../subscriber.js'
10
12
import { isDeepStrictEqual } from 'util'
11
13
import { User } from './user.js'
14
+ import { createModuleLogger } from 'lib0/logging'
15
+ import toobusy from 'toobusy-js'
12
16
13
- const PERSIST_INTERVAL = 5000
17
+ const logSocketIO = createModuleLogger ( '@y/socket-io/server' )
18
+ const PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-persist-interval' ) || '3000' )
19
+ const REVALIDATE_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-server-revalidate-timeout' ) || '60000' )
20
+
21
+ process . on ( 'SIGINT' , function ( ) {
22
+ // calling .shutdown allows your process to exit normally
23
+ toobusy . shutdown ( ) ;
24
+ process . exit ( ) ;
25
+ } ) ;
14
26
15
27
/**
16
28
* @typedef {import('socket.io').Namespace } Namespace
@@ -141,7 +153,7 @@ export class YSocketIO {
141
153
if ( this . configuration . authenticate === null ) return next ( )
142
154
const userCache = this . socketUserCache . get ( socket )
143
155
const namespace = this . getNamespaceString ( socket . nsp )
144
- if ( ! userCache || Date . now ( ) - userCache . validatedAt > 60_000 ) {
156
+ if ( ! userCache || Date . now ( ) - userCache . validatedAt > REVALIDATE_TIMEOUT ) {
145
157
this . socketUserCache . delete ( socket )
146
158
const user = await this . configuration . authenticate ( socket )
147
159
if ( ! user ) return next ( new Error ( 'Unauthorized' ) )
@@ -158,9 +170,14 @@ export class YSocketIO {
158
170
this . nsp . on ( 'connection' , async ( socket ) => {
159
171
assert ( this . client )
160
172
assert ( this . subscriber )
173
+ const namespace = this . getNamespaceString ( socket . nsp )
174
+ if ( toobusy ( ) ) {
175
+ logSocketIO ( `warning server too busy, rejecting connection: ${ namespace } ` )
176
+ throw new Error ( 'server too busy, please try again latter' )
177
+ }
161
178
if ( ! socket . user ) throw new Error ( 'user does not exist in socket' )
162
179
163
- const namespace = this . getNamespaceString ( socket . nsp )
180
+ logSocketIO ( `new connection in namespace: ${ namespace } ` )
164
181
const stream = api . computeRedisRoomStreamName (
165
182
namespace ,
166
183
'index' ,
@@ -409,7 +426,7 @@ export class YSocketIO {
409
426
changed = getDoc . changed
410
427
}
411
428
assert ( doc )
412
- if ( changed ) this . debouncedPersist ( namespace , doc . ydoc )
429
+ this . debouncedPersist ( namespace , doc . ydoc )
413
430
this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
414
431
this . namespaceDocMap . set ( namespace , doc )
415
432
await this . client . trimRoomStream ( namespace , 'index' , nsp . sockets . size === 0 )
0 commit comments