wip-drain-capture-5.2 #4190
base: master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED
This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.
Details
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
Warning: Rate limit exceeded

⌛ How to resolve this issue? After the wait time has elapsed, a review can be triggered using the
We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work? CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information.

📝 Walkthrough

Adds node liveness states (Alive → Draining → Stopping), a Drain Controller to issue SetNodeLiveness commands, heartbeat-based liveness observation, liveness-aware schedulers (including a drain scheduler), and a Coordinator.DrainNode API; the maintainer emits NodeHeartbeat and responds to SetNodeLiveness requests.

Changes
Sequence Diagram(s)

sequenceDiagram
participant Client
participant API as API/v1
participant Coord as Coordinator
participant DC as DrainController
participant LV as LivenessView
participant MM as MaintainerManager
participant DS as DrainScheduler
Client->>API: DrainNode(target)
API->>Coord: DrainNode(ctx, target)
Coord->>DC: RequestDrain(target)
DC-->>Coord: acknowledge request
Coord->>DC: GetStatus(target) / compute remaining
Coord-->>API: return remainingCount
loop periodic heartbeat/advance
MM->>Coord: NodeHeartbeat(liveness, epoch)
Coord->>LV: ObserveHeartbeat(...)
Coord->>DC: ObserveHeartbeat(...)
DC-->>Coord: mark draining/stopping observed
alt readyToStop(target)
DC->>MM: SetNodeLivenessRequest(STOPPING, epoch)
MM->>Coord: SetNodeLivenessResponse(applied, epoch)
Coord->>DC: ObserveSetNodeLivenessResponse(...)
else still draining
DC->>MM: SetNodeLivenessRequest(DRAINING, epoch)
MM->>Coord: SetNodeLivenessResponse(...)
end
end
DS->>LV: GetDrainingOrStoppingNodes()
DS->>Coord: request move operators for origins
Coord->>DS: approve/create MoveMaintainerOperators
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ❌ 4 failed checks (3 warnings, 1 inconclusive)
Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust node draining mechanism for TiCDC, enabling graceful shutdown and maintenance operations. It establishes a clear lifecycle for nodes (Alive, Draining, Stopping) and integrates this liveness awareness across critical components, including the API, coordinator, schedulers, and maintainers. The changes ensure that tasks are appropriately managed during node transitions, preventing new assignments to unhealthy nodes and facilitating the migration of existing workloads. Additionally, it refines table schema comparison by incorporating index column length for greater accuracy.

Highlights
Code Review
This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster operability. The changes are extensive, touching upon the API, coordinator, schedulers, maintainers, and core server logic.
Key changes include:
- A new `drain` package with a `Controller` to manage the node draining state machine.
- A new `nodeliveness` package with a `View` to track node liveness with TTL.
- A new `drainScheduler` to move work off draining nodes.
- Updates to `basicScheduler` and `balanceScheduler` to be aware of node liveness.
- A new implementation for the v1 `drainCapture` API.
- Enhancements to the server election and shutdown logic to handle draining/stopping states gracefully.
- Improved correctness in `Liveness.Store` to enforce monotonic state transitions.
The implementation appears robust and well-tested, with new unit and integration tests covering the new functionality. I have one suggestion for code simplification in the DrainNode function to improve readability.
coordinator/controller.go (Outdated)
// v1 drain API must not return 0 until drain completion is proven.
if nodeState == nodeliveness.StateUnknown || !drainingObserved {
	if remaining == 0 {
		remaining = 1
	}
	return remaining, nil
}

// Return 0 only after STOPPING is observed and all work is done.
if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
	return 0, nil
}
if remaining == 0 {
	remaining = 1
}
return remaining, nil
The logic to ensure `remaining` is at least 1 unless drain is complete is a bit complex and can be simplified. The current implementation has a duplicated `if remaining == 0 { remaining = 1 }` check and an early return that makes the flow harder to follow. A refactoring can make the logic more straightforward.
Current:

// v1 drain API must not return 0 until drain completion is proven.
if nodeState == nodeliveness.StateUnknown || !drainingObserved {
	if remaining == 0 {
		remaining = 1
	}
	return remaining, nil
}
// Return 0 only after STOPPING is observed and all work is done.
if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
	return 0, nil
}
if remaining == 0 {
	remaining = 1
}
return remaining, nil

Suggested:

// v1 drain API must not return 0 until drain completion is proven.
// Return 0 only after STOPPING is observed and all work is done, and the
// node liveness is not unknown.
if nodeState != nodeliveness.StateUnknown && stoppingObserved &&
	maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
	return 0, nil
}
if remaining == 0 {
	return 1, nil
}
return remaining, nil
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
api/v1/api.go (1)

195-195: ⚠️ Potential issue | 🟡 Minor
Stale TODO comment — the drain API is now implemented.
The comment says "currently it is a no-op" but the method now delegates to `co.DrainNode`. Remove or update the TODO.

Suggested fix

 // drainCapture drains all tables from a capture.
 // Usage:
 //	curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
-// TODO: Implement this API in the future, currently it is a no-op.
 func (o *OpenAPIV1) drainCapture(c *gin.Context) {

server/module_election.go (1)
322-341: ⚠️ Potential issue | 🟡 Minor
Log messages for "resign failed" and "resign timeout" are swapped.
When `DeadlineExceeded` is the cause (line 330), the code logs "resign failed" (line 331) and returns the error. For other errors, it logs "resign timeout" (line 336). This is the inverse of the coordinator resign flow (lines 173-184), where `DeadlineExceeded` logs "timeout" and other errors log "failed".

Proposed fix

 func (e *elector) resignLogCoordinator() error {
 	if e.logElection == nil {
 		return nil
 	}
 	nodeID := string(e.svr.info.ID)
 	// use a new context to prevent the context from being cancelled.
 	resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
-		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+		if errors.Cause(resignErr) != context.DeadlineExceeded {
+			log.Info("log coordinator resign failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
 		log.Warn("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
 	cancel()
 	return nil
 }
🤖 Fix all issues with AI agents
In `@coordinator/controller_drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header used across the
repository to the top of this file (above the "package coordinator" declaration)
so CI stops failing; copy the exact header text used in other files (for example
the header at the top of coordinator.go) and place it verbatim at the top of
coordinator/controller_drain_test.go.
In `@coordinator/drain/controller_test.go`:
- Line 1: Add the standard PingCAP copyright header to the top of
controller_test.go (above the line "package drain")—mirror the exact header used
in view_test.go so CI recognizes it; ensure the header appears before the
package declaration in controller_test.go.
In `@coordinator/drain/controller.go`:
- Line 1: Missing Apache 2.0 copyright header: add the project's standard Apache
2.0 header comment block to the top of controller.go above the existing package
drain declaration, ensuring it includes the correct copyright year and copyright
holder name used across the repo and matches the exact header format used in
other files.
In `@coordinator/nodeliveness/view_test.go`:
- Line 1: Add the standard PingCAP copyright/header block to the top of this
file (view_test.go) above the package nodeliveness declaration so the header
matches the project's required copyright format; ensure the exact standard
header text used across the repo is pasted verbatim and that there are no extra
characters or blank lines before the package statement.
In `@coordinator/nodeliveness/view.go`:
- Line 1: This file is missing the standard Apache 2.0 copyright header; add the
project's canonical Apache-2.0 header comment block immediately above the
"package nodeliveness" declaration in view.go (matching the same header style
used across the repo, e.g., full copyright line(s) and license notice or SPDX
identifier) so CI/pipeline recognizes the file as licensed.
In `@coordinator/scheduler/balance.go`:
- Around line 83-93: The code can call pkgScheduler.CheckBalanceStatus with an
empty activeNodes map which leads to division by zero inside CheckBalanceStatus;
modify the block around activeNodes (where s.nodeManager.GetAliveNodes(), the
liveness filter using s.livenessView.IsSchedulableDest, and the call to
pkgScheduler.CheckBalanceStatus with s.changefeedDB.GetTaskSizePerNode()) to
check if activeNodes is empty after filtering and return early (or set moveSize
to zero / skip calling CheckBalanceStatus) so CheckBalanceStatus never receives
an empty nodeSize map.
In `@coordinator/scheduler/drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of
drain_test.go (the file that begins with "package scheduler") so the CI
recognizes proper copyright information; insert the full multi-line header
comment block immediately above the package declaration and ensure it matches
the project's standard header format exactly.
In `@coordinator/scheduler/drain.go`:
- Line 1: The file starting with "package scheduler" is missing the standard
Apache-2.0 copyright header which causes CI failure; add the same Apache 2.0
header block used in drain_test.go at the top of this file above the package
declaration so the header text and year/owner match the repository's standard.
In `@maintainer/node_liveness_test.go`:
- Line 1: Add the project's standard copyright/header to the top of the new test
file (node_liveness_test.go) so CI stops failing; open the file that declares
"package maintainer" and prepend the same license/author header used across
other test files in the repo (matching formatting and year) before the package
declaration.
In `@pkg/api/util_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of this file
(pkg/api/util_test.go) so CI stops failing; open the file beginning with the
standard multi-line Apache 2.0 header comment block above the package api
declaration and ensure the header text exactly matches the project’s canonical
Apache 2.0 template.
In `@pkg/api/util.go`:
- Around line 170-185: The loop allows invalid Liveness values >
LivenessCaptureStopping to be CAS'd when old == LivenessCaptureStopping; add a
defensive range check at the start of the setter so any incoming v outside the
valid enum range (v < LivenessCaptureAlive || v > LivenessCaptureStopping)
returns false immediately. Locate the loop using l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.
In `@server/module_election.go`:
- Around line 255-258: The resign failure is being swallowed because the handler
returns errors.Trace(err) (where err is the prior campaign result and likely
nil) instead of the resign error; update the error return in the block that
calls e.resignLogCoordinator() (the resignErr variable) to return
errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.
🧹 Nitpick comments (10)
pkg/api/util.go (1)

193-203: Pre-existing: `String()` reads `*l` non-atomically.
Line 194 dereferences `*l` directly instead of using `l.Load()`. If `String()` is called concurrently with `Store()`, this is a data race. Not introduced by this PR, but worth noting since you're touching this area.

Suggested fix

 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:

api/v1/api.go (1)

202-219: Implementation looks good overall.
The flow is clean: validate input → get coordinator → call `DrainNode` → return remaining count with HTTP 202. One consideration: if `req.CaptureID` is empty, it will pass through to `DrainNode` with an empty `node.ID`. You may want to validate that `CaptureID` is non-empty up front and return a 400 error, consistent with how other endpoints handle missing parameters.

Optional: validate non-empty CaptureID

+	if req.CaptureID == "" {
+		_ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("capture_id is required"))
+		return
+	}
+
 	co, err := o.server.GetCoordinator()

coordinator/controller.go (1)

845-883: `DrainNode` does not acquire `apiLock` unlike other public API methods.
Other API methods (`CreateChangefeed`, `RemoveChangefeed`, `PauseChangefeed`, etc.) acquire `c.apiLock` before accessing `changefeedDB` and `operatorController`. `DrainNode` skips this. If this is intentional (because `DrainNode` is read-only and the underlying stores are internally synchronized), consider adding a brief comment to document the rationale. Otherwise, consider acquiring `c.apiLock.RLock()` for consistency.

coordinator/scheduler/drain.go (1)

49-112: Round-robin cursor advancement may skip draining nodes unfairly.
`s.rrCursor` is incremented on every outer-loop iteration (line 98), regardless of how many inner-loop iterations completed. If the outer loop runs multiple times in a single `Execute()` call, the cursor advances by the number of outer iterations, potentially skipping nodes. For example, with 3 draining nodes and `availableSize = 6`, the cursor could advance by up to 6 in one call, wrapping around multiple times.
This is a minor fairness concern rather than a correctness bug, since all draining nodes are still visited in each inner-loop pass. If you want strict round-robin across `Execute()` calls, consider advancing the cursor only once per `Execute()` invocation.

server/module_election.go (1)

266-284: Resign error silently discarded when liveness transitions to STOPPING.
Line 278 discards the `resignLogCoordinator()` error with `_ =`. If the resign fails, the log coordinator's etcd key remains held, but `leaderCancel()` on line 279 stops the coordinator anyway. This could cause a brief period where another node cannot win the election. Consider logging the error at minimum.

Proposed fix

 case <-ticker.C:
 	if e.svr.liveness.Load() == api.LivenessCaptureStopping {
 		log.Info("node is stopping, resign log coordinator actively",
 			zap.String("nodeID", nodeID))
-		_ = e.resignLogCoordinator()
+		if resignErr := e.resignLogCoordinator(); resignErr != nil {
+			log.Warn("resign log coordinator failed during stop",
+				zap.String("nodeID", nodeID), zap.Error(resignErr))
+		}
 		leaderCancel()
 		return
 	}

coordinator/nodeliveness/view.go (1)

60-86: Observe methods unconditionally overwrite records; no epoch or monotonicity guard.
Both `ObserveHeartbeat` and `ObserveSetNodeLivenessResponse` replace the record without checking `nodeEpoch` or ensuring monotonic liveness transitions. If a delayed/reordered heartbeat with `ALIVE` arrives after a `SetNodeLivenessResponse` with `DRAINING`, the view briefly regresses to Alive, potentially causing the drain scheduler to skip the node.
Consider adding an epoch check or a monotonicity guard (e.g., only accept transitions forward: Alive → Draining → Stopping).

maintainer/maintainer_manager.go (1)

442-466: Conversion helpers are stateless — could be package-level functions.
`fromNodeLivenessPB` and `toNodeLivenessPB` don't use any `Manager` state. Making them package-level functions (or moving them to a shared conversion package) would clarify their nature and improve reusability.

coordinator/drain/controller.go (3)

29-37: Consider using `sync.RWMutex` for better read concurrency.
`GetStatus` (line 106) is a read-only method but acquires a full `Mutex.Lock()`. If `GetStatus` is called on a hot path (e.g., from the `DrainNode` API), switching to `sync.RWMutex` and using `RLock` in `GetStatus` would avoid contention with concurrent reads.

200-203: Info-level log on every successful liveness command send may be noisy.
`sendSetNodeLiveness` logs at `Info` level on every successful send (line 200). Since commands are resent every `resendInterval` (1 second, as wired in controller.go line 168) until observed, this could produce substantial log volume during drains. Consider `Debug` level for resends, or logging `Info` only on the first send.
As per coding guidelines, "log message strings should not include function names and should avoid hyphens (use spaces instead)" — the current message format is compliant.

181-204: Epoch 0 sent when liveness view has no record for the node.
When `livenessView.GetNodeEpoch` returns `false` (no record), `epoch` defaults to `0`. The maintainer rejects requests where `req.NodeEpoch != m.nodeEpoch` (see maintainer_manager.go line 402), so the command will be silently rejected if the maintainer's epoch is non-zero. The command will be resent on the next tick, which is fine, but consider logging a debug message when the epoch is not found to aid troubleshooting.
for {
	old := l.Load()
	if v <= old {
		return false
	}
	if old == LivenessCaptureAlive && v != LivenessCaptureDraining {
		return false
	}
	if old == LivenessCaptureDraining && v != LivenessCaptureStopping {
		return false
	}

	if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
		return true
	}
}
Missing guard for out-of-range values when old == LivenessCaptureStopping.
If old is Stopping(2), the v <= old check on line 172 rejects values ≤ 2, but any value > 2 (e.g., an invalid Liveness(3)) falls through all guards and gets CAS'd in. While callers should only use defined constants, adding a range check would make the state machine defensive.
Proposed fix
func (l *Liveness) Store(v Liveness) bool {
+ if v < LivenessCaptureDraining || v > LivenessCaptureStopping {
+ return false
+ }
for {
	old := l.Load()

🤖 Prompt for AI Agents
In `@pkg/api/util.go` around lines 170 - 185, The loop allows invalid Liveness
values > LivenessCaptureStopping to be CAS'd when old ==
LivenessCaptureStopping; add a defensive range check at the start of the setter
so any incoming v outside the valid enum range (v < LivenessCaptureAlive || v >
LivenessCaptureStopping) returns false immediately. Locate the loop using
l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.
if resignErr := e.resignLogCoordinator(); resignErr != nil {
	log.Warn("resign log coordinator actively failed",
		zap.String("nodeID", nodeID), zap.Error(resignErr))
	return errors.Trace(err)
Bug: returns err (which is nil after a successful campaign) instead of resignErr.
When resignLogCoordinator() fails, the error is logged but line 258 returns errors.Trace(err) where err is the campaign result (nil at this point), effectively swallowing the resign failure.
Proposed fix
if resignErr := e.resignLogCoordinator(); resignErr != nil {
log.Warn("resign log coordinator actively failed",
zap.String("nodeID", nodeID), zap.Error(resignErr))
- return errors.Trace(err)
+ return errors.Trace(resignErr)
}
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 255 - 258, The resign failure is
being swallowed because the handler returns errors.Trace(err) (where err is the
prior campaign result and likely nil) instead of the resign error; update the
error return in the block that calls e.resignLogCoordinator() (the resignErr
variable) to return errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
server/module_election.go (2)
119-122: ⚠️ Potential issue | 🔴 Critical
Bug: returns `err` (which is `nil` after a successful campaign) instead of `resignErr`.
Same class of bug as flagged at line 239 for the log coordinator path. Here `err` is the campaign result (`nil` on success), so the resign failure is silently swallowed.

Proposed fix

 if resignErr := e.resign(ctx); resignErr != nil {
 	log.Warn("resign coordinator actively failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
-	return errors.Trace(err)
+	return errors.Trace(resignErr)
 }

289-298: ⚠️ Potential issue | 🟡 Minor
Bug: "resign failed" and "resign timeout" log messages are swapped.
When `context.DeadlineExceeded` is matched (line 290), the log says "resign failed" (line 291). The else branch (non-timeout error) logs "resign timeout" (line 296). These labels are reversed.

Proposed fix

 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-		log.Info("log coordinator resign failed",
+		log.Warn("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
-	log.Warn("log coordinator resign timeout",
+	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
 }
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 210-233: sendSetNodeLiveness currently sends epoch=0 when
livenessView.GetNodeEpoch(nodeID) returns false, causing the maintainer
(maintainer_manager.go) to reject the request as an epoch mismatch; update
Controller.sendSetNodeLiveness to detect the missing epoch (ok == false) and log
a clear warning (include nodeID and that node epoch is unknown) before sending
the command so operators can diagnose why the drain may be delayed; keep the
message creation and send flow unchanged (still use epoch variable) so behavior
is unchanged but observable.
In `@coordinator/nodeliveness/view.go`:
- Around line 61-87: ObserveHeartbeat and ObserveSetNodeLivenessResponse
unconditionally overwrite the node record, allowing out-of-order messages to
regress liveness; fix by making updates monotonic per node epoch: in both
functions load existing record from v.nodes[nodeID], if resp.NodeEpoch or
hb.NodeEpoch is greater than the stored nodeEpoch accept and replace (new epoch
resets to ALIVE), if equal only accept the update when the new liveness is the
same or a forward transition (ALIVE→DRAINING→STOPPING) and update lastSeen
appropriately, and if the new epoch is older or the liveness would move backward
reject the update; use the record struct fields nodeEpoch and liveness to
perform these checks inside ObserveHeartbeat and ObserveSetNodeLivenessResponse
while holding v.mu.
🧹 Nitpick comments (7)
api/v1/api.go (1)

201-229: Wrap errors from internal calls before passing to gin context.
Per coding guidelines, errors from internal/library calls should be wrapped with `errors.Trace(err)` to attach stack traces. The errors from `SelfInfo()`, `GetCoordinator()`, and `DrainNode()` are passed directly to `c.Error()` without wrapping.

Proposed fix

 self, err := o.server.SelfInfo()
 if err != nil {
-	_ = c.Error(err)
+	_ = c.Error(errors.Trace(err))
 	return
 }
 // For compatibility with old arch TiCDC, draining the current owner is not allowed.
 if target == self.ID {
 	_ = c.Error(errors.ErrSchedulerRequestFailed.GenWithStackByArgs("cannot drain the owner"))
 	return
 }
 co, err := o.server.GetCoordinator()
 if err != nil {
-	_ = c.Error(err)
+	_ = c.Error(errors.Trace(err))
 	return
 }
 remaining, err := co.DrainNode(c.Request.Context(), target)
 if err != nil {
-	_ = c.Error(err)
+	_ = c.Error(errors.Trace(err))
 	return
 }

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with `errors.Trace(err)` or `errors.WrapError(...)` to attach a stack trace".

coordinator/nodeliveness/view.go (1)

99-122: `GetState` uses `time.Now()` internally, unlike other methods that accept `now`.
`GetDrainingOrStoppingNodes` takes a `now` parameter for TTL checks, but `GetState` (and by extension `IsSchedulableDest`) calls `time.Now()` internally. This inconsistency makes the TTL behavior harder to test and could lead to subtle differences in TTL evaluation within the same tick.
Consider accepting `now time.Time` as a parameter for consistency, or documenting why the divergence is intentional.

maintainer/maintainer_manager.go (2)

448-472: `fromNodeLivenessPB` and `toNodeLivenessPB` don't use the receiver — make them standalone functions.
These methods are bound to `*Manager` but reference no fields from `m`. Converting them to package-level functions (or even placing them in a shared conversion utility) would clarify that they're pure mappings and allow reuse without a `Manager` instance.

♻️ Proposed refactor

-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
 	switch l {
 	case heartbeatpb.NodeLiveness_ALIVE:
 		return api.LivenessCaptureAlive
@@ ...
-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
 	switch l {
 	case api.LivenessCaptureAlive:
 		return heartbeatpb.NodeLiveness_ALIVE

Then update call sites (Lines 211, 417, 436) to drop the `m.` prefix.

279-284: Good: rejecting `AddMaintainer` when stopping prevents scheduling work onto a draining node.
This is a solid safety check. Consider also logging at `Warn` level instead of `Info` since this represents an unexpected request from the coordinator's perspective (it shouldn't be sending work to a stopping node).

coordinator/drain/controller.go (3)

132-150: `AdvanceLiveness` re-acquires the lock per node via `GetStatus` — consider a locked snapshot.
`listDrainRequestedNodeIDs()` takes a lock to snapshot node IDs, then for each node, `GetStatus()` re-acquires the lock, and `trySendDrainCommand`/`trySendStopCommand` each acquire it again via `checkAndMarkCommandSend`. This is 3 lock round-trips per node per tick.
Since state only moves forward (monotonic), this isn't a correctness issue, but it's worth noting for performance if the number of draining nodes grows. A single locked pass that snapshots both IDs and states would reduce contention.

36-55: No mechanism to remove nodes from the `nodes` map.
Once a node enters the drain workflow, its `nodeState` persists in the map indefinitely. If the coordinator is long-lived and nodes are added/removed over time, this is a slow memory leak. Consider adding a `RemoveNode(nodeID)` method or clearing entries once `stoppingObserved` is true and the node has been fully removed from the cluster.

15-15: `resendInterval` is a hardcoded constant — consider making it configurable.
The AI summary mentioned a
// sendSetNodeLiveness sends a liveness command to the target maintainer manager.
func (c *Controller) sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness) {
	var epoch uint64
	if c.livenessView != nil {
		if e, ok := c.livenessView.GetNodeEpoch(nodeID); ok {
			epoch = e
		}
	}

	msg := messaging.NewSingleTargetMessage(nodeID, messaging.MaintainerManagerTopic, &heartbeatpb.SetNodeLivenessRequest{
		Target:    target,
		NodeEpoch: epoch,
	})
	if err := c.mc.SendCommand(msg); err != nil {
		log.Warn("send set node liveness command failed",
			zap.Stringer("nodeID", nodeID),
			zap.String("target", target.String()),
			zap.Error(err))
		return
	}
	log.Info("send set node liveness command",
		zap.Stringer("nodeID", nodeID),
		zap.String("target", target.String()),
		zap.Uint64("epoch", epoch))
Drain commands will be silently rejected when the node epoch is unknown.
When livenessView has no epoch for the target node (Line 214, ok is false), epoch defaults to 0. On the receiving side (maintainer_manager.go Line 408), the node compares req.NodeEpoch != m.nodeEpoch — and since newNodeEpoch() guarantees a non-zero value, the request is always rejected with an epoch mismatch log.
This means drain commands sent before the coordinator observes any heartbeat from the target node will be silently ineffective. The resend logic will eventually retry after a heartbeat arrives and populates the epoch, so it's self-healing, but it could delay drain initiation unexpectedly.
Consider either:
- Logging a warning when epoch is not found so operators can diagnose delays, or
- Treating epoch=0 as "match any" on the receiver side for the initial transition.
#!/bin/bash
# Verify how livenessView.GetNodeEpoch is populated — check if heartbeat observation stores the epoch
ast-grep --pattern $'func ($_ *View) GetNodeEpoch($_, $_) ($_, $_) {
$$$
}'
# Also check where ObserveHeartbeat feeds into the view
rg -n "ObserveHeartbeat\|UpdateNodeEpoch\|SetNodeEpoch\|nodeEpoch" --type go -g '*/nodeliveness/*'🤖 Prompt for AI Agents
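For the second option above (treating epoch 0 as "match any" on the receiver side), a possible guard might look like the sketch below. The request/manager field semantics are taken from this thread, and the helper itself is hypothetical rather than part of the PR.

// epochMatches accepts a command when the sender did not yet know the node's
// epoch (reqEpoch == 0) or when the epochs agree exactly. Treating 0 as a
// wildcard avoids rejecting the very first drain command sent before any
// heartbeat was observed.
func epochMatches(reqEpoch, localEpoch uint64) bool {
	if reqEpoch == 0 {
		return true
	}
	return reqEpoch == localEpoch
}

The trade-off is that a wildcard epoch weakens exactly the staleness protection the epoch check exists for, so the logging-only option above is the more conservative choice.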
In `@coordinator/drain/controller.go` around lines 210 - 233, sendSetNodeLiveness
currently sends epoch=0 when livenessView.GetNodeEpoch(nodeID) returns false,
causing the maintainer (maintainer_manager.go) to reject the request as an epoch
mismatch; update Controller.sendSetNodeLiveness to detect the missing epoch (ok
== false) and log a clear warning (include nodeID and that node epoch is
unknown) before sending the command so operators can diagnose why the drain may
be delayed; keep the message creation and send flow unchanged (still use epoch
variable) so behavior is unchanged but observable.
func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
	if hb == nil {
		return
	}

	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: hb.NodeEpoch,
		liveness:  hb.Liveness,
	}
	v.mu.Unlock()
}

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
	if resp == nil {
		return
	}

	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: resp.NodeEpoch,
		liveness:  resp.Applied,
	}
	v.mu.Unlock()
}
Unconditional record overwrite may regress liveness state on out-of-order messages.
Both ObserveHeartbeat and ObserveSetNodeLivenessResponse unconditionally overwrite the record. If a delayed heartbeat (reporting ALIVE) arrives after a SetNodeLivenessResponse (applying DRAINING), the view would briefly revert to ALIVE, which could cause the scheduler to route work to a draining node.
Since liveness transitions are monotonic (ALIVE→DRAINING→STOPPING), consider rejecting updates that would move backward:
Sketch
func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
if hb == nil {
return
}
v.mu.Lock()
+ if existing, ok := v.nodes[nodeID]; ok {
+ if hb.NodeEpoch == existing.nodeEpoch && hb.Liveness < existing.liveness {
+ v.mu.Unlock()
+ return
+ }
+ }
v.nodes[nodeID] = record{
lastSeen: now,
nodeEpoch: hb.NodeEpoch,
liveness: hb.Liveness,
}
v.mu.Unlock()
}

This would also need analogous handling in ObserveSetNodeLivenessResponse. A new epoch (node restart) should still be allowed to reset to ALIVE.
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 61 - 87, ObserveHeartbeat and
ObserveSetNodeLivenessResponse unconditionally overwrite the node record,
allowing out-of-order messages to regress liveness; fix by making updates
monotonic per node epoch: in both functions load existing record from
v.nodes[nodeID], if resp.NodeEpoch or hb.NodeEpoch is greater than the stored
nodeEpoch accept and replace (new epoch resets to ALIVE), if equal only accept
the update when the new liveness is the same or a forward transition
(ALIVE→DRAINING→STOPPING) and update lastSeen appropriately, and if the new
epoch is older or the liveness would move backward reject the update; use the
record struct fields nodeEpoch and liveness to perform these checks inside
ObserveHeartbeat and ObserveSetNodeLivenessResponse while holding v.mu.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@coordinator/nodeliveness/view.go`:
- Around line 111-134: GetState currently calls time.Now() internally which
prevents deterministic tests and is inconsistent with
GetDrainingOrStoppingNodes; change View.GetState to accept a now time.Time
parameter (func (v *View) GetState(nodeID node.ID, now time.Time) State), update
its internal uses of now (replace the time.Now() call), and then update all
callers (notably IsSchedulableDest) to pass through a now value (e.g., take now
as a parameter or forward the now they already have) so TTL comparisons use the
same time source as GetDrainingOrStoppingNodes.
🧹 Nitpick comments (6)
coordinator/scheduler/drain.go (2)

105-118: Round-robin cursor advances on every outer-loop iteration, not per `Execute` call.
`s.rrCursor` is incremented at line 114 inside the `for scheduled < availableSize` loop, meaning it advances multiple times per `Execute` call (once per full sweep of draining nodes). This provides fairness across multiple batches within a single call, but it is worth confirming this is the intended behavior — the comment on line 41 says "rotates the starting draining node", which could imply once-per-call advancement.

159-180: `chooseLeastLoadedDest` returns a false positive when all candidates equal origin and the `node.ID` zero value is `""`.
If every candidate in `destCandidates` matches `origin`, `chosen` remains the zero-value empty string and the guard at line 176 (`chosen.IsEmpty()`) catches it. This works, but only because `node.ID("")` is never a legitimate node ID. The logic is correct but brittle — consider returning `false` explicitly when `minSize == math.MaxInt` for clarity.

coordinator/drain/controller.go (2)

130-139: `GetStatus` acquires an exclusive lock for a read-only operation.
Use `RLock`/`RUnlock` here to allow concurrent readers.

Suggested fix

 func (c *Controller) GetStatus(nodeID node.ID) (drainRequested, drainingObserved, stoppingObserved bool) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	st, ok := c.nodes[nodeID]

144-162: TOCTOU between `GetStatus` and `trySendDrainCommand`/`trySendStopCommand` — benign but worth noting.
`AdvanceLiveness` reads state via `GetStatus` (acquires and releases lock), then conditionally calls send methods (which re-acquire the lock). Between these calls another goroutine could change the state. In practice this is harmless — the worst case is an extra throttled command or a skipped iteration that self-heals on the next cycle. No action needed now, but if the controller becomes multi-goroutine-hot, consolidating the check-and-send under a single lock hold would eliminate the gap.

coordinator/drain/controller_test.go (2)

38-45: Blocking channel reads will hang the test forever if no message is sent.
Lines 39, 60, and 71 use `<-mc.GetMessageChannel()`, which blocks indefinitely. If a code change causes the message not to be sent, the test hangs rather than failing with a useful message.
Consider using a helper with a timeout:

Suggested approach

func receiveOrFail(t *testing.T, ch <-chan *messaging.TargetMessage) *messaging.TargetMessage {
	t.Helper()
	select {
	case msg := <-ch:
		return msg
	case <-time.After(3 * time.Second):
		require.FailNow(t, "timed out waiting for message")
		return nil
	}
}

Also applies to: 59-62, 70-73

26-74: Test coverage is thin — consider adding cases for key scenarios.
The single test covers the happy path well. Missing scenarios that would strengthen confidence:
- Unknown epoch: `RequestDrain` before any heartbeat observed (epoch=0 path).
- Multiple draining nodes: verifying fair round-robin or independent progression.
- readyToStop=false: `AdvanceLiveness` should not send STOPPING when the callback returns false.
- Idempotent `RequestDrain`: calling it twice should not double-send (a companion assertion helper is sketched after this list).
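As a companion to the `receiveOrFail` helper above, a negative assertion helper could cover the idempotence case. This is a minimal sketch that assumes the same `messaging.TargetMessage` channel type used in the existing test; it is not code from this PR.

// expectNoMessage asserts that no further command arrives within the grace
// period. Useful for the "idempotent RequestDrain" case: consume the first
// command with receiveOrFail, then assert nothing else shows up.
func expectNoMessage(t *testing.T, ch <-chan *messaging.TargetMessage, grace time.Duration) {
	t.Helper()
	select {
	case msg := <-ch:
		t.Fatalf("unexpected extra message: %v", msg)
	case <-time.After(grace):
		// No message within the grace period: the second call was a no-op.
	}
}

A grace period shorter than the controller's resend interval (1 second, per the comments above) keeps legitimate resends from tripping the assertion; the exact controller and mock names a full test would use are assumptions.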
func (v *View) GetState(nodeID node.ID) State {
	v.mu.RLock()
	r, ok := v.nodes[nodeID]
	v.mu.RUnlock()

	now := time.Now()
	if !ok {
		// Never observed: keep compatibility during rollout.
		return StateAlive
	}
	if now.Sub(r.lastSeen) > v.ttl {
		return StateUnknown
	}
	switch r.liveness {
	case heartbeatpb.NodeLiveness_ALIVE:
		return StateAlive
	case heartbeatpb.NodeLiveness_DRAINING:
		return StateDraining
	case heartbeatpb.NodeLiveness_STOPPING:
		return StateStopping
	default:
		return StateAlive
	}
}
🛠️ Refactor suggestion | 🟠 Major
GetState uses time.Now() internally, making it untestable and inconsistent with GetDrainingOrStoppingNodes.
GetDrainingOrStoppingNodes accepts a now time.Time parameter, but GetState (and therefore IsSchedulableDest) hardcodes time.Now(). This makes TTL-dependent behavior impossible to test deterministically and creates an inconsistency in the API surface.
Suggested fix
-func (v *View) GetState(nodeID node.ID) State {
+func (v *View) GetState(nodeID node.ID, now time.Time) State {
v.mu.RLock()
r, ok := v.nodes[nodeID]
v.mu.RUnlock()
- now := time.Now()
if !ok {
return StateAlive
}

Then update IsSchedulableDest similarly:
-func (v *View) IsSchedulableDest(nodeID node.ID) bool {
- return v.GetState(nodeID) == StateAlive
+func (v *View) IsSchedulableDest(nodeID node.ID, now time.Time) bool {
+ return v.GetState(nodeID, now) == StateAlive
}

🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 111 - 134, GetState currently
calls time.Now() internally which prevents deterministic tests and is
inconsistent with GetDrainingOrStoppingNodes; change View.GetState to accept a
now time.Time parameter (func (v *View) GetState(nodeID node.ID, now time.Time)
State), update its internal uses of now (replace the time.Now() call), and then
update all callers (notably IsSchedulableDest) to pass through a now value
(e.g., take now as a parameter or forward the now they already have) so TTL
comparisons use the same time source as GetDrainingOrStoppingNodes.
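With the `now` parameter in place, a TTL assertion can pin time explicitly instead of racing the wall clock. The sketch below assumes a `NewView(ttl)` constructor and an ALIVE heartbeat value `hb`; both are guesses about the package's API rather than verified signatures, and the 30s/5s/35s figures come from the test margins mentioned later in this review.

base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
v := nodeliveness.NewView(30 * time.Second) // assumed constructor taking the TTL
v.ObserveHeartbeat(node.ID("n1"), hb, base) // hb: an ALIVE heartbeat (assumed)

// Within the TTL the node is still Alive; past the TTL it becomes Unknown.
require.Equal(t, nodeliveness.StateAlive, v.GetState(node.ID("n1"), base.Add(5*time.Second)))
require.Equal(t, nodeliveness.StateUnknown, v.GetState(node.ID("n1"), base.Add(35*time.Second)))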
/gemini review
Code Review
This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster manageability. However, a critical security concern has been identified: the new drainCapture API endpoint in api/v1/api.go is exposed without authentication, which could allow unauthenticated users to disrupt the cluster. Additionally, a potential issue regarding error handling during log coordinator resignation should be addressed.
// Usage:
//	curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
// TODO: Implement this API in the future, currently it is a no-op.
func (o *OpenAPIV1) drainCapture(c *gin.Context) {
The drainCapture function implements a sensitive cluster management operation (draining a capture), but the pull request does not ensure that the corresponding route registration is protected by authentication middleware. In RegisterOpenAPIV1Routes, the PUT /drain route is registered without authenticateMiddleware, unlike other sensitive operations in the same file (e.g., createChangefeed, updateChangefeed). This could allow unauthenticated users to disrupt the cluster by triggering node drains. It is recommended to add authenticateMiddleware to the route registration for drainCapture in RegisterOpenAPIV1Routes.
 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-		log.Info("log coordinator resign failed",
+		log.Info("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
-	log.Warn("log coordinator resign timeout",
+	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
 }
 cancel()
This function currently swallows non-timeout errors from e.logElection.Resign. If Resign fails for a reason other than a timeout, the error is logged as a warning, but the function returns nil. This could mislead the caller into believing the resignation was successful when it wasn't, potentially leading to an inconsistent state where the node is still the log coordinator leader in etcd.
The error should be returned to the caller to handle it properly. Using defer cancel() would also simplify the context cancellation logic.
Suggested change

defer cancel()
if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
		log.Info("log coordinator resign timeout",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
	} else {
		log.Warn("log coordinator resign failed",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
	}
	return errors.Trace(resignErr)
}
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
server/module_election.go (2)
116-123: ⚠️ Potential issue | 🔴 Critical
Bug: `errors.Trace(err)` returns `nil` instead of the resign error.
At line 121, `err` is the result of `Campaign` (line 95), which is `nil` at this point (the campaign succeeded for execution to reach this branch). This swallows the resign failure. Same pattern as the known bug on line 239.

Proposed fix

 if resignErr := e.resign(ctx); resignErr != nil {
 	log.Warn("resign coordinator actively failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
-	return errors.Trace(err)
+	return errors.Trace(resignErr)
 }

282-301: ⚠️ Potential issue | 🟠 Major
`resignLogCoordinator` silently swallows non-timeout resign errors.
When `Resign` fails with an error other than `context.DeadlineExceeded`, line 296 logs a warning but line 300 returns `nil`. This means a genuine resign failure (e.g., network error) is silently ignored and the caller assumes success.
Compare with the coordinator's resign path (lines 155-161), which propagates the error. Consider returning the error here as well.

Proposed fix

 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
 		log.Info("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
 	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
+	cancel()
+	return errors.Trace(resignErr)
 }
 cancel()
 return nil
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 12-13: Add a single blank line between the file license header and
the package declaration to match project formatting; update the top of
controller.go so the license block is followed by one empty line before the
"package drain" statement (the package declaration symbol is package drain).
In `@coordinator/nodeliveness/view_test.go`:
- Around line 12-13: Add a blank line between the license header and the package
declaration in this file so it matches the project's formatting (same change as
in view.go); specifically, insert an empty line above the "package nodeliveness"
declaration to separate the header from the package statement.
In `@coordinator/nodeliveness/view.go`:
- Around line 12-13: Add a single blank line between the file's license header
and the package declaration to match the repository style and avoid
gofmt/gofumpt or copyright-check lint failures; specifically insert one empty
line immediately above the existing "package nodeliveness" declaration so the
license block is separated from the package line.
🧹 Nitpick comments (4)
coordinator/nodeliveness/view_test.go (1)

24-43: Test relies on wall-clock time for TTL assertions, making it timing-sensitive.
`GetState` calls `time.Now()` internally. The test works because the observation timestamps are set far enough in the past, but on a very slow CI runner or with clock skew, the 5-second-from-now observation at line 35 could flicker. If `GetState` is refactored to accept `now time.Time` (as suggested in view.go), these tests become fully deterministic.
The current approach is acceptable for now given the generous margins (5s vs 30s TTL, 35s vs 30s TTL).

maintainer/maintainer_manager.go (2)

404-437: Potential TOCTOU between `m.liveness.Load()` and `m.liveness.Store(target)`.
Lines 423-425 load `current`, compare `target > current`, then call `Store(target)`. Between the load and the store, another goroutine (e.g., the server's signal handler calling `Store` for stopping) could advance the liveness. The `Store` CAS loop handles this correctly (it would fail and retry or return false), so the state is safe. However, line 436 sends a response with the `current` local variable, which may be stale if `Store` returned false due to a concurrent advance.
This is a minor concern since `Store` returning false means `current` on line 431 is not updated, and the response on line 436 would report the pre-race value rather than the actual current state. This could cause the coordinator to have a momentarily stale view, which is self-correcting via the next heartbeat.

453-477: `fromNodeLivenessPB` and `toNodeLivenessPB` don't use the receiver — consider making them package-level functions.
These methods don't access any `Manager` state. Making them free functions (or at minimum unexported package-level functions) would better convey their stateless nature and allow reuse without a `Manager` instance.

Proposed change

-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {

-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {

Then update callers (e.g., lines 211, 422, 441) to drop the `m.` prefix.

coordinator/drain/controller.go (1)

141-162: `AdvanceLiveness` re-acquires the lock per node via `GetStatus` after snapshotting the list — consider whether a single pass under one lock would be simpler.
`listDrainRequestedNodeIDs` acquires and releases the lock, then the loop calls `GetStatus` (lock again) for each node, then `trySendDrainCommand`/`trySendStopCommand` (lock again in `checkAndMarkCommandSend`). This is correct and avoids holding the lock during I/O, but the triple lock acquisition per node adds overhead. Given the expected low number of draining nodes, this is fine.
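A rough sketch of the single locked pass mentioned above. The `nodes` map and the per-node flags are assumptions based on the `GetStatus` signature reviewed earlier, so treat this as an illustration rather than the PR's implementation.

// drainSnapshot copies one node's drain flags so AdvanceLiveness can act on a
// consistent view without re-locking per node.
type drainSnapshot struct {
	nodeID           node.ID
	drainRequested   bool
	drainingObserved bool
	stoppingObserved bool
}

// snapshotDrainStates takes the lock once and returns a copy of every tracked node.
func (c *Controller) snapshotDrainStates() []drainSnapshot {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make([]drainSnapshot, 0, len(c.nodes))
	for id, st := range c.nodes {
		out = append(out, drainSnapshot{
			nodeID:           id,
			drainRequested:   st.drainRequested,
			drainingObserved: st.drainingObserved,
			stoppingObserved: st.stoppingObserved,
		})
	}
	return out
}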
// limitations under the License.
package drain
Missing blank line between license header and package declaration.
Same formatting issue as the other new files.
🤖 Prompt for AI Agents
In `@coordinator/drain/controller.go` around lines 12 - 13, Add a single blank
line between the file license header and the package declaration to match
project formatting; update the top of controller.go so the license block is
followed by one empty line before the "package drain" statement (the package
declaration symbol is package drain).
// limitations under the License.
package nodeliveness
Missing blank line between license header and package declaration.
Same as view.go — add a blank line for consistency with the rest of the codebase.
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view_test.go` around lines 12 - 13, Add a blank line
between the license header and the package declaration in this file so it
matches the project's formatting (same change as in view.go); specifically,
insert an empty line above the "package nodeliveness" declaration to separate
the header from the package statement.
// limitations under the License.
package nodeliveness
Missing blank line between license header and package declaration.
Most Go files in this repo separate the license block from the package line with a blank line. This may also trigger gofmt/gofumpt or copyright-check lint failures.
Proposed fix
// limitations under the License.
+
package nodeliveness
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 12 - 13, Add a single blank
line between the file's license header and the package declaration to match the
repository style and avoid gofmt/gofumpt or copyright-check lint failures;
specifically insert one empty line immediately above the existing "package
nodeliveness" declaration so the license block is separated from the package
line.
[FORMAT CHECKER NOTIFICATION] Notice: To remove the
📖 For more info, you can check the "Contribute Code" section in the development guide.
/test mysql
@hongyunyan: The following test failed, say
Full PR test history. Your PR dashboard.
Details
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Tests