diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts
index 9af4550691b..d58e32329ea 100644
--- a/packages/client/lib/client/commands-queue.ts
+++ b/packages/client/lib/client/commands-queue.ts
@@ -577,11 +577,12 @@ export default class RedisCommandsQueue {
   /**
-   * Gets all commands from the write queue without removing them.
+   * Removes all commands from the write queue and returns them.
    */
-  getAllCommands(): CommandToWrite[] {
+  extractAllCommands(): CommandToWrite[] {
     const result: CommandToWrite[] = [];
     let current = this.#toWrite.head;
     while(current) {
       result.push(current.value);
+      this.#toWrite.remove(current);
       current = current.next;
     }
     return result;
@@ -591,7 +592,11 @@
    * Prepends commands to the write queue in reverse.
    */
   prependCommandsToWrite(commands: CommandToWrite[]) {
-    for (let i = commands.length - 1; i <= 0; i--) {
+    if (!commands.length) {
+      return;
+    }
+
+    for (let i = commands.length - 1; i >= 0; i--) {
       this.#toWrite.unshift(commands[i]);
     }
   }
diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts
index 9ad044d0a15..faf345b4148 100644
--- a/packages/client/lib/client/index.ts
+++ b/packages/client/lib/client/index.ts
@@ -1018,7 +1018,7 @@ export default class RedisClient<
    * @internal
    */
   _getQueue(): RedisCommandsQueue {
-    return this.#queue;
+    return this._self.#queue;
   }

   /**
diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts
index b803625c22c..dfda846f7f8 100644
--- a/packages/client/lib/cluster/cluster-slots.ts
+++ b/packages/client/lib/cluster/cluster-slots.ts
@@ -356,7 +356,7 @@ export default class RedisClusterSlots<
     } else {
       // If source shard doesnt have any slots left, this means we can safely move all commands to the new shard.
       // Same goes for sharded pub sub listeners
-      const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands();
+      const normalCommandsToMove = sourceNode.client!._getQueue().extractAllCommands();
       // 5. 
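The reverse iteration in prependCommandsToWrite is what preserves ordering: unshift() always inserts at the head of the queue, so pushing the last extracted command first leaves the first command at the front. A minimal sketch of the intended hand-off between the two methods, using plain arrays instead of the real CommandToWrite linked list (the names and values here are illustrative only):

  // Commands drained from the dying source node, in the order they were issued.
  const extracted: string[] = ["GET a", "GET b", "GET c"];
  // Commands already queued on the destination node.
  const destinationQueue: string[] = ["later-1", "later-2"];

  for (let i = extracted.length - 1; i >= 0; i--) {
    destinationQueue.unshift(extracted[i]);
  }

  // destinationQueue is now ["GET a", "GET b", "GET c", "later-1", "later-2"]:
  // the migrated commands run first, still in their original order.
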
Prepend extracted commands, chans destMasterNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); if('pubSub' in sourceNode) { @@ -371,9 +371,13 @@ export default class RedisClusterSlots< this.nodeByAddress.delete(sourceAddress); // 4.3 Kill because no slots are pointing to it anymore - await sourceNode.client?.close() + if (sourceNode.client?.isOpen) { + await sourceNode.client?.close() + } if('pubSub' in sourceNode) { - await sourceNode.pubSub?.client.close(); + if (sourceNode.pubSub?.client.isOpen) { + await sourceNode.pubSub?.client.close(); + } } } diff --git a/packages/client/lib/cluster/maintenance.spec.ts b/packages/client/lib/cluster/maintenance.spec.ts new file mode 100644 index 00000000000..f19279cff64 --- /dev/null +++ b/packages/client/lib/cluster/maintenance.spec.ts @@ -0,0 +1,1088 @@ +import assert from "node:assert"; +import { setTimeout } from "node:timers/promises"; +import diagnostics_channel from "node:diagnostics_channel"; + +import testUtils from "../test-utils"; +import { DiagnosticsEvent } from "../client/enterprise-maintenance-manager"; + +describe("Cluster Maintenance", () => { + const KEYS = [ + "channel:11kv:1000", + "channel:osy:2000", + "channel:jn6:3000", + "channel:l00:4000", + "channel:4ez:5000", + "channel:4ek:6000", + "channel:9vn:7000", + "channel:dw1:8000", + "channel:9zi:9000", + "channel:4vl:10000", + "channel:utl:11000", + "channel:lyo:12000", + "channel:jzn:13000", + "channel:14uc:14000", + "channel:mz:15000", + "channel:d0v:16000", + ]; + + before(() => { + process.env.REDIS_EMIT_DIAGNOSTICS = "1"; + }); + + after(() => { + delete process.env.REDIS_EMIT_DIAGNOSTICS; + }); + + let diagnosticEvents: DiagnosticsEvent[] = []; + + const onMessage = (message: unknown) => { + const event = message as DiagnosticsEvent; + if (["SMIGRATING", "SMIGRATED"].includes(event.type)) { + diagnosticEvents.push(event); + } + }; + + beforeEach(() => { + diagnostics_channel.subscribe("redis.maintenance", onMessage); + diagnosticEvents = []; + }); + + afterEach(() => { + diagnostics_channel.unsubscribe("redis.maintenance", onMessage); + }); + + describe("Notifications", () => { + testUtils.testWithProxiedCluster( + "should NOT receive notifications when maintNotifications is disabled", + async (_cluster, faultInjectorClient) => { + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + + // Trigger migration + await faultInjectorClient.triggerAction({ + type: "migrate", + parameters: { + cluster_index: 0, + }, + }); + + // Verify NO maintenance notifications received + assert.strictEqual( + diagnosticEvents.length, + 0, + "should NOT receive any SMIGRATING/SMIGRATED notifications when disabled" + ); + }, + { + numberOfMasters: 3, + clusterConfiguration: { + defaults: { + maintNotifications: "disabled", + }, + RESP: 3, + }, + } + ); + }); + + describe("Migrate - source: dying -> dest: existing", () => { + const MASTERS_COUNT = 3; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "all", + destination_type: "existing", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + freshContainer: true, + numberOfMasters: MASTERS_COUNT, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => 
m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + MASTERS_COUNT, + `should have ${MASTERS_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // Verify topology changed + assert.equal( + cluster.masters.length, + MASTERS_COUNT - 1, + `should have ${MASTERS_COUNT - 1} masters after migrate` + ); + + // Verify at least one master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.notDeepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - 
should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); + + describe("Migrate - source: dying -> dest: new", () => { + const MASTERS_NODES_COUNT = 3; + const VISIBLE_NODES_COUNT = 2; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "all", + destination_type: "new", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + freshContainer: true, + numberOfMasters: MASTERS_NODES_COUNT, + startWithReducedNodes: true, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT, + `should have ${VISIBLE_NODES_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // 
Verify topology changed + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT, + `should have ${VISIBLE_NODES_COUNT} masters after migrate` + ); + + // Verify at least one master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.notDeepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await 
faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); + + describe("Migrate - source: active -> dest: existing", () => { + const MASTERS_COUNT = 3; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "half", + destination_type: "existing", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + numberOfMasters: MASTERS_COUNT, + freshContainer: true, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + MASTERS_COUNT, + `should have ${MASTERS_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // Verify topology changed + assert.equal( + cluster.masters.length, + MASTERS_COUNT, + `should have ${MASTERS_COUNT} masters after migrate` + ); + + // Verify at least no master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.deepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + 
stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + 
assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); + + describe("Migrate - source: active -> dest: new", () => { + const MASTERS_NODES_COUNT = 3; + const VISIBLE_NODES_COUNT = 2; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "half", + destination_type: "new", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + numberOfMasters: MASTERS_NODES_COUNT, + startWithReducedNodes: true, + freshContainer: true, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT, + `should have ${VISIBLE_NODES_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // Verify topology changed + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT + 1, + `should have ${VISIBLE_NODES_COUNT + 1} masters after migrate` + ); + + // Verify at least one master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.notDeepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel 
of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); +}); diff --git a/packages/client/lib/tests/test-scenario/fault-injector-client.ts b/packages/client/lib/tests/test-scenario/fault-injector-client.ts index c03fa1afa1e..4fd35fa446c 100644 --- a/packages/client/lib/tests/test-scenario/fault-injector-client.ts +++ b/packages/client/lib/tests/test-scenario/fault-injector-client.ts @@ -1,5 +1,6 @@ import { setTimeout } from "node:timers/promises"; +// TODO remove types and utilize IFaultInjectorClient export type ActionType = | "dmc_restart" | "failover" @@ -54,20 +55,6 @@ export class 
FaultInjectorClient { return this.#request("POST", "/action", action); } - // public async printStatus() { - // const action = { - // type: 'execute_rladmin_command', - // parameters: { - // rladmin_command: "status", - // bdb_id: "1" - // } - // } - // const { action_id } = await this.#request<{action_id: string}>("POST", "/action", action); - // const status = await this.waitForAction(action_id); - // //@ts-ignore - // console.log(status.output.output); - // } - /** * Gets the status of a specific action. * @param actionId The ID of the action to check diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 792e8792da1..2d75c451880 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -1,4 +1,5 @@ import { RedisClusterClientOptions } from '@redis/client/dist/lib/cluster'; +import { randomUUID } from 'node:crypto'; import { createConnection } from 'node:net'; import { once } from 'node:events'; import { createClient } from '@redis/client/index'; @@ -126,12 +127,16 @@ export interface ProxiedRedisServerDocker { export interface ProxiedRedisServerConfig { nOfProxies: number, defaultInterceptors: ('cluster'|'hitless'|'logger')[] + freshContainer?: boolean; } const RUNNING_PROXIED_SERVERS = new Map>(); export async function spawnProxiedRedisServer(config: ProxiedRedisServerConfig): Promise { - const key = JSON.stringify(config); + const key = JSON.stringify({ + ...config, + ...(config.freshContainer ? { randomKey: randomUUID() } : {}) + }); const runningServer = RUNNING_PROXIED_SERVERS.get(key); if (runningServer) { return runningServer; @@ -159,7 +164,8 @@ export async function spawnProxiedRedisServerDocker( "--network", "host", "-e", `LISTEN_PORT=${ports.join(',')}`, "-e", `API_PORT=${apiPort}`, - "-e", "TIEOUT=0", + "-e", "TIMEOUT=0", + "-e", "TARGET_HOST=0.0.0.0", "-e", `DEFAULT_INTERCEPTORS=${config.defaultInterceptors.join(',')}`, "-e", "ENABLE_LOGGING=true", "cae-resp-proxy-standalone" diff --git a/packages/test-utils/lib/fault-injector/db-config.ts b/packages/test-utils/lib/fault-injector/db-config.ts new file mode 100644 index 00000000000..321b6eef341 --- /dev/null +++ b/packages/test-utils/lib/fault-injector/db-config.ts @@ -0,0 +1,62 @@ +export const CreateDatabaseConfigType = { + CLUSTER: "cluster", +} as const; + +type ConfigType = + (typeof CreateDatabaseConfigType)[keyof typeof CreateDatabaseConfigType]; + +// 10000-19999 +const randomPort = () => Math.floor(Math.random() * (19999 - 10000) + 10000); + +const DB_CONFIGS = { + [CreateDatabaseConfigType.CLUSTER]: ( + name: string, + port: number = randomPort(), + size: number = 1073741824 // 1GB + ) => { + return { + name: name, + port: port, + memory_size: size, + replication: true, + eviction_policy: "noeviction", + sharding: true, + auto_upgrade: true, + shards_count: 3, + module_list: [ + { + module_args: "", + module_name: "ReJSON", + }, + { + module_args: "", + module_name: "search", + }, + { + module_args: "", + module_name: "timeseries", + }, + { + module_args: "", + module_name: "bf", + }, + ], + oss_cluster: true, + oss_cluster_api_preferred_ip_type: "external", + proxy_policy: "all-master-shards", + shards_placement: "sparse", + shard_key_regex: [ + { + regex: ".*\\{(?.*)\\}.*", + }, + { + regex: "(?.*)", + }, + ], + }; + }, +}; + +export const getCreateDatabaseConfig = (type: ConfigType, name: string) => { + return DB_CONFIGS[type](name); +}; diff --git a/packages/test-utils/lib/fault-injector/fault-injector-client.ts 
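The freshContainer option works entirely through the memoization key: mixing a random UUID into the key guarantees a cache miss, so an otherwise identical configuration still gets its own container instead of reusing a running one. A standalone sketch of that key derivation (the types and names are simplified stand-ins for the real config):

  import { randomUUID } from "node:crypto";

  interface ProxiedServerConfigSketch {
    nOfProxies: number;
    defaultInterceptors: string[];
    freshContainer?: boolean;
  }

  const cacheKey = (config: ProxiedServerConfigSketch) =>
    JSON.stringify({
      ...config,
      // The random component makes the key unique on purpose.
      ...(config.freshContainer ? { randomKey: randomUUID() } : {}),
    });

  const shared = { nOfProxies: 3, defaultInterceptors: ["cluster"] };
  const reused = cacheKey(shared) === cacheKey(shared);                 // true  -> container is reused
  const fresh = cacheKey({ ...shared, freshContainer: true }) ===
    cacheKey({ ...shared, freshContainer: true });                      // false -> a fresh container is spawned
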
b/packages/test-utils/lib/fault-injector/fault-injector-client.ts new file mode 100644 index 00000000000..50f658da858 --- /dev/null +++ b/packages/test-utils/lib/fault-injector/fault-injector-client.ts @@ -0,0 +1,243 @@ +import { setTimeout } from "node:timers/promises"; + +import { + ActionRequest, + ActionStatus, + CreateDatabaseConfig, + IFaultInjectorClient, +} from "./types"; + +export class FaultInjectorClient implements IFaultInjectorClient { + private baseUrl: string; + #fetch: typeof fetch; + + constructor(baseUrl: string, fetchImpl: typeof fetch = fetch) { + this.baseUrl = baseUrl.replace(/\/+$/, ""); // trim trailing slash + this.#fetch = fetchImpl; + } + + /** + * Lists all available actions. + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public listActions(): Promise { + return this.#request("GET", "/action"); + } + + /** + * Triggers a specific action. + * @param action The action request to trigger + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public async triggerAction( + action: ActionRequest, + options?: { + timeoutMs?: number; + maxWaitTimeMs?: number; + } + ): Promise { + const { action_id } = await this.#request("POST", "/action", action); + + return this.waitForAction(action_id, options); + } + + /** + * Gets the status of a specific action. + * @param actionId The ID of the action to check + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public getActionStatus(actionId: string): Promise { + return this.#request("GET", `/action/${actionId}`); + } + + /** + * Waits for an action to complete. + * @param actionId The ID of the action to wait for + * @param options Optional timeout and max wait time + * @throws {Error} When the action does not complete within the max wait time + */ + public async waitForAction( + actionId: string, + { + timeoutMs, + maxWaitTimeMs, + }: { + timeoutMs?: number; + maxWaitTimeMs?: number; + } = {} + ): Promise { + const timeout = timeoutMs || 1000; + const maxWaitTime = maxWaitTimeMs || 60000; + + const startTime = Date.now(); + + while (Date.now() - startTime < maxWaitTime) { + const action = await this.getActionStatus(actionId); + + if (action.status === "failed") { + throw new Error( + `Action id: ${actionId} failed! 
Error: ${action.error}` + ); + } + + if (["finished", "success"].includes(action.status)) { + return action; + } + + await setTimeout(timeout); + } + + throw new Error(`Timeout waiting for action ${actionId}`); + } + + async migrateAndBindAction({ + bdbId, + clusterIndex, + }: { + bdbId: string | number; + clusterIndex: string | number; + }) { + const bdbIdStr = bdbId.toString(); + const clusterIndexStr = clusterIndex.toString(); + + return this.triggerAction<{ + action_id: string; + }>({ + type: "sequence_of_actions", + parameters: { + bdbId: bdbIdStr, + actions: [ + { + type: "migrate", + params: { + cluster_index: clusterIndexStr, + bdb_id: bdbIdStr, + }, + }, + { + type: "bind", + params: { + cluster_index: clusterIndexStr, + bdb_id: bdbIdStr, + }, + }, + ], + }, + }); + } + + async #request( + method: string, + path: string, + body?: Object | string + ): Promise { + const url = `${this.baseUrl}${path}`; + const headers: Record = { + "Content-Type": "application/json", + }; + + let payload: string | undefined; + + if (body) { + if (typeof body === "string") { + headers["Content-Type"] = "text/plain"; + payload = body; + } else { + headers["Content-Type"] = "application/json"; + payload = JSON.stringify(body); + } + } + + const response = await this.#fetch(url, { method, headers, body: payload }); + + if (!response.ok) { + try { + const text = await response.text(); + throw new Error(`HTTP ${response.status} - ${text}`); + } catch { + throw new Error(`HTTP ${response.status}`); + } + } + + try { + return (await response.json()) as T; + } catch { + throw new Error( + `HTTP ${response.status} - Unable to parse response as JSON` + ); + } + } + + /** + * Deletes a database. + * @param clusterIndex The index of the cluster + * @param bdbId The database ID + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public async deleteDatabase( + bdbId: number | string, + clusterIndex: number = 0 + ) { + return this.triggerAction({ + type: "delete_database", + parameters: { + cluster_index: clusterIndex, + bdb_id: bdbId.toString(), + }, + }); + } + + /** + * Deletes all databases. + * @param clusterIndex The index of the cluster + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public async deleteAllDatabases(clusterIndex: number = 0) { + return this.triggerAction({ + type: "delete_database", + parameters: { + cluster_index: clusterIndex, + delete_all: true, + }, + }); + } + + /** + * Creates a new database. + * @param clusterIndex The index of the cluster + * @param databaseConfig The database configuration + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public async createDatabase( + databaseConfig: CreateDatabaseConfig, + clusterIndex: number = 0 + ) { + const action = await this.triggerAction({ + type: "create_database", + parameters: { + cluster_index: clusterIndex, + database_config: databaseConfig, + }, + }); + + const dbConfig = + typeof action.output === "object" + ? 
action.output + : JSON.parse(action.output); + + const rawEndpoints = dbConfig.raw_endpoints[0]; + + if (!rawEndpoints) { + throw new Error("No endpoints found in database config"); + } + + return { + host: rawEndpoints.dns_name, + port: rawEndpoints.port, + password: dbConfig.password, + username: dbConfig.username, + tls: dbConfig.tls, + bdbId: dbConfig.bdb_id, + }; + } +} diff --git a/packages/test-utils/lib/fault-injector/index.ts b/packages/test-utils/lib/fault-injector/index.ts new file mode 100644 index 00000000000..4eae755de8e --- /dev/null +++ b/packages/test-utils/lib/fault-injector/index.ts @@ -0,0 +1,4 @@ +export * from "./fault-injector-client"; +export * from "./proxied-fault-injector-cluster"; +export * from "./types"; +export * from "./db-config"; diff --git a/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts new file mode 100644 index 00000000000..cc2247673a0 --- /dev/null +++ b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts @@ -0,0 +1,235 @@ +import { setTimeout } from "node:timers/promises"; + +import { ActionRequest, ActionStatus, IFaultInjectorClient } from "./types"; +import ProxyController from "../proxy/proxy-controller"; + +const TOTAL_SLOTS = 16384; + +interface MigrateParameters { + /** 'all' = source loses ALL its slots, 'half' = source loses half its slots */ + slot_migration: "all" | "half"; + + /** 'existing' = target node visible in CLUSTER SLOTS, 'new' = hidden node */ + destination_type: "existing" | "new"; +} + +export class ProxiedFaultInjectorClientForCluster + implements IFaultInjectorClient +{ + constructor(private readonly proxyController: ProxyController) {} + + async triggerAction(action: ActionRequest): Promise { + switch (action.type) { + case "migrate": { + return this.triggerMigrate( + action.parameters as unknown as MigrateParameters + ); + } + default: + throw new Error(`Action ${action.type} not implemented`); + } + } + + async getProxyNodes() { + const nodes = await this.proxyController.getNodes(); + return nodes.ids.map(parseNodeId); + } + + async updateClusterSlots(nodes: ProxyNode[]) { + return this.proxyController.addInterceptor( + "cluster", + "*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n", + buildClusterSlotsResponse(nodes, "raw"), + "raw" + ); + } + + /** + * Simulates a Redis cluster slot migration by sending SMIGRATING and SMIGRATED + * push notifications to the client. + * + * The migration process: + * 1. Retrieves cluster nodes and their connections + * 2. Calculates the slot range to migrate based on `slotMigration` parameter: + * - 'all': migrates all slots from the first node (0 to totalSlots/N - 1) + * - 'half': migrates half the slots from the first node (0 to totalSlots/N/2 - 1) + * 3. Sends SMIGRATING notification to the source node's first connection + * 4. Waits 2 seconds to simulate maintenance delay + * 5. Determines destination node based on `destinationType` parameter: + * - 'new': uses a node with no active connections (hidden node) + * - 'existing': uses the last node in the cluster (visible in CLUSTER SLOTS) + * 6. 
Sends SMIGRATED notification to complete the migration + * + * @param params - Migration configuration parameters + * @param params.slot_migration - Determines how many slots to migrate: 'all' or 'half' + * @param params.destination_type - Determines target node type: 'existing' or 'new' + * @private + */ + private async triggerMigrate( + params: MigrateParameters + ): Promise { + const [nodes, connections] = await Promise.all([ + this.getProxyNodes(), + this.proxyController.getConnections(), + ]); + + /** + * This is an assumption that there is at possibility of at least one node with no connections + * However for this to work correctly the cluster client should NOT be using the `minimizeConnections` + */ + const nodeWithoutConnections = nodes.find((node) => { + const connIds = connections[node.id]; + return !connIds || connIds.length === 0; + }); + + const visibleNodesCount = nodeWithoutConnections + ? nodes.length - 1 + : nodes.length; + + const shouldMigrateHalfSlots = params.slot_migration === "half"; + const slots = shouldMigrateHalfSlots + ? `0-${Math.floor(TOTAL_SLOTS / visibleNodesCount / 2) - 1}` + : `0-${Math.floor(TOTAL_SLOTS / visibleNodesCount) - 1}`; + + const sMigratingNotification = buildSMigratingNotification(slots); + + const sourceNode = nodes[0]; + + const migratingResult = await this.proxyController.sendToClient( + connections[sourceNode.id][0], + sMigratingNotification + ); + + if (!migratingResult.success) { + throw new Error( + `[Proxy] Failed to send SMIGRATING notification: ${migratingResult.error}` + ); + } + + // 2. Simulate maintenance delay + await setTimeout(2_000); + + const isNewDestination = params.destination_type === "new"; + + if (isNewDestination && !nodeWithoutConnections) { + throw new Error(`[Proxy] No node with no connections`); + } + + const destinationNode = isNewDestination + ? nodeWithoutConnections! + : nodes.at(-1)!; // Get the last node as the destination + + if (!destinationNode) { + throw new Error(`[Proxy] No destination node`); + } + + const sMigratedNotification = buildSMigratedNotification([ + { + targetNode: destinationNode, + slotRanges: slots, + }, + ]); + + const migratedResult = await this.proxyController.sendToClient( + connections[sourceNode.id][0], + sMigratedNotification + ); + + if (!migratedResult.success) { + throw new Error( + `[Proxy] Failed to send SMIGRATED notification: ${migratedResult.error}` + ); + } + + return { + status: "success", + error: null, + output: "Migration completed!", + }; + } +} + +interface ProxyNode { + id: string; + host: string; + port: number; + proxyPort: number; +} + +const parseNodeId = (id: string): ProxyNode => { + const [rest, port] = id.split("@"); + const [host, proxyPort] = rest.split(":"); + + return { + id, + host, + port: Number(port), + proxyPort: Number(proxyPort), + }; +}; + +const buildSMigratedNotification = ( + movedSlotsByDestination: Array<{ + targetNode: { host: string; port: number }; + slotRanges: string; // e.g., "0-5460" or "0-100,200-300,500" + }>, + seqId: number = 1, + encoding: "base64" | "raw" = "base64" +): string => { + if (movedSlotsByDestination.length === 0) { + throw new Error("No slots to migrate"); + } + + const entries = movedSlotsByDestination.map(({ targetNode, slotRanges }) => { + const hostPort = `${targetNode.host}:${targetNode.port}`; + return `*2\r\n+${hostPort}\r\n+${slotRanges}\r\n`; + }); + + const response = `>3\r\n+SMIGRATED\r\n:${seqId}\r\n*${ + movedSlotsByDestination.length + }\r\n${entries.join("")}`; + + return encoding === "raw" + ? 
response + : Buffer.from(response).toString(encoding); +}; + +const buildClusterSlotsResponse = ( + nodes: ProxyNode[], + encoding: "base64" | "raw" = "base64" +): string => { + const TOTAL_SLOTS = 16384; + const slotSize = Math.floor(TOTAL_SLOTS / nodes.length); + + let current = -1; + const slots = nodes.map((node, i) => { + const from = current + 1; + const to = i === nodes.length - 1 ? 16383 : current + slotSize; + current = to; + + // Use proxyPort as the port clients connect to + const host = node.host; + const port = node.port; + const nodeId = `proxy-${port}`; + + return `*3\r\n:${from}\r\n:${to}\r\n*3\r\n$${host.length}\r\n${host}\r\n:${port}\r\n$${nodeId.length}\r\n${nodeId}\r\n`; + }); + + return encoding === "raw" + ? `*${nodes.length}\r\n${slots.join("")}` + : Buffer.from(`*${nodes.length}\r\n${slots.join("")}`).toString(encoding); +}; + +const buildSMigratingNotification = ( + slotRanges: string, + seqId: number = 1, + encoding: "base64" | "raw" = "base64" +) => { + const response = `>3\r\n+SMIGRATING\r\n:${seqId}\r\n+${slotRanges}\r\n`; + + if (encoding === "raw") { + return response; + } + + return Buffer.from(response).toString(encoding); +}; diff --git a/packages/test-utils/lib/fault-injector/types.ts b/packages/test-utils/lib/fault-injector/types.ts new file mode 100644 index 00000000000..f77662f1e1a --- /dev/null +++ b/packages/test-utils/lib/fault-injector/types.ts @@ -0,0 +1,74 @@ +export type ActionType = + | "dmc_restart" + | "failover" + | "reshard" + | "sequence_of_actions" + | "network_failure" + | "execute_rlutil_command" + | "execute_rladmin_command" + | "migrate" + | "bind" + | "update_cluster_config" + | "shard_failure" + | "node_failure" + | "node_remove" + | "proxy_failure" + | "cluster_failure" + | "delete_database" + | "create_database"; + +export interface ActionRequest { + type: ActionType; + parameters?: { + bdb_id?: string; + [key: string]: unknown; + }; +} + +export interface ActionStatus { + status: string; + error: unknown; + output: string; +} + +export interface DatabaseModule { + module_name: string; + // Add additional module properties as needed based on your Go struct + [key: string]: unknown; +} + +export interface ShardKeyRegexPattern { + regex: string; + // Add additional pattern properties as needed based on your Go struct + [key: string]: unknown; +} + +export interface CreateDatabaseConfig { + name: string; + port: number; + memory_size: number; + replication: boolean; + eviction_policy: string; + sharding: boolean; + auto_upgrade: boolean; + shards_count: number; + module_list?: DatabaseModule[]; + oss_cluster: boolean; + oss_cluster_api_preferred_ip_type?: string; + proxy_policy?: string; + shards_placement?: string; + shard_key_regex?: ShardKeyRegexPattern[]; +} + +export interface DatabaseConfig { + host: string; + port: number; + password: string; + username: string; + tls: boolean; + bdbId: number; +} + +export interface IFaultInjectorClient { + triggerAction(action: ActionRequest): Promise; +} diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 8475c62d975..e085945a37e 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -28,7 +28,15 @@ import * as os from 'node:os'; import * as path from 'node:path'; import { RedisProxy, getFreePortNumber } from './proxy/redis-proxy'; import ProxyController from './proxy/proxy-controller'; - +import { + CreateDatabaseConfig, + CreateDatabaseConfigType, + DatabaseConfig, + FaultInjectorClient, + getCreateDatabaseConfig, + 
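For orientation, the push frames produced by the two notification builders above look like this for a single destination (a worked example assuming destination 127.0.0.1:7001 and slot range 0-5460; both helpers return the base64 encoding of these raw strings unless "raw" is requested):

  // ">3\r\n+SMIGRATING\r\n:1\r\n+0-5460\r\n"
  // decodes to the RESP3 push ["SMIGRATING", 1, "0-5460"]
  const migrating = buildSMigratingNotification("0-5460", 1, "raw");

  // ">3\r\n+SMIGRATED\r\n:1\r\n*1\r\n*2\r\n+127.0.0.1:7001\r\n+0-5460\r\n"
  // decodes to the RESP3 push ["SMIGRATED", 1, [["127.0.0.1:7001", "0-5460"]]]
  const migrated = buildSMigratedNotification(
    [{ targetNode: { host: "127.0.0.1", port: 7001 }, slotRanges: "0-5460" }],
    1,
    "raw"
  );
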
IFaultInjectorClient, + ProxiedFaultInjectorClientForCluster, +} from "./fault-injector"; interface TestUtilsConfig { /** @@ -121,6 +129,24 @@ interface ClusterTestOptions< disableClusterSetup?: boolean; } +interface RETestOptions { + skipTest?: boolean; + testTimeout?: number; + dbConfig?: CreateDatabaseConfig; + dbName?: string; +} + +interface REClusterTestOptions< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping + // POLICIES extends CommandPolicies +> extends RETestOptions { + clusterConfiguration?: Partial>; +} + interface AllTestOptions< M extends RedisModules, F extends RedisFunctions, @@ -309,40 +335,65 @@ export default class TestUtils { TYPE_MAPPING extends TypeMapping = {} >( title: string, - fn: (proxiedClusterClient: RedisClusterType, proxyController: ProxyController) => unknown, - options: Omit, 'numberOfReplicas' | 'minimumDockerVersion' | 'serverArguments'> + fn: ( + proxiedClusterClient: RedisClusterType, + faultInjectorClient: IFaultInjectorClient + ) => unknown, + options: Omit< + ClusterTestOptions, + "numberOfReplicas" | "minimumDockerVersion" | "serverArguments" + > & { + startWithReducedNodes?: boolean; + freshContainer?: boolean; + } ) { let spawnPromise: ReturnType; before(function () { this.timeout(30000); - spawnPromise = spawnProxiedRedisServer({ nOfProxies: options.numberOfMasters ?? 3, defaultInterceptors: ['cluster', 'hitless'] }); + spawnPromise = spawnProxiedRedisServer({ + nOfProxies: options.numberOfMasters ?? 3, + defaultInterceptors: ["cluster", "hitless", "logger"], + freshContainer: options.freshContainer, + }); }); it(title, async function () { if (!spawnPromise) return this.skip(); - const { ports, apiPort } = await spawnPromise; + const { apiPort } = await spawnPromise; + + const proxyController = new ProxyController( + `http://localhost:${apiPort}` + ); + + const faultInjectorClient = new ProxiedFaultInjectorClientForCluster( + proxyController + ); + + const nodes = await faultInjectorClient.getProxyNodes(); + + if (options.startWithReducedNodes) { + nodes.pop(); + await faultInjectorClient.updateClusterSlots(nodes); + } const cluster = createCluster({ - rootNodes: ports.map(port => ({ + rootNodes: nodes.map((n) => ({ socket: { - port - } + port: n.port, + }, })), - minimizeConnections: options.clusterConfiguration?.minimizeConnections ?? 
true, - ...options.clusterConfiguration + ...options.clusterConfiguration, }); - const proxyController = new ProxyController(`http://localhost:${apiPort}`) - - if(options.disableClusterSetup) { - return fn(cluster, proxyController); + if (options.disableClusterSetup) { + return fn(cluster, faultInjectorClient); } await cluster.connect(); try { await TestUtils.#clusterFlushAll(cluster); - await fn(cluster, proxyController); + await fn(cluster, faultInjectorClient); } finally { await TestUtils.#clusterFlushAll(cluster); cluster.destroy(); @@ -642,6 +693,91 @@ export default class TestUtils { this.testWithCluster(`cluster.${title}`, fn, options.cluster); } + testWithRECluster< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >( + title: string, + fn: ( + cluster: RedisClusterType, + { + faultInjectorClient, + }: { + faultInjectorClient: IFaultInjectorClient; + } + ) => unknown, + options: REClusterTestOptions + ) { + + let faultInjectorClient: FaultInjectorClient; + let dbConfig: DatabaseConfig; + + before(async function () { + this.timeout(30000); + + const baseUrl = process.env.RE_FAULT_INJECTOR_URL; + + if (!baseUrl) { + throw new Error("RE_FAULT_INJECTOR_URL environment variable must be set"); + } + + faultInjectorClient = new FaultInjectorClient(baseUrl); + + await faultInjectorClient.deleteAllDatabases(0); + dbConfig = await faultInjectorClient.createDatabase( + options.dbConfig || + getCreateDatabaseConfig( + CreateDatabaseConfigType.CLUSTER, + options.dbName ?? `test-db-${Date.now()}` + ), + 0 + ); + }); + + after(async function () { + this.timeout(30000); + + await faultInjectorClient.deleteAllDatabases(0); + }); + + it(title, async function () { + if (options.skipTest) return this.skip(); + if (options.testTimeout) { + this.timeout(options.testTimeout); + } + + const { defaults, ...rest } = options.clusterConfiguration ?? {}; + + const cluster = createCluster({ + rootNodes: [ + { + socket: { + host: dbConfig.host, + port: dbConfig.port, + }, + }, + ], + defaults: { + password: dbConfig.password, + username: dbConfig.username, + ...defaults, + }, + ...rest, + }); + + await cluster.connect(); + + try { + await TestUtils.#clusterFlushAll(cluster); + await fn(cluster, { faultInjectorClient }); + } finally { + cluster.destroy(); + } + }); + } spawnRedisServer< M extends RedisModules = {},