
Conversation

@wlwilliamx
Collaborator

@wlwilliamx wlwilliamx commented Jan 29, 2026

What problem does this PR solve?

Issue Number: close #4064

What is changed and how it works?

When the downstream Kafka topic is missing (and auto-create-topic is disabled), it is expected that the changefeed stays in a warning/retrying state. However, TiCDC could still run away on CPU and memory and eventually OOM.

This PR fixes the runaway from two angles:

  1. Close DispatcherManager when the sink goroutine exits unexpectedly

    • Run the sink via runSinkWithExitHandling.
    • If sink.Run(ctx) returns with a non-cancel error, report the error to maintainer and close the DispatcherManager proactively, so dispatchers stop pulling/enqueuing events instead of buffering into unbounded channels.
  2. Release sarama admin resources reliably on close

    • saramaAdminClient.Close() now closes the underlying sarama.Client in addition to ClusterAdmin, which stops sarama background goroutines (metadata updater, broker receivers) and releases associated metrics/metadata caches.
    • Close the sarama client on NewClusterAdminFromClient failure to avoid leaking on error paths.

Check List

Tests

  • Unit test
  • Manual test

Questions

Will it cause performance regression or break compatibility?

No. The new close path only triggers when the sink exits unexpectedly. The admin close change only makes resource cleanup more complete.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

Fix TiCDC Kafka sink CPU/memory runaway when downstream topic is missing by stopping buffering on sink exit and properly closing sarama admin clients.

If sink.Run exits with an error, close DispatcherManager to stop dispatchers and release sink resources (e.g. sarama) instead of allowing unbounded buffering.
@ti-chi-bot ti-chi-bot bot added the release-note Denotes a PR that will be considered when it comes time to generate release notes. label Jan 29, 2026
@ti-chi-bot

ti-chi-bot bot commented Jan 29, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign lidezhu for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Jan 29, 2026
@gemini-code-assist

Summary of Changes

Hello @wlwilliamx, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the stability of TiCDC's Kafka sink by introducing robust error handling. Previously, an unexpectedly failing Kafka sink could lead to severe resource exhaustion, including out-of-memory errors and CPU spikes, as the system continued to buffer events. The changes ensure that upon detecting a non-recoverable sink error, the DispatcherManager gracefully shuts down, preventing further event buffering and immediately reporting the issue, thus safeguarding the system's overall health and performance.

Highlights

  • Proactive Sink Error Handling: The DispatcherManager now proactively closes itself when the underlying Kafka sink exits with a non-cancellation error (e.g., missing topic, authentication failure). This prevents continued buffering of events into a failed sink, which could lead to unbounded memory growth and Out-Of-Memory (OOM) errors.
  • Immediate Error Reporting: A new mechanism ensures that critical sink errors are reported to the maintainer immediately upon detection, even if the manager's context is subsequently canceled during shutdown. This prevents important error messages from being lost.
  • New Unit Test for Error Scenarios: A new unit test, TestDispatcherManagerCloseOnSinkError, has been added to validate the correct behavior of the DispatcherManager when the sink encounters an unexpected error and triggers a proactive shutdown.


@coderabbitai

coderabbitai bot commented Jan 29, 2026

📝 Walkthrough

Walkthrough

Introduces centralized sink exit handling in the dispatcher manager: runs sinks via a new handler that propagates errors, reports visible maintainer messages via the heartbeat queue on non-cancellation errors, and triggers proactive manager shutdown on unexpected or error exits.

Changes

Cohort / File(s) Summary
Dispatcher exit handling
downstreamadapter/dispatchermanager/dispatcher_manager.go
Added runSinkWithExitHandling to run sinks with centralized exit/error handling, and reportErrorToMaintainer to enqueue maintainer-visible heartbeat messages. Differentiates context cancellation, explicit errors, and unexpected exits; triggers manager close on error/unexpected exit.
Dispatcher tests
downstreamadapter/dispatchermanager/dispatcher_manager_test.go
Added errorSink test double and TestDispatcherManagerCloseOnSinkError to assert manager closes when sink returns a non-cancellation error.
Goroutine leak ignores
pkg/leakutil/leak_helper.go
Added goleak ignore rules for two dbus inWorker functions and two IBM Sarama background goroutines.
Kafka admin abstractions & cleanup
pkg/sink/kafka/admin.go, pkg/sink/kafka/sarama_factory.go, pkg/sink/kafka/admin_test.go
Replaced concrete Sarama types with private interfaces (saramaClient, saramaClusterAdmin), improved Close() order and nil-safety (close admin then client), ensured client is closed on admin creation failure, and added unit tests verifying Close semantics and nil tolerance.

Sequence Diagram

sequenceDiagram
    participant DM as DispatcherManager
    participant Sink as Sink
    participant HB as HeartbeatQueue
    participant Maint as Maintainer

    DM->>Sink: runSinkWithExitHandling(ctx)
    activate Sink
    Sink->>Sink: Run(ctx)
    Sink-->>DM: returns (nil / error)
    deactivate Sink

    alt ctx cancelled
        DM->>DM: return (normal shutdown)
    else error returned
        DM->>HB: reportErrorToMaintainer(details)
        HB->>Maint: enqueue/notify (time, node, code, message)
        DM->>DM: close manager
    else unexpected nil return
        DM->>DM: log unexpected exit
        DM->>DM: close manager
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes


🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 37.50% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Linked Issues check ✅ Passed The PR addresses issue #4064 by implementing proactive DispatcherManager closure on sink errors and error reporting to maintainer, matching the issue's objective of preventing resource runaway.
Out of Scope Changes check ✅ Passed All changes are scope-aligned: dispatcher manager sink handling, error reporting, admin client resource cleanup, and test coverage directly address issue #4064's root cause of unbounded buffering and resource exhaustion.
Title check ✅ Passed The title 'kafka: prevent CPU/memory runaway when topic missing' is clear, specific, and accurately describes the main problem being addressed in the changeset.
Description check ✅ Passed The PR description includes the required template sections: Issue Number, What is changed and how it works, Tests, Questions, and Release note. All major sections are completed with substantive content.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request effectively addresses a potential OOM issue by ensuring the DispatcherManager closes proactively when a sink exits unexpectedly. The introduction of runSinkWithExitHandling and reportErrorToMaintainer is a solid approach to make the error handling more robust and prevent the loss of critical error information during shutdown. The new unit tests also provide good coverage for the added functionality. I have a few suggestions to further refine the implementation by removing a redundant error handling call, using a standardized timestamp format for better machine readability, and ensuring all unexpected sink exits are consistently reported as errors.

Comment on lines +524 to +526
} else {
log.Error("sink exited without error", zap.Stringer("changefeedID", e.changefeedID))
}


high

When the sink exits unexpectedly without an error, the code logs an error but doesn't report it to the maintainer. This is inconsistent with the case where the sink exits with an error. An unexpected exit, even without an error, is a failure condition for the changefeed that should be reported for better observability. I suggest creating and reporting an error in this case as well.

} else {
	err := errors.ErrUnexpected.GenWithStack("sink exited without error")
	log.Error("sink exited unexpectedly without returning an error",
		zap.Stringer("changefeedID", e.changefeedID),
		zap.Error(err))
	e.reportErrorToMaintainer(err)
}

// manager proactively.
func (e *DispatcherManager) runSinkWithExitHandling(ctx context.Context) {
err := e.sink.Run(ctx)
e.handleError(ctx, err)


medium

The call to e.handleError(ctx, err) is redundant given that e.reportErrorToMaintainer(err) is called below to handle the same error. The comment for reportErrorToMaintainer correctly notes that it's a pre-emptive report to avoid losing the error due to a race with context cancellation, which is a risk with the handleError -> collectErrors path. Removing this line will simplify the logic and prevent potential double-reporting of the error.

var message heartbeatpb.HeartBeatRequest
message.ChangefeedID = e.changefeedID.ToPB()
message.Err = &heartbeatpb.RunningError{
Time: time.Now().String(),


medium

Using time.Now().String() for timestamps is not ideal as the format is not standardized and can be difficult for other systems to parse. It's better to use a standard format like time.RFC3339Nano. Using UTC is also a best practice to avoid timezone-related issues.

		Time:    time.Now().UTC().Format(time.RFC3339Nano),

@lidezhu
Collaborator

lidezhu commented Feb 1, 2026

AI suggests fixing it this way:

• Edited pkg/sink/kafka/admin.go (+15 -5)

```diff
 func (a *saramaAdminClient) Close() {
-	if err := a.admin.Close(); err != nil {
-		log.Warn("close admin client meet error",
-			zap.String("keyspace", a.changefeed.Keyspace()),
-			zap.String("changefeed", a.changefeed.Name()),
-			zap.Error(err))
+	if a.admin != nil {
+		if err := a.admin.Close(); err != nil {
+			log.Warn("close admin client meet error",
+				zap.String("keyspace", a.changefeed.Keyspace()),
+				zap.String("changefeed", a.changefeed.Name()),
+				zap.Error(err))
+		}
+	}
+	if a.client != nil {
+		if err := a.client.Close(); err != nil {
+			log.Warn("close admin client sarama client meet error",
+				zap.String("keyspace", a.changefeed.Keyspace()),
+				zap.String("changefeed", a.changefeed.Name()),
+				zap.Error(err))
+		}
 	}
```

@ti-chi-bot

ti-chi-bot bot commented Feb 12, 2026

@wlwilliamx: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-error-log-review 6afd7d8 link true /test pull-error-log-review

Full PR test history. Your PR dashboard.


@wlwilliamx wlwilliamx changed the title dispatchermanager: close on sink exit to avoid OOM kafka: prevent CPU/memory runaway when topic missing Feb 12, 2026



Development

Successfully merging this pull request may close these issues.

TiCDC Kafka sink: missing topic keeps changefeed in warning (expected) but pegs CPU/memory
