[DNM] maintainer: kafka sink report transient error and just recreate the dispatcher by force move operator #4139
base: master
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings, or resume it with the usual review commands.
📝 Walkthrough

Adds end-to-end recoverable Kafka error handling: transient Kafka errors are detected in the producer, emitted as ErrorEvent, collected by DispatcherManager and converted into RecoverDispatcherRequests, then handled by Maintainer, which decides per-dispatcher restart, backoff, or downgrade to fatal and may schedule RestartDispatcherOperator instances. DispatcherID is propagated into row events and logging.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Producer as Kafka Producer
    participant Sarama as Sarama Async Producer
    participant DispatcherMgr as Dispatcher Manager
    participant Heartbeat as HeartBeatCollector
    participant Maintainer as Maintainer
    participant Operator as RestartDispatcherOperator
    Producer->>Sarama: send message (transient Kafka error)
    Sarama->>Sarama: isTransientKError(err) == true
    Sarama->>Sarama: build ErrorEvent (DispatcherIDs)
    Sarama->>DispatcherMgr: non-blocking send to transientErrorCh
    DispatcherMgr->>DispatcherMgr: collectRecoverableErrors() -> build RecoverDispatcherRequest
    DispatcherMgr->>Heartbeat: Enqueue RecoverDispatcherRequestWithTargetID
    Heartbeat->>Maintainer: send RecoverDispatcherRequest (single-target message)
    Maintainer->>Maintainer: onRecoverDispatcherRequest() -> decide Restart / Skip / Downgrade
    alt restart allowed
        Maintainer->>Operator: create NewRestartDispatcherOperator and add to controller
        Operator->>Operator: Start() forces restart (Remove -> Add -> Create)
    else downgrade
        Maintainer->>Maintainer: record fatal RunningError for changefeed
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request implements a critical error recovery mechanism to enhance the resilience of the system.
Activity
Code Review
This pull request introduces a mechanism to automatically recover from temporary sink errors by recreating the dispatcher manager. However, a critical vulnerability has been identified: the implementation in DispatcherOrchestrator uses a blocking loop in a shared message processing thread, which can lead to a denial of service for all changefeeds on the node if a single dispatcher manager takes too long to close. Additionally, there are areas for improvement related to the robustness of error detection and the logic for closing the dispatcher manager.
```go
for time.Now().Before(deadline) {
	if manager.TryClose(false) {
		break
	}
	time.Sleep(200 * time.Millisecond)
}
```
This section of code presents a critical denial of service vulnerability. The handleBootstrapRequest function, running in a shared message processing goroutine, contains a loop that blocks for up to one minute while waiting for a dispatcher manager to close. This blocking behavior can stall all changefeed operations on the node if a single dispatcher manager takes too long to close, leading to a significant denial of service for all replication tasks. Additionally, the logic for closing the dispatcher manager can be simplified and made more robust. The initial call to manager.TryClose(false) on line 174 is redundant, and calling it again in the if condition on line 184 after the loop could have unintended side effects if TryClose is not idempotent. A cleaner approach would be to use a flag to track the closure status, avoiding redundant calls and potential issues.
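For illustration, a minimal sketch of the flag-based variant described above, reusing the names from the quoted snippet (`manager`, `deadline`, the one-minute budget); this is a sketch of the reviewer's suggestion, not the PR's actual code:

```go
// Sketch: remember whether TryClose succeeded instead of calling it again after the loop.
closed := false
deadline := time.Now().Add(time.Minute)
for time.Now().Before(deadline) {
	if manager.TryClose(false) {
		closed = true
		break
	}
	time.Sleep(200 * time.Millisecond)
}
if !closed {
	// Do not call TryClose again here; surface the failure instead of blocking
	// the shared message-processing goroutine any longer.
	log.Warn("dispatcher manager did not close before the deadline")
}
```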
maintainer/maintainer.go
Outdated
```go
func isRecoverableKafkaAsyncSendError(err *heartbeatpb.RunningError) bool {
	if err == nil {
		return false
	}
	// Dispatcher manager reports changefeed-level codes, so match by message content.
	// The message contains the original RFC error code and the Kafka broker error.
	if !strings.Contains(err.Message, string(errors.ErrKafkaAsyncSendMessage.RFCCode())) {
		return false
	}
	// `NOT_ENOUGH_REPLICAS_AFTER_APPEND`: Messages are written to the log, but to fewer in-sync replicas than required.
	return strings.Contains(err.Message, "fewer in-sync replicas than required")
}
```
Relying on string matching for error messages, as done here to identify a recoverable Kafka error, is brittle and can break if the error message from the Kafka client or server changes in a future version. It would be more robust to rely on structured error codes if the underlying library provides them. If not, this is a known trade-off, but it's worth considering if there's a more stable way to identify this specific error condition.
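A hedged sketch of the structured alternative, assuming the caller can hand the raw producer error to the check and that the repo vendors sarama (the function name and import path are illustrative, not the PR's API):

```go
// Sketch: match on sarama's structured error code rather than on message text.
func isRecoverableKafkaError(err error) bool {
	var kerr sarama.KError
	if !errors.As(err, &kerr) {
		return false
	}
	// NOT_ENOUGH_REPLICAS_AFTER_APPEND: the record was written, but to fewer
	// in-sync replicas than required; retrying after the ISR recovers is safe.
	return kerr == sarama.ErrNotEnoughReplicasAfterAppend
}
```

This only works if the original `sarama.KError` is preserved in the error chain rather than flattened into a message string, which is exactly the trade-off this comment points at.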
Force-pushed from ad03915 to 418987b (Compare)
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: (no approvers yet). The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
maintainer/operator/operator_move.go (1)
175-223: ⚠️ Potential issue | 🔴 Critical

When `forceRestart` is true (origin == dest) and that node is removed, the operator gets stuck.

In `OnNodeRemove`, when `n == m.dest` (line 187) matches first in state `moveStateRemoveOrigin`, the operator transitions to `moveStateAbortRemoveOrigin` and returns (line 207). The subsequent `n == m.origin` check (line 209) is never reached, even though origin is the same node. No further `OnNodeRemove` or heartbeat will arrive for this node, so the operator permanently stays in `moveStateAbortRemoveOrigin`.

Consider handling the `origin == dest` case explicitly. For example, when `forceRestart` is true and the single node is removed, go directly to `finishAsAbsent`:

Proposed fix

```diff
 if n == m.dest {
+	if n == m.origin && m.forceRestart {
+		// Restart operator: the single node is gone, mark absent immediately.
+		m.finishAsAbsent()
+		return
+	}
 	log.Info("dest node is offline, abort move and reschedule span",
```
🤖 Fix all issues with AI agents
In `@maintainer/maintainer_recoverable_kafka_error_test.go`:
- Line 1: Add the standard PingCAP 2025 copyright header at the top of this file
before the "package maintainer" declaration to match the other files in the PR;
ensure the header text and year format exactly match the 2025 convention used
elsewhere in the repo so the CI copyright check passes.
In `@pkg/sink/kafka/recoverable_error_event.go`:
- Line 1: Add the standard Apache 2.0 copyright header at the top of this file
(pkg/sink/kafka/recoverable_error_event.go) to match other files in the package
(e.g., sarama_async_producer.go); ensure the header text, license identifier,
and author/project attribution exactly match the existing files and set the year
to 2026 (or match the most recent files if a different year is used), placing it
above the package kafka declaration so CI no longer flags incorrect copyright
information.
In `@pkg/sink/kafka/sarama_async_producer_test.go`:
- Line 1: Add the standard 2025 PingCAP copyright header to the top of the file
before the package declaration to satisfy CI: insert the multi-line copyright
header comment above the existing "package kafka" line in
pkg/sink/kafka/sarama_async_producer_test.go so the file begins with the
required header followed by the package kafka declaration.
In `@pkg/sink/kafka/sarama_async_producer.go`:
- Around line 136-140: The current logic in the error handling branch around
isProducerKErrorRecoverable(err) may escalate recoverable errors to fatal when
reportRecoverableError(err) returns false (e.g., channel full); update the code
in the sarama async producer error path that calls reportRecoverableError to
detect a failed non-blocking send and log a warning indicating the channel was
full before falling through to handleProducerError, and optionally replace the
non-blocking send with a short blocking attempt (with a small timeout) inside
the same branch to give consumers a chance to drain; edit the relevant code that
invokes isProducerKErrorRecoverable, reportRecoverableError, and
handleProducerError to implement the warning log and/or short timeout retry.
- Around line 146-148: The SetRecoverableErrorChan method on saramaAsyncProducer
assigns p.recoverableErrCh without synchronization while AsyncRunCallback
concurrently accesses it; either add synchronization (e.g., a mutex on
saramaAsyncProducer around assignments and reads or use a sync.Once to ensure a
single safe assignment) or document the required call ordering in the public
API/GoDoc for SetRecoverableErrorChan stating it must be called before
AsyncRunCallback starts; update references to p.recoverableErrCh in
AsyncRunCallback to acquire the same mutex or rely on the sync.Once-protected
value accordingly and ensure any new mutex/sync.Once field is added to the
saramaAsyncProducer struct.
🧹 Nitpick comments (7)
maintainer/operator/operator_move.go (1)
111-120: `NewRestartDispatcherOperator` — consider giving it a distinct `Type()` string.

Currently `Type()` returns `"move"` for both move and restart operators. A distinct type (e.g., `"restart"`) would improve logging, metrics, and debugging, especially since the restart path has different semantics.

downstreamadapter/dispatchermanager/dispatcher_manager.go (2)

329-383: Double-wrap on line 368: `errors.WrapError(…, errors.Trace(err))`

`errors.Trace(err)` already attaches a stack trace; wrapping the result again with `WrapError` is redundant. Use one or the other — `WrapError` alone already records the cause.

Proposed fix

```diff
- e.handleError(ctx, errors.WrapError(errors.ErrKafkaAsyncSendMessage, errors.Trace(err)))
+ e.handleError(ctx, errors.WrapError(errors.ErrKafkaAsyncSendMessage, err))
```

As per coding guidelines, "wrap it immediately with `errors.Trace(err)` or `errors.WrapError(...)` … upstream callers should propagate wrapped errors without wrapping again".

374-378: Rename constant to remove stutter in name.

The constant `RecoverableKafkaKErrorRunningErrorCode` (defined in `pkg/sink/kafka/recoverable_error_event.go`) contains a stutter: "KafkaK" should be just "Kafka". Rename to `RecoverableKafkaErrorRunningErrorCode` to align with its value `"CDC:ErrKafkaRecoverableKError"` and improve clarity. Currently used in 4 places across the codebase, making this a low-cost refactor.

pkg/sink/kafka/sarama_async_producer_test.go (1)

66-118: Solid test for the recoverable error path — verifies non-exit and correct error reporting.

The 200ms wait at lines 105-109 is a reasonable heuristic for a unit test; just be aware it could be flaky under heavy CI load.

As per coding guidelines, "Use camelCase for function names and do not include underscores" — the test names use underscores (`TestSaramaAsyncProducer_TransientKErrorDoesNotExitAndReports`). Go test conventions commonly use underscores for readability in test names, so this is a minor tension with the project guideline.

pkg/sink/kafka/sarama_async_producer.go (1)

165-173: `KafkaErrName` and `Message` will always be identical here.

On Line 168, `kerr.Error()` and on Line 169, `err.Err.Error()` both resolve to the same `sarama.KError.Error()` call since `err.Err` is `kerr`. The `ErrorEvent` will always have `KafkaErrName == Message` in this code path.

If `Message` is intended to carry additional context (e.g., a wrapped error string), consider using `err.Error()` (the `ProducerError.Error()`) instead, which includes the topic/partition info. Otherwise, one of the two fields is redundant.

pkg/sink/kafka/recoverable_error_event.go (2)
46-57: Consider whether the transient error list is complete enough.

The current set covers the most common transient Kafka producer errors. A few other potentially transient errors worth considering:

- `sarama.ErrBrokerNotAvailable` — broker temporarily down during a rolling restart
- `sarama.ErrKafkaStorageError` — transient disk issues on the broker

If the intent is to be conservative and only recover from well-understood cases, the current list is reasonable. Just flagging for deliberate consideration; a sketch of an extended check follows this item.
RecoverableKafkaKErrorRunningErrorCodehas nested abbreviations (KinKError) and is verbose. A shorter name likeErrCodeKafkaRecoverableorRecoverableKafkaErrorCodewould be easier to read while remaining descriptive.
```go
if isProducerKErrorRecoverable(err) {
	if p.reportRecoverableError(err) {
		continue
	}
}
```
When the channel is full, a recoverable error silently escalates to fatal.
If reportRecoverableError returns false (channel full or not set), execution falls through to handleProducerError, which terminates the callback loop. This means a genuinely recoverable error becomes fatal purely because the consumer didn't drain the channel in time.
Consider logging a warning when the non-blocking send fails, so operators can distinguish "channel full → escalated to fatal" from a truly non-recoverable error. Also consider whether a brief blocking send with a short timeout would be more appropriate than the current non-blocking default.
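A minimal sketch combining both ideas (warn on a full channel, then retry briefly before escalating); the receiver, field, and event type are assumed from the surrounding review, not copied from the PR:

```go
// Sketch: try a non-blocking send first, then wait a short bounded time before giving up.
func (p *saramaAsyncProducer) reportRecoverableError(event *ErrorEvent) bool {
	if p.recoverableErrCh == nil {
		return false
	}
	select {
	case p.recoverableErrCh <- event:
		return true
	default:
	}
	log.Warn("recoverable error channel is full, waiting briefly before escalating")
	select {
	case p.recoverableErrCh <- event:
		return true
	case <-time.After(100 * time.Millisecond):
		log.Warn("recoverable error channel still full, escalating to the fatal path")
		return false
	}
}
```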
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/sarama_async_producer.go` around lines 136 - 140, The current
logic in the error handling branch around isProducerKErrorRecoverable(err) may
escalate recoverable errors to fatal when reportRecoverableError(err) returns
false (e.g., channel full); update the code in the sarama async producer error
path that calls reportRecoverableError to detect a failed non-blocking send and
log a warning indicating the channel was full before falling through to
handleProducerError, and optionally replace the non-blocking send with a short
blocking attempt (with a small timeout) inside the same branch to give consumers
a chance to drain; edit the relevant code that invokes
isProducerKErrorRecoverable, reportRecoverableError, and handleProducerError to
implement the warning log and/or short timeout retry.
```go
func (p *saramaAsyncProducer) SetRecoverableErrorChan(ch chan<- *ErrorEvent) {
	p.recoverableErrCh = ch
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
# Check how SetRecoverableErrorChan is called relative to AsyncRunCallback
rg -n --type=go -C5 'SetRecoverableErrorChan'
```

Repository: pingcap/ticdc
Length of output: 4305

🏁 Script executed:

```bash
# Get the sarama_async_producer.go file to see the full context
cat -n pkg/sink/kafka/sarama_async_producer.go | head -200
```

Repository: pingcap/ticdc
Length of output: 7761

🏁 Script executed:

```bash
# Look at the full struct definition and constructor
rg -n --type=go 'saramaAsyncProducer struct' -A 10
```

Repository: pingcap/ticdc
Length of output: 772

🏁 Script executed:

```bash
# Check if there's a constructor or New function
rg -n --type=go 'func.*saramaAsyncProducer|func New.*Producer' -A 5 pkg/sink/kafka/sarama_async_producer.go
```

Repository: pingcap/ticdc
Length of output: 1473
Add synchronization guard or document the calling contract for SetRecoverableErrorChan.
The method writes to p.recoverableErrCh without synchronization, but AsyncRunCallback reads and writes to this field concurrently. If SetRecoverableErrorChan is called after AsyncRunCallback has started, a data race occurs. The code currently relies on an implicit contract that it's always called before the loop starts, but this is neither enforced nor documented. Either protect the assignment with synchronization (e.g., a mutex or sync.Once) or explicitly document the required calling order.
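One possible shape of the synchronized variant; the struct fields besides the channel are elided, and method names mirror the snippet above rather than the PR's actual code:

```go
// Sketch: guard the channel with a mutex so the setter may be called at any time.
type saramaAsyncProducer struct {
	mu               sync.Mutex
	recoverableErrCh chan<- *ErrorEvent
	// ...other fields elided...
}

func (p *saramaAsyncProducer) SetRecoverableErrorChan(ch chan<- *ErrorEvent) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.recoverableErrCh = ch
}

// AsyncRunCallback would read through this accessor instead of touching the field directly.
func (p *saramaAsyncProducer) getRecoverableErrorChan() chan<- *ErrorEvent {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.recoverableErrCh
}
```

Documenting the call-order contract in the GoDoc is the cheaper alternative if the setter is guaranteed to run before `AsyncRunCallback` starts.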
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/sarama_async_producer.go` around lines 146 - 148, The
SetRecoverableErrorChan method on saramaAsyncProducer assigns p.recoverableErrCh
without synchronization while AsyncRunCallback concurrently accesses it; either
add synchronization (e.g., a mutex on saramaAsyncProducer around assignments and
reads or use a sync.Once to ensure a single safe assignment) or document the
required call ordering in the public API/GoDoc for SetRecoverableErrorChan
stating it must be called before AsyncRunCallback starts; update references to
p.recoverableErrCh in AsyncRunCallback to acquire the same mutex or rely on the
sync.Once-protected value accordingly and ensure any new mutex/sync.Once field
is added to the saramaAsyncProducer struct.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@downstreamadapter/dispatchermanager/dispatcher_manager.go`:
- Around line 343-352: The code currently treats an empty dispatcherIDs as a
changefeed-level fatal by calling e.handleError (which feeds into
collectErrors), causing repeated resend; instead modify the logic around the
dispatcherIDs check in dispatcher_manager.go so that when dispatcherIDs is empty
you do not call e.handleError; either log a warning via log.Warn with the same
context (changefeedID, Kafka fields) and drop the event, or implement a limited
retry path (e.g., enqueue the event for a small number of retries) before
escalating—change the call site that currently invokes e.handleError to perform
the chosen non-escalating behavior and ensure any retry counter/lifetime is
tracked in that local scope so collectErrors is not triggered immediately.
In `@maintainer/maintainer.go`:
- Around line 817-883: handleRecoverableKafkaError currently uses
replication.GetNodeID() as the operator origin and always targets dispatchers
from common.DefaultMode without verifying the reporting node; add a node-origin
consistency check and a mode check: after obtaining replication :=
spanController.GetTaskByID(dispatcherID) and origin := replication.GetNodeID(),
if from != origin then log a warning (similar message to "dispatcher not found")
and continue; also ensure the dispatcher is in DefaultMode (or explicitly skip
non-DefaultMode dispatchers) before creating
operator.NewRestartDispatcherOperator so Kafka errors only restart dispatchers
in the expected mode.
🧹 Nitpick comments (4)
pkg/sink/kafka/error_event.go (1)
13-33: Consider adding JSON tags to `ErrorReport` fields.

`ErrorReport` is serialized with `json.Marshal` in `dispatcher_manager.go` and deserialized in `maintainer.go`. While both sides use the same Go struct (so it works correctly today), adding explicit JSON tags would make the serialization contract explicit and resilient to field renames; a sketch follows.
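A sketch of tagged fields; the field set is inferred from the values logged elsewhere in this review (`DispatcherIDs`, `KafkaErrCode`, `KafkaErrName`, `Topic`, `Partition`, `Message`), so names and types are assumptions rather than the PR's definition:

```go
// Sketch: explicit JSON tags make the wire contract independent of Go field renames.
type ErrorReport struct {
	DispatcherIDs []common.DispatcherID `json:"dispatcher_ids"`
	KafkaErrCode  int16                 `json:"kafka_err_code"`
	KafkaErrName  string                `json:"kafka_err_name"`
	Topic         string                `json:"topic"`
	Partition     int32                 `json:"partition"`
	Message       string                `json:"message"`
}
```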
ErrorReportis serialized withjson.Marshalindispatcher_manager.goand deserialized inmaintainer.go. While both sides use the same Go struct (so it works correctly today), adding explicit JSON tags would make the serialization contract explicit and resilient to field renames.pkg/sink/kafka/sarama_async_producer.go (1)
150-181:KafkaErrNameandMessagewill always be identical.Since
isTransientKErroralready confirmederr.Erris exactly asarama.KError, the type assertion on Line 154 always succeeds andkerr.Error()(Line 168) equalserr.Err.Error()(Line 169). BothKafkaErrNameandMessagein theErrorEventwill hold the same string. If distinct semantics are intended (e.g., error code name vs. full message), consider usingkerr.Error()for one and a formatted string for the other.downstreamadapter/dispatchermanager/dispatcher_manager.go (1)
329-383: Recoverable error collector looks correct; consider deduplication for error bursts.The implementation correctly extracts dispatcher IDs, falls back to the changefeed error path when IDs are missing, and serializes the report into the heartbeat. One consideration: during a Kafka broker outage, many transient errors can fire in rapid succession for the same dispatcher. Each one triggers a separate heartbeat +
RestartDispatcherOperatorcreation attempt on the maintainer side. While the maintainer'sAddOperatordeduplicates (Line 876 inmaintainer.gologs "already exists, ignore"), this still generates significant heartbeat traffic and JSON serialization overhead.Consider batching or deduplicating errors within a short time window (e.g., only report the first error per dispatcher per N seconds) to reduce noise during extended broker failures.
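A rough sketch of per-dispatcher suppression; the window length, type, and field names are illustrative assumptions, not the PR's design:

```go
// Sketch: report at most one recoverable error per dispatcher per suppression window.
const recoverableErrorSuppressWindow = 10 * time.Second

type recoverableErrorDeduper struct {
	lastReported map[common.DispatcherID]time.Time
}

func (d *recoverableErrorDeduper) shouldReport(id common.DispatcherID, now time.Time) bool {
	if d.lastReported == nil {
		d.lastReported = make(map[common.DispatcherID]time.Time)
	}
	if last, ok := d.lastReported[id]; ok && now.Sub(last) < recoverableErrorSuppressWindow {
		// A restart for this dispatcher was requested recently; drop the repeat report.
		return false
	}
	d.lastReported[id] = now
	return true
}
```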
maintainer/maintainer.go (1)
860-866: Dispatcher not found is logged at Warn — expected during concurrent restarts?

When a dispatcher is undergoing removal/restart (e.g., from a previous error cycle), `GetTaskByID` may return nil. Logging at `Warn` level could be noisy during burst transient-error scenarios where multiple errors arrive for the same dispatcher that's already being restarted. Consider `Debug` or `Info` level to reduce noise.

Proposed change

```diff
-			log.Warn("dispatcher not found, ignore recoverable kafka error",
+			log.Info("dispatcher not found, ignore recoverable kafka error",
 				zap.Stringer("changefeedID", m.changefeedID),
 				zap.Stringer("dispatcherID", dispatcherID))
```
maintainer/maintainer.go
Outdated
```go
func (m *Maintainer) handleRecoverableKafkaError(from node.ID, err *heartbeatpb.RunningError) bool {
	if err == nil || err.Code != kafkapkg.KafkaTransientErrorCode {
		return false
	}

	var report kafkapkg.ErrorReport
	if unmarshalErr := json.Unmarshal([]byte(err.Message), &report); unmarshalErr != nil {
		log.Warn("unmarshal recoverable kafka error report failed",
			zap.Stringer("changefeedID", m.changefeedID),
			zap.Stringer("sourceNode", from),
			zap.Error(unmarshalErr))
		return false
	}
	if len(report.DispatcherIDs) == 0 {
		log.Warn("recoverable kafka error report has no dispatcher IDs",
			zap.Stringer("changefeedID", m.changefeedID),
			zap.Stringer("sourceNode", from),
			zap.Int16("kafkaErrCode", report.KafkaErrCode),
			zap.String("kafkaErrName", report.KafkaErrName),
			zap.String("topic", report.Topic),
			zap.Int32("partition", report.Partition))
		return false
	}

	log.Warn("recoverable kafka error received, restart dispatchers",
		zap.Stringer("changefeedID", m.changefeedID),
		zap.Stringer("sourceNode", from),
		zap.Int("dispatcherCount", len(report.DispatcherIDs)),
		zap.Int16("kafkaErrCode", report.KafkaErrCode),
		zap.String("kafkaErrName", report.KafkaErrName),
		zap.String("topic", report.Topic),
		zap.Int32("partition", report.Partition))

	operatorController := m.controller.getOperatorController(common.DefaultMode)
	spanController := m.controller.getSpanController(common.DefaultMode)

	seen := make(map[common.DispatcherID]struct{}, len(report.DispatcherIDs))
	for _, dispatcherID := range report.DispatcherIDs {
		if _, ok := seen[dispatcherID]; ok {
			continue
		}
		seen[dispatcherID] = struct{}{}

		replication := spanController.GetTaskByID(dispatcherID)
		if replication == nil {
			log.Warn("dispatcher not found, ignore recoverable kafka error",
				zap.Stringer("changefeedID", m.changefeedID),
				zap.Stringer("dispatcherID", dispatcherID))
			continue
		}
		origin := replication.GetNodeID()
		if origin == "" {
			log.Warn("dispatcher has empty node ID, ignore recoverable kafka error",
				zap.Stringer("changefeedID", m.changefeedID),
				zap.Stringer("dispatcherID", dispatcherID))
			continue
		}

		op := operator.NewRestartDispatcherOperator(spanController, replication, origin)
		if ok := operatorController.AddOperator(op); !ok {
			log.Info("restart dispatcher operator already exists, ignore",
				zap.Stringer("changefeedID", m.changefeedID),
				zap.Stringer("dispatcherID", dispatcherID))
		}
	}
	return true
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Verify that redo sinks don't use Kafka (confirming DefaultMode is correct)
rg -n --type=go 'func.*NewRestartDispatcherOperator' -A 5
```

Repository: pingcap/ticdc
Length of output: 587

🏁 Script executed:

```bash
#!/bin/bash
# Check if redo dispatchers could use Kafka sink
rg -n --type=go 'redoSink.*kafka|redo.*KafkaSink' -i
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```bash
#!/bin/bash
# Verify getOperatorController and getSpanController accept mode parameter
rg -n --type=go 'func.*getOperatorController|func.*getSpanController' -A 3
```

Repository: pingcap/ticdc
Length of output: 713

🏁 Script executed:

```bash
#!/bin/bash
# Check how the 'from' parameter is used elsewhere in handleRecoverableKafkaError context
rg -n --type=go 'GetNodeID\(\)' -B 2 -A 2 | head -40
```

Repository: pingcap/ticdc
Length of output: 2724

🏁 Script executed:

```bash
#!/bin/bash
# Check if there's any node consistency check in the codebase
rg -n --type=go 'from.*origin|origin.*from' -i -B 2 -A 2 | head -40
```

Repository: pingcap/ticdc
Length of output: 2715

🏁 Script executed:

```bash
#!/bin/bash
# Search for where handleRecoverableKafkaError is called and how 'from' is obtained
rg -n --type=go 'handleRecoverableKafkaError' -B 5 -A 2
```

Repository: pingcap/ticdc
Length of output: 2063

🏁 Script executed:

```bash
#!/bin/bash
# Check the error reporting context - when/how are Kafka errors generated
rg -n --type=go 'KafkaTransientErrorCode' -B 3 -A 3
```

Repository: pingcap/ticdc
Length of output: 2987

🏁 Script executed:

```bash
#!/bin/bash
# Look for other heartbeat handlers that process 'from' with node consistency checks
rg -n --type=go 'func.*heartbeat|heartbeatpb.*Status' -A 2 | grep -A 2 'from.*origin\|origin.*from' | head -30
```

Repository: pingcap/ticdc
Length of output: 273

🏁 Script executed:

```bash
#!/bin/bash
# Check how dispatchers are reassigned - is GetNodeID always current or can it be stale?
rg -n --type=go 'SetNodeID|replication.*SetNodeID' -B 2 -A 2 | head -40
```

Repository: pingcap/ticdc
Length of output: 2763

🏁 Script executed:

```bash
#!/bin/bash
# Verify if Kafka error handling is Kafka-specific (redo mode support)
rg -n --type=go 'KafkaTransientErrorCode|KafkaSink' -B 5 | head -50
```

Repository: pingcap/ticdc
Length of output: 3335
handleRecoverableKafkaError lacks node-origin consistency check found in similar operators.
The method correctly unmarshals the error report, deduplicates dispatcher IDs, validates replication state, and creates RestartDispatcherOperator instances. However:
- Missing node validation: The method uses `replication.GetNodeID()` (current node) as the operator's origin rather than the `from` parameter (reporting node). Unlike the similar pattern in `MergeDispatcherOperator.Check()` (which validates `from == m.originNode`), this method doesn't validate that the reporting node matches the dispatcher's current node. If a dispatcher is reassigned between error generation and processing, the operator targets the new node rather than the one that reported the error. Consider adding a consistency check: `if from != origin { log.Warn(...); continue }` (a minimal sketch follows this list).
- Hardcoded `DefaultMode` assumption (Lines 850-851): Kafka errors are Kafka-specific and should not apply to redo-mode dispatchers. This assumption appears correct based on the current implementation (no evidence of Kafka sinks in redo mode), but document this constraint or add a mode validation to prevent silent failures if redo sink implementations change.
🤖 Prompt for AI Agents
In `@maintainer/maintainer.go` around lines 817 - 883, handleRecoverableKafkaError
currently uses replication.GetNodeID() as the operator origin and always targets
dispatchers from common.DefaultMode without verifying the reporting node; add a
node-origin consistency check and a mode check: after obtaining replication :=
spanController.GetTaskByID(dispatcherID) and origin := replication.GetNodeID(),
if from != origin then log a warning (similar message to "dispatcher not found")
and continue; also ensure the dispatcher is in DefaultMode (or explicitly skip
non-DefaultMode dispatchers) before creating
operator.NewRestartDispatcherOperator so Kafka errors only restart dispatchers
in the expected mode.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@maintainer/maintainer.go`:
- Around line 884-893: The code short-circuits the dispatcher loop by calling
m.onError(...heartbeatpb.RunningError...) and returning true inside the loop
when one dispatcher exceeds its restart budget, which skips creating
RestartDispatcherOperator for remaining report.DispatcherIDs; change this by
removing the early return and instead record the downgrade decision (e.g., set a
boolean like downgraded=true or append to a collected list) while continuing to
iterate over dispatcherID/RestartDispatcherOperator creation, then after the
loop emit a single m.onError with the aggregated info and return true if any
dispatcher downgraded; alternatively, if the short-circuit is intentional, add a
clear comment at the return true site explaining the deliberate early-exit
behavior referencing dispatcherID, report.DispatcherIDs,
RestartDispatcherOperator and the heartbeatpb.RunningError call.
In `@maintainer/recoverable_kafka_restart.go`:
- Around line 1-7: Add the standard PingCAP copyright header comment to the top
of the new recoverable_kafka_restart.go file (package maintainer) to match the
existing files (e.g., maintainer/maintainer.go header lines 1-12); insert the
identical header block before the "package maintainer" line so the CI copyright
check passes.
🧹 Nitpick comments (2)
maintainer/recoverable_kafka_restart.go (2)
85-103: Backoff calculation has redundant/overlapping checks — simplify.

Lines 92-93 and 95-96 both cap at `recoverableKafkaDispatcherRestartBackoffMax`, and the post-loop check at lines 100-102 does the same. The standard pattern is simpler:

Proposed simplification

```diff
 func recoverableKafkaDispatcherRestartBackoff(restartAttempts int) time.Duration {
 	if restartAttempts <= 0 {
 		return 0
 	}
 	backoff := recoverableKafkaDispatcherRestartBackoffInit
 	for i := 1; i < restartAttempts; i++ {
-		if backoff >= recoverableKafkaDispatcherRestartBackoffMax {
-			return recoverableKafkaDispatcherRestartBackoffMax
-		}
-		if backoff > recoverableKafkaDispatcherRestartBackoffMax/2 {
-			return recoverableKafkaDispatcherRestartBackoffMax
-		}
 		backoff *= 2
+		if backoff >= recoverableKafkaDispatcherRestartBackoffMax {
+			return recoverableKafkaDispatcherRestartBackoffMax
+		}
 	}
-	if backoff > recoverableKafkaDispatcherRestartBackoffMax {
-		return recoverableKafkaDispatcherRestartBackoffMax
-	}
 	return backoff
 }
```

31-46: `getRecoverableKafkaDispatcherRestartState` assumes caller holds the lock — document this.

This method reads and writes `m.recoverableKafkaRestarts.dispatchers` without acquiring the mutex. It's currently safe because both callers (`shouldDowngrade...` and `record...`) hold the lock, but a future caller could easily miss this requirement.

Add a brief comment like `// caller must hold m.recoverableKafkaRestarts.Lock()`.
```go
		m.onError(from, &heartbeatpb.RunningError{
			Time: time.Now().String(),
			Code: string(errors.ErrKafkaAsyncSendMessage.RFCCode()),
			Message: fmt.Sprintf(
				"recoverable kafka error exceeded dispatcher restart budget, downgrade to changefeed error path, dispatcherID=%s, kafkaErrCode=%d, kafkaErrName=%s, topic=%s, partition=%d, message=%s",
				dispatcherID.String(), report.KafkaErrCode, report.KafkaErrName, report.Topic, report.Partition, report.Message,
			),
		})
		return true
	}
```
Early return true on downgrade skips remaining dispatchers in the batch.
When one dispatcher exceeds its restart budget, return true at Line 892 exits the loop immediately. Any subsequent dispatcher IDs in report.DispatcherIDs that haven't been iterated yet won't get a RestartDispatcherOperator. Since the entire error report is then considered "handled" (returns true), those dispatchers silently miss their restart.
If the intent is to fail the whole changefeed on any single dispatcher downgrade, this is fine — but the dispatchers that were already restarted earlier in the loop still get operators created unnecessarily (they'll be restarted even though the changefeed is about to error out).
Consider either:
- Processing all dispatchers and collecting downgrade decisions, then acting on them after the loop (a sketch follows below), or
- At minimum, adding a comment explaining the intentional short-circuit behavior.
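A hedged sketch of the collect-then-act option; the loop shape, `report`, and the `m.onError` call are assumed from the code block above, and the budget check is a hypothetical helper:

```go
// Sketch: keep iterating, remember which dispatchers exhausted their restart budget,
// and downgrade the changefeed once after the whole batch has been processed.
var exhausted []common.DispatcherID
for _, dispatcherID := range report.DispatcherIDs {
	if !m.allowDispatcherRestart(dispatcherID) { // hypothetical budget check
		exhausted = append(exhausted, dispatcherID)
		continue
	}
	// ...create RestartDispatcherOperator as before...
}
if len(exhausted) > 0 {
	m.onError(from, &heartbeatpb.RunningError{
		Time: time.Now().String(),
		Code: string(errors.ErrKafkaAsyncSendMessage.RFCCode()),
		Message: fmt.Sprintf(
			"recoverable kafka error exceeded dispatcher restart budget for %d dispatcher(s), downgrade to changefeed error path",
			len(exhausted)),
	})
}
return true
```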
🤖 Prompt for AI Agents
In `@maintainer/maintainer.go` around lines 884 - 893, The code short-circuits the
dispatcher loop by calling m.onError(...heartbeatpb.RunningError...) and
returning true inside the loop when one dispatcher exceeds its restart budget,
which skips creating RestartDispatcherOperator for remaining
report.DispatcherIDs; change this by removing the early return and instead
record the downgrade decision (e.g., set a boolean like downgraded=true or
append to a collected list) while continuing to iterate over
dispatcherID/RestartDispatcherOperator creation, then after the loop emit a
single m.onError with the aggregated info and return true if any dispatcher
downgraded; alternatively, if the short-circuit is intentional, add a clear
comment at the return true site explaining the deliberate early-exit behavior
referencing dispatcherID, report.DispatcherIDs, RestartDispatcherOperator and
the heartbeatpb.RunningError call.
```go
package maintainer

import (
	"time"

	"github.com/pingcap/ticdc/pkg/common"
)
```
Missing copyright header.
This new file is missing the standard PingCAP copyright header that all other .go files in the repo have (e.g., maintainer/maintainer.go lines 1-12). The CI copyright check will likely fail.
Proposed fix
```diff
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package maintainer
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package maintainer

import (
	"time"

	"github.com/pingcap/ticdc/pkg/common"
)
```
🤖 Prompt for AI Agents
In `@maintainer/recoverable_kafka_restart.go` around lines 1 - 7, Add the standard
PingCAP copyright header comment to the top of the new
recoverable_kafka_restart.go file (package maintainer) to match the existing
files (e.g., maintainer/maintainer.go header lines 1-12); insert the identical
header block before the "package maintainer" line so the CI copyright check
passes.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@downstreamadapter/sink/kafka/sink.go`:
- Around line 496-497: The ctx.Done() branch currently returns the raw context
error via context.Cause(ctx); change it to return a traced error by wrapping the
cause with errors.Trace (e.g., return errors.Trace(context.Cause(ctx))) to match
the rest of the file where other <-ctx.Done() branches use
errors.Trace(ctx.Err()); update imports if necessary to ensure errors.Trace is
available and keep the same error-wrapping convention used in this file.
In `@pkg/sink/recoverable/error_event.go`:
- Line 1: The file error_event.go in package recoverable is missing the standard
Apache 2.0 copyright header which causes CI to fail; add the same Apache 2.0
header block used across the repo to the very top of error_event.go (above the
package recoverable declaration), matching text and formatting exactly as in
other files (copy from a nearby file to ensure consistency), and ensure no extra
characters or blank lines precede the header so the CI copyright check passes.
🧹 Nitpick comments (6)
maintainer/recoverable_kafka_restart.go (2)
93-111: Backoff computation has redundant branches; consider simplifying.

Lines 100-101 (`backoff >= max`) are fully subsumed by lines 103-104 (`backoff > max/2`) plus the final cap at lines 108-110. The logic is correct but can be simplified:

Suggested simplification

```diff
 func recoverableKafkaDispatcherRestartBackoff(restartAttempts int) time.Duration {
 	if restartAttempts <= 0 {
 		return 0
 	}
-
 	backoff := recoverableKafkaDispatcherRestartBackoffInit
 	for i := 1; i < restartAttempts; i++ {
-		if backoff >= recoverableKafkaDispatcherRestartBackoffMax {
-			return recoverableKafkaDispatcherRestartBackoffMax
-		}
-		if backoff > recoverableKafkaDispatcherRestartBackoffMax/2 {
+		if backoff >= recoverableKafkaDispatcherRestartBackoffMax/2 {
 			return recoverableKafkaDispatcherRestartBackoffMax
 		}
 		backoff *= 2
 	}
-	if backoff > recoverableKafkaDispatcherRestartBackoffMax {
-		return recoverableKafkaDispatcherRestartBackoffMax
-	}
-	return backoff
+	return min(backoff, recoverableKafkaDispatcherRestartBackoffMax)
 }
```

39-54: Initialize `recoverableKafkaRestarts.dispatchers` in the `Maintainer` constructor to remove the implicit lock requirement.

`getRecoverableKafkaDispatcherRestartState` performs lazy initialization of `m.recoverableKafkaRestarts.dispatchers` without acquiring the lock itself, relying on callers to hold it. While both current callers (`getRecoverableKafkaDispatcherRestartDecision` and `recordRecoverableKafkaDispatcherRestart`) do lock before calling, this implicit lock requirement is fragile and easy to miss in future changes. Initialize the map in `NewMaintainer` instead to eliminate the lazy check and make the safety requirement explicit.

downstreamadapter/sink/kafka/sink.go (1)
120-130: Local interface duplicates `recoverable.ErrorEventChanSetter`.

The `recoverableErrorChanSetter` interface defined here (lines 120-122) has the exact same signature as `recoverable.ErrorEventChanSetter` in `pkg/sink/recoverable/error_event.go`. Use the imported interface directly to avoid duplication.

Suggested fix

```diff
-type recoverableErrorChanSetter interface {
-	SetRecoverableErrorChan(ch chan<- *recoverable.ErrorEvent)
-}
-
 func (s *sink) SetRecoverableErrorChan(ch chan<- *recoverable.ErrorEvent) {
-	setter, ok := s.dmlProducer.(recoverableErrorChanSetter)
+	setter, ok := s.dmlProducer.(recoverable.ErrorEventChanSetter)
 	if !ok {
 		return
 	}
 	setter.SetRecoverableErrorChan(ch)
 }
```

maintainer/maintainer_recoverable_kafka_error_test.go (1)
19-76: Test function names use underscores, which conflicts with the Go naming guideline.

Functions like `TestRecoverDispatcherRequest_RestartDispatchers` use underscores. While this is a common Go test convention for readability, the project coding guidelines state to use camelCase without underscores for function names. Consider renaming, e.g., `TestRecoverDispatcherRequestRestartDispatchers`.

As per coding guidelines, "Use camelCase for function names and do not include underscores (e.g., `getPartitionNum`, not `get_partition_num`) in Go".
109-111: Blocking `Enqueue` may stall dispatcher recovery producers if the channel is full.

`Enqueue` performs an unconditional blocking send. If the consumer (HeartbeatCollector) is slow or stuck, any goroutine calling `Enqueue` on the recovery path will block silently. The existing `HeartbeatRequestQueue` has the same pattern, but since this queue is specifically for error recovery, a stall here could delay or prevent recovery of other dispatchers too.

Consider a non-blocking send with a log/metric on drop, or at minimum a `select` with `ctx.Done()` so the caller isn't stuck indefinitely:

♻️ Optional: context-aware enqueue

```diff
-func (q *RecoverDispatcherRequestQueue) Enqueue(request *RecoverDispatcherRequestWithTargetID) {
-	q.queue <- request
-}
+func (q *RecoverDispatcherRequestQueue) Enqueue(ctx context.Context, request *RecoverDispatcherRequestWithTargetID) error {
+	select {
+	case q.queue <- request:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
```
109-120: No observability on queue depth — consider adding a metric like `BlockStatusRequestQueue` does.

`BlockStatusRequestQueue.Enqueue`/`Dequeue` (lines 73-86) report queue length via a gauge. This new queue lacks equivalent instrumentation, which would be useful for monitoring recovery-request backlog in production; a sketch follows.
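A small sketch of gauge instrumentation mirroring that pattern, assuming a Prometheus client gauge; the metric name and the `Dequeue` signature shown here are illustrative, not the PR's code:

```go
// Sketch: report queue depth on every enqueue/dequeue (registration with the
// metrics registry is omitted here).
var recoverDispatcherRequestQueueGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "ticdc",
	Subsystem: "dispatchermanager",
	Name:      "recover_dispatcher_request_queue_len", // illustrative name
	Help:      "Length of the recover-dispatcher request queue.",
})

func (q *RecoverDispatcherRequestQueue) Enqueue(request *RecoverDispatcherRequestWithTargetID) {
	q.queue <- request
	recoverDispatcherRequestQueueGauge.Set(float64(len(q.queue)))
}

func (q *RecoverDispatcherRequestQueue) Dequeue() *RecoverDispatcherRequestWithTargetID {
	request := <-q.queue
	recoverDispatcherRequestQueueGauge.Set(float64(len(q.queue)))
	return request
}
```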
[FORMAT CHECKER NOTIFICATION] 📖 For more info, you can check the "Contribute Code" section in the development guide.
@3AceShowHand: The following test failed; say `/retest` to rerun it.
Full PR test history. Your PR dashboard.

Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Tests