@@ -263,12 +263,6 @@ export default class RedisClusterSlots<
263263 }
264264 this.smigratedSeqIdsSeen.add(event.seqId);
265265
266- // slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots.#SLOTS);
267- // masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>();
268- // replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
269- // readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
270- // pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
271-
272266 const sourceAddress = `${event.source.host}:${event.source.port}`;
273267 const sourceNode = this.nodeByAddress.get(sourceAddress);
274268 if(!sourceNode) {
@@ -277,56 +271,120 @@ export default class RedisClusterSlots<
277271 }
278272
279273 // 1. Pausing
280- //TODO - check the single pubsubnode
274+ // 1.1 Normal
281275 sourceNode.client?._pause();
276+ // 1.2 Sharded pubsub
282277 if('pubSub' in sourceNode) {
283278 sourceNode.pubSub?.client._pause();
284279 }
285-
286- const destinationAddress = `${event.destination.host}:${event.destination.port}`;
287- let destinationNode = this.nodeByAddress.get(destinationAddress);
288- let destinationShard: Shard<M, F, S, RESP, TYPE_MAPPING>;
289-
290- // 2. Create new Master
291- if(!destinationNode) {
292- const promises: Promise<unknown>[] = [];
293- destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises);
294- await Promise.all(promises);
295- // 2.1 Pause
296- destinationNode.client?._pause();
297- // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine
298- destinationShard = {
299- master: destinationNode
300- };
301- } else {
302- // In case destination node existed, this means there was a Shard already, so its best if we can find it.
303- const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port);
304- if(!existingShard) {
305- dbgMaintenance("Could not find shard");
306- throw new Error('Could not find shard');
307- }
308- destinationShard = existingShard;
309- }
310-
311- // 3. Soft update shards
312- for(const range of event.ranges) {
313- if(typeof range === 'number') {
314- this.slots[range] = destinationShard;
315- } else {
316- for (let slot = range[0]; slot <= range[1]; slot++) {
317- this.slots[slot] = destinationShard;
318- }
319- }
320- }
321-
322- // 4. For all affected clients (normal, pubsub, spubsub):
323- // 4.1 Wait for inflight commands to complete
324- // 4.2 Extract commands, channels, sharded channels
325- // 4.3 Kill if no slots are pointing to it
326- //
327-
328- // 5. Prepend extracted commands, chans
329- // 5.1 Unpause
280+ // 1.3 Regular pubsub
281+ if(this.pubSubNode?.address === sourceAddress) {
282+ this.pubSubNode?.client._pause();
283+ }
284+
285+ // const destinationAddress = `${event.destination.host}:${event.destination.port}`;
286+ // let destinationNode = this.nodeByAddress.get(destinationAddress);
287+ // let destinationShard: Shard<M, F, S, RESP, TYPE_MAPPING>;
288+
289+ // // 2. Create new Master
290+ // // TODO create a new pubSubNode if needed
291+ // if(!destinationNode) {
292+ // const promises: Promise<unknown>[] = [];
293+ // destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises);
294+ // await Promise.all(promises);
295+ // // 2.1 Pause
296+ // destinationNode.client?._pause();
297+ // // If the destination node didn't exist, the Shard didn't exist either, so creating a new Shard is fine
298+ // destinationShard = {
299+ // master: destinationNode
300+ // };
301+ // } else {
302+ // // If the destination node already existed, there was already a Shard, so it's best to find it.
303+ // const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port);
304+ // if(!existingShard) {
305+ // dbgMaintenance("Could not find shard");
306+ // throw new Error('Could not find shard');
307+ // }
308+ // destinationShard = existingShard;
309+ // }
310+
311+ // // 3. Soft update shards.
312+ // // After this step, any new command that hashes to one of the moved slots should be routed to the destinationShard
313+ // const movingSlots = new Set<number>();
314+ // for(const range of event.ranges) {
315+ // if(typeof range === 'number') {
316+ // this.slots[range] = destinationShard;
317+ // movingSlots.add(range);
318+ // } else {
319+ // for (let slot = range[0]; slot <= range[1]; slot++) {
320+ // this.slots[slot] = destinationShard;
321+ // movingSlots.add(slot);
322+ // }
323+ // }
324+ // }
325+
326+ // // 4. For all affected clients (normal, pubsub, spubsub):
327+ // // 4.1 Wait for inflight commands to complete
328+ // const inflightPromises: Promise<void>[] = [];
329+ // //Normal
330+ // inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete());
331+ // //Sharded pubsub
332+ // if('pubSub' in sourceNode) {
333+ // inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete());
334+ // }
335+ // //Regular pubsub
336+ // if(this.pubSubNode?.address === sourceAddress) {
337+ // inflightPromises.push(this.pubSubNode?.client._getQueue().waitForInflightCommandsToComplete());
338+ // }
339+ // await Promise.all(inflightPromises);
340+
341+
342+ // // 4.2 Extract commands, channels, sharded channels
343+ // // TODO don't forget to extract channels and resubscribe
344+ // const sourceStillHasSlots = this.slots.some(shard => shard.master.address === sourceAddress);
345+ // if(sourceStillHasSlots) {
346+ // const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots);
347+ // // 5. Prepend extracted commands, channels
348+ // //TODO pubsub, spubsub
349+ // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
350+
351+ // //unpause source node clients
352+ // sourceNode.client?._unpause();
353+ // if('pubSub' in sourceNode) {
354+ // sourceNode.pubSub?.client._unpause();
355+ // }
356+ // //TODO pubSubNode?
357+ // } else {
358+
359+ // const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands();
360+ // // 5. Prepend extracted commands, channels
361+ // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
362+ // if('pubSub' in destinationNode) {
363+ // // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots);
364+ // //TODO resubscribe. Might need to throw an event for cluster to do the job
365+ // }
366+ // //TODO pubSubNode?
367+
368+ // //Cleanup
369+ // this.masters = this.masters.filter(master => master.address !== sourceAddress);
370+ // // Not sure if needed, since there should be no replicas in RE (Redis Enterprise)
371+ // this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress);
372+ // this.nodeByAddress.delete(sourceAddress);
373+ // //TODO pubSubNode?
374+
375+ // // 4.3 Kill because no slots are pointing to it anymore
376+ // await sourceNode.client?.close();
377+ // if('pubSub' in sourceNode) {
378+ // await sourceNode.pubSub?.client.close();
379+ // }
380+ // //TODO pubSubNode?
381+ // }
382+
383+ // // 5.1 Unpause
384+ // destinationNode.client?._unpause();
385+ // if('pubSub' in destinationNode) {
386+ // destinationNode.pubSub?.client._unpause();
387+ // }
330388
331389 }
332390
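The commented-out step 3 above remaps each migrated slot range onto the destination shard and records the affected slot numbers for the later drain/re-queue steps. As a standalone illustration only (not the library code), here is a minimal sketch of that remapping, assuming a simplified `Shard` placeholder in place of the real `Shard<M, F, S, RESP, TYPE_MAPPING>` generic; ranges follow the shape used by the event, either a single slot number or an inclusive `[start, end]` pair.

```typescript
// Illustrative sketch: remap migrated slot ranges to the destination shard.
// `Shard` is a simplified stand-in for the generic shard type in RedisClusterSlots.
type SlotRange = number | [start: number, end: number];

interface Shard {
  id: string;
}

function reassignSlots(
  slots: Array<Shard | undefined>,
  ranges: SlotRange[],
  destinationShard: Shard
): Set<number> {
  // Collect every slot being moved so later steps know which queued commands are affected.
  const movingSlots = new Set<number>();
  for (const range of ranges) {
    if (typeof range === 'number') {
      slots[range] = destinationShard;
      movingSlots.add(range);
    } else {
      for (let slot = range[0]; slot <= range[1]; slot++) {
        slots[slot] = destinationShard;
        movingSlots.add(slot);
      }
    }
  }
  return movingSlots;
}
```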
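The rest of the commented-out plan sequences the hand-off: pause the source (and destination) clients, wait for the source's in-flight commands to complete, extract the queued commands that target the moved slots, prepend them to the destination's write queue, then unpause, closing the source only if no slots point to it anymore. The sketch below illustrates that ordering only; `Queue` and `PausableClient` are hypothetical stand-ins, with the method names taken from the diff (`_pause`, `_unpause`, `_getQueue`, `waitForInflightCommandsToComplete`, `extractCommandsForSlots`, `getAllCommands`, `prependCommandsToWrite`).

```typescript
// Schematic of the hand-off sequencing sketched in the commented-out code.
// Interfaces are simplified placeholders; method names mirror the diff.
interface Queue<C> {
  waitForInflightCommandsToComplete(): Promise<void>;
  extractCommandsForSlots(slots: Set<number>): C[];
  getAllCommands(): C[];
  prependCommandsToWrite(commands: C[]): void;
}

interface PausableClient<C> {
  _pause(): void;
  _unpause(): void;
  _getQueue(): Queue<C>;
  close(): Promise<void>;
}

async function handOffSlots<C>(
  source: PausableClient<C>,
  destination: PausableClient<C>,
  movingSlots: Set<number>,
  sourceStillHasSlots: boolean
): Promise<void> {
  // 1. Stop new writes on both ends while the topology is updated.
  source._pause();
  destination._pause();

  // 2. Let commands already on the wire finish on the source.
  await source._getQueue().waitForInflightCommandsToComplete();

  // 3. Move queued-but-unsent commands over to the destination.
  const commandsToMove = sourceStillHasSlots
    ? source._getQueue().extractCommandsForSlots(movingSlots) // source keeps its other slots
    : source._getQueue().getAllCommands();                    // source is fully drained
  destination._getQueue().prependCommandsToWrite(commandsToMove);

  // 4. Resume traffic; close the source if nothing routes to it anymore.
  destination._unpause();
  if (sourceStillHasSlots) {
    source._unpause();
  } else {
    await source.close();
  }
}
```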