
Conversation

@hongyunyan (Collaborator) commented Feb 11, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added node draining capability to gracefully take nodes offline for maintenance
    • Introduced automatic workload migration from draining nodes to available nodes
    • Added new API method to initiate node drain operations and retrieve remaining work status
    • Implemented node liveness tracking with distinct states for normal operation and maintenance phases
  • Improvements

    • Enhanced node lifecycle management with monotonic state transitions during maintenance operations

@ti-chi-bot bot added the do-not-merge/needs-linked-issue, do-not-merge/work-in-progress (indicates that a PR should not merge because it is a work in progress), and release-note (denotes a PR that will be considered when it comes time to generate release notes) labels on Feb 11, 2026
@ti-chi-bot bot commented Feb 11, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull request has been approved by: (no approvers yet)
Once this PR has been reviewed and has the lgtm label, please assign wlwilliamx for approval. For more information see the Code Review Process.
Please ensure the assigned approver provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files.

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) on Feb 11, 2026
@coderabbitai bot commented Feb 11, 2026

📝 Walkthrough

The PR implements a node drain/stopping lifecycle feature with monotonic liveness progression. It introduces drain state tracking at the coordinator level, extends the maintainer manager with node-level liveness binding, adds a drain scheduler for migrating maintainers from draining nodes, and implements coordinated heartbeat and liveness-based communication between nodes and the coordinator via new message types.

Changes

  • Drain Controller Infrastructure (coordinator/drain_controller.go, coordinator/drain_controller_test.go): Introduces the new drain controller component with node liveness tracking (Alive/Draining/Stopping), TTL-based derivation, heartbeat processing, per-node drain state management, and lifecycle actions for marking drain/stop observations. Includes unit tests for liveness view TTL and drain remaining guard logic.
  • Coordinator Integration (coordinator/controller.go, coordinator/coordinator.go): Wires the drain controller into the coordinator's scheduler setup, adds RequestDrain and DrainSummary public methods to Controller, integrates drain-related processing into node heartbeat and liveness response flows, and implements the DrainNode method for triggering and summarizing drain operations.
  • Scheduler & Node Selection (coordinator/scheduler/drain.go, coordinator/scheduler/drain_test.go, coordinator/scheduler/basic.go, coordinator/scheduler/balance.go): Introduces a new drain scheduler for moving maintainers off draining nodes in batches. Refactors the basic and balance schedulers to use injectable schedulable-node provider functions instead of a direct NodeManager dependency, enabling dynamic node selection and drain-aware scheduling.
  • Liveness State Management (pkg/api/util.go, pkg/api/util_liveness_test.go): Introduces monotonic liveness progression with the new LivenessCaptureDraining state and shifts LivenessCaptureStopping; enforces non-decreasing state transitions via compare-and-swap. Updates String() to handle the new Draining state and adds a test validating monotonic transitions (a sketch of the transition logic follows this table).
  • Proto & Messaging Definitions (heartbeatpb/heartbeat.proto, pkg/messaging/message.go, pkg/scheduler/scheduler.go, pkg/server/coordinator.go): Adds the new NodeLiveness enum (ALIVE, DRAINING, STOPPING) and related message types (NodeHeartbeat, SetNodeLivenessRequest, SetNodeLivenessResponse) to the proto. Extends the messaging layer with new IOType constants and decoding/encoding support. Adds the DrainScheduler constant and the CoordinatorInterface.DrainNode method.
  • Maintainer & Heartbeat Handling (maintainer/maintainer_manager.go, maintainer/maintainer_manager_liveness_test.go): Adds node-level liveness binding via the new NewMaintainerManagerWithLiveness constructor. Extends message handling for SetNodeLivenessRequest, implements periodic NodeHeartbeat emission with current liveness and node epoch, and adds liveness-based rejection of addMaintainer requests when stopping.
  • Server Integration (server/server.go, server/module_election.go): Integrates liveness with maintainer manager initialization. Adds synchronization guards (mutexes) for campaign flows, introduces the watchLivenessAndResign watcher for liveness-based resignation, replaces exact stopping checks with monotonic comparisons (>= Draining), and enhances logging with liveness state context.
  • Operator Controller (coordinator/operator/operator_controller.go, coordinator/operator/operator_controller_test.go): Adds the new public method CountOperatorsByNode to count in-flight operators affecting a target node. Updates test setup to inject MockMessageCenter and register NodeManager in the app context for drain-related interactions.
  • API Drain Flow (api/v1/api.go): Replaces the atomic counter-based drain logic with a real drain flow: validates input, obtains and verifies the coordinator, invokes DrainNode, and returns the remaining work count. Simplifies the response to a single accepted response with the computed remaining value.
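
The monotonic progression for pkg/api/util.go can be pictured as a small compare-and-swap loop over an ordered set of states. The following is an illustrative sketch only, assuming a Liveness type backed by an int32; the actual constants and return semantics live in pkg/api/util.go and may differ in detail.

package api

import "sync/atomic"

// Liveness models the node lifecycle. Higher values are "later" states,
// which is what makes a monotonic (non-decreasing) Store possible.
type Liveness int32

const (
	LivenessCaptureAlive    Liveness = 0
	LivenessCaptureDraining Liveness = 1
	LivenessCaptureStopping Liveness = 2
)

// Load reads the current state atomically.
func (l *Liveness) Load() Liveness {
	return Liveness(atomic.LoadInt32((*int32)(l)))
}

// Store applies target only if doing so never moves the state backwards.
// It returns true when the state equals target afterwards (including the
// idempotent case) and false when the transition was rejected.
func (l *Liveness) Store(target Liveness) bool {
	for {
		cur := l.Load()
		if cur > target {
			return false // regression (e.g. Stopping -> Draining) is rejected
		}
		if cur == target {
			return true // idempotent store
		}
		if atomic.CompareAndSwapInt32((*int32)(l), int32(cur), int32(target)) {
			return true
		}
		// Lost a race with a concurrent writer; reload and retry.
	}
}

Because the states are totally ordered, "never go backwards" reduces to a numeric comparison, which is also what lets the election code use checks like >= Draining.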

Sequence Diagram(s)

sequenceDiagram
    participant Node as Node
    participant Coord as Coordinator
    participant DCtrl as DrainController
    participant Sched as DrainScheduler
    participant OpCtrl as OperatorController

    Node->>Coord: NodeHeartbeat (Draining liveness)
    Coord->>DCtrl: handleNodeHeartbeat
    DCtrl->>DCtrl: markDrainingObserved
    activate DCtrl
    DCtrl->>DCtrl: Update drain state
    deactivate DCtrl
    
    Sched->>DCtrl: GetDrainingNodes()
    DCtrl-->>Sched: [node_id]
    
    Sched->>Sched: Iterate draining nodes
    Sched->>OpCtrl: CountOperatorsByNode(draining_node)
    OpCtrl-->>Sched: in_flight_count
    
    alt Changefeed not already moved
        Sched->>OpCtrl: CreateOperator(MoveMaintainer)
        OpCtrl->>OpCtrl: Schedule move
    end
    
    Note over Sched,OpCtrl: Repeat for next execution interval

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested labels

lgtm, size/XXL

Suggested reviewers

  • wk989898
  • lidezhu
  • flowbehappy

Poem

🐰 A drain state blooms, from Alive to Draining we go,
Monotonic progress, a measured and graceful flow,
Schedulers shuffle maintainers with care,
Through heartbeats and liveness, we glide through the air! ✨

🚥 Pre-merge checks: ❌ 3 failed (2 warnings, 1 inconclusive)

  • Description check (⚠️ Warning): The PR description is incomplete; it contains only template placeholders, with 'Issue Number: close #xxx' unfilled and the 'What is changed and how it works?' section left empty. Resolution: complete the description with the actual issue number, a detailed explanation of the changes, test coverage confirmation, and a non-placeholder release note.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 14.29%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The title 'wip-drain-capture-5.3-codex' is vague and uses non-descriptive terminology (a WIP prefix and unclear abbreviations) that does not convey meaningful information about the changeset. Resolution: replace it with a clear, descriptive title that summarizes the main feature or fix, e.g., 'Implement node drain controller for coordinated maintenance migration'.



@gemini-code-assist

Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a comprehensive node draining mechanism, allowing for graceful shutdown or maintenance of individual nodes within the system. It achieves this by implementing a new drainController in the coordinator, integrating it with existing and new schedulers, and establishing a robust node liveness communication protocol between the coordinator and maintainers. The changes enable the system to automatically migrate workloads from nodes marked for draining, ensuring service continuity and preventing new workloads from being assigned to unstable nodes.

Highlights

  • New Node Draining Mechanism: A comprehensive node draining mechanism has been introduced, allowing for graceful shutdown or maintenance of individual nodes. This includes a new drainController in the coordinator to manage the draining process and track node liveness states (Alive, Draining, Stopping).
  • Implemented Drain API: The /api/v1/captures/drain endpoint is now fully functional. It interacts with the new drainController to initiate node draining and provides real-time status updates on remaining work.
  • Scheduler Integration: Existing schedulers (basicScheduler, balanceScheduler) have been updated to integrate with the new drainController, ensuring they respect node liveness. A new drainScheduler has also been added to specifically handle the migration of changefeed maintainers from nodes marked for draining.
  • Node Liveness Protocol: New protobuf messages and an enum (NodeLiveness, NodeHeartbeat, SetNodeLivenessRequest, SetNodeLivenessResponse) have been added to facilitate robust communication of node liveness states between the coordinator and maintainers.
  • Maintainer Manager Enhancements: The maintainer/maintainer_manager.go now sends node heartbeats to the coordinator, processes SetNodeLivenessRequest messages to update its internal liveness state, and prevents new maintainer assignments if the node is in a stopping state (a simplified sketch of the heartbeat loop follows this list).
  • Liveness-Aware Election: The coordinator and log coordinator election process now considers the node's liveness state. Nodes in a draining or stopping state will not campaign for leadership, and active leaders will resign if they transition to a stopping state.
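
As a rough illustration of the heartbeat flow from the Maintainer Manager Enhancements item above, a periodic emitter could look like the sketch below. This is not the actual maintainer manager code; the stand-in types mirror what the PR summary says the messages carry (liveness plus node epoch).

package sketch

import (
	"context"
	"time"
)

// NodeLiveness stands in for the proto enum added by this PR
// (ALIVE, DRAINING, STOPPING).
type NodeLiveness int32

const (
	NodeLivenessAlive NodeLiveness = iota
	NodeLivenessDraining
	NodeLivenessStopping
)

// nodeHeartbeat stands in for heartbeatpb.NodeHeartbeat.
type nodeHeartbeat struct {
	Liveness  NodeLiveness
	NodeEpoch uint64
}

// heartbeatLoop periodically reports liveness to the coordinator. load
// reads the node's current liveness; send abstracts the messaging layer.
func heartbeatLoop(ctx context.Context, interval time.Duration, epoch uint64,
	load func() NodeLiveness, send func(nodeHeartbeat)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// The coordinator's drain controller combines these reports
			// with a TTL to derive its per-node liveness view.
			send(nodeHeartbeat{Liveness: load(), NodeEpoch: epoch})
		}
	}
}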


Changelog
  • api/v1/api.go
    • Removed sync/atomic import.
    • Implemented drainCapture API logic.
    • Removed drainCaptureCounter.
  • coordinator/controller.go
    • Added drainController field.
    • Initialized drainController and DrainScheduler.
    • Updated BasicScheduler and BalanceScheduler to use drainController for schedulable nodes.
    • Added handleDrainOnPeriodTask.
    • Implemented RequestDrain and DrainSummary methods.
  • coordinator/coordinator.go
    • Implemented DrainNode method to interact with the new drainController.
  • coordinator/drain_controller.go
    • Added new file, implementing drainController logic, nodeLivenessView, and related structs and methods for managing node draining.
  • coordinator/drain_controller_test.go
    • Added new file, containing unit tests for nodeLivenessView and drainRemaining logic.
  • coordinator/operator/operator_controller.go
    • Added CountOperatorsByNode method to count in-flight operators affecting a specific node.
  • coordinator/operator/operator_controller_test.go
    • Updated test setup to include mock message center and node manager services.
  • coordinator/scheduler/balance.go
    • Modified balanceScheduler to use a getSchedulableNodes function instead of directly accessing nodeManager, integrating with the new drain logic.
  • coordinator/scheduler/basic.go
    • Modified basicScheduler to use a getSchedulableNodeIDs function instead of directly accessing nodeManager, integrating with the new drain logic.
  • coordinator/scheduler/drain.go
    • Added new file, implementing drainScheduler and DrainController interface for migrating maintainers from draining nodes.
  • coordinator/scheduler/drain_test.go
    • Added new file, containing unit tests for drainScheduler.
  • heartbeatpb/heartbeat.pb.go
    • Generated new protobuf code to include NodeLiveness enum and related heartbeat messages (NodeHeartbeat, SetNodeLivenessRequest, SetNodeLivenessResponse).
    • Updated descriptor indices for existing messages.
  • heartbeatpb/heartbeat.proto
    • Defined new protobuf messages and an enum for node liveness states and heartbeat communication.
  • maintainer/maintainer_manager.go
    • Imported pkg/api.
    • Added serverLiveness, nodeEpoch, nodeHeartbeatInterval, lastNodeHeartbeatTime fields.
    • Updated NewMaintainerManager to use NewMaintainerManagerWithLiveness.
    • Added TypeSetNodeLivenessRequest to message handling.
    • Updated sendMessages to accept messaging.IOType.
    • Added logic to reject AddMaintainerRequest if node is stopping.
    • Implemented onSetNodeLivenessRequest, sendNodeHeartbeat, serverLivenessToHeartbeat, and heartbeatNodeLivenessToServer functions.
  • maintainer/maintainer_manager_liveness_test.go
    • Added new file, containing unit tests for the maintainer manager's liveness handling.
  • pkg/api/util.go
    • Added LivenessCaptureDraining to Liveness enum.
    • Updated Store method to ensure monotonic liveness transitions.
    • Updated String() method to include Draining state.
  • pkg/api/util_liveness_test.go
    • Added new file, containing unit tests for the monotonic behavior of the Liveness enum's Store method.
  • pkg/messaging/message.go
    • Added new message types for node heartbeat and liveness requests/responses.
    • Updated string representations and decoding logic for new message types.
  • pkg/scheduler/scheduler.go
    • Added DrainScheduler constant.
  • pkg/server/coordinator.go
    • Extended the Coordinator interface with a DrainNode method.
  • server/module_election.go
    • Added sync.Mutex for coordinator and log coordinator election.
    • Added watchLivenessAndResign goroutine.
    • Modified campaignCoordinator and campaignLogCoordinator to respect forbiddenLiveness (Draining/Stopping) and stopLiveness (Stopping) states.
    • Updated resign logic to be liveness-aware.
  • server/server.go
    • Configured the maintainer manager to use the server's liveness state.

@ti-chi-bot bot commented Feb 11, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@coderabbitai bot left a comment

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
server/module_election.go (1)

352-360: ⚠️ Potential issue | 🟠 Major

Swapped log messages: timeout case logs "failed" and vice versa.

  • Line 353: the DeadlineExceeded branch (actual timeout) logs "log coordinator resign failed".
  • Line 358: the non-timeout branch logs "log coordinator resign timeout".

These are swapped.

Proposed fix
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+			log.Warn("log coordinator resign timeout",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
-		log.Warn("log coordinator resign timeout",
+		log.Warn("log coordinator resign failed",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
🧹 Nitpick comments (15)
server/module_election.go (3)

96-97: (&liveness).String() is unnecessarily verbose.

Since liveness is a local variable (addressable), Go allows calling a pointer-receiver method directly: liveness.String(). The explicit address-of is not needed and hurts readability.

Also applies to: 127-128, 203-204, 236-237, 261-262, 319-320


317-326: Defer cancel() to prevent context leak on unexpected panic.

If e.resign panics, cancel() on line 326 is never reached. Since this runs in a loop, wrapping the block in a helper or using defer in a closure is safer:

Proposed fix
-		resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		if err := e.resign(resignCtx); err != nil {
-			liveness := e.svr.liveness.Load()
-			livenessStr := (&liveness).String()
-			log.Warn("resign coordinator on stopping liveness failed",
-				zap.String("nodeID", nodeID),
-				zap.String("liveness", livenessStr),
-				zap.Error(err))
-		}
-		cancel()
+		func() {
+			resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+			if err := e.resign(resignCtx); err != nil {
+				liveness := e.svr.liveness.Load()
+				log.Warn("resign coordinator on stopping liveness failed",
+					zap.String("nodeID", nodeID),
+					zap.String("liveness", liveness.String()),
+					zap.Error(err))
+			}
+		}()

299-329: Log coordinator resignation is handled in campaignLogCoordinator, not watchLivenessAndResign.

watchLivenessAndResign only resigns the coordinator role via e.resign(). The log coordinator, however, is resigned within the campaignLogCoordinator() goroutine immediately after it campaigns successfully—if liveness reaches forbiddenLiveness (draining/stopping), it calls resignLogCoordinator() and returns (lines 260-270). This design separates concerns: each campaign function monitors liveness for its own role. Consider whether this split responsibility is sufficiently clear in the code, or if consolidating liveness-based resignation logic would improve maintainability.

pkg/api/util.go (1)

185-195: Pre-existing race: String() reads *l non-atomically.

The new Draining case is fine, but String() dereferences *l directly (line 186: switch *l) instead of using l.Load(). This is a data race if called concurrently with Store(). While this is pre-existing, it's worth fixing since you're already touching this block.

♻️ Proposed fix
 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:
 		return "Alive"
 	case LivenessCaptureDraining:
pkg/api/util_liveness_test.go (1)

22-34: Good coverage of the core monotonic invariant.

Consider adding edge cases for completeness: idempotent store (Draining → Draining returns true), out-of-bounds values (Liveness(-1), Liveness(3) return false), and intermediate regression (Stopping → Draining).
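
Such cases might look like the sketch below (it assumes the module path github.com/pingcap/ticdc and the Store semantics described in this PR; the out-of-bounds cases are omitted because their result depends on whether Store validates the range):

package api_test

import (
	"testing"

	"github.com/pingcap/ticdc/pkg/api"
	"github.com/stretchr/testify/require"
)

func TestLivenessStoreEdgeCases(t *testing.T) {
	var l api.Liveness // zero value: Alive

	// Idempotent store of the current state succeeds.
	require.True(t, l.Store(api.LivenessCaptureAlive))

	// Forward transitions succeed, including Draining -> Draining.
	require.True(t, l.Store(api.LivenessCaptureDraining))
	require.True(t, l.Store(api.LivenessCaptureDraining))

	// Regressions are rejected, including the intermediate one.
	require.True(t, l.Store(api.LivenessCaptureStopping))
	require.False(t, l.Store(api.LivenessCaptureDraining)) // Stopping -> Draining
	require.False(t, l.Store(api.LivenessCaptureAlive))    // Stopping -> Alive
}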

coordinator/drain_controller_test.go (1)

45-68: Remove unused now variable.

now (line 47) is declared but only consumed by a blank identifier on line 68. Just remove both lines.

♻️ Proposed fix
 func TestDrainRemainingGuard(t *testing.T) {
 	nodeID := node.ID("node-1")
-	now := time.Now()
 	remaining := drainRemaining{
 		targetNode:                 nodeID,
 		maintainersOnTarget:        0,
@@ -65,4 +64,3 @@
 	remaining.stoppingObserved = true
 	require.Equal(t, 0, remaining.remaining())
-
-	_ = now
 }
api/v1/api.go (1)

191-225: Drain capture rewrite looks solid overall.

The validation, coordinator check, and structured logging are well done. One minor note:

Line 213 uses errors.New(...) to produce a raw, untyped error. Other error paths in this handler use normalized errors like ErrAPIInvalidParam. Consider using a normalized error (e.g., errors.ErrSchedulerRequestFailed) so callers get a consistent RFC error code in the response.

Proposed fix
-	_ = c.Error(errors.New("coordinator is not fully initialized, wait a moment"))
+	_ = c.Error(errors.ErrSchedulerRequestFailed.GenWithStackByArgs("coordinator is not fully initialized, wait a moment"))
coordinator/controller.go (1)

267-276: handleDrainOnPeriodTask runs under nodeChanged mutex.

handleDrainOnPeriodTask (which may send messages via sendSetNodeLivenessRequest) runs while nodeChanged.Lock() is held. If message sending ever blocks (e.g., full channel), this would delay node-change detection for other goroutines calling checkOnNodeChanged.

Currently the operations look lightweight, so this is likely fine, but consider moving the drain task outside the lock scope if drain processing grows more complex.

Possible restructuring
 func (c *Controller) checkOnNodeChanged(ctx context.Context) {
 	c.nodeChanged.Lock()
-	defer c.nodeChanged.Unlock()
-
 	if c.nodeChanged.changed {
 		c.onNodeChanged(ctx)
 		c.nodeChanged.changed = false
 	}
+	c.nodeChanged.Unlock()
+
 	c.handleDrainOnPeriodTask()
 }
coordinator/scheduler/drain.go (1)

62-106: Drain scheduling loop is well-structured; consider adding a summary log for observability.

The Execute() logic correctly handles batch limits, rotating cursor for fairness, and least-loaded destination selection. One observation:

When moves are created during a drain cycle, there's no log output. For a 200ms interval this is understandable, but a summary log (e.g., at Info level when created > 0) would help operators monitor drain progress.

💡 Optional: add a summary log after the loop
 	s.lastNodeCursor = (s.lastNodeCursor + 1) % len(drainingNodes)
+	if created > 0 {
+		log.Info("drain scheduler created move operators",
+			zap.Int("created", created),
+			zap.Int("drainingNodes", len(drainingNodes)))
+	}
 	return now.Add(drainScheduleInterval)
maintainer/maintainer_manager.go (2)

247-252: Consider also rejecting AddMaintainer when the node is draining, not only stopping.

Currently, new maintainers are only rejected when LivenessCaptureStopping. A draining node is actively shedding maintainers via the drain scheduler, so accepting new ones from other schedulers would be counterproductive. Unless this is intentionally deferred, adding LivenessCaptureDraining to the guard would prevent scheduling churn.

Suggested change
 func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) *heartbeatpb.MaintainerStatus {
-	if m.serverLiveness.Load() == api.LivenessCaptureStopping {
-		log.Warn("reject add maintainer request because node is stopping",
+	if lv := m.serverLiveness.Load(); lv == api.LivenessCaptureStopping || lv == api.LivenessCaptureDraining {
+		log.Warn("reject add maintainer request because node is draining or stopping",
 			zap.Stringer("changefeedID", common.NewChangefeedIDFromPB(req.Id)))
 		return nil
 	}

309-355: Redundant Load() calls after Store() in liveness transition logic.

Lines 330-333 call Load() before and after Store(), then compare. Since Store() already returns whether the transition succeeded (monotonic check), the extra loads are unnecessary. You could simplify:

Simplified transition detection
-			before := m.serverLiveness.Load()
-			if m.serverLiveness.Store(target) {
-				after := m.serverLiveness.Load()
-				transitioned = before != after
-				if transitioned {
-					beforeStr := (&before).String()
-					afterStr := (&after).String()
-					log.Info("node liveness updated",
-						zap.String("from", beforeStr),
-						zap.String("to", afterStr),
-						zap.Uint64("nodeEpoch", m.nodeEpoch))
-				}
-			}
+			before := m.serverLiveness.Load()
+			if m.serverLiveness.Store(target) {
+				transitioned = true
+				log.Info("node liveness updated",
+					zap.String("from", before.String()),
+					zap.String("to", target.String()),
+					zap.Uint64("nodeEpoch", m.nodeEpoch))
+			}
coordinator/drain_controller.go (4)

194-215: remaining() logic is correct but complex — consider adding inline comments explaining the state transitions.

The method returns a positive value as long as the drain lifecycle hasn't fully completed (draining observed → work drained → stopping observed). The key invariant — never returning 0 until both stoppingObserved and no remaining work — is correctly maintained. A brief comment per branch would improve readability for future maintainers.
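
For illustration, the comments might read as in the sketch below, which uses only the fields visible in this PR's tests (maintainersOnTarget, stoppingObserved) and is not the actual implementation:

// remaining reports how much drain work the caller should still wait for.
func (r *drainRemaining) remaining() int {
	// Fully done only once the node has acknowledged STOPPING and no
	// maintainers are left on it.
	if r.stoppingObserved && r.maintainersOnTarget == 0 {
		return 0
	}
	// Maintainers are still being migrated off the node; report the real
	// backlog so API callers can observe progress.
	if r.maintainersOnTarget > 0 {
		return r.maintainersOnTarget
	}
	// The node looks drained but STOPPING has not been observed yet.
	// Never report 0 here, or callers would treat the drain as finished
	// before the node actually stops accepting work.
	return 1
}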


355-384: getDrainingNodes acquires c.mu then calls into nodeView (which acquires v.mu.RLock).

This nested lock acquisition is safe because handleNodeHeartbeat acquires v.mu.Lock and c.mu sequentially (never nested). However, this lock ordering should be documented to prevent future regressions where someone adds nested locking in the reverse order.


414-428: Error from SendCommand is logged but not wrapped with errors.Trace.

Per coding guidelines, errors from library calls should be wrapped immediately. While the error is only logged here and not propagated, wrapping it would attach a stack trace for more useful diagnostics.

Suggested fix
+	"github.com/pingcap/ticdc/pkg/errors"
 ...
 	if err != nil {
 		log.Warn("failed to send set node liveness request",
 			zap.Stringer("targetNode", targetNode),
 			zap.Uint64("nodeEpoch", nodeStatus.nodeEpoch),
 			zap.Int32("targetLiveness", int32(liveness)),
-			zap.Error(err))
+			zap.Error(errors.Trace(err)))
 	}

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".


59-65: Consider adding lock or concurrency annotations to nodeLivenessStatus.

Fields of nodeLivenessStatus are mutated under nodeLivenessView.mu, but the struct itself has no documentation about its concurrency contract. A brief comment noting it's guarded by nodeLivenessView.mu would help prevent misuse.
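
A minimal form of that annotation could look like this (sketch; nodeEpoch appears elsewhere in this review, the other fields are assumptions):

// nodeLivenessStatus is the coordinator-side record of one node's drain
// lifecycle.
//
// Concurrency: all fields are guarded by nodeLivenessView.mu. Hold the
// write lock to mutate and at least the read lock to inspect, and never
// retain a pointer to this struct beyond the critical section.
type nodeLivenessStatus struct {
	liveness      heartbeatpb.NodeLiveness // last liveness reported by the node
	nodeEpoch     uint64                   // epoch carried in the node's heartbeats
	lastHeartbeat time.Time                // input to the TTL-based liveness derivation
}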

Comment on lines +491 to +496
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

⚠️ Potential issue | 🟡 Minor



Remove the custom max function and use Go's built-in instead.

The project targets Go 1.25.5, which has had max as a built-in since Go 1.21. The custom definition at lines 491-496 is unnecessary and shadows the built-in. Delete this function.
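
Since Go 1.21 max is a built-in over ordered operands, so existing call sites compile unchanged once the local definition is deleted. The call site below is hypothetical:

package sketch

// headroom is a hypothetical call site: with the local max removed,
// this resolves to the Go 1.21+ built-in.
func headroom(limit, inFlight int) int {
	return max(1, limit-inFlight)
}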

🤖 Prompt for AI Agents
In `@coordinator/drain_controller.go` around lines 491 - 496, Remove the custom
max function in drain_controller.go (the func max(a, b int) int { ... }) that
shadows the built-in; delete this user-defined max and update any callers (if
any) to rely on the Go built-in max provided by the language/runtime (no other
code changes required beyond removing the function definition).

Comment on lines +46 to +94
func TestDrainSchedulerSkipsInFlightAndSchedulesMove(t *testing.T) {
	changefeedDB := changefeed.NewChangefeedDB(1)
	ctrl := gomock.NewController(t)
	backend := mock_changefeed.NewMockBackend(ctrl)
	t.Cleanup(ctrl.Finish)
	mc := messaging.NewMockMessageCenter()
	appcontext.SetService(appcontext.MessageCenter, mc)
	nodeManager := watcher.NewNodeManager(nil, nil)
	appcontext.SetService(watcher.NodeManagerName, nodeManager)
	selfNode := node.NewInfo("127.0.0.1:8300", "")
	nodeManager.GetAliveNodes()[selfNode.ID] = selfNode
	destNode := node.NewInfo("127.0.0.1:8301", "")
	nodeManager.GetAliveNodes()[destNode.ID] = destNode

	oc := operator.NewOperatorController(selfNode, changefeedDB, backend, 16)

	cfID1 := common.NewChangeFeedIDWithName("cf-1", common.DefaultKeyspaceName)
	cfID2 := common.NewChangeFeedIDWithName("cf-2", common.DefaultKeyspaceName)
	cf1 := changefeed.NewChangefeed(cfID1, &config.ChangeFeedInfo{
		ChangefeedID: cfID1,
		Config:       config.GetDefaultReplicaConfig(),
		SinkURI:      "mysql://127.0.0.1:3306",
	}, 10, false)
	cf2 := changefeed.NewChangefeed(cfID2, &config.ChangeFeedInfo{
		ChangefeedID: cfID2,
		Config:       config.GetDefaultReplicaConfig(),
		SinkURI:      "mysql://127.0.0.1:3306",
	}, 10, false)

	target := node.ID("drain-node")
	changefeedDB.AddReplicatingMaintainer(cf1, target)
	changefeedDB.AddReplicatingMaintainer(cf2, target)

	// One in-flight operator already exists, drain scheduler should skip it.
	require.True(t, oc.AddOperator(oc.NewMoveMaintainerOperator(cf1, target, destNode.ID)))

	s := NewDrainScheduler(16, oc, changefeedDB, &mockDrainController{
		drainingNodes: []node.ID{target},
		destNodes: map[node.ID]*node.Info{
			destNode.ID: destNode,
		},
	})

	s.Execute()

	// We already had one operator for cf1, now scheduler should add one for cf2.
	require.NotNil(t, oc.GetOperator(cf1.ID))
	require.NotNil(t, oc.GetOperator(cf2.ID))
}

⚠️ Potential issue | 🟡 Minor

GetAliveNodes() violates its documented API contract.

Lines 56–58 directly mutate the map returned by GetAliveNodes(), but the function's documentation explicitly states "the caller mustn't modify the returned map." The underlying implementation returns a pointer to the internal atomic map rather than a copy, making mutation possible but contractually forbidden.

No public API exists for properly registering nodes in tests; this mutating pattern is used across 80 test instances in the codebase. While this indicates the practice is widespread and pragmatically accepted, the test should either enforce the contract or the API should be redesigned to make mutation safe for tests.
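
A sketch of the suggested helper, assuming NodeManager stores its alive-node map behind an atomic pointer (as the c.nodes.Load() call in the manager suggests); the real field layout may differ:

// AddAliveNode registers a node without handing the internal map to
// callers. It copies the current map, inserts the node, and swaps the
// pointer atomically, so concurrent GetAliveNodes readers never observe
// a partial update.
func (c *NodeManager) AddAliveNode(info *node.Info) {
	for {
		old := c.nodes.Load()
		fresh := make(map[node.ID]*node.Info, len(*old)+1)
		for id, n := range *old {
			fresh[id] = n
		}
		fresh[info.ID] = info
		if c.nodes.CompareAndSwap(old, &fresh) {
			return
		}
		// A concurrent writer swapped first; retry on the new snapshot.
	}
}

Tests would then call nodeManager.AddAliveNode(selfNode) instead of writing into the map returned by GetAliveNodes().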

🤖 Prompt for AI Agents
In `@coordinator/scheduler/drain_test.go` around lines 46 - 94, Test mutates the
map returned by nodeManager.GetAliveNodes(), violating its contract; add a safe
test helper API on watcher.NodeManager (e.g., func (m *NodeManager)
AddAliveNode(n *node.Info) or RegisterNode) that properly inserts a node into
the manager's internal map, update
TestDrainSchedulerSkipsInFlightAndSchedulesMove to call that helper instead of
writing to nodeManager.GetAliveNodes()[...], and update other tests using the
same pattern to use the new AddAliveNode/RegisterNode method; ensure the new
method encapsulates any locking/atomic semantics so callers never modify the
internal map directly.

Comment on lines +73 to +74
	e.coordinatorMu.Lock()
	defer e.coordinatorMu.Unlock()

⚠️ Potential issue | 🟠 Major

Mutex held for the entire (potentially infinite) campaign loop – no real synchronization with watchLivenessAndResign.

coordinatorMu is acquired at entry and held via defer for the life of the loop, which may never return. This means:

  1. The lock is never available for anyone else, so it provides no concurrency protection.
  2. watchLivenessAndResign (line 318) calls e.resign() concurrently without acquiring this mutex, so e.election.Resign can race with the resign calls inside this function (lines 132, 168).

If the intent is to prevent concurrent Resign calls on the same Election, the mutex should guard each resign call site (including the watcher), not the entire loop. The same applies to logMu in campaignLogCoordinator.
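
One shape for that, as a sketch (the receiver and field names follow this review's discussion, not the actual code):

// resignLocked serializes Resign calls from the campaign loop and from
// watchLivenessAndResign without pinning the mutex for the loop's lifetime.
func (e *elector) resignLocked(ctx context.Context) error {
	e.coordinatorMu.Lock()
	defer e.coordinatorMu.Unlock()
	return e.election.Resign(ctx)
}

Every resign site (including the watcher) would call resignLocked, keeping each critical section short while still preventing concurrent Resign calls; campaignLogCoordinator would do the same with logMu.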

🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 73 - 74, The coordinatorMu is being
held for the entire campaign loop which blocks other callers and doesn't
synchronize with concurrent resign calls; change locking so coordinatorMu only
wraps the actual resign operations rather than the full loop: remove the defer
e.coordinatorMu.Unlock() that spans the loop in the function containing the
campaign loop, and instead acquire/release e.coordinatorMu around each call site
that invokes e.resign() or directly calls e.election.Resign (including inside
watchLivenessAndResign); do the same for logMu in campaignLogCoordinator so each
log/resign action is protected rather than the whole loop.

Comment on lines +95 to 101
	if e.svr.liveness.Load() >= forbiddenLiveness {
		liveness := e.svr.liveness.Load()
		livenessStr := (&liveness).String()
		log.Info("do not campaign coordinator, liveness is not alive",
			zap.String("nodeID", nodeID),
			zap.String("liveness", livenessStr))
		return nil

⚠️ Potential issue | 🟡 Minor

TOCTOU: double liveness.Load() may observe different values.

The first Load() (line 95) gates entry, the second (line 96) fetches the value for logging. Between the two calls the value can change, so the logged value may not match the condition that triggered the branch. Load once and reuse:

Proposed fix
-	if e.svr.liveness.Load() >= forbiddenLiveness {
-		liveness := e.svr.liveness.Load()
-		livenessStr := (&liveness).String()
+	if liveness := e.svr.liveness.Load(); liveness >= forbiddenLiveness {
+		livenessStr := liveness.String()
 		log.Info("do not campaign coordinator, liveness is not alive",
 			zap.String("nodeID", nodeID),
 			zap.String("liveness", livenessStr))
 		return nil
 	}

The same double-load pattern appears at lines 126-128, 202-204, 235-237, 260-262, and 319-320. Please fix all occurrences consistently.

🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 95 - 101, Multiple places call
e.svr.liveness.Load() twice leading to TOCTOU mismatches (e.g., the branch that
checks liveness then separately calls Load() again for logging in the block
around the log.Info call); change each pattern to call e.svr.liveness.Load()
once into a local variable (e.g., liveness := e.svr.liveness.Load()) and reuse
that variable for both the conditional check and for logging (convert that
single local to a string for zap.String("liveness", ...) as needed). Apply this
same single-load fix to all occurrences of the double-load pattern you flagged
(the blocks around the log.Info calls at the other locations).

Comment on lines 132 to 134
		if resignErr := e.resign(ctx); resignErr != nil {
			log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
			return errors.Trace(err)

⚠️ Potential issue | 🔴 Critical

Bug: returns err (nil) instead of resignErr on resign failure.

At this point err is nil — the campaign succeeded. When e.resign fails, the function silently returns nil, swallowing the resign error and making the caller believe everything is fine.

Proposed fix
 		if resignErr := e.resign(ctx); resignErr != nil {
 			log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 132 - 134, In the resign-handling
block inside the election flow (where resignErr := e.resign(ctx) is checked),
the code currently returns errors.Trace(err) which is nil; change the return to
propagate the actual resign error by returning errors.Trace(resignErr) so the
caller sees the failure from e.resign; ensure the logged message still uses
resignErr (zap.Error(resignErr)) and no longer swallows the error.

Comment on lines +201 to +209

		if e.svr.liveness.Load() >= stopLiveness {
			liveness := e.svr.liveness.Load()
			livenessStr := (&liveness).String()
			log.Info("stop coordinator campaign loop after server enters stopping",
				zap.String("nodeID", nodeID),
				zap.String("liveness", livenessStr))
			return nil
		}

⚠️ Potential issue | 🟡 Minor

campaignLogCoordinator is missing the analogous stopLiveness exit check.

campaignCoordinator exits the loop when liveness >= stopLiveness (lines 202-209), but campaignLogCoordinator has no such guard after a normal coordinator exit (line 291). If the node is stopping, the log coordinator loop will re-campaign indefinitely. This looks like an oversight — consider adding the same guard at the end of the campaignLogCoordinator loop.
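
The added guard could mirror the coordinator one, combined with the single-load fix suggested above (sketch):

		// At the bottom of the campaignLogCoordinator loop, after a normal
		// exit from the log coordinator role:
		if liveness := e.svr.liveness.Load(); liveness >= stopLiveness {
			log.Info("stop log coordinator campaign loop after server enters stopping",
				zap.String("nodeID", nodeID),
				zap.String("liveness", liveness.String()))
			return nil
		}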

🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 201 - 209, Add the same stop-liveness
guard used in campaignCoordinator to the campaignLogCoordinator loop: check
e.svr.liveness.Load() >= stopLiveness after the normal coordinator exit path in
campaignLogCoordinator and return nil if true; log the exit similarly (use
nodeID and the liveness value string via (&liveness).String()) so the log
coordinator does not re-campaign while the node is stopping.

Comment on lines +266 to 269
		if resignErr := e.resignLogCoordinator(); resignErr != nil {
			log.Warn("resign log coordinator actively failed",
				zap.String("nodeID", nodeID), zap.Error(resignErr))
			return errors.Trace(err)

⚠️ Potential issue | 🔴 Critical

Bug: returns err (nil) instead of resignErr on resign failure — same issue as line 134.

After a successful campaign (err is nil at this point), if resignLogCoordinator fails, the function returns errors.Trace(err) which is nil.

Proposed fix
 		if resignErr := e.resignLogCoordinator(); resignErr != nil {
 			log.Warn("resign log coordinator actively failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 266 - 269, In the block where you
call e.resignLogCoordinator(), the function returns errors.Trace(err) even when
resignLogCoordinator() fails (resignErr is non-nil) — change the return to
propagate the resignErr instead of err (e.g., return errors.Trace(resignErr)) so
the actual resign failure is returned; update the same pattern seen earlier
around line 134 to ensure resignErr is used wherever resignLogCoordinator()
failures are returned.

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a node draining mechanism, which is a significant feature for graceful shutdown and maintenance. The implementation involves a new drainController to manage the draining state of nodes, a drainScheduler to migrate changefeeds away from draining nodes, and updates to the API, maintainer manager, and election logic to be aware of node liveness. The overall design seems solid, with a state machine approach to drive the drain process.

However, I've found a critical deadlock issue in the election module (server/module_election.go) where mutexes are held indefinitely within infinite loops. I've also identified a minor issue related to time consistency in the drain controller. Please see the detailed comments for suggestions on how to fix these issues.

Comment on lines +73 to +74
	e.coordinatorMu.Lock()
	defer e.coordinatorMu.Unlock()


critical

This function contains an infinite loop. Placing e.coordinatorMu.Lock() at the beginning with a defer e.coordinatorMu.Unlock() will cause the lock to be held for the entire lifetime of the goroutine, leading to a deadlock if any other goroutine tries to acquire the same lock. The etcd/client/v3/concurrency.Election object is thread-safe, so this mutex seems unnecessary. It should be removed to prevent the deadlock.

Comment on lines +214 to +215
	e.logMu.Lock()
	defer e.logMu.Unlock()


critical

Similar to campaignCoordinator, this function contains an infinite loop, and holding a lock for the entire duration will cause a deadlock. The etcd/client/v3/concurrency.Election object is thread-safe, so this lock is likely unnecessary. It should be removed to prevent the deadlock.

Comment on lines +290 to +296
	if c.drainController.shouldSendDrainRequest(targetNode, now) {
		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING)
	}
	if c.drainController.canPromoteToStopping(targetNode, now) &&
		c.drainController.shouldSendStopRequest(targetNode, now) {
		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING)
	}


medium

To ensure time consistency and avoid redundant time.Now() calls, it's better to pass the now variable to sendSetNodeLivenessRequest. This will require changing the signature of sendSetNodeLivenessRequest in drain_controller.go as well, which I've commented on separately.

Suggested change
-	if c.drainController.shouldSendDrainRequest(targetNode, now) {
-		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING)
-	}
-	if c.drainController.canPromoteToStopping(targetNode, now) &&
-		c.drainController.shouldSendStopRequest(targetNode, now) {
-		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING)
-	}
+	if c.drainController.shouldSendDrainRequest(targetNode, now) {
+		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING, now)
+	}
+	if c.drainController.canPromoteToStopping(targetNode, now) &&
+		c.drainController.shouldSendStopRequest(targetNode, now) {
+		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING, now)
+	}

Comment on lines +414 to +415
func (c *drainController) sendSetNodeLivenessRequest(targetNode node.ID, liveness heartbeatpb.NodeLiveness) {
	nodeStatus := c.nodeView.getStatus(targetNode, time.Now())


medium

To ensure time consistency and avoid redundant time.Now() calls, this function should accept now time.Time as an argument from its caller (handleDrainOnPeriodTask) and use it instead of calling time.Now() again.

Suggested change
-func (c *drainController) sendSetNodeLivenessRequest(targetNode node.ID, liveness heartbeatpb.NodeLiveness) {
-	nodeStatus := c.nodeView.getStatus(targetNode, time.Now())
+func (c *drainController) sendSetNodeLivenessRequest(targetNode node.ID, liveness heartbeatpb.NodeLiveness, now time.Time) {
+	nodeStatus := c.nodeView.getStatus(targetNode, now)
