
Conversation

@hongyunyan (Collaborator) commented Feb 11, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Node draining workflow: requestable drains, coordinated progression, and remaining-work reporting via a DrainNode API.
    • Node liveness states and heartbeat propagation (Alive, Draining, Stopping, Unknown) with scheduler destination filtering.
    • Schedulers and maintainer integrate liveness to avoid scheduling to draining/stopping nodes; heartbeat and liveness control messages added.
  • Tests

    • Added tests covering drain controller, nodeliveness view, drain scheduler, maintainer liveness handling, and Liveness state machine.

@ti-chi-bot bot added labels on Feb 11, 2026: do-not-merge/needs-linked-issue; do-not-merge/work-in-progress (indicates that a PR should not merge because it is a work in progress); release-note (denotes a PR that will be considered when it comes time to generate release notes).

ti-chi-bot bot commented Feb 11, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign kennytm for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Contributor

coderabbitai bot commented Feb 11, 2026

Warning

Rate limit exceeded

@hongyunyan has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 20 minutes and 46 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📝 Walkthrough

Adds node liveness states (Alive → Draining → Stopping), a Drain Controller to issue SetNodeLiveness commands, heartbeat-based liveness observation, liveness-aware schedulers (including a drain scheduler), and a Coordinator.DrainNode API; maintainer emits NodeHeartbeat and responds to SetNodeLiveness requests.

Changes

Cohort / File(s) Summary
Protocol & Messaging
heartbeatpb/heartbeat.proto, pkg/messaging/message.go
Added NodeLiveness enum, NodeHeartbeat, SetNodeLivenessRequest/Response protobufs and IOType mappings for node liveness message types.
API Liveness model
pkg/api/util.go, pkg/api/util_test.go
Introduced LivenessCaptureDraining and adjusted Stopping value; enforced monotonic Alive→Draining→Stopping transitions and tests.
Maintainer (node agent)
maintainer/maintainer_manager.go, maintainer/maintainer_manager_test.go, maintainer/node_liveness_test.go
Maintainer now tracks liveness and nodeEpoch, sends periodic NodeHeartbeat, handles SetNodeLivenessRequest/Response, enforces epoch checks, and updated tests.
Coordinator: drain orchestration
coordinator/drain/controller.go, coordinator/drain/controller_test.go, coordinator/coordinator.go
New in-memory drain Controller managing per-node drain state, sending DRAINING→STOPPING commands with resend throttling, observing heartbeats/responses, and Coordinator.DrainNode delegating to Controller.
Coordinator: nodeliveness view & integration
coordinator/nodeliveness/view.go, coordinator/nodeliveness/view_test.go, coordinator/controller.go, coordinator/controller_drain_test.go
New TTL-driven nodeliveness View; Controller wired with nodeLivenessView and drainController, heartbeats feed both, and drain logic invoked during periodic tasks; added DrainNode API on Controller.
Schedulers: filtering and drain scheduler
coordinator/scheduler/basic.go, coordinator/scheduler/balance.go, coordinator/scheduler/drain.go, coordinator/scheduler/drain_test.go, pkg/scheduler/scheduler.go
Basic/Balance schedulers now respect liveness (skip draining/stopping destinations); added drainScheduler to migrate workloads off draining/stopping nodes and integrated optional scheduler startup and tests.
Operator helpers
coordinator/operator/operator_controller.go
Added CountOperatorsInvolvingNode to count in-flight operators affecting a node (used in drain safety checks).
Server & API surface
api/v1/api.go, pkg/server/coordinator.go, server/server.go, server/module_election.go
API v1 replaced atomic drain counter with coordinator.DrainNode; added DrainNode to Coordinator interface; server initializes and passes liveness into MaintainerManager; election flows updated to consult liveness.
Tests & coverage
multiple *_test.go (coordinator, drain, nodeliveness, maintainer, api)
Extensive unit tests added for drain controller, nodeliveness view, drain scheduler, maintainer liveness handling, and Liveness state machine.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as API/v1
    participant Coord as Coordinator
    participant DC as DrainController
    participant LV as LivenessView
    participant MM as MaintainerManager
    participant DS as DrainScheduler

    Client->>API: DrainNode(target)
    API->>Coord: DrainNode(ctx, target)
    Coord->>DC: RequestDrain(target)
    DC-->>Coord: acknowledge request
    Coord->>DC: GetStatus(target) / compute remaining
    Coord-->>API: return remainingCount

    loop periodic heartbeat/advance
        MM->>Coord: NodeHeartbeat(liveness, epoch)
        Coord->>LV: ObserveHeartbeat(...)
        Coord->>DC: ObserveHeartbeat(...)
        DC-->>Coord: mark draining/stopping observed
        alt readyToStop(target)
            DC->>MM: SetNodeLivenessRequest(STOPPING, epoch)
            MM->>Coord: SetNodeLivenessResponse(applied, epoch)
            Coord->>DC: ObserveSetNodeLivenessResponse(...)
        else still draining
            DC->>MM: SetNodeLivenessRequest(DRAINING, epoch)
            MM->>Coord: SetNodeLivenessResponse(...)
        end
    end

    DS->>LV: GetDrainingOrStoppingNodes()
    DS->>Coord: request move operators for origins
    Coord->>DS: approve/create MoveMaintainerOperators

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

lgtm, approved

Suggested reviewers

  • wk989898
  • tenfyzhong
  • flowbehappy
  • lidezhu

Poem

🐰 I nudged nodes from Alive to Drain,
Heartbeats hopped across the plain,
Controllers whispered stop and go,
Schedulers moved the load just so,
A carrot-cheer — now servers rest again.

🚥 Pre-merge checks | ❌ 4
❌ Failed checks (3 warnings, 1 inconclusive)
  • Description check (⚠️ Warning): The PR description is entirely unfinished, containing only the repository template with placeholder text and no actual implementation details, test information, or answers to required questions. Resolution: fill in the PR description with the issue number, a detailed explanation of the changes, test coverage status, compatibility impacts, and release notes.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 32.56%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them to satisfy the coverage threshold.
  • Merge Conflict Detection (⚠️ Warning): Merge conflicts detected in 27 files:

⚔️ api/v1/api.go (content)
⚔️ cmd/kafka-consumer/writer_test.go (content)
⚔️ cmd/util/event_group.go (content)
⚔️ cmd/util/event_group_test.go (content)
⚔️ coordinator/controller.go (content)
⚔️ coordinator/coordinator.go (content)
⚔️ coordinator/operator/operator_controller.go (content)
⚔️ coordinator/scheduler/balance.go (content)
⚔️ coordinator/scheduler/basic.go (content)
⚔️ downstreamadapter/dispatcher/block_event_executor.go (content)
⚔️ heartbeatpb/heartbeat.pb.go (content)
⚔️ heartbeatpb/heartbeat.proto (content)
⚔️ logservice/txnutil/lock_resolver.go (content)
⚔️ maintainer/maintainer_manager.go (content)
⚔️ maintainer/maintainer_manager_test.go (content)
⚔️ pkg/api/util.go (content)
⚔️ pkg/common/event/dml_event.go (content)
⚔️ pkg/common/table_info_helper.go (content)
⚔️ pkg/messaging/message.go (content)
⚔️ pkg/scheduler/scheduler.go (content)
⚔️ pkg/server/coordinator.go (content)
⚔️ server/module_election.go (content)
⚔️ server/server.go (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/conf/diff_config.toml (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/data/downstream_prepare.sql (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/data/upstream_dml.sql (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/data/upstream_prepare.sql (content)

These conflicts must be resolved before merging into master.
Resolve conflicts locally and push changes to this branch.
  • Title check (❓ Inconclusive): The title is vague and generic; "wip-drain-capture-5.2" does not clearly convey what the changeset implements. Resolution: replace it with a clear, concise summary of the main change, e.g., "Add node drain capability with liveness coordination" or similar.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@ti-chi-bot bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) on Feb 11, 2026.
@gemini-code-assist

Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust node draining mechanism for TiCDC, enabling graceful shutdown and maintenance operations. It establishes a clear lifecycle for nodes (Alive, Draining, Stopping) and integrates this liveness awareness across critical components, including the API, coordinator, schedulers, and maintainers. The changes ensure that tasks are appropriately managed during node transitions, preventing new assignments to unhealthy nodes and facilitating the migration of existing workloads. Additionally, it refines table schema comparison by incorporating index column length for greater accuracy.

Highlights

  • Node Draining Mechanism: Introduced a comprehensive node draining mechanism, defining new liveness states (Alive, Draining, Stopping) to enable graceful shutdown and maintenance of TiCDC nodes.
  • Coordinator Drain Controller: Implemented a new drain.Controller within the coordinator to manage the node draining process, including sending liveness update requests to individual nodes.
  • API Integration for Drain Capture: Updated the v1 API endpoint for draining captures to utilize the new coordinator-level drain functionality, replacing previous dummy logic.
  • Scheduler Liveness Awareness: Enhanced existing schedulers (balanceScheduler, basicScheduler) to consider node liveness when selecting destination nodes for task scheduling, preventing new tasks from being assigned to draining or stopping nodes. A new drainScheduler was added to actively migrate tasks from draining nodes.
  • Maintainer Node Liveness Management: Modified the maintainer.Manager to periodically report its liveness state to the coordinator and to respond to liveness change requests, ensuring nodes accurately reflect their operational status. It also prevents new tasks from being added to stopping nodes.
  • Election Process Refinement: Adjusted the coordinator and log coordinator election logic to check node liveness before campaigning and to actively resign leadership if the node transitions to a stopping state.
  • Improved Table Schema Hashing: Updated the table information hashing and comparison logic to include index column length, which is crucial for accurate schema matching and handle decoding.
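
To make the Alive → Draining → Stopping lifecycle concrete, here is a small self-contained Go sketch of the monotonic transition rule these highlights describe. It mirrors the pkg/api Liveness.Store logic quoted later in this thread, simplified and with the defensive range check one reviewer suggests; treat it as an illustration, not the PR's actual code.

package main

import (
	"fmt"
	"sync/atomic"
)

type Liveness int32

const (
	LivenessCaptureAlive    Liveness = iota // 0
	LivenessCaptureDraining                 // 1
	LivenessCaptureStopping                 // 2
)

func (l *Liveness) Load() Liveness { return Liveness(atomic.LoadInt32((*int32)(l))) }

// Store accepts only forward, single-step transitions
// (Alive -> Draining -> Stopping) and rejects everything else.
func (l *Liveness) Store(v Liveness) bool {
	for {
		old := l.Load()
		if v != old+1 || v > LivenessCaptureStopping {
			return false
		}
		if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
			return true
		}
	}
}

func main() {
	var l Liveness
	fmt.Println(l.Store(LivenessCaptureStopping)) // false: cannot skip Draining
	fmt.Println(l.Store(LivenessCaptureDraining)) // true
	fmt.Println(l.Store(LivenessCaptureAlive))    // false: no moving backward
	fmt.Println(l.Store(LivenessCaptureStopping)) // true
}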

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • api/v1/api.go
    • Removed sync/atomic import and added pkg/node import.
    • Replaced dummy drainCaptureCounter logic with actual calls to the coordinator's DrainNode method.
    • Removed the global drainCaptureCounter variable.
  • coordinator/controller.go
    • Imported coordinator/drain and coordinator/nodeliveness packages.
    • Added nodeLivenessView and drainController fields to the Controller struct.
    • Initialized nodeLivenessView and drainController in the NewController function.
    • Integrated drainController.Tick into the onPeriodTask method.
    • Added message handlers for TypeNodeHeartbeatRequest and TypeSetNodeLivenessResponse.
    • Implemented the DrainNode method to handle node drain requests from the API.
  • coordinator/controller_drain_test.go
    • Added a new test file to verify the DrainNode functionality in the coordinator.
  • coordinator/coordinator.go
    • Added the DrainNode method to the coordinator interface implementation.
  • coordinator/drain/controller.go
    • Added a new file defining the drain.Controller responsible for managing node drain progression.
  • coordinator/drain/controller_test.go
    • Added a new test file for the drain.Controller.
  • coordinator/nodeliveness/view.go
    • Added a new file defining nodeliveness.View to maintain an in-memory view of node liveness.
  • coordinator/nodeliveness/view_test.go
    • Added a new test file for nodeliveness.View.
  • coordinator/operator/operator_controller.go
    • Added CountOperatorsInvolvingNode method to count in-flight operators affecting a specific node.
  • coordinator/scheduler/balance.go
    • Imported coordinator/nodeliveness and added livenessView field.
    • Updated NewBalanceScheduler to accept livenessView.
    • Modified Execute method to filter active nodes based on livenessView.IsSchedulableDest.
  • coordinator/scheduler/basic.go
    • Imported coordinator/nodeliveness and added livenessView field.
    • Updated NewBasicScheduler to accept livenessView.
    • Modified doBasicSchedule method to filter node IDs based on livenessView.IsSchedulableDest.
  • coordinator/scheduler/drain.go
    • Added a new file defining drainScheduler to generate move operators for draining nodes.
  • coordinator/scheduler/drain_test.go
    • Added a new test file for drainScheduler.
  • heartbeatpb/heartbeat.pb.go
    • Added NodeLiveness enum with ALIVE, DRAINING, and STOPPING states.
    • Introduced new protobuf messages: NodeHeartbeat, SetNodeLivenessRequest, and SetNodeLivenessResponse.
    • Updated enum and message descriptor indices, and generated marshal/unmarshal/size methods for new messages.
  • heartbeatpb/heartbeat.proto
    • Defined the NodeLiveness enum and new protobuf messages for node heartbeats and liveness control.
  • maintainer/maintainer_manager.go
    • Imported pkg/api and added nodeHeartbeatInterval constant.
    • Added liveness and nodeEpoch fields to the Manager struct.
    • Updated NewMaintainerManager to initialize node liveness.
    • Added handling for TypeSetNodeLivenessRequest messages.
    • Implemented sendNodeHeartbeat for periodic liveness reporting and sendSetNodeLivenessResponse.
    • Added logic to reject AddMaintainerRequest if the node is in a stopping state.
    • Included helper functions fromNodeLivenessPB and toNodeLivenessPB for liveness state conversion.
  • maintainer/maintainer_manager_test.go
    • Updated NewMaintainerManager calls in tests to pass the liveness argument.
  • maintainer/node_liveness_test.go
    • Added a new test file to verify node liveness transitions and epoch mismatch handling in the maintainer.
  • pkg/api/util.go
    • Expanded the Liveness enum to include LivenessCaptureDraining.
    • Modified the Store method for Liveness to enforce monotonic state transitions (Alive -> Draining -> Stopping) and prevent skipping states.
    • Updated the String() method for Liveness to reflect the new states.
  • pkg/api/util_test.go
    • Added a new test file to validate the monotonic behavior of the Liveness.Store method.
  • pkg/common/table_info_helper.go
    • Included col.Length in the hashTableInfo digest calculation for index columns.
    • Included col.Length in the sameColumnsAndIndices comparison for index columns.
  • pkg/common/table_info_helper_test.go
    • Added a test case TestGetOrSetColumnSchema_DifferentIndexColumnLength to ensure digest changes when index column length differs.
  • pkg/messaging/message.go
    • Added new IOType constants for node drain related messages.
    • Updated String() method, decodeIOType, and NewSingleTargetMessage to support the new message types.
  • pkg/scheduler/scheduler.go
    • Added DrainScheduler constant.
    • Modified the Start method to include the drainScheduler in the task pool and exclude it from general checker schedulers.
  • pkg/server/coordinator.go
    • Imported pkg/node and added the DrainNode method to the Coordinator interface.
  • server/module_election.go
    • Modified campaignCoordinator and campaignLogCoordinator to check for LivenessCaptureAlive before campaigning.
    • Added logic to actively resign coordinator/log coordinator roles if the node's liveness transitions to LivenessCaptureStopping.
  • server/server.go
    • Updated the maintainer.NewMaintainerManager initialization to pass the server's liveness state.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review: /gemini review performs a code review for the current pull request in its current state.
  • Pull Request Summary: /gemini summary provides a summary of the current pull request in its current state.
  • Comment: @gemini-code-assist responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help: /gemini help displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster operability. The changes are extensive, touching upon the API, coordinator, schedulers, maintainers, and core server logic.

Key changes include:

  • A new drain package with a Controller to manage the node draining state machine.
  • A new nodeliveness package with a View to track node liveness with TTL.
  • A new drainScheduler to move work off draining nodes.
  • Updates to basicScheduler and balanceScheduler to be aware of node liveness.
  • A new implementation for the v1 drainCapture API.
  • Enhancements to the server election and shutdown logic to handle draining/stopping states gracefully.
  • Improved correctness in Liveness.Store to enforce monotonic state transitions.

The implementation appears robust and well-tested, with new unit and integration tests covering the new functionality. I have one suggestion for code simplification in the DrainNode function to improve readability.

Comment on lines 867 to 882
	// v1 drain API must not return 0 until drain completion is proven.
	if nodeState == nodeliveness.StateUnknown || !drainingObserved {
		if remaining == 0 {
			remaining = 1
		}
		return remaining, nil
	}

	// Return 0 only after STOPPING is observed and all work is done.
	if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
		return 0, nil
	}
	if remaining == 0 {
		remaining = 1
	}
	return remaining, nil


medium

The logic to ensure remaining is at least 1 unless drain is complete is a bit complex and can be simplified. The current implementation has a duplicated check if remaining == 0 { remaining = 1 } and an early return that makes the flow harder to follow. A refactoring can make the logic more straightforward.

Suggested change (original code followed by the proposed replacement):

	// v1 drain API must not return 0 until drain completion is proven.
	if nodeState == nodeliveness.StateUnknown || !drainingObserved {
		if remaining == 0 {
			remaining = 1
		}
		return remaining, nil
	}
	// Return 0 only after STOPPING is observed and all work is done.
	if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
		return 0, nil
	}
	if remaining == 0 {
		remaining = 1
	}
	return remaining, nil

	// v1 drain API must not return 0 until drain completion is proven.
	// Return 0 only after STOPPING is observed and all work is done, and the
	// node liveness is not unknown.
	if nodeState != nodeliveness.StateUnknown && stoppingObserved &&
		maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
		return 0, nil
	}
	if remaining == 0 {
		return 1, nil
	}
	return remaining, nil

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
api/v1/api.go (1)

195-195: ⚠️ Potential issue | 🟡 Minor

Stale TODO comment — the drain API is now implemented.

The comment says "currently it is a no-op" but the method now delegates to co.DrainNode. Remove or update the TODO.

Suggested fix
 // drainCapture drains all tables from a capture.
 // Usage:
 // curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
-// TODO: Implement this API in the future, currently it is a no-op.
 func (o *OpenAPIV1) drainCapture(c *gin.Context) {
server/module_election.go (1)

322-341: ⚠️ Potential issue | 🟡 Minor

Log messages for "resign failed" and "resign timeout" are swapped.

When DeadlineExceeded is the cause (line 330), the code logs "resign failed" (line 331) and returns the error. For other errors, it logs "resign timeout" (line 336). This is the inverse of the coordinator resign flow (lines 173-184) where DeadlineExceeded logs "timeout" and other errors log "failed".

Proposed fix
 func (e *elector) resignLogCoordinator() error {
 	if e.logElection == nil {
 		return nil
 	}
 	nodeID := string(e.svr.info.ID)
 	// use a new context to prevent the context from being cancelled.
 	resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
-		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+		if errors.Cause(resignErr) != context.DeadlineExceeded {
+			log.Info("log coordinator resign failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
 		log.Warn("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
 	cancel()
 	return nil
 }
🤖 Fix all issues with AI agents
In `@coordinator/controller_drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header used across the
repository to the top of this file (above the "package coordinator" declaration)
so CI stops failing; copy the exact header text used in other files (for example
the header at the top of coordinator.go) and place it verbatim at the top of
coordinator/controller_drain_test.go.

In `@coordinator/drain/controller_test.go`:
- Line 1: Add the standard PingCAP copyright header to the top of
controller_test.go (above the line "package drain")—mirror the exact header used
in view_test.go so CI recognizes it; ensure the header appears before the
package declaration in controller_test.go.

In `@coordinator/drain/controller.go`:
- Line 1: Missing Apache 2.0 copyright header: add the project's standard Apache
2.0 header comment block to the top of controller.go above the existing package
drain declaration, ensuring it includes the correct copyright year and copyright
holder name used across the repo and matches the exact header format used in
other files.

In `@coordinator/nodeliveness/view_test.go`:
- Line 1: Add the standard PingCAP copyright/header block to the top of this
file (view_test.go) above the package nodeliveness declaration so the header
matches the project's required copyright format; ensure the exact standard
header text used across the repo is pasted verbatim and that there are no extra
characters or blank lines before the package statement.

In `@coordinator/nodeliveness/view.go`:
- Line 1: This file is missing the standard Apache 2.0 copyright header; add the
project's canonical Apache-2.0 header comment block immediately above the
"package nodeliveness" declaration in view.go (matching the same header style
used across the repo, e.g., full copyright line(s) and license notice or SPDX
identifier) so CI/pipeline recognizes the file as licensed.

In `@coordinator/scheduler/balance.go`:
- Around line 83-93: The code can call pkgScheduler.CheckBalanceStatus with an
empty activeNodes map which leads to division by zero inside CheckBalanceStatus;
modify the block around activeNodes (where s.nodeManager.GetAliveNodes(), the
liveness filter using s.livenessView.IsSchedulableDest, and the call to
pkgScheduler.CheckBalanceStatus with s.changefeedDB.GetTaskSizePerNode()) to
check if activeNodes is empty after filtering and return early (or set moveSize
to zero / skip calling CheckBalanceStatus) so CheckBalanceStatus never receives
an empty nodeSize map.

In `@coordinator/scheduler/drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of
drain_test.go (the file that begins with "package scheduler") so the CI
recognizes proper copyright information; insert the full multi-line header
comment block immediately above the package declaration and ensure it matches
the project's standard header format exactly.

In `@coordinator/scheduler/drain.go`:
- Line 1: The file starting with "package scheduler" is missing the standard
Apache-2.0 copyright header which causes CI failure; add the same Apache 2.0
header block used in drain_test.go at the top of this file above the package
declaration so the header text and year/owner match the repository's standard.

In `@maintainer/node_liveness_test.go`:
- Line 1: Add the project's standard copyright/header to the top of the new test
file (node_liveness_test.go) so CI stops failing; open the file that declares
"package maintainer" and prepend the same license/author header used across
other test files in the repo (matching formatting and year) before the package
declaration.

In `@pkg/api/util_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of this file
(pkg/api/util_test.go) so CI stops failing; open the file beginning with the
standard multi-line Apache 2.0 header comment block above the package api
declaration and ensure the header text exactly matches the project’s canonical
Apache 2.0 template.

In `@pkg/api/util.go`:
- Around line 170-185: The loop allows invalid Liveness values >
LivenessCaptureStopping to be CAS'd when old == LivenessCaptureStopping; add a
defensive range check at the start of the setter so any incoming v outside the
valid enum range (v < LivenessCaptureAlive || v > LivenessCaptureStopping)
returns false immediately. Locate the loop using l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.

In `@server/module_election.go`:
- Around line 255-258: The resign failure is being swallowed because the handler
returns errors.Trace(err) (where err is the prior campaign result and likely
nil) instead of the resign error; update the error return in the block that
calls e.resignLogCoordinator() (the resignErr variable) to return
errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.
🧹 Nitpick comments (10)
pkg/api/util.go (1)

193-203: Pre-existing: String() reads *l non-atomically.

Line 194 dereferences *l directly instead of using l.Load(). If String() is called concurrently with Store(), this is a data race. Not introduced by this PR, but worth noting since you're touching this area.

Suggested fix
 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:
api/v1/api.go (1)

202-219: Implementation looks good overall.

The flow is clean: validate input → get coordinator → call DrainNode → return remaining count with HTTP 202. One consideration: if req.CaptureID is empty, it will pass through to DrainNode with an empty node.ID. You may want to validate that CaptureID is non-empty up front and return a 400 error, consistent with how other endpoints handle missing parameters.

Optional: validate non-empty CaptureID
+	if req.CaptureID == "" {
+		_ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("capture_id is required"))
+		return
+	}
+
 	co, err := o.server.GetCoordinator()
coordinator/controller.go (1)

845-883: DrainNode does not acquire apiLock unlike other public API methods.

Other API methods (CreateChangefeed, RemoveChangefeed, PauseChangefeed, etc.) acquire c.apiLock before accessing changefeedDB and operatorController. DrainNode skips this. If this is intentional (because DrainNode is read-only and the underlying stores are internally synchronized), consider adding a brief comment to document the rationale. Otherwise, consider acquiring c.apiLock.RLock() for consistency.
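
If the lock is taken for consistency, a sketch could look like the following. This is illustrative only: it assumes apiLock is a sync.RWMutex as used by the other API methods, and the remaining-count helper and return signature are hypothetical.

// DrainNode follows the locking convention of the other public API methods.
// Sketch only: the read-only drain status computation is elided.
func (c *Controller) DrainNode(ctx context.Context, target node.ID) (int, error) {
	c.apiLock.RLock()
	defer c.apiLock.RUnlock()

	// ... existing read-only inspection of changefeedDB, operatorController,
	// nodeLivenessView, and drainController stays unchanged ...
	return c.computeRemainingForDrain(target), nil // hypothetical helper
}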

coordinator/scheduler/drain.go (1)

49-112: Round-robin cursor advancement may skip draining nodes unfairly.

s.rrCursor is incremented on every outer-loop iteration (line 98), regardless of how many inner-loop iterations completed. If the outer loop runs multiple times in a single Execute() call, the cursor advances by the number of outer iterations, potentially skipping nodes. For example, with 3 draining nodes and availableSize = 6, the cursor could advance by up to 6 in one call, wrapping around multiple times.

This is a minor fairness concern rather than a correctness bug, since all draining nodes are still visited in each inner-loop pass. If you want strict round-robin across Execute() calls, consider advancing the cursor only once per Execute() invocation.
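
If strict rotation across Execute calls is preferred, the pattern is small enough to show on its own. The following is a self-contained illustration of advancing a round-robin cursor once per call; it is not the drainScheduler's actual code.

package main

import "fmt"

// roundRobin rotates the starting index once per call, so repeated calls
// visit the items in a strictly rotating order.
type roundRobin struct{ cursor int }

func (r *roundRobin) order(items []string) []string {
	if len(items) == 0 {
		return nil
	}
	start := r.cursor % len(items)
	r.cursor++ // advance exactly once per call
	out := make([]string, 0, len(items))
	for i := 0; i < len(items); i++ {
		out = append(out, items[(start+i)%len(items)])
	}
	return out
}

func main() {
	rr := &roundRobin{}
	nodes := []string{"node-a", "node-b", "node-c"}
	fmt.Println(rr.order(nodes)) // [node-a node-b node-c]
	fmt.Println(rr.order(nodes)) // [node-b node-c node-a]
}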

server/module_election.go (1)

266-284: Resign error silently discarded when liveness transitions to STOPPING.

Line 278 discards the resignLogCoordinator() error with _ =. If the resign fails, the log coordinator's etcd key remains held, but leaderCancel() on line 279 stops the coordinator anyway. This could cause a brief period where another node cannot win the election. Consider logging the error at minimum.

Proposed fix
 				case <-ticker.C:
 					if e.svr.liveness.Load() == api.LivenessCaptureStopping {
 						log.Info("node is stopping, resign log coordinator actively",
 							zap.String("nodeID", nodeID))
-						_ = e.resignLogCoordinator()
+						if resignErr := e.resignLogCoordinator(); resignErr != nil {
+							log.Warn("resign log coordinator failed during stop",
+								zap.String("nodeID", nodeID), zap.Error(resignErr))
+						}
 						leaderCancel()
 						return
 					}
coordinator/nodeliveness/view.go (1)

60-86: Observe methods unconditionally overwrite records; no epoch or monotonicity guard.

Both ObserveHeartbeat and ObserveSetNodeLivenessResponse replace the record without checking nodeEpoch or ensuring monotonic liveness transitions. If a delayed/reordered heartbeat with ALIVE arrives after a SetNodeLivenessResponse with DRAINING, the view briefly regresses to Alive, potentially causing the drain scheduler to skip the node.

Consider adding an epoch check or a monotonicity guard (e.g., only accept transitions forward: Alive → Draining → Stopping).

maintainer/maintainer_manager.go (1)

442-466: Conversion helpers are stateless — could be package-level functions.

fromNodeLivenessPB and toNodeLivenessPB don't use any Manager state. Making them package-level functions (or moving them to a shared conversion package) would clarify their nature and improve reusability.

coordinator/drain/controller.go (3)

29-37: Consider using sync.RWMutex for better read concurrency.

GetStatus (line 106) is a read-only method but acquires a full Mutex.Lock(). If GetStatus is called on a hot path (e.g., from DrainNode API), switching to sync.RWMutex and using RLock in GetStatus would avoid contention with concurrent reads.


200-203: Info-level log on every successful liveness command send may be noisy.

sendSetNodeLiveness logs at Info level on every successful send (line 200). Since commands are resent every resendInterval (1 second, as wired in controller.go line 168) until observed, this could produce substantial log volume during drains. Consider Debug level for resends, or logging Info only on the first send.

As per coding guidelines, "log message strings should not include function names and should avoid hyphens (use spaces instead)" — the current message format is compliant.


181-204: Epoch 0 sent when liveness view has no record for the node.

When livenessView.GetNodeEpoch returns false (no record), epoch defaults to 0. The maintainer rejects requests where req.NodeEpoch != m.nodeEpoch (see maintainer_manager.go line 402), so the command will be silently rejected if the maintainer's epoch is non-zero. The command will be resent on the next tick, which is fine, but consider logging a debug message when the epoch is not found to aid troubleshooting.

Comment on lines +170 to +185
	for {
		old := l.Load()
		if v <= old {
			return false
		}
		if old == LivenessCaptureAlive && v != LivenessCaptureDraining {
			return false
		}
		if old == LivenessCaptureDraining && v != LivenessCaptureStopping {
			return false
		}

		if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
			return true
		}
	}
Contributor


⚠️ Potential issue | 🟡 Minor

Missing guard for out-of-range values when old == LivenessCaptureStopping.

If old is Stopping(2), the v <= old check on line 172 rejects values ≤ 2, but any value > 2 (e.g., an invalid Liveness(3)) falls through all guards and gets CAS'd in. While callers should only use defined constants, adding a range check would make the state machine defensive.

Proposed fix
 func (l *Liveness) Store(v Liveness) bool {
+	if v < LivenessCaptureDraining || v > LivenessCaptureStopping {
+		return false
+	}
 	for {
 		old := l.Load()
🤖 Prompt for AI Agents
In `@pkg/api/util.go` around lines 170 - 185, The loop allows invalid Liveness
values > LivenessCaptureStopping to be CAS'd when old ==
LivenessCaptureStopping; add a defensive range check at the start of the setter
so any incoming v outside the valid enum range (v < LivenessCaptureAlive || v >
LivenessCaptureStopping) returns false immediately. Locate the loop using
l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.

Comment on lines +255 to 258
	if resignErr := e.resignLogCoordinator(); resignErr != nil {
		log.Warn("resign log coordinator actively failed",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
		return errors.Trace(err)
Contributor


⚠️ Potential issue | 🔴 Critical

Bug: returns err (which is nil after a successful campaign) instead of resignErr.

When resignLogCoordinator() fails, the error is logged but line 258 returns errors.Trace(err) where err is the campaign result (nil at this point), effectively swallowing the resign failure.

Proposed fix
 		if resignErr := e.resignLogCoordinator(); resignErr != nil {
 			log.Warn("resign log coordinator actively failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (original code followed by the proposed replacement):

	if resignErr := e.resignLogCoordinator(); resignErr != nil {
		log.Warn("resign log coordinator actively failed",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
		return errors.Trace(err)

	if resignErr := e.resignLogCoordinator(); resignErr != nil {
		log.Warn("resign log coordinator actively failed",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
		return errors.Trace(resignErr)
	}
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 255 - 258, The resign failure is
being swallowed because the handler returns errors.Trace(err) (where err is the
prior campaign result and likely nil) instead of the resign error; update the
error return in the block that calls e.resignLogCoordinator() (the resignErr
variable) to return errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
server/module_election.go (2)

119-122: ⚠️ Potential issue | 🔴 Critical

Bug: returns err (which is nil after a successful campaign) instead of resignErr.

Same class of bug as flagged at line 239 for the log coordinator path. Here err is the campaign result (nil on success), so the resign failure is silently swallowed.

Proposed fix
 		if resignErr := e.resign(ctx); resignErr != nil {
 			log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}

289-298: ⚠️ Potential issue | 🟡 Minor

Bug: "resign failed" and "resign timeout" log messages are swapped.

When context.DeadlineExceeded is matched (line 290), the log says "resign failed" (line 291). The else branch (non-timeout error) logs "resign timeout" (line 296). These labels are reversed.

Proposed fix
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+			log.Warn("log coordinator resign timeout",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
-		log.Warn("log coordinator resign timeout",
+		log.Warn("log coordinator resign failed",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 210-233: sendSetNodeLiveness currently sends epoch=0 when
livenessView.GetNodeEpoch(nodeID) returns false, causing the maintainer
(maintainer_manager.go) to reject the request as an epoch mismatch; update
Controller.sendSetNodeLiveness to detect the missing epoch (ok == false) and log
a clear warning (include nodeID and that node epoch is unknown) before sending
the command so operators can diagnose why the drain may be delayed; keep the
message creation and send flow unchanged (still use epoch variable) so behavior
is unchanged but observable.

In `@coordinator/nodeliveness/view.go`:
- Around line 61-87: ObserveHeartbeat and ObserveSetNodeLivenessResponse
unconditionally overwrite the node record, allowing out-of-order messages to
regress liveness; fix by making updates monotonic per node epoch: in both
functions load existing record from v.nodes[nodeID], if resp.NodeEpoch or
hb.NodeEpoch is greater than the stored nodeEpoch accept and replace (new epoch
resets to ALIVE), if equal only accept the update when the new liveness is the
same or a forward transition (ALIVE→DRAINING→STOPPING) and update lastSeen
appropriately, and if the new epoch is older or the liveness would move backward
reject the update; use the record struct fields nodeEpoch and liveness to
perform these checks inside ObserveHeartbeat and ObserveSetNodeLivenessResponse
while holding v.mu.
🧹 Nitpick comments (7)
api/v1/api.go (1)

201-229: Wrap errors from internal calls before passing to gin context.

Per coding guidelines, errors from internal/library calls should be wrapped with errors.Trace(err) to attach stack traces. The errors from SelfInfo(), GetCoordinator(), and DrainNode() are passed directly to c.Error() without wrapping.

Proposed fix
 	self, err := o.server.SelfInfo()
 	if err != nil {
-		_ = c.Error(err)
+		_ = c.Error(errors.Trace(err))
 		return
 	}
 	// For compatibility with old arch TiCDC, draining the current owner is not allowed.
 	if target == self.ID {
 		_ = c.Error(errors.ErrSchedulerRequestFailed.GenWithStackByArgs("cannot drain the owner"))
 		return
 	}

 	co, err := o.server.GetCoordinator()
 	if err != nil {
-		_ = c.Error(err)
+		_ = c.Error(errors.Trace(err))
 		return
 	}

 	remaining, err := co.DrainNode(c.Request.Context(), target)
 	if err != nil {
-		_ = c.Error(err)
+		_ = c.Error(errors.Trace(err))
 		return
 	}

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".

coordinator/nodeliveness/view.go (1)

99-122: GetState uses time.Now() internally, unlike other methods that accept now.

GetDrainingOrStoppingNodes takes a now parameter for TTL checks, but GetState (and by extension IsSchedulableDest) calls time.Now() internally. This inconsistency makes the TTL behavior harder to test and could lead to subtle differences in TTL evaluation within the same tick.

Consider accepting now time.Time as a parameter for consistency, or documenting why the divergence is intentional.

maintainer/maintainer_manager.go (2)

448-472: fromNodeLivenessPB and toNodeLivenessPB don't use the receiver — make them standalone functions.

These methods are bound to *Manager but reference no fields from m. Converting them to package-level functions (or even placing them in a shared conversion utility) would clarify that they're pure mappings and allow reuse without a Manager instance.

♻️ Proposed refactor
-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
 	switch l {
 	case heartbeatpb.NodeLiveness_ALIVE:
 		return api.LivenessCaptureAlive
@@ ...
-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
 	switch l {
 	case api.LivenessCaptureAlive:
 		return heartbeatpb.NodeLiveness_ALIVE

Then update call sites (Lines 211, 417, 436) to drop the m. prefix.


279-284: Good: rejecting AddMaintainer when stopping prevents scheduling work onto a draining node.

This is a solid safety check. Consider also logging at Warn level instead of Info since this represents an unexpected request from the coordinator's perspective (it shouldn't be sending work to a stopping node).

coordinator/drain/controller.go (3)

132-150: AdvanceLiveness re-acquires the lock per node via GetStatus — consider a locked snapshot.

listDrainRequestedNodeIDs() takes a lock to snapshot node IDs, then for each node, GetStatus() re-acquires the lock, and trySendDrainCommand/trySendStopCommand each acquire it again via checkAndMarkCommandSend. This is 3 lock round-trips per node per tick.

Since state only moves forward (monotonic), this isn't a correctness issue, but it's worth noting for performance if the number of draining nodes grows. A single locked pass that snapshots both IDs and states would reduce contention.


36-55: No mechanism to remove nodes from the nodes map.

Once a node enters the drain workflow, its nodeState persists in the map indefinitely. If the coordinator is long-lived and nodes are added/removed over time, this is a slow memory leak. Consider adding a RemoveNode(nodeID) method or clearing entries once stoppingObserved is true and the node has been fully removed from the cluster.


15-15: resendInterval is a hardcoded constant — consider making it configurable.

The AI summary mentioned a resendInterval parameter in the constructor. The current implementation uses a package-level constant of 1 second. For testing and tuning in production, injecting this via the constructor (or a config struct) would be more flexible.
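
For the map cleanup point above, a minimal sketch; the c.mu and c.nodes field names follow the snippets quoted in this review, and deciding when to call it (on node removal, or once stoppingObserved is set and the node has left the cluster) is left to the author.

// RemoveNode drops the drain bookkeeping for a node that has left the
// cluster, preventing the nodes map from growing without bound.
func (c *Controller) RemoveNode(nodeID node.ID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.nodes, nodeID)
}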

Comment on lines +210 to +233
// sendSetNodeLiveness sends a liveness command to the target maintainer manager.
func (c *Controller) sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness) {
	var epoch uint64
	if c.livenessView != nil {
		if e, ok := c.livenessView.GetNodeEpoch(nodeID); ok {
			epoch = e
		}
	}

	msg := messaging.NewSingleTargetMessage(nodeID, messaging.MaintainerManagerTopic, &heartbeatpb.SetNodeLivenessRequest{
		Target:    target,
		NodeEpoch: epoch,
	})
	if err := c.mc.SendCommand(msg); err != nil {
		log.Warn("send set node liveness command failed",
			zap.Stringer("nodeID", nodeID),
			zap.String("target", target.String()),
			zap.Error(err))
		return
	}
	log.Info("send set node liveness command",
		zap.Stringer("nodeID", nodeID),
		zap.String("target", target.String()),
		zap.Uint64("epoch", epoch))
Contributor


⚠️ Potential issue | 🟡 Minor

Drain commands will be silently rejected when the node epoch is unknown.

When livenessView has no epoch for the target node (Line 214, ok is false), epoch defaults to 0. On the receiving side (maintainer_manager.go Line 408), the node compares req.NodeEpoch != m.nodeEpoch — and since newNodeEpoch() guarantees a non-zero value, the request is always rejected with an epoch mismatch log.

This means drain commands sent before the coordinator observes any heartbeat from the target node will be silently ineffective. The resend logic will eventually retry after a heartbeat arrives and populates the epoch, so it's self-healing, but it could delay drain initiation unexpectedly.

Consider either:

  1. Logging a warning when epoch is not found so operators can diagnose delays, or
  2. Treating epoch=0 as "match any" on the receiver side for the initial transition.
#!/bin/bash
# Verify how livenessView.GetNodeEpoch is populated — check if heartbeat observation stores the epoch
ast-grep --pattern $'func ($_ *View) GetNodeEpoch($_, $_) ($_, $_) {
  $$$
}'

# Also check where ObserveHeartbeat feeds into the view
rg -n "ObserveHeartbeat\|UpdateNodeEpoch\|SetNodeEpoch\|nodeEpoch" --type go -g '*/nodeliveness/*'
🤖 Prompt for AI Agents
In `@coordinator/drain/controller.go` around lines 210 - 233, sendSetNodeLiveness
currently sends epoch=0 when livenessView.GetNodeEpoch(nodeID) returns false,
causing the maintainer (maintainer_manager.go) to reject the request as an epoch
mismatch; update Controller.sendSetNodeLiveness to detect the missing epoch (ok
== false) and log a clear warning (include nodeID and that node epoch is
unknown) before sending the command so operators can diagnose why the drain may
be delayed; keep the message creation and send flow unchanged (still use epoch
variable) so behavior is unchanged but observable.
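
A minimal sketch of option 1 (warn when the epoch is unknown), reusing the fields from the quoted snippet; only the epochKnown flag and the warning log are additions.

// Sketch: surface the missing epoch so operators can see why a drain command
// may be rejected by the maintainer's epoch check until a heartbeat arrives.
var epoch uint64
epochKnown := false
if c.livenessView != nil {
	if e, ok := c.livenessView.GetNodeEpoch(nodeID); ok {
		epoch = e
		epochKnown = true
	}
}
if !epochKnown {
	log.Warn("node epoch unknown, set node liveness command may be rejected until a heartbeat is observed",
		zap.Stringer("nodeID", nodeID),
		zap.String("target", target.String()))
}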

Comment on lines 61 to 87
func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
	if hb == nil {
		return
	}

	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: hb.NodeEpoch,
		liveness:  hb.Liveness,
	}
	v.mu.Unlock()
}

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
	if resp == nil {
		return
	}

	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: resp.NodeEpoch,
		liveness:  resp.Applied,
	}
	v.mu.Unlock()
}
Contributor


⚠️ Potential issue | 🟠 Major

Unconditional record overwrite may regress liveness state on out-of-order messages.

Both ObserveHeartbeat and ObserveSetNodeLivenessResponse unconditionally overwrite the record. If a delayed heartbeat (reporting ALIVE) arrives after a SetNodeLivenessResponse (applying DRAINING), the view would briefly revert to ALIVE, which could cause the scheduler to route work to a draining node.

Since liveness transitions are monotonic (ALIVE→DRAINING→STOPPING), consider rejecting updates that would move backward:

Sketch
 func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
 	if hb == nil {
 		return
 	}
 	v.mu.Lock()
+	if existing, ok := v.nodes[nodeID]; ok {
+		if hb.NodeEpoch == existing.nodeEpoch && hb.Liveness < existing.liveness {
+			v.mu.Unlock()
+			return
+		}
+	}
 	v.nodes[nodeID] = record{
 		lastSeen:  now,
 		nodeEpoch: hb.NodeEpoch,
 		liveness:  hb.Liveness,
 	}
 	v.mu.Unlock()
 }

This would also need analogous handling in ObserveSetNodeLivenessResponse. A new epoch (node restart) should still be allowed to reset to ALIVE.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (original code followed by the proposed replacement):

func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
	if hb == nil {
		return
	}
	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: hb.NodeEpoch,
		liveness:  hb.Liveness,
	}
	v.mu.Unlock()
}

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
	if resp == nil {
		return
	}
	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: resp.NodeEpoch,
		liveness:  resp.Applied,
	}
	v.mu.Unlock()
}

func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
	if hb == nil {
		return
	}
	v.mu.Lock()
	if existing, ok := v.nodes[nodeID]; ok {
		if hb.NodeEpoch == existing.nodeEpoch && hb.Liveness < existing.liveness {
			v.mu.Unlock()
			return
		}
	}
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: hb.NodeEpoch,
		liveness:  hb.Liveness,
	}
	v.mu.Unlock()
}

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
	if resp == nil {
		return
	}
	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: resp.NodeEpoch,
		liveness:  resp.Applied,
	}
	v.mu.Unlock()
}
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 61 - 87, ObserveHeartbeat and
ObserveSetNodeLivenessResponse unconditionally overwrite the node record,
allowing out-of-order messages to regress liveness; fix by making updates
monotonic per node epoch: in both functions load existing record from
v.nodes[nodeID], if resp.NodeEpoch or hb.NodeEpoch is greater than the stored
nodeEpoch accept and replace (new epoch resets to ALIVE), if equal only accept
the update when the new liveness is the same or a forward transition
(ALIVE→DRAINING→STOPPING) and update lastSeen appropriately, and if the new
epoch is older or the liveness would move backward reject the update; use the
record struct fields nodeEpoch and liveness to perform these checks inside
ObserveHeartbeat and ObserveSetNodeLivenessResponse while holding v.mu.
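
The "analogous handling" mentioned above for ObserveSetNodeLivenessResponse is not spelled out in the sketch; under the same assumptions (same-epoch updates may only move liveness forward, a new epoch resets the state) it would look like this:

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
	if resp == nil {
		return
	}
	v.mu.Lock()
	if existing, ok := v.nodes[nodeID]; ok {
		// Reject same-epoch updates that would move liveness backward; a new
		// epoch (node restart) is still allowed to reset the state.
		if resp.NodeEpoch == existing.nodeEpoch && resp.Applied < existing.liveness {
			v.mu.Unlock()
			return
		}
	}
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: resp.NodeEpoch,
		liveness:  resp.Applied,
	}
	v.mu.Unlock()
}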

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@coordinator/nodeliveness/view.go`:
- Around line 111-134: GetState currently calls time.Now() internally which
prevents deterministic tests and is inconsistent with
GetDrainingOrStoppingNodes; change View.GetState to accept a now time.Time
parameter (func (v *View) GetState(nodeID node.ID, now time.Time) State), update
its internal uses of now (replace the time.Now() call), and then update all
callers (notably IsSchedulableDest) to pass through a now value (e.g., take now
as a parameter or forward the now they already have) so TTL comparisons use the
same time source as GetDrainingOrStoppingNodes.
🧹 Nitpick comments (6)
coordinator/scheduler/drain.go (2)

105-118: Round-robin cursor advances on every outer-loop iteration, not per Execute call.

s.rrCursor is incremented at line 114 inside the for scheduled < availableSize loop, meaning it advances multiple times per Execute call (once per full sweep of draining nodes). This provides fairness across multiple batches within a single call, but is worth confirming this is the intended behavior — the comment on line 41 says "rotates the starting draining node" which could imply once-per-call advancement.


159-180: chooseLeastLoadedDest returns a false positive when all candidates equal origin and node.ID zero value is "".

If every candidate in destCandidates matches origin, chosen remains the zero-value empty string and the guard at line 176 (chosen.IsEmpty()) catches it. This works, but only because node.ID("") is never a legitimate node ID. The logic is correct but brittle — consider returning false explicitly when minSize == math.MaxInt for clarity.

coordinator/drain/controller.go (2)

130-139: GetStatus acquires an exclusive lock for a read-only operation.

Use RLock/RUnlock here to allow concurrent readers.

Suggested fix
 func (c *Controller) GetStatus(nodeID node.ID) (drainRequested, drainingObserved, stoppingObserved bool) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 
 	st, ok := c.nodes[nodeID]

144-162: TOCTOU between GetStatus and trySendDrainCommand/trySendStopCommand — benign but worth noting.

AdvanceLiveness reads state via GetStatus (acquires and releases lock), then conditionally calls send methods (which re-acquire the lock). Between these calls another goroutine could change the state. In practice this is harmless — the worst case is an extra throttled command or a skipped iteration that self-heals on the next cycle. No action needed now, but if the controller becomes multi-goroutine-hot, consolidating the check-and-send under a single lock hold would eliminate the gap.
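If the gap ever matters, the check and the mark-as-sent step can share a single lock hold while the network send stays outside the critical section; a self-contained sketch with invented types and field names, not the controller's real ones:

package drain

import "sync"

// nodeDrainState is an illustrative stand-in for the controller's per-node
// bookkeeping.
type nodeDrainState struct {
	drainRequested bool
	commandSent    bool
}

type miniController struct {
	mu    sync.Mutex
	nodes map[string]*nodeDrainState
}

// advance performs the state check and marks the command as sent under one
// lock hold, so a concurrent change cannot slip in between; the send itself
// still runs outside the critical section.
func (c *miniController) advance(nodeID string, send func()) {
	c.mu.Lock()
	st, ok := c.nodes[nodeID]
	shouldSend := ok && st.drainRequested && !st.commandSent
	if shouldSend {
		st.commandSent = true
	}
	c.mu.Unlock()

	if shouldSend {
		send()
	}
}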

coordinator/drain/controller_test.go (2)

38-45: Blocking channel reads will hang the test forever if no message is sent.

Lines 39, 60, and 71 use <-mc.GetMessageChannel() which blocks indefinitely. If a code change causes the message not to be sent, the test hangs rather than failing with a useful message.

Consider using a helper with a timeout:

Suggested approach
func receiveOrFail(t *testing.T, ch <-chan *messaging.TargetMessage) *messaging.TargetMessage {
	t.Helper()
	select {
	case msg := <-ch:
		return msg
	case <-time.After(3 * time.Second):
		require.FailNow(t, "timed out waiting for message")
		return nil
	}
}

Also applies to: 59-62, 70-73


26-74: Test coverage is thin — consider adding cases for key scenarios.

The single test covers the happy path well. Missing scenarios that would strengthen confidence:

  • Unknown epoch: RequestDrain before any heartbeat observed (epoch=0 path).
  • Multiple draining nodes: verifying fair round-robin or independent progression.
  • readyToStop=false: AdvanceLiveness should not send STOPPING when the callback returns false.
  • Idempotent RequestDrain: calling it twice should not double-send.

Comment on lines +111 to +134
func (v *View) GetState(nodeID node.ID) State {
	v.mu.RLock()
	r, ok := v.nodes[nodeID]
	v.mu.RUnlock()

	now := time.Now()
	if !ok {
		// Never observed: keep compatibility during rollout.
		return StateAlive
	}
	if now.Sub(r.lastSeen) > v.ttl {
		return StateUnknown
	}
	switch r.liveness {
	case heartbeatpb.NodeLiveness_ALIVE:
		return StateAlive
	case heartbeatpb.NodeLiveness_DRAINING:
		return StateDraining
	case heartbeatpb.NodeLiveness_STOPPING:
		return StateStopping
	default:
		return StateAlive
	}
}
Contributor


🛠️ Refactor suggestion | 🟠 Major

GetState uses time.Now() internally, making it untestable and inconsistent with GetDrainingOrStoppingNodes.

GetDrainingOrStoppingNodes accepts a now time.Time parameter, but GetState (and therefore IsSchedulableDest) hardcodes time.Now(). This makes TTL-dependent behavior impossible to test deterministically and creates an inconsistency in the API surface.

Suggested fix
-func (v *View) GetState(nodeID node.ID) State {
+func (v *View) GetState(nodeID node.ID, now time.Time) State {
 	v.mu.RLock()
 	r, ok := v.nodes[nodeID]
 	v.mu.RUnlock()
 
-	now := time.Now()
 	if !ok {
 		return StateAlive
 	}

Then update IsSchedulableDest similarly:

-func (v *View) IsSchedulableDest(nodeID node.ID) bool {
-	return v.GetState(nodeID) == StateAlive
+func (v *View) IsSchedulableDest(nodeID node.ID, now time.Time) bool {
+	return v.GetState(nodeID, now) == StateAlive
 }
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 111 - 134, GetState currently
calls time.Now() internally which prevents deterministic tests and is
inconsistent with GetDrainingOrStoppingNodes; change View.GetState to accept a
now time.Time parameter (func (v *View) GetState(nodeID node.ID, now time.Time)
State), update its internal uses of now (replace the time.Now() call), and then
update all callers (notably IsSchedulableDest) to pass through a now value
(e.g., take now as a parameter or forward the now they already have) so TTL
comparisons use the same time source as GetDrainingOrStoppingNodes.

@hongyunyan
Collaborator Author

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster manageability. However, a critical security concern has been identified: the new drainCapture API endpoint in api/v1/api.go is exposed without authentication, which could allow unauthenticated users to disrupt the cluster. Additionally, a potential issue regarding error handling during log coordinator resignation should be addressed.

// Usage:
// curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
// TODO: Implement this API in the future, currently it is a no-op.
func (o *OpenAPIV1) drainCapture(c *gin.Context) {


security-high

The drainCapture function implements a sensitive cluster management operation (draining a capture), but the pull request does not ensure that the corresponding route registration is protected by authentication middleware. In RegisterOpenAPIV1Routes, the PUT /drain route is registered without authenticateMiddleware, unlike other sensitive operations in the same file (e.g., createChangefeed, updateChangefeed). This could allow unauthenticated users to disrupt the cluster by triggering node drains. It is recommended to add authenticateMiddleware to the route registration for drainCapture in RegisterOpenAPIV1Routes.
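For reference, registering the route behind the middleware could look roughly like the sketch below; RegisterDrainRoute is a made-up name, and the real wiring belongs in RegisterOpenAPIV1Routes:

package api

import "github.com/gin-gonic/gin"

// RegisterDrainRoute is a hypothetical sketch: it groups the capture routes
// behind the authentication middleware so PUT /captures/drain is no longer
// reachable without credentials.
func RegisterDrainRoute(v1 *gin.RouterGroup, authenticateMiddleware gin.HandlerFunc, drainCapture gin.HandlerFunc) {
	captures := v1.Group("/captures")
	captures.Use(authenticateMiddleware)
	captures.PUT("/drain", drainCapture)
}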

Comment on lines 289 to 299
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+			log.Info("log coordinator resign timeout",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
-		log.Warn("log coordinator resign timeout",
+		log.Warn("log coordinator resign failed",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
 	cancel()


high

This function currently swallows non-timeout errors from e.logElection.Resign. If Resign fails for a reason other than a timeout, the error is logged as a warning, but the function returns nil. This could mislead the caller into believing the resignation was successful when it wasn't, potentially leading to an inconsistent state where the node is still the log coordinator leader in etcd.

The error should be returned to the caller to handle it properly. Using defer cancel() would also simplify the context cancellation logic.

Suggested change
-	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
-		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign timeout",
-				zap.String("nodeID", nodeID), zap.Error(resignErr))
-			cancel()
-			return errors.Trace(resignErr)
-		}
-		log.Warn("log coordinator resign failed",
-			zap.String("nodeID", nodeID), zap.Error(resignErr))
-	}
-	cancel()
+	defer cancel()
+	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
+		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
+			log.Info("log coordinator resign timeout",
+				zap.String("nodeID", nodeID), zap.Error(resignErr))
+		} else {
+			log.Warn("log coordinator resign failed",
+				zap.String("nodeID", nodeID), zap.Error(resignErr))
+		}
+		return errors.Trace(resignErr)
+	}

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
server/module_election.go (2)

116-123: ⚠️ Potential issue | 🔴 Critical

Bug: errors.Trace(err) returns nil instead of the resign error.

At line 121, err is the result of Campaign (line 95), which is nil at this point (the campaign must have succeeded for execution to reach this branch). This swallows the resign failure. Same pattern as the known bug on line 239.

Proposed fix
 		if resignErr := e.resign(ctx); resignErr != nil {
 			log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}

282-301: ⚠️ Potential issue | 🟠 Major

resignLogCoordinator silently swallows non-timeout resign errors.

When Resign fails with an error other than context.DeadlineExceeded, line 296 logs a warning but line 300 returns nil. This means a genuine resign failure (e.g., network error) is silently ignored and the caller assumes success.

Compare with the coordinator's resign path (line 155-161) which propagates the error. Consider returning the error here as well.

Proposed fix
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
 			log.Info("log coordinator resign timeout",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
 		log.Warn("log coordinator resign failed",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
+		cancel()
+		return errors.Trace(resignErr)
 	}
 	cancel()
 	return nil
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 12-13: Add a single blank line between the file license header and
the package declaration to match project formatting; update the top of
controller.go so the license block is followed by one empty line before the
"package drain" statement (the package declaration symbol is package drain).

In `@coordinator/nodeliveness/view_test.go`:
- Around line 12-13: Add a blank line between the license header and the package
declaration in this file so it matches the project's formatting (same change as
in view.go); specifically, insert an empty line above the "package nodeliveness"
declaration to separate the header from the package statement.

In `@coordinator/nodeliveness/view.go`:
- Around line 12-13: Add a single blank line between the file's license header
and the package declaration to match the repository style and avoid
gofmt/gofumpt or copyright-check lint failures; specifically insert one empty
line immediately above the existing "package nodeliveness" declaration so the
license block is separated from the package line.
🧹 Nitpick comments (4)
coordinator/nodeliveness/view_test.go (1)

24-43: Test relies on wall-clock time for TTL assertions, making it timing-sensitive.

GetState calls time.Now() internally. The test works because the observation timestamps are set far enough in the past, but on a very slow CI runner or with clock skew, the 5-second-from-now observation at line 35 could flicker. If GetState is refactored to accept now time.Time (as suggested in view.go), these tests become fully deterministic.

Current approach is acceptable for now given the generous margins (5s vs 30s TTL, 35s vs 30s TTL).
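For comparison, here is the injected-clock pattern the refactor would enable, shown with a self-contained stand-in rather than the real View type:

package nodeliveness

import (
	"testing"
	"time"
)

// clockedView is a stand-in that takes `now` on every query, so TTL checks
// are fully deterministic in tests.
type clockedView struct {
	ttl      time.Duration
	lastSeen map[string]time.Time
}

func (v *clockedView) isFresh(node string, now time.Time) bool {
	seen, ok := v.lastSeen[node]
	return ok && now.Sub(seen) <= v.ttl
}

func TestTTLWithInjectedClock(t *testing.T) {
	base := time.Date(2026, 2, 13, 0, 0, 0, 0, time.UTC)
	v := &clockedView{
		ttl:      30 * time.Second,
		lastSeen: map[string]time.Time{"node-1": base},
	}
	// 5s after the observation: within the 30s TTL.
	if !v.isFresh("node-1", base.Add(5*time.Second)) {
		t.Fatal("expected node-1 to be fresh 5s after observation")
	}
	// 35s after the observation: past the TTL, with no wall clock involved.
	if v.isFresh("node-1", base.Add(35*time.Second)) {
		t.Fatal("expected node-1 to be stale 35s after observation")
	}
}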

maintainer/maintainer_manager.go (2)

404-437: Potential TOCTOU between m.liveness.Load() and m.liveness.Store(target).

Lines 423-425 load current, compare target > current, then call Store(target). Between the load and the store, another goroutine (e.g., the server's signal handler calling Store for stopping) could advance the liveness. The Store CAS loop handles this correctly (it would fail and retry or return false), so the state is safe. However, line 436 sends a response with the current local variable which may be stale if Store returned false due to a concurrent advance.

This is a minor concern since Store returning false means current on line 431 is not updated, and the response on line 436 would report the pre-race value rather than the actual current state. This could cause the coordinator to have a momentarily stale view, which is self-correcting via the next heartbeat.
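The forward-only CAS the comment leans on can be pictured roughly as follows; Liveness here is a stand-in backed by an int32, and the constant names are illustrative rather than the api package's actual ones:

package api

import "sync/atomic"

// Liveness is a stand-in ordered as Alive < Draining < Stopping.
type Liveness int32

const (
	LivenessAlive Liveness = iota
	LivenessDraining
	LivenessStopping
)

// Load returns the current value.
func (l *Liveness) Load() Liveness {
	return Liveness(atomic.LoadInt32((*int32)(l)))
}

// Store only moves liveness forward. If a concurrent caller advances the
// value first, the CAS fails and the loop retries; once the stored value is
// at or past the target, Store returns false and the caller re-reads via Load.
func (l *Liveness) Store(target Liveness) bool {
	for {
		current := l.Load()
		if target <= current {
			return false
		}
		if atomic.CompareAndSwapInt32((*int32)(l), int32(current), int32(target)) {
			return true
		}
	}
}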


453-477: fromNodeLivenessPB and toNodeLivenessPB don't use the receiver — consider making them package-level functions.

These methods don't access any Manager state. Making them free functions (or at minimum unexported package-level functions) would better convey their stateless nature and allow reuse without a Manager instance.

Proposed change
-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {

Then update callers (e.g., lines 211, 422, 441) to drop the m. prefix.

coordinator/drain/controller.go (1)

141-162: AdvanceLiveness re-acquires the lock per-node via GetStatus after snapshotting the list — consider whether a single-pass under one lock would be simpler.

listDrainRequestedNodeIDs acquires and releases the lock, then the loop calls GetStatus (lock again) for each node, then trySendDrainCommand/trySendStopCommand (lock again in checkAndMarkCommandSend). This is correct and avoids holding the lock during I/O, but the triple lock acquisition per node adds overhead. Given the expected low number of draining nodes, this is fine.

Comment on lines +12 to +13
// limitations under the License.
package drain
Contributor


⚠️ Potential issue | 🟡 Minor

Missing blank line between license header and package declaration.

Same formatting issue as the other new files.

🤖 Prompt for AI Agents
In `@coordinator/drain/controller.go` around lines 12 - 13, Add a single blank
line between the file license header and the package declaration to match
project formatting; update the top of controller.go so the license block is
followed by one empty line before the "package drain" statement (the package
declaration symbol is package drain).

Comment on lines +12 to +13
// limitations under the License.
package nodeliveness
Contributor


⚠️ Potential issue | 🟡 Minor

Missing blank line between license header and package declaration.

Same as view.go — add a blank line for consistency with the rest of the codebase.

🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view_test.go` around lines 12 - 13, Add a blank line
between the license header and the package declaration in this file so it
matches the project's formatting (same change as in view.go); specifically,
insert an empty line above the "package nodeliveness" declaration to separate
the header from the package statement.

Comment on lines +12 to +13
// limitations under the License.
package nodeliveness
Contributor


⚠️ Potential issue | 🟡 Minor

Missing blank line between license header and package declaration.

Most Go files in this repo separate the license block from the package line with a blank line. This may also trigger gofmt/gofumpt or copyright-check lint failures.

Proposed fix
 // limitations under the License.
+
 package nodeliveness
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-// limitations under the License.
-package nodeliveness
+// limitations under the License.
+
+package nodeliveness
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 12 - 13, Add a single blank
line between the file's license header and the package declaration to match the
repository style and avoid gofmt/gofumpt or copyright-check lint failures;
specifically insert one empty line immediately above the existing "package
nodeliveness" declaration so the license block is separated from the package
line.

@ti-chi-bot

ti-chi-bot bot commented Feb 13, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@hongyunyan
Collaborator Author

/test mysql

@ti-chi-bot

ti-chi-bot bot commented Feb 13, 2026

@hongyunyan: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name: pull-cdc-mysql-integration-light
Commit: 9582e10
Details: link
Required: true
Rerun command: /test pull-cdc-mysql-integration-light

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


Labels

do-not-merge/needs-linked-issue
do-not-merge/work-in-progress: Indicates that a PR should not merge because it is a work in progress.
release-note: Denotes a PR that will be considered when it comes time to generate release notes.
size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.
