wip-drain capture-codex potter #4188
base: master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: (none listed)
The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing …
📝 Walkthrough

This pull request introduces a comprehensive node liveness tracking and coordinated draining system. New components include state management (Alive/Draining/Stopping), drain controllers, liveness views for filtering, heartbeat messaging infrastructure, and integrations across schedulers, APIs, and maintainer nodes to enable graceful node drain operations.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client/API
    participant Coord as Coordinator
    participant DC as DrainController
    participant NLV as NodeLiveness
    participant MM as Maintainer
    participant Sched as Scheduler

    Client->>Coord: DrainNode(nodeID)
    Coord->>DC: RequestDrain(nodeID)
    DC->>NLV: Update state tracking
    MM->>Coord: NodeHeartbeat (periodic)
    Coord->>NLV: HandleNodeHeartbeat
    Coord->>DC: SetNodeLivenessRequest
    DC-->>Coord: SetNodeLiveness(DRAINING)
    Coord->>MM: SetNodeLivenessRequest
    MM->>Coord: SetNodeLivenessResponse(DRAINING)
    Coord->>NLV: HandleSetNodeLivenessResponse
    Sched->>NLV: FilterSchedulableDestNodes
    NLV-->>Sched: Alive nodes only
    Sched->>DC: Remaining(nodeID)
    DC-->>Sched: Count of inflight ops + changefeeds
    Sched->>Coord: MoveMaintainerOperator
    Coord-->>Client: Drain progressing
```
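Behind these messages sits a small monotonic state machine: a node only moves forward through Alive, Draining, and Stopping. The sketch below is a simplified illustration of that idea, assuming sequential numeric values and shortened constant names; the PR's actual type lives in pkg/api/util.go and uses different numbers for Draining and Stopping, as the enum discussion further down notes.

```go
package api

import "sync/atomic"

// Liveness is an illustrative, simplified version of the monotonic node
// liveness value this PR introduces (Alive -> Draining -> Stopping,
// downgrades rejected). Names and numeric values here are assumptions.
type Liveness int32

const (
	LivenessAlive    Liveness = 0
	LivenessDraining Liveness = 1
	LivenessStopping Liveness = 2
)

// Load reads the current state atomically.
func (l *Liveness) Load() Liveness {
	return Liveness(atomic.LoadInt32((*int32)(l)))
}

// Store applies next only if it does not move the state backwards, and
// reports whether the transition was accepted.
func (l *Liveness) Store(next Liveness) bool {
	for {
		cur := l.Load()
		if next < cur {
			return false // e.g. Draining -> Alive or Stopping -> Draining is rejected
		}
		if atomic.CompareAndSwapInt32((*int32)(l), int32(cur), int32(next)) {
			return true
		}
	}
}
```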
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ❌ 3 failed (2 warnings, 1 inconclusive)
Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request implements a robust node draining feature, crucial for maintaining cluster stability during maintenance or scaling operations. It integrates new node liveness states into the coordinator's scheduling and election logic, ensuring that nodes can be gracefully removed without disrupting ongoing changefeeds. The changes span API updates, new controller and scheduler components, and modifications to the messaging and heartbeat protocols.

Highlights
[FORMAT CHECKER NOTIFICATION] Notice: To remove the … 📖 For more info, you can check the "Contribute Code" section in the development guide.
Code Review
This pull request introduces a node draining mechanism, a significant feature for graceful shutdown and maintenance. It is implemented across multiple components (the API, coordinator, schedulers, and maintainer manager), using a new drain.Controller and DrainScheduler for the core logic together with a node liveness mechanism. However, a significant security oversight was identified: the drainCapture API endpoint is exposed without authentication, allowing potential Denial of Service attacks. It is recommended to apply authentication middleware to all sensitive API v1 endpoints, especially those that perform state-changing operations such as draining captures. There are also other areas for improvement, including a potential data race, an inconsistency in how node states are handled, and some confusing enum values.
```diff
 func (o *OpenAPIV1) drainCapture(c *gin.Context) {
 	var req drainCaptureRequest
 	if err := c.ShouldBindJSON(&req); err != nil {
 		_ = c.Error(errors.ErrAPIInvalidParam.Wrap(err))
 		return
 	}
-	drainCaptureCounter.Add(1)
-	if drainCaptureCounter.Load()%10 == 0 {
-		log.Info("api v1 drainCapture", zap.Any("captureID", req.CaptureID), zap.Int64("currentTableCount", drainCaptureCounter.Load()))
-		c.JSON(http.StatusAccepted, &drainCaptureResp{
-			CurrentTableCount: 10,
-		})
-	} else {
-		log.Info("api v1 drainCapture done", zap.Any("captureID", req.CaptureID), zap.Int64("currentTableCount", drainCaptureCounter.Load()))
-		c.JSON(http.StatusAccepted, &drainCaptureResp{
-			CurrentTableCount: 0,
-		})
-	}
+	coordinator, err := o.server.GetCoordinator()
+	if err != nil {
+		_ = c.Error(err)
+		return
+	}
+	drainable, ok := coordinator.(interface {
+		DrainNode(ctx context.Context, nodeID string) (int, error)
+	})
+	if !ok {
+		_ = c.Error(stdErrors.New("coordinator does not support node drain"))
+		return
+	}
+	remaining, err := drainable.DrainNode(c.Request.Context(), req.CaptureID)
+	if err != nil {
+		_ = c.Error(err)
+		return
+	}
+	log.Info("api v1 drainCapture",
+		zap.String("captureID", req.CaptureID),
+		zap.Int("remaining", remaining))
+	c.JSON(http.StatusAccepted, &drainCaptureResp{
+		CurrentTableCount: remaining,
+	})
 }
```
The drainCapture API endpoint, now fully implemented and functional, lacks authenticateMiddleware in its route registration (RegisterOpenAPIV1Routes). This is a significant security vulnerability, as it allows any unauthenticated user to trigger a node drain operation, potentially leading to a Denial of Service (DoS) by draining all nodes. It is crucial to protect the captureGroup or the specific drain route with authenticateMiddleware. Furthermore, for consistency with other error handling and to provide more structured error information to clients, it's better to use a typed error from the pkg/errors package instead of a raw string error from the standard library.
```go
_ = c.Error(errors.ErrInternalServerError.WithMessage("coordinator does not support node drain"))
```
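To illustrate the authentication point above, here is a minimal sketch of gating the drain route behind the authentication middleware. The group path, parameter names, and function signature are assumptions made for illustration; they are not the repository's actual route registration code.

```go
package api

import "github.com/gin-gonic/gin"

// registerDrainRoutes sketches how the drain endpoint could be protected by
// the authentication middleware mentioned above. The route path, handler, and
// middleware names here are assumptions, not the project's actual code.
func registerDrainRoutes(v1 *gin.RouterGroup, authenticate gin.HandlerFunc, drainCapture gin.HandlerFunc) {
	captures := v1.Group("/captures")
	// Reject unauthenticated callers before any state changing drain operation.
	captures.Use(authenticate)
	captures.PUT("/drain", drainCapture)
}
```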
```go
	c.mu.Lock()
	st.lastSendDrain = now
	c.mu.Unlock()
```

The variable st was captured from a previous critical section (lines 64-67) and is being used here after the lock has been released and re-acquired. This is a potential data race. To ensure thread safety, you should re-fetch the state from the map within this new critical section.
```go
c.mu.Lock()
if st, ok := c.nodes[nodeID]; ok {
	st.lastSendDrain = now
}
c.mu.Unlock()
```

```go
const (
	// LivenessCaptureAlive means the capture is alive, and ready to serve.
	LivenessCaptureAlive Liveness = 0
	// LivenessCaptureDraining means the capture is preparing to go offline.
	// It should not be selected as a scheduling destination and should not campaign leadership.
	LivenessCaptureDraining Liveness = 2
	// LivenessCaptureStopping means the capture is in the process of graceful shutdown.
	LivenessCaptureStopping Liveness = 1
)
```
The integer values for the Liveness enum are non-sequential and confusing (Alive=0, Stopping=1, Draining=2). This is inconsistent with the corresponding protobuf enum NodeLiveness where DRAINING=1 and STOPPING=2. This discrepancy leads to confusing value-swapping logic in maintainer/maintainer_manager.go and makes the code error-prone.
To improve clarity and reduce the chance of bugs, please consider making the values sequential and consistent with the protobuf definition. A more logical order would be:
```go
LivenessCaptureAlive    Liveness = 0
LivenessCaptureDraining Liveness = 1
LivenessCaptureStopping Liveness = 2
```
This change would also simplify the mapping logic in maintainer/maintainer_manager.go.
```go
// GetNodesByState returns node IDs whose derived state equals state.
func (v *View) GetNodesByState(state State, now time.Time) []node.ID {
	v.mu.RLock()
	defer v.mu.RUnlock()

	out := make([]node.ID, 0)
	for id, r := range v.data {
		if r == nil || !r.everSeenHeartbeat {
			continue
		}
		s := StateAlive
		if v.ttl > 0 && now.Sub(r.lastSeen) > v.ttl {
			s = StateUnknown
		} else {
			switch r.liveness {
			case heartbeatpb.NodeLiveness_DRAINING:
				s = StateDraining
			case heartbeatpb.NodeLiveness_STOPPING:
				s = StateStopping
			default:
				s = StateAlive
			}
		}
		if s == state {
			out = append(out, id)
		}
	}
	return out
}
```
The logic in GetNodesByState is inconsistent with GetState regarding never-seen nodes. GetState returns StateAlive for a never-seen node (for backward compatibility during rollout), but GetNodesByState(StateAlive, ...) will not include it because it only iterates over nodes present in the data map.
This could lead to confusing behavior. For example, FilterSchedulableDestNodes would consider a never-seen node as schedulable, but a call to GetNodesByState(StateAlive) would not list it.
While the current usage in drain.Controller seems safe as it only queries for StateDraining and StateStopping, this inconsistency might cause issues in the future. Please consider aligning the behavior to make the component more predictable.
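A hypothetical test sketch of that divergence follows; constructor and method names are taken from the snippets quoted in this review, and the import paths are assumptions, so exact signatures may differ.

```go
package nodeliveness_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	// Import paths are assumptions based on the file paths quoted in this review.
	"github.com/pingcap/ticdc/coordinator/nodeliveness"
	"github.com/pingcap/ticdc/pkg/node"
)

// TestNeverSeenNodeStateVsListing sketches the inconsistency described above:
// GetState treats a node with no heartbeat history as Alive, while
// GetNodesByState(StateAlive) never lists it because it only iterates over
// nodes already present in the view's data map.
func TestNeverSeenNodeStateVsListing(t *testing.T) {
	v := nodeliveness.NewView(30 * time.Second)
	now := time.Now()
	unknown := node.ID("node-never-seen")

	require.Equal(t, nodeliveness.StateAlive, v.GetState(unknown, now))
	require.NotContains(t, v.GetNodesByState(nodeliveness.StateAlive, now), unknown)
}
```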
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/api/util.go (1)
201-211: ⚠️ Potential issue | 🟡 Minor

String() performs a non-atomic read of the liveness value.

switch *l dereferences the pointer without using atomic.LoadInt32. For consistency with Load() and correctness under concurrent access, use l.Load().

Proposed fix

```diff
 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:
 		return "Alive"
 	case LivenessCaptureDraining:
```
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 214-238: The sendSetNodeLiveness method currently accepts an
unused now time.Time parameter; remove the unused parameter by changing the
Controller.sendSetNodeLiveness signature to func (c *Controller)
sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness) and update
all call sites that pass the now argument to call sendSetNodeLiveness(nodeID,
target) instead; ensure imports/unused variable checks are fixed after the
change and run go vet/build to confirm no remaining references to now remain.
In `@coordinator/nodeliveness/view.go`:
- Around line 162-189: GetNodesByState duplicates the TTL + liveness → State
mapping from GetState; extract that logic into a private, lock-free helper
(e.g., func (v *View) deriveStateFromRecord(r *record, now time.Time) State)
that reads r.everSeenHeartbeat, r.lastSeen, r.liveness and v.ttl to return the
derived State, then call this helper from both GetState and GetNodesByState (use
it on each r from v.data inside GetNodesByState while keeping the RLock). Ensure
the helper accepts a *record and now so no locking is performed inside it and
update both callers to use the new function to keep derivation rules in one
place.
In `@coordinator/scheduler/drain.go`:
- Around line 70-98: The drain scheduler currently uses s.batchSize directly and
can exceed controller capacity; compute availableSize := s.batchSize -
s.operatorController.OperatorSize() (clamp to >=0) at the start of Schedule loop
and use availableSize instead of s.batchSize when checking scheduled limits and
breaking out of loops (e.g., replace checks like scheduled >= s.batchSize with
scheduled >= availableSize). Keep all other logic (drainingNodes iteration,
s.rrIndex, s.changefeedDB.GetByNodeID, s.operatorController.HasOperatorByID,
pickLeastLoadedNode, s.operatorController.AddOperator, nodeTaskSize updates) the
same so drain scheduling respects the controller capacity cap.
In `@heartbeatpb/heartbeat.proto`:
- Around line 129-157: The NodeLiveness enum in heartbeat.proto has DRAINING=1
and STOPPING=2, but the Go constants LivenessCaptureDraining and
LivenessCaptureStopping in pkg/api/util.go are swapped; update the Go constants
so LivenessCaptureDraining = 1 and LivenessCaptureStopping = 2 to match the
proto (adjust the numeric values in the LivenessCapture... constant
declarations), then run tests/linters to ensure no usages rely on the old
numeric values; conversion helpers (if any) like those mapping NodeLiveness <->
Liveness should remain correct.
In `@server/module_election.go`:
- Around line 120-122: The resign failure is being masked by returning the wrong
error variable; update both places where we call e.resign(ctx) (the coordinator
resign path and the log coordinator path) to return the resignErr instead of err
and ensure the error logged with log.Warn still includes resignErr (e.g., change
the return from errors.Trace(err) to errors.Trace(resignErr) for the code paths
around e.resign(ctx) and the corresponding log coordinator branch).
- Around line 264-266: The resign error returned from the active resignation
path is being discarded because the code returns errors.Trace(err) (where err is
nil after a successful Campaign) instead of the actual resignErr; update the
return in the resignation failure branch in module_election.go to return
errors.Trace(resignErr) and ensure the warning log still uses
zap.Error(resignErr) (refer to resignErr and the log.Warn call near the active
resign branch in the election handling code).
🧹 Nitpick comments (12)
server/module_election.go (2)
135-156: Resign watchdog: silently discarding the resign error. Line 151 discards the resign error. If the resign fails (e.g., etcd is unreachable), the watchdog exits silently and the node continues holding leadership until the lease expires. Consider logging the error so operators can diagnose why leadership wasn't relinquished promptly.

Suggested improvement

```diff
 log.Info("resign coordinator actively, liveness is stopping", zap.String("nodeID", nodeID))
 resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-_ = e.resign(resignCtx)
+if err := e.resign(resignCtx); err != nil {
+	log.Warn("resign coordinator failed in liveness watcher",
+		zap.String("nodeID", nodeID), zap.Error(err))
+}
 cancel()
 return
```

274-293: Same issue: the log coordinator watchdog silently discards the resign error. Same concern as the coordinator watchdog — the resignLogCoordinator() error is silently discarded on line 289. Also, unlike the coordinator watchdog, this doesn't use a timeout context for the resign call (the timeout is inside resignLogCoordinator itself, so this is fine, but worth noting for consistency).

Suggested improvement

```diff
 log.Info("resign log coordinator actively, liveness is stopping", zap.String("nodeID", nodeID))
-_ = e.resignLogCoordinator()
+if err := e.resignLogCoordinator(); err != nil {
+	log.Warn("resign log coordinator failed in liveness watcher",
+		zap.String("nodeID", nodeID), zap.Error(err))
+}
 return
```

pkg/api/util.go (1)
148-163: Numeric values don't match the documented monotonic progression order. The comment documents ALIVE(0) -> DRAINING -> STOPPING, but the actual values are ALIVE=0, DRAINING=2, STOPPING=1. While Store() uses explicit switch logic (so this is functionally safe), the non-monotonic numbering is confusing and inconsistent with the proto definition (DRAINING=1, STOPPING=2). This likely exists for backward compatibility with LivenessCaptureStopping=1, but a brief comment explaining this would help future readers.

Also worth noting: the proto NodeLiveness enum has DRAINING=1, STOPPING=2 while this Go enum has Draining=2, Stopping=1 — see the related comment on heartbeat.proto.

pkg/api/util_test.go (1)

9-35: LGTM! Tests cover the critical transition paths: monotonic progression, direct Alive→Stopping, and rejection of downgrades. Good use of require for clear assertion failures.

One minor gap: the Draining→Alive downgrade rejection isn't explicitly tested (only Stopping→Draining and Stopping→Alive are). Consider adding a brief assertion if you want full transition matrix coverage:

```go
// In TestLivenessStoreMonotonic, after transitioning to Draining:
require.False(t, l.Store(LivenessCaptureAlive)) // reject downgrade
```
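A standalone version of that assertion could look like the sketch below; the type and constant names follow the snippets quoted in this review, and Store is assumed to report whether the transition was accepted, as in the existing tests.

```go
package api

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestLivenessStoreRejectsAliveAfterDraining sketches the missing downgrade
// assertion suggested above.
func TestLivenessStoreRejectsAliveAfterDraining(t *testing.T) {
	var l Liveness                                    // zero value is LivenessCaptureAlive
	require.True(t, l.Store(LivenessCaptureDraining)) // Alive -> Draining is allowed
	require.False(t, l.Store(LivenessCaptureAlive))   // Draining -> Alive is rejected
	require.Equal(t, LivenessCaptureDraining, l.Load())
}
```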
maintainer/maintainer_manager.go (1)

364-404: onSetNodeLivenessRequest: log message contains a hyphen. Line 368: "ignore set node liveness request from non-coordinator" — the coding guideline says log message strings should avoid hyphens (use spaces instead). Consider "ignore set node liveness request from non coordinator" or rewording to "ignore set node liveness request, sender is not coordinator".

As per coding guidelines, "log message strings should not include function names and should avoid hyphens (use spaces instead)".

Suggested wording

```diff
-	log.Warn("ignore set node liveness request from non-coordinator",
+	log.Warn("ignore set node liveness request, sender is not coordinator",
```

coordinator/coordinator.go (1)

419-425: DrainNode always returns a nil error — consider propagating errors from the controller. Currently DrainNode always returns a nil error. If the controller's DrainNode encounters an invalid/unknown node ID, the caller (API layer) has no way to return a meaningful error to the user. Consider whether controller.DrainNode should return an error for unknown nodes, or whether you want to validate the node ID here.
api/v1/api.go (2)

225-227: Log message includes the handler name drainCapture. Per coding guidelines, log message strings should not include function names. Consider rewording to something like "drain capture requested via api v1".

As per coding guidelines, "log message strings should not include function names and should avoid hyphens (use spaces instead)".

Suggested wording

```diff
-	log.Info("api v1 drainCapture",
+	log.Info("drain capture requested via api v1",
```

212-218: Use the project's errors package instead of stdErrors.New for consistency with codebase patterns. Line 216 uses stdErrors.New(...), but all other error handling in this file uses the project's error types (e.g., errors.ErrAPIInvalidParam). While stdErrors.New() functionally results in an HTTP 500 response (which is semantically correct for this scenario), it bypasses the project's error handling conventions. Use errors.ErrInternalServerError.GenWithStackByArgs(...) to maintain consistency with the established pattern.

Suggested change

```diff
-	_ = c.Error(stdErrors.New("coordinator does not support node drain"))
+	_ = c.Error(errors.ErrInternalServerError.GenWithStackByArgs("coordinator does not support node drain"))
```
coordinator/controller.go (1)

126-126: Extract the 30 s liveness TTL into a named constant. The TTL is a tuning knob that governs when a node is deemed "unknown". A named constant (or configuration parameter) would make it easier to discover and adjust.

Proposed fix

```diff
 const (
 	bootstrapperID                = "coordinator"
 	nodeChangeHandlerID           = "coordinator-controller"
 	createChangefeedMaxRetry      = 10
 	createChangefeedRetryInterval = 5 * time.Second
+	defaultLivenessTTL            = 30 * time.Second
 )

-	livenessView := nodeliveness.NewView(30 * time.Second)
+	livenessView := nodeliveness.NewView(defaultLivenessTTL)
```

coordinator/scheduler/drain.go (1)

108-123: pickLeastLoadedNode is correct but non-deterministic on ties. Map iteration order is random in Go, so when multiple nodes share the minimum load, the winner is arbitrary. This is acceptable for load balancing. A stable tie-breaker (e.g., by node ID) would make behavior more predictable and testable, but is not strictly necessary.
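If a deterministic choice is ever wanted, one possible shape is sketched below. The element type is a stand-in for the project's node.ID (assumed to be an ordered string), and the function is illustrative rather than the PR's actual code.

```go
package scheduler

import "math"

// nodeID stands in for the project's node.ID type; this sketch only
// illustrates a stable tie break for the least loaded selection.
type nodeID string

// pickLeastLoadedNodeStable prefers the lowest load and breaks ties by the
// smaller node ID, so repeated runs pick the same destination.
func pickLeastLoadedNodeStable(candidates []nodeID, load map[nodeID]int) nodeID {
	var best nodeID
	bestLoad := math.MaxInt
	for _, id := range candidates {
		l := load[id]
		if l < bestLoad || (l == bestLoad && best != "" && id < best) {
			best = id
			bestLoad = l
		}
	}
	return best
}
```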
coordinator/drain/controller.go (2)
129-138: Lock churn: acquiring and releasing mu per node inside the loop. Each iteration of the two loops (lines 129-133 and 134-138) acquires and releases c.mu individually. Since this runs on a periodic tick with a small number of draining nodes, performance impact is negligible, but you could batch both loops under a single lock acquisition for clarity.

Proposed consolidation

```diff
 func (c *Controller) tick(now time.Time) {
 	if c.livenessView == nil {
 		return
 	}
-	for _, id := range c.livenessView.GetNodesByState(nodeliveness.StateDraining, now) {
-		c.mu.Lock()
-		c.mustGetStateLocked(id)
-		c.mu.Unlock()
-	}
-	for _, id := range c.livenessView.GetNodesByState(nodeliveness.StateStopping, now) {
-		c.mu.Lock()
-		c.mustGetStateLocked(id)
-		c.mu.Unlock()
-	}
-
-	c.mu.Lock()
-	ids := make([]node.ID, 0, len(c.nodes))
-	for id := range c.nodes {
-		ids = append(ids, id)
-	}
-	c.mu.Unlock()
+	drainingIDs := c.livenessView.GetNodesByState(nodeliveness.StateDraining, now)
+	stoppingIDs := c.livenessView.GetNodesByState(nodeliveness.StateStopping, now)
+
+	c.mu.Lock()
+	for _, id := range drainingIDs {
+		c.mustGetStateLocked(id)
+	}
+	for _, id := range stoppingIDs {
+		c.mustGetStateLocked(id)
+	}
+	ids := make([]node.ID, 0, len(c.nodes))
+	for id := range c.nodes {
+		ids = append(ids, id)
+	}
+	c.mu.Unlock()

 	for _, id := range ids {
 		c.tickNode(id, now)
 	}
 }
```

240-245: Replace the custom maxInt helper with the built-in max function. Go 1.25.5 supports the built-in max() function introduced in Go 1.21, so this custom helper is unnecessary.
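For reference, the built-in replaces the helper directly; the function below is only an illustration with made-up names, not the PR's code.

```go
package drain

// remainingBudget illustrates using the built-in max (Go 1.21+) instead of a
// custom maxInt helper to clamp a value at zero.
func remainingBudget(batchSize, inflight int) int {
	return max(batchSize-inflight, 0)
}
```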
```go
func (c *Controller) sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness, now time.Time) {
	epoch := uint64(0)
	if c.livenessView != nil {
		if e, ok := c.livenessView.GetNodeEpoch(nodeID); ok {
			epoch = e
		}
	}

	req := &heartbeatpb.SetNodeLivenessRequest{
		Target:    target,
		NodeEpoch: epoch,
	}
	msg := messaging.NewSingleTargetMessage(nodeID, messaging.MaintainerManagerTopic, req)
	if err := c.mc.SendCommand(msg); err != nil {
		log.Warn("send set node liveness request failed",
			zap.Stringer("target", nodeID),
			zap.String("liveness", target.String()),
			zap.Error(err))
		return
	}
	log.Info("send set node liveness request",
		zap.Stringer("target", nodeID),
		zap.String("liveness", target.String()),
		zap.Uint64("nodeEpoch", epoch))
}
```
Unused now parameter in sendSetNodeLiveness.
The now time.Time parameter is accepted but never referenced in the function body. Either use it (e.g., for logging or metrics) or remove it to avoid confusion.
Proposed fix
```diff
-func (c *Controller) sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness, now time.Time) {
+func (c *Controller) sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness) {
```

And update the three call sites (lines 69, 170, 197) to drop the now argument.
```go
// GetNodesByState returns node IDs whose derived state equals state.
func (v *View) GetNodesByState(state State, now time.Time) []node.ID {
	v.mu.RLock()
	defer v.mu.RUnlock()

	out := make([]node.ID, 0)
	for id, r := range v.data {
		if r == nil || !r.everSeenHeartbeat {
			continue
		}
		s := StateAlive
		if v.ttl > 0 && now.Sub(r.lastSeen) > v.ttl {
			s = StateUnknown
		} else {
			switch r.liveness {
			case heartbeatpb.NodeLiveness_DRAINING:
				s = StateDraining
			case heartbeatpb.NodeLiveness_STOPPING:
				s = StateStopping
			default:
				s = StateAlive
			}
		}
		if s == state {
			out = append(out, id)
		}
	}
	return out
}
```
🛠️ Refactor suggestion | 🟠 Major
Duplicated state-derivation logic — extract a lock-free helper to keep GetState and GetNodesByState in sync.
GetNodesByState re-implements the TTL + liveness → State mapping that GetState already performs (lines 172-184 vs 106-116). If the derivation rules change (e.g., a new state is added), the two copies can diverge silently.
Extract the derivation into a private method that operates on a *record without acquiring the lock, then call it from both public methods.
Proposed refactor
```diff
+// deriveState returns the liveness state for a record without acquiring any locks.
+func (v *View) deriveState(r *record, now time.Time) State {
+ if r == nil || !r.everSeenHeartbeat {
+ return StateAlive
+ }
+ if v.ttl > 0 && now.Sub(r.lastSeen) > v.ttl {
+ return StateUnknown
+ }
+ switch r.liveness {
+ case heartbeatpb.NodeLiveness_DRAINING:
+ return StateDraining
+ case heartbeatpb.NodeLiveness_STOPPING:
+ return StateStopping
+ default:
+ return StateAlive
+ }
+}
func (v *View) GetState(id node.ID, now time.Time) State {
- var (
- lastSeen time.Time
- liveness heartbeatpb.NodeLiveness
- everSeen bool
- )
v.mu.RLock()
- r := v.data[id]
- if r != nil {
- lastSeen = r.lastSeen
- liveness = r.liveness
- everSeen = r.everSeenHeartbeat
- }
+ r := v.data[id]
v.mu.RUnlock()
-
- if r == nil || !everSeen {
- return StateAlive
- }
- if v.ttl > 0 && now.Sub(lastSeen) > v.ttl {
- return StateUnknown
- }
- switch liveness {
- case heartbeatpb.NodeLiveness_DRAINING:
- return StateDraining
- case heartbeatpb.NodeLiveness_STOPPING:
- return StateStopping
- default:
- return StateAlive
- }
+ return v.deriveState(r, now)
}
func (v *View) GetNodesByState(state State, now time.Time) []node.ID {
v.mu.RLock()
defer v.mu.RUnlock()
out := make([]node.ID, 0)
for id, r := range v.data {
- if r == nil || !r.everSeenHeartbeat {
- continue
- }
- s := StateAlive
- if v.ttl > 0 && now.Sub(r.lastSeen) > v.ttl {
- s = StateUnknown
- } else {
- switch r.liveness {
- case heartbeatpb.NodeLiveness_DRAINING:
- s = StateDraining
- case heartbeatpb.NodeLiveness_STOPPING:
- s = StateStopping
- default:
- s = StateAlive
- }
- }
- if s == state {
+ if v.deriveState(r, now) == state {
out = append(out, id)
}
}
return out
 }
```
```go
	scheduled := 0
	for i := 0; i < len(drainingNodes) && scheduled < s.batchSize; i++ {
		origin := drainingNodes[(s.rrIndex+i)%len(drainingNodes)]
		changefeeds := s.changefeedDB.GetByNodeID(origin)
		if len(changefeeds) == 0 {
			continue
		}

		for _, cf := range changefeeds {
			if scheduled >= s.batchSize {
				break
			}
			if s.operatorController.HasOperatorByID(cf.ID) {
				continue
			}

			dest := pickLeastLoadedNode(destNodes, nodeTaskSize)
			if dest == "" {
				log.Info("no schedulable destination node for drain",
					zap.Stringer("origin", origin))
				return now.Add(time.Second)
			}

			if !s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, origin, dest)) {
				continue
			}
			nodeTaskSize[dest]++
			scheduled++
		}
```
Drain scheduler does not account for existing in-flight operators when capping batchSize.
The basic scheduler computes availableSize := batchSize - operatorController.OperatorSize() before scheduling. The drain scheduler uses s.batchSize directly, which could cause the operator controller to exceed its intended capacity when drain and regular scheduling run concurrently.
If drain should be rate-limited like other schedulers, subtract current operator count from the batch limit. If this is intentional (drain takes priority), a comment explaining the choice would help.
Proposed fix (if drain should respect capacity)
```diff
+ availableSize := s.batchSize - s.operatorController.OperatorSize()
+ if availableSize <= 0 {
+ return now.Add(drainCheckInterval)
+ }
+
scheduled := 0
- for i := 0; i < len(drainingNodes) && scheduled < s.batchSize; i++ {
+ for i := 0; i < len(drainingNodes) && scheduled < availableSize; i++ {
 	origin := drainingNodes[(s.rrIndex+i)%len(drainingNodes)]
```
```proto
// NodeLiveness is the node-level liveness state reported by a NodeAgent.
//
// It is used by the coordinator to:
//   - filter destination candidates for scheduling
//   - drive node drain progress
//   - avoid campaigning/residing leadership on nodes preparing to go offline
enum NodeLiveness {
  ALIVE = 0;
  DRAINING = 1;
  STOPPING = 2;
}

// NodeHeartbeat is a node-scoped heartbeat, independent of maintainer heartbeats.
message NodeHeartbeat {
  NodeLiveness liveness = 1;
  uint64 node_epoch = 2;
}

// SetNodeLivenessRequest asks a node to upgrade its local liveness monotonically.
message SetNodeLivenessRequest {
  NodeLiveness target = 1;
  uint64 node_epoch = 2;
}

// SetNodeLivenessResponse reports the liveness applied by the node and its current epoch.
message SetNodeLivenessResponse {
  NodeLiveness applied = 1;
  uint64 node_epoch = 2;
}
```
🧩 Analysis chain

🏁 Scripts executed against pingcap/ticdc:

```shell
# First, find and examine the Go Liveness enum definition
fd -e go -x grep -l "LivenessCapture" {} \; | head -5

# Get the actual enum values from pkg/api/util.go
rg -n "LivenessCapture" -A 1 --type=go -g "!*test*" -g "!*.pb.go"

# Search for conversion functions or casting between NodeLiveness and Liveness
rg -n -B3 -A3 "NodeLiveness|Liveness.*=" --type=go -g "!*.pb.go" -g "!*_test.go" | head -100

# Look for proto imports and usage patterns
rg -n "heartbeatpb\." --type=go -g "!*.pb.go" | head -50

# Get the full onSetNodeLivenessRequest function implementation
sed -n '364,400p' maintainer/maintainer_manager.go

# Search for any other conversion patterns between the two types
rg -n "api\.Liveness|heartbeatpb\.NodeLiveness" --type=go -B2 -A2 maintainer/maintainer_manager.go | head -80

# Look for the conversion function at lines 455-460
sed -n '450,465p' maintainer/maintainer_manager.go

# Search for any direct int/uint32 casting or numeric conversions with liveness
rg -n "int32|uint32.*iveness|iveness.*int32|iveness.*uint32" --type=go maintainer/

# Check for any generated proto code that might do numeric conversion
fd -e pb.go -x rg -l "NodeLiveness" {} \;

# Search for any proto.Marshal or proto.Unmarshal operations that might bypass the conversion functions
rg -n "Marshal|Unmarshal" --type=go maintainer/maintainer_manager.go

# Check for any proto conversion in the generated pb.go file
rg -n "NodeLiveness" heartbeatpb/heartbeat.pb.go | head -30

# Verify there are no other conversion functions or casting paths
rg -n "NodeLiveness_|LivenessCapture" --type=go -g "!*test*" -g "!*.pb.go" | grep -v "maintainer_manager\|module_election\|health.go\|util.go"

# Final check: verify all uses of NodeLiveness in coordinator package
rg -n "NodeLiveness_" --type=go coordinator/ | head -20
```
Numeric value mismatch between proto NodeLiveness and Go Liveness enum should be aligned.
The proto defines DRAINING=1, STOPPING=2, but pkg/api/util.go defines LivenessCaptureDraining=2, LivenessCaptureStopping=1 — the numeric values for Draining and Stopping are swapped. While all current conversions between the two types use explicit switch statements that map by enum name (safe), the numeric mismatch is a maintenance risk that could lead to bugs if future code bypasses these conversions via unsafe casting.
Align the numeric values in both enums (recommend setting LivenessCaptureDraining=1, LivenessCaptureStopping=2 in Go to match the proto definition).
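Whichever numbering is chosen, keeping the mapping name-based (as the existing switch-based conversions already do) removes any dependence on the numeric values. The helpers below are an illustrative sketch, not the PR's code; the import path is an assumption based on the repository layout referenced in this review.

```go
package api

import (
	// Import path assumed from the repository layout referenced in this review.
	"github.com/pingcap/ticdc/heartbeatpb"
)

// toProtoLiveness and fromProtoLiveness map by enum name, so the Go and proto
// numeric values never need to agree.
func toProtoLiveness(l Liveness) heartbeatpb.NodeLiveness {
	switch l {
	case LivenessCaptureDraining:
		return heartbeatpb.NodeLiveness_DRAINING
	case LivenessCaptureStopping:
		return heartbeatpb.NodeLiveness_STOPPING
	default:
		return heartbeatpb.NodeLiveness_ALIVE
	}
}

func fromProtoLiveness(n heartbeatpb.NodeLiveness) Liveness {
	switch n {
	case heartbeatpb.NodeLiveness_DRAINING:
		return LivenessCaptureDraining
	case heartbeatpb.NodeLiveness_STOPPING:
		return LivenessCaptureStopping
	default:
		return LivenessCaptureAlive
	}
}
```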
```go
if resignErr := e.resign(ctx); resignErr != nil {
	log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
	return errors.Trace(err)
}
```
Pre-existing bug: returning err instead of resignErr on resign failure.
Line 122 returns errors.Trace(err), but at this point err is nil (the campaign succeeded on line 95 and passed the error checks on lines 101-113). This means a resign failure is silently swallowed and the function returns nil. The same bug exists at line 266 for the log coordinator path. While this predates this PR, the surrounding code was modified and this is worth fixing.
Proposed fix
```diff
- return errors.Trace(err)
+ return errors.Trace(resignErr)
```
| log.Warn("resign log coordinator actively failed", | ||
| zap.String("nodeID", nodeID), zap.Error(resignErr)) | ||
| return errors.Trace(err) |
Same pre-existing bug: returning err instead of resignErr at line 266.
Same issue as line 122 — err here is nil from logElection.Campaign(ctx, nodeID) succeeding, so the resign error is lost.
Proposed fix
```diff
- return errors.Trace(err)
+ return errors.Trace(resignErr)
```
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes