From 8f7de91369030c14eed926b7f0ac320c3da3c847 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Wed, 10 Dec 2025 16:32:59 +0200 Subject: [PATCH 1/5] fix: access private queue through _self proxy and guard client close calls --- packages/client/lib/client/commands-queue.ts | 4 ++++ packages/client/lib/client/index.ts | 2 +- packages/client/lib/cluster/cluster-slots.ts | 8 ++++++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 9af4550691..15afb859a4 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -591,6 +591,10 @@ export default class RedisCommandsQueue { * Prepends commands to the write queue in reverse. */ prependCommandsToWrite(commands: CommandToWrite[]) { + if (!commands.length) { + return; + } + for (let i = commands.length - 1; i <= 0; i--) { this.#toWrite.unshift(commands[i]); } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 9ad044d0a1..faf345b414 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -1018,7 +1018,7 @@ export default class RedisClient< * @internal */ _getQueue(): RedisCommandsQueue { - return this.#queue; + return this._self.#queue; } /** diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index b803625c22..00351d6728 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -371,9 +371,13 @@ export default class RedisClusterSlots< this.nodeByAddress.delete(sourceAddress); // 4.3 Kill because no slots are pointing to it anymore - await sourceNode.client?.close() + if (sourceNode.client?.isOpen) { + await sourceNode.client?.close() + } if('pubSub' in sourceNode) { - await sourceNode.pubSub?.client.close(); + if (sourceNode.pubSub?.client.isOpen) { + await sourceNode.pubSub?.client.close(); + } } } From 7958e7b0c9b9dc10d78910bbc9e4e2fbfe7f35a7 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Wed, 10 Dec 2025 17:50:26 +0200 Subject: [PATCH 2/5] test(cluster): add fault injector infrastructure for hitless upgrade testing --- .../client/lib/cluster/maintenance.spec.ts | 91 +++++++++ .../test-scenario/fault-injector-client.ts | 15 +- packages/test-utils/lib/dockers.ts | 3 +- .../fault-injector/fault-injector-client.ts | 159 +++++++++++++++ .../test-utils/lib/fault-injector/index.ts | 2 + .../proxied-fault-injector-cluster.ts | 191 ++++++++++++++++++ .../test-utils/lib/fault-injector/types.ts | 49 +++++ packages/test-utils/lib/index.ts | 43 ++-- 8 files changed, 526 insertions(+), 27 deletions(-) create mode 100644 packages/client/lib/cluster/maintenance.spec.ts create mode 100644 packages/test-utils/lib/fault-injector/fault-injector-client.ts create mode 100644 packages/test-utils/lib/fault-injector/index.ts create mode 100644 packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts create mode 100644 packages/test-utils/lib/fault-injector/types.ts diff --git a/packages/client/lib/cluster/maintenance.spec.ts b/packages/client/lib/cluster/maintenance.spec.ts new file mode 100644 index 0000000000..29d538e43a --- /dev/null +++ b/packages/client/lib/cluster/maintenance.spec.ts @@ -0,0 +1,91 @@ +import assert from "node:assert"; +import diagnostics_channel from "node:diagnostics_channel"; + +import testUtils from "../test-utils"; +import { DiagnosticsEvent } from 
"../client/enterprise-maintenance-manager"; + +describe("Cluster Maintenance", () => { + const MASTERS_COUNT = 3; + + before(() => { + process.env.REDIS_EMIT_DIAGNOSTICS = "1"; + }); + + after(() => { + delete process.env.REDIS_EMIT_DIAGNOSTICS; + }); + + let diagnosticEvents: DiagnosticsEvent[] = []; + + const onMessage = (message: unknown) => { + const event = message as DiagnosticsEvent; + if (["SMIGRATING", "SMIGRATED"].includes(event.type)) { + diagnosticEvents.push(event); + } + }; + + beforeEach(() => { + diagnostics_channel.subscribe("redis.maintenance", onMessage); + diagnosticEvents = []; + }); + + afterEach(() => { + diagnostics_channel.unsubscribe("redis.maintenance", onMessage); + }); + + testUtils.testWithProxiedCluster( + "should handle failover", + async (cluster, { faultInjectorClient }) => { + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + MASTERS_COUNT, + `should have ${MASTERS_COUNT} masters at start` + ); + + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + cluster_index: 0, + }, + }); + + await faultInjectorClient.waitForAction(failoverActionId); + + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + assert.equal( + cluster.masters.length, + MASTERS_COUNT - 1, + `should have ${MASTERS_COUNT - 1} masters after failover` + ); + }, + { + numberOfMasters: MASTERS_COUNT, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } + ); +}); diff --git a/packages/client/lib/tests/test-scenario/fault-injector-client.ts b/packages/client/lib/tests/test-scenario/fault-injector-client.ts index c03fa1afa1..4fd35fa446 100644 --- a/packages/client/lib/tests/test-scenario/fault-injector-client.ts +++ b/packages/client/lib/tests/test-scenario/fault-injector-client.ts @@ -1,5 +1,6 @@ import { setTimeout } from "node:timers/promises"; +// TODO remove types and utilize IFaultInjectorClient export type ActionType = | "dmc_restart" | "failover" @@ -54,20 +55,6 @@ export class FaultInjectorClient { return this.#request("POST", "/action", action); } - // public async printStatus() { - // const action = { - // type: 'execute_rladmin_command', - // parameters: { - // rladmin_command: "status", - // bdb_id: "1" - // } - // } - // const { action_id } = await this.#request<{action_id: string}>("POST", "/action", action); - // const status = await this.waitForAction(action_id); - // //@ts-ignore - // console.log(status.output.output); - // } - /** * Gets the status of a specific action. 
* @param actionId The ID of the action to check
diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts
index 792e8792da..0de3601dd3 100644
--- a/packages/test-utils/lib/dockers.ts
+++ b/packages/test-utils/lib/dockers.ts
@@ -159,7 +159,8 @@
     "--network", "host",
     "-e", `LISTEN_PORT=${ports.join(',')}`,
     "-e", `API_PORT=${apiPort}`,
-    "-e", "TIEOUT=0",
+    "-e", "TIMEOUT=0",
+    "-e", "TARGET_HOST=0.0.0.0",
     "-e", `DEFAULT_INTERCEPTORS=${config.defaultInterceptors.join(',')}`,
     "-e", "ENABLE_LOGGING=true",
     "cae-resp-proxy-standalone"
diff --git a/packages/test-utils/lib/fault-injector/fault-injector-client.ts b/packages/test-utils/lib/fault-injector/fault-injector-client.ts
new file mode 100644
index 0000000000..38a0d83274
--- /dev/null
+++ b/packages/test-utils/lib/fault-injector/fault-injector-client.ts
@@ -0,0 +1,159 @@
+import { setTimeout } from "node:timers/promises";
+
+import { ActionRequest, ActionStatus, IFaultInjectorClient } from "./types";
+
+export class FaultInjectorClient implements IFaultInjectorClient {
+  private baseUrl: string;
+  #fetch: typeof fetch;
+
+  constructor(baseUrl: string, fetchImpl: typeof fetch = fetch) {
+    this.baseUrl = baseUrl.replace(/\/+$/, ""); // trim trailing slash
+    this.#fetch = fetchImpl;
+  }
+
+  /**
+   * Lists all available actions.
+   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
+   */
+  public listActions(): Promise<unknown> {
+    return this.#request("GET", "/action");
+  }
+
+  /**
+   * Triggers a specific action.
+   * @param action The action request to trigger
+   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
+   */
+  public triggerAction(
+    action: ActionRequest
+  ): Promise {
+    return this.#request("POST", "/action", action);
+  }
+
+  /**
+   * Gets the status of a specific action.
+   * @param actionId The ID of the action to check
+   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
+   */
+  public getActionStatus(actionId: string): Promise<ActionStatus> {
+    return this.#request("GET", `/action/${actionId}`);
+  }
+
+  /**
+   * Waits for an action to complete.
+   * @param actionId The ID of the action to wait for
+   * @param options Optional timeout and max wait time
+   * @throws {Error} When the action does not complete within the max wait time
+   */
+  public async waitForAction(
+    actionId: string,
+    {
+      timeoutMs,
+      maxWaitTimeMs,
+    }: {
+      timeoutMs?: number;
+      maxWaitTimeMs?: number;
+    } = {}
+  ): Promise<ActionStatus> {
+    const timeout = timeoutMs || 1000;
+    const maxWaitTime = maxWaitTimeMs || 60000;
+
+    const startTime = Date.now();
+
+    while (Date.now() - startTime < maxWaitTime) {
+      const action = await this.getActionStatus(actionId);
+
+      if (action.status === "failed") {
+        throw new Error(
+          `Action id: ${actionId} failed! 
Error: ${action.error}`
+        );
+      }
+
+      if (["finished", "success"].includes(action.status)) {
+        return action;
+      }
+
+      await setTimeout(timeout);
+    }
+
+    throw new Error(`Timeout waiting for action ${actionId}`);
+  }
+
+  async migrateAndBindAction({
+    bdbId,
+    clusterIndex,
+  }: {
+    bdbId: string | number;
+    clusterIndex: string | number;
+  }) {
+    const bdbIdStr = bdbId.toString();
+    const clusterIndexStr = clusterIndex.toString();
+
+    return this.triggerAction<{
+      action_id: string;
+    }>({
+      type: "sequence_of_actions",
+      parameters: {
+        bdbId: bdbIdStr,
+        actions: [
+          {
+            type: "migrate",
+            params: {
+              cluster_index: clusterIndexStr,
+              bdb_id: bdbIdStr,
+            },
+          },
+          {
+            type: "bind",
+            params: {
+              cluster_index: clusterIndexStr,
+              bdb_id: bdbIdStr,
+            },
+          },
+        ],
+      },
+    });
+  }
+
+  async #request<T = any>(
+    method: string,
+    path: string,
+    body?: Object | string
+  ): Promise<T> {
+    const url = `${this.baseUrl}${path}`;
+    const headers: Record<string, string> = {
+      "Content-Type": "application/json",
+    };
+
+    let payload: string | undefined;
+
+    if (body) {
+      if (typeof body === "string") {
+        headers["Content-Type"] = "text/plain";
+        payload = body;
+      } else {
+        headers["Content-Type"] = "application/json";
+        payload = JSON.stringify(body);
+      }
+    }
+
+    const response = await this.#fetch(url, { method, headers, body: payload });
+
+    if (!response.ok) {
+      const text = await response.text().catch(() => ""); // ignore body read errors
+      throw new Error(
+        text
+          ? `HTTP ${response.status} - ${text}`
+          : `HTTP ${response.status}`
+      );
+    }
+
+    try {
+      return (await response.json()) as T;
+    } catch {
+      throw new Error(
+        `HTTP ${response.status} - Unable to parse response as JSON`
+      );
+    }
+  }
+}
diff --git a/packages/test-utils/lib/fault-injector/index.ts b/packages/test-utils/lib/fault-injector/index.ts
new file mode 100644
index 0000000000..a22c8b0f1e
--- /dev/null
+++ b/packages/test-utils/lib/fault-injector/index.ts
@@ -0,0 +1,2 @@
+export * from "./fault-injector-client";
+export * from "./proxied-fault-injector-cluster";
diff --git a/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts
new file mode 100644
index 0000000000..1fe686a121
--- /dev/null
+++ b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts
@@ -0,0 +1,191 @@
+import { randomUUID } from "node:crypto";
+import { setTimeout } from "node:timers/promises";
+
+import { ActionRequest, ActionStatus, IFaultInjectorClient } from "./types";
+import ProxyController from "../proxy/proxy-controller";
+
+const TOTAL_SLOTS = 16384;
+
+export class ProxiedFaultInjectorClientForCluster
+  implements IFaultInjectorClient
+{
+  private readonly results = new Map();
+
+  constructor(private readonly proxyController: ProxyController) {}
+
+  async triggerAction(
+    action: ActionRequest
+  ): Promise {
+    const actionId = randomUUID();
+
+    switch (action.type) {
+      case "failover": {
+        const result = await this.triggerFailover();
+        this.results.set(actionId, result);
+        break;
+      }
+      default:
+        throw new Error(`Action ${action.type} not implemented`);
+    }
+
+    return { action_id: actionId } as T;
+  }
+
+  async getActionStatus(actionId: string): Promise {
+    throw new Error("Not implemented");
+  }
+
+  async waitForAction(
+    actionId: string,
+    _options?: { timeoutMs?: number; maxWaitTimeMs?: number }
+  ): Promise {
+    const result = this.results.get(actionId);
+
+    if (!result) {
+      throw new Error(`Action ${actionId} not found`);
+    }
+
+    return result;
+  }
+
+  async migrateAndBindAction({
+    
bdbId, + clusterIndex, + }: { + bdbId: string | number; + clusterIndex: string | number; + }): Promise<{ action_id: string }> { + throw new Error("Not implemented"); + } + + /** + * Simulates a Redis Enterprise hitless failover by sending SMIGRATING and SMIGRATED + * push notifications to the client. The migrating slot range is calculated as 1/N of total + * slots (where N = number of master nodes) to simulate removing one node from the cluster. + * + * @returns Promise resolving to ActionStatus on success + * @throws Error if notification delivery fails + */ + private async triggerFailover(): Promise { + const nodes = await this.proxyController.getNodes(); + const nodeId = nodes.ids[0]; + const connections = await this.proxyController.getConnections(); + const connectionIds = connections[nodeId]; + + // 1. Calculate migrating slot range and send SMIGRATING notification + const slots = `0-${Math.floor(TOTAL_SLOTS / nodes.ids.length)}`; + const seqId = Date.now().toString(); + const sMigratingNotification = Buffer.from( + `>3\r\n+SMIGRATING\r\n:${seqId}\r\n+${slots}\r\n` + ).toString("base64"); + + const migratingResult = await this.proxyController.sendToClient( + connectionIds[0], + sMigratingNotification, + "base64" + ); + + if (!migratingResult.success) { + throw new Error( + `[Proxy] Failed to send SMIGRATING notification: ${migratingResult.error}` + ); + } + + // 2. Simulate maintenance delay + await setTimeout(2_000); + + // 3. Calculate remaining nodes, build and send SMIGRATED notification + const remainingNodes = nodes.ids.slice(1).map(parseNodeId); + const sMigratedNotification = + buildSMigratedNotification(remainingNodes).toString("base64"); + + const migratedResult = await this.proxyController.sendToClient( + connectionIds[0], + sMigratedNotification, + "base64" + ); + + if (!migratedResult.success) { + throw new Error( + `[Proxy] Failed to send SMIGRATED notification: ${migratedResult.error}` + ); + } + + return { + status: "success", + error: null, + output: "Failover completed!", + }; + } +} + +// TODO Check if we need this +// function buildClusterSlotsResponse(nodes: ProxyNode[]): Buffer { +// if (nodes.length === 0) { +// return Buffer.from("*0\r\n"); +// } + +// const totalSlots = 16384; +// const slotLength = Math.floor(totalSlots / nodes.length); + +// let current = -1; +// const mapping = nodes.map((node, i) => { +// const from = current + 1; +// const to = i === nodes.length - 1 ? 16383 : current + slotLength; +// current = to; +// const id = `proxy-id-${node.port}`; +// return `*3\r\n:${from}\r\n:${to}\r\n*3\r\n$${node.host.length}\r\n${node.host}\r\n:${node.port}\r\n$${id.length}\r\n${id}\r\n`; +// }); + +// const response = `*${nodes.length}\r\n${mapping.join("")}`; +// return Buffer.from(response); +// } + +interface ProxyNode { + id: string; + host: string; + port: number; + proxyPort: number; +} + +const parseNodeId = (id: string): ProxyNode => { + const [rest, port] = id.split("@"); + const [host, proxyPort] = rest.split(":"); + + return { + id, + host, + port: Number(port), + proxyPort: Number(proxyPort), + }; +}; + +function buildSMigratedNotification( + nodes: ProxyNode[], + seqId: number = Date.now() +): Buffer { + if (nodes.length === 0) { + return Buffer.from(`>3\r\n+SMIGRATED\r\n:${seqId}\r\n*0\r\n`); + } + + const slotLength = Math.floor(TOTAL_SLOTS / nodes.length); + + let current = -1; + const nodeEntries = nodes.map((node, i) => { + const from = current + 1; + const to = i === nodes.length - 1 ? 
16383 : current + slotLength; + current = to; + + const hostPort = `${node.host}:${node.port}`; + const slotRange = `${from}-${to}`; + + // *2\r\n+host:port\r\n+slot-range\r\n + return `*2\r\n+${hostPort}\r\n+${slotRange}\r\n`; + }); + + // >3\r\n+SMIGRATED\r\n:{seqId}\r\n*{count}\r\n{entries} + const response = `>3\r\n+SMIGRATED\r\n:${seqId}\r\n*${ + nodes.length + }\r\n${nodeEntries.join("")}`; + return Buffer.from(response); +} diff --git a/packages/test-utils/lib/fault-injector/types.ts b/packages/test-utils/lib/fault-injector/types.ts new file mode 100644 index 0000000000..f73b474e2d --- /dev/null +++ b/packages/test-utils/lib/fault-injector/types.ts @@ -0,0 +1,49 @@ +export type ActionType = + | "dmc_restart" + | "failover" + | "reshard" + | "sequence_of_actions" + | "network_failure" + | "execute_rlutil_command" + | "execute_rladmin_command" + | "migrate" + | "bind" + | "update_cluster_config"; + +export interface ActionRequest { + type: ActionType; + parameters?: { + bdb_id?: string; + [key: string]: unknown; + }; +} + +export interface ActionStatus { + status: string; + error: unknown; + output: string; +} + +export interface IFaultInjectorClient { + triggerAction( + action: ActionRequest + ): Promise; + + getActionStatus(actionId: string): Promise; + + waitForAction( + actionId: string, + options?: { + timeoutMs?: number; + maxWaitTimeMs?: number; + } + ): Promise; + + migrateAndBindAction({ + bdbId, + clusterIndex, + }: { + bdbId: string | number; + clusterIndex: string | number; + }): Promise<{ action_id: string }>; +} diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 8475c62d97..12874a5450 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -28,6 +28,8 @@ import * as os from 'node:os'; import * as path from 'node:path'; import { RedisProxy, getFreePortNumber } from './proxy/redis-proxy'; import ProxyController from './proxy/proxy-controller'; +import { ProxiedFaultInjectorClientForCluster } from './fault-injector'; +import { IFaultInjectorClient } from './fault-injector/types'; interface TestUtilsConfig { @@ -309,13 +311,28 @@ export default class TestUtils { TYPE_MAPPING extends TypeMapping = {} >( title: string, - fn: (proxiedClusterClient: RedisClusterType, proxyController: ProxyController) => unknown, - options: Omit, 'numberOfReplicas' | 'minimumDockerVersion' | 'serverArguments'> + fn: ( + proxiedClusterClient: RedisClusterType, + { + proxyController, + faultInjectorClient, + }: { + proxyController: ProxyController; + faultInjectorClient: IFaultInjectorClient; + } + ) => unknown, + options: Omit< + ClusterTestOptions, + "numberOfReplicas" | "minimumDockerVersion" | "serverArguments" + > ) { let spawnPromise: ReturnType; before(function () { this.timeout(30000); - spawnPromise = spawnProxiedRedisServer({ nOfProxies: options.numberOfMasters ?? 3, defaultInterceptors: ['cluster', 'hitless'] }); + spawnPromise = spawnProxiedRedisServer({ + nOfProxies: options.numberOfMasters ?? 3, + defaultInterceptors: ["cluster", "hitless", "logger"], + }); }); it(title, async function () { @@ -323,26 +340,28 @@ export default class TestUtils { const { ports, apiPort } = await spawnPromise; const cluster = createCluster({ - rootNodes: ports.map(port => ({ + rootNodes: ports.map((port) => ({ socket: { - port - } + port, + }, })), - minimizeConnections: options.clusterConfiguration?.minimizeConnections ?? 
true, - ...options.clusterConfiguration + ...options.clusterConfiguration, }); - const proxyController = new ProxyController(`http://localhost:${apiPort}`) + const proxyController = new ProxyController( + `http://localhost:${apiPort}` + ); + const faultInjectorClient = new ProxiedFaultInjectorClientForCluster(proxyController); - if(options.disableClusterSetup) { - return fn(cluster, proxyController); + if (options.disableClusterSetup) { + return fn(cluster, { proxyController, faultInjectorClient }); } await cluster.connect(); try { await TestUtils.#clusterFlushAll(cluster); - await fn(cluster, proxyController); + await fn(cluster, { proxyController, faultInjectorClient }); } finally { await TestUtils.#clusterFlushAll(cluster); cluster.destroy(); From 4f24f95e9f469723d2016d9b93b1405ae3126f3f Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 11 Dec 2025 16:34:32 +0200 Subject: [PATCH 3/5] feat(test-utils): add RE database management and test utilities --- .../client/lib/cluster/maintenance.spec.ts | 15 +-- .../lib/fault-injector/db-config.ts | 62 ++++++++++ .../fault-injector/fault-injector-client.ts | 94 +++++++++++++- .../test-utils/lib/fault-injector/index.ts | 2 + .../proxied-fault-injector-cluster.ts | 42 +------ .../test-utils/lib/fault-injector/types.ts | 65 +++++++--- packages/test-utils/lib/index.ts | 115 +++++++++++++++++- 7 files changed, 318 insertions(+), 77 deletions(-) create mode 100644 packages/test-utils/lib/fault-injector/db-config.ts diff --git a/packages/client/lib/cluster/maintenance.spec.ts b/packages/client/lib/cluster/maintenance.spec.ts index 29d538e43a..090841b2c6 100644 --- a/packages/client/lib/cluster/maintenance.spec.ts +++ b/packages/client/lib/cluster/maintenance.spec.ts @@ -47,15 +47,12 @@ describe("Cluster Maintenance", () => { `should have ${MASTERS_COUNT} masters at start` ); - const { action_id: failoverActionId } = - await faultInjectorClient.triggerAction({ - type: "failover", - parameters: { - cluster_index: 0, - }, - }); - - await faultInjectorClient.waitForAction(failoverActionId); + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + cluster_index: 0, + }, + }); const sMigratingEventCount = diagnosticEvents.filter( (event) => event.type === "SMIGRATING" diff --git a/packages/test-utils/lib/fault-injector/db-config.ts b/packages/test-utils/lib/fault-injector/db-config.ts new file mode 100644 index 0000000000..321b6eef34 --- /dev/null +++ b/packages/test-utils/lib/fault-injector/db-config.ts @@ -0,0 +1,62 @@ +export const CreateDatabaseConfigType = { + CLUSTER: "cluster", +} as const; + +type ConfigType = + (typeof CreateDatabaseConfigType)[keyof typeof CreateDatabaseConfigType]; + +// 10000-19999 +const randomPort = () => Math.floor(Math.random() * (19999 - 10000) + 10000); + +const DB_CONFIGS = { + [CreateDatabaseConfigType.CLUSTER]: ( + name: string, + port: number = randomPort(), + size: number = 1073741824 // 1GB + ) => { + return { + name: name, + port: port, + memory_size: size, + replication: true, + eviction_policy: "noeviction", + sharding: true, + auto_upgrade: true, + shards_count: 3, + module_list: [ + { + module_args: "", + module_name: "ReJSON", + }, + { + module_args: "", + module_name: "search", + }, + { + module_args: "", + module_name: "timeseries", + }, + { + module_args: "", + module_name: "bf", + }, + ], + oss_cluster: true, + oss_cluster_api_preferred_ip_type: "external", + proxy_policy: "all-master-shards", + shards_placement: "sparse", + shard_key_regex: [ + { + regex: ".*\\{(?.*)\\}.*", + }, 
+ { + regex: "(?.*)", + }, + ], + }; + }, +}; + +export const getCreateDatabaseConfig = (type: ConfigType, name: string) => { + return DB_CONFIGS[type](name); +}; diff --git a/packages/test-utils/lib/fault-injector/fault-injector-client.ts b/packages/test-utils/lib/fault-injector/fault-injector-client.ts index 38a0d83274..50f658da85 100644 --- a/packages/test-utils/lib/fault-injector/fault-injector-client.ts +++ b/packages/test-utils/lib/fault-injector/fault-injector-client.ts @@ -1,6 +1,11 @@ import { setTimeout } from "node:timers/promises"; -import { ActionRequest, ActionStatus, IFaultInjectorClient } from "./types"; +import { + ActionRequest, + ActionStatus, + CreateDatabaseConfig, + IFaultInjectorClient, +} from "./types"; export class FaultInjectorClient implements IFaultInjectorClient { private baseUrl: string; @@ -24,10 +29,16 @@ export class FaultInjectorClient implements IFaultInjectorClient { * @param action The action request to trigger * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON */ - public triggerAction( - action: ActionRequest - ): Promise { - return this.#request("POST", "/action", action); + public async triggerAction( + action: ActionRequest, + options?: { + timeoutMs?: number; + maxWaitTimeMs?: number; + } + ): Promise { + const { action_id } = await this.#request("POST", "/action", action); + + return this.waitForAction(action_id, options); } /** @@ -156,4 +167,77 @@ export class FaultInjectorClient implements IFaultInjectorClient { ); } } + + /** + * Deletes a database. + * @param clusterIndex The index of the cluster + * @param bdbId The database ID + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public async deleteDatabase( + bdbId: number | string, + clusterIndex: number = 0 + ) { + return this.triggerAction({ + type: "delete_database", + parameters: { + cluster_index: clusterIndex, + bdb_id: bdbId.toString(), + }, + }); + } + + /** + * Deletes all databases. + * @param clusterIndex The index of the cluster + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public async deleteAllDatabases(clusterIndex: number = 0) { + return this.triggerAction({ + type: "delete_database", + parameters: { + cluster_index: clusterIndex, + delete_all: true, + }, + }); + } + + /** + * Creates a new database. + * @param clusterIndex The index of the cluster + * @param databaseConfig The database configuration + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public async createDatabase( + databaseConfig: CreateDatabaseConfig, + clusterIndex: number = 0 + ) { + const action = await this.triggerAction({ + type: "create_database", + parameters: { + cluster_index: clusterIndex, + database_config: databaseConfig, + }, + }); + + const dbConfig = + typeof action.output === "object" + ? 
action.output + : JSON.parse(action.output); + + const rawEndpoints = dbConfig.raw_endpoints[0]; + + if (!rawEndpoints) { + throw new Error("No endpoints found in database config"); + } + + return { + host: rawEndpoints.dns_name, + port: rawEndpoints.port, + password: dbConfig.password, + username: dbConfig.username, + tls: dbConfig.tls, + bdbId: dbConfig.bdb_id, + }; + } } diff --git a/packages/test-utils/lib/fault-injector/index.ts b/packages/test-utils/lib/fault-injector/index.ts index a22c8b0f1e..4eae755de8 100644 --- a/packages/test-utils/lib/fault-injector/index.ts +++ b/packages/test-utils/lib/fault-injector/index.ts @@ -1,2 +1,4 @@ export * from "./fault-injector-client"; export * from "./proxied-fault-injector-cluster"; +export * from "./types"; +export * from "./db-config"; diff --git a/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts index 1fe686a121..875e0482db 100644 --- a/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts +++ b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts @@ -1,4 +1,3 @@ -import { randomUUID } from "node:crypto"; import { setTimeout } from "node:timers/promises"; import { ActionRequest, ActionStatus, IFaultInjectorClient } from "./types"; @@ -9,53 +8,16 @@ const TOTAL_SLOTS = 16384; export class ProxiedFaultInjectorClientForCluster implements IFaultInjectorClient { - private readonly results = new Map(); - constructor(private readonly proxyController: ProxyController) {} - async triggerAction( - action: ActionRequest - ): Promise { - const actionId = randomUUID(); - + async triggerAction(action: ActionRequest): Promise { switch (action.type) { case "failover": { - const result = await this.triggerFailover(); - this.results.set(actionId, result); - break; + return this.triggerFailover(); } default: throw new Error(`Action ${action.type} not implemented`); } - - return { action_id: actionId } as T; - } - - async getActionStatus(actionId: string): Promise { - throw new Error("Not implemented"); - } - - async waitForAction( - actionId: string, - _options?: { timeoutMs?: number; maxWaitTimeMs?: number } - ): Promise { - const result = this.results.get(actionId); - - if (!result) { - throw new Error(`Action ${actionId} not found`); - } - - return result; - } - - async migrateAndBindAction({ - bdbId, - clusterIndex, - }: { - bdbId: string | number; - clusterIndex: string | number; - }): Promise<{ action_id: string }> { - throw new Error("Not implemented"); } /** diff --git a/packages/test-utils/lib/fault-injector/types.ts b/packages/test-utils/lib/fault-injector/types.ts index f73b474e2d..f77662f1e1 100644 --- a/packages/test-utils/lib/fault-injector/types.ts +++ b/packages/test-utils/lib/fault-injector/types.ts @@ -8,7 +8,14 @@ export type ActionType = | "execute_rladmin_command" | "migrate" | "bind" - | "update_cluster_config"; + | "update_cluster_config" + | "shard_failure" + | "node_failure" + | "node_remove" + | "proxy_failure" + | "cluster_failure" + | "delete_database" + | "create_database"; export interface ActionRequest { type: ActionType; @@ -24,26 +31,44 @@ export interface ActionStatus { output: string; } -export interface IFaultInjectorClient { - triggerAction( - action: ActionRequest - ): Promise; +export interface DatabaseModule { + module_name: string; + // Add additional module properties as needed based on your Go struct + [key: string]: unknown; +} - getActionStatus(actionId: string): Promise; +export 
interface ShardKeyRegexPattern { + regex: string; + // Add additional pattern properties as needed based on your Go struct + [key: string]: unknown; +} - waitForAction( - actionId: string, - options?: { - timeoutMs?: number; - maxWaitTimeMs?: number; - } - ): Promise; +export interface CreateDatabaseConfig { + name: string; + port: number; + memory_size: number; + replication: boolean; + eviction_policy: string; + sharding: boolean; + auto_upgrade: boolean; + shards_count: number; + module_list?: DatabaseModule[]; + oss_cluster: boolean; + oss_cluster_api_preferred_ip_type?: string; + proxy_policy?: string; + shards_placement?: string; + shard_key_regex?: ShardKeyRegexPattern[]; +} - migrateAndBindAction({ - bdbId, - clusterIndex, - }: { - bdbId: string | number; - clusterIndex: string | number; - }): Promise<{ action_id: string }>; +export interface DatabaseConfig { + host: string; + port: number; + password: string; + username: string; + tls: boolean; + bdbId: number; +} + +export interface IFaultInjectorClient { + triggerAction(action: ActionRequest): Promise; } diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 12874a5450..40ff3d2259 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -28,9 +28,15 @@ import * as os from 'node:os'; import * as path from 'node:path'; import { RedisProxy, getFreePortNumber } from './proxy/redis-proxy'; import ProxyController from './proxy/proxy-controller'; -import { ProxiedFaultInjectorClientForCluster } from './fault-injector'; -import { IFaultInjectorClient } from './fault-injector/types'; - +import { + CreateDatabaseConfig, + CreateDatabaseConfigType, + DatabaseConfig, + FaultInjectorClient, + getCreateDatabaseConfig, + IFaultInjectorClient, + ProxiedFaultInjectorClientForCluster, +} from "./fault-injector"; interface TestUtilsConfig { /** @@ -123,6 +129,24 @@ interface ClusterTestOptions< disableClusterSetup?: boolean; } +interface RETestOptions { + skipTest?: boolean; + testTimeout?: number; + dbConfig?: CreateDatabaseConfig; + dbName?: string; +} + +interface REClusterTestOptions< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping + // POLICIES extends CommandPolicies +> extends RETestOptions { + clusterConfiguration?: Partial>; +} + interface AllTestOptions< M extends RedisModules, F extends RedisFunctions, @@ -661,6 +685,91 @@ export default class TestUtils { this.testWithCluster(`cluster.${title}`, fn, options.cluster); } + testWithRECluster< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >( + title: string, + fn: ( + cluster: RedisClusterType, + { + faultInjectorClient, + }: { + faultInjectorClient: IFaultInjectorClient; + } + ) => unknown, + options: REClusterTestOptions + ) { + + let faultInjectorClient: FaultInjectorClient; + let dbConfig: DatabaseConfig; + + before(async function () { + this.timeout(30000); + + const baseUrl = process.env.RE_FAULT_INJECTOR_URL; + + if (!baseUrl) { + throw new Error("RE_FAULT_INJECTOR_URL environment variable must be set"); + } + + faultInjectorClient = new FaultInjectorClient(baseUrl); + + await faultInjectorClient.deleteAllDatabases(0); + dbConfig = await faultInjectorClient.createDatabase( + options.dbConfig || + getCreateDatabaseConfig( + CreateDatabaseConfigType.CLUSTER, + options.dbName ?? 
`test-db-${Date.now()}` + ), + 0 + ); + }); + + after(async function () { + this.timeout(30000); + + await faultInjectorClient.deleteAllDatabases(0); + }); + + it(title, async function () { + if (options.skipTest) return this.skip(); + if (options.testTimeout) { + this.timeout(options.testTimeout); + } + + const { defaults, ...rest } = options.clusterConfiguration ?? {}; + + const cluster = createCluster({ + rootNodes: [ + { + socket: { + host: dbConfig.host, + port: dbConfig.port, + }, + }, + ], + defaults: { + password: dbConfig.password, + username: dbConfig.username, + ...defaults, + }, + ...rest, + }); + + await cluster.connect(); + + try { + await TestUtils.#clusterFlushAll(cluster); + await fn(cluster, { faultInjectorClient }); + } finally { + cluster.destroy(); + } + }); + } spawnRedisServer< M extends RedisModules = {}, From 72f044f6436d4010cfccac35c079956cfc999980 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Wed, 17 Dec 2025 17:42:47 +0200 Subject: [PATCH 4/5] fix: fix command queue extraction and prepend logic --- packages/client/lib/client/commands-queue.ts | 5 +++-- packages/client/lib/cluster/cluster-slots.ts | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 15afb859a4..d58e32329e 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -577,11 +577,12 @@ export default class RedisCommandsQueue { /** * Gets all commands from the write queue without removing them. */ - getAllCommands(): CommandToWrite[] { + extractAllCommands(): CommandToWrite[] { const result: CommandToWrite[] = []; let current = this.#toWrite.head; while(current) { result.push(current.value); + this.#toWrite.remove(current); current = current.next; } return result; @@ -595,7 +596,7 @@ export default class RedisCommandsQueue { return; } - for (let i = commands.length - 1; i <= 0; i--) { + for (let i = commands.length - 1; i >= 0; i--) { this.#toWrite.unshift(commands[i]); } } diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 00351d6728..dfda846f7f 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -356,7 +356,7 @@ export default class RedisClusterSlots< } else { // If source shard doesnt have any slots left, this means we can safely move all commands to the new shard. // Same goes for sharded pub sub listeners - const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands(); + const normalCommandsToMove = sourceNode.client!._getQueue().extractAllCommands(); // 5. 
Prepend extracted commands, chans destMasterNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); if('pubSub' in sourceNode) { From 60e8c6a4489fc7fe86d553163e7413ab3b2bab96 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 18 Dec 2025 16:35:38 +0200 Subject: [PATCH 5/5] test: add slot migration tests and refactor proxied fault injector --- .../client/lib/cluster/maintenance.spec.ts | 1088 ++++++++++++++++- packages/test-utils/lib/dockers.ts | 7 +- .../proxied-fault-injector-cluster.ts | 224 ++-- packages/test-utils/lib/index.ts | 44 +- 4 files changed, 1229 insertions(+), 134 deletions(-) diff --git a/packages/client/lib/cluster/maintenance.spec.ts b/packages/client/lib/cluster/maintenance.spec.ts index 090841b2c6..f19279cff6 100644 --- a/packages/client/lib/cluster/maintenance.spec.ts +++ b/packages/client/lib/cluster/maintenance.spec.ts @@ -1,11 +1,29 @@ import assert from "node:assert"; +import { setTimeout } from "node:timers/promises"; import diagnostics_channel from "node:diagnostics_channel"; import testUtils from "../test-utils"; import { DiagnosticsEvent } from "../client/enterprise-maintenance-manager"; describe("Cluster Maintenance", () => { - const MASTERS_COUNT = 3; + const KEYS = [ + "channel:11kv:1000", + "channel:osy:2000", + "channel:jn6:3000", + "channel:l00:4000", + "channel:4ez:5000", + "channel:4ek:6000", + "channel:9vn:7000", + "channel:dw1:8000", + "channel:9zi:9000", + "channel:4vl:10000", + "channel:utl:11000", + "channel:lyo:12000", + "channel:jzn:13000", + "channel:14uc:14000", + "channel:mz:15000", + "channel:d0v:16000", + ]; before(() => { process.env.REDIS_EMIT_DIAGNOSTICS = "1"; @@ -33,48 +51,56 @@ describe("Cluster Maintenance", () => { diagnostics_channel.unsubscribe("redis.maintenance", onMessage); }); - testUtils.testWithProxiedCluster( - "should handle failover", - async (cluster, { faultInjectorClient }) => { - assert.equal( - diagnosticEvents.length, - 0, - "should not have received any notifications yet" - ); - assert.equal( - cluster.masters.length, - MASTERS_COUNT, - `should have ${MASTERS_COUNT} masters at start` - ); - - await faultInjectorClient.triggerAction({ - type: "failover", - parameters: { - cluster_index: 0, + describe("Notifications", () => { + testUtils.testWithProxiedCluster( + "should NOT receive notifications when maintNotifications is disabled", + async (_cluster, faultInjectorClient) => { + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + + // Trigger migration + await faultInjectorClient.triggerAction({ + type: "migrate", + parameters: { + cluster_index: 0, + }, + }); + + // Verify NO maintenance notifications received + assert.strictEqual( + diagnosticEvents.length, + 0, + "should NOT receive any SMIGRATING/SMIGRATED notifications when disabled" + ); + }, + { + numberOfMasters: 3, + clusterConfiguration: { + defaults: { + maintNotifications: "disabled", + }, + RESP: 3, }, - }); - - const sMigratingEventCount = diagnosticEvents.filter( - (event) => event.type === "SMIGRATING" - ).length; - assert( - sMigratingEventCount >= 1, - "should have received at least one SMIGRATING notification" - ); - const sMigratedEventCount = diagnosticEvents.filter( - (event) => event.type === "SMIGRATED" - ).length; - assert( - sMigratedEventCount >= 1, - "should have received at least one SMIGRATED notification" - ); - assert.equal( - cluster.masters.length, - MASTERS_COUNT - 1, - `should have ${MASTERS_COUNT - 1} masters after failover` - ); - }, - { + } + ); + }); + + 
describe("Migrate - source: dying -> dest: existing", () => { + const MASTERS_COUNT = 3; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "all", + destination_type: "existing", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + freshContainer: true, numberOfMasters: MASTERS_COUNT, clusterConfiguration: { defaults: { @@ -83,6 +109,980 @@ describe("Cluster Maintenance", () => { }, RESP: 3, }, - } - ); + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + MASTERS_COUNT, + `should have ${MASTERS_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // Verify topology changed + assert.equal( + cluster.masters.length, + MASTERS_COUNT - 1, + `should have ${MASTERS_COUNT - 1} masters after migrate` + ); + + // Verify at least one master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.notDeepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // 
Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); + + describe("Migrate - source: dying -> dest: new", () => { + const MASTERS_NODES_COUNT = 3; + const VISIBLE_NODES_COUNT = 2; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "all", + destination_type: "new", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + freshContainer: true, + numberOfMasters: MASTERS_NODES_COUNT, + startWithReducedNodes: true, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT, + `should have 
${VISIBLE_NODES_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // Verify topology changed + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT, + `should have ${VISIBLE_NODES_COUNT} masters after migrate` + ); + + // Verify at least one master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.notDeepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of 
KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); + + describe("Migrate - source: active -> dest: existing", () => { + const MASTERS_COUNT = 3; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "half", + destination_type: "existing", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + numberOfMasters: MASTERS_COUNT, + freshContainer: true, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + MASTERS_COUNT, + `should have ${MASTERS_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // Verify topology changed + assert.equal( + cluster.masters.length, + MASTERS_COUNT, + `should have ${MASTERS_COUNT} masters after migrate` + ); + + // Verify at least no master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => 
m.address) + ); + + assert.deepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent 
(${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); + + describe("Migrate - source: active -> dest: new", () => { + const MASTERS_NODES_COUNT = 3; + const VISIBLE_NODES_COUNT = 2; + const MIGRATE_ACTION = { + type: "migrate", + parameters: { + cluster_index: 0, + slot_migration: "half", + destination_type: "new", + }, + } as const; + + const PROXIED_CLUSTER_OPTIONS = { + numberOfMasters: MASTERS_NODES_COUNT, + startWithReducedNodes: true, + freshContainer: true, + clusterConfiguration: { + defaults: { + maintNotifications: "enabled", + maintEndpointType: "auto", + }, + RESP: 3, + }, + } as const; + + testUtils.testWithProxiedCluster( + "normal - should handle migration", + async (cluster, faultInjectorClient) => { + const initialMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.equal( + diagnosticEvents.length, + 0, + "should not have received any notifications yet" + ); + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT, + `should have ${VISIBLE_NODES_COUNT} masters at start` + ); + + // Trigger migration + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Verify notifications were received + const sMigratingEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATING" + ).length; + assert( + sMigratingEventCount >= 1, + "should have received at least one SMIGRATING notification" + ); + const sMigratedEventCount = diagnosticEvents.filter( + (event) => event.type === "SMIGRATED" + ).length; + assert( + sMigratedEventCount >= 1, + "should have received at least one SMIGRATED notification" + ); + + // Verify topology changed + assert.equal( + cluster.masters.length, + VISIBLE_NODES_COUNT + 1, + `should have ${VISIBLE_NODES_COUNT + 1} masters after migrate` + ); + + // Verify at least one master address changed + const currentMasterAddresses = new Set( + cluster.masters.map((m) => m.address) + ); + + assert.notDeepStrictEqual( + currentMasterAddresses, + initialMasterAddresses, + "addresses should NOT be the same" + ); + + // Verify data is still accessible after migrate + for (const key of KEYS) { + await cluster.set(key, `updated-${key}`); + const value = await cluster.get(key); + assert.strictEqual( + value, + `updated-${key}`, + `New writes should succeed for ${key}` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "sharded pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.sSubscribe(channel, (_msg, ch) => { + 
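+            // Count every delivered message per channel; the post-migration assertions
+            // rely on this listener surviving the simulated slot migration.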
stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .sPublish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.sPublish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + + testUtils.testWithProxiedCluster( + "pubsub - should handle migration", + async (cluster, faultInjectorClient) => { + const stats: Record = {}; + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + for (const channel of KEYS) { + await cluster.subscribe(channel, (_msg, ch) => { + stats[ch].received++; + }); + } + + // Start publishing messages continuously + const publishController = new AbortController(); + const publishPromise = (async () => { + while (!publishController.signal.aborted) { + for (const channel of KEYS) { + cluster + .publish(channel, `${Date.now()}`) + .then(() => { + stats[channel].sent++; + }) + .catch(() => { + // Ignore publish errors during migrate + }); + } + await setTimeout(50); + } + })(); + + // Trigger migration during publishing + await faultInjectorClient.triggerAction(MIGRATE_ACTION); + + // Stop publishing + publishController.abort(); + await publishPromise; + + for (const channel of KEYS) { + assert.ok( + stats[channel].received <= stats[channel].sent, + `Channel ${channel}: received (${stats[channel].received}) should be <= sent (${stats[channel].sent}) during migrate` + ); + } + + // Wait for cluster to stabilize + await setTimeout(1000); + + // Reset stats for after-migration verification + for (const channel of KEYS) { + stats[channel] = { sent: 0, received: 0 }; + } + + // Publish messages after migration - all should be received + for (const channel of KEYS) { + for (let i = 0; i < 10; i++) { + await cluster.publish(channel, `after-${i}`); + stats[channel].sent++; + } + } + + // Wait for messages to be received + await setTimeout(500); + + // Verify all messages received after migration (subscription preserved) + for (const channel of KEYS) { + assert.strictEqual( + stats[channel].received, + stats[channel].sent, + `Channel ${channel}: all messages (${stats[channel].sent}) should 
be received after migrate - subscription preserved` + ); + } + }, + PROXIED_CLUSTER_OPTIONS + ); + }); }); diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 0de3601dd3..2d75c45188 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -1,4 +1,5 @@ import { RedisClusterClientOptions } from '@redis/client/dist/lib/cluster'; +import { randomUUID } from 'node:crypto'; import { createConnection } from 'node:net'; import { once } from 'node:events'; import { createClient } from '@redis/client/index'; @@ -126,12 +127,16 @@ export interface ProxiedRedisServerDocker { export interface ProxiedRedisServerConfig { nOfProxies: number, defaultInterceptors: ('cluster'|'hitless'|'logger')[] + freshContainer?: boolean; } const RUNNING_PROXIED_SERVERS = new Map>(); export async function spawnProxiedRedisServer(config: ProxiedRedisServerConfig): Promise { - const key = JSON.stringify(config); + const key = JSON.stringify({ + ...config, + ...(config.freshContainer ? { randomKey: randomUUID() } : {}) + }); const runningServer = RUNNING_PROXIED_SERVERS.get(key); if (runningServer) { return runningServer; diff --git a/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts index 875e0482db..cc2247673a 100644 --- a/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts +++ b/packages/test-utils/lib/fault-injector/proxied-fault-injector-cluster.ts @@ -5,6 +5,14 @@ import ProxyController from "../proxy/proxy-controller"; const TOTAL_SLOTS = 16384; +interface MigrateParameters { + /** 'all' = source loses ALL its slots, 'half' = source loses half its slots */ + slot_migration: "all" | "half"; + + /** 'existing' = target node visible in CLUSTER SLOTS, 'new' = hidden node */ + destination_type: "existing" | "new"; +} + export class ProxiedFaultInjectorClientForCluster implements IFaultInjectorClient { @@ -12,39 +20,84 @@ export class ProxiedFaultInjectorClientForCluster async triggerAction(action: ActionRequest): Promise { switch (action.type) { - case "failover": { - return this.triggerFailover(); + case "migrate": { + return this.triggerMigrate( + action.parameters as unknown as MigrateParameters + ); } default: throw new Error(`Action ${action.type} not implemented`); } } + async getProxyNodes() { + const nodes = await this.proxyController.getNodes(); + return nodes.ids.map(parseNodeId); + } + + async updateClusterSlots(nodes: ProxyNode[]) { + return this.proxyController.addInterceptor( + "cluster", + "*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n", + buildClusterSlotsResponse(nodes, "raw"), + "raw" + ); + } + /** - * Simulates a Redis Enterprise hitless failover by sending SMIGRATING and SMIGRATED - * push notifications to the client. The migrating slot range is calculated as 1/N of total - * slots (where N = number of master nodes) to simulate removing one node from the cluster. + * Simulates a Redis cluster slot migration by sending SMIGRATING and SMIGRATED + * push notifications to the client. + * + * The migration process: + * 1. Retrieves cluster nodes and their connections + * 2. Calculates the slot range to migrate based on `slotMigration` parameter: + * - 'all': migrates all slots from the first node (0 to totalSlots/N - 1) + * - 'half': migrates half the slots from the first node (0 to totalSlots/N/2 - 1) + * 3. Sends SMIGRATING notification to the source node's first connection + * 4. 
Waits 2 seconds to simulate maintenance delay
+   * 5. Determines destination node based on `destinationType` parameter:
+   *    - 'new': uses a node with no active connections (hidden node)
+   *    - 'existing': uses the last node in the cluster (visible in CLUSTER SLOTS)
+   * 6. Sends SMIGRATED notification to complete the migration
    *
-   * @returns Promise resolving to ActionStatus on success
-   * @throws Error if notification delivery fails
+   * @param params - Migration configuration parameters
+   * @param params.slot_migration - Determines how many slots to migrate: 'all' or 'half'
+   * @param params.destination_type - Determines target node type: 'existing' or 'new'
+   * @private
    */
-  private async triggerFailover(): Promise<ActionStatus> {
-    const nodes = await this.proxyController.getNodes();
-    const nodeId = nodes.ids[0];
-    const connections = await this.proxyController.getConnections();
-    const connectionIds = connections[nodeId];
-
-    // 1. Calculate migrating slot range and send SMIGRATING notification
-    const slots = `0-${Math.floor(TOTAL_SLOTS / nodes.ids.length)}`;
-    const seqId = Date.now().toString();
-    const sMigratingNotification = Buffer.from(
-      `>3\r\n+SMIGRATING\r\n:${seqId}\r\n+${slots}\r\n`
-    ).toString("base64");
+  private async triggerMigrate(
+    params: MigrateParameters
+  ): Promise<ActionStatus> {
+    const [nodes, connections] = await Promise.all([
+      this.getProxyNodes(),
+      this.proxyController.getConnections(),
+    ]);
+
+    /**
+     * We assume at least one proxy node may have no client connections so it can act as
+     * a hidden ("new") destination; the cluster client must NOT use `minimizeConnections`.
+     */
+    const nodeWithoutConnections = nodes.find((node) => {
+      const connIds = connections[node.id];
+      return !connIds || connIds.length === 0;
+    });
+
+    const visibleNodesCount = nodeWithoutConnections
+      ? nodes.length - 1
+      : nodes.length;
+
+    const shouldMigrateHalfSlots = params.slot_migration === "half";
+    const slots = shouldMigrateHalfSlots
+      ? `0-${Math.floor(TOTAL_SLOTS / visibleNodesCount / 2) - 1}`
+      : `0-${Math.floor(TOTAL_SLOTS / visibleNodesCount) - 1}`;
+
+    const sMigratingNotification = buildSMigratingNotification(slots);
+
+    const sourceNode = nodes[0];
 
     const migratingResult = await this.proxyController.sendToClient(
-      connectionIds[0],
-      sMigratingNotification,
-      "base64"
+      connections[sourceNode.id][0],
+      sMigratingNotification
     );
 
     if (!migratingResult.success) {
@@ -56,15 +109,30 @@ export class ProxiedFaultInjectorClientForCluster
     // 2. Simulate maintenance delay
     await setTimeout(2_000);
 
-    // 3. Calculate remaining nodes, build and send SMIGRATED notification
-    const remainingNodes = nodes.ids.slice(1).map(parseNodeId);
-    const sMigratedNotification =
-      buildSMigratedNotification(remainingNodes).toString("base64");
+    const isNewDestination = params.destination_type === "new";
+
+    if (isNewDestination && !nodeWithoutConnections) {
+      throw new Error(`[Proxy] No node with no connections`);
+    }
+
+    const destinationNode = isNewDestination
+      ? nodeWithoutConnections!
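+      // A proxy with no client connections stands in for a node that is not yet
+      // advertised in CLUSTER SLOTS, i.e. the "new" (hidden) destination.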
+ : nodes.at(-1)!; // Get the last node as the destination + + if (!destinationNode) { + throw new Error(`[Proxy] No destination node`); + } + + const sMigratedNotification = buildSMigratedNotification([ + { + targetNode: destinationNode, + slotRanges: slots, + }, + ]); const migratedResult = await this.proxyController.sendToClient( - connectionIds[0], - sMigratedNotification, - "base64" + connections[sourceNode.id][0], + sMigratedNotification ); if (!migratedResult.success) { @@ -76,33 +144,11 @@ export class ProxiedFaultInjectorClientForCluster return { status: "success", error: null, - output: "Failover completed!", + output: "Migration completed!", }; } } -// TODO Check if we need this -// function buildClusterSlotsResponse(nodes: ProxyNode[]): Buffer { -// if (nodes.length === 0) { -// return Buffer.from("*0\r\n"); -// } - -// const totalSlots = 16384; -// const slotLength = Math.floor(totalSlots / nodes.length); - -// let current = -1; -// const mapping = nodes.map((node, i) => { -// const from = current + 1; -// const to = i === nodes.length - 1 ? 16383 : current + slotLength; -// current = to; -// const id = `proxy-id-${node.port}`; -// return `*3\r\n:${from}\r\n:${to}\r\n*3\r\n$${node.host.length}\r\n${node.host}\r\n:${node.port}\r\n$${id.length}\r\n${id}\r\n`; -// }); - -// const response = `*${nodes.length}\r\n${mapping.join("")}`; -// return Buffer.from(response); -// } - interface ProxyNode { id: string; host: string; @@ -122,32 +168,68 @@ const parseNodeId = (id: string): ProxyNode => { }; }; -function buildSMigratedNotification( - nodes: ProxyNode[], - seqId: number = Date.now() -): Buffer { - if (nodes.length === 0) { - return Buffer.from(`>3\r\n+SMIGRATED\r\n:${seqId}\r\n*0\r\n`); +const buildSMigratedNotification = ( + movedSlotsByDestination: Array<{ + targetNode: { host: string; port: number }; + slotRanges: string; // e.g., "0-5460" or "0-100,200-300,500" + }>, + seqId: number = 1, + encoding: "base64" | "raw" = "base64" +): string => { + if (movedSlotsByDestination.length === 0) { + throw new Error("No slots to migrate"); } - const slotLength = Math.floor(TOTAL_SLOTS / nodes.length); + const entries = movedSlotsByDestination.map(({ targetNode, slotRanges }) => { + const hostPort = `${targetNode.host}:${targetNode.port}`; + return `*2\r\n+${hostPort}\r\n+${slotRanges}\r\n`; + }); + + const response = `>3\r\n+SMIGRATED\r\n:${seqId}\r\n*${ + movedSlotsByDestination.length + }\r\n${entries.join("")}`; + + return encoding === "raw" + ? response + : Buffer.from(response).toString(encoding); +}; + +const buildClusterSlotsResponse = ( + nodes: ProxyNode[], + encoding: "base64" | "raw" = "base64" +): string => { + const TOTAL_SLOTS = 16384; + const slotSize = Math.floor(TOTAL_SLOTS / nodes.length); let current = -1; - const nodeEntries = nodes.map((node, i) => { + const slots = nodes.map((node, i) => { const from = current + 1; - const to = i === nodes.length - 1 ? 16383 : current + slotLength; + const to = i === nodes.length - 1 ? 
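+    // The last node absorbs any rounding remainder so the synthetic reply always
+    // covers the full 0-16383 slot range without gaps.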
16383 : current + slotSize; current = to; - const hostPort = `${node.host}:${node.port}`; - const slotRange = `${from}-${to}`; + // Use proxyPort as the port clients connect to + const host = node.host; + const port = node.port; + const nodeId = `proxy-${port}`; - // *2\r\n+host:port\r\n+slot-range\r\n - return `*2\r\n+${hostPort}\r\n+${slotRange}\r\n`; + return `*3\r\n:${from}\r\n:${to}\r\n*3\r\n$${host.length}\r\n${host}\r\n:${port}\r\n$${nodeId.length}\r\n${nodeId}\r\n`; }); - // >3\r\n+SMIGRATED\r\n:{seqId}\r\n*{count}\r\n{entries} - const response = `>3\r\n+SMIGRATED\r\n:${seqId}\r\n*${ - nodes.length - }\r\n${nodeEntries.join("")}`; - return Buffer.from(response); -} + return encoding === "raw" + ? `*${nodes.length}\r\n${slots.join("")}` + : Buffer.from(`*${nodes.length}\r\n${slots.join("")}`).toString(encoding); +}; + +const buildSMigratingNotification = ( + slotRanges: string, + seqId: number = 1, + encoding: "base64" | "raw" = "base64" +) => { + const response = `>3\r\n+SMIGRATING\r\n:${seqId}\r\n+${slotRanges}\r\n`; + + if (encoding === "raw") { + return response; + } + + return Buffer.from(response).toString(encoding); +}; diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 40ff3d2259..e085945a37 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -337,18 +337,15 @@ export default class TestUtils { title: string, fn: ( proxiedClusterClient: RedisClusterType, - { - proxyController, - faultInjectorClient, - }: { - proxyController: ProxyController; - faultInjectorClient: IFaultInjectorClient; - } + faultInjectorClient: IFaultInjectorClient ) => unknown, options: Omit< ClusterTestOptions, "numberOfReplicas" | "minimumDockerVersion" | "serverArguments" - > + > & { + startWithReducedNodes?: boolean; + freshContainer?: boolean; + } ) { let spawnPromise: ReturnType; before(function () { @@ -356,36 +353,47 @@ export default class TestUtils { spawnPromise = spawnProxiedRedisServer({ nOfProxies: options.numberOfMasters ?? 3, defaultInterceptors: ["cluster", "hitless", "logger"], + freshContainer: options.freshContainer, }); }); it(title, async function () { if (!spawnPromise) return this.skip(); - const { ports, apiPort } = await spawnPromise; + const { apiPort } = await spawnPromise; + + const proxyController = new ProxyController( + `http://localhost:${apiPort}` + ); + + const faultInjectorClient = new ProxiedFaultInjectorClientForCluster( + proxyController + ); + + const nodes = await faultInjectorClient.getProxyNodes(); + + if (options.startWithReducedNodes) { + nodes.pop(); + await faultInjectorClient.updateClusterSlots(nodes); + } const cluster = createCluster({ - rootNodes: ports.map((port) => ({ + rootNodes: nodes.map((n) => ({ socket: { - port, + port: n.port, }, })), ...options.clusterConfiguration, }); - const proxyController = new ProxyController( - `http://localhost:${apiPort}` - ); - const faultInjectorClient = new ProxiedFaultInjectorClientForCluster(proxyController); - if (options.disableClusterSetup) { - return fn(cluster, { proxyController, faultInjectorClient }); + return fn(cluster, faultInjectorClient); } await cluster.connect(); try { await TestUtils.#clusterFlushAll(cluster); - await fn(cluster, { proxyController, faultInjectorClient }); + await fn(cluster, faultInjectorClient); } finally { await TestUtils.#clusterFlushAll(cluster); cluster.destroy();
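+        // destroy() tears the connections down immediately, so teardown does not
+        // depend on the proxied cluster still being responsive after a simulated migration.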