wip-drain-capture-5.3-codex #4189
base: master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED.
This pull-request has been approved by: (none yet). The full list of commands accepted by this bot can be found here.
Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing ...
📝 Walkthrough
The PR implements a node drain/stopping lifecycle feature with monotonic liveness progression. It introduces drain state tracking at the coordinator level, extends the maintainer manager with node-level liveness binding, adds a drain scheduler for migrating maintainers off draining nodes, and implements coordinated heartbeat- and liveness-based communication between nodes and the coordinator via new message types.
Changes
Sequence Diagram(s)
sequenceDiagram
participant Node as Node
participant Coord as Coordinator
participant DCtrl as DrainController
participant Sched as DrainScheduler
participant OpCtrl as OperatorController
Node->>Coord: NodeHeartbeat (Draining liveness)
Coord->>DCtrl: handleNodeHeartbeat
DCtrl->>DCtrl: markDrainingObserved
activate DCtrl
DCtrl->>DCtrl: Update drain state
deactivate DCtrl
Sched->>DCtrl: GetDrainingNodes()
DCtrl-->>Sched: [node_id]
Sched->>Sched: Iterate draining nodes
Sched->>OpCtrl: CountOperatorsByNode(draining_node)
OpCtrl-->>Sched: in_flight_count
alt Changefeed not already moved
Sched->>OpCtrl: CreateOperator(MoveMaintainer)
OpCtrl->>OpCtrl: Schedule move
end
Note over Sched,OpCtrl: Repeat for next execution interval
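The walkthrough above describes a "monotonic liveness progression" (a node only ever moves forward: Alive, then Draining, then Stopping). The sketch below illustrates that idea with a compare-and-swap guard. It is illustrative only: the real `Liveness` type lives in `pkg/api` and is not reproduced here; the value ordering, the `Store` semantics (idempotent stores accepted, regressions and out-of-range values rejected), and the constant names follow what is quoted later in this review, but the body is an assumption rather than the project's implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type Liveness int32

const (
	LivenessCaptureAlive    Liveness = 0
	LivenessCaptureDraining Liveness = 1
	LivenessCaptureStopping Liveness = 2
)

func (l *Liveness) Load() Liveness {
	return Liveness(atomic.LoadInt32((*int32)(l)))
}

// Store only moves the liveness forward; it reports whether the requested
// value was accepted (idempotent stores return true, regressions and
// out-of-range values return false).
func (l *Liveness) Store(v Liveness) bool {
	if v < LivenessCaptureAlive || v > LivenessCaptureStopping {
		return false
	}
	for {
		old := l.Load()
		if v < old {
			return false // never regress, e.g. Stopping -> Draining
		}
		if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
			return true
		}
	}
}

func main() {
	var l Liveness
	fmt.Println(l.Store(LivenessCaptureDraining)) // true: Alive -> Draining
	fmt.Println(l.Store(LivenessCaptureAlive))    // false: cannot go back to Alive
	fmt.Println(l.Store(LivenessCaptureStopping)) // true: Draining -> Stopping
}
```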
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Summary of Changes
Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a comprehensive node draining mechanism, allowing for graceful shutdown or maintenance of individual nodes within the system. It achieves this by implementing a new ...
Highlights
[FORMAT CHECKER NOTIFICATION] Notice: To remove the ... 📖 For more info, you can check the "Contribute Code" section in the development guide.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/module_election.go (1)
352-360: ⚠️ Potential issue | 🟠 Major
Swapped log messages: the timeout case logs "failed" and vice versa.
- Line 353: the DeadlineExceeded branch (an actual timeout) logs "log coordinator resign failed".
- Line 358: the non-timeout branch logs "log coordinator resign timeout".
These are swapped.
Proposed fix
 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-		log.Info("log coordinator resign failed",
+		log.Warn("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
-	log.Warn("log coordinator resign timeout",
+	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
 }
🤖 Fix all issues with AI agents
In `@coordinator/drain_controller.go`:
- Around line 491-496: Remove the custom max function in drain_controller.go
(the func max(a, b int) int { ... }) that shadows the built-in; delete this
user-defined max and update any callers (if any) to rely on the Go built-in max
provided by the language/runtime (no other code changes required beyond removing
the function definition).
In `@coordinator/scheduler/drain_test.go`:
- Around line 46-94: Test mutates the map returned by
nodeManager.GetAliveNodes(), violating its contract; add a safe test helper API
on watcher.NodeManager (e.g., func (m *NodeManager) AddAliveNode(n *node.Info)
or RegisterNode) that properly inserts a node into the manager's internal map,
update TestDrainSchedulerSkipsInFlightAndSchedulesMove to call that helper
instead of writing to nodeManager.GetAliveNodes()[...], and update other tests
using the same pattern to use the new AddAliveNode/RegisterNode method; ensure
the new method encapsulates any locking/atomic semantics so callers never modify
the internal map directly.
In `@server/module_election.go`:
- Around line 201-209: Add the same stop-liveness guard used in
campaignCoordinator to the campaignLogCoordinator loop: check
e.svr.liveness.Load() >= stopLiveness after the normal coordinator exit path in
campaignLogCoordinator and return nil if true; log the exit similarly (use
nodeID and the liveness value string via (&liveness).String()) so the log
coordinator does not re-campaign while the node is stopping.
- Around line 73-74: The coordinatorMu is being held for the entire campaign
loop which blocks other callers and doesn't synchronize with concurrent resign
calls; change locking so coordinatorMu only wraps the actual resign operations
rather than the full loop: remove the defer e.coordinatorMu.Unlock() that spans
the loop in the function containing the campaign loop, and instead
acquire/release e.coordinatorMu around each call site that invokes e.resign() or
directly calls e.election.Resign (including inside watchLivenessAndResign); do
the same for logMu in campaignLogCoordinator so each log/resign action is
protected rather than the whole loop.
- Around line 266-269: In the block where you call e.resignLogCoordinator(), the
function returns errors.Trace(err) even when resignLogCoordinator() fails
(resignErr is non-nil) — change the return to propagate the resignErr instead of
err (e.g., return errors.Trace(resignErr)) so the actual resign failure is
returned; update the same pattern seen earlier around line 134 to ensure
resignErr is used wherever resignLogCoordinator() failures are returned.
- Around line 132-134: In the resign-handling block inside the election flow
(where resignErr := e.resign(ctx) is checked), the code currently returns
errors.Trace(err) which is nil; change the return to propagate the actual resign
error by returning errors.Trace(resignErr) so the caller sees the failure from
e.resign; ensure the logged message still uses resignErr (zap.Error(resignErr))
and no longer swallows the error.
- Around line 95-101: Multiple places call e.svr.liveness.Load() twice leading
to TOCTOU mismatches (e.g., the branch that checks liveness then separately
calls Load() again for logging in the block around the log.Info call); change
each pattern to call e.svr.liveness.Load() once into a local variable (e.g.,
liveness := e.svr.liveness.Load()) and reuse that variable for both the
conditional check and for logging (convert that single local to a string for
zap.String("liveness", ...) as needed). Apply this same single-load fix to all
occurrences of the double-load pattern you flagged (the blocks around the
log.Info calls at the other locations).
🧹 Nitpick comments (15)
server/module_election.go (3)
96-97: (&liveness).String() is unnecessarily verbose.
Since liveness is a local variable (addressable), Go allows calling a pointer-receiver method directly: liveness.String(). The explicit address-of is not needed and hurts readability.
Also applies to: 127-128, 203-204, 236-237, 261-262, 319-320
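A minimal standalone illustration of that point (the Liveness type and value below are placeholders, not the project's code): calling a pointer-receiver method on an addressable local takes the address implicitly.

```go
package main

import "fmt"

type Liveness int32

// String has a pointer receiver.
func (l *Liveness) String() string { return fmt.Sprintf("Liveness(%d)", *l) }

func main() {
	liveness := Liveness(1)           // local variable, therefore addressable
	fmt.Println(liveness.String())    // shorthand for (&liveness).String()
	fmt.Println((&liveness).String()) // equivalent, but noisier
}
```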
317-326: Defer cancel() to prevent a context leak on unexpected panic.
If e.resign panics, cancel() on line 326 is never reached. Since this runs in a loop, wrapping the block in a helper or using defer in a closure is safer:
Proposed fix
-			resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-			if err := e.resign(resignCtx); err != nil {
-				liveness := e.svr.liveness.Load()
-				livenessStr := (&liveness).String()
-				log.Warn("resign coordinator on stopping liveness failed",
-					zap.String("nodeID", nodeID),
-					zap.String("liveness", livenessStr),
-					zap.Error(err))
-			}
-			cancel()
+			func() {
+				resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+				defer cancel()
+				if err := e.resign(resignCtx); err != nil {
+					liveness := e.svr.liveness.Load()
+					log.Warn("resign coordinator on stopping liveness failed",
+						zap.String("nodeID", nodeID),
+						zap.String("liveness", liveness.String()),
+						zap.Error(err))
+				}
+			}()
299-329: Log coordinator resignation is handled in campaignLogCoordinator, not watchLivenessAndResign.
watchLivenessAndResign only resigns the coordinator role via e.resign(). The log coordinator, however, is resigned within the campaignLogCoordinator() goroutine immediately after it campaigns successfully: if liveness reaches forbiddenLiveness (draining/stopping), it calls resignLogCoordinator() and returns (lines 260-270). This design separates concerns: each campaign function monitors liveness for its own role. Consider whether this split responsibility is sufficiently clear in the code, or if consolidating liveness-based resignation logic would improve maintainability.
pkg/api/util.go (1)
185-195: Pre-existing race: String() reads *l non-atomically.
The new Draining case is fine, but String() dereferences *l directly (line 186: switch *l) instead of using l.Load(). This is a data race if called concurrently with Store(). While this is pre-existing, it's worth fixing since you're already touching this block.
♻️ Proposed fix
 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:
 		return "Alive"
 	case LivenessCaptureDraining:
pkg/api/util_liveness_test.go (1)
22-34: Good coverage of the core monotonic invariant.
Consider adding edge cases for completeness: idempotent store (Draining → Draining returns true), out-of-bounds values (Liveness(-1), Liveness(3) return false), and intermediate regression (Stopping → Draining).
coordinator/drain_controller_test.go (1)
45-68: Remove the unused now variable.
now (line 47) is declared but only consumed by a blank identifier on line 68. Just remove both lines.
♻️ Proposed fix
 func TestDrainRemainingGuard(t *testing.T) {
 	nodeID := node.ID("node-1")
-	now := time.Now()
 	remaining := drainRemaining{
 		targetNode:          nodeID,
 		maintainersOnTarget: 0,
@@ -65,4 +64,3 @@
 	remaining.stoppingObserved = true
 	require.Equal(t, 0, remaining.remaining())
-
-	_ = now
 }
api/v1/api.go (1)
191-225: Drain capture rewrite looks solid overall.
The validation, coordinator check, and structured logging are well done. One minor note: line 213 uses errors.New(...) to produce a raw, untyped error. Other error paths in this handler use normalized errors like ErrAPIInvalidParam. Consider using a normalized error (e.g., errors.ErrSchedulerRequestFailed) so callers get a consistent RFC error code in the response.
Proposed fix
-	_ = c.Error(errors.New("coordinator is not fully initialized, wait a moment"))
+	_ = c.Error(errors.ErrSchedulerRequestFailed.GenWithStackByArgs("coordinator is not fully initialized, wait a moment"))
coordinator/controller.go (1)
267-276: handleDrainOnPeriodTask runs under the nodeChanged mutex.
handleDrainOnPeriodTask (which may send messages via sendSetNodeLivenessRequest) runs while nodeChanged.Lock() is held. If message sending ever blocks (e.g., a full channel), this would delay node-change detection for other goroutines calling checkOnNodeChanged. Currently the operations look lightweight, so this is likely fine, but consider moving the drain task outside the lock scope if drain processing grows more complex.
Possible restructuring
 func (c *Controller) checkOnNodeChanged(ctx context.Context) {
 	c.nodeChanged.Lock()
-	defer c.nodeChanged.Unlock()
-
 	if c.nodeChanged.changed {
 		c.onNodeChanged(ctx)
 		c.nodeChanged.changed = false
 	}
+	c.nodeChanged.Unlock()
+
 	c.handleDrainOnPeriodTask()
 }
coordinator/scheduler/drain.go (1)
62-106: Drain scheduling loop is well-structured; consider adding a summary log for observability.
The Execute() logic correctly handles batch limits, a rotating cursor for fairness, and least-loaded destination selection. One observation: when moves are created during a drain cycle, there's no log output. For a 200ms interval this is understandable, but a summary log (e.g., at Info level when created > 0) would help operators monitor drain progress.
💡 Optional: add a summary log after the loop
 	s.lastNodeCursor = (s.lastNodeCursor + 1) % len(drainingNodes)
+	if created > 0 {
+		log.Info("drain scheduler created move operators",
+			zap.Int("created", created),
+			zap.Int("drainingNodes", len(drainingNodes)))
+	}
 	return now.Add(drainScheduleInterval)
maintainer/maintainer_manager.go (2)
247-252: Consider also rejecting AddMaintainer when the node is draining, not only stopping.
Currently, new maintainers are only rejected when LivenessCaptureStopping. A draining node is actively shedding maintainers via the drain scheduler, so accepting new ones from other schedulers would be counterproductive. Unless this is intentionally deferred, adding LivenessCaptureDraining to the guard would prevent scheduling churn.
Suggested change
 func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) *heartbeatpb.MaintainerStatus {
-	if m.serverLiveness.Load() == api.LivenessCaptureStopping {
-		log.Warn("reject add maintainer request because node is stopping",
+	if lv := m.serverLiveness.Load(); lv == api.LivenessCaptureStopping || lv == api.LivenessCaptureDraining {
+		log.Warn("reject add maintainer request because node is draining or stopping",
 			zap.Stringer("changefeedID", common.NewChangefeedIDFromPB(req.Id)))
 		return nil
 	}
309-355: Redundant Load() calls after Store() in the liveness transition logic.
Lines 330-333 call Load() before and after Store(), then compare. Since Store() already returns whether the transition succeeded (monotonic check), the extra loads are unnecessary. You could simplify:
Simplified transition detection
-	before := m.serverLiveness.Load()
-	if m.serverLiveness.Store(target) {
-		after := m.serverLiveness.Load()
-		transitioned = before != after
-		if transitioned {
-			beforeStr := (&before).String()
-			afterStr := (&after).String()
-			log.Info("node liveness updated",
-				zap.String("from", beforeStr),
-				zap.String("to", afterStr),
-				zap.Uint64("nodeEpoch", m.nodeEpoch))
-		}
-	}
+	before := m.serverLiveness.Load()
+	if m.serverLiveness.Store(target) {
+		transitioned = true
+		log.Info("node liveness updated",
+			zap.String("from", before.String()),
+			zap.String("to", target.String()),
+			zap.Uint64("nodeEpoch", m.nodeEpoch))
+	}
coordinator/drain_controller.go (4)
194-215: remaining() logic is correct but complex — consider adding inline comments explaining the state transitions.
The method returns a positive value as long as the drain lifecycle hasn't fully completed (draining observed → work drained → stopping observed). The key invariant — never returning 0 until both stoppingObserved and no remaining work — is correctly maintained. A brief comment per branch would improve readability for future maintainers.
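To illustrate the kind of per-branch commenting suggested here, a hypothetical shape of the guard is sketched below. The maintainersOnTarget and stoppingObserved fields are taken from the test quoted earlier in this review; the real drainRemaining has more fields (e.g. targetNode) and its branch bodies may differ, so treat this as a shape, not the actual implementation.

```go
package coordinator

// drainRemaining is a hypothetical, trimmed-down sketch used only to show
// one comment per branch of remaining(); not the project's real type.
type drainRemaining struct {
	maintainersOnTarget int  // maintainers still scheduled on the draining node
	stoppingObserved    bool // node has acknowledged the STOPPING liveness
}

func (r *drainRemaining) remaining() int {
	// Maintainers are still scheduled on the draining node: report them directly.
	if r.maintainersOnTarget > 0 {
		return r.maintainersOnTarget
	}
	// Work is drained but the node has not yet confirmed STOPPING:
	// keep the result positive so the coordinator does not declare the drain done.
	if !r.stoppingObserved {
		return 1
	}
	// Nothing left on the node and STOPPING has been observed: drain complete.
	return 0
}
```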
355-384: getDrainingNodes acquires c.mu and then calls into nodeView (which acquires v.mu.RLock).
This nested lock acquisition is safe because handleNodeHeartbeat acquires v.mu.Lock and c.mu sequentially (never nested). However, this lock ordering should be documented to prevent future regressions where someone adds nested locking in the reverse order.
414-428: Error from SendCommand is logged but not wrapped with errors.Trace.
Per coding guidelines, errors from library calls should be wrapped immediately. While the error is only logged here and not propagated, wrapping it would attach a stack trace for more useful diagnostics.
Suggested fix
+	"github.com/pingcap/ticdc/pkg/errors"
 ...
 	if err != nil {
 		log.Warn("failed to send set node liveness request",
 			zap.Stringer("targetNode", targetNode),
 			zap.Uint64("nodeEpoch", nodeStatus.nodeEpoch),
 			zap.Int32("targetLiveness", int32(liveness)),
-			zap.Error(err))
+			zap.Error(errors.Trace(err)))
As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".
59-65: Consider adding lock or concurrency annotations to nodeLivenessStatus.
Fields of nodeLivenessStatus are mutated under nodeLivenessView.mu, but the struct itself has no documentation about its concurrency contract. A brief comment noting it's guarded by nodeLivenessView.mu would help prevent misuse.
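One possible shape for that annotation, purely illustrative: the fields below are placeholders (only nodeEpoch appears elsewhere in this review), and the point is the concurrency note on the type.

```go
package coordinator

import "time"

// nodeLivenessStatus tracks the drain/stopping progress observed for one node.
//
// Concurrency: every field is guarded by the owning nodeLivenessView.mu and
// must only be read or written while that mutex is held.
type nodeLivenessStatus struct {
	nodeEpoch         uint64    // placeholder field
	lastHeartbeatTime time.Time // placeholder field
}
```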
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}
🧩 Analysis chain
🏁 Script executed:
cat go.mod | head -10
Repository: pingcap/ticdc
Length of output: 306
Remove the custom max function and use Go's built-in instead.
The project targets Go 1.25.5, which has had max as a built-in since Go 1.21. The custom definition at lines 491-496 is unnecessary and shadows the built-in. Delete this function.
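For reference, a minimal use of the built-in (available since Go 1.21, so no local helper is needed):

```go
package main

import "fmt"

func main() {
	// max and min are predeclared built-ins since Go 1.21.
	fmt.Println(max(3, 7)) // 7
	fmt.Println(min(3, 7)) // 3
}
```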
🤖 Prompt for AI Agents
In `@coordinator/drain_controller.go` around lines 491 - 496, Remove the custom
max function in drain_controller.go (the func max(a, b int) int { ... }) that
shadows the built-in; delete this user-defined max and update any callers (if
any) to rely on the Go built-in max provided by the language/runtime (no other
code changes required beyond removing the function definition).
func TestDrainSchedulerSkipsInFlightAndSchedulesMove(t *testing.T) {
	changefeedDB := changefeed.NewChangefeedDB(1)
	ctrl := gomock.NewController(t)
	backend := mock_changefeed.NewMockBackend(ctrl)
	t.Cleanup(ctrl.Finish)
	mc := messaging.NewMockMessageCenter()
	appcontext.SetService(appcontext.MessageCenter, mc)
	nodeManager := watcher.NewNodeManager(nil, nil)
	appcontext.SetService(watcher.NodeManagerName, nodeManager)
	selfNode := node.NewInfo("127.0.0.1:8300", "")
	nodeManager.GetAliveNodes()[selfNode.ID] = selfNode
	destNode := node.NewInfo("127.0.0.1:8301", "")
	nodeManager.GetAliveNodes()[destNode.ID] = destNode

	oc := operator.NewOperatorController(selfNode, changefeedDB, backend, 16)

	cfID1 := common.NewChangeFeedIDWithName("cf-1", common.DefaultKeyspaceName)
	cfID2 := common.NewChangeFeedIDWithName("cf-2", common.DefaultKeyspaceName)
	cf1 := changefeed.NewChangefeed(cfID1, &config.ChangeFeedInfo{
		ChangefeedID: cfID1,
		Config:       config.GetDefaultReplicaConfig(),
		SinkURI:      "mysql://127.0.0.1:3306",
	}, 10, false)
	cf2 := changefeed.NewChangefeed(cfID2, &config.ChangeFeedInfo{
		ChangefeedID: cfID2,
		Config:       config.GetDefaultReplicaConfig(),
		SinkURI:      "mysql://127.0.0.1:3306",
	}, 10, false)

	target := node.ID("drain-node")
	changefeedDB.AddReplicatingMaintainer(cf1, target)
	changefeedDB.AddReplicatingMaintainer(cf2, target)

	// One in-flight operator already exists, drain scheduler should skip it.
	require.True(t, oc.AddOperator(oc.NewMoveMaintainerOperator(cf1, target, destNode.ID)))

	s := NewDrainScheduler(16, oc, changefeedDB, &mockDrainController{
		drainingNodes: []node.ID{target},
		destNodes: map[node.ID]*node.Info{
			destNode.ID: destNode,
		},
	})

	s.Execute()

	// We already had one operator for cf1, now the scheduler should add one for cf2.
	require.NotNil(t, oc.GetOperator(cf1.ID))
	require.NotNil(t, oc.GetOperator(cf2.ID))
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find GetAliveNodes implementation
fd -t f "\.go$" | xargs grep -l "GetAliveNodes" | head -5
Repository: pingcap/ticdc
Length of output: 246
🏁 Script executed:
#!/bin/bash
# Search for GetAliveNodes function definition
ast-grep --pattern $'func ($_ ) GetAliveNodes() $_ {
$$$
}'
Repository: pingcap/ticdc
Length of output: 271
🏁 Script executed:
#!/bin/bash
# Also search with ripgrep for GetAliveNodes definition
rg "func.*GetAliveNodes\(\)" -A 5Repository: pingcap/ticdc
Length of output: 454
🏁 Script executed:
#!/bin/bash
# Check for proper node registration methods in NodeManager
rg "func \(c \*NodeManager\)" server/watcher/module_node_manager.go | head -20Repository: pingcap/ticdc
Length of output: 592
🏁 Script executed:
#!/tool/bin/bash
# Look for any AddNode or registration methods
rg "AddNode|Register.*Node|add.*Node" server/watcher/module_node_manager.goRepository: pingcap/ticdc
Length of output: 147
🏁 Script executed:
#!/bin/bash
# Check the NodeManager struct definition to understand how nodes are stored
rg "type NodeManager struct" -A 10 server/watcher/module_node_manager.goRepository: pingcap/ticdc
Length of output: 317
🏁 Script executed:
#!/bin/bash
# Look at how nodes are updated in NodeManager (e.g., in Tick or Run methods)
rg "c\.nodes\." server/watcher/module_node_manager.go | head -15Repository: pingcap/ticdc
Length of output: 163
🏁 Script executed:
#!/bin/bash
# Check other test files to see how they register nodes with NodeManager
rg -l "NewNodeManager" | xargs grep -A 10 "NewNodeManager" | grep -A 10 "Alive\|nodes" | head -30Repository: pingcap/ticdc
Length of output: 2650
🏁 Script executed:
#!/bin/bash
# Check if there are any proper APIs for updating nodes in tests
rg "nodeManager\." maintainer/maintainer_test.go | head -10Repository: pingcap/ticdc
Length of output: 95
🏁 Script executed:
#!/bin/bash
# Let's verify how c.nodes.Load() works and confirm the return type
cat -n server/watcher/module_node_manager.go | sed -n '140,150p'
Repository: pingcap/ticdc
Length of output: 448
🏁 Script executed:
#!/bin/bash
# Count how many test files use this pattern
rg "GetAliveNodes\(\)\[" --type go | wc -lRepository: pingcap/ticdc
Length of output: 59
🏁 Script executed:
#!/bin/bash
# Check if there's a comment in the original NodeManager about this being a test-only concern
rg "GetAliveNodes\(\)" server/watcher/module_node_manager.go -B 2 -A 5Repository: pingcap/ticdc
Length of output: 414
GetAliveNodes() violates its documented API contract.
Lines 56–58 directly mutate the map returned by GetAliveNodes(), but the function's documentation explicitly states "the caller mustn't modify the returned map." The underlying implementation returns a pointer to the internal atomic map rather than a copy, making mutation possible but contractually forbidden.
No public API exists for properly registering nodes in tests; this mutating pattern is used across 80 test instances in the codebase. While this indicates the practice is widespread and pragmatically accepted, the test should either enforce the contract or the API should be redesigned to make mutation safe for tests.
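One possible shape for the test helper this review asks for, written as a standalone sketch: the NodeManager below is a stand-in with an assumed copy-on-write atomic map (the real type in server/watcher may store nodes differently), and AddAliveNodeForTest is a hypothetical method name, not an existing API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type ID string

type Info struct{ ID ID }

// NodeManager stand-in: alive nodes live behind an atomic pointer to a map,
// which is why callers must never mutate the map returned by GetAliveNodes.
type NodeManager struct {
	nodes atomic.Pointer[map[ID]*Info]
}

func NewNodeManager() *NodeManager {
	m := &NodeManager{}
	empty := map[ID]*Info{}
	m.nodes.Store(&empty)
	return m
}

// GetAliveNodes returns the shared map; callers must treat it as read-only.
func (m *NodeManager) GetAliveNodes() map[ID]*Info { return *m.nodes.Load() }

// AddAliveNodeForTest inserts a node via copy-on-write so tests never write to
// the shared map directly. Hypothetical helper, not part of the real API.
func (m *NodeManager) AddAliveNodeForTest(n *Info) {
	for {
		old := m.nodes.Load()
		next := make(map[ID]*Info, len(*old)+1)
		for id, info := range *old {
			next[id] = info
		}
		next[n.ID] = n
		if m.nodes.CompareAndSwap(old, &next) {
			return
		}
	}
}

func main() {
	m := NewNodeManager()
	m.AddAliveNodeForTest(&Info{ID: "drain-node"})
	fmt.Println(len(m.GetAliveNodes())) // 1
}
```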
🤖 Prompt for AI Agents
In `@coordinator/scheduler/drain_test.go` around lines 46 - 94, Test mutates the
map returned by nodeManager.GetAliveNodes(), violating its contract; add a safe
test helper API on watcher.NodeManager (e.g., func (m *NodeManager)
AddAliveNode(n *node.Info) or RegisterNode) that properly inserts a node into
the manager's internal map, update
TestDrainSchedulerSkipsInFlightAndSchedulesMove to call that helper instead of
writing to nodeManager.GetAliveNodes()[...], and update other tests using the
same pattern to use the new AddAliveNode/RegisterNode method; ensure the new
method encapsulates any locking/atomic semantics so callers never modify the
internal map directly.
	e.coordinatorMu.Lock()
	defer e.coordinatorMu.Unlock()
Mutex held for the entire (potentially infinite) campaign loop – no real synchronization with watchLivenessAndResign.
coordinatorMu is acquired at entry and held via defer for the life of the loop, which may never return. This means:
- The lock is never available for anyone else, so it provides no concurrency protection.
- watchLivenessAndResign (line 318) calls e.resign() concurrently without acquiring this mutex, so e.election.Resign can race with the resign calls inside this function (lines 132, 168).
If the intent is to prevent concurrent Resign calls on the same Election, the mutex should guard each resign call site (including the watcher), not the entire loop. The same applies to logMu in campaignLogCoordinator.
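If the mutex is kept at all, one way to realize "guard each resign call site" is a small wrapper, sketched below. The elector type, its fields, and the method name are stand-ins mirroring the identifiers quoted in this thread; this is a sketch of the locking scheme, not the module's actual code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// election is a stand-in for the etcd concurrency.Election used in the module.
type election struct{}

func (el *election) Resign(ctx context.Context) error { return nil }

type elector struct {
	coordinatorMu sync.Mutex // guards Resign calls only, not the campaign loop
	election      *election
}

// resignCoordinator serializes concurrent Resign callers (the campaign loop and
// watchLivenessAndResign) without pinning the mutex for the loop's lifetime.
func (e *elector) resignCoordinator(ctx context.Context) error {
	e.coordinatorMu.Lock()
	defer e.coordinatorMu.Unlock()
	return e.election.Resign(ctx)
}

func main() {
	e := &elector{election: &election{}}
	fmt.Println(e.resignCoordinator(context.Background())) // <nil>
}
```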
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 73 - 74, The coordinatorMu is being
held for the entire campaign loop which blocks other callers and doesn't
synchronize with concurrent resign calls; change locking so coordinatorMu only
wraps the actual resign operations rather than the full loop: remove the defer
e.coordinatorMu.Unlock() that spans the loop in the function containing the
campaign loop, and instead acquire/release e.coordinatorMu around each call site
that invokes e.resign() or directly calls e.election.Resign (including inside
watchLivenessAndResign); do the same for logMu in campaignLogCoordinator so each
log/resign action is protected rather than the whole loop.
	if e.svr.liveness.Load() >= forbiddenLiveness {
		liveness := e.svr.liveness.Load()
		livenessStr := (&liveness).String()
		log.Info("do not campaign coordinator, liveness is not alive",
			zap.String("nodeID", nodeID),
			zap.String("liveness", livenessStr))
		return nil
TOCTOU: double liveness.Load() may observe different values.
The first Load() (line 95) gates entry, the second (line 96) fetches the value for logging. Between the two calls the value can change, so the logged value may not match the condition that triggered the branch. Load once and reuse:
Proposed fix
- if e.svr.liveness.Load() >= forbiddenLiveness {
- liveness := e.svr.liveness.Load()
- livenessStr := (&liveness).String()
+ if liveness := e.svr.liveness.Load(); liveness >= forbiddenLiveness {
+ livenessStr := liveness.String()
log.Info("do not campaign coordinator, liveness is not alive",
zap.String("nodeID", nodeID),
zap.String("liveness", livenessStr))
return nil
 	}
The same double-load pattern appears at lines 126-128, 202-204, 235-237, 260-262, and 319-320. Please fix all occurrences consistently.
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 95 - 101, Multiple places call
e.svr.liveness.Load() twice leading to TOCTOU mismatches (e.g., the branch that
checks liveness then separately calls Load() again for logging in the block
around the log.Info call); change each pattern to call e.svr.liveness.Load()
once into a local variable (e.g., liveness := e.svr.liveness.Load()) and reuse
that variable for both the conditional check and for logging (convert that
single local to a string for zap.String("liveness", ...) as needed). Apply this
same single-load fix to all occurrences of the double-load pattern you flagged
(the blocks around the log.Info calls at the other locations).
	if resignErr := e.resign(ctx); resignErr != nil {
		log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
		return errors.Trace(err)
Bug: returns err (nil) instead of resignErr on resign failure.
At this point err is nil — the campaign succeeded. When e.resign fails, the function silently returns nil, swallowing the resign error and making the caller believe everything is fine.
Proposed fix
if resignErr := e.resign(ctx); resignErr != nil {
log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
- return errors.Trace(err)
+ return errors.Trace(resignErr)
 	}
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 132 - 134, In the resign-handling
block inside the election flow (where resignErr := e.resign(ctx) is checked),
the code currently returns errors.Trace(err) which is nil; change the return to
propagate the actual resign error by returning errors.Trace(resignErr) so the
caller sees the failure from e.resign; ensure the logged message still uses
resignErr (zap.Error(resignErr)) and no longer swallows the error.
	if e.svr.liveness.Load() >= stopLiveness {
		liveness := e.svr.liveness.Load()
		livenessStr := (&liveness).String()
		log.Info("stop coordinator campaign loop after server enters stopping",
			zap.String("nodeID", nodeID),
			zap.String("liveness", livenessStr))
		return nil
	}
campaignLogCoordinator is missing the analogous stopLiveness exit check.
campaignCoordinator exits the loop when liveness >= stopLiveness (lines 202-209), but campaignLogCoordinator has no such guard after a normal coordinator exit (line 291). If the node is stopping, the log coordinator loop will re-campaign indefinitely. This looks like an oversight — consider adding the same guard at the end of the campaignLogCoordinator loop.
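A sketch of such a guard, mirroring the snippet quoted above from campaignCoordinator; the exact placement at the end of the campaignLogCoordinator loop, the log wording, and the single-Load style (per the earlier TOCTOU comment) are assumptions, not the actual patch.

```go
// Hypothetical guard at the end of the campaignLogCoordinator loop, mirroring
// the campaignCoordinator exit so a stopping node stops re-campaigning.
if liveness := e.svr.liveness.Load(); liveness >= stopLiveness {
	log.Info("stop log coordinator campaign loop after server enters stopping",
		zap.String("nodeID", nodeID),
		zap.String("liveness", liveness.String()))
	return nil
}
```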
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 201 - 209, Add the same stop-liveness
guard used in campaignCoordinator to the campaignLogCoordinator loop: check
e.svr.liveness.Load() >= stopLiveness after the normal coordinator exit path in
campaignLogCoordinator and return nil if true; log the exit similarly (use
nodeID and the liveness value string via (&liveness).String()) so the log
coordinator does not re-campaign while the node is stopping.
	if resignErr := e.resignLogCoordinator(); resignErr != nil {
		log.Warn("resign log coordinator actively failed",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
		return errors.Trace(err)
Bug: returns err (nil) instead of resignErr on resign failure — same issue as line 134.
After a successful campaign (err is nil at this point), if resignLogCoordinator fails, the function returns errors.Trace(err) which is nil.
Proposed fix
if resignErr := e.resignLogCoordinator(); resignErr != nil {
log.Warn("resign log coordinator actively failed",
zap.String("nodeID", nodeID), zap.Error(resignErr))
- return errors.Trace(err)
+ return errors.Trace(resignErr)
 	}
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 266 - 269, In the block where you
call e.resignLogCoordinator(), the function returns errors.Trace(err) even when
resignLogCoordinator() fails (resignErr is non-nil) — change the return to
propagate the resignErr instead of err (e.g., return errors.Trace(resignErr)) so
the actual resign failure is returned; update the same pattern seen earlier
around line 134 to ensure resignErr is used wherever resignLogCoordinator()
failures are returned.
Code Review
This pull request introduces a node draining mechanism, which is a significant feature for graceful shutdown and maintenance. The implementation involves a new drainController to manage the draining state of nodes, a drainScheduler to migrate changefeeds away from draining nodes, and updates to the API, maintainer manager, and election logic to be aware of node liveness. The overall design seems solid, with a state machine approach to drive the drain process.
However, I've found a critical deadlock issue in the election module (server/module_election.go) where mutexes are held indefinitely within infinite loops. I've also identified a minor issue related to time consistency in the drain controller. Please see the detailed comments for suggestions on how to fix these issues.
	e.coordinatorMu.Lock()
	defer e.coordinatorMu.Unlock()
This function contains an infinite loop. Placing e.coordinatorMu.Lock() at the beginning with a defer e.coordinatorMu.Unlock() will cause the lock to be held for the entire lifetime of the goroutine, leading to a deadlock if any other goroutine tries to acquire the same lock. The etcd/client/v3/concurrency.Election object is thread-safe, so this mutex seems unnecessary. It should be removed to prevent the deadlock.
	e.logMu.Lock()
	defer e.logMu.Unlock()
	if c.drainController.shouldSendDrainRequest(targetNode, now) {
		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING)
	}
	if c.drainController.canPromoteToStopping(targetNode, now) &&
		c.drainController.shouldSendStopRequest(targetNode, now) {
		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING)
	}
To ensure time consistency and avoid redundant time.Now() calls, it's better to pass the now variable to sendSetNodeLivenessRequest. This will require changing the signature of sendSetNodeLivenessRequest in drain_controller.go as well, which I've commented on separately.
Suggested change
 	if c.drainController.shouldSendDrainRequest(targetNode, now) {
-		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING)
+		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING, now)
 	}
 	if c.drainController.canPromoteToStopping(targetNode, now) &&
 		c.drainController.shouldSendStopRequest(targetNode, now) {
-		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING)
+		c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING, now)
 	}
func (c *drainController) sendSetNodeLivenessRequest(targetNode node.ID, liveness heartbeatpb.NodeLiveness) {
	nodeStatus := c.nodeView.getStatus(targetNode, time.Now())
To ensure time consistency and avoid redundant time.Now() calls, this function should accept now time.Time as an argument from its caller (handleDrainOnPeriodTask) and use it instead of calling time.Now() again.
Suggested change
-func (c *drainController) sendSetNodeLivenessRequest(targetNode node.ID, liveness heartbeatpb.NodeLiveness) {
-	nodeStatus := c.nodeView.getStatus(targetNode, time.Now())
+func (c *drainController) sendSetNodeLivenessRequest(targetNode node.ID, liveness heartbeatpb.NodeLiveness, now time.Time) {
+	nodeStatus := c.nodeView.getStatus(targetNode, now)
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
New Features
Improvements