Skip to content

Commit 4f3cefd

Browse files
refactor: minor cleanup
1 parent d3388bf commit 4f3cefd

File tree

3 files changed

+31
-47
lines changed

3 files changed

+31
-47
lines changed

lib/index.ts

+15-36
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import uid2 = require("uid2");
22
import msgpack = require("notepack.io");
33
import { Adapter, BroadcastOptions, Room } from "socket.io-adapter";
4+
import { parseNumSubResponse, sumValues } from "./util";
45

56
const debug = require("debug")("socket.io-redis");
67

@@ -688,7 +689,7 @@ export class RedisAdapter extends Adapter {
688689
*/
689690
public async allRooms(): Promise<Set<Room>> {
690691
const localRooms = new Set(this.rooms.keys());
691-
const numSub = await this.getNumSub();
692+
const numSub = await this.serverCount();
692693
debug('waiting for %d responses to "allRooms" request', numSub);
693694

694695
if (numSub <= 1) {
@@ -732,7 +733,7 @@ export class RedisAdapter extends Adapter {
732733
return localSockets;
733734
}
734735

735-
const numSub = await this.getNumSub();
736+
const numSub = await this.serverCount();
736737
debug('waiting for %d responses to "fetchSockets" request', numSub);
737738

738739
if (numSub <= 1) {
@@ -849,7 +850,7 @@ export class RedisAdapter extends Adapter {
849850

850851
private async serverSideEmitWithAck(packet: any[]) {
851852
const ack = packet.pop();
852-
const numSub = (await this.getNumSub()) - 1; // ignore self
853+
const numSub = (await this.serverCount()) - 1; // ignore self
853854

854855
debug('waiting for %d responses to "serverSideEmit" request', numSub);
855856

@@ -889,13 +890,7 @@ export class RedisAdapter extends Adapter {
889890
this.pubClient.publish(this.requestChannel, request);
890891
}
891892

892-
/**
893-
* Get the number of subscribers of the request channel
894-
*
895-
* @private
896-
*/
897-
898-
private getNumSub(): Promise<number> {
893+
override serverCount(): Promise<number> {
899894
if (
900895
this.pubClient.constructor.name === "Cluster" ||
901896
this.pubClient.isCluster
@@ -904,39 +899,27 @@ export class RedisAdapter extends Adapter {
904899
const nodes = this.pubClient.nodes();
905900
return Promise.all(
906901
nodes.map((node) =>
907-
node.send_command("pubsub", ["numsub", this.requestChannel])
902+
node
903+
.send_command("pubsub", ["numsub", this.requestChannel])
904+
.then(parseNumSubResponse)
908905
)
909-
).then((values) => {
910-
let numSub = 0;
911-
values.forEach((value) => {
912-
numSub += parseInt(value[1], 10);
913-
});
914-
return numSub;
915-
});
906+
).then(sumValues);
916907
} else if (typeof this.pubClient.pSubscribe === "function") {
917908
// node-redis client
918909
const isCluster = Array.isArray(this.pubClient.masters);
919910
if (isCluster) {
920911
const nodes = this.pubClient.masters;
921912
return Promise.all(
922913
nodes.map((node) => {
923-
return node.client.sendCommand([
924-
"pubsub",
925-
"numsub",
926-
this.requestChannel,
927-
]);
914+
return node.client
915+
.sendCommand(["pubsub", "numsub", this.requestChannel])
916+
.then(parseNumSubResponse);
928917
})
929-
).then((values) => {
930-
let numSub = 0;
931-
values.map((value) => {
932-
numSub += parseInt(value[1], 10);
933-
});
934-
return numSub;
935-
});
918+
).then(sumValues);
936919
} else {
937920
return this.pubClient
938921
.sendCommand(["pubsub", "numsub", this.requestChannel])
939-
.then((res) => parseInt(res[1], 10));
922+
.then(parseNumSubResponse);
940923
}
941924
} else {
942925
// ioredis or node-redis v3 client
@@ -946,17 +929,13 @@ export class RedisAdapter extends Adapter {
946929
["numsub", this.requestChannel],
947930
(err, numSub) => {
948931
if (err) return reject(err);
949-
resolve(parseInt(numSub[1], 10));
932+
resolve(parseNumSubResponse(numSub));
950933
}
951934
);
952935
});
953936
}
954937
}
955938

956-
serverCount(): Promise<number> {
957-
return this.getNumSub();
958-
}
959-
960939
close(): Promise<void> | void {
961940
const isRedisV4 = typeof this.pubClient.pSubscribe === "function";
962941
if (isRedisV4) {

lib/sharded-adapter.ts

+6-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ClusterAdapter, ClusterMessage, MessageType } from "./cluster-adapter";
22
import { decode, encode } from "notepack.io";
3-
import { hasBinary } from "./util";
3+
import { hasBinary, parseNumSubResponse, sumValues } from "./util";
44
import debugModule from "debug";
55

66
const debug = debugModule("socket.io-redis");
@@ -60,7 +60,6 @@ class ShardedRedisAdapter extends ClusterAdapter {
6060
private readonly opts: Required<ShardedRedisAdapterOptions>;
6161
private readonly channel: string;
6262
private readonly responseChannel: string;
63-
private readonly cleanup: () => void;
6463

6564
constructor(nsp, pubClient, subClient, opts: ShardedRedisAdapterOptions) {
6665
super(nsp);
@@ -196,19 +195,15 @@ class ShardedRedisAdapter extends ClusterAdapter {
196195
) {
197196
return Promise.all(
198197
this.pubClient.nodes().map((node) => {
199-
return node.sendCommand(["PUBSUB", "SHARDNUMSUB", this.channel]);
198+
return node
199+
.sendCommand(["PUBSUB", "SHARDNUMSUB", this.channel])
200+
.then(parseNumSubResponse);
200201
})
201-
).then((values) => {
202-
let numSub = 0;
203-
values.forEach((value) => {
204-
numSub += parseInt(value[1], 10);
205-
});
206-
return numSub;
207-
});
202+
).then(sumValues);
208203
} else {
209204
return this.pubClient
210205
.sendCommand(["PUBSUB", "SHARDNUMSUB", this.channel])
211-
.then((res) => parseInt(res[1], 10));
206+
.then(parseNumSubResponse);
212207
}
213208
}
214209
}

lib/util.ts

+10
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ export function hasBinary(obj: any, toJSON?: boolean): boolean {
3434
export function randomId() {
3535
return randomBytes(8).toString("hex");
3636
}
37+
38+
export function parseNumSubResponse(res) {
39+
return parseInt(res[1], 10);
40+
}
41+
42+
export function sumValues(values) {
43+
return values.reduce((acc, val) => {
44+
return acc + val;
45+
}, 0);
46+
}

0 commit comments

Comments
 (0)