Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,12 @@ export default class RedisCommandsQueue {
/**
* Gets all commands from the write queue without removing them.
*/
getAllCommands(): CommandToWrite[] {
extractAllCommands(): CommandToWrite[] {
const result: CommandToWrite[] = [];
let current = this.#toWrite.head;
while(current) {
result.push(current.value);
this.#toWrite.remove(current);
current = current.next;
}
return result;
Expand All @@ -591,7 +592,11 @@ export default class RedisCommandsQueue {
* Prepends commands to the write queue in reverse.
*/
prependCommandsToWrite(commands: CommandToWrite[]) {
for (let i = commands.length - 1; i <= 0; i--) {
if (!commands.length) {
return;
}

for (let i = commands.length - 1; i >= 0; i--) {
this.#toWrite.unshift(commands[i]);
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ export default class RedisClient<
* @internal
*/
_getQueue(): RedisCommandsQueue {
return this.#queue;
return this._self.#queue;
}

/**
Expand Down
10 changes: 7 additions & 3 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ export default class RedisClusterSlots<
} else {
// If source shard doesnt have any slots left, this means we can safely move all commands to the new shard.
// Same goes for sharded pub sub listeners
const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands();
const normalCommandsToMove = sourceNode.client!._getQueue().extractAllCommands();
// 5. Prepend extracted commands, chans
destMasterNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
if('pubSub' in sourceNode) {
Expand All @@ -371,9 +371,13 @@ export default class RedisClusterSlots<
this.nodeByAddress.delete(sourceAddress);

// 4.3 Kill because no slots are pointing to it anymore
await sourceNode.client?.close()
if (sourceNode.client?.isOpen) {
await sourceNode.client?.close()
}
if('pubSub' in sourceNode) {
await sourceNode.pubSub?.client.close();
if (sourceNode.pubSub?.client.isOpen) {
await sourceNode.pubSub?.client.close();
}
}
}

Expand Down
Loading