wip drain capture #4124
base: master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has not yet been approved by any approver. The full list of commands accepted by this bot can be found here.
Details: needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
📝 Walkthrough
This pull request introduces a comprehensive node draining feature for TiCDC, enabling graceful node evacuation. It adds a DrainController to orchestrate draining state transitions, a three-state node liveness model (Alive → Draining → Stopping), and a DrainScheduler to migrate maintainers away from draining nodes. The system integrates new protobuf messages for heartbeats and liveness control, updates the drain API endpoint to use the coordinator, and refactors schedulers to use a DestNodeSelector abstraction for filtering schedulable destinations.
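To make the three-state model above concrete, here is a minimal sketch of how such an ordered, forward-only liveness value could be modeled. The type and constant names are assumptions for illustration, not the identifiers used in this PR.

```go
package liveness

import "sync/atomic"

// Liveness is an illustrative three-state node liveness value. The numeric
// ordering matters: a node may only move forward (Alive -> Draining -> Stopping).
type Liveness int32

const (
	LivenessAlive    Liveness = iota // serving normally, valid scheduling target
	LivenessDraining                 // shedding workload, accepts no new tasks
	LivenessStopping                 // about to exit
)

// Store advances the liveness monotonically and reports whether it changed;
// attempts to move backward are ignored.
func (l *Liveness) Store(v Liveness) bool {
	for {
		old := Liveness(atomic.LoadInt32((*int32)(l)))
		if v <= old {
			return false
		}
		if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
			return true
		}
	}
}

// Load returns the current liveness value.
func (l *Liveness) Load() Liveness {
	return Liveness(atomic.LoadInt32((*int32)(l)))
}
```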
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Summary of Changes
Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a robust node draining mechanism, allowing for the graceful removal or shutdown of nodes by systematically migrating their assigned workloads. It establishes a clear, monotonic lifecycle for nodes (Alive, Draining, Stopping) and integrates this state management across the system's core components. The changes enable operators to initiate drains via an API, while the coordinator intelligently orchestrates task migration and ensures that only healthy nodes participate in scheduling and leadership elections, significantly enhancing operational flexibility and system stability.
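As a rough sketch of how an API-initiated drain could delegate to the coordinator (handler shape, route, field names, and the string node ID are assumptions for illustration; the PR's actual endpoint and types may differ), the handler simply triggers the drain and reports how many maintainers remain, so callers can poll until the count reaches zero:

```go
package api

import (
	"encoding/json"
	"net/http"
)

// drainer is an assumed view of the coordinator: DrainNode starts draining a
// node and returns the number of maintainers still running on it.
type drainer interface {
	DrainNode(nodeID string) int
}

type drainResponse struct {
	CurrentCount int `json:"current_count"`
}

// drainNodeHandler triggers a drain and returns the remaining maintainer count.
func drainNodeHandler(coord drainer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		nodeID := r.URL.Query().Get("node_id")
		if nodeID == "" {
			http.Error(w, "missing node_id", http.StatusBadRequest)
			return
		}
		remaining := coord.DrainNode(nodeID)
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(drainResponse{CurrentCount: remaining})
	}
}
```

Operators would call such an endpoint repeatedly until the reported count drops to zero, at which point the node can be shut down safely.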
Code Review
This pull request implements the drain capture feature, which is a significant piece of work involving a new liveness state machine for capture nodes, a DrainController to orchestrate the draining process, and a DrainScheduler to move workloads away from draining nodes. The changes are well-structured and include comprehensive tests for the new components. I've identified one bug in error handling within the election module and one suggestion for improving robustness in the maintainer manager. Overall, this is a solid implementation of a complex feature.
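To illustrate the DestNodeSelector abstraction mentioned in the walkthrough, a scheduler-side filter that excludes draining and stopping nodes might look roughly like the following; the interface shape and field names are assumptions, not the PR's actual code.

```go
package scheduler

// NodeLiveness mirrors the assumed three-state model used in this illustration.
type NodeLiveness int32

const (
	NodeAlive NodeLiveness = iota
	NodeDraining
	NodeStopping
)

// NodeInfo is an illustrative view of a capture node as seen by schedulers.
type NodeInfo struct {
	ID       string
	Liveness NodeLiveness
	Tasks    int
}

// DestNodeSelector narrows the set of nodes that may receive new or moved work.
type DestNodeSelector interface {
	SchedulableNodes(all []NodeInfo) []NodeInfo
}

// drainAwareSelector keeps only Alive nodes, so a drain scheduler always moves
// maintainers toward healthy destinations and never back onto a draining node.
type drainAwareSelector struct{}

func (drainAwareSelector) SchedulableNodes(all []NodeInfo) []NodeInfo {
	out := make([]NodeInfo, 0, len(all))
	for _, n := range all {
		if n.Liveness == NodeAlive {
			out = append(out, n)
		}
	}
	return out
}
```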
```go
		zap.String("nodeID", nodeID),
		zap.Error(resignErr))
	cancel()
	return errors.Trace(err)
```
There seems to be a bug in the error handling here. If e.resign(resignCtx) fails, resignErr will be non-nil, but the function returns errors.Trace(err). At this point, err (from the successful Campaign call) is nil, so the function will return nil, effectively masking the resignation error. It should return errors.Trace(resignErr) to properly propagate the failure, similar to the logic in campaignLogCoordinator.
```diff
-	return errors.Trace(err)
+	return errors.Trace(resignErr)
```
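A standalone illustration of the pitfall this fixes (generic code, not the PR's): once the outer call succeeds, its error variable is nil, so returning it from the inner failure path silently swallows the real error.

```go
package main

import (
	"errors"
	"fmt"
)

func campaign() error { return nil }                         // succeeds, so err ends up nil
func resign() error   { return errors.New("resign failed") } // the failure we care about

func buggy() error {
	err := campaign()
	if err != nil {
		return err
	}
	if resignErr := resign(); resignErr != nil {
		return err // BUG: err is nil here, the resign failure is lost
	}
	return nil
}

func fixed() error {
	if err := campaign(); err != nil {
		return err
	}
	if resignErr := resign(); resignErr != nil {
		return resignErr // propagate the actual failure
	}
	return nil
}

func main() {
	fmt.Println("buggy:", buggy()) // buggy: <nil>
	fmt.Println("fixed:", fixed()) // fixed: resign failed
}
```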
```go
func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) *heartbeatpb.MaintainerStatus {
	changefeedID := common.NewChangefeedIDFromPB(req.Id)

	if m.liveness != nil && m.liveness.Load() == api.LivenessCaptureStopping {
```
This check correctly refuses to add a maintainer if the node is Stopping. For improved robustness, it would be better to also refuse requests when the node is Draining. A draining node's purpose is to shed its workload, so it shouldn't accept new changefeeds. While the schedulers are designed to prevent this, this check acts as a final safeguard against race conditions where a move is scheduled just before the node's state changes to draining.
```diff
-	if m.liveness != nil && m.liveness.Load() == api.LivenessCaptureStopping {
+	if m.liveness != nil && m.liveness.Load() >= api.LivenessCaptureDraining {
```
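Note that the `>=` form relies on the liveness constants being declared in increasing severity (Alive < Draining < Stopping). If that ordering is not guaranteed, an explicit comparison keeps the guard correct regardless of the constants' numeric values; a small self-contained sketch, with stand-in names for the PR's api package:

```go
package maintainer

// Liveness and its constants are assumed stand-ins for the PR's api package.
type Liveness int32

const (
	LivenessCaptureAlive Liveness = iota
	LivenessCaptureDraining
	LivenessCaptureStopping
)

// refuseNewMaintainer compares against each non-schedulable state explicitly,
// so the guard stays correct even if the constants were ever reordered.
func refuseNewMaintainer(l Liveness) bool {
	return l == LivenessCaptureDraining || l == LivenessCaptureStopping
}
```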
|
[FORMAT CHECKER NOTIFICATION] 📖 For more info, you can check the "Contribute Code" section in the development guide.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@coordinator/drain_controller.go`:
- Around line 16-27: The SendCommand call returns a third-party error that must
be wrapped with errors.Trace(err) before logging to preserve stack context;
locate the SendCommand invocation (symbol: SendCommand) and change the error
handling to wrap the returned err with errors.Trace(err) when calling
processLogger/zap logger (e.g., logger.Error or log.Error), and add the import
for "github.com/pingcap/errors" if not present so errors.Trace can be used.
In `@server/module_election.go`:
- Around line 122-131: The resign error path returns the wrong variable; when
e.resign(resignCtx) fails inside the block (resignErr), the function currently
returns errors.Trace(err) where err is nil—change the return to
errors.Trace(resignErr). Update the code around the resign call (the
resignCtx/resignErr handling inside the coordinator resignation logic that logs
with zap.String("nodeID", nodeID) and zap.Error(resignErr)) to cancel the
context and return errors.Trace(resignErr) instead of errors.Trace(err).
🧹 Nitpick comments (3)
api/v1/api_test.go (1)
35-83: Consider reducing duplication between fake coordinator types.
`fakeCoordinator` and `fakeCoordinatorWithoutDrain` share nearly identical implementations. You could embed a common base or use a single struct with an optional `DrainNode` function field.
♻️ Suggested refactor to reduce duplication
```diff
-type fakeCoordinator struct {
-	remaining int
-}
-
-func (c *fakeCoordinator) Stop() {}
-func (c *fakeCoordinator) Run(context.Context) error { return nil }
-// ... all other methods ...
-func (c *fakeCoordinator) DrainNode(node.ID) int { return c.remaining }
-
-type fakeCoordinatorWithoutDrain struct{}
-
-func (c *fakeCoordinatorWithoutDrain) Stop() {}
-// ... duplicated methods ...
+type fakeCoordinatorBase struct{}
+
+func (c *fakeCoordinatorBase) Stop() {}
+func (c *fakeCoordinatorBase) Run(context.Context) error { return nil }
+// ... common methods ...
+
+type fakeCoordinator struct {
+	fakeCoordinatorBase
+	remaining int
+}
+
+func (c *fakeCoordinator) DrainNode(node.ID) int { return c.remaining }
+
+type fakeCoordinatorWithoutDrain struct {
+	fakeCoordinatorBase
+}
```
server/module_election.go (1)
287-316: Redundant cancel() call after coordinator exits.
`cancel()` is called on line 308 inside the goroutine when resigning, and again on line 316 after `co.Run(coCtx)` returns. While calling `cancel()` multiple times is safe, consider restructuring to make the intent clearer. The goroutine already cancels the context on resign, so the outer `cancel()` is only needed for non-resign exits.
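A generic sketch (not this file's code) of the restructuring being suggested: owning the context with a single deferred cancel covers every exit path, while other goroutines may still cancel early, for example on resign.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func runCoordinator(ctx context.Context) error {
	<-ctx.Done() // block until cancelled
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // single owner: covers every exit path from this function

	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // e.g. an active resign; calling cancel twice is safe
	}()

	fmt.Println("coordinator exited:", runCoordinator(ctx))
}
```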
maintainer/maintainer_manager.go (1)
467-488: Minor: Redundant interval check when called with `force=true`.
When `sendNodeHeartbeat(force=true)` calls `sendNodeHeartbeatTo(targetID, true)`, the interval check at lines 468-470 is redundant since `force=true` will skip it anyway. This is harmless but slightly confusing.
♻️ Optional: Simplify by removing the duplicate check
```diff
 func (m *Manager) sendNodeHeartbeatTo(targetID node.ID, force bool) {
-	if !force && time.Since(m.lastNodeHeartbeatTime) < nodeHeartbeatInterval {
-		return
-	}
 	m.lastNodeHeartbeatTime = time.Now()
```
Move the interval check to only `sendNodeHeartbeat`, since `sendNodeHeartbeatTo` is also called directly from `onSetNodeLivenessRequest`, where the force parameter controls the behavior.
```go
import (
	"sync"
	"time"

	"github.com/pingcap/log"
	"github.com/pingcap/ticdc/coordinator/changefeed"
	"github.com/pingcap/ticdc/coordinator/operator"
	"github.com/pingcap/ticdc/heartbeatpb"
	"github.com/pingcap/ticdc/pkg/messaging"
	"github.com/pingcap/ticdc/pkg/node"
	"go.uber.org/zap"
)
```
🧩 Analysis chain
🏁 Script executed:
```shell
# First, let's explore the structure and read the drain_controller.go file
cd /tmp && git ls-files pingcap/ticdc coordinator/drain_controller.go 2>/dev/null || echo "Not a git repo, searching directly"
# Find the file
fd drain_controller.go
```
Repository: pingcap/ticdc
Length of output: 91
🏁 Script executed:
```shell
# Read the complete drain_controller.go file
cat -n coordinator/drain_controller.go
```
Repository: pingcap/ticdc
Length of output: 7552
🏁 Script executed:
```shell
# Also check if errors package is already imported
rg "github.com/pingcap/errors" coordinator/drain_controller.go
```
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
```shell
# Search for SendCommand calls in the file to understand the context
rg "SendCommand" coordinator/drain_controller.go -B 3 -A 3
```
Repository: pingcap/ticdc
Length of output: 344
Wrap SendCommand error with stack trace metadata.
The SendCommand call at line 194 returns a library error that should be wrapped with errors.Trace(err) before logging to preserve stack context, per the coding guidelines for third-party error handling.
🔧 Suggested fix
import (
"sync"
"time"
+ "github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/coordinator/changefeed"
"github.com/pingcap/ticdc/coordinator/operator"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/node"
"go.uber.org/zap"
)
@@
msg := messaging.NewSingleTargetMessage(target, messaging.MaintainerManagerTopic, req)
if err := d.mc.SendCommand(msg); err != nil {
+ err = errors.Trace(err)
log.Warn("send set node liveness request failed",
zap.Stringer("target", target),
zap.Int32("liveness", int32(liveness)),
zap.Error(err))
 }
🤖 Prompt for AI Agents
In `@coordinator/drain_controller.go` around lines 16 - 27, The SendCommand call
returns a third-party error that must be wrapped with errors.Trace(err) before
logging to preserve stack context; locate the SendCommand invocation (symbol:
SendCommand) and change the error handling to wrap the returned err with
errors.Trace(err) when calling processLogger/zap logger (e.g., logger.Error or
log.Error), and add the import for "github.com/pingcap/errors" if not present so
errors.Trace can be used.
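For reference, a minimal, generic example of the pattern this comment asks for (not this repository's code): wrap a third-party error with errors.Trace as soon as it enters your code, so the recorded stack trace points at the call site and can be printed later with %+v.

```go
package main

import (
	"fmt"
	"os"

	"github.com/pingcap/errors"
)

func loadConfig(path string) error {
	if _, err := os.ReadFile(path); err != nil {
		// Wrap at the boundary: the recorded stack points here rather than
		// somewhere inside the standard library.
		return errors.Trace(err)
	}
	return nil
}

func main() {
	if err := loadConfig("/no/such/config.toml"); err != nil {
		fmt.Printf("%+v\n", err) // %+v prints the traced error with its stack
	}
}
```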
```go
	resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	if resignErr := e.resign(resignCtx); resignErr != nil {
		log.Warn("resign coordinator actively failed",
			zap.String("nodeID", nodeID),
			zap.Error(resignErr))
		cancel()
		return errors.Trace(err)
	}
	cancel()
	return nil
```
Bug: Incorrect error variable returned on resign failure.
When e.resign(resignCtx) fails, the code returns errors.Trace(err) but err is nil at this point (the campaign succeeded). This should return errors.Trace(resignErr) instead.
🐛 Proposed fix
if resignErr := e.resign(resignCtx); resignErr != nil {
log.Warn("resign coordinator actively failed",
zap.String("nodeID", nodeID),
zap.Error(resignErr))
cancel()
- return errors.Trace(err)
+ return errors.Trace(resignErr)
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	if resignErr := e.resign(resignCtx); resignErr != nil {
		log.Warn("resign coordinator actively failed",
			zap.String("nodeID", nodeID),
			zap.Error(resignErr))
		cancel()
		return errors.Trace(resignErr)
	}
	cancel()
	return nil
```
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 122 - 131, The resign error path
returns the wrong variable; when e.resign(resignCtx) fails inside the block
(resignErr), the function currently returns errors.Trace(err) where err is
nil—change the return to errors.Trace(resignErr). Update the code around the
resign call (the resignCtx/resignErr handling inside the coordinator resignation
logic that logs with zap.String("nodeID", nodeID) and zap.Error(resignErr)) to
cancel the context and return errors.Trace(resignErr) instead of
errors.Trace(err).
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
New Features
Tests