
Conversation

@3AceShowHand
Collaborator

@3AceShowHand 3AceShowHand commented Feb 7, 2026

Summary

  • change kafka sink default client from sarama to franz in NewOptions
  • update factory selector to treat empty client setting as franz (see the sketch after this list)
  • add unit test to assert default kafka client is franz
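A minimal sketch of the selector behavior described above, using hypothetical stand-in types; the real NewOptions/NewFactory signatures in pkg/sink/kafka may differ:

```go
package kafka

import "fmt"

// Hypothetical stand-ins for the PR's real types.
type Options struct{ KafkaClient string }

type Factory interface{}

func newFranzFactory(o *Options) (Factory, error)  { return struct{}{}, nil }
func newSaramaFactory(o *Options) (Factory, error) { return struct{}{}, nil }

// NewFactory treats an empty kafka-client setting the same as "franz",
// which is the new default this PR introduces.
func NewFactory(o *Options) (Factory, error) {
	switch o.KafkaClient {
	case "", "franz":
		return newFranzFactory(o)
	case "sarama":
		return newSaramaFactory(o)
	default:
		return nil, fmt.Errorf("unknown kafka-client %q", o.KafkaClient)
	}
}
```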

Test Plan

  • GOCACHE=/tmp/go-build-cache GOTMPDIR=/tmp/gotmp go test ./pkg/sink/kafka -run TestNewOptionsDefaultKafkaClient -count=1
  • GOCACHE=/tmp/go-build-cache GOTMPDIR=/tmp/gotmp go test ./pkg/sink/kafka -count=1
  • GOCACHE=/tmp/go-build-cache GOTMPDIR=/tmp/gotmp go test ./downstreamadapter/sink/kafka -run 'TestDMLProducerHeartbeat|TestDDLProducerHeartbeat' -count=1

Summary by CodeRabbit

  • New Features

    • Franz-go Kafka client selectable via kafka-client (default: franz).
    • Kerberos (GSSAPI) SASL support for Kafka authentication.
    • New Prometheus metrics: producer request latency, compression ratio, and records-per-request.
    • Improved event-context error annotations for clearer logs.
  • Tests

    • Added unit/integration tests covering Franz factory, admin client, producers, SASL, and metrics.
  • Chores

    • Updated dependencies, including franz-go and Go modules.

@ti-chi-bot

ti-chi-bot bot commented Feb 7, 2026

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it.
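For reference, prow looks for a fenced release-note block in the PR description; a hypothetical note for this change might look like:

```release-note
Change the default Kafka sink client from Sarama to franz-go. Sarama can still be selected by setting `kafka-client=sarama` in the sink URI.
```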


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot ti-chi-bot bot added the do-not-merge/needs-linked-issue and do-not-merge/release-note-label-needed (indicates that a PR should not merge because it's missing one of the release note labels) labels Feb 7, 2026
@ti-chi-bot

ti-chi-bot bot commented Feb 7, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign wlwilliamx for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Feb 7, 2026
@coderabbitai
Contributor

coderabbitai bot commented Feb 7, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds Franz-go as an alternative Kafka client to the sink: factory selection, full Franz-based admin/producer implementations, SASL/GSSAPI support, Prometheus metrics integration, config/url option to choose client, adapter layer to reuse downstream logic, and accompanying tests and go.mod updates.

Changes

Cohort / File(s) Summary
Design & Configuration
docs/design/2026-01-09-franz-go-kafka-sink.md, pkg/sink/kafka/options.go, pkg/sink/kafka/factory_selector.go
Design doc and option additions for kafka-client= URL param (default "franz"); option parsing/validation and factory selector to choose Franz or Sarama.
Franz-go Core Implementation
pkg/sink/kafka/franz/factory.go, pkg/sink/kafka/franz/admin_client.go, pkg/sink/kafka/franz/sync_producer.go, pkg/sink/kafka/franz/async_producer.go
New franz package: Options struct, client builders, AdminClient, SyncProducer, AsyncProducer with kgo/kadm integration and per-call timeout handling.
Franz-go SASL & GSSAPI
pkg/sink/kafka/franz/gssapi.go, pkg/sink/kafka/franz/sasl_test.go
GSSAPI/Kerberos SASL mechanism implementation (gokrb5 integration) and unit tests for user/keytab flows.
Metrics Integration
pkg/sink/kafka/franz/metrics_hook.go, pkg/sink/kafka/metrics.go, pkg/sink/kafka/franz_factory_test.go
MetricsHook implementing kgo.Hook, Prometheus metric collectors (request latency, compression ratio, records per request, etc.), binding/cleanup logic and tests (see the sketch after this table).
Adapter & Factory Wiring
pkg/sink/kafka/franz_admin_client.go, pkg/sink/kafka/franz_factory.go, downstreamadapter/sink/kafka/helper.go
Adapter wrapping franz admin to local ClusterAdminClient; franzFactory producing admin/producers and exposing metrics; helper switched to use factory selector.
Logging & Error Context
pkg/sink/kafka/internal/logutil/logutil.go, pkg/sink/kafka/logutil.go, pkg/sink/kafka/logutil_test.go
Refactored event-type detection and error annotation into internal logutil with BuildEventLogContext/AnnotateEventError; callers updated and tests adjusted.
Tests & CI tweaks
pkg/sink/kafka/franz_admin_client_test.go, pkg/sink/kafka/franz/factory_api_test.go, pkg/sink/kafka/franz/admin_client_test.go, pkg/sink/kafka/franz/sasl_test.go, pkg/sink/kafka/options_test.go, pkg/sink/kafka/main_test.go
New and updated tests covering factory, admin adapter, producer options, SASL, default option, and leak test adjustments.
Dependencies
go.mod
Added/updated dependencies: github.com/twmb/franz-go (+kadm/kmsg), github.com/jcmturner/gokrb5/v8, compression libs and multiple golang.org/x/* upgrades.
Errors
pkg/errors/error.go, pkg/errors/helper.go
Added ErrKafkaControllerNotAvailable and marked it unretryable.
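The MetricsHook row above refers to franz-go's client hooks. A minimal sketch of a hook that tallies per-broker write volume, assuming kgo's HookBrokerWrite interface (the PR's real hook also tracks latency, compression ratio, and records per request; the field names here are hypothetical):

```go
package franz

import (
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// MetricsHook tallies bytes written per broker; kgo invokes
// OnBrokerWrite after every request write.
type MetricsHook struct {
	mu           sync.Mutex
	bytesWritten map[int32]int64 // broker node ID -> bytes written
}

// OnBrokerWrite implements kgo.HookBrokerWrite.
func (h *MetricsHook) OnBrokerWrite(
	meta kgo.BrokerMetadata, _ int16, bytesWritten int,
	_, _ time.Duration, err error,
) {
	if err != nil {
		return
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.bytesWritten == nil {
		h.bytesWritten = make(map[int32]int64)
	}
	h.bytesWritten[meta.NodeID] += int64(bytesWritten)
}

var _ kgo.HookBrokerWrite = (*MetricsHook)(nil)
```

A hook like this is registered when building the client, via kgo.WithHooks.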

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

lgtm, approved, release-note

Suggested reviewers

  • bufferflies
  • lidezhu
  • wk989898

Poem

🐰
I hopped through code, found Franz in the glen,
Two clients now dance where one walked—then when,
Metrics hum softly, GSSAPI keeps the den,
Factories pick paths again and again,
Thumper-approved: ship it, my friends! 🥕✨

🚥 Pre-merge checks (1 passed, 2 warnings)

❌ Failed checks (2 warnings)

  • Description check — ⚠️ Warning: the pull request description lacks critical sections required by the template. It is missing the formal 'Issue Number' reference, omits the 'Will it cause performance regression or break compatibility?' section, and provides no release notes. Resolution: add the Issue Number with a close/ref reference, address the compatibility question explicitly, and provide a release note following the style guide (or mark it 'None' if not applicable).
  • Docstring coverage — ⚠️ Warning: docstring coverage is 16.13%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (1 passed)

  • Title check — ✅ Passed: the pull request title clearly and concisely summarizes the main change: making franz the default Kafka sink client instead of sarama.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist

Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the Kafka sink client by making franz-go the new default, replacing Sarama. The change introduces a complete franz-go client implementation covering administrative tasks and asynchronous and synchronous message production, aiming to improve performance and stability. It preserves backward compatibility by allowing explicit selection of Sarama and provides a robust framework for future optimizations and a gradual rollout.

Highlights

  • Default Kafka Client Change: The default Kafka sink client has been switched from Sarama to franz-go in NewOptions.
  • New Kafka Client Implementation: A new franz-go based Kafka client implementation has been introduced, including franzAdminClient, franzAsyncProducer, and franzSyncProducer.
  • Factory Selector Update: The Kafka factory selector now treats an empty client setting as franz-go, making it the new default.
  • Design Document Added: A detailed design document (docs/design/2026-01-09-franz-go-kafka-sink.md) outlining the architecture, components, configuration, and rollout plan for the franz-go Kafka sink has been added.
  • Dependency Updates: Several Go module dependencies have been updated, and new indirect dependencies for franz-go have been added.
  • GSSAPI SASL Support: GSSAPI SASL mechanism support has been implemented for the franz-go client, handling both user and keytab authentication (see the sketch after this list).
  • Metrics Collection: A new metrics collector (franzMetricsCollector) has been added to gather and expose Kafka client metrics from franz-go.
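franz-go does not ship a GSSAPI mechanism, so the PR supplies one through the sasl.Mechanism interface. A skeleton of the shape such an implementation takes (the gokrb5 token exchange is elided and the internals are placeholders; only the franz-go interface is real):

```go
package franz

import (
	"context"

	"github.com/twmb/franz-go/pkg/sasl"
)

// gssapiMechanism sketches a custom SASL mechanism; the real code
// drives a gokrb5 Kerberos handshake inside the session.
type gssapiMechanism struct{}

func (gssapiMechanism) Name() string { return "GSSAPI" }

// Authenticate returns the session plus the first client token.
func (gssapiMechanism) Authenticate(ctx context.Context, host string) (sasl.Session, []byte, error) {
	// Real code: obtain a Kerberos AP-REQ token for "kafka/<host>".
	firstToken := []byte{} // placeholder
	return &gssapiSession{}, firstToken, nil
}

type gssapiSession struct{ done bool }

// Challenge consumes the broker's response and returns the next token;
// the bool reports whether the handshake is complete.
func (s *gssapiSession) Challenge(resp []byte) (bool, []byte, error) {
	// Real code: unwrap and verify resp, then emit the wrap token.
	s.done = true
	return true, nil, nil
}
```

The mechanism is then passed to the client with kgo.SASL(gssapiMechanism{}).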


Changelog
  • docs/design/2026-01-09-franz-go-kafka-sink.md
    • Added a comprehensive design document detailing the introduction and implementation of franz-go as a Kafka sink client, covering architecture, components, configuration mapping, producer behavior, error handling, resource management, performance, testing, observability, and rollout plan.
  • downstreamadapter/sink/kafka/helper.go
    • Changed the default Kafka factory from kafka.NewSaramaFactory to kafka.NewFactory.
  • go.mod
    • Updated github.com/klauspost/compress from v1.18.0 to v1.18.2.
    • Updated github.com/pierrec/lz4/v4 from v4.1.18 to v4.1.22.
    • Updated golang.org/x/net from v0.43.0 to v0.47.0.
    • Updated golang.org/x/sync from v0.17.0 to v0.18.0.
    • Updated golang.org/x/sys from v0.35.0 to v0.38.0.
    • Updated golang.org/x/term from v0.34.0 to v0.37.0.
    • Updated golang.org/x/text from v0.29.0 to v0.31.0.
    • Updated golang.org/x/crypto from v0.41.0 to v0.45.0.
    • Updated golang.org/x/mod from v0.27.0 to v0.29.0.
    • Updated golang.org/x/tools from v0.36.0 to v0.38.0.
    • Added new indirect dependencies for franz-go: github.com/twmb/franz-go v1.20.6, github.com/twmb/franz-go/pkg/kadm v1.17.1, github.com/twmb/franz-go/pkg/kmsg v1.12.0.
  • go.sum
    • Updated checksums for the changed and added Go modules.
  • pkg/sink/kafka/factory_selector.go
    • Added NewFactory function to select between franz-go and Sarama implementations based on the KafkaClient option, defaulting to franz-go.
  • pkg/sink/kafka/franz_admin_client.go
    • Added franzAdminClient implementation for Kafka admin operations using franz-go, including broker and topic metadata retrieval, config fetching, and topic creation.
  • pkg/sink/kafka/franz_async_producer.go
    • Added franzAsyncProducer implementation for asynchronous message sending using franz-go, including error handling and callbacks.
  • pkg/sink/kafka/franz_factory.go
    • Added franzFactory to construct franz-go based Kafka clients (Admin, Sync, Async Producers) and handle options, TLS, SASL, and compression.
  • pkg/sink/kafka/franz_factory_test.go
    • Added unit tests for franzFactory, specifically for GSSAPI SASL mechanism building and metrics collector behavior.
  • pkg/sink/kafka/franz_gssapi.go
    • Added GSSAPI SASL mechanism implementation for franz-go, supporting user and keytab authentication.
  • pkg/sink/kafka/franz_metrics_collector.go
    • Added franzMetricsHook and franzMetricsCollector to gather and expose Kafka client metrics from franz-go.
  • pkg/sink/kafka/franz_sync_producer.go
    • Added franzSyncProducer implementation for synchronous message sending using franz-go, including retry logic and error handling.
  • pkg/sink/kafka/options.go
    • Added KafkaClient field to urlConfig and options structs.
    • Set the default value of KafkaClient to "franz" in NewOptions.
    • Added logic to parse and validate the kafka-client URL parameter.
  • pkg/sink/kafka/options_test.go
    • Added a unit test TestNewOptionsDefaultKafkaClient to verify that the default Kafka client is "franz".
Activity
  • The pull request introduces a new Kafka client implementation based on franz-go.
  • The default Kafka client for the sink has been changed to franz-go.
  • A detailed design document has been added to explain the new franz-go integration.
  • New franz-go specific implementations for admin, asynchronous, and synchronous producers have been added.
  • The go.mod and go.sum files have been updated to include franz-go and other dependency upgrades.
  • Unit tests have been added to validate the new franz-go client's behavior, including SASL GSSAPI and metrics collection.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review — /gemini review: performs a code review for the current pull request in its current state.
  • Pull Request Summary — /gemini summary: provides a summary of the current pull request in its current state.
  • Comment — @gemini-code-assist: responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help — /gemini help: displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@3AceShowHand
Collaborator Author

/test all


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request makes franz-go the default Kafka sink client, introducing a new implementation for producers and the admin client. However, it introduces potential security vulnerabilities related to SSRF and path traversal: user-provided URLs for OAuth2 token endpoints and file paths for Kerberos configurations lack proper validation. There are also a few code issues to address: a resource leak in the admin client's Close method, a non-blocking Close in the async producer that should be blocking, and incorrect timeout values in producer configurations. Addressing these points will improve both the security and robustness of the new implementation.

Comment on lines 231 to 243
tokenURL, err := url.Parse(o.SASL.OAuth2.TokenURL)
if err != nil {
	return nil, errors.Trace(err)
}

cfg := &clientcredentials.Config{
	ClientID:       o.SASL.OAuth2.ClientID,
	ClientSecret:   o.SASL.OAuth2.ClientSecret,
	TokenURL:       tokenURL.String(),
	EndpointParams: endpointParams,
	Scopes:         o.SASL.OAuth2.Scopes,
}
return cfg.TokenSource(ctx), nil


Severity: high (security)

The buildFranzOauthTokenSource function uses the user-provided o.SASL.OAuth2.TokenURL to create an OAuth2 token source. This token source will make HTTP POST requests to the provided URL when fetching tokens. Since there is no validation of the URL, an attacker who can create or modify a changefeed can provide an internal URL (e.g., pointing to a cloud metadata service or internal API), leading to Server-Side Request Forgery (SSRF). It is recommended to validate the TokenURL against an allow-list of trusted domains or ensure it does not point to internal IP addresses.
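A sketch of the kind of guard this comment suggests; the function name and policy are hypothetical, and production setups may prefer an explicit allow-list:

```go
package kafka

import (
	"fmt"
	"net"
	"net/url"
)

// validateTokenURL resolves the OAuth2 token endpoint and rejects
// loopback, private, link-local, and unspecified addresses to reduce
// SSRF exposure.
func validateTokenURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return err
	}
	if u.Scheme != "https" {
		return fmt.Errorf("token URL must use https, got %q", u.Scheme)
	}
	ips, err := net.LookupIP(u.Hostname())
	if err != nil {
		return err
	}
	for _, ip := range ips {
		if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() || ip.IsUnspecified() {
			return fmt.Errorf("token URL resolves to non-routable address %s", ip)
		}
	}
	return nil
}
```

Note that a resolve-then-connect check is still subject to DNS rebinding; pinning the resolved IP in the HTTP client's dialer closes that gap.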

Comment on lines +218 to +226
cfg, err := krb5config.Load(g.KerberosConfigPath)
if err != nil {
	return nil, errors.Trace(err)
}

var client *krb5client.Client
switch g.AuthType {
case security.KeyTabAuth:
	kt, err := keytab.Load(g.KeyTabPath)


Severity: high (security)

The newFranzKerberosClient function uses g.KerberosConfigPath and g.KeyTabPath directly in krb5config.Load and keytab.Load. These paths are provided by the user via the sink-uri or changefeed configuration. An attacker can provide absolute paths to sensitive files on the server (e.g., /etc/passwd or internal configuration files), leading to arbitrary file access. While these functions are intended to load Kerberos configurations, they do not restrict the file paths to a safe directory. It is recommended to validate that the provided paths are within an expected directory or restrict the use of absolute paths and parent directory references (..).
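One possible shape for the path check suggested here; the base directory and helper name are hypothetical:

```go
package kafka

import (
	"fmt"
	"path/filepath"
	"strings"
)

// ensureUnder rejects paths that escape the trusted base directory,
// covering both absolute paths outside it and ".." traversal.
func ensureUnder(base, p string) (string, error) {
	abs, err := filepath.Abs(p)
	if err != nil {
		return "", err
	}
	rel, err := filepath.Rel(base, abs)
	if err != nil {
		return "", err
	}
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("path %q escapes trusted directory %q", p, base)
	}
	return abs, nil
}
```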

Comment on lines 277 to 281
func (a *franzAdminClient) Close() {
	if a.admin != nil {
		a.admin.Close()
	}
}


Severity: high

The kadm.Client.Close() method is a no-op. The underlying kgo.Client (a.client) must be closed to release resources. The current implementation will lead to a resource leak as the kgo.Client is never closed.

Suggested change
 func (a *franzAdminClient) Close() {
-	if a.admin != nil {
-		a.admin.Close()
+	if a.client != nil {
+		a.client.Close()
 	}
 }

Comment on lines 268 to 271
produceTimeout := o.ReadTimeout
if produceTimeout < 100*time.Millisecond {
	produceTimeout = 10 * time.Second
}


Severity: high

The produceTimeout is being set from o.ReadTimeout. For consistency with the Sarama implementation (saramaConfig.Producer.Timeout), this should be based on o.WriteTimeout, which is more relevant for a producer's request timeout.

Suggested change
-produceTimeout := o.ReadTimeout
-if produceTimeout < 100*time.Millisecond {
-	produceTimeout = 10 * time.Second
-}
+produceTimeout := o.WriteTimeout
+if produceTimeout <= 0 {
+	// This is a fail-safe, as the default is 10s.
+	produceTimeout = 10 * time.Second
+}

Comment on lines 60 to 63
produceTimeout := o.ReadTimeout
if produceTimeout <= 0 {
	produceTimeout = 10 * time.Second
}


Severity: high

The produceTimeout is calculated based on o.ReadTimeout. For a producer, o.WriteTimeout is the more appropriate configuration to use, which is also consistent with how the Sarama producer is configured.

Suggested change
-produceTimeout := o.ReadTimeout
+produceTimeout := o.WriteTimeout
 if produceTimeout <= 0 {
 	produceTimeout = 10 * time.Second
 }

Comment on lines 68 to 81
func (p *franzAsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}

	go func() {
		start := time.Now()
		p.client.Close()
		log.Info("Close kafka async producer success",
			zap.String("keyspace", p.changefeedID.Keyspace()),
			zap.String("changefeed", p.changefeedID.Name()),
			zap.Duration("duration", time.Since(start)))
	}()
}


Severity: medium

Closing the client in a new goroutine makes the Close method non-blocking. However, kgo.Client.Close() is a blocking call that waits for buffered messages to be sent. To prevent data loss on shutdown, this Close method should be blocking, consistent with the saramaAsyncProducer implementation and the expectation of its callers.

func (p *franzAsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}

	start := time.Now()
	p.client.Close()
	log.Info("Close kafka async producer success",
		zap.String("keyspace", p.changefeedID.Keyspace()),
		zap.String("changefeed", p.changefeedID.Name()),
		zap.Duration("duration", time.Since(start)))
}

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
go.mod (1)

3-3: ⚠️ Potential issue | 🟡 Minor

Update Go version to the latest 1.25 patch release (1.25.7).

The go.mod specifies go 1.25.5, but Go 1.25.7 was released on February 4, 2026, and is now the latest stable patch in the 1.25 series. Consider updating to go 1.25.7 to pick up the latest bug fixes and security patches.

🤖 Fix all issues with AI agents
In `@docs/design/2026-01-09-franz-go-kafka-sink.md`:
- Around line 117-119: The doc's stated default for the kafka-client flag is out
of sync with code: update the design doc text that currently reads
"kafka-client=sarama|franz(默认 sarama)" to reflect the actual default used by
NewOptions() in options.go and the behavior in factory_selector.go (which treat
"" as "franz"), i.e., change the default to franz or clearly document that franz
is the default and sarama must be opted into via kafka-client=sarama.

In `@go.mod`:
- Around line 44-47: go.mod currently pins Go 1.25.5 which is vulnerable to
CVE-2025-61728; update the Go toolchain version to 1.25.6 in go.mod and run `go
mod tidy`/`go get` to regenerate the lock info, and also bump the listed module
versions: update github.com/klauspost/compress to v1.18.3, pierrec/lz4/v4 to
v4.1.25, golang.org/x/sync to v0.19.0, golang.org/x/sys to v0.40.0, and
golang.org/x/net to v0.49.0 (leave twmb/franz-go, pkg/kadm, pkg/kmsg as-is);
ensure after changes you run the test suite and CI to verify builds.

In `@pkg/sink/kafka/franz_async_producer.go`:
- Around line 68-81: The Close() currently returns without closing p.client when
p.closed was already set by AsyncRunCallback; update the lifecycle so
client.Close() is always invoked exactly once regardless of p.closed state—for
example add a dedicated sync.Once (e.g., producerCloseOnce) or separate flags so
that franzAsyncProducer.Close() always calls p.client.Close() (or triggers the
once) even when AsyncRunCallback sets p.closed; ensure AsyncRunCallback no
longer assumes responsibility for closing p.client and instead calls the same
sync.Once/closing helper used by Close(), referencing franzAsyncProducer.Close,
AsyncRunCallback, p.closed, p.client.Close, and the new sync.Once field to
guarantee no leak.

In `@pkg/sink/kafka/franz_factory_test.go`:
- Line 82: The test uses a misspelled constant common.DefaultKeyspaceNamme
(double 'm') when constructing changefeedID in the changefeedID :=
common.NewChangefeedID4Test(...) line; replace it with the correctly named
constant common.DefaultKeyspaceName to match other tests and avoid compile
errors, and run the repository-wide search for DefaultKeyspaceNam* to confirm
there are no other misspelled occurrences.

In `@pkg/sink/kafka/franz_factory.go`:
- Around line 268-271: The produceTimeout is being set from o.ReadTimeout but
since this value controls the producer (write) request timeout, update the
assignment to use o.WriteTimeout instead of o.ReadTimeout; locate the
produceTimeout variable initialization in franz_factory.go (the block that
currently reads produceTimeout := o.ReadTimeout and the subsequent min check)
and change it to derive from o.WriteTimeout, preserving the same minimum/default
fallback logic.
- Around line 155-177: The code in buildFranzTLSConfig currently sets ALPN
NextProtos to {"h2","http/1.1"} which is inappropriate for Kafka; update
buildFranzTLSConfig to stop setting NextProtos by default and avoid overwriting
credentialTlsConfig.NextProtos — remove the NextProtos fields at lines where
tlsConfig is constructed and where credentialTlsConfig is applied (refer to the
local variable tlsConfig, the parameter o and credentialTlsConfig returned by
o.Credential.ToTLSConfig()), while preserving MinVersion enforcement and
InsecureSkipVerify copying.
- Around line 69-75: In AdminClient (franzFactory.AdminClient) the error is
wrapped with errors.ErrKafkaNewProducer which is the wrong operation; change the
error handling to use errors.Trace(err) (same pattern as used in
NewFranzFactory) so the original error is preserved and not mislabelled as a
"new producer" failure; locate newFranzAdminClient call in AdminClient and
replace the errors.WrapError(errors.ErrKafkaNewProducer, err) return with
returning errors.Trace(err).

In `@pkg/sink/kafka/franz_gssapi.go`:
- Around line 160-162: The call to wrapTokenResp.Marshal() returns an unwrapped
third-party error; replace the direct return with wrapping the error using
errors.Trace (or errors.WrapError) so the stack is preserved: call
wrapTokenResp.Marshal(), check the returned error, and if non-nil return
errors.Trace(err) (keeping the existing s.step = gssAPIFinished assignment)
instead of returning the raw error from wrapTokenResp.Marshal().

In `@pkg/sink/kafka/franz_sync_producer.go`:
- Around line 60-64: The code uses o.ReadTimeout to compute produceTimeout for
sync produce; change this to use o.WriteTimeout (falling back to the same 10s
default if <=0) so the produce timeout reflects a write operation; update the
assignment of produceTimeout (and any related calculation using
franzSyncRecordRetries) to derive timeout from o.WriteTimeout instead of
o.ReadTimeout to match the design-doc's ProduceRequestTimeout/WriteTimeout
alignment.
🧹 Nitpick comments (6)
docs/design/2026-01-09-franz-go-kafka-sink.md (1)

66-71: Add a language specifier to the fenced code block.

The code block on line 66 has no language tag. Use ```text or similar to silence the markdownlint MD040 warning.

pkg/sink/kafka/franz_sync_producer.go (1)

146-161: Close() has a TOCTOU race between check and store.

Two concurrent Close() calls can both pass the p.closed.Load() check before either calls Store(true), leading to double client.Close(). Consider using CompareAndSwap for atomicity.

Proposed fix
 func (p *franzSyncProducer) Close() {
-	if p.closed.Load() {
+	if !p.closed.CompareAndSwap(false, true) {
 		log.Warn("kafka DDL producer already closed",
 			zap.String("keyspace", p.id.Keyspace()),
 			zap.String("changefeed", p.id.Name()))
 		return
 	}
 
-	p.closed.Store(true)
 	start := time.Now()
pkg/sink/kafka/franz_metrics_collector.go (1)

96-139: Consider whether inFlight accounting is correct for all edge cases.

inFlight is incremented in OnBrokerWrite when err == nil (Line 114) and decremented in OnBrokerE2E when WriteErr == nil (Line 129). If the kgo.Client is closed abruptly, OnBrokerE2E may never fire for some in-flight requests, leaving the counter inflated. The clamp to 0 at Line 130 helps prevent negative values but won't correct an over-count. This is a minor observability inaccuracy, not a functional bug.

pkg/sink/kafka/franz_admin_client.go (1)

74-95: GetAllBrokers silently returns nil on error — callers must handle this.

When BrokerMetadata fails, the method logs a warning and returns nil (Line 84). If callers don't check for nil, they'll get an empty broker list which could cause silent misconfiguration downstream (e.g., metrics not attached to any broker). Consider whether returning an error would be more appropriate for this method, aligning with the pattern used by the other Get* methods.

pkg/sink/kafka/franz_factory.go (2)

246-284: Disabling idempotent writes with single in-flight request per broker is a valid ordering guarantee, but worth documenting the trade-off.

DisableIdempotentWrite() (Line 276) plus MaxProduceRequestsInflightPerBroker(1) (Line 277) ensures strict ordering but sacrifices throughput and exactly-once semantics. This is a conscious design choice that matches the Sarama defaults. Consider adding a brief comment explaining why idempotency is disabled (e.g., TiCDC manages its own deduplication).
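For context, the two options compose like this; a sketch of why they travel together, not the PR's actual option assembly:

```go
package franz

import "github.com/twmb/franz-go/pkg/kgo"

// orderingOpts: with idempotence disabled, more than one in-flight
// produce request per broker would let a retried earlier batch land
// after a later one, so in-flight is capped at 1 to keep strict
// per-partition ordering (matching Sarama's defaults).
func orderingOpts() []kgo.Opt {
	return []kgo.Opt{
		kgo.DisableIdempotentWrite(),
		kgo.MaxProduceRequestsInflightPerBroker(1),
	}
}
```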


286-304: Unknown compression silently defaults to NoCompression.

Lines 301–303 log a warning for unrecognized compression algorithms but proceed with NoCompression. If a user configures "brotli" (unsupported), they might not notice the warning and assume compression is active, leading to unexpectedly high bandwidth usage. Consider returning an error for truly unknown values to fail fast.

Proposed change
 	default:
-		log.Warn("unsupported compression algorithm", zap.String("compression", o.Compression))
-		return kgo.ProducerBatchCompression(kgo.NoCompression()), nil
+		return nil, errors.ErrKafkaInvalidConfig.GenWithStack(
+			"unsupported compression algorithm: %s", o.Compression)
 	}

Comment on lines +117 to +119
It is suggested to add an optional sink-uri parameter for choosing the Kafka client implementation, keeping Sarama as the default:

- New parameter: `kafka-client=sarama|franz` (default: sarama)
Contributor


⚠️ Potential issue | 🟡 Minor

Design doc states default is sarama, but code now defaults to franz.

Line 119 says kafka-client=sarama|franz (default sarama), but NewOptions() in options.go now sets KafkaClient: "franz" and factory_selector.go treats "" as franz. Update the doc to reflect the new default.

🤖 Prompt for AI Agents
In `@docs/design/2026-01-09-franz-go-kafka-sink.md` around lines 117 - 119: the
doc's stated default for the kafka-client flag is out of sync with code. Update
the design doc text that currently reads "kafka-client=sarama|franz (default
sarama)" to reflect the actual default used by NewOptions() in options.go and
the behavior in factory_selector.go (which treat "" as "franz"), i.e., change
the default to franz or clearly document that franz is the default and sarama
must be opted into via kafka-client=sarama.

Comment on lines +44 to +47
 github.com/jcmturner/gofork v1.7.6
 github.com/jcmturner/gokrb5/v8 v8.4.4
 github.com/json-iterator/go v1.1.12
-github.com/klauspost/compress v1.18.0
+github.com/klauspost/compress v1.18.2
Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cd /dev/null && pwd || true  # Check context

Repository: pingcap/ticdc

Length of output: 106


🏁 Script executed:

find . -name "go.mod" -type f | head -20

Repository: pingcap/ticdc

Length of output: 154


🏁 Script executed:

cat -n ./go.mod | head -100

Repository: pingcap/ticdc

Length of output: 4747


🏁 Script executed:

cat -n ./go.mod | tail -20

Repository: pingcap/ticdc

Length of output: 1129


🏁 Script executed:

cat -n ./go.mod | grep -E "franz-go|kmsg"

Repository: pingcap/ticdc

Length of output: 209


🌐 Web query:

github.com/twmb/franz-go v1.20.6 latest version 2025 2026 compatibility

💡 Result:

  • Latest version (as of Feb 7, 2026): github.com/twmb/franz-go v1.20.6, published Dec 20, 2025 (no newer tag is listed on pkg.go.dev). [1]
  • Kafka compatibility: franz-go states it supports Apache Kafka 0.8.0 through 4.1+ (and Kafka-compatible brokers like Redpanda, Confluent Platform, MSK, etc.). [2]
  • 2025/2026 usage note: v1.20.6 is a v1 (stable) module release line in late 2025; for “compatibility” beyond Kafka protocol (i.e., minimum Go version), the authoritative source is the module’s go.mod for that tag. [1][2]

Sources
[1] https://pkg.go.dev/github.com/twmb/franz-go?tab=versions
[2] https://github.com/twmb/franz-go


🌐 Web query:

github.com/klauspost/compress v1.18.2 latest version security advisories 2025 2026

💡 Result:

Sources:
[1] GitHub releases for klauspost/compress (shows v1.18.3 “Latest” and “Downstream CVE-2025-61728”)
[2] Go Vulnerability Database entry GO-2026-4342 (CVE-2025-61728)


🌐 Web query:

github.com/pierrec/lz4/v4 v4.1.22 latest version 2025 2026

💡 Result:

github.com/pierrec/lz4/v4 v4.1.22 is not the latest.

As of February 7, 2026, the latest tagged/module version is v4.1.25 (Jan 17, 2026). Recent versions are: v4.1.24 (Jan 16, 2026), v4.1.23 (May 10, 2025), v4.1.22 (Dec 12, 2024). [1][2]

Note: the GitHub “Releases” page may lag behind tags (it shows only up to v4.1.23), but the Tags list and Go module index show v4.1.25. [1][2]

Sources: [1] GitHub tags for pierrec/lz4, [2] pkg.go.dev versions for github.com/pierrec/lz4/v4.


🌐 Web query:

golang.org/x/net golang.org/x/sync golang.org/x/sys v0.47.0 v0.18.0 v0.38.0 latest versions 2026 security advisories CVE

💡 Result:

Latest module versions (as of Feb 7, 2026)

Module | Your version | Latest version (published)
golang.org/x/net | v0.47.0 | v0.49.0 (Jan 12, 2026) [1]
golang.org/x/sync | v0.18.0 | v0.19.0 (Dec 4, 2025) [2]
golang.org/x/sys | v0.38.0 | v0.40.0 (Dec 19, 2025) [3]

Security advisories / CVEs relevant to these modules (notably in 2025–2026)

golang.org/x/net

  • CVE-2025-58190 / GO-2026-4441 — DoS via infinite parsing loop in x/net/html; affected: before v0.45.0 (so not v0.47.0+) [4]
  • CVE-2025-47911 / GO-2026-4440 — DoS via quadratic parsing complexity in x/net/html; affected: before v0.45.0 (so not v0.47.0+) [5]
  • CVE-2025-22870 / GO-2025-3503 — proxy bypass via IPv6 zone ID handling; affected: before v0.36.0 (so not v0.47.0+) [6]
  • CVE-2025-22872 (GHSA-vvgc-356p-c3xw) — tokenizer issue (GitHub advisory); affected: < 0.38.0, patched in 0.38.0 (so not v0.47.0+) [7]

golang.org/x/sync

  • No entries found for golang.org/x/sync in the Go Vulnerability Database “All Reports” list (searched on Feb 7, 2026). [8]

golang.org/x/sys

  • No entries found for golang.org/x/sys in the Go Vulnerability Database “All Reports” list (searched on Feb 7, 2026). [8]

Bottom line for the specific versions you listed:

  • x/net v0.47.0 is already past the fixed versions for the 2025–2026 x/net CVEs above, but is not the latest (latest is v0.49.0). [1][4][5][6][7]
  • x/sync v0.18.0 → update available to v0.19.0; no Go-vulndb CVEs found. [2][8]
  • x/sys v0.38.0 → update available to v0.40.0; no Go-vulndb CVEs found. [3][8]

Sources
[1] golang.org/x/net on pkg.go.dev (v0.49.0) (pkg.go.dev)
[2] golang.org/x/sync on pkg.go.dev (v0.19.0) (pkg.go.dev)
[3] golang.org/x/sys on pkg.go.dev (v0.40.0) (pkg.go.dev)
[4] GO-2026-4441 (CVE-2025-58190) (pkg.go.dev)
[5] GO-2026-4440 (CVE-2025-47911) (pkg.go.dev)
[6] GO-2025-3503 (CVE-2025-22870) (osv.dev)
[7] GHSA-vvgc-356p-c3xw (CVE-2025-22872) (github.com)
[8] Go Vulnerability Database – “All Reports” list (used to check for x/sync / x/sys) (pkg.go.dev)



🌐 Web query:

Go 1.25.5 CVE-2025-61728 vulnerability affected versions 1.25.6 security patch

💡 Result:

CVE-2025-61728 is a denial-of-service (CPU exhaustion) issue in Go’s standard library archive/zip: opening a file in a specially crafted ZIP can trigger a super-linear filename indexing algorithm. [3]

Affected Go versions

  • All versions before Go 1.24.12
  • Go 1.25.0 through 1.25.5 (i.e., < 1.25.6) [1][3]

So Go 1.25.5 is affected. [1]

Fixed / security patch versions

  • Go 1.25.6 (fix for the Go 1.25.x line)
  • Go 1.24.12 (fix for the Go 1.24.x line) [1][2]

What to do


Upgrade Go to 1.25.6 to address CVE-2025-61728 vulnerability in archive/zip.

Go 1.25.5 (line 3) is affected by CVE-2025-61728, a denial-of-service vulnerability in the standard library's archive/zip package. The fix is available in Go 1.25.6. Additionally, several dependency versions are outdated and have newer releases available:

  • klauspost/compress v1.18.2 (line 47) → update to v1.18.3 (Jan 16, 2026; latest as of Feb 7, 2026)
  • pierrec/lz4/v4 v4.1.22 (line 51) → update to v4.1.25 (Jan 17, 2026; latest)
  • golang.org/x/sync v0.18.0 (line 94) → update to v0.19.0 (Dec 4, 2025; no security advisories)
  • golang.org/x/sys v0.38.0 (line 95) → update to v0.40.0 (Dec 19, 2025; no security advisories)
  • golang.org/x/net v0.47.0 (line 92) → update to v0.49.0 (Jan 12, 2026; all known 2025 CVEs patched before v0.45.0)
  • github.com/twmb/franz-go v1.20.6 (line 76), pkg/kadm v1.17.1 (line 77), and pkg/kmsg v1.12.0 (line 336) are at or near latest stable versions with no known security advisories.
🤖 Prompt for AI Agents
In `@go.mod` around lines 44 - 47, go.mod currently pins Go 1.25.5 which is
vulnerable to CVE-2025-61728; update the Go toolchain version to 1.25.6 in
go.mod and run `go mod tidy`/`go get` to regenerate the lock info, and also bump
the listed module versions: update github.com/klauspost/compress to v1.18.3,
pierrec/lz4/v4 to v4.1.25, golang.org/x/sync to v0.19.0, golang.org/x/sys to
v0.40.0, and golang.org/x/net to v0.49.0 (leave twmb/franz-go, pkg/kadm,
pkg/kmsg as-is); ensure after changes you run the test suite and CI to verify
builds.

Comment on lines 68 to 81
func (p *franzAsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}

	go func() {
		start := time.Now()
		p.client.Close()
		log.Info("Close kafka async producer success",
			zap.String("keyspace", p.changefeedID.Keyspace()),
			zap.String("changefeed", p.changefeedID.Name()),
			zap.Duration("duration", time.Since(start)))
	}()
}
Contributor


⚠️ Potential issue | 🔴 Critical

Resource leak: kgo.Client is never closed when AsyncRunCallback exits before Close().

AsyncRunCallback unconditionally sets closed=true on exit (Line 151). When Close() is subsequently called, the CompareAndSwap(false, true) at Line 69 fails and the method returns without calling p.client.Close(). This leaves the kgo.Client open — leaking connections, goroutines, and memory.

Either decouple the "no more sends" flag from the "client closed" lifecycle, or ensure client.Close() is called unconditionally with a sync.Once:

Proposed fix using sync.Once
 type franzAsyncProducer struct {
 	client       *kgo.Client
 	changefeedID commonType.ChangeFeedID
 
-	closed *atomic.Bool
-	errCh  chan error
+	closed    *atomic.Bool
+	closeOnce sync.Once
+	errCh     chan error
 }

 func (p *franzAsyncProducer) Close() {
-	if !p.closed.CompareAndSwap(false, true) {
-		return
-	}
-
-	go func() {
+	p.closed.Store(true)
+	p.closeOnce.Do(func() {
 		start := time.Now()
 		p.client.Close()
-		log.Info("Close kafka async producer success",
+		log.Info("kafka async producer closed",
 			zap.String("keyspace", p.changefeedID.Keyspace()),
 			zap.String("changefeed", p.changefeedID.Name()),
 			zap.Duration("duration", time.Since(start)))
-	}()
+	})
 }

Also applies to: 150-165

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_async_producer.go` around lines 68 - 81, The Close()
currently returns without closing p.client when p.closed was already set by
AsyncRunCallback; update the lifecycle so client.Close() is always invoked
exactly once regardless of p.closed state—for example add a dedicated sync.Once
(e.g., producerCloseOnce) or separate flags so that franzAsyncProducer.Close()
always calls p.client.Close() (or triggers the once) even when AsyncRunCallback
sets p.closed; ensure AsyncRunCallback no longer assumes responsibility for
closing p.client and instead calls the same sync.Once/closing helper used by
Close(), referencing franzAsyncProducer.Close, AsyncRunCallback, p.closed,
p.client.Close, and the new sync.Once field to guarantee no leak.

Comment on lines 268 to 271
produceTimeout := o.ReadTimeout
if produceTimeout < 100*time.Millisecond {
	produceTimeout = 10 * time.Second
}
Contributor


⚠️ Potential issue | 🟡 Minor

produceTimeout derived from ReadTimeout — should this be WriteTimeout?

Line 268 uses o.ReadTimeout for the produce request timeout. Since producing is a write-path operation, o.WriteTimeout seems like a more natural choice. Verify that ReadTimeout is intentional here and not a copy-paste from the base options builder.

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_factory.go` around lines 268 - 271, The produceTimeout
is being set from o.ReadTimeout but since this value controls the producer
(write) request timeout, update the assignment to use o.WriteTimeout instead of
o.ReadTimeout; locate the produceTimeout variable initialization in
franz_factory.go (the block that currently reads produceTimeout := o.ReadTimeout
and the subsequent min check) and change it to derive from o.WriteTimeout,
preserving the same minimum/default fallback logic.

Comment on lines +160 to +162
}
s.step = gssAPIFinished
return wrapTokenResp.Marshal()
Contributor


⚠️ Potential issue | 🟡 Minor

Unwrapped third-party error from Marshal().

wrapTokenResp.Marshal() (gokrb5 library call) can return an error that is not wrapped with errors.Trace. As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".

Proposed fix
 		s.step = gssAPIFinished
-		return wrapTokenResp.Marshal()
+		marshaled, err := wrapTokenResp.Marshal()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		return marshaled, nil
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 }
 s.step = gssAPIFinished
-return wrapTokenResp.Marshal()
+marshaled, err := wrapTokenResp.Marshal()
+if err != nil {
+	return nil, errors.Trace(err)
+}
+return marshaled, nil
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_gssapi.go` around lines 160 - 162, The call to
wrapTokenResp.Marshal() returns an unwrapped third-party error; replace the
direct return with wrapping the error using errors.Trace (or errors.WrapError)
so the stack is preserved: call wrapTokenResp.Marshal(), check the returned
error, and if non-nil return errors.Trace(err) (keeping the existing s.step =
gssAPIFinished assignment) instead of returning the raw error from
wrapTokenResp.Marshal().

Comment on lines 60 to 64
produceTimeout := o.ReadTimeout
if produceTimeout <= 0 {
	produceTimeout = 10 * time.Second
}
timeout := time.Duration(franzSyncRecordRetries+1) * produceTimeout
Contributor


⚠️ Potential issue | 🟡 Minor

ReadTimeout used for produce timeout — should this be WriteTimeout?

o.ReadTimeout is used to derive the overall sync-produce timeout, but this is a write (produce) operation. The design doc (line 166) mentions aligning ProduceRequestTimeout with WriteTimeout. Consider using o.WriteTimeout instead for semantic correctness.

Proposed fix
-	produceTimeout := o.ReadTimeout
+	produceTimeout := o.WriteTimeout
 	if produceTimeout <= 0 {
 		produceTimeout = 10 * time.Second
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-produceTimeout := o.ReadTimeout
+produceTimeout := o.WriteTimeout
 if produceTimeout <= 0 {
 	produceTimeout = 10 * time.Second
 }
 timeout := time.Duration(franzSyncRecordRetries+1) * produceTimeout
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_sync_producer.go` around lines 60 - 64, The code uses
o.ReadTimeout to compute produceTimeout for sync produce; change this to use
o.WriteTimeout (falling back to the same 10s default if <=0) so the produce
timeout reflects a write operation; update the assignment of produceTimeout (and
any related calculation using franzSyncRecordRetries) to derive timeout from
o.WriteTimeout instead of o.ReadTimeout to match the design-doc's
ProduceRequestTimeout/WriteTimeout alignment.

@3AceShowHand
Copy link
Collaborator Author

/test pull-cdc-kafka-integration-light

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🤖 Fix all issues with AI agents
In `@pkg/sink/kafka/franz_metrics_collector.go`:
- Around line 77-94: cleanupMetrics only looks at the final hook.Snapshot() so
broker IDs seen earlier can be left as stale Prometheus labels; modify
franzMetricsCollector to maintain a persistent set (e.g., seenBrokers
map[uint32]struct{} or set of string IDs) that is updated inside collectMetrics
whenever you iterate snapshot.Brokers, and then have cleanupMetrics iterate that
accumulated set (not just the final snapshot) to call
OutgoingByteRateGauge.DeleteLabelValues, RequestRateGauge.DeleteLabelValues,
RequestLatencyGauge.DeleteLabelValues, requestsInFlightGauge.DeleteLabelValues
and responseRateGauge.DeleteLabelValues for every brokerID seen; ensure
seenBrokers is initialized in the constructor and cleared/updated appropriately
when collectors are stopped or reused.

In `@pkg/sink/kafka/franz/admin_client.go`:
- Around line 185-228: GetTopicsMeta currently omits ReplicationFactor when
building TopicDetail in GetTopicsMeta; populate ReplicationFactor from the
fetched metadata by inspecting detail.Partitions' replica lists (e.g., if
len(detail.Partitions) > 0 take the replica slice length from
detail.Partitions[0].Replicas or compute a consistent value across partitions)
and assign it to TopicDetail.ReplicationFactor when creating the result entry;
update the construction of TopicDetail in GetTopicsMeta (referencing
GetTopicsMeta, meta.Topics[topic], detail.Partitions and TopicDetail) to set
ReplicationFactor accordingly and handle the empty-partitions case.

In `@pkg/sink/kafka/franz/async_producer.go`:
- Around line 69-82: The AsyncProducer.Close implementation spawns a goroutine
and calls p.client.Close() fire-and-forget, risking loss of buffered messages;
change AsyncProducer.Close (and its use of p.closed.CompareAndSwap and
p.client.Close) to perform the client.Close() call synchronously (or expose/wait
on a completion channel) so callers block until the franz-go client has flushed
and closed, mirroring the SyncProducer.Close pattern; keep the same logging
(including duration, p.changefeedID.Keyspace()/Name()) after the synchronous
close completes.
- Around line 151-166: Remove the premature mark of p.closed in
AsyncRunCallback: the deferred p.closed.Store(true) should be removed so Close()
remains the single place that sets the closed flag and performs shutdown; update
AsyncRunCallback to simply return on context cancel or errors (and if you
prefer, call p.client.Close() directly before returning instead of setting
p.closed) so that Close()'s CompareAndSwap(false, true) can succeed and the
underlying kgo.Client is properly closed; ensure references to p.errCh,
ctx.Done(), and p.changefeedID remain unchanged.

In `@pkg/sink/kafka/franz/sync_producer.go`:
- Around line 147-162: The Close method on SyncProducer is not atomic; replace
the Load()+Store() pattern with an atomic CompareAndSwap on p.closed so only one
goroutine proceeds to close the client: in SyncProducer.Close(), attempt if
p.closed.CompareAndSwap(false, true) { do the start timer, p.client.Close(), and
the Info log } else { emit the Warn log about already-closed using
p.id.Keyspace() and p.id.Name() and return }; mirror the AsyncProducer Close
behavior and messages to ensure consistency.

In `@pkg/sink/kafka/internal/logutil/logutil.go`:
- Around line 86-99: The AnnotateEventError function is duplicating the original
error message by appending err.Error() into the annotation; change the call that
builds the wrapped error (in AnnotateEventError) to only include the event
context (e.g., use errors.Annotate(err, contextStr) or a context-only string
like contextStr+";") instead of contextStr+"; ErrorInfo:"+err.Error(),
referencing AnnotateEventError, BuildEventLogContext and errors.Annotate so the
original error text is preserved exactly once in the wrapped error.
🧹 Nitpick comments (4)
pkg/sink/kafka/internal/logutil/logutil.go (1)

101-107: Silent error swallowing in formatDMLInfo.

json.Marshal failure is silently discarded, returning "". This is acceptable for best-effort log context, but a brief log or comment documenting the intentional discard would help future readers.

pkg/sink/kafka/franz/factory.go (2)

117-139: ALPN NextProtos is unnecessary for Kafka connections.

Lines 120 and 133 set NextProtos: []string{"h2", "http/1.1"}, which are HTTP ALPN identifiers. Kafka uses its own binary protocol over TLS and does not negotiate via ALPN. While this won't break anything (the broker simply ignores unknown ALPN tokens), it adds confusion and could cause issues with strict TLS intermediaries.

Consider removing NextProtos or leaving it nil unless there's a specific proxy/load-balancer requirement.


248-266: buildFranzCompressionOption signature returns an error but never produces one.

The function always returns nil for the error. The caller at Line 225-228 checks for an error that can never happen. This is harmless but misleading — consider simplifying the return to kgo.Opt only, or returning an error on the default branch instead of silently falling back.

pkg/sink/kafka/franz/metrics_hook.go (1)

145-153: snapshotBrokers is unused — the same broker-map copy logic is inlined in Snapshot() at lines 114–118.

Remove this function to eliminate dead code.

Comment on lines 77 to 94
func (m *franzMetricsCollector) cleanupMetrics() {
	keyspace := m.changefeedID.Keyspace()
	changefeedID := m.changefeedID.Name()
	compressionRatioGauge.DeleteLabelValues(keyspace, changefeedID, avg)
	compressionRatioGauge.DeleteLabelValues(keyspace, changefeedID, p99)
	recordsPerRequestGauge.DeleteLabelValues(keyspace, changefeedID, avg)
	recordsPerRequestGauge.DeleteLabelValues(keyspace, changefeedID, p99)

	snapshot := m.hook.Snapshot()
	for id := range snapshot.Brokers {
		brokerID := strconv.Itoa(int(id))
		OutgoingByteRateGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
		RequestRateGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
		RequestLatencyGauge.DeleteLabelValues(keyspace, changefeedID, brokerID, avg)
		RequestLatencyGauge.DeleteLabelValues(keyspace, changefeedID, brokerID, p99)
		requestsInFlightGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
		responseRateGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
	}
Contributor


⚠️ Potential issue | 🟡 Minor

Stale per-broker metrics may not be cleaned up.

cleanupMetrics takes a snapshot at cleanup time to discover broker IDs. If a broker was seen earlier in the collector's lifetime but is no longer present in the final snapshot (e.g., broker removed from the cluster or connection dropped), its metric labels will never be deleted — leading to stale gauge values in Prometheus.

Consider tracking the set of broker IDs seen across all collectMetrics calls and using that accumulated set during cleanup.
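A sketch of that accumulated-set approach; type and method names are hypothetical stand-ins for the PR's collector:

```go
package kafka

import "sync"

// brokerSet accumulates every broker ID observed over the collector's
// lifetime so cleanup can delete labels even for brokers missing from
// the final snapshot.
type brokerSet struct {
	mu   sync.Mutex
	seen map[int32]struct{}
}

func (s *brokerSet) add(id int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen == nil {
		s.seen = make(map[int32]struct{})
	}
	s.seen[id] = struct{}{}
}

// all returns a copy that is safe to range over during cleanup.
func (s *brokerSet) all() []int32 {
	s.mu.Lock()
	defer s.mu.Unlock()
	ids := make([]int32, 0, len(s.seen))
	for id := range s.seen {
		ids = append(ids, id)
	}
	return ids
}
```

collectMetrics would call add for every broker in each snapshot, and cleanupMetrics would range over all() instead of only the final snapshot.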

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_metrics_collector.go` around lines 77 - 94,
cleanupMetrics only looks at the final hook.Snapshot() so broker IDs seen
earlier can be left as stale Prometheus labels; modify franzMetricsCollector to
maintain a persistent set (e.g., seenBrokers map[uint32]struct{} or set of
string IDs) that is updated inside collectMetrics whenever you iterate
snapshot.Brokers, and then have cleanupMetrics iterate that accumulated set (not
just the final snapshot) to call OutgoingByteRateGauge.DeleteLabelValues,
RequestRateGauge.DeleteLabelValues, RequestLatencyGauge.DeleteLabelValues,
requestsInFlightGauge.DeleteLabelValues and responseRateGauge.DeleteLabelValues
for every brokerID seen; ensure seenBrokers is initialized in the constructor
and cleared/updated appropriately when collectors are stopped or reused.

Comment on lines 185 to 228
func (a *AdminClient) GetTopicsMeta(
	topics []string,
	ignoreTopicError bool,
) (map[string]TopicDetail, error) {
	if len(topics) == 0 {
		return make(map[string]TopicDetail), nil
	}

	ctx, cancel := a.newRequestContext()
	defer cancel()

	meta, err := a.admin.Metadata(ctx, topics...)
	if err != nil {
		return nil, errors.Trace(err)
	}

	result := make(map[string]TopicDetail, len(topics))
	for _, topic := range topics {
		detail, ok := meta.Topics[topic]
		if !ok {
			continue
		}
		if detail.Err != nil {
			if errors.Is(detail.Err, kerr.UnknownTopicOrPartition) {
				continue
			}
			if !ignoreTopicError {
				return nil, errors.Trace(detail.Err)
			}
			log.Warn("fetch topic meta failed",
				zap.String("keyspace", a.changefeed.Keyspace()),
				zap.String("changefeed", a.changefeed.Name()),
				zap.String("topic", topic),
				zap.Error(detail.Err))
			continue
		}

		result[topic] = TopicDetail{
			Name:          topic,
			NumPartitions: int32(len(detail.Partitions)),
		}
	}
	return result, nil
}
Contributor


⚠️ Potential issue | 🟡 Minor

GetTopicsMeta never populates ReplicationFactor in the returned TopicDetail.

At Line 222-225, the TopicDetail is built with Name and NumPartitions but ReplicationFactor is left at its zero value. If callers rely on replication factor from this method, they'll silently get 0. If it's intentionally omitted, a comment would help; otherwise the franz-go metadata response does expose partition replica counts that could be used.
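A sketch of deriving the factor from the metadata already fetched, assuming franz-go's kadm types (a PartitionDetails map whose entries carry a Replicas slice):

```go
package franz

import "github.com/twmb/franz-go/pkg/kadm"

// replicationFactorOf infers the replication factor from partition
// metadata: every partition of a topic normally carries the same
// replica count, so the first partition seen suffices.
func replicationFactorOf(partitions kadm.PartitionDetails) int16 {
	for _, p := range partitions {
		return int16(len(p.Replicas))
	}
	return 0 // topic reported no partitions
}
```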

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/admin_client.go` around lines 185 - 228, GetTopicsMeta
currently omits ReplicationFactor when building TopicDetail in GetTopicsMeta;
populate ReplicationFactor from the fetched metadata by inspecting
detail.Partitions' replica lists (e.g., if len(detail.Partitions) > 0 take the
replica slice length from detail.Partitions[0].Replicas or compute a consistent
value across partitions) and assign it to TopicDetail.ReplicationFactor when
creating the result entry; update the construction of TopicDetail in
GetTopicsMeta (referencing GetTopicsMeta, meta.Topics[topic], detail.Partitions
and TopicDetail) to set ReplicationFactor accordingly and handle the
empty-partitions case.

Comment on lines +69 to +82
func (p *AsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}

	go func() {
		start := time.Now()
		p.client.Close()
		log.Info("Close kafka async producer success",
			zap.String("keyspace", p.changefeedID.Keyspace()),
			zap.String("changefeed", p.changefeedID.Name()),
			zap.Duration("duration", time.Since(start)))
	}()
}
Contributor


⚠️ Potential issue | 🟠 Major

Asynchronous Close() may lose buffered messages.

p.client.Close() is called in a fire-and-forget goroutine. The franz-go Client.Close() flushes pending records and waits for in-flight produce requests. By not waiting for it to complete, the caller may proceed (e.g., exit the process) before buffered messages are flushed.

Consider making Close() synchronous or providing a way for the caller to wait for completion, similar to the SyncProducer.Close() pattern in sync_producer.go which calls p.client.Close() synchronously.
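
A minimal sketch of the synchronous variant, reusing only the fields already shown in the snippet above:

func (p *AsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}
	start := time.Now()
	// Blocks until franz-go flushes buffered records and in-flight requests.
	p.client.Close()
	log.Info("Close kafka async producer success",
		zap.String("keyspace", p.changefeedID.Keyspace()),
		zap.String("changefeed", p.changefeedID.Name()),
		zap.Duration("duration", time.Since(start)))
}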

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 69 - 82, The
AsyncProducer.Close implementation spawns a goroutine and calls p.client.Close()
fire-and-forget, risking loss of buffered messages; change AsyncProducer.Close
(and its use of p.closed.CompareAndSwap and p.client.Close) to perform the
client.Close() call synchronously (or expose/wait on a completion channel) so
callers block until the franz-go client has flushed and closed, mirroring the
SyncProducer.Close pattern; keep the same logging (including duration,
p.changefeedID.Keyspace()/Name()) after the synchronous close completes.

Comment on lines 151 to 166
func (p *AsyncProducer) AsyncRunCallback(ctx context.Context) error {
	defer p.closed.Store(true)
	for {
		select {
		case <-ctx.Done():
			log.Info("async producer exit since context is done",
				zap.String("keyspace", p.changefeedID.Keyspace()),
				zap.String("changefeed", p.changefeedID.Name()))
			return errors.Trace(ctx.Err())
		case err := <-p.errCh:
			if err == nil {
				return nil
			}
			return err
		}
	}
}

⚠️ Potential issue | 🔴 Critical

AsyncRunCallback sets closed=true before Close() runs, preventing client shutdown.

AsyncRunCallback defers p.closed.Store(true) (Line 152). When it returns (e.g., on context cancellation or error), closed becomes true. A subsequent call to Close() sees CompareAndSwap(false, true) fail and returns immediately — the underlying kgo.Client is never closed, leaking connections and potentially losing buffered messages.

Either remove the defer p.closed.Store(true) from AsyncRunCallback (let Close() handle the lifecycle), or have AsyncRunCallback call p.client.Close() directly before returning.

Proposed fix: remove the premature closed flag
 func (p *AsyncProducer) AsyncRunCallback(ctx context.Context) error {
-	defer p.closed.Store(true)
 	for {
 		select {
 		case <-ctx.Done():
 			log.Info("async producer exit since context is done",
 				zap.String("keyspace", p.changefeedID.Keyspace()),
 				zap.String("changefeed", p.changefeedID.Name()))
 			return errors.Trace(ctx.Err())
 		case err := <-p.errCh:
 			if err == nil {
 				return nil
 			}
 			return err
 		}
 	}
 }
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 151 - 166, Remove the
premature mark of p.closed in AsyncRunCallback: the deferred
p.closed.Store(true) should be removed so Close() remains the single place that
sets the closed flag and performs shutdown; update AsyncRunCallback to simply
return on context cancel or errors (and if you prefer, call p.client.Close()
directly before returning instead of setting p.closed) so that Close()'s
CompareAndSwap(false, true) can succeed and the underlying kgo.Client is
properly closed; ensure references to p.errCh, ctx.Done(), and p.changefeedID
remain unchanged.

Comment on lines +86 to +99
// AnnotateEventError logs the event context and annotates the error with that context.
func AnnotateEventError(
keyspace, changefeed string,
info *common.MessageLogInfo,
err error,
) error {
if err == nil {
return nil
}
if contextStr := BuildEventLogContext(keyspace, changefeed, info); contextStr != "" {
return errors.Annotate(err, contextStr+"; ErrorInfo:"+err.Error())
}
return err
}

⚠️ Potential issue | 🟡 Minor

Duplicated error message in annotated error.

errors.Annotate already wraps err, so when the annotated error is stringified it will include the original error's message. Appending err.Error() into the annotation string at line 96 causes the original message to appear twice in the output, e.g.:
"keyspace=..., eventType=dml; ErrorInfo:something failed: something failed"

Proposed fix
 	if contextStr := BuildEventLogContext(keyspace, changefeed, info); contextStr != "" {
-		return errors.Annotate(err, contextStr+"; ErrorInfo:"+err.Error())
+		return errors.Annotate(err, contextStr)
 	}
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/internal/logutil/logutil.go` around lines 86 - 99, The
AnnotateEventError function is duplicating the original error message by
appending err.Error() into the annotation; change the call that builds the
wrapped error (in AnnotateEventError) to only include the event context (e.g.,
use errors.Annotate(err, contextStr) or a context-only string like
contextStr+";") instead of contextStr+"; ErrorInfo:"+err.Error(), referencing
AnnotateEventError, BuildEventLogContext and errors.Annotate so the original
error text is preserved exactly once in the wrapped error.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

🤖 Fix all issues with AI agents
In `@pkg/sink/kafka/franz_admin_client.go`:
- Around line 29-36: GetAllBrokers currently treats elements returned by
a.inner.GetAllBrokers() as structs with a field ID, but the inner call returns
[]int32 (broker node IDs); update the loop in GetAllBrokers to iterate over the
int32 IDs (e.g., for _, id := range brokers) and construct Broker values using
the id directly (Broker{ID: id}) so you don't access a non-existent b.ID field.
A sketch of the corrected loop follows after this list.
- Around line 70-80: The CreateTopic implementation in franzAdminClientAdapter
is constructing the wrong type: it uses kadm.TopicDetail fields that don't exist
and passes *kadm.TopicDetail to a.inner.CreateTopic which expects
*kafkafranz.TopicDetail; change the code in CreateTopic to build and pass a
*kafkafranz.TopicDetail populated with the correct fields (e.g., Topic,
Partitions, ReplicationFactor or their actual names in kafkafranz.TopicDetail)
and remove the unused kadm import; ensure you still validate detail == nil and
return the same error when appropriate.

In `@pkg/sink/kafka/franz/admin_client.go`:
- Around line 81-94: GetAllBrokers currently swallows errors from
a.admin.BrokerMetadata making callers unable to tell "no brokers" vs "request
failed"; change the method signature on AdminClient.GetAllBrokers to return
([]int32, error), use a.newRequestContext() as before, call
a.admin.BrokerMetadata(ctx) and on error log the warning (including
keyspace/changefeed via a.changefeed) and return nil plus the error, otherwise
return meta.Brokers.NodeIDs() with a nil error; update all callers of
GetAllBrokers to handle the error appropriately (or if you intentionally want
best-effort behavior, instead add a clear comment above GetAllBrokers explaining
that failures are intentionally ignored).

In `@pkg/sink/kafka/franz/async_producer.go`:
- Around line 78-143: The return of context.Cause(ctx) in AsyncSend should be
wrapped to attach a stack trace; update the select case that currently does
"return context.Cause(ctx)" to instead capture the error and return a wrapped
error (e.g. errors.Trace(err) or errors.WrapError(...)) so library/stdlib errors
are traced; locate the select in AsyncSend that checks <-ctx.Done() and replace
the direct context.Cause(ctx) return with a wrapped version while preserving the
original behavior.
- Around line 148-164: In AsyncRunCallback, the returned context.Cause(ctx)
should be wrapped with the project's error-tracing helper (e.g. errors.Trace or
errors.WrapError) like other callsites (same pattern used in AsyncSend); update
the return in the ctx.Done() case inside AsyncRunCallback to return
errors.Trace(context.Cause(ctx)) (and ensure the errors package is imported/used
consistently) so the context-derived error gets a stack trace.

In `@pkg/sink/kafka/franz/factory_api_test.go`:
- Around line 28-29: The test calls newProducerOptions with an extra integer
argument causing a compile error; update the test to call newProducerOptions
with only the Options pointer (o) and, if the intent was to set maxAttempts or
similar, set the appropriate field on the Options struct (e.g., o.SomeField = 3)
before calling newProducerOptions so the function signature newProducerOptions(o
*Options) []kgo.Opt is respected.

In `@pkg/sink/kafka/franz/factory.go`:
- Around line 220-239: The newCompressionOption function currently logs an
unrecognized compression but leaves codec as the zero value, causing silent
no-compression; update the default branch in newCompressionOption (and any
early-empty case) to explicitly set codec = kgo.NoCompression() (or
alternatively return an error) so that kgo.ProducerBatchCompression always
receives a valid codec; ensure the warning log remains but does not allow codec
to stay nil before returning.
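
For the franz_admin_client.go GetAllBrokers item above, a minimal sketch of the corrected loop; the adapter's return type is assumed from context rather than copied from the PR:

// Sketch only: a.inner.GetAllBrokers() returns []int32 broker node IDs,
// so build Broker values from the IDs directly instead of reading a b.ID field.
func (a *franzAdminClientAdapter) GetAllBrokers() []Broker {
	ids := a.inner.GetAllBrokers()
	brokers := make([]Broker, 0, len(ids))
	for _, id := range ids {
		brokers = append(brokers, Broker{ID: id})
	}
	return brokers
}
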
🧹 Nitpick comments (4)
pkg/sink/kafka/franz/factory_api_test.go (1)

18-29: Tests don't assert anything — they only verify no panic.

Both tests discard the return values without any assertions. Consider using require from testify to validate the returned options (e.g., non-nil, expected length, specific codec).
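
A minimal sketch of the suggested assertions, relying on the newProducerOptions(o *Options) []kgo.Opt signature quoted in the agent prompts above:

	// Sketch only: validate the returned options instead of merely not panicking.
	opts := newProducerOptions(o)
	require.NotEmpty(t, opts)
	for _, opt := range opts {
		require.NotNil(t, opt)
	}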

pkg/sink/kafka/franz/factory.go (1)

132-173: Empty default: case in the SASL switch is misleading.

The default: on line 170 is empty, and the error return on line 172 is outside the switch. This makes it look like a fallthrough path, but Go switch cases don't fall through implicitly. Moving the error return inside default: would improve clarity.

♻️ Proposed refactor
-	default:
-	}
-	return nil, errors.ErrKafkaInvalidConfig.GenWithStack("unsupported sasl mechanism %s", o.SASL.SASLMechanism)
+	default:
+		return nil, errors.ErrKafkaInvalidConfig.GenWithStack("unsupported sasl mechanism %s", o.SASL.SASLMechanism)
+	}
pkg/sink/kafka/franz/sync_producer.go (1)

93-101: WrapError called unconditionally — wraps nil on success path.

Line 101 calls errors.WrapError(errors.ErrKafkaSendMessage, err) regardless of whether err is nil. While WrapError likely returns nil when the inner error is nil, this pattern is fragile and obscures intent. The same applies to SendMessages at line 136.

♻️ Proposed fix for both methods
-	return errors.WrapError(errors.ErrKafkaSendMessage, err)
+	if err != nil {
+		return errors.WrapError(errors.ErrKafkaSendMessage, err)
+	}
+	return nil
pkg/sink/kafka/franz/gssapi.go (1)

46-52: Typo: kerborosClient → kerberosClient.

The interface name and all references misspell "Kerberos" as "Kerboros." This affects the interface name (Line 46), the field type (Line 98), the constructor name (Line 217), and the constructor return type. Since these are unexported, it won't break external consumers, but it's still a readability/maintenance concern.

Proposed fix (interface + constructor rename)
-type kerborosClient interface {
+type kerberosClient interface {
-func newKerborosClient(g security.GSSAPI) (kerborosClient, error) {
+func newKerberosClient(g security.GSSAPI) (kerberosClient, error) {

And update the field type in gssapiSession:

 type gssapiSession struct {
-	client kerborosClient
+	client kerberosClient

Comment on lines 70 to 80
func (a *franzAdminClientAdapter) CreateTopic(detail *TopicDetail, validateOnly bool) error {
	if detail == nil {
		return errors.ErrKafkaInvalidConfig.GenWithStack("topic detail must not be nil")
	}
	franzDetail := &kadm.TopicDetail{
		Name:              detail.Name,
		NumPartitions:     detail.NumPartitions,
		ReplicationFactor: detail.ReplicationFactor,
	}
	return a.inner.CreateTopic(franzDetail, validateOnly)
}

⚠️ Potential issue | 🔴 Critical

Compilation error: wrong type used for CreateTopic argument.

kadm.TopicDetail does not have Name, NumPartitions, or ReplicationFactor fields, and a.inner.CreateTopic expects *kafkafranz.TopicDetail, not *kadm.TopicDetail. The pipeline confirms all four errors on lines 75-79.

The kadm import on line 19 can also be removed after this fix.

🐛 Proposed fix
 import (
 	"github.com/pingcap/ticdc/pkg/errors"
 	kafkafranz "github.com/pingcap/ticdc/pkg/sink/kafka/franz"
-	"github.com/twmb/franz-go/pkg/kadm"
 )
 func (a *franzAdminClientAdapter) CreateTopic(detail *TopicDetail, validateOnly bool) error {
 	if detail == nil {
 		return errors.ErrKafkaInvalidConfig.GenWithStack("topic detail must not be nil")
 	}
-	franzDetail := &kadm.TopicDetail{
-		Name:              detail.Name,
-		NumPartitions:     detail.NumPartitions,
-		ReplicationFactor: detail.ReplicationFactor,
-	}
+	franzDetail := &kafkafranz.TopicDetail{
+		Name:              detail.Name,
+		NumPartitions:     detail.NumPartitions,
+		ReplicationFactor: detail.ReplicationFactor,
+	}
 	return a.inner.CreateTopic(franzDetail, validateOnly)
 }
🧰 Tools
🪛 GitHub Check: Mac OS Build

[failure] 79-79:
cannot use franzDetail (variable of type *kadm.TopicDetail) as *franz.TopicDetail value in argument to a.inner.CreateTopic


[failure] 77-77:
unknown field ReplicationFactor in struct literal of type kadm.TopicDetail


[failure] 76-76:
unknown field NumPartitions in struct literal of type kadm.TopicDetail


[failure] 75-75:
unknown field Name in struct literal of type kadm.TopicDetail

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_admin_client.go` around lines 70 - 80, The CreateTopic
implementation in franzAdminClientAdapter is constructing the wrong type: it
uses kadm.TopicDetail fields that don't exist and passes *kadm.TopicDetail to
a.inner.CreateTopic which expects *kafkafranz.TopicDetail; change the code in
CreateTopic to build and pass a *kafkafranz.TopicDetail populated with the
correct fields (e.g., Topic, Partitions, ReplicationFactor or their actual names
in kafkafranz.TopicDetail) and remove the unused kadm import; ensure you still
validate detail == nil and return the same error when appropriate.

Comment on lines 81 to 94
func (a *AdminClient) GetAllBrokers() []int32 {
	ctx, cancel := a.newRequestContext()
	defer cancel()

	meta, err := a.admin.BrokerMetadata(ctx)
	if err != nil {
		log.Warn("Kafka admin client fetch broker metadata failed",
			zap.String("keyspace", a.changefeed.Keyspace()),
			zap.String("changefeed", a.changefeed.Name()),
			zap.Error(err))
		return nil
	}
	return meta.Brokers.NodeIDs()
}

⚠️ Potential issue | 🟡 Minor

GetAllBrokers silently swallows errors by returning nil.

When BrokerMetadata fails, the method logs a warning and returns nil instead of propagating the error. Callers cannot distinguish "no brokers" from "request failed." If this is intentional (best-effort for heartbeat/monitoring), a comment would clarify the design. If callers need to react to failures, an error return value is needed.
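
A minimal sketch of the error-returning variant, mirroring the method body quoted above:

func (a *AdminClient) GetAllBrokers() ([]int32, error) {
	ctx, cancel := a.newRequestContext()
	defer cancel()

	meta, err := a.admin.BrokerMetadata(ctx)
	if err != nil {
		log.Warn("Kafka admin client fetch broker metadata failed",
			zap.String("keyspace", a.changefeed.Keyspace()),
			zap.String("changefeed", a.changefeed.Name()),
			zap.Error(err))
		// Propagate the failure so callers can tell "no brokers" from "request failed".
		return nil, errors.Trace(err)
	}
	return meta.Brokers.NodeIDs(), nil
}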

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/admin_client.go` around lines 81 - 94, GetAllBrokers
currently swallows errors from a.admin.BrokerMetadata making callers unable to
tell "no brokers" vs "request failed"; change the method signature on
AdminClient.GetAllBrokers to return ([]int32, error), use a.newRequestContext()
as before, call a.admin.BrokerMetadata(ctx) and on error log the warning
(including keyspace/changefeed via a.changefeed) and return nil plus the error,
otherwise return meta.Brokers.NodeIDs() with a nil error; update all callers of
GetAllBrokers to handle the error appropriately (or if you intentionally want
best-effort behavior, instead add a clear comment above GetAllBrokers explaining
that failures are intentionally ignored).

Comment on lines +78 to +143
func (p *AsyncProducer) AsyncSend(
	ctx context.Context,
	topic string,
	partition int32,
	message *common.Message,
) error {
	if p.closed.Load() {
		return errors.ErrKafkaProducerClosed.GenWithStackByArgs()
	}

	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	default:
	}

	var (
		keyspace   = p.changefeedID.Keyspace()
		changefeed = p.changefeedID.Name()
	)

	failpoint.Inject("KafkaSinkAsyncSendError", func() {
		log.Info("KafkaSinkAsyncSendError error injected",
			zap.String("keyspace", keyspace), zap.String("changefeed", changefeed))
		errWithInfo := logutil.AnnotateEventError(
			keyspace,
			changefeed,
			message.LogInfo,
			errors.New("kafka sink injected error"),
		)
		select {
		case p.errCh <- errors.WrapError(errors.ErrKafkaAsyncSendMessage, errWithInfo):
		default:
		}
		failpoint.Return(nil)
	})

	record := &kgo.Record{
		Topic:     topic,
		Partition: partition,
		Key:       message.Key,
		Value:     message.Value,
	}

	callback := message.Callback
	logInfo := message.LogInfo
	promise := func(_ *kgo.Record, err error) {
		if err != nil {
			errWithInfo := logutil.AnnotateEventError(
				keyspace, changefeed,
				logInfo,
				err,
			)
			select {
			case p.errCh <- errors.WrapError(errors.ErrKafkaAsyncSendMessage, errWithInfo):
			// todo: remove this default after support dispatcher recover logic.
			default:
			}
			return
		}
		if callback != nil {
			callback()
		}
	}
	p.client.Produce(ctx, record, promise)
	return nil
}

⚠️ Potential issue | 🟡 Minor

AsyncSend should wrap the error from context.Cause(ctx).

Line 90 returns the raw error from context.Cause(ctx) without errors.Trace. Per the coding guidelines, errors from library/stdlib calls should be wrapped to attach a stack trace.

Proposed fix
 	select {
 	case <-ctx.Done():
-		return context.Cause(ctx)
+		return errors.Trace(context.Cause(ctx))
 	default:
 	}

The rest of AsyncSend — record construction, promise callback wiring, failpoint injection, and the non-blocking error channel send — is clean and correct. As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 78 - 143, The return of
context.Cause(ctx) in AsyncSend should be wrapped to attach a stack trace;
update the select case that currently does "return context.Cause(ctx)" to
instead capture the error and return a wrapped error (e.g. errors.Trace(err) or
errors.WrapError(...)) so library/stdlib errors are traced; locate the select in
AsyncSend that checks <-ctx.Done() and replace the direct context.Cause(ctx)
return with a wrapped version while preserving the original behavior.

Comment on lines +148 to +164
func (p *AsyncProducer) AsyncRunCallback(ctx context.Context) error {
	defer p.closed.Store(true)
	for {
		select {
		case <-ctx.Done():
			log.Info("async producer exit since context is done",
				zap.String("keyspace", p.changefeedID.Keyspace()),
				zap.String("changefeed", p.changefeedID.Name()))
			return context.Cause(ctx)
		case err := <-p.errCh:
			if err == nil {
				return nil
			}
			return err
		}
	}
}

⚠️ Potential issue | 🟡 Minor

AsyncRunCallback should also wrap the context.Cause error.

Same pattern as in AsyncSend: Line 156 returns context.Cause(ctx) unwrapped.

Proposed fix
 		case <-ctx.Done():
 			log.Info("async producer exit since context is done",
 				zap.String("keyspace", p.changefeedID.Keyspace()),
 				zap.String("changefeed", p.changefeedID.Name()))
-			return context.Cause(ctx)
+			return errors.Trace(context.Cause(ctx))

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 148 - 164, In
AsyncRunCallback, the returned context.Cause(ctx) should be wrapped with the
project's error-tracing helper (e.g. errors.Trace or errors.WrapError) like
other callsites (same pattern used in AsyncSend); update the return in the
ctx.Done() case inside AsyncRunCallback to return
errors.Trace(context.Cause(ctx)) (and ensure the errors package is imported/used
consistently) so the context-derived error gets a stack trace.

Comment on lines 220 to 239
func newCompressionOption(o *Options) kgo.Opt {
	compression := strings.ToLower(strings.TrimSpace(o.Compression))
	var codec kgo.CompressionCodec
	switch compression {
	case "none":
		codec = kgo.NoCompression()
	case "gzip":
		codec = kgo.GzipCompression()
	case "snappy":
		codec = kgo.SnappyCompression()
	case "lz4":
		codec = kgo.Lz4Compression()
	case "zstd":
		codec = kgo.ZstdCompression()
	case "":
		codec = kgo.NoCompression()
	default:
		log.Warn("unsupported compression algorithm", zap.String("compression", o.Compression))
	}
	return kgo.ProducerBatchCompression(codec)
}

⚠️ Potential issue | 🟡 Minor

Unrecognized compression falls through silently with a zero-value codec.

When compression is an unrecognized string, the code logs a warning (line 237) but then passes a zero-value kgo.CompressionCodec to kgo.ProducerBatchCompression. This results in silent degradation to no compression. Consider returning an error or explicitly falling back to kgo.NoCompression() to make the behavior intentional.

🛡️ Proposed fix — explicit fallback
 	default:
 		log.Warn("unsupported compression algorithm", zap.String("compression", o.Compression))
+		codec = kgo.NoCompression()
 	}
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/factory.go` around lines 220 - 239, The
newCompressionOption function currently logs an unrecognized compression but
leaves codec as the zero value, causing silent no-compression; update the
default branch in newCompressionOption (and any early-empty case) to explicitly
set codec = kgo.NoCompression() (or alternatively return an error) so that
kgo.ProducerBatchCompression always receives a valid codec; ensure the warning
log remains but does not allow codec to stay nil before returning.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/sink/kafka/metrics.go (1)

56-92: ⚠️ Potential issue | 🟠 Major

Default histogram buckets are unsuitable for all three franz metrics.

The Prometheus default buckets (DefBuckets: 0.005 … 10) are designed for second-based latencies. These histograms observe:

  1. franzRequestLatencyHistogram — values in milliseconds (see metrics_hook.go Line 175: Microseconds() / 1000). Typical latencies (5–500 ms) will all overflow into the +Inf bucket.
  2. franzCompressionRatioHistogram — values of ratio × 100 (see metrics_hook.go Line 200). Typical values (100–500) will all overflow.
  3. franzRecordsPerRequestHistogram — integer record counts. Also a poor fit for sub-1 buckets.

Without custom Buckets, these histograms provide no useful quantile information.

Proposed fix — add meaningful bucket ranges
 	franzRequestLatencyHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name:      "kafka_producer_request_latency_histogram",
 			Help:      "Request latency histogram for franz producer in milliseconds.",
+			Buckets:   []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000},
 		}, []string{"namespace", "changefeed", "broker"})
 ...
 	franzCompressionRatioHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name:      "kafka_producer_compression_ratio_histogram",
 			Help:      "Compression ratio times 100 histogram for franz producer.",
+			Buckets:   []float64{50, 100, 150, 200, 300, 500, 1000},
 		}, []string{"namespace", "changefeed"})
 ...
 	franzRecordsPerRequestHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name:      "kafka_producer_records_per_request_histogram",
 			Help:      "Records per request histogram for franz producer.",
+			Buckets:   []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
 		}, []string{"namespace", "changefeed"})
🤖 Fix all issues with AI agents
In `@pkg/sink/kafka/franz/metrics_hook.go`:
- Around line 120-140: RecordBrokerWrite increments RequestsInFlight only when
err==nil but OnBrokerE2E only decrements when e2e.WriteErr==nil, causing leaks;
update OnBrokerE2E to always decrement the RequestsInFlight gauge for the same
label tuple (keyspace, changefeed, brokerID) whenever the write was counted
(i.e., when RequestsInFlight != nil and the broker/request labels exist), rather
than only on e2e.WriteErr==nil; locate MetricsHook.OnBrokerE2E and remove or
broaden the e2e.WriteErr condition so
RequestsInFlight.WithLabelValues(...).Add(-1) (or .Dec()) is invoked
unconditionally when the corresponding request was previously incremented.
🧹 Nitpick comments (2)
pkg/sink/kafka/franz/admin_client_test.go (1)

25-36: Source-code grep test is fragile; prefer a behavioral assertion.

This test reads the source file as text and asserts on string patterns. It will silently pass if the error is misspelled, aliased, or if the function is moved to another file. A unit test that actually calls the code path (e.g., with a mock returning a "controller not available" condition) and asserts on the returned error type would be more reliable and refactor-proof.
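
A hedged sketch of such a behavioral test; stubAdmin and newAdminClientWithStub are hypothetical names for whatever test seam the package exposes, and the GetBrokerConfig signature is assumed:

	// Sketch only: exercise the real code path instead of grepping source text.
	func TestGetBrokerConfigControllerNotAvailable(t *testing.T) {
		// stubAdmin and newAdminClientWithStub are illustrative, not existing helpers.
		admin := newAdminClientWithStub(t, &stubAdmin{controllerID: -1})
		_, err := admin.GetBrokerConfig(context.Background(), "message.max.bytes")
		require.True(t, errors.Is(err, cerror.ErrKafkaControllerNotAvailable))
	}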

pkg/sink/kafka/franz/metrics_hook.go (1)

70-80: Run won't clean up if BindPrometheusMetrics is called after Run starts.

Run reads bound once on entry (Line 71). If it's false, the early return on Line 75 skips cleanup entirely — even if BindPrometheusMetrics is called later while Run is blocked on ctx.Done(). This appears safe given the current call site (NewFranzFactory binds before Run is invoked), but it's worth a brief comment documenting that precondition.

Comment on lines 120 to 140
func (h *MetricsHook) RecordBrokerWrite(nodeID int32, bytesWritten int, err error) {
	if nodeID < 0 {
		return
	}

	keyspace, changefeed, metrics, bound := h.loadPrometheusMetrics()
	if !bound {
		return
	}
	brokerID := strconv.Itoa(int(nodeID))

	if metrics.OutgoingByteRate != nil && bytesWritten > 0 {
		metrics.OutgoingByteRate.WithLabelValues(keyspace, changefeed, brokerID).Add(float64(bytesWritten))
	}
	if metrics.RequestRate != nil {
		metrics.RequestRate.WithLabelValues(keyspace, changefeed, brokerID).Add(1)
	}
	if err == nil && metrics.RequestsInFlight != nil {
		metrics.RequestsInFlight.WithLabelValues(keyspace, changefeed, brokerID).Add(1)
	}
}

⚠️ Potential issue | 🟡 Minor

In-flight tracking increments on write but only decrements on E2E success — failed writes leak.

RecordBrokerWrite increments RequestsInFlight when err == nil (Line 137-138). OnBrokerE2E decrements it only when e2e.WriteErr == nil (Line 168-169). If a request is successfully written (err == nil in OnBrokerWrite) but later the E2E callback reports WriteErr != nil (e.g., broker dropped it), the in-flight gauge will never decrement, drifting upward over time.

Consider whether the decrement condition should be broadened (e.g., always decrement in OnBrokerE2E if the corresponding write incremented).

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/metrics_hook.go` around lines 120 - 140,
RecordBrokerWrite increments RequestsInFlight only when err==nil but OnBrokerE2E
only decrements when e2e.WriteErr==nil, causing leaks; update OnBrokerE2E to
always decrement the RequestsInFlight gauge for the same label tuple (keyspace,
changefeed, brokerID) whenever the write was counted (i.e., when
RequestsInFlight != nil and the broker/request labels exist), rather than only
on e2e.WriteErr==nil; locate MetricsHook.OnBrokerE2E and remove or broaden the
e2e.WriteErr condition so RequestsInFlight.WithLabelValues(...).Add(-1) (or
.Dec()) is invoked unconditionally when the corresponding request was previously
incremented.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@pkg/errors/helper.go`:
- Line 118: The review asks to confirm if temporary Kafka controller
unavailability should cause an unretryable changefeed failure: check the code
paths where ErrKafkaControllerNotAvailable is returned (notably GetBrokerConfig
and the meta.Controller < 0 check) and decide whether it represents a transient
error; if it is transient, change its classification from unretryable to
retryable (or wrap it so callers can distinguish transient vs permanent), update
any error mapping/constants where ErrKafkaControllerNotAvailable is listed
(e.g., remove it from permanent-failure lists), and add/update a
unit/integration test that simulates meta.Controller < 0 to ensure changefeed
initialization now retries with backoff instead of failing fast; also update
comments/docs around ErrKafkaControllerNotAvailable to describe expected retry
behavior.

In `@pkg/sink/kafka/franz/metrics_hook.go`:
- Around line 69-79: The Run method currently returns early if metrics aren't
yet bound, which lets metrics bound later leak; change MetricsHook.Run so it
does not early-return on !bound — instead, simply wait for ctx.Done() and then
call CleanupPrometheusMetrics(); use the existing
loadPrometheusMetrics()/promBound state only for the initial check if needed,
but ensure CleanupPrometheusMetrics() is always invoked on shutdown
(BindPrometheusMetrics may be called after Run starts), so remove the
early-return path that prevents cleanup when BindPrometheusMetrics runs later.
🧹 Nitpick comments (1)
pkg/sink/kafka/franz/admin_client_test.go (1)

27-38: Source-file-scanning test is fragile and non-behavioral.

This test reads the Go source file at runtime and does string matching. It will break if the file is renamed, moved, or if the error usage is refactored without changing semantics. A behavioral test that calls GetBrokerConfig with a mock/stub returning Controller < 0 and asserts the error type would be more robust and maintainable.


ErrSinkURIInvalid,
ErrKafkaInvalidConfig,
ErrKafkaControllerNotAvailable,

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Check where ErrKafkaControllerNotAvailable is generated/used
rg -n 'ErrKafkaControllerNotAvailable' --type=go -C3

Repository: pingcap/ticdc

Length of output: 2064


🏁 Script executed:

# Get more context around the GetBrokerConfig function
rg -n 'GetBrokerConfig|meta.Controller' pkg/sink/kafka/franz/admin_client.go -B5 -A10

Repository: pingcap/ticdc

Length of output: 971


🏁 Script executed:

# Check where GetBrokerConfig is called
rg -n 'GetBrokerConfig' --type=go -B2 -A2

Repository: pingcap/ticdc

Length of output: 4324


🏁 Script executed:

# Check how other Kafka errors are classified in changefeedUnRetryableErrors
rg -n 'changefeedUnRetryableErrors' pkg/errors/helper.go -A50

Repository: pingcap/ticdc

Length of output: 2547


🏁 Script executed:

# Check how errors are handled during changefeed creation and where ShouldFailChangefeed is called
rg -n 'ShouldFailChangefeed|adjustOptions' --type=go -B2 -A2 | head -80

Repository: pingcap/ticdc

Length of output: 4769


🏁 Script executed:

# Check the context of where GetBrokerConfig is called in options.go - specifically around line 727
sed -n '715,740p' pkg/sink/kafka/options.go

Repository: pingcap/ticdc

Length of output: 886


🏁 Script executed:

# Check how errors from adjustOptions are handled in the factory
sed -n '40,80p' pkg/sink/kafka/sarama_factory.go

Repository: pingcap/ticdc

Length of output: 1212


🏁 Script executed:

# Check if there's any retry logic around changefeed sink creation
rg -n 'retry|Retry' pkg/sink/kafka/sarama_factory.go -B2 -A2

Repository: pingcap/ticdc

Length of output: 220


🏁 Script executed:

# Look for comments or documentation about why controller unavailability is unretryable
rg -n 'controller.*not.*available|Controller.*not.*available' --type=go -B3 -A3

Repository: pingcap/ticdc

Length of output: 1086


🏁 Script executed:

# Check the test to understand the intent better
cat -n pkg/sink/kafka/franz/admin_client_test.go | head -40

Repository: pingcap/ticdc

Length of output: 1552


🏁 Script executed:

# Check if there's any documentation or comments about retry behavior in options.go or factory
sed -n '580,600p' pkg/sink/kafka/options.go

Repository: pingcap/ticdc

Length of output: 655


🏁 Script executed:

# Check how changefeed creation handles errors and if there's retry logic
rg -n 'CreateSinkFactory|New.*Factory' --type=go -B3 -A3 | head -60

Repository: pingcap/ticdc

Length of output: 3927


🏁 Script executed:

# Look for any retry or backoff logic around changefeed creation
rg -n 'backoff|exponential|retry.*changefeed|changefeed.*retry' --type=go -i | head -20

Repository: pingcap/ticdc

Length of output: 2200


Confirm whether temporary controller unavailability should cause permanent changefeed failure.

ErrKafkaControllerNotAvailable is returned when meta.Controller < 0 during broker config lookup in GetBrokerConfig, which is called during changefeed initialization. Controller elections can be temporarily unavailable during broker restarts or failovers. Marking this unretryable causes changefeed creation to fail immediately rather than retry with backoff. Verify this is the intended behavior—whether the changefeed should fail fast on controller unavailability or allow transient recovery with retries.
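
If the error were reclassified as transient, initialization could retry the controller lookup with bounded backoff. A generic sketch under that assumption; the helper name, the GetBrokerConfig signature, and the errors.Is matching are illustrative, not TiCDC's actual retry API:

// Sketch only: bounded retry with exponential backoff around the lookup.
func getBrokerConfigWithRetry(ctx context.Context, admin *AdminClient, name string) (string, error) {
	backoff := 500 * time.Millisecond
	var lastErr error
	for attempt := 0; attempt < 5; attempt++ {
		value, err := admin.GetBrokerConfig(ctx, name)
		if err == nil {
			return value, nil
		}
		if !errors.Is(err, cerror.ErrKafkaControllerNotAvailable) {
			return "", err // treat everything else as permanent: fail fast
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(backoff):
			backoff *= 2 // back off before the next controller lookup
		}
	}
	return "", lastErr
}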

🤖 Prompt for AI Agents
In `@pkg/errors/helper.go` at line 118, The review asks to confirm if temporary
Kafka controller unavailability should cause an unretryable changefeed failure:
check the code paths where ErrKafkaControllerNotAvailable is returned (notably
GetBrokerConfig and the meta.Controller < 0 check) and decide whether it
represents a transient error; if it is transient, change its classification from
unretryable to retryable (or wrap it so callers can distinguish transient vs
permanent), update any error mapping/constants where
ErrKafkaControllerNotAvailable is listed (e.g., remove it from permanent-failure
lists), and add/update a unit/integration test that simulates meta.Controller <
0 to ensure changefeed initialization now retries with backoff instead of
failing fast; also update comments/docs around ErrKafkaControllerNotAvailable to
describe expected retry behavior.

Comment on lines +69 to +79
func (h *MetricsHook) Run(ctx context.Context) {
	_, _, _, bound := h.loadPrometheusMetrics()

	if !bound {
		<-ctx.Done()
		return
	}

	<-ctx.Done()
	h.CleanupPrometheusMetrics()
}

⚠️ Potential issue | 🟠 Major

Run does not actually wait for binding — metrics will leak if BindPrometheusMetrics is called after Run starts.

If Run is invoked before BindPrometheusMetrics (a likely lifecycle ordering), it sees !bound, blocks on <-ctx.Done(), and returns without calling CleanupPrometheusMetrics. Metrics bound later are never cleaned up.

Consider using a channel or condition variable to wait for binding, or re-check promBound after <-ctx.Done():

Proposed minimal fix
 func (h *MetricsHook) Run(ctx context.Context) {
-	_, _, _, bound := h.loadPrometheusMetrics()
-
-	if !bound {
-		<-ctx.Done()
-		return
-	}
-
 	<-ctx.Done()
 	h.CleanupPrometheusMetrics()
 }

CleanupPrometheusMetrics already guards against the unbound case, so the early-return check is unnecessary and actively harmful.

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/metrics_hook.go` around lines 69 - 79, The Run method
currently returns early if metrics aren't yet bound, which lets metrics bound
later leak; change MetricsHook.Run so it does not early-return on !bound —
instead, simply wait for ctx.Done() and then call CleanupPrometheusMetrics();
use the existing loadPrometheusMetrics()/promBound state only for the initial
check if needed, but ensure CleanupPrometheusMetrics() is always invoked on
shutdown (BindPrometheusMetrics may be called after Run starts), so remove the
early-return path that prevents cleanup when BindPrometheusMetrics runs later.

@ti-chi-bot
Copy link

ti-chi-bot bot commented Feb 11, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@3AceShowHand
Copy link
Collaborator Author

/test all

@ti-chi-bot
Copy link

ti-chi-bot bot commented Feb 11, 2026

@3AceShowHand: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name                          Commit   Required  Rerun command
pull-cdc-kafka-integration-light   6e39cdd  true      /test pull-cdc-kafka-integration-light
pull-cdc-kafka-integration-heavy   6e39cdd  true      /test pull-cdc-kafka-integration-heavy
pull-cdc-pulsar-integration-heavy  6e39cdd  false     /test pull-cdc-pulsar-integration-heavy

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

