[DNM] Drain capture #4081
base: master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: (none listed). The full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files; approvers can indicate their approval by writing …
📝 Walkthrough
Introduces the Drain Capture feature for TiCDC, enabling graceful node migration by adding drain state management, scheduler integration, liveness tracking, and HTTP APIs to initiate and monitor draining operations across the cluster.
Sequence Diagram
sequenceDiagram
participant Client
participant API as API Handler
participant Coordinator
participant DrainState
participant NodeManager
participant DrainScheduler
participant Maintainer
Client->>API: PUT /drain (captureID)
API->>Coordinator: DrainCapture(captureID)
Coordinator->>DrainState: SetDrainingTarget(captureID, counts)
DrainState->>NodeManager: SetNodeLiveness(captureID, Draining)
NodeManager-->>DrainState: ✓
DrainState-->>Coordinator: ✓
Coordinator-->>API: DrainCaptureResponse
API-->>Client: 200 OK
Note over DrainScheduler: Asynchronous Drain Execution
DrainScheduler->>DrainState: GetDrainingTarget()
DrainState-->>DrainScheduler: captureID
DrainScheduler->>NodeManager: GetSchedulableNodes()
NodeManager-->>DrainScheduler: nodes (excluding Draining)
DrainScheduler->>DrainScheduler: Select destinations, batch MoveMaintainer
DrainScheduler->>Maintainer: MoveMaintainer ops
Maintainer-->>DrainScheduler: ✓
loop Until all migrated
Maintainer->>Coordinator: Report progress (heartbeat)
Coordinator-->>DrainScheduler: Query progress
end
DrainScheduler->>DrainState: ClearDrainingTarget()
DrainState->>NodeManager: SetNodeLiveness(captureID, Stopping)
NodeManager-->>DrainState: ✓
DrainState-->>DrainScheduler: ✓
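To make the sequence above concrete, here is a minimal, self-contained Go sketch of the liveness transitions it describes (Alive → Draining on SetDrainingTarget, Draining → Stopping on ClearDrainingTarget). The type and method names below are simplified stand-ins for the PR's DrainState and NodeManager, not the actual implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Liveness mirrors the three states used in the diagram.
type Liveness int

const (
	Alive Liveness = iota
	Draining
	Stopping
)

// drainState is a hypothetical, simplified stand-in for the PR's DrainState:
// it tracks at most one draining target and drives the liveness transitions
// shown in the sequence diagram.
type drainState struct {
	mu       sync.Mutex
	target   string
	liveness map[string]Liveness
}

func newDrainState() *drainState {
	return &drainState{liveness: map[string]Liveness{}}
}

// SetDrainingTarget corresponds to Coordinator -> DrainState in the diagram.
func (d *drainState) SetDrainingTarget(captureID string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.target != "" && d.target != captureID {
		return errors.New("another capture is already draining")
	}
	d.target = captureID
	d.liveness[captureID] = Draining // NodeManager.SetNodeLiveness(captureID, Draining)
	return nil
}

// ClearDrainingTarget corresponds to DrainScheduler -> DrainState at the end of the diagram.
func (d *drainState) ClearDrainingTarget() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.target == "" {
		return
	}
	d.liveness[d.target] = Stopping // drain finished: Draining -> Stopping
	d.target = ""
}

func main() {
	ds := newDrainState()
	_ = ds.SetDrainingTarget("capture-1")
	fmt.Println(ds.liveness["capture-1"] == Draining) // true
	ds.ClearDrainingTarget()
	fmt.Println(ds.liveness["capture-1"] == Stopping) // true
}
```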
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
This PR introduces a new feature spanning multiple subsystems: a thread-safe state machine for drain operations, integration with the scheduler framework, liveness state management with atomic transitions, and API handlers. The changes are heterogeneous—mixing logic (drain state, scheduler, filtering), proto/type definitions, and tests across ~20 files—requiring careful review of state transition semantics, concurrency safety, and integration points with existing schedulers and node management.
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide.
Summary of Changes
Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces the 'Drain Capture' feature for TiCDC, enabling operators to gracefully remove a TiCDC node from a cluster without interrupting data synchronization. It achieves this by systematically migrating all associated workloads, including Maintainers and their Dispatchers, to other healthy nodes. The design incorporates new node liveness states, a dedicated drain scheduler, and API endpoints for initiation and monitoring, ensuring a controlled and observable node decommissioning process.
Code Review
This pull request introduces a significant new feature: the ability to drain a capture node. The implementation includes new API endpoints, a drain scheduler, and updates to node liveness states. The overall structure is well-thought-out, with clear separation of concerns. However, there are a few critical issues related to scheduler execution order and inter-component communication that need to be addressed to ensure correctness and align with the design documents. I've also included some suggestions for code clarity and to fix potential load-balancing issues during the drain process.
| repeated MaintainerStatus statuses = 1; | ||
| // draining_capture is the capture ID being drained, empty if no drain in progress. | ||
| // Maintainer should not schedule new dispatchers to this capture. | ||
| string draining_capture = 2; |
The draining_capture field has been added to the MaintainerHeartbeat message. This message flows from the maintainer to the coordinator. However, the notification about a draining capture should flow from the coordinator (which initiates the drain) to the maintainers.
This implementation contradicts the design specified in your design documents (.kiro/specs/drain-capture/requirements.md and .kiro/specs/drain-capture/design.md), which call for a new DrainNotification message from the coordinator to the maintainers. Using MaintainerHeartbeat for this purpose is logically incorrect and could lead to issues with propagating the drain status. Please implement the notification mechanism as designed, with the correct message directionality.
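For illustration of the directionality the review asks for, here is a hedged Go sketch of a coordinator-to-maintainer notification. DrainNotification, broadcastDrainNotification, and onDrainNotification are hypothetical names invented for this sketch, not types or methods from this PR or its proto files.

```go
package main

import "fmt"

// DrainNotification is a hypothetical message; the real PR would define a
// protobuf message in heartbeatpb for this purpose.
type DrainNotification struct {
	DrainingCapture string // capture ID being drained, empty if none
}

type maintainer struct {
	id              string
	drainingCapture string
}

// The coordinator owns the drain decision, so the draining capture ID travels
// coordinator -> maintainer, not the other way around.
type coordinator struct {
	drainingCapture string
	maintainers     []*maintainer
}

func (c *coordinator) broadcastDrainNotification() {
	msg := DrainNotification{DrainingCapture: c.drainingCapture}
	for _, m := range c.maintainers {
		m.onDrainNotification(msg)
	}
}

// Maintainer side: record the draining capture and exclude it from dispatcher scheduling.
func (m *maintainer) onDrainNotification(msg DrainNotification) {
	m.drainingCapture = msg.DrainingCapture
}

func main() {
	m := &maintainer{id: "m-1"}
	c := &coordinator{drainingCapture: "capture-2", maintainers: []*maintainer{m}}
	c.broadcastDrainNotification()
	fmt.Println(m.drainingCapture) // capture-2
}
```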
| // Generate MoveMaintainer operators in batches | ||
| batchCount := 0 | ||
| for _, cf := range maintainers { | ||
| if batchCount >= availableBatch { | ||
| break | ||
| } | ||
|
|
||
| // Skip if operator already exists | ||
| if s.operatorController.GetOperator(cf.ID) != nil { | ||
| continue | ||
| } | ||
|
|
||
| // Select destination with lowest workload | ||
| destNode := s.selectDestination(schedulableNodes) | ||
| if destNode == "" { | ||
| continue | ||
| } | ||
|
|
||
| op := operator.NewMoveMaintainerOperator(s.changefeedDB, cf, drainingTarget, destNode) | ||
| if s.operatorController.AddOperator(op) { | ||
| batchCount++ | ||
| log.Info("generated move maintainer operator for drain", | ||
| zap.String("changefeed", cf.ID.String()), | ||
| zap.String("from", drainingTarget.String()), | ||
| zap.String("to", destNode.String())) | ||
| } | ||
| } |
The selectDestination function is called inside a loop to generate a batch of MoveMaintainer operators. However, it doesn't account for in-flight migrations within the current batch. This can lead to all maintainers in a batch being moved to the same destination node if it initially has the lowest workload, causing an unbalanced distribution.
The design document (.kiro/specs/drain-capture/design.md) suggests tracking in-flight assignments to prevent this. Please consider updating the logic to track planned migrations within the Execute function and pass this information to selectDestination to ensure a more balanced distribution of maintainers across available nodes.
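A minimal sketch of what in-flight-aware selection could look like, assuming hypothetical nodeTaskSize and inflight maps rather than the PR's changefeed DB lookups:

```go
package main

import "fmt"

// selectDestination picks the node with the lowest combined workload of
// currently assigned tasks plus migrations already planned in this batch.
func selectDestination(nodes []string, nodeTaskSize, inflight map[string]int) string {
	minCount := int(^uint(0) >> 1) // max int
	dest := ""
	for _, id := range nodes {
		if count := nodeTaskSize[id] + inflight[id]; count < minCount {
			minCount = count
			dest = id
		}
	}
	return dest
}

func main() {
	nodes := []string{"node-a", "node-b"}
	taskSize := map[string]int{"node-a": 1, "node-b": 2}
	inflight := map[string]int{}

	// Plan a batch of three migrations; with in-flight tracking the batch
	// prints node-a, node-a, node-b instead of landing on node-a three times.
	for i := 0; i < 3; i++ {
		dest := selectDestination(nodes, taskSize, inflight)
		inflight[dest]++
		fmt.Println("migration", i, "->", dest)
	}
}
```

Tracking assignments planned within the same batch is what keeps the first low-workload node from absorbing the entire batch.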
| func (c *NodeManager) SetNodeLiveness(id node.ID, liveness node.Liveness) { | ||
| var l node.Liveness | ||
| actual, _ := c.nodeLiveness.LoadOrStore(id, &l) | ||
| actual.(*node.Liveness).Store(liveness) | ||
| } |
The implementation of SetNodeLiveness can be simplified for better clarity and efficiency. The current implementation allocates a new node.Liveness variable on the stack in every call, which is unnecessary if the key already exists in the map.
Consider refactoring this to a more straightforward implementation that avoids this extra allocation.
func (c *NodeManager) SetNodeLiveness(id node.ID, liveness node.Liveness) {
val, _ := c.nodeLiveness.LoadOrStore(id, new(node.Liveness))
val.(*node.Liveness).Store(liveness)
}
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
coordinator/scheduler/basic.go (1)
76-82: Guard against empty schedulable node set before scheduling. If all nodes are draining/stopping, `nodeIDs` is empty and `BasicSchedule` may panic or behave unexpectedly. A short-circuit avoids scheduling with no destinations.
🔧 Proposed fix
-	nodeIDs := s.nodeManager.GetSchedulableNodeIDs()
+	nodeIDs := s.nodeManager.GetSchedulableNodeIDs()
+	if len(nodeIDs) == 0 {
+		return
+	}
 	nodeSize := make(map[node.ID]int)
🤖 Fix all issues with AI agents
In @.kiro/specs/drain-capture/design.md:
- Around line 62-67: The fenced code block under "Liveness 状态转换规则" is missing a
language identifier; update the opening fence from ``` to ```text so the block
becomes a text-coded fence containing the three transition lines (Alive →
Draining..., Draining → Stopping..., Alive → Stopping...) to satisfy MD040;
ensure only the fence marker is changed and content remains unchanged.
- Around line 843-907: The markdown uses bold-only lines like "**Validates:
Requirements 2.7, 3.6, ...**" which triggers MD036; update each Property section
(e.g., "Property 1: Draining Node Exclusion from Scheduling", "Property 2:
Single Draining Target Invariant", etc.) to replace the bold-only "Validates:
..." lines with either a proper heading (e.g., "#### Validates" or "Validates:")
or plain paragraph text (e.g., "Validates: Requirements 2.7, 3.6, ...") so they
are not solely bold formatting, and ensure consistency across all Property N
blocks (Properties 1–11) to satisfy markdownlint MD036.
In `@api/v2/api.go`:
- Around line 83-87: The drain endpoints under captureGroup are missing
authentication: add authenticateMiddleware to the middleware chain for the
routes that register DrainCapture and GetDrainStatus so they match other admin
operations; specifically, ensure captureGroup.PUT("/:capture_id/drain", ...) and
captureGroup.GET("/:capture_id/drain", ...) are registered with
authenticateMiddleware (in addition to coordinatorMiddleware) before invoking
the handlers DrainCapture and GetDrainStatus to enforce consistent auth.
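As a generic illustration of per-route middleware chaining in gin (the handler and middleware names below are stand-ins, not the project's actual functions), see the sketch below.

```go
package main

import "github.com/gin-gonic/gin"

// Hypothetical middleware and handlers standing in for the project's
// coordinatorMiddleware, authenticateMiddleware, DrainCapture and GetDrainStatus.
func coordinatorMiddleware(c *gin.Context)  { c.Next() }
func authenticateMiddleware(c *gin.Context) { c.Next() }
func drainCapture(c *gin.Context)           { c.JSON(202, gin.H{"status": "draining"}) }
func getDrainStatus(c *gin.Context)         { c.JSON(200, gin.H{"status": "in-progress"}) }

func main() {
	r := gin.New()
	v2 := r.Group("/api/v2")
	captureGroup := v2.Group("/captures")
	captureGroup.Use(coordinatorMiddleware)
	// gin allows per-route middleware: it runs after the group middleware and
	// before the final handler, which is what the suggested fix relies on.
	captureGroup.PUT("/:capture_id/drain", authenticateMiddleware, drainCapture)
	captureGroup.GET("/:capture_id/drain", authenticateMiddleware, getDrainStatus)
	_ = r.Run(":8300")
}
```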
In `@api/v2/capture.go`:
- Around line 72-92: The check is comparing targetNode.ID to the local node
(selfInfo.ID) instead of the actual coordinator; update the drain-prevention
logic in the handler to fetch the coordinator via h.server.GetCoordinatorInfo()
(or equivalent) and compare targetNode.ID against coordinator.ID, returning the
same ErrAPIInvalidParam if they match; ensure you handle errors returned by
GetCoordinatorInfo() similarly to the existing SelfInfo() error handling so
failures surface correctly.
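A small sketch of the corrected check, with getCoordinatorInfo standing in for whatever accessor exposes the coordinator's node info (the exact method name on the server is an assumption here):

```go
package main

import (
	"errors"
	"fmt"
)

type nodeInfo struct{ ID string }

// getCoordinatorInfo is a hypothetical lookup; the point is to compare the drain
// target against the actual coordinator, not against the node serving the request.
func getCoordinatorInfo() (*nodeInfo, error) { return &nodeInfo{ID: "capture-1"}, nil }

func validateDrainTarget(targetID string) error {
	coordinator, err := getCoordinatorInfo()
	if err != nil {
		return err
	}
	if targetID == coordinator.ID {
		return errors.New("cannot drain coordinator node")
	}
	return nil
}

func main() {
	fmt.Println(validateDrainTarget("capture-1")) // cannot drain coordinator node
	fmt.Println(validateDrainTarget("capture-2")) // <nil>
}
```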
In `@cdc-api-v2-capture-drain-cn.md`:
- Around line 279-304: The document uses bolded section titles (e.g., "**HTTP
入口与路由**", "**v2 handler 与 owner API 桥接**", etc.) which triggers markdownlint
MD036; replace each bolded title with a proper Markdown heading (for example "##
HTTP 入口与路由", "## v2 handler 与 owner API 桥接", "## Owner job 队列与 drain 入口", "##
scheduler v3 drain 与 tick 主循环", "## 表迁移消息与状态机") keeping a consistent heading
level and preserving the following bullet lists and function references
(`(*server).startStatusHTTP`, `RegisterRoutes`, `(*OpenAPIV2).drainCapture`,
`(*ownerImpl).Tick`, `(*coordinator).DrainCapture`, `(*Manager).HandleTasks`,
etc.); ensure a blank line before each heading and adjust any nested sections to
use appropriate subheading levels to maintain semantic structure.
In `@coordinator/drain_state.go`:
- Around line 55-79: DrainState currently only updates in-memory fields
(drainingTarget, startTime, initialMaintainerCount, initialDispatcherCount) and
node liveness via nodeManager.SetNodeLiveness, so a coordinator restart loses
the drain record; persist the drain record and liveness and restore it on
startup. Add/use a durable store interface (e.g., EtcdStore) and modify
SetDrainingTarget to write a persistent drain entry (including target ID,
startTime, maintainer/dispatcher counts and liveness state) before—or as part
of—setting the in-memory fields and calling nodeManager.SetNodeLiveness; ensure
writes are durable/checked and handle errors by rolling back in-memory changes.
Implement a startup/hydration path (e.g., DrainState.RecoverFromStore invoked
during coordinator bootstrap) that reads persisted drain entries, reconstructs
DrainState fields and re-applies nodeManager.SetNodeLiveness for each persisted
draining target, and ensure successful drain completion clears the persistent
record (so SetDrainingTarget, any Complete/Cancel methods, and DrainState
recovery logic are updated to maintain persistence).
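A rough sketch of the persistence-plus-recovery shape described above; drainRecord, drainStore, and memStore are hypothetical, and a real implementation would back the store with etcd rather than memory:

```go
package main

import (
	"fmt"
	"time"
)

// drainRecord is a hypothetical persisted form of the drain state.
type drainRecord struct {
	Target             string
	StartTime          time.Time
	InitialMaintainers int
	InitialDispatchers int
}

// drainStore abstracts the durable storage suggested in the comment.
type drainStore interface {
	Save(drainRecord) error
	Load() (*drainRecord, error) // nil record means no drain in progress
	Clear() error
}

// memStore is an in-memory stand-in so the sketch runs without etcd.
type memStore struct{ rec *drainRecord }

func (s *memStore) Save(r drainRecord) error    { s.rec = &r; return nil }
func (s *memStore) Load() (*drainRecord, error) { return s.rec, nil }
func (s *memStore) Clear() error                { s.rec = nil; return nil }

// recoverDrainState is the hydration path a restarted coordinator would run:
// reload the record and re-apply the Draining liveness before scheduling resumes.
func recoverDrainState(store drainStore, setLiveness func(id string)) (*drainRecord, error) {
	rec, err := store.Load()
	if err != nil || rec == nil {
		return nil, err
	}
	setLiveness(rec.Target)
	return rec, nil
}

func main() {
	store := &memStore{}
	_ = store.Save(drainRecord{Target: "capture-3", StartTime: time.Now(), InitialMaintainers: 4, InitialDispatchers: 12})

	// Simulated coordinator restart.
	rec, _ := recoverDrainState(store, func(id string) { fmt.Println("re-marking", id, "as Draining") })
	fmt.Printf("recovered drain of %s (%d maintainers, %d dispatchers)\n",
		rec.Target, rec.InitialMaintainers, rec.InitialDispatchers)
}
```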
In `@coordinator/scheduler/balance.go`:
- Around line 78-89: The balance routine must skip balancing when there are no
schedulable nodes to avoid a divide-by-zero in pkgScheduler.CheckBalanceStatus;
after obtaining schedulableNodes from s.nodeManager.GetSchedulableNodes(), add a
guard that if len(schedulableNodes) == 0 you return
now.Add(s.checkBalanceInterval) (i.e., skip calling
pkgScheduler.CheckBalanceStatus and pkgScheduler.Balance). This prevents the
panic and preserves the existing retry scheduling logic.
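A minimal illustration of the guard, with checkBalance standing in for the balance scheduler's periodic execution path:

```go
package main

import (
	"fmt"
	"time"
)

// checkBalance is a stripped-down stand-in for the balance scheduler's Execute
// path: with zero schedulable nodes it returns early and just re-arms the timer,
// which is the guard the comment asks for.
func checkBalance(schedulableNodes map[string]struct{}, now time.Time, checkBalanceInterval time.Duration) time.Time {
	if len(schedulableNodes) == 0 {
		// Nothing to balance onto; skip CheckBalanceStatus/Balance to avoid a divide-by-zero.
		return now.Add(checkBalanceInterval)
	}
	// ... the real code would call pkgScheduler.CheckBalanceStatus / Balance here ...
	return now.Add(checkBalanceInterval)
}

func main() {
	next := checkBalance(map[string]struct{}{}, time.Now(), 30*time.Second)
	fmt.Println("next balance check at", next.Format(time.RFC3339))
}
```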
In `@coordinator/scheduler/drain.go`:
- Around line 152-161: The current allDispatchersMigrated on type drainScheduler
always returns true causing drain to finish before dispatchers leave; change it
to query the dispatcher membership/heartbeat for the given node.ID (e.g., via
the dispatcher registry/heartbeat store or the maintainer discovery API) and
return true only when the live dispatcher count for nodeID is zero; update
allDispatchersMigrated to call the appropriate store/heartbeat method, handle
transient errors (retry/log and return false on uncertainty) and remove the
temporary stub so drain only completes when no dispatchers remain.
In `@docs/design/2026-01-10-dynamic-stream.md`:
- Around line 52-62: The fenced code block showing the runtime structure
(containing Push, pathMap, *pathInfo, stream.addEvent, stream.handleLoop,
pendingQueue, signalQueue, popEvents, handler.Handle and dest) needs a language
identifier to satisfy markdownlint MD040; update the opening fence from ``` to a
tagged fence such as ```text (or ```bash/```go if more appropriate) so the block
becomes ```text ... ``` and leave the existing content unchanged.
In `@pkg/api/util.go`:
- Around line 198-206: The Liveness.String method dereferences *l directly which
breaks the atomic access contract; change the switch to use l.Load() instead of
*l so it reads the value atomically (consistent with other methods like
IsSchedulable). Locate the String method on type Liveness and replace uses of *l
in the switch with the result of l.Load(), preserving the same case labels
(LivenessCaptureAlive, LivenessCaptureStopping, LivenessCaptureDraining) and
default behavior.
In `@pkg/node/node.go`:
- Around line 40-68: The public Liveness.Store method currently permits
arbitrary state changes (including reverting from Draining back to Alive) which
breaks the one-way lifecycle enforced by StoreDraining and DrainComplete; update
Liveness.Store to prevent invalid reversions by either: 1) making Store
unexported (rename to store) and restrict its use to initialization paths only,
or 2) enforce a guarded write that only allows transitioning from a
zero/uninitialized state to a valid initial state (returning false otherwise),
or 3) remove Store entirely and update call sites to use the existing atomic
CompareAndSwap-based methods (StoreDraining, DrainComplete) for state
transitions; adjust all references to Liveness.Store accordingly so transitions
can only occur via the guarded methods.
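A sketch of option 3 from the comment, where every transition goes through a compare-and-swap and there is no unguarded Store; the constant names mirror the PR's states but the code is illustrative only:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Liveness values; the zero value is Alive, so no initialization Store is needed.
type Liveness int32

const (
	Alive Liveness = iota
	Draining
	Stopping
)

func (l *Liveness) Load() Liveness {
	return Liveness(atomic.LoadInt32((*int32)(l)))
}

// StoreDraining only succeeds from Alive.
func (l *Liveness) StoreDraining() bool {
	return atomic.CompareAndSwapInt32((*int32)(l), int32(Alive), int32(Draining))
}

// DrainComplete only succeeds from Draining.
func (l *Liveness) DrainComplete() bool {
	return atomic.CompareAndSwapInt32((*int32)(l), int32(Draining), int32(Stopping))
}

func main() {
	var l Liveness
	fmt.Println(l.StoreDraining())    // true: Alive -> Draining
	fmt.Println(l.StoreDraining())    // false: already Draining, cannot "revive"
	fmt.Println(l.DrainComplete())    // true: Draining -> Stopping
	fmt.Println(l.Load() == Stopping) // true
}
```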
In `@server/watcher/module_node_manager.go`:
- Around line 219-223: The node liveness map is never cleaned up when nodes are
removed; call ClearNodeLiveness(id) when a node removal is detected to prevent
unbounded growth. Update the node removal path in NodeManager.Tick() (the branch
that currently triggers registered handlers on node removal) or inside the node
change handler to invoke ClearNodeLiveness(id) immediately after detecting the
removed node (using the same node ID passed to the handlers), ensuring the
nodeLiveness entry is deleted alongside any other removal logic.
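A small sketch of wiring the cleanup into the removal path; nodeManager and handleNodeChanges below are simplified stand-ins for the watcher's NodeManager and its Tick loop:

```go
package main

import (
	"fmt"
	"sync"
)

// nodeManager is a pared-down stand-in; the point is only that the removal path
// deletes the liveness entry so the map cannot grow without bound.
type nodeManager struct {
	nodes        map[string]struct{}
	nodeLiveness sync.Map
}

func (m *nodeManager) ClearNodeLiveness(id string) {
	m.nodeLiveness.Delete(id)
}

// handleNodeChanges mimics the Tick() branch that detects removed nodes and
// fires the registered handlers; liveness cleanup happens in the same place.
func (m *nodeManager) handleNodeChanges(current map[string]struct{}) {
	for id := range m.nodes {
		if _, ok := current[id]; !ok {
			// node removed from the cluster
			m.ClearNodeLiveness(id)
			fmt.Println("cleared liveness for removed node", id)
		}
	}
	m.nodes = current
}

func main() {
	m := &nodeManager{nodes: map[string]struct{}{"capture-1": {}, "capture-2": {}}}
	m.nodeLiveness.Store("capture-1", 1)
	m.nodeLiveness.Store("capture-2", 1)
	m.handleNodeChanges(map[string]struct{}{"capture-1": {}}) // capture-2 went away
	if _, ok := m.nodeLiveness.Load("capture-2"); !ok {
		fmt.Println("no stale entry for capture-2")
	}
}
```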
🧹 Nitpick comments (1)
coordinator/scheduler/drain.go (1)
110-135: Avoid concentrating a batch on one node.
`selectDestination` only considers current workload, so multiple migrations in the same tick can pile onto a single node. Consider accounting for in-flight assignments in this batch.
♻️ In-flight–aware selection
-	// Generate MoveMaintainer operators in batches
-	batchCount := 0
+	// Generate MoveMaintainer operators in batches
+	batchCount := 0
+	inflight := make(map[node.ID]int)
 	for _, cf := range maintainers {
@@
-		destNode := s.selectDestination(schedulableNodes)
+		destNode := s.selectDestination(schedulableNodes, inflight)
 		if destNode == "" {
 			continue
 		}
@@
 		if s.operatorController.AddOperator(op) {
 			batchCount++
+			inflight[destNode]++
 			log.Info("generated move maintainer operator for drain",
@@
-func (s *drainScheduler) selectDestination(nodes map[node.ID]*node.Info) node.ID {
+func (s *drainScheduler) selectDestination(nodes map[node.ID]*node.Info, inflight map[node.ID]int) node.ID {
@@
-		if count := nodeTaskSize[id]; count < minCount {
+		if count := nodeTaskSize[id] + inflight[id]; count < minCount {
 			minCount = count
 			minNode = id
 		}
 	}
Also applies to: 164-175
| **Liveness 状态转换规则**: | ||
| ``` | ||
| Alive → Draining:通过 drain API 触发 | ||
| Draining → Stopping:drain 完成(所有任务迁移完成) | ||
| Alive → Stopping:节点优雅关闭(非 drain 场景) | ||
| ``` |
Add a language to fenced code blocks (MD040).
The fenced block lacks a language identifier.
✍️ Suggested fix
-```
+```text
Alive → Draining:通过 drain API 触发
Draining → Stopping:drain 完成(所有任务迁移完成)
Alive → Stopping:节点优雅关闭(非 drain 场景)
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)
63-63: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🤖 Prompt for AI Agents
In @.kiro/specs/drain-capture/design.md around lines 62 - 67, The fenced code
block under "Liveness 状态转换规则" is missing a language identifier; update the
opening fence from ``` to ```text so the block becomes a text-coded fence
containing the three transition lines (Alive → Draining..., Draining →
Stopping..., Alive → Stopping...) to satisfy MD040; ensure only the fence marker
is changed and content remains unchanged.
| ### Property 1: Draining Node Exclusion from Scheduling | ||
|
|
||
| *For any* scheduling decision (maintainer or dispatcher), when a draining target is set, the draining node SHALL NOT be selected as a destination node. | ||
|
|
||
| **Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4** | ||
|
|
||
| ### Property 2: Single Draining Target Invariant | ||
|
|
||
| *For any* cluster state, there SHALL be at most one draining target at any time. | ||
|
|
||
| **Validates: Requirements 2.3, 2.4** | ||
|
|
||
| ### Property 3: Maintainer-DDLSpan Co-location | ||
|
|
||
| *For any* maintainer, its associated Table Trigger Event Dispatcher (ddlSpan) SHALL always be on the same node as the maintainer. | ||
|
|
||
| **Validates: Architectural constraint** | ||
|
|
||
| ### Property 4: Drain State Transition | ||
|
|
||
| *For any* drain operation, the node liveness SHALL transition from Alive → Draining → Stopping, and never back to Alive (except the “only draining node left” recovery case). | ||
|
|
||
| **Validates: Requirements 9.1, 9.2, 9.3, 9.4, 9.5** | ||
|
|
||
| ### Property 5: Batch Size Constraint | ||
|
|
||
| *For any* batch of migration operators generated by the drain scheduler, the number of concurrent in-progress operators SHALL NOT exceed the configured batch size. | ||
|
|
||
| **Validates: Requirements 3.3** | ||
|
|
||
| ### Property 6: Drain Completion Condition | ||
|
|
||
| *For any* drain operation, the drain SHALL be marked complete only when both maintainer count AND dispatcher count on the draining node are zero. | ||
|
|
||
| **Validates: Requirements 2.9, 3.8** | ||
|
|
||
| ### Property 7: Drain Notification Propagation | ||
|
|
||
| *For any* draining target set in coordinator, the draining capture ID SHALL be propagated to all maintainers via the coordinator→maintainer notification channel within one heartbeat cycle. | ||
|
|
||
| **Validates: Requirements 10.1, 10.2** | ||
|
|
||
| ### Property 8: Workload Balanced Distribution | ||
|
|
||
| *For any* set of maintainers being migrated from a draining node, the destination nodes SHALL be selected based on lowest workload. | ||
|
|
||
| **Validates: Requirements 2.8, 3.5** | ||
|
|
||
| ### Property 9: Draining Node Cannot Become Coordinator (with exception) | ||
|
|
||
| *For any* coordinator election, nodes with Draining or Stopping liveness SHALL NOT be selected as coordinator candidates, EXCEPT when the draining node is the only node left in cluster. | ||
|
|
||
| **Validates: Requirements 9.6, 9.8** | ||
|
|
||
| ### Property 10: Draining Node Crash Cleanup | ||
|
|
||
| *For any* draining node that crashes, the drain state SHALL be cleared and the maintainers SHALL be rescheduled by basic scheduler. | ||
|
|
||
| **Validates: Requirements 9.7** | ||
|
|
||
| ### Property 11: Destination Node Failure Handling | ||
|
|
||
| *For any* destination node failure during maintainer migration, the MoveMaintainer operator SHALL either mark the maintainer as absent (if origin stopped) or convert to add operation (if origin not stopped). | ||
|
|
||
| **Validates: Requirements 8.2** |
Use headings (or plain text) instead of bold-only lines (MD036).
The **Validates: ...** lines are styled like headings but are just bold text. Convert them to a proper heading or plain text to satisfy markdownlint.
✍️ Example adjustment
-**Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4**
+#### Validates: Requirements 2.7, 3.6, 4.2, 4.5, 5.2, 5.3, 5.4
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)
847-847: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
853-853: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
859-859: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
865-865: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
871-871: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
877-877: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
883-883: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
889-889: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
895-895: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
901-901: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
907-907: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
🤖 Prompt for AI Agents
In @.kiro/specs/drain-capture/design.md around lines 843 - 907, The markdown
uses bold-only lines like "**Validates: Requirements 2.7, 3.6, ...**" which
triggers MD036; update each Property section (e.g., "Property 1: Draining Node
Exclusion from Scheduling", "Property 2: Single Draining Target Invariant",
etc.) to replace the bold-only "Validates: ..." lines with either a proper
heading (e.g., "#### Validates" or "Validates:") or plain paragraph text (e.g.,
"Validates: Requirements 2.7, 3.6, ...") so they are not solely bold formatting,
and ensure consistency across all Property N blocks (Properties 1–11) to satisfy
markdownlint MD036.
| captureGroup := v2.Group("/captures") | ||
| captureGroup.Use(coordinatorMiddleware) | ||
| captureGroup.GET("", api.ListCaptures) | ||
| captureGroup.PUT("/:capture_id/drain", api.DrainCapture) | ||
| captureGroup.GET("/:capture_id/drain", api.GetDrainStatus) |
Add authentication to drain endpoints for consistency and security.
The DrainCapture and GetDrainStatus routes lack authenticateMiddleware, while other admin operations in the changefeed group consistently use it. Since drain is a mutating admin operation and the handlers do not perform internal auth checks, these routes should be protected by the same authentication middleware applied elsewhere.
Recommended change
captureGroup := v2.Group("/captures")
captureGroup.Use(coordinatorMiddleware)
captureGroup.GET("", api.ListCaptures)
- captureGroup.PUT("/:capture_id/drain", api.DrainCapture)
- captureGroup.GET("/:capture_id/drain", api.GetDrainStatus)
+ captureGroup.PUT("/:capture_id/drain", authenticateMiddleware, api.DrainCapture)
+ captureGroup.GET("/:capture_id/drain", authenticateMiddleware, api.GetDrainStatus)
🤖 Prompt for AI Agents
In `@api/v2/api.go` around lines 83 - 87, The drain endpoints under captureGroup
are missing authentication: add authenticateMiddleware to the middleware chain
for the routes that register DrainCapture and GetDrainStatus so they match other
admin operations; specifically, ensure captureGroup.PUT("/:capture_id/drain",
...) and captureGroup.GET("/:capture_id/drain", ...) are registered with
authenticateMiddleware (in addition to coordinatorMiddleware) before invoking
the handlers DrainCapture and GetDrainStatus to enforce consistent auth.
| // Check if current node is coordinator | ||
| selfInfo, err := h.server.SelfInfo() | ||
| if err != nil { | ||
| _ = c.Error(err) | ||
| return | ||
| } | ||
|
|
||
| nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) | ||
|
|
||
| // Validate target capture exists | ||
| targetNode := nodeManager.GetNodeInfo(node.ID(captureID)) | ||
| if targetNode == nil { | ||
| _ = c.Error(errors.ErrCaptureNotExist.GenWithStackByArgs(captureID)) | ||
| return | ||
| } | ||
|
|
||
| // Check if target is coordinator | ||
| if targetNode.ID == selfInfo.ID { | ||
| _ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("cannot drain coordinator node")) | ||
| return | ||
| } |
Use the actual coordinator ID for the "cannot drain coordinator" check.
The check at line 89 compares the target node to the local node (selfInfo.ID), not the actual coordinator. On non-coordinator nodes, this allows draining the coordinator while blocking draining the local node. Compare against the actual coordinator ID instead—obtain it via h.server.GetCoordinatorInfo() (returns the coordinator's node info) or directly query the coordinator ID using the etcd client.
🤖 Prompt for AI Agents
In `@api/v2/capture.go` around lines 72 - 92, The check is comparing targetNode.ID
to the local node (selfInfo.ID) instead of the actual coordinator; update the
drain-prevention logic in the handler to fetch the coordinator via
h.server.GetCoordinatorInfo() (or equivalent) and compare targetNode.ID against
coordinator.ID, returning the same ErrAPIInvalidParam if they match; ensure you
handle errors returned by GetCoordinatorInfo() similarly to the existing
SelfInfo() error handling so failures surface correctly.
| **HTTP 入口与路由** | ||
| - `cdc/server/server.go`: `(*server).startStatusHTTP` | ||
| - `cdc/http.go`: `RegisterRoutes` | ||
| - `cdc/api/v2/api.go`: `RegisterOpenAPIV2Routes` | ||
| - `cdc/api/middleware/middleware.go`: `ForwardToOwnerMiddleware` / `CheckServerReadyMiddleware` / `ErrorHandleMiddleware` | ||
|
|
||
| **v2 handler 与 owner API 桥接** | ||
| - `cdc/api/v2/capture.go`: `(*OpenAPIV2).drainCapture` | ||
| - `cdc/api/util.go`: `HandleOwnerDrainCapture` / `ForwardToOwner` / `ForwardToCapture` | ||
|
|
||
| **Owner job 队列与 drain 入口** | ||
| - `cdc/owner/owner.go`: `(*ownerImpl).Tick` / `handleJobs` / `DrainCapture` / `handleDrainCaptures` / `pushOwnerJob` | ||
| - `cdc/owner/status_provider.go`: `(*ownerStatusProvider).GetCaptures` / `sendQueryToOwner` | ||
|
|
||
| **scheduler v3 drain 与 tick 主循环** | ||
| - `cdc/scheduler/internal/v3/coordinator.go`: `(*coordinator).DrainCapture` / `Tick` / `poll` | ||
| - `cdc/scheduler/internal/v3/scheduler/scheduler_manager.go`: `(*Manager).DrainCapture` / `DrainingTarget` / `Schedule` | ||
| - `cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go`: `(*drainCaptureScheduler).Schedule` | ||
| - `cdc/scheduler/internal/v3/member/capture_manager.go`: `(*CaptureManager).Tick`(Heartbeat.IsStopping) | ||
|
|
||
| **表迁移消息与状态机** | ||
| - `cdc/scheduler/internal/v3/replication/replication_manager.go`: `(*Manager).HandleTasks` | ||
| - `cdc/scheduler/internal/v3/replication/replication_set.go`: `handleMoveTable` / `poll` / `pollOnPrepare` / `pollOnCommit` | ||
| - `cdc/scheduler/internal/v3/agent/agent.go`: `handleMessageHeartbeat` / `handleMessageDispatchTableRequest` | ||
| - `cdc/scheduler/internal/v3/agent/table.go`: `tableSpanManager.poll` / `tableSpan.handleAddTableTask` / `handleRemoveTableTask` | ||
|
|
Use headings instead of emphasis to satisfy markdownlint (MD036).
The bold section titles are flagged as “emphasis used instead of a heading.” Consider switching them to proper headings.
✅ Example fix
-**HTTP 入口与路由**
+### HTTP 入口与路由
@@
-**v2 handler 与 owner API 桥接**
+### v2 handler 与 owner API 桥接
@@
-**Owner job 队列与 drain 入口**
+### Owner job 队列与 drain 入口
@@
-**scheduler v3 drain 与 tick 主循环**
+### scheduler v3 drain 与 tick 主循环
@@
-**表迁移消息与状态机**
+### 表迁移消息与状态机
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)
279-279: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
285-285: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
289-289: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
293-293: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
299-299: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
🤖 Prompt for AI Agents
In `@cdc-api-v2-capture-drain-cn.md` around lines 279 - 304, The document uses
bolded section titles (e.g., "**HTTP 入口与路由**", "**v2 handler 与 owner API 桥接**",
etc.) which triggers markdownlint MD036; replace each bolded title with a proper
Markdown heading (for example "## HTTP 入口与路由", "## v2 handler 与 owner API 桥接",
"## Owner job 队列与 drain 入口", "## scheduler v3 drain 与 tick 主循环", "## 表迁移消息与状态机")
keeping a consistent heading level and preserving the following bullet lists and
function references (`(*server).startStatusHTTP`, `RegisterRoutes`,
`(*OpenAPIV2).drainCapture`, `(*ownerImpl).Tick`, `(*coordinator).DrainCapture`,
`(*Manager).HandleTasks`, etc.); ensure a blank line before each heading and
adjust any nested sections to use appropriate subheading levels to maintain
semantic structure.
| // allDispatchersMigrated checks if all dispatchers have been migrated from the draining node. | ||
| // Currently, this is a simplified implementation that returns true when all maintainers | ||
| // have been migrated. The actual dispatcher migration is handled by the new Maintainer | ||
| // after it bootstraps and discovers dispatchers on the draining node. | ||
| // TODO: Implement proper dispatcher count tracking via heartbeat. | ||
| func (s *drainScheduler) allDispatchersMigrated(nodeID node.ID) bool { | ||
| // For now, we consider dispatchers migrated when all maintainers are migrated. | ||
| // The new Maintainer will handle dispatcher migration after bootstrap. | ||
| // This is a conservative approach - the drain will complete when maintainers are done. | ||
| return true |
Drain can complete before dispatchers migrate.
allDispatchersMigrated always returns true, so the coordinator will clear drain state as soon as maintainers move—even if dispatchers still exist on the draining node. That violates the drain completion contract and can lead to premature Stopping.
🤖 Prompt for AI Agents
In `@coordinator/scheduler/drain.go` around lines 152 - 161, The current
allDispatchersMigrated on type drainScheduler always returns true causing drain
to finish before dispatchers leave; change it to query the dispatcher
membership/heartbeat for the given node.ID (e.g., via the dispatcher
registry/heartbeat store or the maintainer discovery API) and return true only
when the live dispatcher count for nodeID is zero; update allDispatchersMigrated
to call the appropriate store/heartbeat method, handle transient errors
(retry/log and return false on uncertainty) and remove the temporary stub so
drain only completes when no dispatchers remain.
| ``` | ||
| Push(path, event) | ||
| -> pathMap[ path ] => *pathInfo{stream, dest, pendingQueue, ...} | ||
| -> stream.addEvent(eventWrap{event, pathInfo, ...}) | ||
|
|
||
| stream.handleLoop() // 每个 stream 一个 goroutine | ||
| -> 把 eventWrap 写入对应 path 的 pendingQueue | ||
| -> 从全局 signalQueue 选一个 path | ||
| -> popEvents() 得到 (events[], pathInfo) | ||
| -> handler.Handle(dest, events...) | ||
| ``` |
Add a language identifier to the fenced code block (markdownlint MD040).
The runtime-structure code fence lacks a language tag; markdownlint will flag this.
✍️ Suggested fix
-```
+```text
Push(path, event)
-> pathMap[ path ] => *pathInfo{stream, dest, pendingQueue, ...}
-> stream.addEvent(eventWrap{event, pathInfo, ...})
stream.handleLoop() // 每个 stream 一个 goroutine
-> 把 eventWrap 写入对应 path 的 pendingQueue
-> 从全局 signalQueue 选一个 path
-> popEvents() 得到 (events[], pathInfo)
-> handler.Handle(dest, events...)
-```
+```
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)
52-52: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🤖 Prompt for AI Agents
In `@docs/design/2026-01-10-dynamic-stream.md` around lines 52 - 62, The fenced
code block showing the runtime structure (containing Push, pathMap, *pathInfo,
stream.addEvent, stream.handleLoop, pendingQueue, signalQueue, popEvents,
handler.Handle and dest) needs a language identifier to satisfy markdownlint
MD040; update the opening fence from ``` to a tagged fence such as ```text (or
```bash/```go if more appropriate) so the block becomes ```text ... ``` and
leave the existing content unchanged.
| func (l *Liveness) String() string { | ||
| switch *l { | ||
| case LivenessCaptureAlive: | ||
| return "Alive" | ||
| case LivenessCaptureStopping: | ||
| return "Stopping" | ||
| case LivenessCaptureDraining: | ||
| return "Draining" | ||
| default: |
Use atomic Load in Liveness.String for consistency and thread safety.
The String() method directly dereferences *l without synchronization, which violates the atomic access contract of the Liveness type. Other methods like IsSchedulable() correctly use Load(). Update the switch statement to use l.Load() instead.
Suggested fix
func (l *Liveness) String() string {
- switch *l {
+ switch l.Load() {
case LivenessCaptureAlive:
return "Alive"
case LivenessCaptureStopping:
return "Stopping"
case LivenessCaptureDraining:
return "Draining"
default:
return "unknown"
}
}
🤖 Prompt for AI Agents
In `@pkg/api/util.go` around lines 198 - 206, The Liveness.String method
dereferences *l directly which breaks the atomic access contract; change the
switch to use l.Load() instead of *l so it reads the value atomically
(consistent with other methods like IsSchedulable). Locate the String method on
type Liveness and replace uses of *l in the switch with the result of l.Load(),
preserving the same case labels (LivenessCaptureAlive, LivenessCaptureStopping,
LivenessCaptureDraining) and default behavior.
| // Load atomically loads the liveness value. | ||
| func (l *Liveness) Load() Liveness { | ||
| return Liveness(atomic.LoadInt32((*int32)(l))) | ||
| } | ||
|
|
||
| // Store atomically stores the liveness value. | ||
| func (l *Liveness) Store(val Liveness) { | ||
| atomic.StoreInt32((*int32)(l), int32(val)) | ||
| } | ||
|
|
||
| // IsSchedulable returns true if the node can accept new workloads. | ||
| // Returns false if node is Draining or Stopping. | ||
| func (l *Liveness) IsSchedulable() bool { | ||
| return l.Load() == LivenessCaptureAlive | ||
| } | ||
|
|
||
| // StoreDraining sets liveness to Draining. Returns true if successful. | ||
| // Can only transition from Alive to Draining. | ||
| func (l *Liveness) StoreDraining() bool { | ||
| return atomic.CompareAndSwapInt32( | ||
| (*int32)(l), int32(LivenessCaptureAlive), int32(LivenessCaptureDraining)) | ||
| } | ||
|
|
||
| // DrainComplete transitions from Draining to Stopping. | ||
| // Returns true if successful. | ||
| func (l *Liveness) DrainComplete() bool { | ||
| return atomic.CompareAndSwapInt32( | ||
| (*int32)(l), int32(LivenessCaptureDraining), int32(LivenessCaptureStopping)) | ||
| } |
Constrain Liveness.Store to prevent invalid state reversions.
Store() allows arbitrary state transitions, including reverting from Draining back to Alive (seen in coordinator/drain_state.go:128), which violates the one-way drain lifecycle enforced by the guarded StoreDraining() and DrainComplete() methods. Either restrict Store() to initialization only, make it unexported, or remove it in favor of explicit guarded transition methods to prevent accidental "revive" transitions in production code.
🤖 Prompt for AI Agents
In `@pkg/node/node.go` around lines 40 - 68, The public Liveness.Store method
currently permits arbitrary state changes (including reverting from Draining
back to Alive) which breaks the one-way lifecycle enforced by StoreDraining and
DrainComplete; update Liveness.Store to prevent invalid reversions by either: 1)
making Store unexported (rename to store) and restrict its use to initialization
paths only, or 2) enforce a guarded write that only allows transitioning from a
zero/uninitialized state to a valid initial state (returning false otherwise),
or 3) remove Store entirely and update call sites to use the existing atomic
CompareAndSwap-based methods (StoreDraining, DrainComplete) for state
transitions; adjust all references to Liveness.Store accordingly so transitions
can only occur via the guarded methods.
| // ClearNodeLiveness removes the liveness tracking for a node. | ||
| // Called when a node is removed from the cluster. | ||
| func (c *NodeManager) ClearNodeLiveness(id node.ID) { | ||
| c.nodeLiveness.Delete(id) | ||
| } |
Add node liveness cleanup to the node removal handler.
The ClearNodeLiveness() method exists but is never called when nodes are actually removed. The Tick() method detects node removal (lines 110–114) but only triggers registered handlers without clearing stale liveness entries. This causes nodeLiveness to grow unboundedly. Call ClearNodeLiveness(id) in the node change handler or directly after detecting a node removal in Tick().
🤖 Prompt for AI Agents
In `@server/watcher/module_node_manager.go` around lines 219 - 223, The node
liveness map is never cleaned up when nodes are removed; call
ClearNodeLiveness(id) when a node removal is detected to prevent unbounded
growth. Update the node removal path in NodeManager.Tick() (the branch that
currently triggers registered handlers on node removal) or inside the node
change handler to invoke ClearNodeLiveness(id) immediately after detecting the
removed node (using the same node ID passed to the handlers), ensuring the
nodeLiveness entry is deleted alongside any other removal logic.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
New drain endpoints (`PUT` and `GET /api/v2/captures/{capture_id}/drain`) to gracefully migrate workloads off nodes.