
Conversation


@3AceShowHand 3AceShowHand commented Jan 28, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

Release Notes

  • New Features
    • Added capture drain API endpoints (PUT and GET /api/v2/captures/{capture_id}/drain) to gracefully migrate workloads off nodes (a usage sketch in Go follows this list)
    • Implemented drain status monitoring to track migration progress and remaining maintainer/dispatcher counts
    • Introduced node liveness states (Alive, Draining, Stopping) to manage node lifecycle during drain operations
    • Integrated drain scheduling to batch workload migrations across cluster nodes
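
The endpoints named above can be exercised with any HTTP client; the Go sketch below shows the intended PUT-then-poll flow. The endpoint paths come from this PR, while the response field names, the example capture ID, and the 8300 address are assumptions for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// drainStatus mirrors the kind of response GET .../drain might return.
// Field names here are illustrative assumptions, not the PR's exact schema.
type drainStatus struct {
	Status               string `json:"status"`
	RemainingMaintainers int    `json:"remaining_maintainers"`
	RemainingDispatchers int    `json:"remaining_dispatchers"`
}

func main() {
	captureID := "example-capture-id" // replace with the real capture ID
	url := fmt.Sprintf("http://127.0.0.1:8300/api/v2/captures/%s/drain", captureID)

	// Initiate the drain (PUT).
	req, _ := http.NewRequest(http.MethodPut, url, nil)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("drain started:", resp.Status)

	// Poll the drain status (GET) until no workloads remain on the node.
	for {
		resp, err := http.Get(url)
		if err != nil {
			panic(err)
		}
		var st drainStatus
		_ = json.NewDecoder(resp.Body).Decode(&st)
		resp.Body.Close()
		fmt.Printf("status=%s maintainers=%d dispatchers=%d\n",
			st.Status, st.RemainingMaintainers, st.RemainingDispatchers)
		if st.RemainingMaintainers == 0 && st.RemainingDispatchers == 0 {
			break
		}
		time.Sleep(5 * time.Second)
	}
}
```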


@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Jan 28, 2026
@ti-chi-bot

ti-chi-bot bot commented Jan 28, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign hicqu for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jan 28, 2026
@coderabbitai

coderabbitai bot commented Jan 28, 2026

📝 Walkthrough

Walkthrough

Introduces the Drain Capture feature for TiCDC, enabling graceful node migration by adding drain state management, scheduler integration, liveness tracking, and HTTP APIs to initiate and monitor draining operations across the cluster.

Changes

  • Specification & Design Documents (.kiro/specs/drain-capture/design.md, .kiro/specs/drain-capture/requirements.md, .kiro/specs/drain-capture/tasks.md, cdc-api-v2-capture-drain-cn.md, docs/design/2026-01-10-dynamic-stream.md): Comprehensive design, requirements, implementation plan, API execution paths, and dynamic stream design notes documenting the drain feature architecture, state transitions, scheduler integration, error handling, and observability strategy.
  • API Layer (api/v2/api.go, api/v2/capture.go, api/v2/model.go, pkg/server/coordinator.go): New HTTP endpoints (PUT/GET) for drain operations; handlers validate capture IDs, ensure non-coordinator targets, and fetch the coordinator before invoking drain/status queries; new request/response types encapsulate drain counts and status.
  • Coordinator Drain Management (coordinator/controller.go, coordinator/coordinator.go, coordinator/drain_state.go, coordinator/drain_state_test.go): A new DrainState type manages the drain operation lifecycle with thread-safe state transitions (Alive → Draining → Stopping); the Controller integrates drain APIs and registers the DrainScheduler; thin wrapper methods delegate to the controller.
  • Drain Scheduler & Node Filtering (coordinator/scheduler/drain.go, coordinator/scheduler/drain_test.go, coordinator/scheduler/basic.go, coordinator/scheduler/balance.go): A new DrainScheduler generates batched MoveMaintainer operators for draining nodes with workload-aware destination selection; basic.go and balance.go are updated to filter nodes via GetSchedulableNodeIDs(), excluding draining/stopping nodes.
  • Liveness & Node State Machine (pkg/node/node.go, pkg/node/liveness_test.go, pkg/api/util.go, heartbeatpb/heartbeat.proto): A new atomic Liveness type (Alive/Draining/Stopping) with state transition methods (StoreDraining(), DrainComplete(), IsSchedulable()); the proto is augmented with a draining_capture field in MaintainerHeartbeat.
  • Node Manager Liveness Tracking (server/watcher/module_node_manager.go, server/watcher/module_node_manager_test.go): The node manager gains liveness tracking via a sync.Map; new accessors (GetNodeLiveness(), SetNodeLiveness()); filtering methods (GetSchedulableNodes(), GetCoordinatorCandidates()) restrict scheduling/election to Alive nodes; test helpers for scaffolding. A simplified filtering sketch follows this list.
  • Scheduler Constant (pkg/scheduler/scheduler.go): A single constant addition for the drain scheduler name.
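
To make the filtering idea in the Node Manager cohort concrete, here is a simplified Go sketch of liveness-aware node filtering. The method names follow the summary above (SetNodeLiveness, GetSchedulableNodes); the surrounding types and the exact storage layout are stand-ins, not the PR's actual definitions.

```go
package watcher

import "sync"

// Liveness values, simplified to the three states named in this PR.
type Liveness int32

const (
	LivenessAlive Liveness = iota
	LivenessDraining
	LivenessStopping
)

// NodeInfo is a stand-in for the real node.Info type.
type NodeInfo struct{ ID string }

// NodeManager tracks cluster nodes and their liveness.
type NodeManager struct {
	nodes        map[string]*NodeInfo // all known nodes
	nodeLiveness sync.Map             // node ID -> Liveness
}

// SetNodeLiveness records the liveness state for a node.
func (m *NodeManager) SetNodeLiveness(id string, l Liveness) {
	m.nodeLiveness.Store(id, l)
}

// GetSchedulableNodes returns only nodes that are Alive, so that draining or
// stopping nodes never receive new workloads. Nodes without a recorded
// liveness entry are treated as Alive.
func (m *NodeManager) GetSchedulableNodes() map[string]*NodeInfo {
	out := make(map[string]*NodeInfo, len(m.nodes))
	for id, info := range m.nodes {
		if v, ok := m.nodeLiveness.Load(id); ok && v.(Liveness) != LivenessAlive {
			continue // skip Draining/Stopping nodes
		}
		out[id] = info
	}
	return out
}
```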

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as API Handler
    participant Coordinator
    participant DrainState
    participant NodeManager
    participant DrainScheduler
    participant Maintainer

    Client->>API: PUT /drain (captureID)
    API->>Coordinator: DrainCapture(captureID)
    Coordinator->>DrainState: SetDrainingTarget(captureID, counts)
    DrainState->>NodeManager: SetNodeLiveness(captureID, Draining)
    NodeManager-->>DrainState: ✓
    DrainState-->>Coordinator: ✓
    Coordinator-->>API: DrainCaptureResponse
    API-->>Client: 200 OK

    Note over DrainScheduler: Asynchronous Drain Execution

    DrainScheduler->>DrainState: GetDrainingTarget()
    DrainState-->>DrainScheduler: captureID
    DrainScheduler->>NodeManager: GetSchedulableNodes()
    NodeManager-->>DrainScheduler: nodes (excluding Draining)
    DrainScheduler->>DrainScheduler: Select destinations, batch MoveMaintainer
    DrainScheduler->>Maintainer: MoveMaintainer ops
    Maintainer-->>DrainScheduler: ✓

    loop Until all migrated
        Maintainer->>Coordinator: Report progress (heartbeat)
        Coordinator-->>DrainScheduler: Query progress
    end

    DrainScheduler->>DrainState: ClearDrainingTarget()
    DrainState->>NodeManager: SetNodeLiveness(captureID, Stopping)
    NodeManager-->>DrainState: ✓
    DrainState-->>DrainScheduler: ✓
```

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~60 minutes

This PR introduces a new feature spanning multiple subsystems: a thread-safe state machine for drain operations, integration with the scheduler framework, liveness state management with atomic transitions, and API handlers. The changes are heterogeneous—mixing logic (drain state, scheduler, filtering), proto/type definitions, and tests across ~20 files—requiring careful review of state transition semantics, concurrency safety, and integration points with existing schedulers and node management.

Poem

🐰 A drain flows gently through our nodes so grand,
Migrating work with scheduler's steady hand,
From Alive to Draining, with grace we flow,
Each maintainer finds a new home below.

🚥 Pre-merge checks | ❌ 3
❌ Failed checks (2 warnings, 1 inconclusive)
  • Description check (⚠️ Warning): The PR description is entirely a template with placeholders unfilled. Critical sections like 'What is changed and how it works?' and 'Issue Number' are empty or contain placeholders (close #xxx). No actual problem statement, implementation details, test descriptions, or release notes are provided. Resolution: complete the PR description by (1) filling in the actual issue number, (2) describing what changed and how the drain capture feature works, (3) confirming test coverage with specifics, (4) answering the questions about performance/compatibility, (5) providing an actual release note or 'None', and (6) removing the template comment markers.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 21.74%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The title '[DNM] Drain capture' is vague and generic, using the 'DNM' (Do Not Merge) prefix without clearly describing the main feature or change. It lacks specificity about what the drain capture feature actually accomplishes. Resolution: revise the title to clearly describe the feature, e.g., 'Add drain capture API and scheduler implementation' or 'Implement graceful node drain for TiCDC', and remove the DNM prefix if the PR is ready for review.



@ti-chi-bot

ti-chi-bot bot commented Jan 28, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@gemini-code-assist

Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces the 'Drain Capture' feature for TiCDC, enabling operators to gracefully remove a TiCDC node from a cluster without interrupting data synchronization. It achieves this by systematically migrating all associated workloads, including Maintainers and their Dispatchers, to other healthy nodes. The design incorporates new node liveness states, a dedicated drain scheduler, and API endpoints for initiation and monitoring, ensuring a controlled and observable node decommissioning process.

Highlights

  • New Feature: Drain Capture: Introduces the 'Drain Capture' functionality, enabling graceful migration of all workloads (maintainers and dispatchers) from a specified TiCDC node to other nodes for maintenance or decommissioning without interrupting data synchronization.
  • Liveness State Extension: A new LivenessCaptureDraining state is added to the node's liveness model. This state signifies that a node is actively being drained, preventing new workloads from being scheduled to it and initiating the migration of existing tasks. A minimal sketch of this state machine follows the list.
  • Two-Phase Workload Migration: The draining process follows a two-phase approach: first, the Coordinator migrates Maintainers to healthy nodes, and then, the newly migrated Maintainers automatically handle the migration of their associated Dispatchers from the original draining node.
  • New API Endpoints: New HTTP API endpoints are exposed: PUT /api/v2/captures/{capture_id}/drain to initiate a drain operation and GET /api/v2/captures/{capture_id}/drain to query the current status and progress of a draining node.
  • Dedicated Drain Scheduler: A DrainScheduler is integrated into the Coordinator's scheduling framework. It generates MoveMaintainer operators in batches, prioritizes drain operations over general balancing, and ensures that draining nodes are excluded from all new scheduling decisions.
  • NodeManager Enhancements: The NodeManager is extended to track and manage the liveness state of each node, providing filtered lists of schedulable nodes and coordinator candidates based on their current liveness status.
  • Robust Error Handling and Recovery: The design includes comprehensive error handling and recovery mechanisms for scenarios such as coordinator failover, destination node failures during migration, and draining node crashes, ensuring the system's stability and data integrity.
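
To make the one-way lifecycle named in the highlights concrete, here is a minimal Go sketch of an atomic liveness type with the transition methods this PR describes (StoreDraining, DrainComplete, IsSchedulable). The constant values and exact signatures are assumptions; only the transition semantics follow the PR summary.

```go
package node

import "sync/atomic"

// Liveness is an atomic node lifecycle state: Alive -> Draining -> Stopping.
type Liveness int32

const (
	LivenessCaptureAlive    Liveness = 0
	LivenessCaptureDraining Liveness = 1
	LivenessCaptureStopping Liveness = 2
)

// Load reads the current state atomically.
func (l *Liveness) Load() Liveness {
	return Liveness(atomic.LoadInt32((*int32)(l)))
}

// StoreDraining moves Alive -> Draining; it refuses any other transition,
// so the lifecycle can never run backwards.
func (l *Liveness) StoreDraining() bool {
	return atomic.CompareAndSwapInt32((*int32)(l),
		int32(LivenessCaptureAlive), int32(LivenessCaptureDraining))
}

// DrainComplete moves Draining -> Stopping once all workloads have left.
func (l *Liveness) DrainComplete() bool {
	return atomic.CompareAndSwapInt32((*int32)(l),
		int32(LivenessCaptureDraining), int32(LivenessCaptureStopping))
}

// IsSchedulable reports whether new workloads may still be placed on this node.
func (l *Liveness) IsSchedulable() bool {
	return l.Load() == LivenessCaptureAlive
}
```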


Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.


Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant new feature: the ability to drain a capture node. The implementation includes new API endpoints, a drain scheduler, and updates to node liveness states. The overall structure is well-thought-out, with clear separation of concerns. However, there are a few critical issues related to scheduler execution order and inter-component communication that need to be addressed to ensure correctness and align with the design documents. I've also included some suggestions for code clarity and to fix potential load-balancing issues during the drain process.

Comment on lines 126 to +129
```proto
  repeated MaintainerStatus statuses = 1;
  // draining_capture is the capture ID being drained, empty if no drain in progress.
  // Maintainer should not schedule new dispatchers to this capture.
  string draining_capture = 2;
```


critical

The draining_capture field has been added to the MaintainerHeartbeat message. This message flows from the maintainer to the coordinator. However, the notification about a draining capture should flow from the coordinator (which initiates the drain) to the maintainers.

This implementation contradicts the design specified in your design documents (.kiro/specs/drain-capture/requirements.md and .kiro/specs/drain-capture/design.md), which call for a new DrainNotification message from the coordinator to the maintainers. Using MaintainerHeartbeat for this purpose is logically incorrect and could lead to issues with propagating the drain status. Please implement the notification mechanism as designed, with the correct message directionality.

Comment on lines +110 to +136
```go
	// Generate MoveMaintainer operators in batches
	batchCount := 0
	for _, cf := range maintainers {
		if batchCount >= availableBatch {
			break
		}

		// Skip if operator already exists
		if s.operatorController.GetOperator(cf.ID) != nil {
			continue
		}

		// Select destination with lowest workload
		destNode := s.selectDestination(schedulableNodes)
		if destNode == "" {
			continue
		}

		op := operator.NewMoveMaintainerOperator(s.changefeedDB, cf, drainingTarget, destNode)
		if s.operatorController.AddOperator(op) {
			batchCount++
			log.Info("generated move maintainer operator for drain",
				zap.String("changefeed", cf.ID.String()),
				zap.String("from", drainingTarget.String()),
				zap.String("to", destNode.String()))
		}
	}
```


medium

The selectDestination function is called inside a loop to generate a batch of MoveMaintainer operators. However, it doesn't account for in-flight migrations within the current batch. This can lead to all maintainers in a batch being moved to the same destination node if it initially has the lowest workload, causing an unbalanced distribution.

The design document (.kiro/specs/drain-capture/design.md) suggests tracking in-flight assignments to prevent this. Please consider updating the logic to track planned migrations within the Execute function and pass this information to selectDestination to ensure a more balanced distribution of maintainers across available nodes.
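
A minimal Go sketch of the in-flight-aware selection described above: the batch loop records planned destinations so that later picks in the same tick see them as extra load. The helper names and signatures here are illustrative stand-ins, not the PR's actual code.

```go
package scheduler

// selectDestination picks the node with the lowest combined workload:
// its current maintainer count plus migrations already planned in this batch.
// nodeTaskSize and inflight are stand-ins for the scheduler's real bookkeeping.
func selectDestination(nodes []string, nodeTaskSize, inflight map[string]int) string {
	minCount := int(^uint(0) >> 1) // max int
	dest := ""
	for _, id := range nodes {
		if count := nodeTaskSize[id] + inflight[id]; count < minCount {
			minCount = count
			dest = id
		}
	}
	return dest
}

// planBatch shows how the batch loop keeps later picks away from a node
// that has already been chosen in this tick.
func planBatch(maintainers, nodes []string, nodeTaskSize map[string]int, batch int) map[string]string {
	inflight := make(map[string]int)
	plan := make(map[string]string) // maintainer -> destination
	for _, cf := range maintainers {
		if len(plan) >= batch {
			break
		}
		dest := selectDestination(nodes, nodeTaskSize, inflight)
		if dest == "" {
			continue
		}
		plan[cf] = dest
		inflight[dest]++ // later selections see this node as more loaded
	}
	return plan
}
```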

Comment on lines +174 to +178
```go
func (c *NodeManager) SetNodeLiveness(id node.ID, liveness node.Liveness) {
	var l node.Liveness
	actual, _ := c.nodeLiveness.LoadOrStore(id, &l)
	actual.(*node.Liveness).Store(liveness)
}
```


medium

The implementation of SetNodeLiveness can be simplified for better clarity and efficiency. The current implementation allocates a new node.Liveness variable on the stack in every call, which is unnecessary if the key already exists in the map.

Consider refactoring this to a more straightforward implementation that avoids this extra allocation.

```go
func (c *NodeManager) SetNodeLiveness(id node.ID, liveness node.Liveness) {
	val, _ := c.nodeLiveness.LoadOrStore(id, new(node.Liveness))
	val.(*node.Liveness).Store(liveness)
}
```

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
coordinator/scheduler/basic.go (1)

76-82: Guard against empty schedulable node set before scheduling.

If all nodes are draining/stopping, nodeIDs is empty and BasicSchedule may panic or behave unexpectedly. A short-circuit avoids scheduling with no destinations.

🔧 Proposed fix
-	nodeIDs := s.nodeManager.GetSchedulableNodeIDs()
+	nodeIDs := s.nodeManager.GetSchedulableNodeIDs()
+	if len(nodeIDs) == 0 {
+		return
+	}
 	nodeSize := make(map[node.ID]int)
🤖 Fix all issues with AI agents
In @.kiro/specs/drain-capture/design.md:
- Around line 62-67: The fenced code block under "Liveness 状态转换规则" is missing a
language identifier; update the opening fence from ``` to ```text so the block
becomes a text-coded fence containing the three transition lines (Alive →
Draining..., Draining → Stopping..., Alive → Stopping...) to satisfy MD040;
ensure only the fence marker is changed and content remains unchanged.
- Around line 843-907: The markdown uses bold-only lines like "**Validates:
Requirements 2.7, 3.6, ...**" which triggers MD036; update each Property section
(e.g., "Property 1: Draining Node Exclusion from Scheduling", "Property 2:
Single Draining Target Invariant", etc.) to replace the bold-only "Validates:
..." lines with either a proper heading (e.g., "#### Validates" or "Validates:")
or plain paragraph text (e.g., "Validates: Requirements 2.7, 3.6, ...") so they
are not solely bold formatting, and ensure consistency across all Property N
blocks (Properties 1–11) to satisfy markdownlint MD036.

In `@api/v2/api.go`:
- Around line 83-87: The drain endpoints under captureGroup are missing
authentication: add authenticateMiddleware to the middleware chain for the
routes that register DrainCapture and GetDrainStatus so they match other admin
operations; specifically, ensure captureGroup.PUT("/:capture_id/drain", ...) and
captureGroup.GET("/:capture_id/drain", ...) are registered with
authenticateMiddleware (in addition to coordinatorMiddleware) before invoking
the handlers DrainCapture and GetDrainStatus to enforce consistent auth.

In `@api/v2/capture.go`:
- Around line 72-92: The check is comparing targetNode.ID to the local node
(selfInfo.ID) instead of the actual coordinator; update the drain-prevention
logic in the handler to fetch the coordinator via h.server.GetCoordinatorInfo()
(or equivalent) and compare targetNode.ID against coordinator.ID, returning the
same ErrAPIInvalidParam if they match; ensure you handle errors returned by
GetCoordinatorInfo() similarly to the existing SelfInfo() error handling so
failures surface correctly.

In `@cdc-api-v2-capture-drain-cn.md`:
- Around line 279-304: The document uses bolded section titles (e.g., "**HTTP
入口与路由**", "**v2 handler 与 owner API 桥接**", etc.) which triggers markdownlint
MD036; replace each bolded title with a proper Markdown heading (for example "##
HTTP 入口与路由", "## v2 handler 与 owner API 桥接", "## Owner job 队列与 drain 入口", "##
scheduler v3 drain 与 tick 主循环", "## 表迁移消息与状态机") keeping a consistent heading
level and preserving the following bullet lists and function references
(`(*server).startStatusHTTP`, `RegisterRoutes`, `(*OpenAPIV2).drainCapture`,
`(*ownerImpl).Tick`, `(*coordinator).DrainCapture`, `(*Manager).HandleTasks`,
etc.); ensure a blank line before each heading and adjust any nested sections to
use appropriate subheading levels to maintain semantic structure.

In `@coordinator/drain_state.go`:
- Around line 55-79: DrainState currently only updates in-memory fields
(drainingTarget, startTime, initialMaintainerCount, initialDispatcherCount) and
node liveness via nodeManager.SetNodeLiveness, so a coordinator restart loses
the drain record; persist the drain record and liveness and restore it on
startup. Add/use a durable store interface (e.g., EtcdStore) and modify
SetDrainingTarget to write a persistent drain entry (including target ID,
startTime, maintainer/dispatcher counts and liveness state) before—or as part
of—setting the in-memory fields and calling nodeManager.SetNodeLiveness; ensure
writes are durable/checked and handle errors by rolling back in-memory changes.
Implement a startup/hydration path (e.g., DrainState.RecoverFromStore invoked
during coordinator bootstrap) that reads persisted drain entries, reconstructs
DrainState fields and re-applies nodeManager.SetNodeLiveness for each persisted
draining target, and ensure successful drain completion clears the persistent
record (so SetDrainingTarget, any Complete/Cancel methods, and DrainState
recovery logic are updated to maintain persistence).

In `@coordinator/scheduler/balance.go`:
- Around line 78-89: The balance routine must skip balancing when there are no
schedulable nodes to avoid a divide-by-zero in pkgScheduler.CheckBalanceStatus;
after obtaining schedulableNodes from s.nodeManager.GetSchedulableNodes(), add a
guard that if len(schedulableNodes) == 0 you return
now.Add(s.checkBalanceInterval) (i.e., skip calling
pkgScheduler.CheckBalanceStatus and pkgScheduler.Balance). This prevents the
panic and preserves the existing retry scheduling logic.

In `@coordinator/scheduler/drain.go`:
- Around line 152-161: The current allDispatchersMigrated on type drainScheduler
always returns true causing drain to finish before dispatchers leave; change it
to query the dispatcher membership/heartbeat for the given node.ID (e.g., via
the dispatcher registry/heartbeat store or the maintainer discovery API) and
return true only when the live dispatcher count for nodeID is zero; update
allDispatchersMigrated to call the appropriate store/heartbeat method, handle
transient errors (retry/log and return false on uncertainty) and remove the
temporary stub so drain only completes when no dispatchers remain.

In `@docs/design/2026-01-10-dynamic-stream.md`:
- Around line 52-62: The fenced code block showing the runtime structure
(containing Push, pathMap, *pathInfo, stream.addEvent, stream.handleLoop,
pendingQueue, signalQueue, popEvents, handler.Handle and dest) needs a language
identifier to satisfy markdownlint MD040; update the opening fence from ``` to a
tagged fence such as ```text (or ```bash/```go if more appropriate) so the block
becomes ```text ... ``` and leave the existing content unchanged.

In `@pkg/api/util.go`:
- Around line 198-206: The Liveness.String method dereferences *l directly which
breaks the atomic access contract; change the switch to use l.Load() instead of
*l so it reads the value atomically (consistent with other methods like
IsSchedulable). Locate the String method on type Liveness and replace uses of *l
in the switch with the result of l.Load(), preserving the same case labels
(LivenessCaptureAlive, LivenessCaptureStopping, LivenessCaptureDraining) and
default behavior.

In `@pkg/node/node.go`:
- Around line 40-68: The public Liveness.Store method currently permits
arbitrary state changes (including reverting from Draining back to Alive) which
breaks the one-way lifecycle enforced by StoreDraining and DrainComplete; update
Liveness.Store to prevent invalid reversions by either: 1) making Store
unexported (rename to store) and restrict its use to initialization paths only,
or 2) enforce a guarded write that only allows transitioning from a
zero/uninitialized state to a valid initial state (returning false otherwise),
or 3) remove Store entirely and update call sites to use the existing atomic
CompareAndSwap-based methods (StoreDraining, DrainComplete) for state
transitions; adjust all references to Liveness.Store accordingly so transitions
can only occur via the guarded methods.

In `@server/watcher/module_node_manager.go`:
- Around line 219-223: The node liveness map is never cleaned up when nodes are
removed; call ClearNodeLiveness(id) when a node removal is detected to prevent
unbounded growth. Update the node removal path in NodeManager.Tick() (the branch
that currently triggers registered handlers on node removal) or inside the node
change handler to invoke ClearNodeLiveness(id) immediately after detecting the
removed node (using the same node ID passed to the handlers), ensuring the
nodeLiveness entry is deleted alongside any other removal logic.
🧹 Nitpick comments (1)
coordinator/scheduler/drain.go (1)

110-135: Avoid concentrating a batch on one node.
selectDestination only considers current workload, so multiple migrations in the same tick can pile onto a single node. Consider accounting for in-flight assignments in this batch.

♻️ In-flight–aware selection
-	// Generate MoveMaintainer operators in batches
-	batchCount := 0
+	// Generate MoveMaintainer operators in batches
+	batchCount := 0
+	inflight := make(map[node.ID]int)
 	for _, cf := range maintainers {
@@
-		destNode := s.selectDestination(schedulableNodes)
+		destNode := s.selectDestination(schedulableNodes, inflight)
 		if destNode == "" {
 			continue
 		}
@@
 		if s.operatorController.AddOperator(op) {
 			batchCount++
+			inflight[destNode]++
 			log.Info("generated move maintainer operator for drain",
@@
-func (s *drainScheduler) selectDestination(nodes map[node.ID]*node.Info) node.ID {
+func (s *drainScheduler) selectDestination(nodes map[node.ID]*node.Info, inflight map[node.ID]int) node.ID {
@@
-		if count := nodeTaskSize[id]; count < minCount {
+		if count := nodeTaskSize[id] + inflight[id]; count < minCount {
 			minCount = count
 			minNode = id
 		}
 	}

Also applies to: 164-175

Comment on lines +62 to +67
**Liveness 状态转换规则**:
```
Alive → Draining:通过 drain API 触发
Draining → Stopping:drain 完成(所有任务迁移完成)
Alive → Stopping:节点优雅关闭(非 drain 场景)
```

⚠️ Potential issue | 🟡 Minor

Add a language to fenced code blocks (MD040).
The fenced block lacks a language identifier.

✍️ Suggested fix
-```
+```text
 Alive → Draining:通过 drain API 触发
 Draining → Stopping:drain 完成(所有任务迁移完成)
 Alive → Stopping:节点优雅关闭(非 drain 场景)
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)

63-63: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🤖 Prompt for AI Agents

In @.kiro/specs/drain-capture/design.md around lines 62 - 67, The fenced code
block under "Liveness 状态转换规则" is missing a language identifier; update the
opening fence from ``` to ```text so the block becomes a text-coded fence
containing the three transition lines (Alive → Draining..., Draining →
Stopping..., Alive → Stopping...) to satisfy MD040; ensure only the fence marker
is changed and content remains unchanged.


Comment on lines +843 to +907
### Property 1: Draining Node Exclusion from Scheduling

*For any* scheduling decision (maintainer or dispatcher), when a draining target is set, the draining node SHALL NOT be selected as a destination node.

**Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4**

### Property 2: Single Draining Target Invariant

*For any* cluster state, there SHALL be at most one draining target at any time.

**Validates: Requirements 2.3, 2.4**

### Property 3: Maintainer-DDLSpan Co-location

*For any* maintainer, its associated Table Trigger Event Dispatcher (ddlSpan) SHALL always be on the same node as the maintainer.

**Validates: Architectural constraint**

### Property 4: Drain State Transition

*For any* drain operation, the node liveness SHALL transition from Alive → Draining → Stopping, and never back to Alive (except the “only draining node left” recovery case).

**Validates: Requirements 9.1, 9.2, 9.3, 9.4, 9.5**

### Property 5: Batch Size Constraint

*For any* batch of migration operators generated by the drain scheduler, the number of concurrent in-progress operators SHALL NOT exceed the configured batch size.

**Validates: Requirements 3.3**

### Property 6: Drain Completion Condition

*For any* drain operation, the drain SHALL be marked complete only when both maintainer count AND dispatcher count on the draining node are zero.

**Validates: Requirements 2.9, 3.8**

### Property 7: Drain Notification Propagation

*For any* draining target set in coordinator, the draining capture ID SHALL be propagated to all maintainers via the coordinator→maintainer notification channel within one heartbeat cycle.

**Validates: Requirements 10.1, 10.2**

### Property 8: Workload Balanced Distribution

*For any* set of maintainers being migrated from a draining node, the destination nodes SHALL be selected based on lowest workload.

**Validates: Requirements 2.8, 3.5**

### Property 9: Draining Node Cannot Become Coordinator (with exception)

*For any* coordinator election, nodes with Draining or Stopping liveness SHALL NOT be selected as coordinator candidates, EXCEPT when the draining node is the only node left in cluster.

**Validates: Requirements 9.6, 9.8**

### Property 10: Draining Node Crash Cleanup

*For any* draining node that crashes, the drain state SHALL be cleared and the maintainers SHALL be rescheduled by basic scheduler.

**Validates: Requirements 9.7**

### Property 11: Destination Node Failure Handling

*For any* destination node failure during maintainer migration, the MoveMaintainer operator SHALL either mark the maintainer as absent (if origin stopped) or convert to add operation (if origin not stopped).

**Validates: Requirements 8.2**

⚠️ Potential issue | 🟡 Minor

Use headings (or plain text) instead of bold-only lines (MD036).
The **Validates: ...** lines are styled like headings but are just bold text. Convert them to a proper heading or plain text to satisfy markdownlint.

✍️ Example adjustment
-**Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4**
+#### Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
### Property 1: Draining Node Exclusion from Scheduling
*For any* scheduling decision (maintainer or dispatcher), when a draining target is set, the draining node SHALL NOT be selected as a destination node.
**Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4**
### Property 2: Single Draining Target Invariant
*For any* cluster state, there SHALL be at most one draining target at any time.
**Validates: Requirements 2.3, 2.4**
### Property 3: Maintainer-DDLSpan Co-location
*For any* maintainer, its associated Table Trigger Event Dispatcher (ddlSpan) SHALL always be on the same node as the maintainer.
**Validates: Architectural constraint**
### Property 4: Drain State Transition
*For any* drain operation, the node liveness SHALL transition from Alive → Draining → Stopping, and never back to Alive (except the only draining node left recovery case).
**Validates: Requirements 9.1, 9.2, 9.3, 9.4, 9.5**
### Property 5: Batch Size Constraint
*For any* batch of migration operators generated by the drain scheduler, the number of concurrent in-progress operators SHALL NOT exceed the configured batch size.
**Validates: Requirements 3.3**
### Property 6: Drain Completion Condition
*For any* drain operation, the drain SHALL be marked complete only when both maintainer count AND dispatcher count on the draining node are zero.
**Validates: Requirements 2.9, 3.8**
### Property 7: Drain Notification Propagation
*For any* draining target set in coordinator, the draining capture ID SHALL be propagated to all maintainers via the coordinator→maintainer notification channel within one heartbeat cycle.
**Validates: Requirements 10.1, 10.2**
### Property 8: Workload Balanced Distribution
*For any* set of maintainers being migrated from a draining node, the destination nodes SHALL be selected based on lowest workload.
**Validates: Requirements 2.8, 3.5**
### Property 9: Draining Node Cannot Become Coordinator (with exception)
*For any* coordinator election, nodes with Draining or Stopping liveness SHALL NOT be selected as coordinator candidates, EXCEPT when the draining node is the only node left in cluster.
**Validates: Requirements 9.6, 9.8**
### Property 10: Draining Node Crash Cleanup
*For any* draining node that crashes, the drain state SHALL be cleared and the maintainers SHALL be rescheduled by basic scheduler.
**Validates: Requirements 9.7**
### Property 11: Destination Node Failure Handling
*For any* destination node failure during maintainer migration, the MoveMaintainer operator SHALL either mark the maintainer as absent (if origin stopped) or convert to add operation (if origin not stopped).
**Validates: Requirements 8.2**
### Property 1: Draining Node Exclusion from Scheduling
*For any* scheduling decision (maintainer or dispatcher), when a draining target is set, the draining node SHALL NOT be selected as a destination node.
#### Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4
### Property 2: Single Draining Target Invariant
*For any* cluster state, there SHALL be at most one draining target at any time.
#### Validates: Requirements 2.3, 2.4
### Property 3: Maintainer-DDLSpan Co-location
*For any* maintainer, its associated Table Trigger Event Dispatcher (ddlSpan) SHALL always be on the same node as the maintainer.
#### Validates: Architectural constraint
### Property 4: Drain State Transition
*For any* drain operation, the node liveness SHALL transition from Alive → Draining → Stopping, and never back to Alive (except the "only draining node left" recovery case).
#### Validates: Requirements 9.1, 9.2, 9.3, 9.4, 9.5
### Property 5: Batch Size Constraint
*For any* batch of migration operators generated by the drain scheduler, the number of concurrent in-progress operators SHALL NOT exceed the configured batch size.
#### Validates: Requirements 3.3
### Property 6: Drain Completion Condition
*For any* drain operation, the drain SHALL be marked complete only when both maintainer count AND dispatcher count on the draining node are zero.
#### Validates: Requirements 2.9, 3.8
### Property 7: Drain Notification Propagation
*For any* draining target set in coordinator, the draining capture ID SHALL be propagated to all maintainers via the coordinator→maintainer notification channel within one heartbeat cycle.
#### Validates: Requirements 10.1, 10.2
### Property 8: Workload Balanced Distribution
*For any* set of maintainers being migrated from a draining node, the destination nodes SHALL be selected based on lowest workload.
#### Validates: Requirements 2.8, 3.5
### Property 9: Draining Node Cannot Become Coordinator (with exception)
*For any* coordinator election, nodes with Draining or Stopping liveness SHALL NOT be selected as coordinator candidates, EXCEPT when the draining node is the only node left in cluster.
#### Validates: Requirements 9.6, 9.8
### Property 10: Draining Node Crash Cleanup
*For any* draining node that crashes, the drain state SHALL be cleared and the maintainers SHALL be rescheduled by basic scheduler.
#### Validates: Requirements 9.7
### Property 11: Destination Node Failure Handling
*For any* destination node failure during maintainer migration, the MoveMaintainer operator SHALL either mark the maintainer as absent (if origin stopped) or convert to add operation (if origin not stopped).
#### Validates: Requirements 8.2
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)

847-847: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


853-853: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


859-859: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


865-865: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


871-871: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


877-877: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


883-883: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


889-889: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


895-895: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


901-901: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


907-907: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)

🤖 Prompt for AI Agents
In @.kiro/specs/drain-capture/design.md around lines 843 - 907, The markdown
uses bold-only lines like "**Validates: Requirements 2.7, 3.6, ...**" which
triggers MD036; update each Property section (e.g., "Property 1: Draining Node
Exclusion from Scheduling", "Property 2: Single Draining Target Invariant",
etc.) to replace the bold-only "Validates: ..." lines with either a proper
heading (e.g., "#### Validates" or "Validates:") or plain paragraph text (e.g.,
"Validates: Requirements 2.7, 3.6, ...") so they are not solely bold formatting,
and ensure consistency across all Property N blocks (Properties 1–11) to satisfy
markdownlint MD036.

Comment on lines 83 to +87
```go
	captureGroup := v2.Group("/captures")
	captureGroup.Use(coordinatorMiddleware)
	captureGroup.GET("", api.ListCaptures)
	captureGroup.PUT("/:capture_id/drain", api.DrainCapture)
	captureGroup.GET("/:capture_id/drain", api.GetDrainStatus)
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

git ls-files | grep -E "api.*\.go$" | head -20

Repository: pingcap/ticdc

Length of output: 517


🏁 Script executed:

find . -type f -name "api.go" | head -10

Repository: pingcap/ticdc

Length of output: 88


🏁 Script executed:

cat -n api/v2/api.go | sed -n '75,95p'

Repository: pingcap/ticdc

Length of output: 1443


🏁 Script executed:

rg -n "authenticateMiddleware" api/v2/api.go

Repository: pingcap/ticdc

Length of output: 1907


🏁 Script executed:

rg -n "coordinatorMiddleware" api/v2/api.go

Repository: pingcap/ticdc

Length of output: 1564


🏁 Script executed:

cat -n api/middleware/authenticate_middleware.go

Repository: pingcap/ticdc

Length of output: 5410


🏁 Script executed:

cat -n api/v2/capture.go | head -150

Repository: pingcap/ticdc

Length of output: 5658


Add authentication to drain endpoints for consistency and security.
The DrainCapture and GetDrainStatus routes lack authenticateMiddleware, while other admin operations in the changefeed group consistently use it. Since drain is a mutating admin operation and the handlers do not perform internal auth checks, these routes should be protected by the same authentication middleware applied elsewhere.

Recommended change
	captureGroup := v2.Group("/captures")
	captureGroup.Use(coordinatorMiddleware)
	captureGroup.GET("", api.ListCaptures)
-	captureGroup.PUT("/:capture_id/drain", api.DrainCapture)
-	captureGroup.GET("/:capture_id/drain", api.GetDrainStatus)
+	captureGroup.PUT("/:capture_id/drain", authenticateMiddleware, api.DrainCapture)
+	captureGroup.GET("/:capture_id/drain", authenticateMiddleware, api.GetDrainStatus)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
captureGroup := v2.Group("/captures")
captureGroup.Use(coordinatorMiddleware)
captureGroup.GET("", api.ListCaptures)
captureGroup.PUT("/:capture_id/drain", api.DrainCapture)
captureGroup.GET("/:capture_id/drain", api.GetDrainStatus)
captureGroup := v2.Group("/captures")
captureGroup.Use(coordinatorMiddleware)
captureGroup.GET("", api.ListCaptures)
captureGroup.PUT("/:capture_id/drain", authenticateMiddleware, api.DrainCapture)
captureGroup.GET("/:capture_id/drain", authenticateMiddleware, api.GetDrainStatus)
🤖 Prompt for AI Agents
In `@api/v2/api.go` around lines 83 - 87, The drain endpoints under captureGroup
are missing authentication: add authenticateMiddleware to the middleware chain
for the routes that register DrainCapture and GetDrainStatus so they match other
admin operations; specifically, ensure captureGroup.PUT("/:capture_id/drain",
...) and captureGroup.GET("/:capture_id/drain", ...) are registered with
authenticateMiddleware (in addition to coordinatorMiddleware) before invoking
the handlers DrainCapture and GetDrainStatus to enforce consistent auth.

Comment on lines +72 to +92
```go
	// Check if current node is coordinator
	selfInfo, err := h.server.SelfInfo()
	if err != nil {
		_ = c.Error(err)
		return
	}

	nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)

	// Validate target capture exists
	targetNode := nodeManager.GetNodeInfo(node.ID(captureID))
	if targetNode == nil {
		_ = c.Error(errors.ErrCaptureNotExist.GenWithStackByArgs(captureID))
		return
	}

	// Check if target is coordinator
	if targetNode.ID == selfInfo.ID {
		_ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("cannot drain coordinator node"))
		return
	}
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Search for coordinator ID related code
rg -n "GetCoordinatorID|coordinatorID|CoordinatorID" -g '*.go' | head -30

Repository: pingcap/ticdc

Length of output: 2672


🏁 Script executed:

# Search for OpenAPI v2 handler registration and routing
rg -n "OpenAPIV2|/api/v2" -g '*.go' | head -40

Repository: pingcap/ticdc

Length of output: 2940


🏁 Script executed:

# Locate the DrainCapture handler implementation
rg -n "DrainCapture|drain" -g '*.go' api/v2/ | head -20

Repository: pingcap/ticdc

Length of output: 1679


🏁 Script executed:

# Find where the coordinator information is stored/accessed
rg -n "selfInfo|GetSelfInfo|coordinator" -g '*.go' | grep -i "coordinator\|selfinfo" | head -30

Repository: pingcap/ticdc

Length of output: 2940


🏁 Script executed:

# Read the full DrainCapture handler implementation
cat -n api/v2/capture.go | sed -n '55,120p'

Repository: pingcap/ticdc

Length of output: 2477


🏁 Script executed:

# Check NodeManager methods for getting coordinator ID
cat -n server/watcher/module_node_manager.go | sed -n '1,150p'

Repository: pingcap/ticdc

Length of output: 5090


🏁 Script executed:

# Check how handlers are registered and if routes can reach non-coordinator nodes
cat -n api/http.go | head -50

Repository: pingcap/ticdc

Length of output: 2093


🏁 Script executed:

# Look for the SelfInfo method and how it's used
rg -n "func.*SelfInfo|GetSelfInfo" -g '*.go' -A 5

Repository: pingcap/ticdc

Length of output: 848


🏁 Script executed:

# Search for all public methods in NodeManager
cat -n server/watcher/module_node_manager.go | sed -n '145,250p'

Repository: pingcap/ticdc

Length of output: 4092


🏁 Script executed:

# Look for any getter method for coordinator ID
rg -n "func.*NodeManager.*Coordinator|func.*GetOwner|func.*GetCoordinator" -A 3 server/watcher/module_node_manager.go

Repository: pingcap/ticdc

Length of output: 251


🏁 Script executed:

# Check if there's a proper way to get coordinator ID from the manager
rg -n "coordinatorID" server/watcher/module_node_manager.go

Repository: pingcap/ticdc

Length of output: 250


🏁 Script executed:

# Check how other handlers validate coordinator operations
rg -n "GetCoordinator\|coordinator\|Coordinator" api/v2/ -A 2 | head -50

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Search for getter method for coordinator ID in NodeManager
rg -n "GetCoordinator" server/watcher/module_node_manager.go

Repository: pingcap/ticdc

Length of output: 209


🏁 Script executed:

# Look for how other code accesses the coordinator ID
rg -n "GetOwnerID|coordinatorID\\.Load" -A 2 -B 2

Repository: pingcap/ticdc

Length of output: 4381


🏁 Script executed:

# Check what server.GetCoordinator() returns and how it validates
cat -n server/server.go | sed -n '500,550p'

Repository: pingcap/ticdc

Length of output: 1742


🏁 Script executed:

# Search for any validation of coordinator in handlers
rg -n "is.*coordinator|coordinator.*check" -i api/v2/ -B 2 -A 2

Repository: pingcap/ticdc

Length of output: 4497


Use the actual coordinator ID for the "cannot drain coordinator" check.

The check at line 89 compares the target node to the local node (selfInfo.ID), not the actual coordinator. On non-coordinator nodes, this allows draining the coordinator while blocking draining the local node. Compare against the actual coordinator ID instead—obtain it via h.server.GetCoordinatorInfo() (returns the coordinator's node info) or directly query the coordinator ID using the etcd client.

🤖 Prompt for AI Agents
In `@api/v2/capture.go` around lines 72 - 92, The check is comparing targetNode.ID
to the local node (selfInfo.ID) instead of the actual coordinator; update the
drain-prevention logic in the handler to fetch the coordinator via
h.server.GetCoordinatorInfo() (or equivalent) and compare targetNode.ID against
coordinator.ID, returning the same ErrAPIInvalidParam if they match; ensure you
handle errors returned by GetCoordinatorInfo() similarly to the existing
SelfInfo() error handling so failures surface correctly.
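
A sketch of the corrected check, assuming the server exposes an accessor like the GetCoordinatorInfo() the reviewer mentions; the exact method name and signature are not confirmed by this diff.

```go
	// Reject draining the coordinator itself: compare against the actual
	// coordinator's ID rather than the node serving this request.
	// GetCoordinatorInfo is the accessor suggested by the reviewer; its real
	// signature in this codebase is an assumption here.
	coordinatorInfo, err := h.server.GetCoordinatorInfo()
	if err != nil {
		_ = c.Error(err)
		return
	}
	if targetNode.ID == coordinatorInfo.ID {
		_ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("cannot drain coordinator node"))
		return
	}
```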

Comment on lines +279 to +304
**HTTP 入口与路由**
- `cdc/server/server.go`: `(*server).startStatusHTTP`
- `cdc/http.go`: `RegisterRoutes`
- `cdc/api/v2/api.go`: `RegisterOpenAPIV2Routes`
- `cdc/api/middleware/middleware.go`: `ForwardToOwnerMiddleware` / `CheckServerReadyMiddleware` / `ErrorHandleMiddleware`

**v2 handler 与 owner API 桥接**
- `cdc/api/v2/capture.go`: `(*OpenAPIV2).drainCapture`
- `cdc/api/util.go`: `HandleOwnerDrainCapture` / `ForwardToOwner` / `ForwardToCapture`

**Owner job 队列与 drain 入口**
- `cdc/owner/owner.go`: `(*ownerImpl).Tick` / `handleJobs` / `DrainCapture` / `handleDrainCaptures` / `pushOwnerJob`
- `cdc/owner/status_provider.go`: `(*ownerStatusProvider).GetCaptures` / `sendQueryToOwner`

**scheduler v3 drain 与 tick 主循环**
- `cdc/scheduler/internal/v3/coordinator.go`: `(*coordinator).DrainCapture` / `Tick` / `poll`
- `cdc/scheduler/internal/v3/scheduler/scheduler_manager.go`: `(*Manager).DrainCapture` / `DrainingTarget` / `Schedule`
- `cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go`: `(*drainCaptureScheduler).Schedule`
- `cdc/scheduler/internal/v3/member/capture_manager.go`: `(*CaptureManager).Tick`(Heartbeat.IsStopping)

**表迁移消息与状态机**
- `cdc/scheduler/internal/v3/replication/replication_manager.go`: `(*Manager).HandleTasks`
- `cdc/scheduler/internal/v3/replication/replication_set.go`: `handleMoveTable` / `poll` / `pollOnPrepare` / `pollOnCommit`
- `cdc/scheduler/internal/v3/agent/agent.go`: `handleMessageHeartbeat` / `handleMessageDispatchTableRequest`
- `cdc/scheduler/internal/v3/agent/table.go`: `tableSpanManager.poll` / `tableSpan.handleAddTableTask` / `handleRemoveTableTask`


⚠️ Potential issue | 🟡 Minor

Use headings instead of emphasis to satisfy markdownlint (MD036).

The bold section titles are flagged as “emphasis used instead of a heading.” Consider switching them to proper headings.

✅ Example fix
-**HTTP 入口与路由**
+### HTTP 入口与路由
@@
-**v2 handler 与 owner API 桥接**
+### v2 handler 与 owner API 桥接
@@
-**Owner job 队列与 drain 入口**
+### Owner job 队列与 drain 入口
@@
-**scheduler v3 drain 与 tick 主循环**
+### scheduler v3 drain 与 tick 主循环
@@
-**表迁移消息与状态机**
+### 表迁移消息与状态机
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
**HTTP 入口与路由**
- `cdc/server/server.go`: `(*server).startStatusHTTP`
- `cdc/http.go`: `RegisterRoutes`
- `cdc/api/v2/api.go`: `RegisterOpenAPIV2Routes`
- `cdc/api/middleware/middleware.go`: `ForwardToOwnerMiddleware` / `CheckServerReadyMiddleware` / `ErrorHandleMiddleware`
**v2 handler 与 owner API 桥接**
- `cdc/api/v2/capture.go`: `(*OpenAPIV2).drainCapture`
- `cdc/api/util.go`: `HandleOwnerDrainCapture` / `ForwardToOwner` / `ForwardToCapture`
**Owner job 队列与 drain 入口**
- `cdc/owner/owner.go`: `(*ownerImpl).Tick` / `handleJobs` / `DrainCapture` / `handleDrainCaptures` / `pushOwnerJob`
- `cdc/owner/status_provider.go`: `(*ownerStatusProvider).GetCaptures` / `sendQueryToOwner`
**scheduler v3 drain 与 tick 主循环**
- `cdc/scheduler/internal/v3/coordinator.go`: `(*coordinator).DrainCapture` / `Tick` / `poll`
- `cdc/scheduler/internal/v3/scheduler/scheduler_manager.go`: `(*Manager).DrainCapture` / `DrainingTarget` / `Schedule`
- `cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go`: `(*drainCaptureScheduler).Schedule`
- `cdc/scheduler/internal/v3/member/capture_manager.go`: `(*CaptureManager).Tick`(Heartbeat.IsStopping)
**表迁移消息与状态机**
- `cdc/scheduler/internal/v3/replication/replication_manager.go`: `(*Manager).HandleTasks`
- `cdc/scheduler/internal/v3/replication/replication_set.go`: `handleMoveTable` / `poll` / `pollOnPrepare` / `pollOnCommit`
- `cdc/scheduler/internal/v3/agent/agent.go`: `handleMessageHeartbeat` / `handleMessageDispatchTableRequest`
- `cdc/scheduler/internal/v3/agent/table.go`: `tableSpanManager.poll` / `tableSpan.handleAddTableTask` / `handleRemoveTableTask`
### HTTP 入口与路由
- `cdc/server/server.go`: `(*server).startStatusHTTP`
- `cdc/http.go`: `RegisterRoutes`
- `cdc/api/v2/api.go`: `RegisterOpenAPIV2Routes`
- `cdc/api/middleware/middleware.go`: `ForwardToOwnerMiddleware` / `CheckServerReadyMiddleware` / `ErrorHandleMiddleware`
### v2 handler 与 owner API 桥接
- `cdc/api/v2/capture.go`: `(*OpenAPIV2).drainCapture`
- `cdc/api/util.go`: `HandleOwnerDrainCapture` / `ForwardToOwner` / `ForwardToCapture`
### Owner job 队列与 drain 入口
- `cdc/owner/owner.go`: `(*ownerImpl).Tick` / `handleJobs` / `DrainCapture` / `handleDrainCaptures` / `pushOwnerJob`
- `cdc/owner/status_provider.go`: `(*ownerStatusProvider).GetCaptures` / `sendQueryToOwner`
### scheduler v3 drain 与 tick 主循环
- `cdc/scheduler/internal/v3/coordinator.go`: `(*coordinator).DrainCapture` / `Tick` / `poll`
- `cdc/scheduler/internal/v3/scheduler/scheduler_manager.go`: `(*Manager).DrainCapture` / `DrainingTarget` / `Schedule`
- `cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go`: `(*drainCaptureScheduler).Schedule`
- `cdc/scheduler/internal/v3/member/capture_manager.go`: `(*CaptureManager).Tick`(Heartbeat.IsStopping)
### 表迁移消息与状态机
- `cdc/scheduler/internal/v3/replication/replication_manager.go`: `(*Manager).HandleTasks`
- `cdc/scheduler/internal/v3/replication/replication_set.go`: `handleMoveTable` / `poll` / `pollOnPrepare` / `pollOnCommit`
- `cdc/scheduler/internal/v3/agent/agent.go`: `handleMessageHeartbeat` / `handleMessageDispatchTableRequest`
- `cdc/scheduler/internal/v3/agent/table.go`: `tableSpanManager.poll` / `tableSpan.handleAddTableTask` / `handleRemoveTableTask`
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)

279-279: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


285-285: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


289-289: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


293-293: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


299-299: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)

🤖 Prompt for AI Agents
In `@cdc-api-v2-capture-drain-cn.md` around lines 279 - 304, The document uses
bolded section titles (e.g., "**HTTP 入口与路由**", "**v2 handler 与 owner API 桥接**",
etc.) which triggers markdownlint MD036; replace each bolded title with a proper
Markdown heading (for example "## HTTP 入口与路由", "## v2 handler 与 owner API 桥接",
"## Owner job 队列与 drain 入口", "## scheduler v3 drain 与 tick 主循环", "## 表迁移消息与状态机")
keeping a consistent heading level and preserving the following bullet lists and
function references (`(*server).startStatusHTTP`, `RegisterRoutes`,
`(*OpenAPIV2).drainCapture`, `(*ownerImpl).Tick`, `(*coordinator).DrainCapture`,
`(*Manager).HandleTasks`, etc.); ensure a blank line before each heading and
adjust any nested sections to use appropriate subheading levels to maintain
semantic structure.

Comment on lines +152 to +161
```go
// allDispatchersMigrated checks if all dispatchers have been migrated from the draining node.
// Currently, this is a simplified implementation that returns true when all maintainers
// have been migrated. The actual dispatcher migration is handled by the new Maintainer
// after it bootstraps and discovers dispatchers on the draining node.
// TODO: Implement proper dispatcher count tracking via heartbeat.
func (s *drainScheduler) allDispatchersMigrated(nodeID node.ID) bool {
	// For now, we consider dispatchers migrated when all maintainers are migrated.
	// The new Maintainer will handle dispatcher migration after bootstrap.
	// This is a conservative approach - the drain will complete when maintainers are done.
	return true
```

⚠️ Potential issue | 🟠 Major

Drain can complete before dispatchers migrate.
allDispatchersMigrated always returns true, so the coordinator will clear drain state as soon as maintainers move—even if dispatchers still exist on the draining node. That violates the drain completion contract and can lead to premature Stopping.

🤖 Prompt for AI Agents
In `@coordinator/scheduler/drain.go` around lines 152 - 161, The current
allDispatchersMigrated on type drainScheduler always returns true causing drain
to finish before dispatchers leave; change it to query the dispatcher
membership/heartbeat for the given node.ID (e.g., via the dispatcher
registry/heartbeat store or the maintainer discovery API) and return true only
when the live dispatcher count for nodeID is zero; update allDispatchersMigrated
to call the appropriate store/heartbeat method, handle transient errors
(retry/log and return false on uncertainty) and remove the temporary stub so
drain only completes when no dispatchers remain.
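
For illustration, a minimal sketch of the shape this fix could take, assuming the coordinator can expose a per-node live dispatcher count; the `dispatcherCounter` interface and `DispatcherCount` method below are invented for the sketch, not existing APIs:

```go
package drainsketch

import "log"

// dispatcherCounter abstracts whatever component reports live dispatcher
// counts per node (a heartbeat store, a node-status cache, ...). The interface
// and method name are invented for this sketch.
type dispatcherCounter interface {
	// DispatcherCount returns the number of live dispatchers on the given node.
	DispatcherCount(nodeID string) (int, error)
}

// allDispatchersMigrated returns true only when the draining node reports zero
// live dispatchers; on any error it returns false so the drain keeps waiting
// instead of completing prematurely.
func allDispatchersMigrated(src dispatcherCounter, nodeID string) bool {
	count, err := src.DispatcherCount(nodeID)
	if err != nil {
		log.Printf("drain: cannot determine dispatcher count for node %s, will retry: %v", nodeID, err)
		return false
	}
	return count == 0
}
```

Returning false on error keeps the behavior conservative: an unreadable count delays drain completion rather than finishing it early.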

Comment on lines +52 to +62
```
Push(path, event)
-> pathMap[ path ] => *pathInfo{stream, dest, pendingQueue, ...}
-> stream.addEvent(eventWrap{event, pathInfo, ...})

stream.handleLoop() // 每个 stream 一个 goroutine
-> 把 eventWrap 写入对应 path 的 pendingQueue
-> 从全局 signalQueue 选一个 path
-> popEvents() 得到 (events[], pathInfo)
-> handler.Handle(dest, events...)
```

⚠️ Potential issue | 🟡 Minor

Add a language identifier to the fenced code block (markdownlint MD040).
The runtime-structure code fence lacks a language tag; markdownlint will flag this.

✍️ Suggested fix
-```
+```text
 Push(path, event)
   -> pathMap[ path ] => *pathInfo{stream, dest, pendingQueue, ...}
   -> stream.addEvent(eventWrap{event, pathInfo, ...})

 stream.handleLoop()  // 每个 stream 一个 goroutine
   -> 把 eventWrap 写入对应 path 的 pendingQueue
   -> 从全局 signalQueue 选一个 path
   -> popEvents() 得到 (events[], pathInfo)
   -> handler.Handle(dest, events...)
-```
+```
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)

52-52: Fenced code blocks should have a language specified (MD040, fenced-code-language)

🤖 Prompt for AI Agents
In `@docs/design/2026-01-10-dynamic-stream.md` around lines 52 - 62, The fenced
code block showing the runtime structure (containing Push, pathMap, *pathInfo,
stream.addEvent, stream.handleLoop, pendingQueue, signalQueue, popEvents,
handler.Handle and dest) needs a language identifier to satisfy markdownlint
MD040; update the opening fence from ``` to a tagged fence such as ```text (or
```bash/```go if more appropriate) so the block becomes ```text ... ``` and
leave the existing content unchanged.
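
As a side note for readers following the quoted flow, here is a self-contained Go sketch of the same push/dispatch shape; every name in it (pathInfo, the queues, the handler signature) is an illustrative stand-in rather than the actual dynamicstream types:

```go
package dsketch

import "sync"

// handler consumes a batch of events for one destination, mirroring
// handler.Handle(dest, events...) in the quoted block.
type handler func(dest string, events ...string)

// pathInfo mirrors the per-path bookkeeping: a destination plus a pending queue.
type pathInfo struct {
	dest    string
	pending []string
}

// stream mirrors one dynamic-stream worker: Push enqueues, handleLoop drains.
type stream struct {
	mu      sync.Mutex
	pathMap map[string]*pathInfo
	signal  chan string // carries the path that has pending events
	handle  handler
}

func newStream(h handler) *stream {
	s := &stream{
		pathMap: make(map[string]*pathInfo),
		signal:  make(chan string, 1024),
		handle:  h,
	}
	go s.handleLoop() // one goroutine per stream
	return s
}

// Push appends the event to the path's pending queue and signals the loop.
func (s *stream) Push(path, event string) {
	s.mu.Lock()
	info, ok := s.pathMap[path]
	if !ok {
		info = &pathInfo{dest: "sink-for-" + path}
		s.pathMap[path] = info
	}
	info.pending = append(info.pending, event)
	s.mu.Unlock()
	s.signal <- path
}

// handleLoop picks a signalled path, pops its pending events, and hands the
// batch to the handler.
func (s *stream) handleLoop() {
	for path := range s.signal {
		s.mu.Lock()
		info := s.pathMap[path]
		events := info.pending
		info.pending = nil
		s.mu.Unlock()
		if len(events) > 0 {
			s.handle(info.dest, events...)
		}
	}
}
```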

Comment on lines 198 to 206
func (l *Liveness) String() string {
switch *l {
case LivenessCaptureAlive:
return "Alive"
case LivenessCaptureStopping:
return "Stopping"
case LivenessCaptureDraining:
return "Draining"
default:

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the actual file and examine it
fd -t f "util.go" | grep -E "pkg/api"

Repository: pingcap/ticdc

Length of output: 72


🏁 Script executed:

# Find util.go in pkg/api
git ls-files "pkg/api/util.go"

Repository: pingcap/ticdc

Length of output: 72


🏁 Script executed:

# Read the file to understand Liveness type
cat "pkg/api/util.go" | head -250

Repository: pingcap/ticdc

Length of output: 6549


Use atomic Load in Liveness.String for consistency and thread safety.

The String() method directly dereferences *l without synchronization, which violates the atomic access contract of the Liveness type. Other methods like IsSchedulable() correctly use Load(). Update the switch statement to use l.Load() instead.

Suggested fix
 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:
 		return "Alive"
 	case LivenessCaptureStopping:
 		return "Stopping"
 	case LivenessCaptureDraining:
 		return "Draining"
 	default:
 		return "unknown"
 	}
 }
🤖 Prompt for AI Agents
In `@pkg/api/util.go` around lines 198 - 206, The Liveness.String method
dereferences *l directly which breaks the atomic access contract; change the
switch to use l.Load() instead of *l so it reads the value atomically
(consistent with other methods like IsSchedulable). Locate the String method on
type Liveness and replace uses of *l in the switch with the result of l.Load(),
preserving the same case labels (LivenessCaptureAlive, LivenessCaptureStopping,
LivenessCaptureDraining) and default behavior.

Comment on lines +40 to +68
// Load atomically loads the liveness value.
func (l *Liveness) Load() Liveness {
return Liveness(atomic.LoadInt32((*int32)(l)))
}

// Store atomically stores the liveness value.
func (l *Liveness) Store(val Liveness) {
atomic.StoreInt32((*int32)(l), int32(val))
}

// IsSchedulable returns true if the node can accept new workloads.
// Returns false if node is Draining or Stopping.
func (l *Liveness) IsSchedulable() bool {
return l.Load() == LivenessCaptureAlive
}

// StoreDraining sets liveness to Draining. Returns true if successful.
// Can only transition from Alive to Draining.
func (l *Liveness) StoreDraining() bool {
return atomic.CompareAndSwapInt32(
(*int32)(l), int32(LivenessCaptureAlive), int32(LivenessCaptureDraining))
}

// DrainComplete transitions from Draining to Stopping.
// Returns true if successful.
func (l *Liveness) DrainComplete() bool {
return atomic.CompareAndSwapInt32(
(*int32)(l), int32(LivenessCaptureDraining), int32(LivenessCaptureStopping))
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find all .Store( call sites in Go files to audit Liveness usage
rg -n --type=go '\.Store\(' -A 2 -B 2

Repository: pingcap/ticdc

Length of output: 50370


🏁 Script executed:

#!/bin/bash
# Search for Liveness.Store calls specifically in Go files
rg -n 'Liveness.*\.Store\(' --type=go -A 3 -B 2

Repository: pingcap/ticdc

Length of output: 490


🏁 Script executed:

#!/bin/bash
# Examine the SetNodeLiveness method and its callers
rg -n "SetNodeLiveness" --type=go -A 5 -B 3

Repository: pingcap/ticdc

Length of output: 6475


🏁 Script executed:

#!/bin/bash
# Examine the drain_state.go file to understand the lifecycle
cat -n coordinator/drain_state.go | head -150

Repository: pingcap/ticdc

Length of output: 5840


🏁 Script executed:

#!/bin/bash
# Find all SetNodeLiveness calls with their arguments to see what transitions happen
rg -n "SetNodeLiveness.*Alive|SetNodeLiveness.*Draining|SetNodeLiveness.*Stopping" --type=go

Repository: pingcap/ticdc

Length of output: 1533


🏁 Script executed:

#!/bin/bash
# View the Liveness type and its methods in pkg/node/node.go
cat -n pkg/node/node.go | sed -n '1,100p'

Repository: pingcap/ticdc

Length of output: 3491


Constrain Liveness.Store to prevent invalid state reversions.
Store() allows arbitrary state transitions, including reverting from Draining back to Alive (seen in coordinator/drain_state.go:128), which violates the one-way drain lifecycle enforced by the guarded StoreDraining() and DrainComplete() methods. Either restrict Store() to initialization only, make it unexported, or remove it in favor of explicit guarded transition methods to prevent accidental "revive" transitions in production code.

🤖 Prompt for AI Agents
In `@pkg/node/node.go` around lines 40 - 68, The public Liveness.Store method
currently permits arbitrary state changes (including reverting from Draining
back to Alive) which breaks the one-way lifecycle enforced by StoreDraining and
DrainComplete; update Liveness.Store to prevent invalid reversions by either: 1)
making Store unexported (rename to store) and restrict its use to initialization
paths only, or 2) enforce a guarded write that only allows transitioning from a
zero/uninitialized state to a valid initial state (returning false otherwise),
or 3) remove Store entirely and update call sites to use the existing atomic
CompareAndSwap-based methods (StoreDraining, DrainComplete) for state
transitions; adjust all references to Liveness.Store accordingly so transitions
can only occur via the guarded methods.
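
For the "guarded write" option mentioned above, one possible shape, sketched against a standalone copy of the type; the constant names and their numeric order are assumptions of the sketch and may not match the real pkg/node values:

```go
package livenesssketch

import "sync/atomic"

// Liveness mirrors the int32-backed type from the review. The constants and
// their ordering below are chosen for this sketch only.
type Liveness int32

const (
	LivenessAlive Liveness = iota
	LivenessDraining
	LivenessStopping
)

// advanceTo is a guarded, forward-only write: it succeeds only when the current
// state is exactly `from` and `to` is later in the lifecycle, so a Draining
// value can never be flipped back to Alive by a plain store.
func (l *Liveness) advanceTo(from, to Liveness) bool {
	if to <= from {
		return false
	}
	return atomic.CompareAndSwapInt32((*int32)(l), int32(from), int32(to))
}

// The guarded transition methods then become thin wrappers around advanceTo.
func (l *Liveness) StoreDraining() bool { return l.advanceTo(LivenessAlive, LivenessDraining) }
func (l *Liveness) DrainComplete() bool { return l.advanceTo(LivenessDraining, LivenessStopping) }
```

With a helper like this, the exported Store could be dropped or unexported, leaving only transitions that move forward through the lifecycle.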

Comment on lines +219 to +223
// ClearNodeLiveness removes the liveness tracking for a node.
// Called when a node is removed from the cluster.
func (c *NodeManager) ClearNodeLiveness(id node.ID) {
c.nodeLiveness.Delete(id)
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n "ClearNodeLiveness" --type go

Repository: pingcap/ticdc

Length of output: 430


🏁 Script executed:

rg -n "removeNode|deleteNode|node.*removed|node.*delete" --type go -A 3 server/watcher/

Repository: pingcap/ticdc

Length of output: 358


🏁 Script executed:

rg -n "NodeManager\{|NewNodeManager|onNode|nodeUpdate" --type go -B 2 -A 5 server/watcher/

Repository: pingcap/ticdc

Length of output: 2063


🏁 Script executed:

cat -n server/watcher/module_node_manager.go | head -150

Repository: pingcap/ticdc

Length of output: 5090


🏁 Script executed:

cat -n server/watcher/module_node_manager.go | tail -120

Repository: pingcap/ticdc

Length of output: 4514


🏁 Script executed:

# Check if there's any cleanup in node change handlers or elsewhere
rg -n "nodeLiveness|liveness.*Delete|Delete.*liveness" --type go -B 2 -A 2

Repository: pingcap/ticdc

Length of output: 1674


🏁 Script executed:

# Check if ClearNodeLiveness is called outside of tests
rg -n "ClearNodeLiveness" --type go -B 1 -A 1 | grep -v "test"

Repository: pingcap/ticdc

Length of output: 468


Add node liveness cleanup to the node removal handler. The ClearNodeLiveness() method exists but is never called when nodes are actually removed. The Tick() method detects node removal (lines 110–114) but only triggers registered handlers without clearing stale liveness entries. This causes nodeLiveness to grow without bound. Call ClearNodeLiveness(id) in the node change handler or directly after detecting a node removal in Tick().

🤖 Prompt for AI Agents
In `@server/watcher/module_node_manager.go` around lines 219 - 223, The node
liveness map is never cleaned up when nodes are removed; call
ClearNodeLiveness(id) when a node removal is detected to prevent unbounded
growth. Update the node removal path in NodeManager.Tick() (the branch that
currently triggers registered handlers on node removal) or inside the node
change handler to invoke ClearNodeLiveness(id) immediately after detecting the
removed node (using the same node ID passed to the handlers), ensuring the
nodeLiveness entry is deleted alongside any other removal logic.
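
A rough sketch of where the cleanup could sit, using a simplified stand-in for NodeManager since the real Tick() body is not quoted here; the fields and membership bookkeeping below are illustrative only:

```go
package watchersketch

import "sync"

// nodeManager is a simplified stand-in for NodeManager: it tracks known nodes
// plus a per-node liveness entry that must not outlive the node itself.
type nodeManager struct {
	mu           sync.Mutex
	nodes        map[string]struct{}
	nodeLiveness map[string]int32
	handlers     []func(removed []string)
}

// tick compares the previous membership against the latest snapshot; for every
// node that disappeared it clears the liveness entry (the ClearNodeLiveness
// call the review asks for) before notifying the registered handlers.
func (m *nodeManager) tick(latest map[string]struct{}) {
	m.mu.Lock()
	var removed []string
	for id := range m.nodes {
		if _, ok := latest[id]; !ok {
			removed = append(removed, id)
			delete(m.nodeLiveness, id) // equivalent of ClearNodeLiveness(id)
		}
	}
	m.nodes = latest
	m.mu.Unlock()

	if len(removed) == 0 {
		return
	}
	for _, h := range m.handlers {
		h(removed)
	}
}
```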
