
Conversation

@3AceShowHand
Collaborator

@3AceShowHand 3AceShowHand commented Feb 5, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Automatic recovery for transient Kafka sink errors: affected dispatchers are restarted with controlled backoff and eventual downgrade to fatal when retries are exhausted.
    • Sinks can report recoverable errors; per-row dispatcher identity is propagated and included in message/log info for improved tracing.
  • Tests

    • Added unit and integration tests covering recoverable error reporting, dispatcher restart flows, backoff/skipping behavior, and downgrade-to-fatal scenarios.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Feb 5, 2026
@coderabbitai

coderabbitai bot commented Feb 5, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds end-to-end recoverable Kafka error handling: transient Kafka errors are detected in the producer, emitted as ErrorEvent, collected by DispatcherManager and converted into RecoverDispatcherRequests, then handled by Maintainer which decides per-dispatcher restart, backoff, or downgrade to fatal and may schedule RestartDispatcherOperator instances. DispatcherID is propagated into row events and logging.

Changes

  • Maintainer core & restart policy (maintainer/maintainer.go, maintainer/recoverable_kafka_restart.go, maintainer/maintainer_manager.go):
    Adds handling for RecoverDispatcherRequest, a per-dispatcher restart state machine (backoff, attempts, reset), decision logic (Restart / Skip / Downgrade), and records restart state.
  • Maintainer tests & operators (maintainer/maintainer_recoverable_kafka_error_test.go, maintainer/operator/operator_move.go, maintainer/operator/operator_move_test.go):
    Introduces tests for restart/backoff/downgrade, adds forceRestart and NewRestartDispatcherOperator to force restart-on-same-node, and tests the forced restart flow.
  • Dispatcher Manager & heartbeat plumbing (downstreamadapter/dispatchermanager/dispatcher_manager.go, downstreamadapter/dispatchermanager/dispatcher_manager_info.go, downstreamadapter/dispatchermanager/heartbeat_collector.go, downstreamadapter/dispatchermanager/heartbeat_queue.go, downstreamadapter/dispatchermanager/dispatcher_manager_test.go):
    Wires a recoverable error channel from sinks, adds a collectRecoverableErrors goroutine that builds RecoverDispatcherRequests and enqueues them to a new RecoverDispatcherRequestQueue; the heartbeat collector now dispatches recover requests; tests added for enqueue behavior and import constraints.
  • Sink integrations & producer (downstreamadapter/sink/kafka/sink.go, downstreamadapter/sink/pulsar/sink.go, pkg/sink/kafka/sarama_async_producer.go, pkg/sink/kafka/sarama_async_producer_test.go, pkg/sink/kafka/error_event.go):
    Propagates DispatcherID into row events and logging, adds SetRecoverableErrorChan plumbing, the sarama async producer detects transient Kafka errors, reports recoverable ErrorEvents (non-blocking) and preserves non-transient behavior; unit tests for transient vs non-transient handling.
  • Recoverable error API (pkg/sink/recoverable/error_event.go):
    Adds ErrorEvent struct and ErrorEventChanSetter interface to standardize recoverable error notifications from sinks.
  • Event & logging metadata (pkg/common/event/row_change.go, pkg/sink/codec/common/log_info.go, pkg/sink/codec/common/message.go, pkg/sink/codec/common/log_info_test.go):
    Adds DispatcherID to RowEvent, collects per-message DispatcherIDs in MessageLogInfo, and updates tests to assert dispatcher propagation.
  • Messaging & proto changes (pkg/messaging/message.go, heartbeatpb/heartbeat.proto):
    Adds TypeRecoverDispatcherRequest handling in messaging, and a new RecoverDispatcherRequest protobuf message plus two fields in Watermark.
  • Misc & metrics (pkg/sink/kafka/metrics_collector.go):
    Minor signature cleanup: updateBrokers() no longer accepts a context parameter.

Sequence Diagram(s)

sequenceDiagram
    participant Producer as Kafka Producer
    participant Sarama as Sarama Async Producer
    participant DispatcherMgr as Dispatcher Manager
    participant Heartbeat as HeartBeatCollector
    participant Maintainer as Maintainer
    participant Operator as RestartDispatcherOperator

    Producer->>Sarama: send message (transient Kafka error)
    Sarama->>Sarama: isTransientKError(err) == true
    Sarama->>Sarama: build ErrorEvent (DispatcherIDs)
    Sarama->>DispatcherMgr: non-blocking send to transientErrorCh
    DispatcherMgr->>DispatcherMgr: collectRecoverableErrors() -> build RecoverDispatcherRequest
    DispatcherMgr->>Heartbeat: Enqueue RecoverDispatcherRequestWithTargetID
    Heartbeat->>Maintainer: send RecoverDispatcherRequest (single-target message)
    Maintainer->>Maintainer: onRecoverDispatcherRequest() -> decide Restart / Skip / Downgrade
    alt restart allowed
        Maintainer->>Operator: create NewRestartDispatcherOperator and add to controller
        Operator->>Operator: Start() forces restart (Remove -> Add -> Create)
    else downgrade
        Maintainer->>Maintainer: record fatal RunningError for changefeed
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

Poem

🐇 I nibble logs and hop on code so light,
When Kafka hiccups, I send a little light,
Heartbeats carry dispatcher IDs in tow,
Restarts hop in place, then steady we go,
A tiny rabbit cheers: recover and bright ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
  • Description check (⚠️ Warning): The PR description is entirely the unfilled template; no actual problem statement, changes, tests, or release notes are provided, and all required sections remain as placeholders. Resolution: fill in the required sections: link an issue, describe the problem and changes, check applicable test types, answer the performance/compatibility questions, and provide a release note.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 11.90%, which is insufficient; the required threshold is 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
  • Title check (✅ Passed): The title describes the main change (Kafka sink transient error reporting and dispatcher force restart via move operator), which aligns with the substantial code changes introducing recoverable error handling throughout the system.


@gemini-code-assist

Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request adds an error recovery mechanism that improves the system's resilience. It enables the Maintainer to automatically detect and respond to temporary, recoverable errors originating from the dispatcher manager's sink, particularly in Kafka environments. By introducing a 'force recreate' option for dispatcher managers and updating the message handling logic, the system can now proactively attempt to restore functionality and avoid prolonged service disruption without manual intervention for these specific error types.

Highlights

  • Dispatcher Manager Re-creation: Introduced a mechanism to force the re-creation of a dispatcher manager, including its sink, when it encounters specific recoverable errors. This is controlled by a new ForceRecreate flag in the MaintainerBootstrapRequest.
  • Maintainer Error Recovery Logic: The Maintainer now includes logic to detect and attempt recovery from certain dispatcher manager errors, specifically Kafka async send errors related to insufficient in-sync replicas. This recovery involves sending a ForceRecreate bootstrap request and re-adding dispatchers, incorporating a backoff strategy for retries.
  • Protobuf Message Enhancement: The MaintainerBootstrapRequest protobuf message has been extended with a ForceRecreate boolean field, along with the necessary Go struct updates and serialization/deserialization logic.
  • Pending Message Queue Priority: The pendingMessageQueue now prioritizes MaintainerBootstrapRequest messages with ForceRecreate=true over those with ForceRecreate=false, ensuring that recovery requests take precedence.
  • Operator Controller Enhancement: A new ReplaceOperator method has been added to the operator.Controller to facilitate recovery workflows by allowing existing operators to be replaced with new ones, effectively taking over a replica set.
  • Testing Improvements: New unit tests, TestPendingMessageQueue_BootstrapRequestForceRecreateOverridesPendingFalse and TestMaintainerSwallowRecoverableKafkaAsyncSendError, have been added to validate the updated message queue logic and the new error recovery mechanism, respectively.


Changelog
  • downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
    • Added logic to close and recreate a dispatcher manager if ForceRecreate is set in a bootstrap request.
    • Implemented timeout handling for dispatcher manager closure during re-creation.
  • downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
    • Added TestPendingMessageQueue_BootstrapRequestForceRecreateOverridesPendingFalse to verify bootstrap request override behavior.
  • downstreamadapter/dispatcherorchestrator/helper.go
    • Updated shouldReplacePendingMessage to incorporate MaintainerBootstrapRequest with ForceRecreate semantics.
    • Added comments to explain the new ForceRecreate priority in the pending message queue.
  • heartbeatpb/heartbeat.pb.go
    • Added ForceRecreate field to the MaintainerBootstrapRequest Go struct.
    • Generated GetForceRecreate, MarshalToSizedBuffer, Size, and Unmarshal methods for the new ForceRecreate field.
  • heartbeatpb/heartbeat.proto
    • Added force_recreate field (field number 8) to the MaintainerBootstrapRequest message definition.
  • maintainer/maintainer.go
    • Introduced dispatcherManagerRecoveryState struct and dispatcherManagerRecovery field to the Maintainer.
    • Implemented tryRecoverDispatcherManagerError to detect and handle recoverable Kafka async send errors.
    • Added helper functions isKafkaSink, isRecoverableKafkaAsyncSendError, and nextDispatcherManagerRecoveryBackoff.
    • Implemented sendForceRecreateBootstrapRequest and forceRecreateDispatchersOnNode for recovery actions.
    • Updated onHeartbeatRequest to integrate the new recovery logic.
    • Modified prepareRecoveryPostBootstrapRequest to support local dispatcher manager reinitialization during recovery.
  • maintainer/maintainer_manager_test.go
    • Added syscall and errors imports.
    • Modified newTestNodeWithListener to gracefully skip tests in sandboxed environments that restrict local TCP port listening.
  • maintainer/maintainer_test.go
    • Added errors import.
    • Updated mockDispatcherManager to include bootstrapReqCh for observing bootstrap requests during testing.
    • Modified onBootstrapRequest in mockDispatcherManager to handle ForceRecreate and send requests to bootstrapReqCh.
    • Updated assertions in TestMaintainerSchedule to check maintainer.initialized.Load().
    • Added TestMaintainerSwallowRecoverableKafkaAsyncSendError to test the new error recovery logic.
  • maintainer/operator/operator_add.go
    • Modified the Start method of AddDispatcherOperator to correctly pass the origin node ID to BindSpanToNode.
  • maintainer/operator/operator_controller.go
    • Added a new ReplaceOperator method to the operator.Controller for replacing existing operators in recovery scenarios.
  • pkg/messaging/helper.go
    • Modified NewMessageCenterForTest to handle net.Listen errors by falling back to a local-only message center, improving test compatibility in sandboxed environments.
    • Added mc.Close() to the cleanup function in NewMessageCenterForTest.
Activity
  • The pull request is currently marked as [DNM] (Do Not Merge), indicating it is a work-in-progress or intended for discussion rather than immediate merging.
  • The changes introduce a new error recovery feature, suggesting active development on system resilience.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a mechanism to automatically recover from temporary sink errors by recreating the dispatcher manager. However, a critical vulnerability has been identified: the implementation in DispatcherOrchestrator uses a blocking loop in a shared message processing thread, which can lead to a denial of service for all changefeeds on the node if a single dispatcher manager takes too long to close. Additionally, there are areas for improvement related to the robustness of error detection and the logic for closing the dispatcher manager.

Comment on lines 178 to 183
for time.Now().Before(deadline) {
	if manager.TryClose(false) {
		break
	}
	time.Sleep(200 * time.Millisecond)
}


security-high high

This section of code presents a critical denial of service vulnerability. The handleBootstrapRequest function, running in a shared message processing goroutine, contains a loop that blocks for up to one minute while waiting for a dispatcher manager to close. This blocking behavior can stall all changefeed operations on the node if a single dispatcher manager takes too long to close, leading to a significant denial of service for all replication tasks. Additionally, the logic for closing the dispatcher manager can be simplified and made more robust. The initial call to manager.TryClose(false) on line 174 is redundant, and calling it again in the if condition on line 184 after the loop could have unintended side effects if TryClose is not idempotent. A cleaner approach would be to use a flag to track the closure status, avoiding redundant calls and potential issues.
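A minimal sketch of the flag-based approach described above, with the wait factored into a helper that could be run off the shared message-processing goroutine. The `closer` interface, function name, and timings are illustrative stand-ins, not the project's API:

```go
package example

import "time"

// closer is a stand-in for the dispatcher manager discussed in this comment.
type closer interface {
	TryClose(removeChangefeed bool) bool
}

// waitForClose polls TryClose until it succeeds or the timeout expires and
// returns whether the manager closed. The caller keeps the result as a flag,
// so TryClose is never re-invoked after the loop finishes.
func waitForClose(manager closer, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if manager.TryClose(false) {
			return true
		}
		time.Sleep(200 * time.Millisecond)
	}
	return false
}
```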

Comment on lines 931 to 942
func isRecoverableKafkaAsyncSendError(err *heartbeatpb.RunningError) bool {
	if err == nil {
		return false
	}
	// Dispatcher manager reports changefeed-level codes, so match by message content.
	// The message contains the original RFC error code and the Kafka broker error.
	if !strings.Contains(err.Message, string(errors.ErrKafkaAsyncSendMessage.RFCCode())) {
		return false
	}
	// `NOT_ENOUGH_REPLICAS_AFTER_APPEND`: Messages are written to the log, but to fewer in-sync replicas than required.
	return strings.Contains(err.Message, "fewer in-sync replicas than required")
}


medium

Relying on string matching for error messages, as done here to identify a recoverable Kafka error, is brittle and can break if the error message from the Kafka client or server changes in a future version. It would be more robust to rely on structured error codes if the underlying library provides them. If not, this is a known trade-off, but it's worth considering if there's a more stable way to identify this specific error condition.
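For reference, a structured check is only possible before the error is flattened into a RunningError message string, e.g., on the producer side where the broker error code is still available. A sketch of that alternative, assuming the IBM/sarama client used by the producer (the helper name is hypothetical):

```go
package example

import (
	"errors"

	"github.com/IBM/sarama"
)

// isNotEnoughReplicasAfterAppend inspects the broker error code carried by the
// ProducerError instead of matching on message text, so it keeps working even
// if the human-readable error string changes between client versions.
func isNotEnoughReplicasAfterAppend(perr *sarama.ProducerError) bool {
	var kerr sarama.KError
	if errors.As(perr.Err, &kerr) {
		return kerr == sarama.ErrNotEnoughReplicasAfterAppend
	}
	return false
}
```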

@3AceShowHand 3AceShowHand force-pushed the retry-sink-error-at-maintainer branch from ad03915 to 418987b on February 9, 2026 08:14
@ti-chi-bot ti-chi-bot bot added size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. and removed size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Feb 9, 2026
@3AceShowHand 3AceShowHand reopened this Feb 9, 2026
@ti-chi-bot

ti-chi-bot bot commented Feb 9, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign charlescheung96 for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

1 similar comment

@ti-chi-bot ti-chi-bot bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. labels Feb 9, 2026
@3AceShowHand 3AceShowHand changed the title [DNM] maintainer: recreate the dispatcher manager if sink meet temporary errors [DNM] maintainer: kafka sink report transient error and just recreate the dispatcher by force move operator. Feb 9, 2026
@3AceShowHand 3AceShowHand changed the title [DNM] maintainer: kafka sink report transient error and just recreate the dispatcher by force move operator. [DNM] maintainer: kafka sink report transient error and just recreate the dispatcher by force move operator Feb 9, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
maintainer/operator/operator_move.go (1)

175-223: ⚠️ Potential issue | 🔴 Critical

When forceRestart is true (origin == dest) and that node is removed, the operator gets stuck.

In OnNodeRemove, when n == m.dest (line 187) matches first in state moveStateRemoveOrigin, the operator transitions to moveStateAbortRemoveOrigin and returns (line 207). The subsequent n == m.origin check (line 209) is never reached, even though origin is the same node. No further OnNodeRemove or heartbeat will arrive for this node, so the operator permanently stays in moveStateAbortRemoveOrigin.

Consider handling the origin == dest case explicitly. For example, when forceRestart is true and the single node is removed, go directly to finishAsAbsent:

Proposed fix
 	if n == m.dest {
+		if n == m.origin && m.forceRestart {
+			// Restart operator: the single node is gone, mark absent immediately.
+			m.finishAsAbsent()
+			return
+		}
 		log.Info("dest node is offline, abort move and reschedule span",
🤖 Fix all issues with AI agents
In `@maintainer/maintainer_recoverable_kafka_error_test.go`:
- Line 1: Add the standard PingCAP 2025 copyright header at the top of this file
before the "package maintainer" declaration to match the other files in the PR;
ensure the header text and year format exactly match the 2025 convention used
elsewhere in the repo so the CI copyright check passes.

In `@pkg/sink/kafka/recoverable_error_event.go`:
- Line 1: Add the standard Apache 2.0 copyright header at the top of this file
(pkg/sink/kafka/recoverable_error_event.go) to match other files in the package
(e.g., sarama_async_producer.go); ensure the header text, license identifier,
and author/project attribution exactly match the existing files and set the year
to 2026 (or match the most recent files if a different year is used), placing it
above the package kafka declaration so CI no longer flags incorrect copyright
information.

In `@pkg/sink/kafka/sarama_async_producer_test.go`:
- Line 1: Add the standard 2025 PingCAP copyright header to the top of the file
before the package declaration to satisfy CI: insert the multi-line copyright
header comment above the existing "package kafka" line in
pkg/sink/kafka/sarama_async_producer_test.go so the file begins with the
required header followed by the package kafka declaration.

In `@pkg/sink/kafka/sarama_async_producer.go`:
- Around line 136-140: The current logic in the error handling branch around
isProducerKErrorRecoverable(err) may escalate recoverable errors to fatal when
reportRecoverableError(err) returns false (e.g., channel full); update the code
in the sarama async producer error path that calls reportRecoverableError to
detect a failed non-blocking send and log a warning indicating the channel was
full before falling through to handleProducerError, and optionally replace the
non-blocking send with a short blocking attempt (with a small timeout) inside
the same branch to give consumers a chance to drain; edit the relevant code that
invokes isProducerKErrorRecoverable, reportRecoverableError, and
handleProducerError to implement the warning log and/or short timeout retry.
- Around line 146-148: The SetRecoverableErrorChan method on saramaAsyncProducer
assigns p.recoverableErrCh without synchronization while AsyncRunCallback
concurrently accesses it; either add synchronization (e.g., a mutex on
saramaAsyncProducer around assignments and reads or use a sync.Once to ensure a
single safe assignment) or document the required call ordering in the public
API/GoDoc for SetRecoverableErrorChan stating it must be called before
AsyncRunCallback starts; update references to p.recoverableErrCh in
AsyncRunCallback to acquire the same mutex or rely on the sync.Once-protected
value accordingly and ensure any new mutex/sync.Once field is added to the
saramaAsyncProducer struct.
🧹 Nitpick comments (7)
maintainer/operator/operator_move.go (1)

111-120: NewRestartDispatcherOperator — consider giving it a distinct Type() string.

Currently Type() returns "move" for both move and restart operators. A distinct type (e.g., "restart") would improve logging, metrics, and debugging, especially since the restart path has different semantics.

downstreamadapter/dispatchermanager/dispatcher_manager.go (2)

329-383: Double-wrap on line 368: errors.WrapError(…, errors.Trace(err))

errors.Trace(err) already attaches a stack trace; wrapping the result again with WrapError is redundant. Use one or the other—WrapError alone already records the cause.

Proposed fix
-			e.handleError(ctx, errors.WrapError(errors.ErrKafkaAsyncSendMessage, errors.Trace(err)))
+			e.handleError(ctx, errors.WrapError(errors.ErrKafkaAsyncSendMessage, err))

As per coding guidelines, "wrap it immediately with errors.Trace(err) or errors.WrapError(...)… upstream callers should propagate wrapped errors without wrapping again".


374-378: Rename constant to remove stutter in name.

The constant RecoverableKafkaKErrorRunningErrorCode (defined in pkg/sink/kafka/recoverable_error_event.go) contains a stutter: "KafkaK" should be just "Kafka". Rename to RecoverableKafkaErrorRunningErrorCode to align with its value "CDC:ErrKafkaRecoverableKError" and improve clarity. Currently used in 4 places across the codebase, making this a low-cost refactor.

pkg/sink/kafka/sarama_async_producer_test.go (1)

66-118: Solid test for the recoverable error path — verifies non-exit and correct error reporting.

The 200ms wait at lines 105-109 is a reasonable heuristic for a unit test; just be aware it could be flaky under heavy CI load.

As per coding guidelines, "Use camelCase for function names and do not include underscores" — the test names use underscores (TestSaramaAsyncProducer_TransientKErrorDoesNotExitAndReports). Go test conventions commonly use underscores for readability in test names, so this is a minor tension with the project guideline.

pkg/sink/kafka/sarama_async_producer.go (1)

165-173: KafkaErrName and Message will always be identical here.

On Line 168, kerr.Error() and on Line 169, err.Err.Error() both resolve to the same sarama.KError.Error() call since err.Err is kerr. The ErrorEvent will always have KafkaErrName == Message in this code path.

If Message is intended to carry additional context (e.g., a wrapped error string), consider using err.Error() (the ProducerError.Error()) instead, which includes the topic/partition info. Otherwise, one of the two fields is redundant.

pkg/sink/kafka/recoverable_error_event.go (2)

46-57: Consider whether the transient error list is complete enough.

The current set covers the most common transient Kafka producer errors. A few other potentially transient errors worth considering:

  • sarama.ErrBrokerNotAvailable — broker temporarily down during rolling restart
  • sarama.ErrKafkaStorageError — transient disk issues on the broker

If the intent is to be conservative and only recover from well-understood cases, the current list is reasonable. Just flagging for deliberate consideration.
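Purely as an illustration of what an extended set could look like (the PR's actual isTransientKError may be narrower by design):

```go
package example

import "github.com/IBM/sarama"

// transientKErrors is an illustrative transient set that includes the two
// additional KErrors flagged above; it is not the PR's actual list.
var transientKErrors = map[sarama.KError]struct{}{
	sarama.ErrNotEnoughReplicas:            {},
	sarama.ErrNotEnoughReplicasAfterAppend: {},
	sarama.ErrRequestTimedOut:              {},
	sarama.ErrBrokerNotAvailable:           {}, // broker temporarily down, e.g. during a rolling restart
	sarama.ErrKafkaStorageError:            {}, // transient disk issues on the broker
}

func isTransientKErrorSketch(kerr sarama.KError) bool {
	_, ok := transientKErrors[kerr]
	return ok
}
```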


11-11: Exported constant naming is quite long.

RecoverableKafkaKErrorRunningErrorCode has nested abbreviations (K in KError) and is verbose. A shorter name like ErrCodeKafkaRecoverable or RecoverableKafkaErrorCode would be easier to read while remaining descriptive.

Comment on lines 136 to 140
if isProducerKErrorRecoverable(err) {
	if p.reportRecoverableError(err) {
		continue
	}
}

⚠️ Potential issue | 🟡 Minor

When the channel is full, a recoverable error silently escalates to fatal.

If reportRecoverableError returns false (channel full or not set), execution falls through to handleProducerError, which terminates the callback loop. This means a genuinely recoverable error becomes fatal purely because the consumer didn't drain the channel in time.

Consider logging a warning when the non-blocking send fails, so operators can distinguish "channel full → escalated to fatal" from a truly non-recoverable error. Also consider whether a brief blocking send with a short timeout would be more appropriate than the non-blocking default branch.
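A sketch of that mitigation: wait briefly for the consumer to drain, and log when the channel stays full so the escalation reason is visible. The ErrorEvent and producer types below are simplified stand-ins, not the PR's implementation:

```go
package example

import (
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// ErrorEvent and producer are simplified stand-ins for the types in this PR.
type ErrorEvent struct{ Message string }

type producer struct {
	recoverableErrCh chan<- *ErrorEvent
}

// reportRecoverableError waits a short grace period for the consumer instead
// of giving up immediately; if the channel is still full it logs a warning and
// returns false so the caller can fall through to the fatal path knowingly.
func (p *producer) reportRecoverableError(ev *ErrorEvent) bool {
	if p.recoverableErrCh == nil {
		return false
	}
	select {
	case p.recoverableErrCh <- ev:
		return true
	case <-time.After(100 * time.Millisecond):
		log.Warn("recoverable error channel full, escalating to fatal",
			zap.String("message", ev.Message))
		return false
	}
}
```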


Comment on lines 146 to 148
func (p *saramaAsyncProducer) SetRecoverableErrorChan(ch chan<- *ErrorEvent) {
	p.recoverableErrCh = ch
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Check how SetRecoverableErrorChan is called relative to AsyncRunCallback
rg -n --type=go -C5 'SetRecoverableErrorChan'

Repository: pingcap/ticdc

Length of output: 4305


🏁 Script executed:

# Get the sarama_async_producer.go file to see the full context
cat -n pkg/sink/kafka/sarama_async_producer.go | head -200

Repository: pingcap/ticdc

Length of output: 7761


🏁 Script executed:

# Look at the full struct definition and constructor
rg -n --type=go 'saramaAsyncProducer struct' -A 10

Repository: pingcap/ticdc

Length of output: 772


🏁 Script executed:

# Check if there's a constructor or New function
rg -n --type=go 'func.*saramaAsyncProducer|func New.*Producer' -A 5 pkg/sink/kafka/sarama_async_producer.go

Repository: pingcap/ticdc

Length of output: 1473


Add synchronization guard or document the calling contract for SetRecoverableErrorChan.

The method writes to p.recoverableErrCh without synchronization, but AsyncRunCallback reads and writes to this field concurrently. If SetRecoverableErrorChan is called after AsyncRunCallback has started, a data race occurs. The code currently relies on an implicit contract that it's always called before the loop starts, but this is neither enforced nor documented. Either protect the assignment with synchronization (e.g., a mutex or sync.Once) or explicitly document the required calling order.
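One possible shape of the mutex option, with simplified stand-in types (an atomic.Pointer or a documented "set before AsyncRunCallback starts" contract would work equally well):

```go
package example

import "sync"

// ErrorEvent is a stand-in for the producer's recoverable error event type.
type ErrorEvent struct{}

// guardedProducer serializes access to the channel field with a RWMutex.
type guardedProducer struct {
	mu               sync.RWMutex
	recoverableErrCh chan<- *ErrorEvent
}

func (p *guardedProducer) SetRecoverableErrorChan(ch chan<- *ErrorEvent) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.recoverableErrCh = ch
}

// recoverableErrorChan is what the callback loop would read instead of
// touching the field directly, so reads and writes share the same lock.
func (p *guardedProducer) recoverableErrorChan() chan<- *ErrorEvent {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.recoverableErrCh
}
```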



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@downstreamadapter/dispatchermanager/dispatcher_manager.go`:
- Around line 343-352: The code currently treats an empty dispatcherIDs as a
changefeed-level fatal by calling e.handleError (which feeds into
collectErrors), causing repeated resend; instead modify the logic around the
dispatcherIDs check in dispatcher_manager.go so that when dispatcherIDs is empty
you do not call e.handleError; either log a warning via log.Warn with the same
context (changefeedID, Kafka fields) and drop the event, or implement a limited
retry path (e.g., enqueue the event for a small number of retries) before
escalating—change the call site that currently invokes e.handleError to perform
the chosen non-escalating behavior and ensure any retry counter/lifetime is
tracked in that local scope so collectErrors is not triggered immediately.

In `@maintainer/maintainer.go`:
- Around line 817-883: handleRecoverableKafkaError currently uses
replication.GetNodeID() as the operator origin and always targets dispatchers
from common.DefaultMode without verifying the reporting node; add a node-origin
consistency check and a mode check: after obtaining replication :=
spanController.GetTaskByID(dispatcherID) and origin := replication.GetNodeID(),
if from != origin then log a warning (similar message to "dispatcher not found")
and continue; also ensure the dispatcher is in DefaultMode (or explicitly skip
non-DefaultMode dispatchers) before creating
operator.NewRestartDispatcherOperator so Kafka errors only restart dispatchers
in the expected mode.
🧹 Nitpick comments (4)
pkg/sink/kafka/error_event.go (1)

13-33: Consider adding JSON tags to ErrorReport fields.

ErrorReport is serialized with json.Marshal in dispatcher_manager.go and deserialized in maintainer.go. While both sides use the same Go struct (so it works correctly today), adding explicit JSON tags would make the serialization contract explicit and resilient to field renames.
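A sketch of what the tagged struct could look like; the field set mirrors the values logged elsewhere in this PR, and the tag names are only illustrative (they just need to stay consistent on both sides):

```go
package example

import "github.com/pingcap/ticdc/pkg/common"

// ErrorReport with explicit JSON tags, so the wire format is decoupled from
// Go field names and survives renames.
type ErrorReport struct {
	DispatcherIDs []common.DispatcherID `json:"dispatcher_ids"`
	KafkaErrCode  int16                 `json:"kafka_err_code"`
	KafkaErrName  string                `json:"kafka_err_name"`
	Topic         string                `json:"topic"`
	Partition     int32                 `json:"partition"`
	Message       string                `json:"message"`
}
```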

pkg/sink/kafka/sarama_async_producer.go (1)

150-181: KafkaErrName and Message will always be identical.

Since isTransientKError already confirmed err.Err is exactly a sarama.KError, the type assertion on Line 154 always succeeds and kerr.Error() (Line 168) equals err.Err.Error() (Line 169). Both KafkaErrName and Message in the ErrorEvent will hold the same string. If distinct semantics are intended (e.g., error code name vs. full message), consider using kerr.Error() for one and a formatted string for the other.

downstreamadapter/dispatchermanager/dispatcher_manager.go (1)

329-383: Recoverable error collector looks correct; consider deduplication for error bursts.

The implementation correctly extracts dispatcher IDs, falls back to the changefeed error path when IDs are missing, and serializes the report into the heartbeat. One consideration: during a Kafka broker outage, many transient errors can fire in rapid succession for the same dispatcher. Each one triggers a separate heartbeat + RestartDispatcherOperator creation attempt on the maintainer side. While the maintainer's AddOperator deduplicates (Line 876 in maintainer.go logs "already exists, ignore"), this still generates significant heartbeat traffic and JSON serialization overhead.

Consider batching or deduplicating errors within a short time window (e.g., only report the first error per dispatcher per N seconds) to reduce noise during extended broker failures.
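A small sketch of such a per-dispatcher throttle, with hypothetical naming and placement:

```go
package example

import (
	"sync"
	"time"

	"github.com/pingcap/ticdc/pkg/common"
)

// errorThrottle reports at most one recoverable error per dispatcher per
// window, so an extended broker outage does not flood the maintainer with
// heartbeats and operator-creation attempts.
type errorThrottle struct {
	mu       sync.Mutex
	window   time.Duration
	lastSeen map[common.DispatcherID]time.Time
}

func newErrorThrottle(window time.Duration) *errorThrottle {
	return &errorThrottle{window: window, lastSeen: make(map[common.DispatcherID]time.Time)}
}

// shouldReport returns true only if no error for this dispatcher was reported
// within the window; otherwise the event is dropped as a duplicate.
func (t *errorThrottle) shouldReport(id common.DispatcherID, now time.Time) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if last, ok := t.lastSeen[id]; ok && now.Sub(last) < t.window {
		return false
	}
	t.lastSeen[id] = now
	return true
}
```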

maintainer/maintainer.go (1)

860-866: Dispatcher not found is logged at Warn — expected during concurrent restarts?

When a dispatcher is undergoing removal/restart (e.g., from a previous error cycle), GetTaskByID may return nil. Logging at Warn level could be noisy during burst transient-error scenarios where multiple errors arrive for the same dispatcher that's already being restarted. Consider Debug or Info level to reduce noise.

Proposed change
-		log.Warn("dispatcher not found, ignore recoverable kafka error",
+		log.Info("dispatcher not found, ignore recoverable kafka error",
 			zap.Stringer("changefeedID", m.changefeedID),
 			zap.Stringer("dispatcherID", dispatcherID))

Comment on lines 817 to 883
func (m *Maintainer) handleRecoverableKafkaError(from node.ID, err *heartbeatpb.RunningError) bool {
	if err == nil || err.Code != kafkapkg.KafkaTransientErrorCode {
		return false
	}

	var report kafkapkg.ErrorReport
	if unmarshalErr := json.Unmarshal([]byte(err.Message), &report); unmarshalErr != nil {
		log.Warn("unmarshal recoverable kafka error report failed",
			zap.Stringer("changefeedID", m.changefeedID),
			zap.Stringer("sourceNode", from),
			zap.Error(unmarshalErr))
		return false
	}
	if len(report.DispatcherIDs) == 0 {
		log.Warn("recoverable kafka error report has no dispatcher IDs",
			zap.Stringer("changefeedID", m.changefeedID),
			zap.Stringer("sourceNode", from),
			zap.Int16("kafkaErrCode", report.KafkaErrCode),
			zap.String("kafkaErrName", report.KafkaErrName),
			zap.String("topic", report.Topic),
			zap.Int32("partition", report.Partition))
		return false
	}

	log.Warn("recoverable kafka error received, restart dispatchers",
		zap.Stringer("changefeedID", m.changefeedID),
		zap.Stringer("sourceNode", from),
		zap.Int("dispatcherCount", len(report.DispatcherIDs)),
		zap.Int16("kafkaErrCode", report.KafkaErrCode),
		zap.String("kafkaErrName", report.KafkaErrName),
		zap.String("topic", report.Topic),
		zap.Int32("partition", report.Partition))

	operatorController := m.controller.getOperatorController(common.DefaultMode)
	spanController := m.controller.getSpanController(common.DefaultMode)

	seen := make(map[common.DispatcherID]struct{}, len(report.DispatcherIDs))
	for _, dispatcherID := range report.DispatcherIDs {
		if _, ok := seen[dispatcherID]; ok {
			continue
		}
		seen[dispatcherID] = struct{}{}

		replication := spanController.GetTaskByID(dispatcherID)
		if replication == nil {
			log.Warn("dispatcher not found, ignore recoverable kafka error",
				zap.Stringer("changefeedID", m.changefeedID),
				zap.Stringer("dispatcherID", dispatcherID))
			continue
		}
		origin := replication.GetNodeID()
		if origin == "" {
			log.Warn("dispatcher has empty node ID, ignore recoverable kafka error",
				zap.Stringer("changefeedID", m.changefeedID),
				zap.Stringer("dispatcherID", dispatcherID))
			continue
		}

		op := operator.NewRestartDispatcherOperator(spanController, replication, origin)
		if ok := operatorController.AddOperator(op); !ok {
			log.Info("restart dispatcher operator already exists, ignore",
				zap.Stringer("changefeedID", m.changefeedID),
				zap.Stringer("dispatcherID", dispatcherID))
		}
	}
	return true
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify that redo sinks don't use Kafka (confirming DefaultMode is correct)
rg -n --type=go 'func.*NewRestartDispatcherOperator' -A 5

Repository: pingcap/ticdc

Length of output: 587


🏁 Script executed:

#!/bin/bash
# Check if redo dispatchers could use Kafka sink
rg -n --type=go 'redoSink.*kafka|redo.*KafkaSink' -i

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

#!/bin/bash
# Verify getOperatorController and getSpanController accept mode parameter
rg -n --type=go 'func.*getOperatorController|func.*getSpanController' -A 3

Repository: pingcap/ticdc

Length of output: 713


🏁 Script executed:

#!/bin/bash
# Check how the 'from' parameter is used elsewhere in handleRecoverableKafkaError context
rg -n --type=go 'GetNodeID\(\)' -B 2 -A 2 | head -40

Repository: pingcap/ticdc

Length of output: 2724


🏁 Script executed:

#!/bin/bash
# Check if there's any node consistency check in the codebase
rg -n --type=go 'from.*origin|origin.*from' -i -B 2 -A 2 | head -40

Repository: pingcap/ticdc

Length of output: 2715


🏁 Script executed:

#!/bin/bash
# Search for where handleRecoverableKafkaError is called and how 'from' is obtained
rg -n --type=go 'handleRecoverableKafkaError' -B 5 -A 2

Repository: pingcap/ticdc

Length of output: 2063


🏁 Script executed:

#!/bin/bash
# Check the error reporting context - when/how are Kafka errors generated
rg -n --type=go 'KafkaTransientErrorCode' -B 3 -A 3

Repository: pingcap/ticdc

Length of output: 2987


🏁 Script executed:

#!/bin/bash
# Look for other heartbeat handlers that process 'from' with node consistency checks
rg -n --type=go 'func.*heartbeat|heartbeatpb.*Status' -A 2 | grep -A 2 'from.*origin\|origin.*from' | head -30

Repository: pingcap/ticdc

Length of output: 273


🏁 Script executed:

#!/bin/bash
# Check how dispatchers are reassigned - is GetNodeID always current or can it be stale?
rg -n --type=go 'SetNodeID|replication.*SetNodeID' -B 2 -A 2 | head -40

Repository: pingcap/ticdc

Length of output: 2763


🏁 Script executed:

#!/bin/bash
# Verify if Kafka error handling is Kafka-specific (redo mode support)
rg -n --type=go 'KafkaTransientErrorCode|KafkaSink' -B 5 | head -50

Repository: pingcap/ticdc

Length of output: 3335


handleRecoverableKafkaError lacks node-origin consistency check found in similar operators.

The method correctly unmarshals the error report, deduplicates dispatcher IDs, validates replication state, and creates RestartDispatcherOperator instances. However:

  1. Missing node validation: The method uses replication.GetNodeID() (current node) as the operator's origin rather than the from parameter (reporting node). Unlike the similar pattern in MergeDispatcherOperator.Check() (which validates from == m.originNode), this method doesn't validate that the reporting node matches the dispatcher's current node. If a dispatcher is reassigned between error generation and processing, the operator targets the new node rather than the one that reported the error. Consider adding a consistency check: if from != origin { log.Warn(...); continue }.

  2. Hardcoded DefaultMode assumption (Lines 850-851): Kafka errors are Kafka-specific and should not apply to redo-mode dispatchers. This assumption appears correct based on current implementation (no evidence of Kafka sinks in redo mode), but document this constraint or add a mode validation to prevent silent failures if redo sink implementations change.
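A sketch of the node-origin check from point 1, factored into a helper so it can be unit tested; the mode check from point 2 would sit next to it, before the operator is created. `nodeID` is a local stand-in for the project's node.ID, and the function is hypothetical:

```go
package example

// nodeID stands in for the project's node.ID (a string-based identifier).
type nodeID string

// shouldRestartOnNode reports whether a recoverable-error report from
// reportingNode should trigger a restart for a dispatcher currently placed on
// currentNode. If the dispatcher moved after the error was produced, the
// report is stale and the restart is skipped.
func shouldRestartOnNode(reportingNode, currentNode nodeID) bool {
	if currentNode == "" {
		return false // no current placement, nothing to restart
	}
	return reportingNode == currentNode
}
```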



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@maintainer/maintainer.go`:
- Around line 884-893: The code short-circuits the dispatcher loop by calling
m.onError(...heartbeatpb.RunningError...) and returning true inside the loop
when one dispatcher exceeds its restart budget, which skips creating
RestartDispatcherOperator for remaining report.DispatcherIDs; change this by
removing the early return and instead record the downgrade decision (e.g., set a
boolean like downgraded=true or append to a collected list) while continuing to
iterate over dispatcherID/RestartDispatcherOperator creation, then after the
loop emit a single m.onError with the aggregated info and return true if any
dispatcher downgraded; alternatively, if the short-circuit is intentional, add a
clear comment at the return true site explaining the deliberate early-exit
behavior referencing dispatcherID, report.DispatcherIDs,
RestartDispatcherOperator and the heartbeatpb.RunningError call.

In `@maintainer/recoverable_kafka_restart.go`:
- Around line 1-7: Add the standard PingCAP copyright header comment to the top
of the new recoverable_kafka_restart.go file (package maintainer) to match the
existing files (e.g., maintainer/maintainer.go header lines 1-12); insert the
identical header block before the "package maintainer" line so the CI copyright
check passes.
🧹 Nitpick comments (2)
maintainer/recoverable_kafka_restart.go (2)

85-103: Backoff calculation has redundant/overlapping checks — simplify.

Lines 92-93 and 95-96 both cap at recoverableKafkaDispatcherRestartBackoffMax, and the post-loop check at lines 100-102 does the same. The standard pattern is simpler:

Proposed simplification
 func recoverableKafkaDispatcherRestartBackoff(restartAttempts int) time.Duration {
 	if restartAttempts <= 0 {
 		return 0
 	}
 
 	backoff := recoverableKafkaDispatcherRestartBackoffInit
 	for i := 1; i < restartAttempts; i++ {
-		if backoff >= recoverableKafkaDispatcherRestartBackoffMax {
-			return recoverableKafkaDispatcherRestartBackoffMax
-		}
-		if backoff > recoverableKafkaDispatcherRestartBackoffMax/2 {
-			return recoverableKafkaDispatcherRestartBackoffMax
-		}
 		backoff *= 2
+		if backoff >= recoverableKafkaDispatcherRestartBackoffMax {
+			return recoverableKafkaDispatcherRestartBackoffMax
+		}
 	}
-	if backoff > recoverableKafkaDispatcherRestartBackoffMax {
-		return recoverableKafkaDispatcherRestartBackoffMax
-	}
 	return backoff
 }

31-46: getRecoverableKafkaDispatcherRestartState assumes caller holds the lock — document this.

This method reads and writes m.recoverableKafkaRestarts.dispatchers without acquiring the mutex. It's currently safe because both callers (shouldDowngrade... and record...) hold the lock, but a future caller could easily miss this requirement.

Add a brief comment like // caller must hold m.recoverableKafkaRestarts.Lock().

Comment on lines 884 to 893
m.onError(from, &heartbeatpb.RunningError{
	Time: time.Now().String(),
	Code: string(errors.ErrKafkaAsyncSendMessage.RFCCode()),
	Message: fmt.Sprintf(
		"recoverable kafka error exceeded dispatcher restart budget, downgrade to changefeed error path, dispatcherID=%s, kafkaErrCode=%d, kafkaErrName=%s, topic=%s, partition=%d, message=%s",
		dispatcherID.String(), report.KafkaErrCode, report.KafkaErrName, report.Topic, report.Partition, report.Message,
	),
})
return true
}

⚠️ Potential issue | 🟡 Minor

Early return true on downgrade skips remaining dispatchers in the batch.

When one dispatcher exceeds its restart budget, return true at Line 892 exits the loop immediately. Any subsequent dispatcher IDs in report.DispatcherIDs that haven't been iterated yet won't get a RestartDispatcherOperator. Since the entire error report is then considered "handled" (returns true), those dispatchers silently miss their restart.

If the intent is to fail the whole changefeed on any single dispatcher downgrade, this is fine — but the dispatchers that were already restarted earlier in the loop still get operators created unnecessarily (they'll be restarted even though the changefeed is about to error out).

Consider either:

  1. Processing all dispatchers and collecting downgrade decisions, then acting on them after the loop, or
  2. At minimum, adding a comment explaining the intentional short-circuit behavior.
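A sketch of option 1: decide per dispatcher, keep iterating, and only act on downgrades after the whole batch is processed. The function-typed hooks stand in for the real restart and downgrade paths and are not names from this PR:

```go
package example

// processRecoverReport collects downgrade decisions while still handling the
// rest of the batch, then reports a single aggregated error at the end.
func processRecoverReport(
	dispatcherIDs []string,
	exceededBudget func(id string) bool,
	restart func(id string),
	downgrade func(ids []string),
) bool {
	downgraded := make([]string, 0, len(dispatcherIDs))
	for _, id := range dispatcherIDs {
		if exceededBudget(id) {
			downgraded = append(downgraded, id)
			continue // do not short-circuit the rest of the batch
		}
		restart(id)
	}
	if len(downgraded) > 0 {
		downgrade(downgraded) // one aggregated changefeed-level error
	}
	return len(downgraded) > 0
}
```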

Comment on lines 1 to 7
package maintainer

import (
	"time"

	"github.com/pingcap/ticdc/pkg/common"
)

⚠️ Potential issue | 🟡 Minor

Missing copyright header.

This new file is missing the standard PingCAP copyright header that all other .go files in the repo have (e.g., maintainer/maintainer.go lines 1-12). The CI copyright check will likely fail.

Proposed fix
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package maintainer


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@downstreamadapter/sink/kafka/sink.go`:
- Around line 496-497: The ctx.Done() branch currently returns the raw context
error via context.Cause(ctx); change it to return a traced error by wrapping the
cause with errors.Trace (e.g., return errors.Trace(context.Cause(ctx))) to match
the rest of the file where other <-ctx.Done() branches use
errors.Trace(ctx.Err()); update imports if necessary to ensure errors.Trace is
available and keep the same error-wrapping convention used in this file.

In `@pkg/sink/recoverable/error_event.go`:
- Line 1: The file error_event.go in package recoverable is missing the standard
Apache 2.0 copyright header which causes CI to fail; add the same Apache 2.0
header block used across the repo to the very top of error_event.go (above the
package recoverable declaration), matching text and formatting exactly as in
other files (copy from a nearby file to ensure consistency), and ensure no extra
characters or blank lines precede the header so the CI copyright check passes.
🧹 Nitpick comments (6)
maintainer/recoverable_kafka_restart.go (2)

93-111: Backoff computation has redundant branches; consider simplifying.

Lines 100-101 (backoff >= max) are fully subsumed by lines 103-104 (backoff > max/2) plus the final cap at lines 108-110. The logic is correct but can be simplified:

Suggested simplification
 func recoverableKafkaDispatcherRestartBackoff(restartAttempts int) time.Duration {
 	if restartAttempts <= 0 {
 		return 0
 	}
-
 	backoff := recoverableKafkaDispatcherRestartBackoffInit
 	for i := 1; i < restartAttempts; i++ {
-		if backoff >= recoverableKafkaDispatcherRestartBackoffMax {
-			return recoverableKafkaDispatcherRestartBackoffMax
-		}
-		if backoff > recoverableKafkaDispatcherRestartBackoffMax/2 {
+		if backoff >= recoverableKafkaDispatcherRestartBackoffMax/2 {
 			return recoverableKafkaDispatcherRestartBackoffMax
 		}
 		backoff *= 2
 	}
-	if backoff > recoverableKafkaDispatcherRestartBackoffMax {
-		return recoverableKafkaDispatcherRestartBackoffMax
-	}
-	return backoff
+	return min(backoff, recoverableKafkaDispatcherRestartBackoffMax)
 }

39-54: Initialize recoverableKafkaRestarts.dispatchers in the Maintainer constructor to remove implicit lock requirement.

getRecoverableKafkaDispatcherRestartState performs lazy initialization of m.recoverableKafkaRestarts.dispatchers without acquiring the lock itself, relying on callers to hold it. While both current callers (getRecoverableKafkaDispatcherRestartDecision and recordRecoverableKafkaDispatcherRestart) do lock before calling, this implicit lock requirement is fragile and easy to miss in future changes. Initialize the map in NewMaintainer instead to eliminate the lazy check and make the safety requirement explicit.

downstreamadapter/sink/kafka/sink.go (1)

120-130: Local interface duplicates recoverable.ErrorEventChanSetter.

The recoverableErrorChanSetter interface defined here (lines 120-122) has the exact same signature as recoverable.ErrorEventChanSetter in pkg/sink/recoverable/error_event.go. Use the imported interface directly to avoid duplication.

Suggested fix
-type recoverableErrorChanSetter interface {
-	SetRecoverableErrorChan(ch chan<- *recoverable.ErrorEvent)
-}
-
 func (s *sink) SetRecoverableErrorChan(ch chan<- *recoverable.ErrorEvent) {
-	setter, ok := s.dmlProducer.(recoverableErrorChanSetter)
+	setter, ok := s.dmlProducer.(recoverable.ErrorEventChanSetter)
 	if !ok {
 		return
 	}
 	setter.SetRecoverableErrorChan(ch)
 }
maintainer/maintainer_recoverable_kafka_error_test.go (1)

19-76: Test function names use underscores, which conflicts with Go naming guideline.

Functions like TestRecoverDispatcherRequest_RestartDispatchers use underscores. While this is a common Go test convention for readability, the project coding guidelines state to use camelCase without underscores for function names. Consider renaming, e.g., TestRecoverDispatcherRequestRestartDispatchers.

As per coding guidelines, "Use camelCase for function names and do not include underscores (e.g., getPartitionNum, not get_partition_num) in Go".

downstreamadapter/dispatchermanager/heartbeat_queue.go (2)

109-111: Blocking Enqueue may stall dispatcher recovery producers if the channel is full.

Enqueue performs an unconditional blocking send. If the consumer (HeartbeatCollector) is slow or stuck, any goroutine calling Enqueue on the recovery path will block silently. The existing HeartbeatRequestQueue has the same pattern, but since this queue is specifically for error-recovery, a stall here could delay or prevent recovery of other dispatchers too.

Consider a non-blocking send with a log/metric on drop, or at minimum a select with ctx.Done() so the caller isn't stuck indefinitely:

♻️ Optional: context-aware enqueue
-func (q *RecoverDispatcherRequestQueue) Enqueue(request *RecoverDispatcherRequestWithTargetID) {
-	q.queue <- request
-}
+func (q *RecoverDispatcherRequestQueue) Enqueue(ctx context.Context, request *RecoverDispatcherRequestWithTargetID) error {
+	select {
+	case q.queue <- request:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
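The other variant mentioned above, drop-and-log instead of blocking, so a stuck consumer can never stall the recovery producers. The types below are simplified stand-ins and `TryEnqueue` is a hypothetical name:

```go
package example

import (
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// RecoverDispatcherRequestWithTargetID stands in for the request type in this
// PR; only the queue behavior matters for this sketch.
type RecoverDispatcherRequestWithTargetID struct{}

type recoverRequestQueue struct {
	queue chan *RecoverDispatcherRequestWithTargetID
}

// TryEnqueue never blocks: if the queue is full, the request is dropped and a
// warning is logged. Since a later transient error for the same dispatcher
// would typically produce a fresh request, dropping here should be tolerable.
func (q *recoverRequestQueue) TryEnqueue(request *RecoverDispatcherRequestWithTargetID) bool {
	select {
	case q.queue <- request:
		return true
	default:
		log.Warn("recover dispatcher request queue is full, drop request",
			zap.Int("queueLen", len(q.queue)))
		return false
	}
}
```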

109-120: No observability on queue depth — consider adding a metric like BlockStatusRequestQueue does.

BlockStatusRequestQueue.Enqueue/Dequeue (lines 73-86) report queue length via a gauge. This new queue lacks equivalent instrumentation, which would be useful for monitoring recovery-request backlog in production.
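A sketch mirroring that gauge pattern; the metric name and gauge variable are hypothetical and do not exist in the repo:

```go
package example

import "github.com/prometheus/client_golang/prometheus"

// recoverQueueLenGauge is a hypothetical gauge for the recovery request queue
// depth; it would still need to be registered with the metrics registry.
var recoverQueueLenGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "ticdc",
	Subsystem: "dispatchermanager",
	Name:      "recover_dispatcher_request_queue_len",
	Help:      "Length of the recover dispatcher request queue.",
})

type request struct{}

type instrumentedQueue struct {
	queue chan *request
}

func (q *instrumentedQueue) Enqueue(r *request) {
	q.queue <- r
	recoverQueueLenGauge.Set(float64(len(q.queue)))
}

func (q *instrumentedQueue) Dequeue() *request {
	r := <-q.queue
	recoverQueueLenGauge.Set(float64(len(q.queue)))
	return r
}
```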

@ti-chi-bot

ti-chi-bot bot commented Feb 12, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@ti-chi-bot

ti-chi-bot bot commented Feb 12, 2026

@3AceShowHand: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

  • Test name: pull-error-log-review | Commit: 19028a5 | Required: true | Rerun command: /test pull-error-log-review

Full PR test history. Your PR dashboard.

