@@ -23,6 +23,10 @@ module.exports = adapter;
23
23
var requestTypes = {
24
24
clients : 0 ,
25
25
clientRooms : 1 ,
26
+ allRooms : 2 ,
27
+ remoteJoin : 3 ,
28
+ remoteLeave : 4 ,
29
+ customRequest : 5 ,
26
30
} ;
27
31
28
32
/**
@@ -86,6 +90,7 @@ function adapter(uri, opts) {
86
90
this . requestChannel = prefix + '-request#' + this . nsp . name + '#' ;
87
91
this . responseChannel = prefix + '-response#' + this . nsp . name + '#' ;
88
92
this . requests = { } ;
93
+ this . customHook = function ( ) { return null ; }
89
94
90
95
if ( String . prototype . startsWith ) {
91
96
this . channelMatches = function ( messageChannel , subscribedChannel ) {
@@ -212,6 +217,59 @@ function adapter(uri, opts) {
212
217
} ) ;
213
218
break ;
214
219
220
+ case requestTypes . allRooms :
221
+
222
+ var response = JSON . stringify ( {
223
+ requestid : request . requestid ,
224
+ rooms : Object . keys ( this . rooms )
225
+ } ) ;
226
+
227
+ pub . publish ( self . responseChannel , response ) ;
228
+ break ;
229
+
230
+ case requestTypes . remoteJoin :
231
+
232
+ var socket = this . nsp . connected [ request . sid ] ;
233
+ if ( ! socket ) { return ; }
234
+
235
+ function sendAck ( ) {
236
+ var response = JSON . stringify ( {
237
+ requestid : request . requestid
238
+ } ) ;
239
+
240
+ pub . publish ( self . responseChannel , response ) ;
241
+ }
242
+
243
+ socket . join ( request . room , sendAck ) ;
244
+ break ;
245
+
246
+ case requestTypes . remoteLeave :
247
+
248
+ var socket = this . nsp . connected [ request . sid ] ;
249
+ if ( ! socket ) { return ; }
250
+
251
+ function sendAck ( ) {
252
+ var response = JSON . stringify ( {
253
+ requestid : request . requestid
254
+ } ) ;
255
+
256
+ pub . publish ( self . responseChannel , response ) ;
257
+ }
258
+
259
+ socket . leave ( request . room , sendAck ) ;
260
+ break ;
261
+
262
+ case requestTypes . customRequest :
263
+ var data = this . customHook ( request . data ) ;
264
+
265
+ var response = JSON . stringify ( {
266
+ requestid : request . requestid ,
267
+ data : data
268
+ } ) ;
269
+
270
+ pub . publish ( self . responseChannel , response ) ;
271
+ break ;
272
+
215
273
default :
216
274
debug ( 'ignoring unknown request type: %s' , request . type ) ;
217
275
}
@@ -268,6 +326,42 @@ function adapter(uri, opts) {
268
326
delete self . requests [ request . requestid ] ;
269
327
break ;
270
328
329
+ case requestTypes . allRooms :
330
+ request . msgCount ++ ;
331
+
332
+ // ignore if response does not contain 'rooms' key
333
+ if ( ! response . rooms || ! Array . isArray ( response . rooms ) ) return ;
334
+
335
+ for ( var i = 0 ; i < response . rooms . length ; i ++ ) {
336
+ request . rooms [ response . rooms [ i ] ] = true ;
337
+ }
338
+
339
+ if ( request . msgCount === request . numsub ) {
340
+ clearTimeout ( request . timeout ) ;
341
+ if ( request . callback ) process . nextTick ( request . callback . bind ( null , null , Object . keys ( request . rooms ) ) ) ;
342
+ delete self . requests [ request . requestid ] ;
343
+ }
344
+ break ;
345
+
346
+ case requestTypes . remoteJoin :
347
+ case requestTypes . remoteLeave :
348
+ clearTimeout ( request . timeout ) ;
349
+ if ( request . callback ) process . nextTick ( request . callback . bind ( null , null ) ) ;
350
+ delete self . requests [ request . requestid ] ;
351
+ break ;
352
+
353
+ case requestTypes . customRequest :
354
+ request . msgCount ++ ;
355
+
356
+ request . replies . push ( response . data ) ;
357
+
358
+ if ( request . msgCount === request . numsub ) {
359
+ clearTimeout ( request . timeout ) ;
360
+ if ( request . callback ) process . nextTick ( request . callback . bind ( null , null , request . replies ) ) ;
361
+ delete self . requests [ request . requestid ] ;
362
+ }
363
+ break ;
364
+
271
365
default :
272
366
debug ( 'ignoring unknown request type: %s' , request . type ) ;
273
367
}
@@ -489,6 +583,190 @@ function adapter(uri, opts) {
489
583
pub . publish ( self . requestChannel , request ) ;
490
584
} ;
491
585
586
+ /**
587
+ * Gets the list of all rooms (accross every node)
588
+ *
589
+ * @param {Function } callback
590
+ * @api public
591
+ */
592
+
593
+ Redis . prototype . allRooms = function ( fn ) {
594
+
595
+ var self = this ;
596
+ var requestid = uid2 ( 6 ) ;
597
+
598
+ pub . send_command ( 'pubsub' , [ 'numsub' , self . requestChannel ] , function ( err , numsub ) {
599
+ if ( err ) {
600
+ self . emit ( 'error' , err ) ;
601
+ if ( fn ) fn ( err ) ;
602
+ return ;
603
+ }
604
+
605
+ numsub = parseInt ( numsub [ 1 ] , 10 ) ;
606
+
607
+ var request = JSON . stringify ( {
608
+ requestid : requestid ,
609
+ type : requestTypes . allRooms
610
+ } ) ;
611
+
612
+ // if there is no response for x second, return result
613
+ var timeout = setTimeout ( function ( ) {
614
+ var request = self . requests [ requestid ] ;
615
+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for allRooms response' ) , Object . keys ( request . rooms ) ) ) ;
616
+ delete self . requests [ requestid ] ;
617
+ } , self . requestsTimeout ) ;
618
+
619
+ self . requests [ requestid ] = {
620
+ type : requestTypes . allRooms ,
621
+ numsub : numsub ,
622
+ msgCount : 0 ,
623
+ rooms : { } ,
624
+ callback : fn ,
625
+ timeout : timeout
626
+ } ;
627
+
628
+ pub . publish ( self . requestChannel , request ) ;
629
+ } ) ;
630
+ } ;
631
+
632
+ /**
633
+ * Makes the socket with the given id join the room
634
+ *
635
+ * @param {String } socket id
636
+ * @param {String } room name
637
+ * @param {Function } callback
638
+ * @api public
639
+ */
640
+
641
+ Redis . prototype . remoteJoin = function ( id , room , fn ) {
642
+
643
+ var self = this ;
644
+ var requestid = uid2 ( 6 ) ;
645
+
646
+ var socket = this . nsp . connected [ id ] ;
647
+ if ( socket ) {
648
+ socket . join ( room ) ;
649
+ if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
650
+ return ;
651
+ }
652
+
653
+ var request = JSON . stringify ( {
654
+ requestid : requestid ,
655
+ type : requestTypes . remoteJoin ,
656
+ sid : id ,
657
+ room : room
658
+ } ) ;
659
+
660
+ // if there is no response for x second, return result
661
+ var timeout = setTimeout ( function ( ) {
662
+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for remoteJoin response' ) ) ) ;
663
+ delete self . requests [ requestid ] ;
664
+ } , self . requestsTimeout ) ;
665
+
666
+ self . requests [ requestid ] = {
667
+ type : requestTypes . remoteJoin ,
668
+ callback : fn ,
669
+ timeout : timeout
670
+ } ;
671
+
672
+ pub . publish ( self . requestChannel , request ) ;
673
+ } ;
674
+
675
+ /**
676
+ * Makes the socket with the given id leave the room
677
+ *
678
+ * @param {String } socket id
679
+ * @param {String } room name
680
+ * @param {Function } callback
681
+ * @api public
682
+ */
683
+
684
+ Redis . prototype . remoteLeave = function ( id , room , fn ) {
685
+
686
+ var self = this ;
687
+ var requestid = uid2 ( 6 ) ;
688
+
689
+ var socket = this . nsp . connected [ id ] ;
690
+ if ( socket ) {
691
+ socket . leave ( room ) ;
692
+ if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
693
+ return ;
694
+ }
695
+
696
+ var request = JSON . stringify ( {
697
+ requestid : requestid ,
698
+ type : requestTypes . remoteLeave ,
699
+ sid : id ,
700
+ room : room
701
+ } ) ;
702
+
703
+ // if there is no response for x second, return result
704
+ var timeout = setTimeout ( function ( ) {
705
+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for remoteLeave response' ) ) ) ;
706
+ delete self . requests [ requestid ] ;
707
+ } , self . requestsTimeout ) ;
708
+
709
+ self . requests [ requestid ] = {
710
+ type : requestTypes . remoteLeave ,
711
+ callback : fn ,
712
+ timeout : timeout
713
+ } ;
714
+
715
+ pub . publish ( self . requestChannel , request ) ;
716
+ } ;
717
+
718
+ /**
719
+ * Sends a new custom request to other nodes
720
+ *
721
+ * @param {Object } data (no binary)
722
+ * @param {Function } callback
723
+ * @api public
724
+ */
725
+
726
+ Redis . prototype . customRequest = function ( data , fn ) {
727
+ if ( typeof data === 'function' ) {
728
+ fn = data ;
729
+ data = null ;
730
+ }
731
+
732
+ var self = this ;
733
+ var requestid = uid2 ( 6 ) ;
734
+
735
+ pub . send_command ( 'pubsub' , [ 'numsub' , self . requestChannel ] , function ( err , numsub ) {
736
+ if ( err ) {
737
+ self . emit ( 'error' , err ) ;
738
+ if ( fn ) fn ( err ) ;
739
+ return ;
740
+ }
741
+
742
+ numsub = parseInt ( numsub [ 1 ] , 10 ) ;
743
+
744
+ var request = JSON . stringify ( {
745
+ requestid : requestid ,
746
+ type : requestTypes . customRequest ,
747
+ data : data
748
+ } ) ;
749
+
750
+ // if there is no response for x second, return result
751
+ var timeout = setTimeout ( function ( ) {
752
+ var request = self . requests [ requestid ] ;
753
+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for customRequest response' ) , request . replies ) ) ;
754
+ delete self . requests [ requestid ] ;
755
+ } , self . requestsTimeout ) ;
756
+
757
+ self . requests [ requestid ] = {
758
+ type : requestTypes . customRequest ,
759
+ numsub : numsub ,
760
+ msgCount : 0 ,
761
+ replies : [ ] ,
762
+ callback : fn ,
763
+ timeout : timeout
764
+ } ;
765
+
766
+ pub . publish ( self . requestChannel , request ) ;
767
+ } ) ;
768
+ } ;
769
+
492
770
Redis . uid = uid ;
493
771
Redis . pubClient = pub ;
494
772
Redis . subClient = sub ;
0 commit comments