kafka: prevent CPU/memory runaway when topic missing #4087
base: master
Conversation
If sink.Run exits with an error, close the DispatcherManager to stop dispatchers and release sink resources (e.g. sarama) instead of allowing unbounded buffering.
[APPROVALNOTIFIER] This PR is NOT APPROVED. It still needs approval from an approver in each of the relevant files.
Summary of Changes (Gemini Code Assist)

This pull request significantly enhances the stability of TiCDC's Kafka sink by introducing robust error handling. Previously, an unexpectedly failing Kafka sink could lead to severe resource exhaustion, including out-of-memory errors and CPU spikes, as the system continued to buffer events. The changes ensure that upon detecting a non-recoverable sink error, the DispatcherManager gracefully shuts down, preventing further event buffering and immediately reporting the issue, thus safeguarding the system's overall health and performance.
📝 Walkthrough

Introduces centralized sink exit handling in the dispatcher manager: runs sinks via a new handler that propagates errors, reports visible maintainer messages via the heartbeat queue on non-cancellation errors, and triggers proactive manager shutdown on unexpected or error exits.
Sequence Diagram

sequenceDiagram
participant DM as DispatcherManager
participant Sink as Sink
participant HB as HeartbeatQueue
participant Maint as Maintainer
DM->>Sink: runSinkWithExitHandling(ctx)
activate Sink
Sink->>Sink: Run(ctx)
Sink-->>DM: returns (nil / error)
deactivate Sink
alt ctx cancelled
DM->>DM: return (normal shutdown)
else error returned
DM->>HB: reportErrorToMaintainer(details)
HB->>Maint: enqueue/notify (time, node, code, message)
DM->>DM: close manager
else unexpected nil return
DM->>DM: log unexpected exit
DM->>DM: close manager
end
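To make the diagram concrete, here is a minimal Go sketch of what such an exit handler could look like. The names `runSinkWithExitHandling`, `reportErrorToMaintainer`, and `sink.Run` come from this PR; the stub types, the `tryClose` helper, the exact cancellation check, and the logging fields are illustrative assumptions, not the actual TiCDC implementation.

```go
package dispatchermanager

import (
	"context"
	"errors"
	"fmt"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// Sketch-only stand-ins; the real TiCDC types carry far more state.
type sink interface {
	Run(ctx context.Context) error
}

type DispatcherManager struct {
	changefeedID fmt.Stringer
	sink         sink
}

func (e *DispatcherManager) reportErrorToMaintainer(err error) { /* enqueue a heartbeat carrying err */ }
func (e *DispatcherManager) tryClose()                         { /* stop dispatchers, release sink resources */ }

// runSinkWithExitHandling runs the sink and, whenever it exits for a reason
// other than context cancellation, reports the error and closes the manager
// so dispatchers stop buffering events into unbounded channels.
func (e *DispatcherManager) runSinkWithExitHandling(ctx context.Context) {
	err := e.sink.Run(ctx)

	// Normal shutdown: the context was cancelled on purpose, nothing to report.
	if ctx.Err() != nil || errors.Is(err, context.Canceled) {
		return
	}

	if err != nil {
		// Pre-emptive report so the error is not lost if the manager is torn
		// down before the regular error-collection path observes it.
		e.reportErrorToMaintainer(err)
	} else {
		// A nil return without cancellation is still an unexpected exit.
		log.Error("sink exited without error",
			zap.Stringer("changefeedID", e.changefeedID))
	}

	// Close proactively so dispatchers stop pulling/enqueuing events.
	e.tryClose()
}
```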
Code Review
This pull request effectively addresses a potential OOM issue by ensuring the DispatcherManager closes proactively when a sink exits unexpectedly. The introduction of runSinkWithExitHandling and reportErrorToMaintainer is a solid approach to make the error handling more robust and prevent the loss of critical error information during shutdown. The new unit tests also provide good coverage for the added functionality. I have a few suggestions to further refine the implementation by removing a redundant error handling call, using a standardized timestamp format for better machine readability, and ensuring all unexpected sink exits are consistently reported as errors.
} else {
	log.Error("sink exited without error", zap.Stringer("changefeedID", e.changefeedID))
}
When the sink exits unexpectedly without an error, the code logs an error but doesn't report it to the maintainer. This is inconsistent with the case where the sink exits with an error. An unexpected exit, even without an error, is a failure condition for the changefeed that should be reported for better observability. I suggest creating and reporting an error in this case as well.
} else {
	err := errors.ErrUnexpected.GenWithStack("sink exited without error")
	log.Error("sink exited unexpectedly without returning an error",
		zap.Stringer("changefeedID", e.changefeedID),
		zap.Error(err))
	e.reportErrorToMaintainer(err)
}

// manager proactively.
func (e *DispatcherManager) runSinkWithExitHandling(ctx context.Context) {
	err := e.sink.Run(ctx)
	e.handleError(ctx, err)
The call to e.handleError(ctx, err) is redundant given that e.reportErrorToMaintainer(err) is called below to handle the same error. The comment for reportErrorToMaintainer correctly notes that it's a pre-emptive report to avoid losing the error due to a race with context cancellation, which is a risk with the handleError -> collectErrors path. Removing this line will simplify the logic and prevent potential double-reporting of the error.
var message heartbeatpb.HeartBeatRequest
message.ChangefeedID = e.changefeedID.ToPB()
message.Err = &heartbeatpb.RunningError{
	Time: time.Now().String(),
Using time.Now().String() for timestamps is not ideal as the format is not standardized and can be difficult for other systems to parse. It's better to use a standard format like time.RFC3339Nano. Using UTC is also a best practice to avoid timezone-related issues.
Time: time.Now().UTC().Format(time.RFC3339Nano),
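As a quick illustration of the reviewer's point (a standalone snippet, not TiCDC code), the two formats look roughly like this; the example timestamps are made up:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()

	// Go's default String() output: readable, but the layout is not a
	// standard one and may include the monotonic clock reading.
	fmt.Println(now.String())
	// e.g. 2024-05-01 10:23:45.123456789 +0800 CST m=+0.000123456

	// RFC 3339 with nanoseconds, in UTC: unambiguous and machine-parseable.
	fmt.Println(now.UTC().Format(time.RFC3339Nano))
	// e.g. 2024-05-01T02:23:45.123456789Z
}
```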
The AI suggested fixing it this way:
@wlwilliamx: The following test failed.
What problem does this PR solve?
Issue Number: close #4064
What is changed and how it works?
When the downstream Kafka topic is missing (and `auto-create-topic` is disabled), the changefeed staying in `warning` / retrying is expected. However, TiCDC could still hit CPU/memory runaway and OOM. This PR fixes the runaway from two angles:
1. Close DispatcherManager when the sink goroutine exits unexpectedly
   - Run the sink through a new `runSinkWithExitHandling` wrapper: when `sink.Run(ctx)` returns with a non-cancel error, report the error to the maintainer and close the DispatcherManager proactively, so dispatchers stop pulling/enqueuing events instead of buffering into unbounded channels.

2. Release sarama admin resources reliably on close
   - `saramaAdminClient.Close()` now closes the underlying `sarama.Client` in addition to the `ClusterAdmin`, which stops sarama background goroutines (metadata updater, broker receivers) and releases associated metrics/metadata caches.
   - Also handle `NewClusterAdminFromClient` failure to avoid leaking on error paths. A minimal sketch of this close path follows below.
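The following sketch illustrates the idea under stated assumptions: the wrapper type, field names, constructor, import path (`github.com/IBM/sarama`), and the double-close tolerance are all illustrative guesses; only the intent of closing the `sarama.Client` together with the `ClusterAdmin`, and cleaning up when `NewClusterAdminFromClient` fails, comes from the PR.

```go
package kafka

import (
	"errors"

	"github.com/IBM/sarama"
)

// saramaAdminClient sketches a wrapper that owns both the ClusterAdmin and
// the sarama.Client it was built from; the real TiCDC type differs.
type saramaAdminClient struct {
	client sarama.Client
	admin  sarama.ClusterAdmin
}

func newSaramaAdminClient(addrs []string, cfg *sarama.Config) (*saramaAdminClient, error) {
	client, err := sarama.NewClient(addrs, cfg)
	if err != nil {
		return nil, err
	}
	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		// Close the client on the error path so its goroutines do not leak.
		_ = client.Close()
		return nil, err
	}
	return &saramaAdminClient{client: client, admin: admin}, nil
}

// Close releases the ClusterAdmin and then the underlying client, stopping
// sarama background goroutines and dropping their cached metadata/metrics.
func (a *saramaAdminClient) Close() error {
	err := a.admin.Close()
	// Depending on the sarama version, admin.Close may already close the
	// client; tolerate a double-close so the client is always released.
	if cerr := a.client.Close(); cerr != nil && !errors.Is(cerr, sarama.ErrClosedClient) && err == nil {
		err = cerr
	}
	return err
}
```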
Check List

Tests
Questions
Will it cause performance regression or break compatibility?
No. The new close path only triggers when the sink exits unexpectedly. The admin close change only makes resource cleanup more complete.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note