From 86046b7444fbf9d61e5c71c369718deb6ed04815 Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Thu, 13 Mar 2025 18:01:07 +0200 Subject: [PATCH 1/7] [CAE-342] Fix a couple of bugs --- packages/client/lib/client/index.ts | 8 ++++++++ packages/client/lib/sentinel/index.spec.ts | 5 +++++ packages/client/lib/sentinel/index.ts | 6 +++++- packages/client/lib/sentinel/test-util.ts | 11 ++++++++--- 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 5dae1271ecb..41a5881e857 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -325,6 +325,14 @@ export default class RedisClient< return this._self.#watchEpoch !== undefined; } + get watchEpoch() { + return this._self.#watchEpoch + } + + setWatchEpoch(watchEpoch: number | undefined) { + this._self.#watchEpoch = watchEpoch + } + setDirtyWatch(msg: string) { this._self.#dirtyWatch = msg; } diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index be5522bdd8d..4e1cd5a8fb1 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -197,6 +197,11 @@ async function steadyState(frame: SentinelFramework) { await assert.doesNotReject(sentinel.get('x')); }); + + it('failed to connect', async function() { + sentinel = frame.getSentinelClient({sentinelRootNodes: [{host: "127.0.0.1", port: 1010}], maxCommandRediscovers: 0}) + await assert.rejects(sentinel.connect()); + }); it('try to connect multiple times', async function () { sentinel = frame.getSentinelClient(); diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 92a87fbb145..e8b4a6bb17b 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -682,9 +682,10 @@ class RedisSentinelInternal< async #connect() { let count = 0; - while (true) { + while (true) { this.#trace("starting connect loop"); + count+=1; if (this.#destroy) { this.#trace("in #connect and want to destroy") return; @@ -1106,10 +1107,12 @@ class RedisSentinelInternal< this.#trace(`transform: opening a new master`); const masterPromises = []; const masterWatches: Array = []; + const watchEpochs: Array = []; this.#trace(`transform: destroying old masters if open`); for (const client of this.#masterClients) { masterWatches.push(client.isWatching); + watchEpochs.push(client.watchEpoch); if (client.isOpen) { client.destroy() @@ -1135,6 +1138,7 @@ class RedisSentinelInternal< }); if (masterWatches[i]) { + client.setWatchEpoch(watchEpochs[i]) client.setDirtyWatch("sentinel config changed in middle of a WATCH Transaction"); } this.#masterClients.push(client); diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 25dd4c4371a..72e0564ffa5 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -180,9 +180,14 @@ export class SentinelFramework extends DockerBase { RedisScripts, RespVersions, TypeMapping>>, errors = true) { - if (opts?.sentinelRootNodes !== undefined) { - throw new Error("cannot specify sentinelRootNodes here"); - } + // remove this safeguard + // in order to test the case when + // connecting to sentinel fails + + // if (opts?.sentinelRootNodes !== undefined) { + // throw new Error("cannot specify sentinelRootNodes here"); + // } + if (opts?.name !== undefined) { throw new Error("cannot specify sentinel db name here"); } From f94e8a591b3cfc4118196f4db0c3cdcee2e1bc17 Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Wed, 19 Mar 2025 11:31:12 +0200 Subject: [PATCH 2/7] Fixed issue with nodes masterauth persistency, changed docker container --- packages/client/lib/sentinel/index.spec.ts | 4 ++-- packages/client/lib/sentinel/test-util.ts | 14 ++++---------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index 4e1cd5a8fb1..4f7a259f617 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -78,7 +78,7 @@ async function steadyState(frame: SentinelFramework) { } ["redis-sentinel-test-password", undefined].forEach(function (password) { - describe.skip(`Sentinel - password = ${password}`, () => { + describe(`Sentinel - password = ${password}`, () => { const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password }; const frame = new SentinelFramework(config); let tracer = new Array(); @@ -441,7 +441,7 @@ async function steadyState(frame: SentinelFramework) { assert.equal(await promise, null); }); - it('reserve client, takes a client out of pool', async function () { + it.skip('reserve client, takes a client out of pool', async function () { this.timeout(30000); sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true }); diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 72e0564ffa5..520cc73110d 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -55,7 +55,7 @@ export interface RedisServerDocker { abstract class DockerBase { async spawnRedisServerDocker({ image, version }: RedisServerDockerConfig, serverArguments: Array, environment?: string): Promise { const port = (await portIterator.next()).value; - let cmdLine = `docker run --init -d --network host `; + let cmdLine = `docker run --init -d --network host -e PORT=${port.toString()} `; if (environment !== undefined) { cmdLine += `-e ${environment} `; } @@ -250,13 +250,13 @@ export class SentinelFramework extends DockerBase { } protected async spawnRedisSentinelNodeDocker() { - const imageInfo: RedisServerDockerConfig = this.config.nodeDockerConfig ?? { image: "redis/redis-stack-server", version: "latest" }; + const imageInfo: RedisServerDockerConfig = this.config.nodeDockerConfig ?? { image: "redislabs/client-libs-test", version: "8.0-M05-pre" }; const serverArguments: Array = this.config.nodeServerArguments ?? []; let environment; if (this.config.password !== undefined) { - environment = `REDIS_ARGS="{port} --requirepass ${this.config.password}"`; + environment = `REDIS_PASSWORD=${this.config.password}`; } else { - environment = 'REDIS_ARGS="{port}"'; + environment = undefined; } const docker = await this.spawnRedisServerDocker(imageInfo, serverArguments, environment); @@ -281,9 +281,6 @@ export class SentinelFramework extends DockerBase { for (let i = 0; i < (this.config.numberOfNodes ?? 0) - 1; i++) { promises.push( this.spawnRedisSentinelNodeDocker().then(async node => { - if (this.config.password !== undefined) { - await node.client.configSet({'masterauth': this.config.password}) - } await node.client.replicaOf('127.0.0.1', master.docker.port); return node; }) @@ -406,9 +403,6 @@ export class SentinelFramework extends DockerBase { const masterPort = await this.getMasterPort(); const newNode = await this.spawnRedisSentinelNodeDocker(); - if (this.config.password !== undefined) { - await newNode.client.configSet({'masterauth': this.config.password}) - } await newNode.client.replicaOf('127.0.0.1', masterPort); this.#nodeList.push(newNode); From 9d08e166d7c4735eef0d9323ad96bca731de7f87 Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Fri, 21 Mar 2025 12:31:10 +0200 Subject: [PATCH 3/7] [CAE-342] Fixed a couple of sentinel issues, enabled most tests --- packages/client/lib/client/index.ts | 8 ++------ packages/client/lib/sentinel/index.spec.ts | 6 +++--- packages/client/lib/sentinel/index.ts | 5 +---- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 41a5881e857..85d64776858 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -325,12 +325,8 @@ export default class RedisClient< return this._self.#watchEpoch !== undefined; } - get watchEpoch() { - return this._self.#watchEpoch - } - - setWatchEpoch(watchEpoch: number | undefined) { - this._self.#watchEpoch = watchEpoch + get isDirtyWatch() { + return this._self.#dirtyWatch !== undefined } setDirtyWatch(msg: string) { diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index 4f7a259f617..9e0628bcd55 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -486,7 +486,7 @@ async function steadyState(frame: SentinelFramework) { }); // by taking a lease, we know we will block on master as no clients are available, but as read occuring, means replica read occurs - it('replica reads', async function () { + it.skip('replica reads', async function () { this.timeout(30000); sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); @@ -723,7 +723,7 @@ async function steadyState(frame: SentinelFramework) { tracer.push("multi was rejected"); }); - it('plain pubsub - channel', async function () { + it.skip('plain pubsub - channel', async function () { this.timeout(30000); sentinel = frame.getSentinelClient(); @@ -762,7 +762,7 @@ async function steadyState(frame: SentinelFramework) { assert.equal(tester, false); }); - it('plain pubsub - pattern', async function () { + it.skip('plain pubsub - pattern', async function () { this.timeout(30000); sentinel = frame.getSentinelClient(); diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index e8b4a6bb17b..2ec445e639b 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -1107,12 +1107,10 @@ class RedisSentinelInternal< this.#trace(`transform: opening a new master`); const masterPromises = []; const masterWatches: Array = []; - const watchEpochs: Array = []; this.#trace(`transform: destroying old masters if open`); for (const client of this.#masterClients) { - masterWatches.push(client.isWatching); - watchEpochs.push(client.watchEpoch); + masterWatches.push(client.isWatching || client.isDirtyWatch); if (client.isOpen) { client.destroy() @@ -1138,7 +1136,6 @@ class RedisSentinelInternal< }); if (masterWatches[i]) { - client.setWatchEpoch(watchEpochs[i]) client.setDirtyWatch("sentinel config changed in middle of a WATCH Transaction"); } this.#masterClients.push(client); From 8b085631cc562ed56e23f239852a86bd922f2d15 Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Tue, 25 Mar 2025 13:38:44 +0200 Subject: [PATCH 4/7] [CAE-342] Added comment --- packages/client/lib/client/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 85d64776858..d96f1e3fba6 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -299,6 +299,9 @@ export default class RedisClient< #monitorCallback?: MonitorCallback; private _self = this; private _commandOptions?: CommandOptions; + // flag used to annotate that the client + // was in a watch transaction when + // a topology change occured #dirtyWatch?: string; #epoch: number; #watchEpoch?: number; From 9633d9f52a7bac4d9f514fed7537619bc5fce752 Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Fri, 25 Apr 2025 15:45:15 +0300 Subject: [PATCH 5/7] [CAE-342] Migrate majority of tests to testUtils --- packages/client/lib/sentinel/index.spec.ts | 2016 +++++++++----------- packages/client/lib/sentinel/index.ts | 18 +- packages/client/lib/sentinel/test-util.ts | 2 +- packages/client/lib/test-utils.ts | 65 +- packages/test-utils/lib/dockers.ts | 110 +- packages/test-utils/lib/index.ts | 115 +- 6 files changed, 1148 insertions(+), 1178 deletions(-) diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index 9e0628bcd55..a4fa361eed2 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -1,29 +1,334 @@ import { strict as assert } from 'node:assert'; import { setTimeout } from 'node:timers/promises'; +import testUtils, { GLOBAL } from '../test-utils'; +import { RESP_TYPES } from '../RESP/decoder'; import { WatchError } from "../errors"; +import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec'; import { RedisSentinelConfig, SentinelFramework } from "./test-util"; -import { RedisNode, RedisSentinelClientType, RedisSentinelEvent, RedisSentinelType } from "./types"; -import { RedisSentinelFactory } from '.'; -import { RedisClientType } from '../client'; -import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types'; - +import { RedisSentinelEvent, RedisSentinelType, RedisSentinelClientType, RedisNode } from "./types"; +import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; import { promisify } from 'node:util'; import { exec } from 'node:child_process'; -import { RESP_TYPES } from '../RESP/decoder'; -import { defineScript } from '../lua-script'; -import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec'; -import RedisBloomModules from '@redis/bloom'; -import { RedisTcpSocketOptions } from '../client/socket'; -import { SQUARE_SCRIPT } from '../client/index.spec'; - const execAsync = promisify(exec); -/* used to ensure test environment resets to normal state - i.e. - - all redis nodes are active and are part of the topology - before allowing things to continue. -*/ +[GLOBAL.SENTINEL.PASSWORD, GLOBAL.SENTINEL.OPEN].forEach(testOptions => { + describe(`test with password - ${testOptions.password}`, () => { + testUtils.testWithClientSentinel('client should be authenticated', async sentinel => { + await assert.doesNotReject(sentinel.set('x', 1)); + }, testOptions); + + testUtils.testWithClientSentinel('try to connect multiple times', async sentinel => { + await assert.rejects(sentinel.connect()); + }, testOptions); + + + testUtils.testWithClientSentinel('should respect type mapping', async sentinel => { + const typeMapped = sentinel.withTypeMapping({ + [RESP_TYPES.SIMPLE_STRING]: Buffer + }); + + const resp = await typeMapped.ping(); + assert.deepEqual(resp, Buffer.from('PONG')); + }, testOptions); + + testUtils.testWithClientSentinel('many readers', async sentinel => { + await sentinel.set("x", 1); + for (let i = 0; i < 10; i++) { + if (await sentinel.get("x") == "1") { + break; + } + await setTimeout(1000); + } + + const promises: Array> = []; + for (let i = 0; i < 500; i++) { + promises.push(sentinel.get("x")); + } + + const resp = await Promise.all(promises); + assert.equal(resp.length, 500); + for (let i = 0; i < 500; i++) { + assert.equal(resp[i], "1", `failed on match at ${i}`); + } + }, testOptions); + + testUtils.testWithClientSentinel('use', async sentinel => { + await sentinel.use( + async (client: any ) => { + await assert.doesNotReject(client.get('x')); + } + ); + }, testOptions); + + testUtils.testWithClientSentinel('watch does not carry over leases', async sentinel => { + assert.equal(await sentinel.use(client => client.watch("x")), 'OK') + assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK'); + assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']); + }, testOptions); + + testUtils.testWithClientSentinel('plain pubsub - channel', async sentinel => { + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; + }); + + let tester = false; + await sentinel.subscribe('test', () => { + tester = true; + pubSubResolve(1); + }) + + await sentinel.publish('test', 'hello world'); + await pubSubPromise; + assert.equal(tester, true); + + // now unsubscribe + tester = false; + await sentinel.unsubscribe('test') + await sentinel.publish('test', 'hello world'); + await setTimeout(1000); + + assert.equal(tester, false); + }, testOptions); + + testUtils.testWithClientSentinel('plain pubsub - pattern', async sentinel => { + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; + }); + + let tester = false; + await sentinel.pSubscribe('test*', () => { + tester = true; + pubSubResolve(1); + }) + + await sentinel.publish('testy', 'hello world'); + await pubSubPromise; + assert.equal(tester, true); + + // now unsubscribe + tester = false; + await sentinel.pUnsubscribe('test*'); + await sentinel.publish('testy', 'hello world'); + await setTimeout(1000); + + assert.equal(tester, false); + }, testOptions) + }); +}); + +describe(`test with scripts`, () => { + testUtils.testWithClientSentinel('with script', async sentinel => { + const [, reply] = await Promise.all([ + sentinel.set('key', '2'), + sentinel.square('key') + ]); + + assert.equal(reply, 4); + }, GLOBAL.SENTINEL.WITH_SCRIPT); + + testUtils.testWithClientSentinel('with script multi', async sentinel => { + const reply = await sentinel.multi().set('key', 2).square('key').exec(); + assert.deepEqual(reply, ['OK', 4]); + }, GLOBAL.SENTINEL.WITH_SCRIPT); + + testUtils.testWithClientSentinel('use with script', async sentinel => { + const reply = await sentinel.use( + async (client: any) => { + assert.equal(await client.set('key', '2'), 'OK'); + assert.equal(await client.get('key'), '2'); + return client.square('key') + } + ); + }, GLOBAL.SENTINEL.WITH_SCRIPT) +}); + + +describe(`test with functions`, () => { + testUtils.testWithClientSentinel('with function', async sentinel => { + await sentinel.functionLoad( + MATH_FUNCTION.code, + { REPLACE: true } + ); + + await sentinel.set('key', '2'); + const resp = await sentinel.math.square('key'); + + assert.equal(resp, 4); + }, GLOBAL.SENTINEL.WITH_FUNCTION); + + testUtils.testWithClientSentinel('with function multi', async sentinel => { + await sentinel.functionLoad( + MATH_FUNCTION.code, + { REPLACE: true } + ); + + const reply = await sentinel.multi().set('key', 2).math.square('key').exec(); + assert.deepEqual(reply, ['OK', 4]); + }, GLOBAL.SENTINEL.WITH_FUNCTION); + + testUtils.testWithClientSentinel('use with function', async sentinel => { + await sentinel.functionLoad( + MATH_FUNCTION.code, + { REPLACE: true } + ); + + const reply = await sentinel.use( + async (client: any) => { + await client.set('key', '2'); + return client.math.square('key'); + } + ); + + assert.equal(reply, 4); + }, GLOBAL.SENTINEL.WITH_FUNCTION); +}); + +describe(`test with modules`, () => { + testUtils.testWithClientSentinel('with module', async sentinel => { + const resp = await sentinel.bf.add('key', 'item') + assert.equal(resp, true); + }, GLOBAL.SENTINEL.WITH_MODULE); + + testUtils.testWithClientSentinel('with module multi', async sentinel => { + const resp = await sentinel.multi().bf.add('key', 'item').exec(); + assert.deepEqual(resp, [true]); + }, GLOBAL.SENTINEL.WITH_MODULE); + + testUtils.testWithClientSentinel('use with module', async sentinel => { + const reply = await sentinel.use( + async (client: any) => { + return client.bf.add('key', 'item'); + } + ); + + assert.equal(reply, true); + }, GLOBAL.SENTINEL.WITH_MODULE); +}); + +describe(`test with replica pool size 1`, () => { + testUtils.testWithClientSentinel('client lease', async sentinel => { + sentinel.on("error", () => { }); + + const clientLease = await sentinel.aquire(); + clientLease.set('x', 456); + + let matched = false; + /* waits for replication */ + for (let i = 0; i < 15; i++) { + try { + assert.equal(await sentinel.get("x"), '456'); + matched = true; + break; + } catch (err) { + await setTimeout(1000); + } + } + + clientLease.release(); + + assert.equal(matched, true); + }, GLOBAL.SENTINEL.WITH_REPLICA_POOL_SIZE_1); + + testUtils.testWithClientSentinel('block on pool', async sentinel => { + const promise = sentinel.use( + async client => { + await setTimeout(1000); + return await client.get("x"); + } + ) + + await sentinel.set("x", 1); + assert.equal(await promise, null); + }, GLOBAL.SENTINEL.WITH_REPLICA_POOL_SIZE_1); + + testUtils.testWithClientSentinel('pipeline', async sentinel => { + const resp = await sentinel.multi().set('x', 1).get('x').execAsPipeline(); + assert.deepEqual(resp, ['OK', '1']); + }, GLOBAL.SENTINEL.WITH_REPLICA_POOL_SIZE_1); +}); + +describe(`test with masterPoolSize 2, reserve client true`, () => { + // TODO: flaky test, sometimes fails with `promise1 === null` + testUtils.testWithClientSentinel('reserve client, takes a client out of pool', async sentinel => { + const promise1 = sentinel.use( + async client => { + const val = await client.get("x"); + await client.set("x", 2); + return val; + } + ) + + const promise2 = sentinel.use( + async client => { + return client.get("x"); + } + ) + + await sentinel.set("x", 1); + assert.equal(await promise1, "1"); + assert.equal(await promise2, "2"); + }, Object.assign(GLOBAL.SENTINEL.WITH_RESERVE_CLIENT_MASTER_POOL_SIZE_2, {skipTest: true})); +}); + +describe(`test with masterPoolSize 2`, () => { + testUtils.testWithClientSentinel('multple clients', async sentinel => { + sentinel.on("error", () => { }); + + const promise = sentinel.use( + async client => { + await sentinel!.set("x", 1); + await client.get("x"); + } + ) + + await assert.doesNotReject(promise); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + + testUtils.testWithClientSentinel('use - watch - clean', async sentinel => { + let promise = sentinel.use(async (client) => { + await client.set("x", 1); + await client.watch("x"); + return client.multi().get("x").exec(); + }); + + assert.deepEqual(await promise, ['1']); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + + testUtils.testWithClientSentinel('use - watch - dirty', async sentinel => { + let promise = sentinel.use(async (client) => { + await client.set('x', 1); + await client.watch('x'); + await sentinel!.set('x', 2); + return client.multi().get('x').exec(); + }); + + await assert.rejects(promise, new WatchError()); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + testUtils.testWithClientSentinel('lease - watch - clean', async sentinel => { + const leasedClient = await sentinel.aquire(); + await leasedClient.set('x', 1); + await leasedClient.watch('x'); + assert.deepEqual(await leasedClient.multi().get('x').exec(), ['1']) + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + + testUtils.testWithClientSentinel('lease - watch - dirty', async sentinel => { + const leasedClient = await sentinel.aquire(); + await leasedClient.set('x', 1); + await leasedClient.watch('x'); + await leasedClient.set('x', 2); + + await assert.rejects(leasedClient.multi().get('x').exec(), new WatchError()); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); +}); + + +// TODO: Figure out how to modify the test utils +// so it would have fine grained controll over +// sentinel +// it should somehow replicate the `SentinelFramework` object functionallities async function steadyState(frame: SentinelFramework) { let checkedMaster = false; let checkedReplicas = false; @@ -34,7 +339,6 @@ async function steadyState(frame: SentinelFramework) { checkedMaster = true; } } - if (!checkedReplicas) { const replicas = (await frame.sentinelReplicas()); checkedReplicas = true; @@ -43,17 +347,14 @@ async function steadyState(frame: SentinelFramework) { } } } - let nodeResolve, nodeReject; const nodePromise = new Promise((res, rej) => { nodeResolve = res; nodeReject = rej; }) - const seenNodes = new Set(); let sentinel: RedisSentinelType | undefined; const tracer = []; - try { sentinel = frame.getSentinelClient({ replicaPoolSize: 1, scanInterval: 2000 }, false) .on('topology-change', (event: RedisSentinelEvent) => { @@ -66,7 +367,6 @@ async function steadyState(frame: SentinelFramework) { }).on('error', err => { }); sentinel.setTracer(tracer); await sentinel.connect(); - await nodePromise; await sentinel.flushAll(); @@ -77,1194 +377,592 @@ async function steadyState(frame: SentinelFramework) { } } -["redis-sentinel-test-password", undefined].forEach(function (password) { - describe(`Sentinel - password = ${password}`, () => { - const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password }; - const frame = new SentinelFramework(config); - let tracer = new Array(); - let stopMeasuringBlocking = false; - let longestDelta = 0; - let longestTestDelta = 0; - let last: number; - - before(async function () { - this.timeout(15000); - - last = Date.now(); - - function deltaMeasurer() { - const delta = Date.now() - last; - if (delta > longestDelta) { - longestDelta = delta; - } - if (delta > longestTestDelta) { - longestTestDelta = delta; - } - if (!stopMeasuringBlocking) { - last = Date.now(); - setImmediate(deltaMeasurer); - } +describe('legacy tests', () => { + const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: undefined }; + const frame = new SentinelFramework(config); + let tracer = new Array(); + let stopMeasuringBlocking = false; + let longestDelta = 0; + let longestTestDelta = 0; + let last: number; + + before(async function () { + this.timeout(15000); + + last = Date.now(); + + function deltaMeasurer() { + const delta = Date.now() - last; + if (delta > longestDelta) { + longestDelta = delta; + } + if (delta > longestTestDelta) { + longestTestDelta = delta; } + if (!stopMeasuringBlocking) { + last = Date.now(); + setImmediate(deltaMeasurer); + } + } + setImmediate(deltaMeasurer); + await frame.spawnRedisSentinel(); + }); - setImmediate(deltaMeasurer); + after(async function () { + this.timeout(15000); - await frame.spawnRedisSentinel(); - }); - - after(async function () { - this.timeout(15000); - - stopMeasuringBlocking = true; - - await frame.cleanup(); + stopMeasuringBlocking = true; + + await frame.cleanup(); + }) + + describe('Sentinel Client', function () { + let sentinel: RedisSentinelType< RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined; + + beforeEach(async function () { + this.timeout(0); + + await frame.getAllRunning(); + await steadyState(frame); + longestTestDelta = 0; }) - - describe('Sentinel Client', function () { - let sentinel: RedisSentinelType< RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined; - - beforeEach(async function () { - this.timeout(0); - - await frame.getAllRunning(); - await steadyState(frame); - longestTestDelta = 0; - }) - - afterEach(async function () { - this.timeout(30000); + afterEach(async function () { + this.timeout(30000); + // avoid errors in afterEach that end testing + if (sentinel !== undefined) { + sentinel.on('error', () => { }); + } - // avoid errors in afterEach that end testing - if (sentinel !== undefined) { - sentinel.on('error', () => { }); - } - - if (this!.currentTest!.state === 'failed') { - console.log(`longest event loop blocked delta: ${longestDelta}`); - console.log(`longest event loop blocked in failing test: ${longestTestDelta}`); - console.log("trace:"); - for (const line of tracer) { - console.log(line); - } - console.log(`sentinel object state:`) - console.log(`master: ${JSON.stringify(sentinel?.getMasterNode())}`) - console.log(`replicas: ${JSON.stringify(sentinel?.getReplicaNodes().entries)}`) - const results = await Promise.all([ - frame.sentinelSentinels(), - frame.sentinelMaster(), - frame.sentinelReplicas() - ]) - console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`); - console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`); - console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`); - const { stdout, stderr } = await execAsync("docker ps -a"); - console.log(`docker stdout:\n${stdout}`); - - const ids = frame.getAllDockerIds(); - console.log("docker logs"); - - for (const [id, port] of ids) { - console.log(`${id}/${port}\n`); - const { stdout, stderr } = await execAsync(`docker logs ${id}`, {maxBuffer: 8192 * 8192 * 4}); - console.log(stdout); - } + if (this!.currentTest!.state === 'failed') { + console.log(`longest event loop blocked delta: ${longestDelta}`); + console.log(`longest event loop blocked in failing test: ${longestTestDelta}`); + console.log("trace:"); + for (const line of tracer) { + console.log(line); } - tracer.length = 0; - - if (sentinel !== undefined) { - await sentinel.destroy(); - sentinel = undefined; + console.log(`sentinel object state:`) + console.log(`master: ${JSON.stringify(sentinel?.getMasterNode())}`) + console.log(`replicas: ${JSON.stringify(sentinel?.getReplicaNodes().entries)}`) + const results = await Promise.all([ + frame.sentinelSentinels(), + frame.sentinelMaster(), + frame.sentinelReplicas() + ]) + console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`); + console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`); + console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`); + const { stdout, stderr } = await execAsync("docker ps -a"); + console.log(`docker stdout:\n${stdout}`); + const ids = frame.getAllDockerIds(); + console.log("docker logs"); + for (const [id, port] of ids) { + console.log(`${id}/${port}\n`); + const { stdout, stderr } = await execAsync(`docker logs ${id}`, {maxBuffer: 8192 * 8192 * 4}); + console.log(stdout); } - }) - - it('basic bootstrap', async function () { - sentinel = frame.getSentinelClient(); - await sentinel.connect(); + } + tracer.length = 0; - await assert.doesNotReject(sentinel.set('x', 1)); + if (sentinel !== undefined) { + await sentinel.destroy(); + sentinel = undefined; + } + }) - }); - - it('basic teardown worked', async function () { - const nodePorts = frame.getAllNodesPort(); - const sentinelPorts = frame.getAllSentinelsPort(); - - assert.notEqual(nodePorts.length, 0); - assert.notEqual(sentinelPorts.length, 0); - - sentinel = frame.getSentinelClient(); - await sentinel.connect(); - - await assert.doesNotReject(sentinel.get('x')); - }); + it('use', async function () { + this.timeout(30000); - it('failed to connect', async function() { - sentinel = frame.getSentinelClient({sentinelRootNodes: [{host: "127.0.0.1", port: 1010}], maxCommandRediscovers: 0}) - await assert.rejects(sentinel.connect()); - }); - - it('try to connect multiple times', async function () { - sentinel = frame.getSentinelClient(); - const connectPromise = sentinel.connect(); - await assert.rejects(sentinel.connect()); - await connectPromise; - }); - - it('with type mapping', async function () { - const commandOptions = { - typeMapping: { - [RESP_TYPES.SIMPLE_STRING]: Buffer - } + sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + sentinel.on("error", () => { }); + await sentinel.connect(); + + await sentinel.use( + async (client: RedisSentinelClientType, ) => { + const masterNode = sentinel!.getMasterNode(); + await frame.stopNode(masterNode!.port.toString()); + await assert.doesNotReject(client.get('x')); } - sentinel = frame.getSentinelClient({ commandOptions: commandOptions }); - await sentinel.connect(); - - const resp = await sentinel.ping(); - assert.deepEqual(resp, Buffer.from('PONG')) + ); + }); + + // stops master to force sentinel to update + it('stop master', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + + tracer.push(`connected`); + + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; }) - - it('with a script', async function () { - const options = { - scripts: { - square: SQUARE_SCRIPT - } + + const masterNode = await sentinel.getMasterNode(); + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push(`got expected master change event`); + masterChangeResolve(event.node); } - - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const [, reply] = await Promise.all([ - sentinel.set('key', '2'), - sentinel.square('key') - ]); - - assert.equal(reply, 4); + }); + + tracer.push(`stopping master node`); + await frame.stopNode(masterNode!.port.toString()); + tracer.push(`stopped master node`); + + tracer.push(`waiting on master change promise`); + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got new master node of ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + }); + + // if master changes, client should make sure user knows watches are invalid + it('watch across master change', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + + tracer.push("connected"); + + const client = await sentinel.aquire(); + tracer.push("aquired lease"); + + await client.set("x", 1); + await client.watch("x"); + + tracer.push("did a watch on lease"); + + let resolve; + const promise = new Promise((res) => { + resolve = res; }) - it('multi with a script', async function () { - const options = { - scripts: { - square: SQUARE_SCRIPT - } + const masterNode = sentinel.getMasterNode(); + tracer.push(`got masterPort as ${masterNode!.port}`); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("resolving promise"); + resolve(event.node); } - - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + }); + + tracer.push("stopping master node"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master node and waiting on promise"); + + const newMaster = await promise as RedisNode; + tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`); + assert.notEqual(masterNode!.port, newMaster.port); + tracer.push(`newMaster does not equal old master`); + + tracer.push(`waiting to assert that a multi/exec now fails`); + await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); + tracer.push(`asserted that a multi/exec now fails`); + }); + + // same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid + it('watch before and after master change', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push("connected"); + + const client = await sentinel.aquire(); + tracer.push("got leased client"); + await client.set("x", 1); + await client.watch("x"); - const reply = await sentinel.multi().set('key', 2).square('key').exec(); + tracer.push("set and watched x"); - assert.deepEqual(reply, ['OK', 4]); + let resolve; + const promise = new Promise((res) => { + resolve = res; }) - - it('with a function', async function () { - const options = { - functions: { - math: MATH_FUNCTION.library - } + + const masterNode = sentinel.getMasterNode(); + tracer.push(`initial masterPort = ${masterNode!.port} `); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("got a master change event that is not the same as before"); + resolve(event.node); } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); - - await sentinel.set('key', '2'); - const resp = await sentinel.math.square('key'); - - assert.equal(resp, 4); + }); + + tracer.push("stopping master"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master"); + + tracer.push("waiting on master change promise"); + const newMaster = await promise as RedisNode; + tracer.push(`got master change port as ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + + tracer.push("watching again, shouldn't matter"); + await client.watch("y"); + + tracer.push("expecting multi to be rejected"); + await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); + tracer.push("multi was rejected"); + }); + + + // pubsub continues to work, even with a master change + it('pubsub - channel - with master change', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push(`connected`); + + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; }) - it('multi with a function', async function () { - const options = { - functions: { - math: MATH_FUNCTION.library - } - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); + let tester = false; + await sentinel.subscribe('test', () => { + tracer.push(`got pubsub message`); + tester = true; + pubSubResolve(1); + }) - const reply = await sentinel.multi().set('key', 2).math.square('key').exec(); - assert.deepEqual(reply, ['OK', 4]); + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; }) - - it('with a module', async function () { - const options = { - modules: RedisBloomModules + + const masterNode = sentinel.getMasterNode(); + tracer.push(`got masterPort as ${masterNode!.port}`); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("got a master change event that is not the same as before"); + masterChangeResolve(event.node); } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const resp = await sentinel.bf.add('key', 'item') - assert.equal(resp, true); + }); + + tracer.push("stopping master"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master and waiting on change promise"); + + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got master change port as ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + + tracer.push(`publishing pubsub message`); + await sentinel.publish('test', 'hello world'); + tracer.push(`published pubsub message and waiting pn pubsub promise`); + await pubSubPromise; + tracer.push(`got pubsub promise`); + + assert.equal(tester, true); + + // now unsubscribe + tester = false + await sentinel.unsubscribe('test') + await sentinel.publish('test', 'hello world'); + await setTimeout(1000); + + assert.equal(tester, false); + }); + + it('pubsub - pattern - with master change', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push(`connected`); + + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; }) - it('multi with a module', async function () { - const options = { - modules: RedisBloomModules - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const resp = await sentinel.multi().bf.add('key', 'item').exec(); - assert.deepEqual(resp, [true]); + let tester = false; + await sentinel.pSubscribe('test*', () => { + tracer.push(`got pubsub message`); + tester = true; + pubSubResolve(1); }) - - it('many readers', async function () { - this.timeout(10000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 8 }); - await sentinel.connect(); - - await sentinel.set("x", 1); - for (let i = 0; i < 10; i++) { - if (await sentinel.get("x") == "1") { - break; - } - await setTimeout(1000); - } - - const promises: Array> = []; - for (let i = 0; i < 500; i++) { - promises.push(sentinel.get("x")); - } - - const resp = await Promise.all(promises); - assert.equal(resp.length, 500); - for (let i = 0; i < 500; i++) { - assert.equal(resp[i], "1", `failed on match at ${i}`); + + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; + }) + + const masterNode = sentinel.getMasterNode(); + tracer.push(`got masterPort as ${masterNode!.port}`); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("got a master change event that is not the same as before"); + masterChangeResolve(event.node); } }); - - it('use', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - await sentinel.use( - async (client: RedisSentinelClientType, ) => { - const masterNode = sentinel!.getMasterNode(); - await frame.stopNode(masterNode!.port.toString()); - await assert.doesNotReject(client.get('x')); - } - ); - }); - - it('use with script', async function () { - this.timeout(10000); - - const options = { - scripts: { - square: SQUARE_SCRIPT - } - } - - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - assert.equal(await client.set('key', '2'), 'OK'); - assert.equal(await client.get('key'), '2'); - return client.square('key') - } - ); - - assert.equal(reply, 4); + + tracer.push("stopping master"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master and waiting on master change promise"); + + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got master change port as ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + + tracer.push(`publishing pubsub message`); + await sentinel.publish('testy', 'hello world'); + tracer.push(`published pubsub message and waiting on pubsub promise`); + await pubSubPromise; + tracer.push(`got pubsub promise`); + assert.equal(tester, true); + + // now unsubscribe + tester = false + await sentinel.pUnsubscribe('test*'); + await sentinel.publish('testy', 'hello world'); + await setTimeout(1000); + + assert.equal(tester, false); + }); + + // if we stop a node, the comand should "retry" until we reconfigure topology and execute on new topology + it('command immeaditely after stopping master', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + + tracer.push("connected"); + + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; }) - - it('use with a function', async function () { - this.timeout(10000); - - const options = { - functions: { - math: MATH_FUNCTION.library - } + + const masterNode = sentinel.getMasterNode(); + tracer.push(`original master port = ${masterNode!.port}`); + + let changeCount = 0; + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + changeCount++; + tracer.push(`got topology-change event we expected`); + masterChangeResolve(event.node); } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); - - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - await client.set('key', '2'); - return client.math.square('key'); - } - ); - - assert.equal(reply, 4); + }); + + tracer.push(`stopping masterNode`); + await frame.stopNode(masterNode!.port.toString()); + tracer.push(`stopped masterNode`); + assert.equal(await sentinel.set('x', 123), 'OK'); + tracer.push(`did the set operation`); + const presumamblyNewMaster = sentinel.getMasterNode(); + tracer.push(`new master node seems to be ${presumamblyNewMaster?.port} and waiting on master change promise`); + + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got new masternode event saying master is at ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + + tracer.push(`doing the get`); + const val = await sentinel.get('x'); + tracer.push(`did the get and got ${val}`); + const newestMaster = sentinel.getMasterNode() + tracer.push(`after get, we see master as ${newestMaster?.port}`); + + switch (changeCount) { + case 1: + // if we only changed masters once, we should have the proper value + assert.equal(val, '123'); + break; + case 2: + // we changed masters twice quickly, so probably didn't replicate + // therefore, this is soewhat flakey, but the above is the common case + assert(val == '123' || val == null); + break; + default: + assert(false, "unexpected case"); + } + }); + + it('shutdown sentinel node', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push("connected"); + + let sentinelChangeResolve; + const sentinelChangePromise = new Promise((res) => { + sentinelChangeResolve = res; }) - - it('use with a module', async function () { - const options = { - modules: RedisBloomModules + + const sentinelNode = sentinel.getSentinelNode(); + tracer.push(`sentinelNode = ${sentinelNode?.port}`) + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "SENTINEL_CHANGE") { + tracer.push("got sentinel change event"); + sentinelChangeResolve(event.node); } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - return client.bf.add('key', 'item'); - } - ); - - assert.equal(reply, true); - }) - - it('block on pool', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - const promise = sentinel.use( - async client => { - await setTimeout(1000); - return await client.get("x"); - } - ) - - await sentinel.set("x", 1); - assert.equal(await promise, null); }); - it.skip('reserve client, takes a client out of pool', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true }); - await sentinel.connect(); - - const promise1 = sentinel.use( - async client => { - const val = await client.get("x"); - await client.set("x", 2); - return val; - } - ) + tracer.push("Stopping sentinel node"); + await frame.stopSentinel(sentinelNode!.port.toString()); + tracer.push("Stopped sentinel node and waiting on sentinel change promise"); + const newSentinel = await sentinelChangePromise as RedisNode; + tracer.push("got sentinel change promise"); + assert.notEqual(sentinelNode!.port, newSentinel.port); + }); - const promise2 = sentinel.use( - async client => { - return client.get("x"); - } - ) + it('timer works, and updates sentinel list', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient({ scanInterval: 1000 }); + sentinel.setTracer(tracer); + await sentinel.connect(); + tracer.push("connected"); - await sentinel.set("x", 1); - assert.equal(await promise1, "1"); - assert.equal(await promise2, "2"); + let sentinelChangeResolve; + const sentinelChangePromise = new Promise((res) => { + sentinelChangeResolve = res; }) - - it('multiple clients', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - let set = false; - - const promise = sentinel.use( - async client => { - await sentinel!.set("x", 1); - await client.get("x"); - } - ) - - await assert.doesNotReject(promise); - }); - - // by taking a lease, we know we will block on master as no clients are available, but as read occuring, means replica read occurs - it.skip('replica reads', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - const clientLease = await sentinel.aquire(); - clientLease.set('x', 456); - - let matched = false; - /* waits for replication */ - for (let i = 0; i < 15; i++) { - try { - assert.equal(await sentinel.get("x"), '456'); - matched = true; - break; - } catch (err) { - await setTimeout(1000); - } + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) { + tracer.push(`got sentinel list change event with right size`); + sentinelChangeResolve(event.size); } - - clientLease.release(); - - assert.equal(matched, true); }); - - it('pipeline', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - await sentinel.connect(); - - const resp = await sentinel.multi().set('x', 1).get('x').execAsPipeline(); - - assert.deepEqual(resp, ['OK', '1']); + + tracer.push(`adding sentinel`); + await frame.addSentinel(); + tracer.push(`added sentinel and waiting on sentinel change promise`); + const newSentinelSize = await sentinelChangePromise as number; + + assert.equal(newSentinelSize, 4); + }); + + it('stop replica, bring back replica', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + sentinel.setTracer(tracer); + sentinel.on('error', err => { }); + await sentinel.connect(); + tracer.push("connected"); + + let sentinelRemoveResolve; + const sentinelRemovePromise = new Promise((res) => { + sentinelRemoveResolve = res; }) - - it('use - watch - clean', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - let promise = sentinel.use(async (client) => { - await client.set("x", 1); - await client.watch("x"); - return client.multi().get("x").exec(); - }); - - assert.deepEqual(await promise, ['1']); - }); - - it('use - watch - dirty', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - let promise = sentinel.use(async (client) => { - await client.set('x', 1); - await client.watch('x'); - await sentinel!.set('x', 2); - return client.multi().get('x').exec(); - }); - - await assert.rejects(promise, new WatchError()); - }); - - it('lease - watch - clean', async function () { - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - const leasedClient = await sentinel.aquire(); - await leasedClient.set('x', 1); - await leasedClient.watch('x'); - assert.deepEqual(await leasedClient.multi().get('x').exec(), ['1']) - }); - - it('lease - watch - dirty', async function () { - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - const leasedClient = await sentinel.aquire(); - await leasedClient.set('x', 1); - await leasedClient.watch('x'); - await leasedClient.set('x', 2); - - await assert.rejects(leasedClient.multi().get('x').exec(), new WatchError()); - }); - - - it('watch does not carry through leases', async function () { - this.timeout(10000); - sentinel = frame.getSentinelClient(); - await sentinel.connect(); - - // each of these commands is an independent lease - assert.equal(await sentinel.use(client => client.watch("x")), 'OK') - assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK'); - assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']); - }); - - // stops master to force sentinel to update - it('stop master', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push(`connected`); - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = await sentinel.getMasterNode(); - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push(`got expected master change event`); - masterChangeResolve(event.node); - } - }); - - tracer.push(`stopping master node`); - await frame.stopNode(masterNode!.port.toString()); - tracer.push(`stopped master node`); - - tracer.push(`waiting on master change promise`); - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got new master node of ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - }); - - // if master changes, client should make sure user knows watches are invalid - it('watch across master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push("connected"); - - const client = await sentinel.aquire(); - tracer.push("aquired lease"); - - await client.set("x", 1); - await client.watch("x"); - - tracer.push("did a watch on lease"); - - let resolve; - const promise = new Promise((res) => { - resolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("resolving promise"); - resolve(event.node); - } - }); - - tracer.push("stopping master node"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master node and waiting on promise"); - - const newMaster = await promise as RedisNode; - tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`); - assert.notEqual(masterNode!.port, newMaster.port); - tracer.push(`newMaster does not equal old master`); - - tracer.push(`waiting to assert that a multi/exec now fails`); - await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); - tracer.push(`asserted that a multi/exec now fails`); - }); - - // same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid - it('watch before and after master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push("connected"); - - const client = await sentinel.aquire(); - tracer.push("got leased client"); - await client.set("x", 1); - await client.watch("x"); - - tracer.push("set and watched x"); - - let resolve; - const promise = new Promise((res) => { - resolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`initial masterPort = ${masterNode!.port} `); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - resolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master"); - - tracer.push("waiting on master change promise"); - const newMaster = await promise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push("watching again, shouldn't matter"); - await client.watch("y"); - - tracer.push("expecting multi to be rejected"); - await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); - tracer.push("multi was rejected"); - }); - - it.skip('plain pubsub - channel', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }); - - let tester = false; - await sentinel.subscribe('test', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - tracer.push(`publishing pubsub message`); - await sentinel.publish('test', 'hello world'); - tracer.push(`waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - tracer.push(`unsubscribing pubsub listener`); - await sentinel.unsubscribe('test') - tracer.push(`pubishing pubsub message`); - await sentinel.publish('test', 'hello world'); - await setTimeout(1000); - - tracer.push(`ensuring pubsub was unsubscribed via an assert`); - assert.equal(tester, false); - }); - - it.skip('plain pubsub - pattern', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }); - - let tester = false; - await sentinel.pSubscribe('test*', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - tracer.push(`publishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - tracer.push(`waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - tracer.push(`unsubscribing pubsub listener`); - await sentinel.pUnsubscribe('test*'); - tracer.push(`pubishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - await setTimeout(1000); - - tracer.push(`ensuring pubsub was unsubscribed via an assert`); - assert.equal(tester, false); - }); - - // pubsub continues to work, even with a master change - it('pubsub - channel - with master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }) - - let tester = false; - await sentinel.subscribe('test', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - masterChangeResolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master and waiting on change promise"); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`publishing pubsub message`); - await sentinel.publish('test', 'hello world'); - tracer.push(`published pubsub message and waiting pn pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - - assert.equal(tester, true); - - // now unsubscribe - tester = false - await sentinel.unsubscribe('test') - await sentinel.publish('test', 'hello world'); - await setTimeout(1000); - - assert.equal(tester, false); - }); - - it('pubsub - pattern - with master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }) - - let tester = false; - await sentinel.pSubscribe('test*', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - masterChangeResolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master and waiting on master change promise"); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`publishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - tracer.push(`published pubsub message and waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - await sentinel.pUnsubscribe('test*'); - await sentinel.publish('testy', 'hello world'); - await setTimeout(1000); - - assert.equal(tester, false); - }); - - // if we stop a node, the comand should "retry" until we reconfigure topology and execute on new topology - it('command immeaditely after stopping master', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push("connected"); - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`original master port = ${masterNode!.port}`); - - let changeCount = 0; - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - changeCount++; - tracer.push(`got topology-change event we expected`); - masterChangeResolve(event.node); + + const replicaPort = await frame.getRandonNonMasterNode(); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "REPLICA_REMOVE") { + if (event.node.port.toString() == replicaPort) { + tracer.push("got expected replica removed event"); + sentinelRemoveResolve(event.node); + } else { + tracer.push(`got replica removed event for a different node: ${event.node.port}`); } - }); - - tracer.push(`stopping masterNode`); - await frame.stopNode(masterNode!.port.toString()); - tracer.push(`stopped masterNode`); - assert.equal(await sentinel.set('x', 123), 'OK'); - tracer.push(`did the set operation`); - const presumamblyNewMaster = sentinel.getMasterNode(); - tracer.push(`new master node seems to be ${presumamblyNewMaster?.port} and waiting on master change promise`); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got new masternode event saying master is at ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`doing the get`); - const val = await sentinel.get('x'); - tracer.push(`did the get and got ${val}`); - const newestMaster = sentinel.getMasterNode() - tracer.push(`after get, we see master as ${newestMaster?.port}`); - - switch (changeCount) { - case 1: - // if we only changed masters once, we should have the proper value - assert.equal(val, '123'); - break; - case 2: - // we changed masters twice quickly, so probably didn't replicate - // therefore, this is soewhat flakey, but the above is the common case - assert(val == '123' || val == null); - break; - default: - assert(false, "unexpected case"); } }); - - it('shutdown sentinel node', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelChangeResolve; - const sentinelChangePromise = new Promise((res) => { - sentinelChangeResolve = res; - }) - - const sentinelNode = sentinel.getSentinelNode(); - tracer.push(`sentinelNode = ${sentinelNode?.port}`) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "SENTINEL_CHANGE") { - tracer.push("got sentinel change event"); - sentinelChangeResolve(event.node); - } - }); - - tracer.push("Stopping sentinel node"); - await frame.stopSentinel(sentinelNode!.port.toString()); - tracer.push("Stopped sentinel node and waiting on sentinel change promise"); - const newSentinel = await sentinelChangePromise as RedisNode; - tracer.push("got sentinel change promise"); - assert.notEqual(sentinelNode!.port, newSentinel.port); - }); - - it('timer works, and updates sentinel list', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ scanInterval: 1000 }); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelChangeResolve; - const sentinelChangePromise = new Promise((res) => { - sentinelChangeResolve = res; - }) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) { - tracer.push(`got sentinel list change event with right size`); - sentinelChangeResolve(event.size); - } - }); - - tracer.push(`adding sentinel`); - await frame.addSentinel(); - tracer.push(`added sentinel and waiting on sentinel change promise`); - const newSentinelSize = await sentinelChangePromise as number; - - assert.equal(newSentinelSize, 4); - }); - - it('stop replica, bring back replica', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.setTracer(tracer); - sentinel.on('error', err => { }); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelRemoveResolve; - const sentinelRemovePromise = new Promise((res) => { - sentinelRemoveResolve = res; - }) - - const replicaPort = await frame.getRandonNonMasterNode(); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_REMOVE") { - if (event.node.port.toString() == replicaPort) { - tracer.push("got expected replica removed event"); - sentinelRemoveResolve(event.node); - } else { - tracer.push(`got replica removed event for a different node: ${event.node.port}`); - } - } - }); - - tracer.push(`replicaPort = ${replicaPort} and stopping it`); - await frame.stopNode(replicaPort); - tracer.push("stopped replica and waiting on sentinel removed promise"); - const stoppedNode = await sentinelRemovePromise as RedisNode; - tracer.push("got removed promise"); - assert.equal(stoppedNode.port, Number(replicaPort)); - - let sentinelRestartedResolve; - const sentinelRestartedPromise = new Promise((res) => { - sentinelRestartedResolve = res; - }) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_ADD") { - tracer.push("got replica added event"); - sentinelRestartedResolve(event.node); - } - }); - - tracer.push("restarting replica"); - await frame.restartNode(replicaPort); - tracer.push("restarted replica and waiting on restart promise"); - const restartedNode = await sentinelRestartedPromise as RedisNode; - tracer.push("got restarted promise"); - assert.equal(restartedNode.port, Number(replicaPort)); + + tracer.push(`replicaPort = ${replicaPort} and stopping it`); + await frame.stopNode(replicaPort); + tracer.push("stopped replica and waiting on sentinel removed promise"); + const stoppedNode = await sentinelRemovePromise as RedisNode; + tracer.push("got removed promise"); + assert.equal(stoppedNode.port, Number(replicaPort)); + + let sentinelRestartedResolve; + const sentinelRestartedPromise = new Promise((res) => { + sentinelRestartedResolve = res; }) - - it('add a node / new replica', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 }); - sentinel.setTracer(tracer); - // need to handle errors, as the spawning a new docker node can cause existing connections to time out - sentinel.on('error', err => { }); - await sentinel.connect(); - tracer.push("connected"); - - let nodeAddedResolve: (value: RedisNode) => void; - const nodeAddedPromise = new Promise((res) => { - nodeAddedResolve = res as (value: RedisNode) => void; - }); - - const portSet = new Set(); - for (const port of frame.getAllNodesPort()) { - portSet.add(port); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "REPLICA_ADD") { + tracer.push("got replica added event"); + sentinelRestartedResolve(event.node); } - - // "on" and not "once" as due to connection timeouts, can happen multiple times, and want right one - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_ADD") { - if (!portSet.has(event.node.port)) { - tracer.push("got expected replica added event"); - nodeAddedResolve(event.node); - } - } - }); - - tracer.push("adding node"); - await frame.addNode(); - tracer.push("added node and waiting on added promise"); - await nodeAddedPromise; - }) + }); + + tracer.push("restarting replica"); + await frame.restartNode(replicaPort); + tracer.push("restarted replica and waiting on restart promise"); + const restartedNode = await sentinelRestartedPromise as RedisNode; + tracer.push("got restarted promise"); + assert.equal(restartedNode.port, Number(replicaPort)); }) - - describe('Sentinel Factory', function () { - let master: RedisClientType | undefined; - let replica: RedisClientType | undefined; - - beforeEach(async function () { - this.timeout(0); - await frame.getAllRunning(); - - await steadyState(frame); - longestTestDelta = 0; - }) - - afterEach(async function () { - if (this!.currentTest!.state === 'failed') { - console.log(`longest event loop blocked delta: ${longestDelta}`); - console.log(`longest event loop blocked in failing test: ${longestTestDelta}`); - console.log("trace:"); - for (const line of tracer) { - console.log(line); - } - const results = await Promise.all([ - frame.sentinelSentinels(), - frame.sentinelMaster(), - frame.sentinelReplicas() - ]) - console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`); - console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`); - console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`); - const { stdout, stderr } = await execAsync("docker ps -a"); - console.log(`docker stdout:\n${stdout}`); - console.log(`docker stderr:\n${stderr}`); - } - tracer.length = 0; - - if (master !== undefined) { - if (master.isOpen) { - master.destroy(); - } - master = undefined; - } - - if (replica !== undefined) { - if (replica.isOpen) { - replica.destroy(); + it('add a node / new replica', async function () { + this.timeout(30000); + + sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 }); + sentinel.setTracer(tracer); + // need to handle errors, as the spawning a new docker node can cause existing connections to time out + sentinel.on('error', err => { }); + await sentinel.connect(); + tracer.push("connected"); + + let nodeAddedResolve: (value: RedisNode) => void; + const nodeAddedPromise = new Promise((res) => { + nodeAddedResolve = res as (value: RedisNode) => void; + }); + + const portSet = new Set(); + for (const port of frame.getAllNodesPort()) { + portSet.add(port); + } + + // "on" and not "once" as due to connection timeouts, can happen multiple times, and want right one + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "REPLICA_ADD") { + if (!portSet.has(event.node.port)) { + tracer.push("got expected replica added event"); + nodeAddedResolve(event.node); } - replica = undefined; - } - }) - - it('sentinel factory - master', async function () { - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); - } - - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - await factory.updateSentinelRootNodes(); - - master = await factory.getMasterClient(); - await master.connect(); - - assert.equal(await master.set("x", 1), 'OK'); - }) - - it('sentinel factory - replica', async function () { - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); - } - - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - await factory.updateSentinelRootNodes(); - - const masterNode = await factory.getMasterNode(); - replica = await factory.getReplicaClient(); - const replicaSocketOptions = replica.options?.socket as unknown as RedisTcpSocketOptions | undefined; - assert.notEqual(masterNode.port, replicaSocketOptions?.port) - }) - - it('sentinel factory - bad node', async function () { - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: [{ host: "locahost", port: 1 }] }); - await assert.rejects(factory.updateSentinelRootNodes(), new Error("Couldn't connect to any sentinel node")); - }) - - it('sentinel factory - invalid db name', async function () { - this.timeout(15000); - - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); - } - - const factory = new RedisSentinelFactory({ name: "invalid-name", sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - await assert.rejects(factory.updateSentinelRootNodes(), new Error("ERR No such master with that name")); - }) - - it('sentinel factory - no available nodes', async function () { - this.timeout(15000); - - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); - } - - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - - for (const node of frame.getAllNodesPort()) { - await frame.stopNode(node.toString()); } - - await setTimeout(1000); - - await assert.rejects(factory.getMasterNode(), new Error("Master Node Not Enumerated")); - }) + }); + + tracer.push("adding node"); + await frame.addNode(); + tracer.push("added node and waiting on added promise"); + await nodeAddedPromise; }) - }) + }); }); + + + diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 2ec445e639b..e6d5b9a2dba 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -124,8 +124,11 @@ export class RedisSentinelClient< value: V ) { const proxy = Object.create(this); - proxy._commandOptions = Object.create(this._self.#commandOptions ?? null); - proxy._commandOptions[key] = value; + // Create new commandOptions object with the inherited properties + proxy._self.#commandOptions = { + ...(this._self.#commandOptions || {}), + [key]: value + }; return proxy as RedisSentinelClientType< M, F, @@ -328,7 +331,7 @@ export default class RedisSentinel< TYPE_MAPPING extends TypeMapping, >(options: OPTIONS) { const proxy = Object.create(this); - proxy._commandOptions = options; + proxy._self.#commandOptions = options; return proxy as RedisSentinelType< M, F, @@ -345,9 +348,12 @@ export default class RedisSentinel< key: K, value: V ) { - const proxy = Object.create(this._self); - proxy._commandOptions = Object.create(this._self.#commandOptions ?? null); - proxy._commandOptions[key] = value; + const proxy = Object.create(this); + // Create new commandOptions object with the inherited properties + proxy._self.#commandOptions = { + ...(this._self.#commandOptions || {}), + [key]: value + }; return proxy as RedisSentinelType< M, F, diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 520cc73110d..38a4c891616 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -80,7 +80,7 @@ abstract class DockerBase { async dockerRemove(dockerId: string): Promise { try { - await this.dockerStop(dockerId); `` + await this.dockerStop(dockerId); } catch (err) { // its ok if stop failed, as we are just going to remove, will just be slower console.log(`dockerStop failed in remove: ${err}`); diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index f7862a9d685..223278e04de 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -2,9 +2,11 @@ import TestUtils from '@redis/test-utils'; import { SinonSpy } from 'sinon'; import { setTimeout } from 'node:timers/promises'; import { CredentialsProvider } from './authx'; -import { Command } from './RESP/types'; -import { BasicCommandParser } from './client/parser'; - +import { Command, NumberReply } from './RESP/types'; +import { BasicCommandParser, CommandParser } from './client/parser'; +import { MATH_FUNCTION } from './commands/FUNCTION_LOAD.spec'; +import { defineScript } from './lua-script'; +import RedisBloomModules from '@redis/bloom'; const utils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', @@ -42,6 +44,18 @@ const streamingCredentialsProvider: CredentialsProvider = } as const; +const SQUARE_SCRIPT = defineScript({ + SCRIPT: + `local number = redis.call('GET', KEYS[1]) + return number * number`, + NUMBER_OF_KEYS: 1, + FIRST_KEY_INDEX: 0, + parseCommand(parser: CommandParser, key: string) { + parser.pushKey(key); + }, + transformReply: undefined as unknown as () => NumberReply +}); + export const GLOBAL = { SERVERS: { OPEN: { @@ -86,6 +100,51 @@ export const GLOBAL = { useReplicas: true } } + }, + SENTINEL: { + OPEN: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + }, + PASSWORD: { + serverArguments: [...DEBUG_MODE_ARGS], + password: 'test_password', + }, + WITH_SCRIPT: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + scripts: { + square: SQUARE_SCRIPT, + }, + }, + WITH_FUNCTION: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + functions: { + math: MATH_FUNCTION.library, + } + }, + WITH_MODULE: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + modules: RedisBloomModules, + }, + WITH_REPLICA_POOL_SIZE_1: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + replicaPoolSize: 1, + }, + WITH_RESERVE_CLIENT_MASTER_POOL_SIZE_2: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + masterPoolSize: 2, + reserveClient: true, + }, + WITH_MASTER_POOL_SIZE_2: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + masterPoolSize: 2, + } } }; diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index e3ff5edc38b..2aeb33919b3 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -4,7 +4,6 @@ import { once } from 'node:events'; import { createClient } from '@redis/client/index'; import { setTimeout } from 'node:timers/promises'; // import { ClusterSlotsReply } from '@redis/client/dist/lib/commands/CLUSTER_SLOTS'; - import { execFile as execFileCallback } from 'node:child_process'; import { promisify } from 'node:util'; @@ -51,21 +50,30 @@ export interface RedisServerDocker { async function spawnRedisServerDocker({ image, version -}: RedisServerDockerConfig, serverArguments: Array): Promise { +}: RedisServerDockerConfig, serverArguments: Array, dockerEnv?: Map): Promise { const port = (await portIterator.next()).value; const portStr = port.toString(); const dockerArgs = [ 'run', - '-e', `PORT=${portStr}`, + '--init', + '-e', `PORT=${portStr}` + ]; + + dockerEnv?.forEach((key: string, value: string) => { + dockerArgs.push('-e', `${key}:${value}`); + }); + + dockerArgs.push( '-d', '--network', 'host', - `${image}:${version}`, - '--port', portStr - ]; + `${image}:${version}` + ); if (serverArguments.length > 0) { - dockerArgs.push(...serverArguments); + for (let i = 0; i < serverArguments.length; i++) { + dockerArgs.push(serverArguments[i].replace('{port}', `${portStr}`)) + } } console.log(`[Docker] Spawning Redis container - Image: ${image}:${version}, Port: ${port}`); @@ -291,3 +299,91 @@ after(() => { }) ); }); + + +const RUNNING_NODES = new Map, Array>(); +const RUNNING_SENTINELS = new Map, Array>(); + +export async function spawnRedisSentinel( + dockerConfigs: RedisServerDockerConfig, + serverArguments: Array, + password?: string, +): Promise> { + const runningNodes = RUNNING_SENTINELS.get(serverArguments); + if (runningNodes) { + return runningNodes; + } + + const dockerEnv: Map = new Map(); + + if (password !== undefined) { + dockerEnv.set("REDIS_PASSWORD", password); + serverArguments.push("--requirepass", password) + } + + const master = await spawnRedisServerDocker(dockerConfigs, serverArguments, dockerEnv); + const redisNodes: Array = [master]; + const replicaPromises: Array> = []; + + const replicasCount = 2; + for (let i = 0; i < replicasCount; i++) { + replicaPromises.push((async () => { + const replica = await spawnRedisServerDocker(dockerConfigs, serverArguments, dockerEnv); + const client = createClient({ + socket: { + port: replica.port + }, + password: password + }); + + await client.connect(); + await client.replicaOf("127.0.0.1", master.port); + await client.close(); + + return replica; + })()); + } + + const replicas = await Promise.all(replicaPromises); + redisNodes.push(...replicas); + RUNNING_NODES.set(serverArguments, redisNodes); + + const sentinelPromises: Array> = []; + const sentinelCount = 3; + + for (let i = 0; i < sentinelCount; i++) { + sentinelPromises.push((async () => { + const sentinelArgs: Array = ["sh", "-c"]; + + let sentinelConfig = ` +port {port} +sentinel monitor mymaster 127.0.0.1 ${master.port} 2 +sentinel down-after-milliseconds mymaster 5000 +sentinel failover-timeout mymaster 6000 +`; + if (password !== undefined) { + sentinelConfig += `requirepass ${password}\n` + sentinelConfig += `sentinel auth-pass mymaster ${password}\n` + } + + sentinelArgs.push(`echo "${sentinelConfig}" > /tmp/sentinel.conf && redis-sentinel /tmp/sentinel.conf`); + return await spawnRedisServerDocker({image: "redis", version: "latest"}, sentinelArgs); + })()); + } + + const sentinelNodes = await Promise.all(sentinelPromises); + RUNNING_SENTINELS.set(serverArguments, sentinelNodes); + + return sentinelNodes; +} + + +after(() => { + return Promise.all( + [...RUNNING_NODES.values(), ...RUNNING_SENTINELS.values()].map(async dockersPromise => { + return Promise.all( + dockersPromise.map(({ dockerId }) => dockerRemove(dockerId)) + ); + }) + ); +}); \ No newline at end of file diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 1c564749ff2..d793d5d6bde 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -6,8 +6,11 @@ import { TypeMapping, // CommandPolicies, createClient, + createSentinel, RedisClientOptions, RedisClientType, + RedisSentinelOptions, + RedisSentinelType, RedisPoolOptions, RedisClientPoolType, createClientPool, @@ -15,7 +18,8 @@ import { RedisClusterOptions, RedisClusterType } from '@redis/client/index'; -import { RedisServerDockerConfig, spawnRedisServer, spawnRedisCluster } from './dockers'; +import { RedisNode } from '@redis/client/lib/sentinel/types' +import { RedisServerDockerConfig, spawnRedisServer, spawnRedisCluster, spawnRedisSentinel } from './dockers'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; @@ -68,6 +72,25 @@ interface ClientTestOptions< disableClientSetup?: boolean; } +interface SentinelTestOptions< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping +> extends CommonTestOptions { + sentinelOptions?: Partial>; + clientOptions?: Partial>; + scripts?: S; + functions?: F; + modules?: M; + password?: string; + disableClientSetup?: boolean; + replicaPoolSize?: number; + masterPoolSize?: number; + reserveClient?: boolean; +} + interface ClientPoolTestOptions< M extends RedisModules, F extends RedisFunctions, @@ -179,7 +202,6 @@ export default class TestUtils { } isVersionGreaterThanHook(minimumVersion: Array | undefined): void { - const isVersionGreaterThanHook = this.isVersionGreaterThan.bind(this); const versionNumber = this.#VERSION_NUMBERS.join('.'); const minimumVersionString = minimumVersion?.join('.'); @@ -274,6 +296,76 @@ export default class TestUtils { }); } + testWithClientSentinel< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >( + title: string, + fn: (sentinel: RedisSentinelType) => unknown, + options: SentinelTestOptions + ): void { + let dockerPromises: ReturnType; + + if (this.isVersionGreaterThan(options.minimumDockerVersion)) { + const dockerImage = this.#DOCKER_IMAGE; + before(function () { + this.timeout(30000); + + dockerPromises = spawnRedisSentinel(dockerImage, options.serverArguments, options?.password); + return dockerPromises; + }); + } + + it(title, async function () { + this.timeout(30000); + if (options.skipTest) return this.skip(); + if (!dockerPromises) return this.skip(); + + + const promises = await dockerPromises; + const rootNodes: Array = promises.map(promise => ({ + host: "127.0.0.1", + port: promise.port + })); + + const sentinel = createSentinel({ + name: 'mymaster', + sentinelRootNodes: rootNodes, + nodeClientOptions: { + password: options?.password || undefined, + }, + sentinelClientOptions: { + password: options?.password || undefined, + }, + replicaPoolSize: options?.replicaPoolSize || 0, + scripts: options?.scripts || {}, + modules: options?.modules || {}, + functions: options?.functions || {}, + masterPoolSize: options?.masterPoolSize || undefined, + reserveClient: options?.reserveClient || false, + }) as RedisSentinelType; + + if (options.disableClientSetup) { + return fn(sentinel); + } + + await sentinel.connect(); + + try { + await sentinel.flushAll(); + await fn(sentinel); + } finally { + if (sentinel.isOpen) { + await sentinel.flushAll(); + sentinel.destroy(); + } + } + }); + } + testWithClientIfVersionWithinRange< M extends RedisModules = {}, F extends RedisFunctions = {}, @@ -292,8 +384,27 @@ export default class TestUtils { } else { console.warn(`Skipping test ${title} because server version ${this.#VERSION_NUMBERS.join('.')} is not within range ${range[0].join(".")} - ${range[1] !== 'LATEST' ? range[1].join(".") : 'LATEST'}`) } + } + testWithClienSentineltIfVersionWithinRange< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} +>( + range: ([minVersion: Array, maxVersion: Array] | [minVersion: Array, 'LATEST']), + title: string, + fn: (sentinel: RedisSentinelType) => unknown, + options: SentinelTestOptions +): void { + + if (this.isVersionInRange(range[0], range[1] === 'LATEST' ? [Infinity, Infinity, Infinity] : range[1])) { + return this.testWithClientSentinel(`${title} [${range[0].join('.')}] - [${(range[1] === 'LATEST') ? range[1] : range[1].join(".")}] `, fn, options) + } else { + console.warn(`Skipping test ${title} because server version ${this.#VERSION_NUMBERS.join('.')} is not within range ${range[0].join(".")} - ${range[1] !== 'LATEST' ? range[1].join(".") : 'LATEST'}`) } +} testWithClientPool< M extends RedisModules = {}, From 3d8caee2ac87a6fcd51eba2b170839ab29c2be96 Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Fri, 25 Apr 2025 16:13:42 +0300 Subject: [PATCH 6/7] [CAE-342] Minor refactor --- packages/client/lib/sentinel/index.spec.ts | 7 +++-- packages/client/lib/test-utils.ts | 30 ++++++++++++++++++++-- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index a4fa361eed2..8441ad6c5f8 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -1,12 +1,11 @@ import { strict as assert } from 'node:assert'; import { setTimeout } from 'node:timers/promises'; -import testUtils, { GLOBAL } from '../test-utils'; +import testUtils, { GLOBAL, MATH_FUNCTION } from '../test-utils'; import { RESP_TYPES } from '../RESP/decoder'; import { WatchError } from "../errors"; -import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec'; import { RedisSentinelConfig, SentinelFramework } from "./test-util"; import { RedisSentinelEvent, RedisSentinelType, RedisSentinelClientType, RedisNode } from "./types"; -import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; +import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types'; import { promisify } from 'node:util'; import { exec } from 'node:child_process'; const execAsync = promisify(exec); @@ -417,7 +416,7 @@ describe('legacy tests', () => { }) describe('Sentinel Client', function () { - let sentinel: RedisSentinelType< RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined; + let sentinel: RedisSentinelType | undefined; beforeEach(async function () { this.timeout(0); diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index 223278e04de..2b27e702050 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -4,7 +4,6 @@ import { setTimeout } from 'node:timers/promises'; import { CredentialsProvider } from './authx'; import { Command, NumberReply } from './RESP/types'; import { BasicCommandParser, CommandParser } from './client/parser'; -import { MATH_FUNCTION } from './commands/FUNCTION_LOAD.spec'; import { defineScript } from './lua-script'; import RedisBloomModules from '@redis/bloom'; const utils = TestUtils.createFromConfig({ @@ -56,6 +55,33 @@ const SQUARE_SCRIPT = defineScript({ transformReply: undefined as unknown as () => NumberReply }); +export const MATH_FUNCTION = { + name: 'math', + engine: 'LUA', + code: + `#!LUA name=math + redis.register_function { + function_name = "square", + callback = function(keys, args) + local number = redis.call('GET', keys[1]) + return number * number + end, + flags = { "no-writes" } + }`, + library: { + square: { + NAME: 'square', + IS_READ_ONLY: true, + NUMBER_OF_KEYS: 1, + FIRST_KEY_INDEX: 0, + parseCommand(parser: CommandParser, key: string) { + parser.pushKey(key); + }, + transformReply: undefined as unknown as () => NumberReply + } + } +}; + export const GLOBAL = { SERVERS: { OPEN: { @@ -122,7 +148,7 @@ export const GLOBAL = { password: undefined, functions: { math: MATH_FUNCTION.library, - } + }, }, WITH_MODULE: { serverArguments: [...DEBUG_MODE_ARGS], From 07b7cdf8f39854ae265dfc482bfc83c492ba39b9 Mon Sep 17 00:00:00 2001 From: "H. Temelski" Date: Fri, 25 Apr 2025 16:22:28 +0300 Subject: [PATCH 7/7] . --- packages/client/lib/sentinel/index.spec.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index 8441ad6c5f8..89690d29da9 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -448,6 +448,7 @@ describe('legacy tests', () => { frame.sentinelMaster(), frame.sentinelReplicas() ]) + console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`); console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`); console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`);