kafka: make franz the default sink client #4167
base: master
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected. Please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: (no approvers yet). The full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files; approvers can indicate their approval in a comment.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

Adds Franz-go as an alternative Kafka client to the sink: factory selection, full Franz-based admin/producer implementations, SASL/GSSAPI support, Prometheus metrics integration, a config/URL option to choose the client, an adapter layer to reuse downstream logic, and accompanying tests and go.mod updates.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly refactors the Kafka sink client by making franz-go the default implementation.
Activity
/test all
Code Review
This pull request makes franz-go the default Kafka sink client, introducing a new implementation for producers and the admin client. However, it introduces potential security vulnerabilities related to SSRF and Path Traversal, specifically with user-provided URLs for OAuth2 token endpoints and file paths for Kerberos configurations lacking proper validation. Additionally, there are a few code-related issues to address: a resource leak in the admin client's Close method, a non-blocking Close in the async producer which should be blocking, and the use of incorrect timeout values in producer configurations. Addressing these points will improve both the security and robustness of the new implementation.
pkg/sink/kafka/franz_factory.go
Outdated
tokenURL, err := url.Parse(o.SASL.OAuth2.TokenURL)
if err != nil {
	return nil, errors.Trace(err)
}

cfg := &clientcredentials.Config{
	ClientID:       o.SASL.OAuth2.ClientID,
	ClientSecret:   o.SASL.OAuth2.ClientSecret,
	TokenURL:       tokenURL.String(),
	EndpointParams: endpointParams,
	Scopes:         o.SASL.OAuth2.Scopes,
}
return cfg.TokenSource(ctx), nil
The buildFranzOauthTokenSource function uses the user-provided o.SASL.OAuth2.TokenURL to create an OAuth2 token source. This token source will make HTTP POST requests to the provided URL when fetching tokens. Since there is no validation of the URL, an attacker who can create or modify a changefeed can provide an internal URL (e.g., pointing to a cloud metadata service or internal API), leading to Server-Side Request Forgery (SSRF). It is recommended to validate the TokenURL against an allow-list of trusted domains or ensure it does not point to internal IP addresses.
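One possible shape for such a check is sketched below; validateTokenURL is a hypothetical helper (not part of this PR) that requires HTTPS and rejects hosts resolving to loopback, private, link-local, or unspecified addresses:

import (
	"fmt"
	"net"
	"net/url"
)

// validateTokenURL is a hypothetical guard against SSRF: it only accepts
// HTTPS endpoints whose host resolves to public addresses. The exact policy
// (allow-list vs. address-class checks) is an assumption, not this PR's code.
func validateTokenURL(rawURL string) error {
	u, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("invalid OAuth2 token URL: %w", err)
	}
	if u.Scheme != "https" {
		return fmt.Errorf("OAuth2 token URL must use https, got %q", u.Scheme)
	}
	ips, err := net.LookupIP(u.Hostname())
	if err != nil {
		return fmt.Errorf("resolve OAuth2 token URL host %q: %w", u.Hostname(), err)
	}
	for _, ip := range ips {
		if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() || ip.IsUnspecified() {
			return fmt.Errorf("OAuth2 token URL host %q resolves to non-public address %s", u.Hostname(), ip)
		}
	}
	return nil
}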
cfg, err := krb5config.Load(g.KerberosConfigPath)
if err != nil {
	return nil, errors.Trace(err)
}

var client *krb5client.Client
switch g.AuthType {
case security.KeyTabAuth:
	kt, err := keytab.Load(g.KeyTabPath)
The newFranzKerberosClient function uses g.KerberosConfigPath and g.KeyTabPath directly in krb5config.Load and keytab.Load. These paths are provided by the user via the sink-uri or changefeed configuration. An attacker can provide absolute paths to sensitive files on the server (e.g., /etc/passwd or internal configuration files), leading to arbitrary file access. While these functions are intended to load Kerberos configurations, they do not restrict the file paths to a safe directory. It is recommended to validate that the provided paths are within an expected directory or restrict the use of absolute paths and parent directory references (..).
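A minimal sketch of one way to do that, using a hypothetical validateConfigPath helper (the base-directory policy is an assumption, not part of this PR):

import (
	"fmt"
	"path/filepath"
	"strings"
)

// validateConfigPath rejects absolute paths and parent-directory escapes,
// confining user-supplied Kerberos config/keytab paths to baseDir.
// Hypothetical helper, shown for illustration only.
func validateConfigPath(baseDir, userPath string) (string, error) {
	if filepath.IsAbs(userPath) {
		return "", fmt.Errorf("absolute paths are not allowed: %q", userPath)
	}
	base, err := filepath.Abs(baseDir)
	if err != nil {
		return "", fmt.Errorf("resolve base directory %q: %w", baseDir, err)
	}
	joined := filepath.Join(base, userPath) // Join also cleans the result
	rel, err := filepath.Rel(base, joined)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("path %q escapes the allowed directory %q", userPath, base)
	}
	return joined, nil
}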
pkg/sink/kafka/franz_admin_client.go
Outdated
func (a *franzAdminClient) Close() {
	if a.admin != nil {
		a.admin.Close()
	}
}
The kadm.Client.Close() method is a no-op. The underlying kgo.Client (a.client) must be closed to release resources. The current implementation will lead to a resource leak as the kgo.Client is never closed.
Suggested change
-func (a *franzAdminClient) Close() {
-	if a.admin != nil {
-		a.admin.Close()
-	}
-}
+func (a *franzAdminClient) Close() {
+	if a.client != nil {
+		a.client.Close()
+	}
+}
pkg/sink/kafka/franz_factory.go
Outdated
produceTimeout := o.ReadTimeout
if produceTimeout < 100*time.Millisecond {
	produceTimeout = 10 * time.Second
}
The produceTimeout is being set from o.ReadTimeout. For consistency with the Sarama implementation (saramaConfig.Producer.Timeout), this should be based on o.WriteTimeout, which is more relevant for a producer's request timeout.
Suggested change
-produceTimeout := o.ReadTimeout
-if produceTimeout < 100*time.Millisecond {
-	produceTimeout = 10 * time.Second
-}
+produceTimeout := o.WriteTimeout
+if produceTimeout <= 0 {
+	// This is a fail-safe, as the default is 10s.
+	produceTimeout = 10 * time.Second
+}
produceTimeout := o.ReadTimeout
if produceTimeout <= 0 {
	produceTimeout = 10 * time.Second
}
The produceTimeout is calculated based on o.ReadTimeout. For a producer, o.WriteTimeout is the more appropriate configuration to use, which is also consistent with how the Sarama producer is configured.
Suggested change
-produceTimeout := o.ReadTimeout
-if produceTimeout <= 0 {
-	produceTimeout = 10 * time.Second
-}
+produceTimeout := o.WriteTimeout
+if produceTimeout <= 0 {
+	produceTimeout = 10 * time.Second
+}
func (p *franzAsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}

	go func() {
		start := time.Now()
		p.client.Close()
		log.Info("Close kafka async producer success",
			zap.String("keyspace", p.changefeedID.Keyspace()),
			zap.String("changefeed", p.changefeedID.Name()),
			zap.Duration("duration", time.Since(start)))
	}()
}
Closing the client in a new goroutine makes the Close method non-blocking. However, kgo.Client.Close() is a blocking call that waits for buffered messages to be sent. To prevent data loss on shutdown, this Close method should be blocking, consistent with the saramaAsyncProducer implementation and the expectation of its callers.
func (p *franzAsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}
	start := time.Now()
	p.client.Close()
	log.Info("Close kafka async producer success",
		zap.String("keyspace", p.changefeedID.Keyspace()),
		zap.String("changefeed", p.changefeedID.Name()),
		zap.Duration("duration", time.Since(start)))
}
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
go.mod (1)
3-3: ⚠️ Potential issue | 🟡 Minor — Update Go version to the latest 1.25 patch release (1.25.7).

The go.mod specifies go 1.25.5, but Go 1.25.7 was released on February 4, 2026 and is now the latest stable patch in the 1.25 series. Consider updating to go 1.25.7 to pick up the latest bug fixes and security patches.
🤖 Fix all issues with AI agents
In `@docs/design/2026-01-09-franz-go-kafka-sink.md`:
- Around line 117-119: The doc's stated default for the kafka-client flag is out
of sync with code: update the design doc text that currently reads
"kafka-client=sarama|franz(默认 sarama)" to reflect the actual default used by
NewOptions() in options.go and the behavior in factory_selector.go (which treat
"" as "franz"), i.e., change the default to franz or clearly document that franz
is the default and sarama must be opted into via kafka-client=sarama.
In `@go.mod`:
- Around line 44-47: go.mod currently pins Go 1.25.5 which is vulnerable to
CVE-2025-61728; update the Go toolchain version to 1.25.6 in go.mod and run `go
mod tidy`/`go get` to regenerate the lock info, and also bump the listed module
versions: update github.com/klauspost/compress to v1.18.3, pierrec/lz4/v4 to
v4.1.25, golang.org/x/sync to v0.19.0, golang.org/x/sys to v0.40.0, and
golang.org/x/net to v0.49.0 (leave twmb/franz-go, pkg/kadm, pkg/kmsg as-is);
ensure after changes you run the test suite and CI to verify builds.
In `@pkg/sink/kafka/franz_async_producer.go`:
- Around line 68-81: The Close() currently returns without closing p.client when
p.closed was already set by AsyncRunCallback; update the lifecycle so
client.Close() is always invoked exactly once regardless of p.closed state—for
example add a dedicated sync.Once (e.g., producerCloseOnce) or separate flags so
that franzAsyncProducer.Close() always calls p.client.Close() (or triggers the
once) even when AsyncRunCallback sets p.closed; ensure AsyncRunCallback no
longer assumes responsibility for closing p.client and instead calls the same
sync.Once/closing helper used by Close(), referencing franzAsyncProducer.Close,
AsyncRunCallback, p.closed, p.client.Close, and the new sync.Once field to
guarantee no leak.
In `@pkg/sink/kafka/franz_factory_test.go`:
- Line 82: The test uses a misspelled constant common.DefaultKeyspaceNamme
(double 'm') when constructing changefeedID in the changefeedID :=
common.NewChangefeedID4Test(...) line; replace it with the correctly named
constant common.DefaultKeyspaceName to match other tests and avoid compile
errors, and run the repository-wide search for DefaultKeyspaceNam* to confirm
there are no other misspelled occurrences.
In `@pkg/sink/kafka/franz_factory.go`:
- Around line 268-271: The produceTimeout is being set from o.ReadTimeout but
since this value controls the producer (write) request timeout, update the
assignment to use o.WriteTimeout instead of o.ReadTimeout; locate the
produceTimeout variable initialization in franz_factory.go (the block that
currently reads produceTimeout := o.ReadTimeout and the subsequent min check)
and change it to derive from o.WriteTimeout, preserving the same minimum/default
fallback logic.
- Around line 155-177: The code in buildFranzTLSConfig currently sets ALPN
NextProtos to {"h2","http/1.1"} which is inappropriate for Kafka; update
buildFranzTLSConfig to stop setting NextProtos by default and avoid overwriting
credentialTlsConfig.NextProtos — remove the NextProtos fields at lines where
tlsConfig is constructed and where credentialTlsConfig is applied (refer to the
local variable tlsConfig, the parameter o and credentialTlsConfig returned by
o.Credential.ToTLSConfig()), while preserving MinVersion enforcement and
InsecureSkipVerify copying.
- Around line 69-75: In AdminClient (franzFactory.AdminClient) the error is
wrapped with errors.ErrKafkaNewProducer which is the wrong operation; change the
error handling to use errors.Trace(err) (same pattern as used in
NewFranzFactory) so the original error is preserved and not mislabelled as a
"new producer" failure; locate newFranzAdminClient call in AdminClient and
replace the errors.WrapError(errors.ErrKafkaNewProducer, err) return with
returning errors.Trace(err).
In `@pkg/sink/kafka/franz_gssapi.go`:
- Around line 160-162: The call to wrapTokenResp.Marshal() returns an unwrapped
third-party error; replace the direct return with wrapping the error using
errors.Trace (or errors.WrapError) so the stack is preserved: call
wrapTokenResp.Marshal(), check the returned error, and if non-nil return
errors.Trace(err) (keeping the existing s.step = gssAPIFinished assignment)
instead of returning the raw error from wrapTokenResp.Marshal().
In `@pkg/sink/kafka/franz_sync_producer.go`:
- Around line 60-64: The code uses o.ReadTimeout to compute produceTimeout for
sync produce; change this to use o.WriteTimeout (falling back to the same 10s
default if <=0) so the produce timeout reflects a write operation; update the
assignment of produceTimeout (and any related calculation using
franzSyncRecordRetries) to derive timeout from o.WriteTimeout instead of
o.ReadTimeout to match the design-doc's ProduceRequestTimeout/WriteTimeout
alignment.
🧹 Nitpick comments (6)
docs/design/2026-01-09-franz-go-kafka-sink.md (1)
66-71: Add a language specifier to the fenced code block. The code block on line 66 has no language tag. Use ```text or similar to silence the markdownlint MD040 warning.

pkg/sink/kafka/franz_sync_producer.go (1)
146-161: Close() has a TOCTOU race between check and store. Two concurrent Close() calls can both pass the p.closed.Load() check before either calls Store(true), leading to a double client.Close(). Consider using CompareAndSwap for atomicity.

Proposed fix

 func (p *franzSyncProducer) Close() {
-	if p.closed.Load() {
+	if !p.closed.CompareAndSwap(false, true) {
 		log.Warn("kafka DDL producer already closed",
 			zap.String("keyspace", p.id.Keyspace()),
 			zap.String("changefeed", p.id.Name()))
 		return
 	}
-	p.closed.Store(true)
 	start := time.Now()

pkg/sink/kafka/franz_metrics_collector.go (1)
96-139: Consider whether inFlight accounting is correct for all edge cases. inFlight is incremented in OnBrokerWrite when err == nil (Line 114) and decremented in OnBrokerE2E when WriteErr == nil (Line 129). If the kgo.Client is closed abruptly, OnBrokerE2E may never fire for some in-flight requests, leaving the counter inflated. The clamp to 0 at Line 130 helps prevent negative values but won't correct an over-count. This is a minor observability inaccuracy, not a functional bug.

pkg/sink/kafka/franz_admin_client.go (1)
74-95: GetAllBrokers silently returns nil on error — callers must handle this. When BrokerMetadata fails, the method logs a warning and returns nil (Line 84). If callers don't check for nil, they'll get an empty broker list, which could cause silent misconfiguration downstream (e.g., metrics not attached to any broker). Consider whether returning an error would be more appropriate for this method, aligning with the pattern used by the other Get* methods.
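A sketch of the error-returning variant, reusing the names from this note (admin, changefeed, and newRequestContext are taken from the review context and may differ from the final code):

// GetAllBrokers returns the broker node IDs, or an error instead of silently
// returning nil when the metadata request fails. Sketch only.
func (a *AdminClient) GetAllBrokers() ([]int32, error) {
	ctx, cancel := a.newRequestContext()
	defer cancel()

	meta, err := a.admin.BrokerMetadata(ctx)
	if err != nil {
		log.Warn("query broker metadata failed",
			zap.String("keyspace", a.changefeed.Keyspace()),
			zap.String("changefeed", a.changefeed.Name()),
			zap.Error(err))
		return nil, errors.Trace(err)
	}
	return meta.Brokers.NodeIDs(), nil
}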
246-284: Disabling idempotent writes with a single in-flight request per broker is a valid ordering guarantee, but worth documenting the trade-off. DisableIdempotentWrite() (Line 276) plus MaxProduceRequestsInflightPerBroker(1) (Line 277) ensures strict ordering but sacrifices throughput and exactly-once semantics. This is a conscious design choice that matches the Sarama defaults. Consider adding a brief comment explaining why idempotency is disabled (e.g., TiCDC manages its own deduplication).
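A possible shape for that comment, sketched against the two franz-go options named above (the surrounding opts slice is assumed):

// Mirror the Sarama defaults: idempotent writes are disabled and at most one
// produce request is in flight per broker, which preserves per-partition
// ordering at the cost of throughput and exactly-once semantics.
opts = append(opts,
	kgo.DisableIdempotentWrite(),
	kgo.MaxProduceRequestsInflightPerBroker(1),
)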
286-304: Unknown compression silently defaults to NoCompression. Lines 301–303 log a warning for unrecognized compression algorithms but proceed with NoCompression. If a user configures "brotli" (unsupported), they might not notice the warning and assume compression is active, leading to unexpectedly high bandwidth usage. Consider returning an error for truly unknown values to fail fast.

Proposed change

 default:
-	log.Warn("unsupported compression algorithm", zap.String("compression", o.Compression))
-	return kgo.ProducerBatchCompression(kgo.NoCompression()), nil
+	return nil, errors.ErrKafkaInvalidConfig.GenWithStack(
+		"unsupported compression algorithm: %s", o.Compression)
 }
Suggest adding an optional sink-uri parameter to choose the Kafka client implementation, keeping Sarama as the default:

- New parameter: `kafka-client=sarama|franz` (default: sarama)
Design doc states default is sarama, but code now defaults to franz.
Line 119 says kafka-client=sarama|franz(默认 sarama), but NewOptions() in options.go now sets KafkaClient: "franz" and factory_selector.go treats "" as franz. Update the doc to reflect the new default.
🤖 Prompt for AI Agents
In `@docs/design/2026-01-09-franz-go-kafka-sink.md` around lines 117 - 119, The
doc's stated default for the kafka-client flag is out of sync with code: update
the design doc text that currently reads "kafka-client=sarama|franz(默认 sarama)"
to reflect the actual default used by NewOptions() in options.go and the
behavior in factory_selector.go (which treat "" as "franz"), i.e., change the
default to franz or clearly document that franz is the default and sarama must
be opted into via kafka-client=sarama.
 github.com/jcmturner/gofork v1.7.6
 github.com/jcmturner/gokrb5/v8 v8.4.4
 github.com/json-iterator/go v1.1.12
-github.com/klauspost/compress v1.18.0
+github.com/klauspost/compress v1.18.2
🧩 Analysis chain
🏁 Script executed: cd /dev/null && pwd || true # Check context
Repository: pingcap/ticdc
Length of output: 106

🏁 Script executed: find . -name "go.mod" -type f | head -20
Repository: pingcap/ticdc
Length of output: 154

🏁 Script executed: cat -n ./go.mod | head -100
Repository: pingcap/ticdc
Length of output: 4747

🏁 Script executed: cat -n ./go.mod | tail -20
Repository: pingcap/ticdc
Length of output: 1129

🏁 Script executed: cat -n ./go.mod | grep -E "franz-go|kmsg"
Repository: pingcap/ticdc
Length of output: 209
🌐 Web query:
github.com/twmb/franz-go v1.20.6 latest version 2025 2026 compatibility
💡 Result:
- Latest version (as of Feb 7, 2026):
github.com/twmb/franz-gov1.20.6, published Dec 20, 2025 (no newer tag is listed on pkg.go.dev). [1] - Kafka compatibility: franz-go states it supports Apache Kafka 0.8.0 through 4.1+ (and Kafka-compatible brokers like Redpanda, Confluent Platform, MSK, etc.). [2]
- 2025/2026 usage note: v1.20.6 is a v1 (stable) module release line in late 2025; for “compatibility” beyond Kafka protocol (i.e., minimum Go version), the authoritative source is the module’s
go.modfor that tag. [1][2]
Sources
[1] https://pkg.go.dev/github.com/twmb/franz-go?tab=versions
[2] https://github.com/twmb/franz-go
🌐 Web query:
github.com/klauspost/compress v1.18.2 latest version security advisories 2025 2026
💡 Result:
- Latest version (as of Feb 7, 2026): github.com/klauspost/compress v1.18.3 (released Jan 16, 2026). v1.18.2 was released Dec 1, 2025. [1]
- Security advisories (2025–2026):
  - The v1.18.3 release notes call out a downstream issue: CVE-2025-61728 (Go standard library archive/zip DoS via super-linear indexing) and reference "archive/zip: denial of service when parsing arbitrary ZIP archives (CVE-2025-61728)" golang/go#77102. [1]
  - In the Go Vulnerability DB, this is tracked as GO-2026-4342 / CVE-2025-61728, affecting Go < 1.24.12 and Go 1.25.0–1.25.5 (fixed in 1.24.12 and 1.25.6). [2]
Sources:
[1] GitHub releases for klauspost/compress (shows v1.18.3 “Latest” and “Downstream CVE-2025-61728”)
[2] Go Vulnerability Database entry GO-2026-4342 (CVE-2025-61728)
🌐 Web query:
github.com/pierrec/lz4/v4 v4.1.22 latest version 2025 2026
💡 Result:
github.com/pierrec/lz4/v4 v4.1.22 is not the latest.
As of February 7, 2026, the latest tagged/module version is v4.1.25 (Jan 17, 2026). Recent versions are: v4.1.24 (Jan 16, 2026), v4.1.23 (May 10, 2025), v4.1.22 (Dec 12, 2024). [1][2]
Note: the GitHub “Releases” page may lag behind tags (it shows only up to v4.1.23), but the Tags list and Go module index show v4.1.25. [1][2]
Sources: [1] GitHub tags for pierrec/lz4, [2] pkg.go.dev versions for github.com/pierrec/lz4/v4.
🌐 Web query:
golang.org/x/net golang.org/x/sync golang.org/x/sys v0.47.0 v0.18.0 v0.38.0 latest versions 2026 security advisories CVE
💡 Result:
Latest module versions (as of Feb 7, 2026)
| Module | Your version | Latest version (published) |
|---|---|---|
| golang.org/x/net | v0.47.0 | v0.49.0 (Jan 12, 2026) [1] |
| golang.org/x/sync | v0.18.0 | v0.19.0 (Dec 4, 2025) [2] |
| golang.org/x/sys | v0.38.0 | v0.40.0 (Dec 19, 2025) [3] |
Security advisories / CVEs relevant to these modules (notably in 2025–2026)
golang.org/x/net
- CVE-2025-58190 / GO-2026-4441 — DoS via infinite parsing loop in x/net/html; affected: before v0.45.0 (so not v0.47.0+) [4]
- CVE-2025-47911 / GO-2026-4440 — DoS via quadratic parsing complexity in x/net/html; affected: before v0.45.0 (so not v0.47.0+) [5]
- CVE-2025-22870 / GO-2025-3503 — proxy bypass via IPv6 zone ID handling; affected: before v0.36.0 (so not v0.47.0+) [6]
- CVE-2025-22872 (GHSA-vvgc-356p-c3xw) — tokenizer issue (GitHub advisory); affected: < 0.38.0, patched in 0.38.0 (so not v0.47.0+) [7]
golang.org/x/sync
- No entries found for golang.org/x/sync in the Go Vulnerability Database "All Reports" list (searched on Feb 7, 2026). [8]
golang.org/x/sys
- No entries found for golang.org/x/sys in the Go Vulnerability Database "All Reports" list (searched on Feb 7, 2026). [8]
Bottom line for the specific versions you listed:
- x/net v0.47.0 is already past the fixed versions for the 2025–2026 x/net CVEs above, but is not the latest (latest is v0.49.0). [1][4][5][6][7]
- x/sync v0.18.0 → update available to v0.19.0; no Go-vulndb CVEs found. [2][8]
- x/sys v0.38.0 → update available to v0.40.0; no Go-vulndb CVEs found. [3][8]
Sources
[1] golang.org/x/net on pkg.go.dev (v0.49.0) (pkg.go.dev)
[2] golang.org/x/sync on pkg.go.dev (v0.19.0) (pkg.go.dev)
[3] golang.org/x/sys on pkg.go.dev (v0.40.0) (pkg.go.dev)
[4] GO-2026-4441 (CVE-2025-58190) (pkg.go.dev)
[5] GO-2026-4440 (CVE-2025-47911) (pkg.go.dev)
[6] GO-2025-3503 (CVE-2025-22870) (osv.dev)
[7] GHSA-vvgc-356p-c3xw (CVE-2025-22872) (github.com)
[8] Go Vulnerability Database – “All Reports” list (used to check for x/sync / x/sys) (pkg.go.dev)
Citations:
- 1: https://pkg.go.dev/golang.org/x/net
- 2: https://pkg.go.dev/golang.org/x/sync
- 3: https://pkg.go.dev/golang.org/x/sys
- 4: https://pkg.go.dev/vuln/GO-2026-4441?utm_source=openai
- 5: https://pkg.go.dev/vuln/GO-2026-4440?utm_source=openai
- 6: https://osv.dev/vulnerability/GO-2025-3503?utm_source=openai
- 7: https://github.com/advisories/GHSA-vvgc-356p-c3xw?utm_source=openai
- 8: https://pkg.go.dev/vuln/list
🌐 Web query:
Go 1.25.5 CVE-2025-61728 vulnerability affected versions 1.25.6 security patch
💡 Result:
CVE-2025-61728 is a denial-of-service (CPU exhaustion) issue in Go’s standard library archive/zip: opening a file in a specially crafted ZIP can trigger a super-linear filename indexing algorithm. [3]
Affected Go versions
- All versions before Go 1.24.12
- Go 1.25.0 through 1.25.5 (i.e., < 1.25.6) [1][3]
So Go 1.25.5 is affected. [1]
Fixed / security patch versions
- Go 1.25.6 (fix for the Go 1.25.x line)
- Go 1.24.12 (fix for the Go 1.24.x line) [1][2]
What to do
- Upgrade to Go 1.25.6 (or newer) to remediate CVE-2025-61728. [1][2]
Upgrade Go to 1.25.6 to address CVE-2025-61728 vulnerability in archive/zip.
Go 1.25.5 (line 3) is affected by CVE-2025-61728, a denial-of-service vulnerability in the standard library's archive/zip package. The fix is available in Go 1.25.6. Additionally, several dependency versions are outdated and have newer releases available:
- klauspost/compress v1.18.2 (line 47) → update to v1.18.3 (Jan 16, 2026; latest as of Feb 7, 2026)
- pierrec/lz4/v4 v4.1.22 (line 51) → update to v4.1.25 (Jan 17, 2026; latest)
- golang.org/x/sync v0.18.0 (line 94) → update to v0.19.0 (Dec 4, 2025; no security advisories)
- golang.org/x/sys v0.38.0 (line 95) → update to v0.40.0 (Dec 19, 2025; no security advisories)
- golang.org/x/net v0.47.0 (line 92) → update to v0.49.0 (Jan 12, 2026; all known 2025 CVEs patched before v0.45.0)
- github.com/twmb/franz-go v1.20.6 (line 76), pkg/kadm v1.17.1 (line 77), and pkg/kmsg v1.12.0 (line 336) are at or near the latest stable versions with no known security advisories.
🤖 Prompt for AI Agents
In `@go.mod` around lines 44 - 47, go.mod currently pins Go 1.25.5 which is
vulnerable to CVE-2025-61728; update the Go toolchain version to 1.25.6 in
go.mod and run `go mod tidy`/`go get` to regenerate the lock info, and also bump
the listed module versions: update github.com/klauspost/compress to v1.18.3,
pierrec/lz4/v4 to v4.1.25, golang.org/x/sync to v0.19.0, golang.org/x/sys to
v0.40.0, and golang.org/x/net to v0.49.0 (leave twmb/franz-go, pkg/kadm,
pkg/kmsg as-is); ensure after changes you run the test suite and CI to verify
builds.
func (p *franzAsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}

	go func() {
		start := time.Now()
		p.client.Close()
		log.Info("Close kafka async producer success",
			zap.String("keyspace", p.changefeedID.Keyspace()),
			zap.String("changefeed", p.changefeedID.Name()),
			zap.Duration("duration", time.Since(start)))
	}()
}
Resource leak: kgo.Client is never closed when AsyncRunCallback exits before Close().
AsyncRunCallback unconditionally sets closed=true on exit (Line 151). When Close() is subsequently called, the CompareAndSwap(false, true) at Line 69 fails and the method returns without calling p.client.Close(). This leaves the kgo.Client open — leaking connections, goroutines, and memory.
Either decouple the "no more sends" flag from the "client closed" lifecycle, or ensure client.Close() is called unconditionally with a sync.Once:
Proposed fix using sync.Once
 type franzAsyncProducer struct {
 	client       *kgo.Client
 	changefeedID commonType.ChangeFeedID
-	closed       *atomic.Bool
-	errCh        chan error
+	closed    *atomic.Bool
+	closeOnce sync.Once
+	errCh     chan error
 }

 func (p *franzAsyncProducer) Close() {
-	if !p.closed.CompareAndSwap(false, true) {
-		return
-	}
-
-	go func() {
+	p.closed.Store(true)
+	p.closeOnce.Do(func() {
 		start := time.Now()
 		p.client.Close()
-		log.Info("Close kafka async producer success",
+		log.Info("kafka async producer closed",
 			zap.String("keyspace", p.changefeedID.Keyspace()),
 			zap.String("changefeed", p.changefeedID.Name()),
 			zap.Duration("duration", time.Since(start)))
-	}()
+	})
 }

Also applies to: 150-165
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_async_producer.go` around lines 68 - 81, The Close()
currently returns without closing p.client when p.closed was already set by
AsyncRunCallback; update the lifecycle so client.Close() is always invoked
exactly once regardless of p.closed state—for example add a dedicated sync.Once
(e.g., producerCloseOnce) or separate flags so that franzAsyncProducer.Close()
always calls p.client.Close() (or triggers the once) even when AsyncRunCallback
sets p.closed; ensure AsyncRunCallback no longer assumes responsibility for
closing p.client and instead calls the same sync.Once/closing helper used by
Close(), referencing franzAsyncProducer.Close, AsyncRunCallback, p.closed,
p.client.Close, and the new sync.Once field to guarantee no leak.
pkg/sink/kafka/franz_factory.go
Outdated
produceTimeout := o.ReadTimeout
if produceTimeout < 100*time.Millisecond {
	produceTimeout = 10 * time.Second
}
produceTimeout derived from ReadTimeout — should this be WriteTimeout?
Line 268 uses o.ReadTimeout for the produce request timeout. Since producing is a write-path operation, o.WriteTimeout seems like a more natural choice. Verify that ReadTimeout is intentional here and not a copy-paste from the base options builder.
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_factory.go` around lines 268 - 271, The produceTimeout
is being set from o.ReadTimeout but since this value controls the producer
(write) request timeout, update the assignment to use o.WriteTimeout instead of
o.ReadTimeout; locate the produceTimeout variable initialization in
franz_factory.go (the block that currently reads produceTimeout := o.ReadTimeout
and the subsequent min check) and change it to derive from o.WriteTimeout,
preserving the same minimum/default fallback logic.
}
s.step = gssAPIFinished
return wrapTokenResp.Marshal()
Unwrapped third-party error from Marshal().
wrapTokenResp.Marshal() (gokrb5 library call) can return an error that is not wrapped with errors.Trace. As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".
Proposed fix
 	s.step = gssAPIFinished
-	return wrapTokenResp.Marshal()
+	marshaled, err := wrapTokenResp.Marshal()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return marshaled, nil

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested change
-}
-s.step = gssAPIFinished
-return wrapTokenResp.Marshal()
+}
+s.step = gssAPIFinished
+marshaled, err := wrapTokenResp.Marshal()
+if err != nil {
+	return nil, errors.Trace(err)
+}
+return marshaled, nil
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_gssapi.go` around lines 160 - 162, The call to
wrapTokenResp.Marshal() returns an unwrapped third-party error; replace the
direct return with wrapping the error using errors.Trace (or errors.WrapError)
so the stack is preserved: call wrapTokenResp.Marshal(), check the returned
error, and if non-nil return errors.Trace(err) (keeping the existing s.step =
gssAPIFinished assignment) instead of returning the raw error from
wrapTokenResp.Marshal().
produceTimeout := o.ReadTimeout
if produceTimeout <= 0 {
	produceTimeout = 10 * time.Second
}
timeout := time.Duration(franzSyncRecordRetries+1) * produceTimeout
ReadTimeout used for produce timeout — should this be WriteTimeout?
o.ReadTimeout is used to derive the overall sync-produce timeout, but this is a write (produce) operation. The design doc (line 166) mentions aligning ProduceRequestTimeout with WriteTimeout. Consider using o.WriteTimeout instead for semantic correctness.
Proposed fix
-produceTimeout := o.ReadTimeout
+produceTimeout := o.WriteTimeout
 if produceTimeout <= 0 {
 	produceTimeout = 10 * time.Second
 }

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested change
-produceTimeout := o.ReadTimeout
-if produceTimeout <= 0 {
-	produceTimeout = 10 * time.Second
-}
-timeout := time.Duration(franzSyncRecordRetries+1) * produceTimeout
+produceTimeout := o.WriteTimeout
+if produceTimeout <= 0 {
+	produceTimeout = 10 * time.Second
+}
+timeout := time.Duration(franzSyncRecordRetries+1) * produceTimeout
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_sync_producer.go` around lines 60 - 64, The code uses
o.ReadTimeout to compute produceTimeout for sync produce; change this to use
o.WriteTimeout (falling back to the same 10s default if <=0) so the produce
timeout reflects a write operation; update the assignment of produceTimeout (and
any related calculation using franzSyncRecordRetries) to derive timeout from
o.WriteTimeout instead of o.ReadTimeout to match the design-doc's
ProduceRequestTimeout/WriteTimeout alignment.
/test pull-cdc-kafka-integration-light
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@pkg/sink/kafka/franz_metrics_collector.go`:
- Around line 77-94: cleanupMetrics only looks at the final hook.Snapshot() so
broker IDs seen earlier can be left as stale Prometheus labels; modify
franzMetricsCollector to maintain a persistent set (e.g., seenBrokers
map[uint32]struct{} or set of string IDs) that is updated inside collectMetrics
whenever you iterate snapshot.Brokers, and then have cleanupMetrics iterate that
accumulated set (not just the final snapshot) to call
OutgoingByteRateGauge.DeleteLabelValues, RequestRateGauge.DeleteLabelValues,
RequestLatencyGauge.DeleteLabelValues, requestsInFlightGauge.DeleteLabelValues
and responseRateGauge.DeleteLabelValues for every brokerID seen; ensure
seenBrokers is initialized in the constructor and cleared/updated appropriately
when collectors are stopped or reused.
In `@pkg/sink/kafka/franz/admin_client.go`:
- Around line 185-228: GetTopicsMeta currently omits ReplicationFactor when
building TopicDetail in GetTopicsMeta; populate ReplicationFactor from the
fetched metadata by inspecting detail.Partitions' replica lists (e.g., if
len(detail.Partitions) > 0 take the replica slice length from
detail.Partitions[0].Replicas or compute a consistent value across partitions)
and assign it to TopicDetail.ReplicationFactor when creating the result entry;
update the construction of TopicDetail in GetTopicsMeta (referencing
GetTopicsMeta, meta.Topics[topic], detail.Partitions and TopicDetail) to set
ReplicationFactor accordingly and handle the empty-partitions case.
In `@pkg/sink/kafka/franz/async_producer.go`:
- Around line 69-82: The AsyncProducer.Close implementation spawns a goroutine
and calls p.client.Close() fire-and-forget, risking loss of buffered messages;
change AsyncProducer.Close (and its use of p.closed.CompareAndSwap and
p.client.Close) to perform the client.Close() call synchronously (or expose/wait
on a completion channel) so callers block until the franz-go client has flushed
and closed, mirroring the SyncProducer.Close pattern; keep the same logging
(including duration, p.changefeedID.Keyspace()/Name()) after the synchronous
close completes.
- Around line 151-166: Remove the premature mark of p.closed in
AsyncRunCallback: the deferred p.closed.Store(true) should be removed so Close()
remains the single place that sets the closed flag and performs shutdown; update
AsyncRunCallback to simply return on context cancel or errors (and if you
prefer, call p.client.Close() directly before returning instead of setting
p.closed) so that Close()'s CompareAndSwap(false, true) can succeed and the
underlying kgo.Client is properly closed; ensure references to p.errCh,
ctx.Done(), and p.changefeedID remain unchanged.
In `@pkg/sink/kafka/franz/sync_producer.go`:
- Around line 147-162: The Close method on SyncProducer is not atomic; replace
the Load()+Store() pattern with an atomic CompareAndSwap on p.closed so only one
goroutine proceeds to close the client: in SyncProducer.Close(), attempt if
p.closed.CompareAndSwap(false, true) { do the start timer, p.client.Close(), and
the Info log } else { emit the Warn log about already-closed using
p.id.Keyspace() and p.id.Name() and return }; mirror the AsyncProducer Close
behavior and messages to ensure consistency.
In `@pkg/sink/kafka/internal/logutil/logutil.go`:
- Around line 86-99: The AnnotateEventError function is duplicating the original
error message by appending err.Error() into the annotation; change the call that
builds the wrapped error (in AnnotateEventError) to only include the event
context (e.g., use errors.Annotate(err, contextStr) or a context-only string
like contextStr+";") instead of contextStr+"; ErrorInfo:"+err.Error(),
referencing AnnotateEventError, BuildEventLogContext and errors.Annotate so the
original error text is preserved exactly once in the wrapped error.
🧹 Nitpick comments (4)
pkg/sink/kafka/internal/logutil/logutil.go (1)
101-107: Silent error swallowing in formatDMLInfo. A json.Marshal failure is silently discarded, returning "". This is acceptable for best-effort log context, but a brief log or comment documenting the intentional discard would help future readers.
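A small sketch of the documented-discard option; the helper name and shape follow this nitpick and are assumptions about the actual code:

// formatDMLInfo renders best-effort log context. A marshal failure is
// intentionally discarded because this string is only used for logging.
func formatDMLInfo(info any) string {
	b, err := json.Marshal(info)
	if err != nil {
		// Best-effort only: fall back to an empty string rather than failing
		// the logging path, but leave a trace for future readers.
		log.Debug("formatDMLInfo: marshal failed", zap.Error(err))
		return ""
	}
	return string(b)
}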
pkg/sink/kafka/franz/factory.go (2)

117-139: ALPN NextProtos is unnecessary for Kafka connections. Lines 120 and 133 set NextProtos: []string{"h2", "http/1.1"}, which are HTTP ALPN identifiers. Kafka uses its own binary protocol over TLS and does not negotiate via ALPN. While this won't break anything (the broker simply ignores unknown ALPN tokens), it adds confusion and could cause issues with strict TLS intermediaries. Consider removing NextProtos or leaving it nil unless there's a specific proxy/load-balancer requirement.
248-266: buildFranzCompressionOption's signature returns an error but never produces one. The function always returns nil for the error. The caller at Lines 225-228 checks for an error that can never happen. This is harmless but misleading — consider simplifying the return to kgo.Opt only, or returning an error on the default branch instead of silently falling back.

pkg/sink/kafka/franz/metrics_hook.go (1)
145-153: snapshotBrokers is unused — the same broker-map copy logic is inlined in Snapshot() at lines 114–118. Remove this function to eliminate dead code.
func (m *franzMetricsCollector) cleanupMetrics() {
	keyspace := m.changefeedID.Keyspace()
	changefeedID := m.changefeedID.Name()
	compressionRatioGauge.DeleteLabelValues(keyspace, changefeedID, avg)
	compressionRatioGauge.DeleteLabelValues(keyspace, changefeedID, p99)
	recordsPerRequestGauge.DeleteLabelValues(keyspace, changefeedID, avg)
	recordsPerRequestGauge.DeleteLabelValues(keyspace, changefeedID, p99)

	snapshot := m.hook.Snapshot()
	for id := range snapshot.Brokers {
		brokerID := strconv.Itoa(int(id))
		OutgoingByteRateGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
		RequestRateGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
		RequestLatencyGauge.DeleteLabelValues(keyspace, changefeedID, brokerID, avg)
		RequestLatencyGauge.DeleteLabelValues(keyspace, changefeedID, brokerID, p99)
		requestsInFlightGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
		responseRateGauge.DeleteLabelValues(keyspace, changefeedID, brokerID)
	}
Stale per-broker metrics may not be cleaned up.
cleanupMetrics takes a snapshot at cleanup time to discover broker IDs. If a broker was seen earlier in the collector's lifetime but is no longer present in the final snapshot (e.g., broker removed from the cluster or connection dropped), its metric labels will never be deleted — leading to stale gauge values in Prometheus.
Consider tracking the set of broker IDs seen across all collectMetrics calls and using that accumulated set during cleanup.
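One possible sketch of that accumulation; field names follow the review context, the broker ID type is assumed to be int32, and the metric-update details are elided:

// seenBrokers accumulates every broker ID observed during the collector's
// lifetime so cleanup can delete all label sets ever created. Sketch only.
type franzMetricsCollector struct {
	// ... existing fields (changefeedID, hook, ...) ...
	seenBrokers map[int32]struct{}
}

func (m *franzMetricsCollector) collectMetrics() {
	snapshot := m.hook.Snapshot()
	for id := range snapshot.Brokers {
		m.seenBrokers[id] = struct{}{} // remember every broker reported on
		// ... update per-broker gauges as before ...
	}
}

// cleanupMetrics would then range over m.seenBrokers instead of the final
// snapshot when issuing the DeleteLabelValues calls shown above.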
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_metrics_collector.go` around lines 77 - 94,
cleanupMetrics only looks at the final hook.Snapshot() so broker IDs seen
earlier can be left as stale Prometheus labels; modify franzMetricsCollector to
maintain a persistent set (e.g., seenBrokers map[uint32]struct{} or set of
string IDs) that is updated inside collectMetrics whenever you iterate
snapshot.Brokers, and then have cleanupMetrics iterate that accumulated set (not
just the final snapshot) to call OutgoingByteRateGauge.DeleteLabelValues,
RequestRateGauge.DeleteLabelValues, RequestLatencyGauge.DeleteLabelValues,
requestsInFlightGauge.DeleteLabelValues and responseRateGauge.DeleteLabelValues
for every brokerID seen; ensure seenBrokers is initialized in the constructor
and cleared/updated appropriately when collectors are stopped or reused.
func (a *AdminClient) GetTopicsMeta(
	topics []string,
	ignoreTopicError bool,
) (map[string]TopicDetail, error) {
	if len(topics) == 0 {
		return make(map[string]TopicDetail), nil
	}

	ctx, cancel := a.newRequestContext()
	defer cancel()

	meta, err := a.admin.Metadata(ctx, topics...)
	if err != nil {
		return nil, errors.Trace(err)
	}

	result := make(map[string]TopicDetail, len(topics))
	for _, topic := range topics {
		detail, ok := meta.Topics[topic]
		if !ok {
			continue
		}
		if detail.Err != nil {
			if errors.Is(detail.Err, kerr.UnknownTopicOrPartition) {
				continue
			}
			if !ignoreTopicError {
				return nil, errors.Trace(detail.Err)
			}
			log.Warn("fetch topic meta failed",
				zap.String("keyspace", a.changefeed.Keyspace()),
				zap.String("changefeed", a.changefeed.Name()),
				zap.String("topic", topic),
				zap.Error(detail.Err))
			continue
		}

		result[topic] = TopicDetail{
			Name:          topic,
			NumPartitions: int32(len(detail.Partitions)),
		}
	}
	return result, nil
}
GetTopicsMeta never populates ReplicationFactor in the returned TopicDetail.
At Line 222-225, the TopicDetail is built with Name and NumPartitions but ReplicationFactor is left at its zero value. If callers rely on replication factor from this method, they'll silently get 0. If it's intentionally omitted, a comment would help; otherwise the franz-go metadata response does expose partition replica counts that could be used.
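A sketch of how the replication factor could be derived, written against plain per-partition replica lists rather than the exact kadm types (the parameter is an illustrative stand-in for detail.Partitions):

// replicationFactorOf returns the replica count shared by a topic's
// partitions, or 0 when there are no partitions. replicasPerPartition stands
// in for the per-partition replica lists exposed by the metadata response.
func replicationFactorOf(replicasPerPartition [][]int32) int16 {
	if len(replicasPerPartition) == 0 {
		return 0
	}
	// Assume a uniform replication factor and take it from the first partition.
	return int16(len(replicasPerPartition[0]))
}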
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/admin_client.go` around lines 185 - 228, GetTopicsMeta
currently omits ReplicationFactor when building TopicDetail in GetTopicsMeta;
populate ReplicationFactor from the fetched metadata by inspecting
detail.Partitions' replica lists (e.g., if len(detail.Partitions) > 0 take the
replica slice length from detail.Partitions[0].Replicas or compute a consistent
value across partitions) and assign it to TopicDetail.ReplicationFactor when
creating the result entry; update the construction of TopicDetail in
GetTopicsMeta (referencing GetTopicsMeta, meta.Topics[topic], detail.Partitions
and TopicDetail) to set ReplicationFactor accordingly and handle the
empty-partitions case.
func (p *AsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}

	go func() {
		start := time.Now()
		p.client.Close()
		log.Info("Close kafka async producer success",
			zap.String("keyspace", p.changefeedID.Keyspace()),
			zap.String("changefeed", p.changefeedID.Name()),
			zap.Duration("duration", time.Since(start)))
	}()
}
Asynchronous Close() may lose buffered messages.
p.client.Close() is called in a fire-and-forget goroutine. The franz-go Client.Close() flushes pending records and waits for in-flight produce requests. By not waiting for it to complete, the caller may proceed (e.g., exit the process) before buffered messages are flushed.
Consider making Close() synchronous or providing a way for the caller to wait for completion, similar to the SyncProducer.Close() pattern in sync_producer.go which calls p.client.Close() synchronously.
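A sketch of the blocking variant, reusing the fields named in this comment (closed, client, changefeedID); per the observation above, closing synchronously lets franz-go finish flushing before the caller proceeds:

// Close blocks until the underlying kgo.Client has shut down, instead of
// closing it in a fire-and-forget goroutine. Sketch only.
func (p *AsyncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}
	start := time.Now()
	p.client.Close() // blocks while franz-go finishes flushing, per the note above
	log.Info("Close kafka async producer success",
		zap.String("keyspace", p.changefeedID.Keyspace()),
		zap.String("changefeed", p.changefeedID.Name()),
		zap.Duration("duration", time.Since(start)))
}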
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 69 - 82, The
AsyncProducer.Close implementation spawns a goroutine and calls p.client.Close()
fire-and-forget, risking loss of buffered messages; change AsyncProducer.Close
(and its use of p.closed.CompareAndSwap and p.client.Close) to perform the
client.Close() call synchronously (or expose/wait on a completion channel) so
callers block until the franz-go client has flushed and closed, mirroring the
SyncProducer.Close pattern; keep the same logging (including duration,
p.changefeedID.Keyspace()/Name()) after the synchronous close completes.
func (p *AsyncProducer) AsyncRunCallback(ctx context.Context) error {
	defer p.closed.Store(true)
	for {
		select {
		case <-ctx.Done():
			log.Info("async producer exit since context is done",
				zap.String("keyspace", p.changefeedID.Keyspace()),
				zap.String("changefeed", p.changefeedID.Name()))
			return errors.Trace(ctx.Err())
		case err := <-p.errCh:
			if err == nil {
				return nil
			}
			return err
		}
	}
AsyncRunCallback sets closed=true before Close() runs, preventing client shutdown.
AsyncRunCallback defers p.closed.Store(true) (Line 152). When it returns (e.g., on context cancellation or error), closed becomes true. A subsequent call to Close() sees CompareAndSwap(false, true) fail and returns immediately — the underlying kgo.Client is never closed, leaking connections and potentially losing buffered messages.
Either remove the defer p.closed.Store(true) from AsyncRunCallback (let Close() handle the lifecycle), or have AsyncRunCallback call p.client.Close() directly before returning.
Proposed fix: remove the premature closed flag
 func (p *AsyncProducer) AsyncRunCallback(ctx context.Context) error {
-	defer p.closed.Store(true)
 	for {
 		select {
 		case <-ctx.Done():
 			log.Info("async producer exit since context is done",
 				zap.String("keyspace", p.changefeedID.Keyspace()),
 				zap.String("changefeed", p.changefeedID.Name()))
 			return errors.Trace(ctx.Err())
 		case err := <-p.errCh:
 			if err == nil {
 				return nil
 			}
 			return err
 		}
 	}
 }

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 151 - 166, Remove the
premature mark of p.closed in AsyncRunCallback: the deferred
p.closed.Store(true) should be removed so Close() remains the single place that
sets the closed flag and performs shutdown; update AsyncRunCallback to simply
return on context cancel or errors (and if you prefer, call p.client.Close()
directly before returning instead of setting p.closed) so that Close()'s
CompareAndSwap(false, true) can succeed and the underlying kgo.Client is
properly closed; ensure references to p.errCh, ctx.Done(), and p.changefeedID
remain unchanged.
// AnnotateEventError logs the event context and annotates the error with that context.
func AnnotateEventError(
	keyspace, changefeed string,
	info *common.MessageLogInfo,
	err error,
) error {
	if err == nil {
		return nil
	}
	if contextStr := BuildEventLogContext(keyspace, changefeed, info); contextStr != "" {
		return errors.Annotate(err, contextStr+"; ErrorInfo:"+err.Error())
	}
	return err
}
Duplicated error message in annotated error.
errors.Annotate already wraps err, so when the annotated error is stringified it will include the original error's message. Appending err.Error() into the annotation string at line 96 causes the original message to appear twice in the output, e.g.:
"keyspace=..., eventType=dml; ErrorInfo:something failed: something failed"
Proposed fix
 if contextStr := BuildEventLogContext(keyspace, changefeed, info); contextStr != "" {
-	return errors.Annotate(err, contextStr+"; ErrorInfo:"+err.Error())
+	return errors.Annotate(err, contextStr)
 }

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/internal/logutil/logutil.go` around lines 86 - 99, The
AnnotateEventError function is duplicating the original error message by
appending err.Error() into the annotation; change the call that builds the
wrapped error (in AnnotateEventError) to only include the event context (e.g.,
use errors.Annotate(err, contextStr) or a context-only string like
contextStr+";") instead of contextStr+"; ErrorInfo:"+err.Error(), referencing
AnnotateEventError, BuildEventLogContext and errors.Annotate so the original
error text is preserved exactly once in the wrapped error.
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@pkg/sink/kafka/franz_admin_client.go`:
- Around line 29-36: GetAllBrokers currently treats elements returned by
a.inner.GetAllBrokers() as structs with a field ID, but the inner call returns
[]int32 (broker node IDs); update the loop in GetAllBrokers to iterate over the
int32 IDs (e.g., for _, id := range brokers) and construct Broker values using
the id directly (Broker{ID: id}) so you don't access a non-existent b.ID field.
- Around line 70-80: The CreateTopic implementation in franzAdminClientAdapter
is constructing the wrong type: it uses kadm.TopicDetail fields that don't exist
and passes *kadm.TopicDetail to a.inner.CreateTopic which expects
*kafkafranz.TopicDetail; change the code in CreateTopic to build and pass a
*kafkafranz.TopicDetail populated with the correct fields (e.g., Topic,
Partitions, ReplicationFactor or their actual names in kafkafranz.TopicDetail)
and remove the unused kadm import; ensure you still validate detail == nil and
return the same error when appropriate.
In `@pkg/sink/kafka/franz/admin_client.go`:
- Around line 81-94: GetAllBrokers currently swallows errors from
a.admin.BrokerMetadata making callers unable to tell "no brokers" vs "request
failed"; change the method signature on AdminClient.GetAllBrokers to return
([]int32, error), use a.newRequestContext() as before, call
a.admin.BrokerMetadata(ctx) and on error log the warning (including
keyspace/changefeed via a.changefeed) and return nil plus the error, otherwise
return meta.Brokers.NodeIDs() with a nil error; update all callers of
GetAllBrokers to handle the error appropriately (or if you intentionally want
best-effort behavior, instead add a clear comment above GetAllBrokers explaining
that failures are intentionally ignored).
In `@pkg/sink/kafka/franz/async_producer.go`:
- Around line 78-143: The return of context.Cause(ctx) in AsyncSend should be
wrapped to attach a stack trace; update the select case that currently does
"return context.Cause(ctx)" to instead capture the error and return a wrapped
error (e.g. errors.Trace(err) or errors.WrapError(...)) so library/stdlib errors
are traced; locate the select in AsyncSend that checks <-ctx.Done() and replace
the direct context.Cause(ctx) return with a wrapped version while preserving the
original behavior.
- Around line 148-164: In AsyncRunCallback, the returned context.Cause(ctx)
should be wrapped with the project's error-tracing helper (e.g. errors.Trace or
errors.WrapError) like other callsites (same pattern used in AsyncSend); update
the return in the ctx.Done() case inside AsyncRunCallback to return
errors.Trace(context.Cause(ctx)) (and ensure the errors package is imported/used
consistently) so the context-derived error gets a stack trace.
In `@pkg/sink/kafka/franz/factory_api_test.go`:
- Around line 28-29: The test calls newProducerOptions with an extra integer
argument causing a compile error; update the test to call newProducerOptions
with only the Options pointer (o) and, if the intent was to set maxAttempts or
similar, set the appropriate field on the Options struct (e.g., o.SomeField = 3)
before calling newProducerOptions so the function signature newProducerOptions(o
*Options) []kgo.Opt is respected.
In `@pkg/sink/kafka/franz/factory.go`:
- Around line 220-239: The newCompressionOption function currently logs an
unrecognized compression but leaves codec as the zero value, causing silent
no-compression; update the default branch in newCompressionOption (and any
early-empty case) to explicitly set codec = kgo.NoCompression() (or
alternatively return an error) so that kgo.ProducerBatchCompression always
receives a valid codec; ensure the warning log remains but does not allow codec
to stay nil before returning.
🧹 Nitpick comments (4)
pkg/sink/kafka/franz/factory_api_test.go (1)
18-29: Tests don't assert anything — they only verify no panic. Both tests discard the return values without any assertions. Consider using require from testify to validate the returned options (e.g., non-nil, expected length, specific codec).
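A minimal sketch of what such an assertion could look like, assuming the newProducerOptions(o *Options) []kgo.Opt signature mentioned later in this review and testify's require package:

func TestNewProducerOptionsNotEmpty(t *testing.T) {
	// Assumed setup: construct the Options value however the existing tests do.
	o := &Options{}
	opts := newProducerOptions(o)
	// Assert the builder returned a non-empty option set instead of discarding it.
	require.NotEmpty(t, opts)
}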
pkg/sink/kafka/franz/factory.go (1)

132-173: Empty default: case in the SASL switch is misleading. The default: on line 170 is empty, and the error return on line 172 is outside the switch. This makes it look like a fallthrough path, but Go doesn't fall through in switches. Moving the error return inside default: would improve clarity.

♻️ Proposed refactor

-	default:
-	}
-	return nil, errors.ErrKafkaInvalidConfig.GenWithStack("unsupported sasl mechanism %s", o.SASL.SASLMechanism)
+	default:
+		return nil, errors.ErrKafkaInvalidConfig.GenWithStack("unsupported sasl mechanism %s", o.SASL.SASLMechanism)
+	}

pkg/sink/kafka/franz/sync_producer.go (1)
93-101:WrapErrorcalled unconditionally — wraps nil on success path.Line 101 calls
errors.WrapError(errors.ErrKafkaSendMessage, err)regardless of whethererris nil. WhileWrapErrorlikely returns nil when the inner error is nil, this pattern is fragile and obscures intent. The same applies toSendMessagesat line 136.♻️ Proposed fix for both methods
- return errors.WrapError(errors.ErrKafkaSendMessage, err) + if err != nil { + return errors.WrapError(errors.ErrKafkaSendMessage, err) + } + return nilpkg/sink/kafka/franz/gssapi.go (1)
46-52: Typo:kerborosClient→kerberosClient.The interface name and all references misspell "Kerberos" as "Kerboros." This affects the interface name (Line 46), the field type (Line 98), the constructor name (Line 217), and the constructor return type. Since these are unexported, it won't break external consumers, but it's still a readability/maintenance concern.
Proposed fix (interface + constructor rename)
-type kerborosClient interface { +type kerberosClient interface {-func newKerborosClient(g security.GSSAPI) (kerborosClient, error) { +func newKerberosClient(g security.GSSAPI) (kerberosClient, error) {And update the field type in
gssapiSession:type gssapiSession struct { - client kerborosClient + client kerberosClient
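A minimal sketch of the assertion-based test suggested in the first nitpick above. It assumes the unexported `newProducerOptions(o *Options) []kgo.Opt` helper and an `Options` struct with a `Compression` field, as described in this review; the test and field names are illustrative only.

```go
package franz

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestNewProducerOptionsHasEntries(t *testing.T) {
	// Options and its Compression field are assumed from the review context.
	o := &Options{Compression: "lz4"}

	// Assert something concrete instead of only checking that the call does not panic.
	opts := newProducerOptions(o)
	require.NotEmpty(t, opts)
	for _, opt := range opts {
		require.NotNil(t, opt)
	}
}
```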
```go
func (a *franzAdminClientAdapter) CreateTopic(detail *TopicDetail, validateOnly bool) error {
	if detail == nil {
		return errors.ErrKafkaInvalidConfig.GenWithStack("topic detail must not be nil")
	}
	franzDetail := &kadm.TopicDetail{
		Name:              detail.Name,
		NumPartitions:     detail.NumPartitions,
		ReplicationFactor: detail.ReplicationFactor,
	}
	return a.inner.CreateTopic(franzDetail, validateOnly)
}
```
Compilation error: wrong type used for CreateTopic argument.
kadm.TopicDetail does not have Name, NumPartitions, or ReplicationFactor fields, and a.inner.CreateTopic expects *kafkafranz.TopicDetail, not *kadm.TopicDetail. The pipeline confirms all four errors on lines 75-79.
The kadm import on line 19 can also be removed after this fix.
🐛 Proposed fix

```diff
 import (
 	"github.com/pingcap/ticdc/pkg/errors"
 	kafkafranz "github.com/pingcap/ticdc/pkg/sink/kafka/franz"
-	"github.com/twmb/franz-go/pkg/kadm"
 )

 func (a *franzAdminClientAdapter) CreateTopic(detail *TopicDetail, validateOnly bool) error {
 	if detail == nil {
 		return errors.ErrKafkaInvalidConfig.GenWithStack("topic detail must not be nil")
 	}
-	franzDetail := &kadm.TopicDetail{
-		Name:              detail.Name,
-		NumPartitions:     detail.NumPartitions,
-		ReplicationFactor: detail.ReplicationFactor,
-	}
+	franzDetail := &kafkafranz.TopicDetail{
+		Name:              detail.Name,
+		NumPartitions:     detail.NumPartitions,
+		ReplicationFactor: detail.ReplicationFactor,
+	}
 	return a.inner.CreateTopic(franzDetail, validateOnly)
 }
```
🧰 Tools
🪛 GitHub Check: Mac OS Build
[failure] 79-79: cannot use franzDetail (variable of type *kadm.TopicDetail) as *franz.TopicDetail value in argument to a.inner.CreateTopic
[failure] 77-77: unknown field ReplicationFactor in struct literal of type kadm.TopicDetail
[failure] 76-76: unknown field NumPartitions in struct literal of type kadm.TopicDetail
[failure] 75-75: unknown field Name in struct literal of type kadm.TopicDetail
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz_admin_client.go` around lines 70 - 80, The CreateTopic
implementation in franzAdminClientAdapter is constructing the wrong type: it
uses kadm.TopicDetail fields that don't exist and passes *kadm.TopicDetail to
a.inner.CreateTopic which expects *kafkafranz.TopicDetail; change the code in
CreateTopic to build and pass a *kafkafranz.TopicDetail populated with the
correct fields (e.g., Topic, Partitions, ReplicationFactor or their actual names
in kafkafranz.TopicDetail) and remove the unused kadm import; ensure you still
validate detail == nil and return the same error when appropriate.
```go
func (a *AdminClient) GetAllBrokers() []int32 {
	ctx, cancel := a.newRequestContext()
	defer cancel()

	meta, err := a.admin.BrokerMetadata(ctx)
	if err != nil {
		log.Warn("Kafka admin client fetch broker metadata failed",
			zap.String("keyspace", a.changefeed.Keyspace()),
			zap.String("changefeed", a.changefeed.Name()),
			zap.Error(err))
		return nil
	}
	return meta.Brokers.NodeIDs()
}
```
GetAllBrokers silently swallows errors by returning nil.
When BrokerMetadata fails, the method logs a warning and returns nil instead of propagating the error. Callers cannot distinguish "no brokers" from "request failed." If this is intentional (best-effort for heartbeat/monitoring), a comment would clarify the design. If callers need to react to failures, an error return value is needed.
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/admin_client.go` around lines 81 - 94, GetAllBrokers
currently swallows errors from a.admin.BrokerMetadata making callers unable to
tell "no brokers" vs "request failed"; change the method signature on
AdminClient.GetAllBrokers to return ([]int32, error), use a.newRequestContext()
as before, call a.admin.BrokerMetadata(ctx) and on error log the warning
(including keyspace/changefeed via a.changefeed) and return nil plus the error,
otherwise return meta.Brokers.NodeIDs() with a nil error; update all callers of
GetAllBrokers to handle the error appropriately (or if you intentionally want
best-effort behavior, instead add a clear comment above GetAllBrokers explaining
that failures are intentionally ignored).
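A minimal sketch of the error-returning variant described in the prompt above. It assumes the same `AdminClient` fields and helpers (`admin`, `changefeed`, `newRequestContext`) shown in the snippet under review, and that `errors.Trace` is available as elsewhere in this codebase; callers of `GetAllBrokers` would need to handle the second return value.

```go
// GetAllBrokers returns the IDs of all brokers known to the cluster.
// A failed metadata request is surfaced to the caller instead of being
// collapsed into an empty result.
func (a *AdminClient) GetAllBrokers() ([]int32, error) {
	ctx, cancel := a.newRequestContext()
	defer cancel()

	meta, err := a.admin.BrokerMetadata(ctx)
	if err != nil {
		log.Warn("Kafka admin client fetch broker metadata failed",
			zap.String("keyspace", a.changefeed.Keyspace()),
			zap.String("changefeed", a.changefeed.Name()),
			zap.Error(err))
		return nil, errors.Trace(err)
	}
	return meta.Brokers.NodeIDs(), nil
}
```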
```go
func (p *AsyncProducer) AsyncSend(
	ctx context.Context,
	topic string,
	partition int32,
	message *common.Message,
) error {
	if p.closed.Load() {
		return errors.ErrKafkaProducerClosed.GenWithStackByArgs()
	}

	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	default:
	}

	var (
		keyspace   = p.changefeedID.Keyspace()
		changefeed = p.changefeedID.Name()
	)

	failpoint.Inject("KafkaSinkAsyncSendError", func() {
		log.Info("KafkaSinkAsyncSendError error injected",
			zap.String("keyspace", keyspace), zap.String("changefeed", changefeed))
		errWithInfo := logutil.AnnotateEventError(
			keyspace,
			changefeed,
			message.LogInfo,
			errors.New("kafka sink injected error"),
		)
		select {
		case p.errCh <- errors.WrapError(errors.ErrKafkaAsyncSendMessage, errWithInfo):
		default:
		}
		failpoint.Return(nil)
	})

	record := &kgo.Record{
		Topic:     topic,
		Partition: partition,
		Key:       message.Key,
		Value:     message.Value,
	}

	callback := message.Callback
	logInfo := message.LogInfo
	promise := func(_ *kgo.Record, err error) {
		if err != nil {
			errWithInfo := logutil.AnnotateEventError(
				keyspace, changefeed,
				logInfo,
				err,
			)
			select {
			case p.errCh <- errors.WrapError(errors.ErrKafkaAsyncSendMessage, errWithInfo):
			// todo: remove this default after support dispatcher recover logic.
			default:
			}
			return
		}
		if callback != nil {
			callback()
		}
	}
	p.client.Produce(ctx, record, promise)
	return nil
```
AsyncSend should wrap the error from context.Cause(ctx).
Line 90 returns the raw error from context.Cause(ctx) without errors.Trace. Per the coding guidelines, errors from library/stdlib calls should be wrapped to attach a stack trace.
Proposed fix
```diff
 	select {
 	case <-ctx.Done():
-		return context.Cause(ctx)
+		return errors.Trace(context.Cause(ctx))
 	default:
 	}
```

The rest of AsyncSend — record construction, promise callback wiring, failpoint injection, and the non-blocking error channel send — is clean and correct. As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 78 - 143, The return of
context.Cause(ctx) in AsyncSend should be wrapped to attach a stack trace;
update the select case that currently does "return context.Cause(ctx)" to
instead capture the error and return a wrapped error (e.g. errors.Trace(err) or
errors.WrapError(...)) so library/stdlib errors are traced; locate the select in
AsyncSend that checks <-ctx.Done() and replace the direct context.Cause(ctx)
return with a wrapped version while preserving the original behavior.
```go
func (p *AsyncProducer) AsyncRunCallback(ctx context.Context) error {
	defer p.closed.Store(true)
	for {
		select {
		case <-ctx.Done():
			log.Info("async producer exit since context is done",
				zap.String("keyspace", p.changefeedID.Keyspace()),
				zap.String("changefeed", p.changefeedID.Name()))
			return context.Cause(ctx)
		case err := <-p.errCh:
			if err == nil {
				return nil
			}
			return err
		}
	}
}
```
AsyncRunCallback should also wrap the context.Cause error.
Same pattern as in AsyncSend: Line 156 returns context.Cause(ctx) unwrapped.
Proposed fix
```diff
 		case <-ctx.Done():
 			log.Info("async producer exit since context is done",
 				zap.String("keyspace", p.changefeedID.Keyspace()),
 				zap.String("changefeed", p.changefeedID.Name()))
-			return context.Cause(ctx)
+			return errors.Trace(context.Cause(ctx))
```

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/async_producer.go` around lines 148 - 164, In
AsyncRunCallback, the returned context.Cause(ctx) should be wrapped with the
project's error-tracing helper (e.g. errors.Trace or errors.WrapError) like
other callsites (same pattern used in AsyncSend); update the return in the
ctx.Done() case inside AsyncRunCallback to return
errors.Trace(context.Cause(ctx)) (and ensure the errors package is imported/used
consistently) so the context-derived error gets a stack trace.
```go
func newCompressionOption(o *Options) kgo.Opt {
	compression := strings.ToLower(strings.TrimSpace(o.Compression))
	var codec kgo.CompressionCodec
	switch compression {
	case "none":
		codec = kgo.NoCompression()
	case "gzip":
		codec = kgo.GzipCompression()
	case "snappy":
		codec = kgo.SnappyCompression()
	case "lz4":
		codec = kgo.Lz4Compression()
	case "zstd":
		codec = kgo.ZstdCompression()
	case "":
		codec = kgo.NoCompression()
	default:
		log.Warn("unsupported compression algorithm", zap.String("compression", o.Compression))
	}
	return kgo.ProducerBatchCompression(codec)
```
Unrecognized compression falls through silently with a zero-value codec.
When compression is an unrecognized string, the code logs a warning (line 237) but then passes a zero-value kgo.CompressionCodec to kgo.ProducerBatchCompression. This results in silent degradation to no compression. Consider returning an error or explicitly falling back to kgo.NoCompression() to make the behavior intentional.
🛡️ Proposed fix — explicit fallback

```diff
 	default:
 		log.Warn("unsupported compression algorithm", zap.String("compression", o.Compression))
+		codec = kgo.NoCompression()
 	}
```

🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/factory.go` around lines 220 - 239, The
newCompressionOption function currently logs an unrecognized compression but
leaves codec as the zero value, causing silent no-compression; update the
default branch in newCompressionOption (and any early-empty case) to explicitly
set codec = kgo.NoCompression() (or alternatively return an error) so that
kgo.ProducerBatchCompression always receives a valid codec; ensure the warning
log remains but does not allow codec to stay nil before returning.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/kafka/metrics.go (1)
56-92: ⚠️ Potential issue | 🟠 Major — Default histogram buckets are unsuitable for all three franz metrics.

The Prometheus default buckets (`DefBuckets`: 0.005 … 10) are designed for second-based latencies. These histograms observe:

`franzRequestLatencyHistogram` — values in milliseconds (see `metrics_hook.go` line 175: `Microseconds() / 1000`). Typical latencies (5–500 ms) will all overflow into the `+Inf` bucket.
`franzCompressionRatioHistogram` — values of `ratio × 100` (see `metrics_hook.go` line 200). Typical values (100–500) will all overflow.
`franzRecordsPerRequestHistogram` — integer record counts. Also a poor fit for sub-1 buckets.

Without custom `Buckets`, these histograms provide no useful quantile information.

Proposed fix — add meaningful bucket ranges

```diff
 	franzRequestLatencyHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name:      "kafka_producer_request_latency_histogram",
 			Help:      "Request latency histogram for franz producer in milliseconds.",
+			Buckets:   []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000},
 		}, []string{"namespace", "changefeed", "broker"})
 	...
 	franzCompressionRatioHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name:      "kafka_producer_compression_ratio_histogram",
 			Help:      "Compression ratio times 100 histogram for franz producer.",
+			Buckets:   []float64{50, 100, 150, 200, 300, 500, 1000},
 		}, []string{"namespace", "changefeed"})
 	...
 	franzRecordsPerRequestHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name:      "kafka_producer_records_per_request_histogram",
 			Help:      "Records per request histogram for franz producer.",
+			Buckets:   []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
 		}, []string{"namespace", "changefeed"})
```
🤖 Fix all issues with AI agents
In `@pkg/sink/kafka/franz/metrics_hook.go`:
- Around line 120-140: RecordBrokerWrite increments RequestsInFlight only when
err==nil but OnBrokerE2E only decrements when e2e.WriteErr==nil, causing leaks;
update OnBrokerE2E to always decrement the RequestsInFlight gauge for the same
label tuple (keyspace, changefeed, brokerID) whenever the write was counted
(i.e., when RequestsInFlight != nil and the broker/request labels exist), rather
than only on e2e.WriteErr==nil; locate MetricsHook.OnBrokerE2E and remove or
broaden the e2e.WriteErr condition so
RequestsInFlight.WithLabelValues(...).Add(-1) (or .Dec()) is invoked
unconditionally when the corresponding request was previously incremented.
🧹 Nitpick comments (2)
pkg/sink/kafka/franz/admin_client_test.go (1)
25-36: Source-code grep test is fragile; prefer a behavioral assertion. This test reads the source file as text and asserts on string patterns. It will silently pass if the error is misspelled, aliased, or if the function is moved to another file. A unit test that actually calls the code path (e.g., with a mock returning a "controller not available" condition) and asserts on the returned error type would be more reliable and refactor-proof.

pkg/sink/kafka/franz/metrics_hook.go (1)

70-80: `Run` won't clean up if `BindPrometheusMetrics` is called after `Run` starts. `Run` reads `bound` once on entry (line 71). If it's `false`, the early return on line 75 skips cleanup entirely — even if `BindPrometheusMetrics` is called later while `Run` is blocked on `ctx.Done()`. This appears safe given the current call site (`NewFranzFactory` binds before `Run` is invoked), but it's worth a brief comment documenting that precondition.
```go
func (h *MetricsHook) RecordBrokerWrite(nodeID int32, bytesWritten int, err error) {
	if nodeID < 0 {
		return
	}

	keyspace, changefeed, metrics, bound := h.loadPrometheusMetrics()
	if !bound {
		return
	}
	brokerID := strconv.Itoa(int(nodeID))

	if metrics.OutgoingByteRate != nil && bytesWritten > 0 {
		metrics.OutgoingByteRate.WithLabelValues(keyspace, changefeed, brokerID).Add(float64(bytesWritten))
	}
	if metrics.RequestRate != nil {
		metrics.RequestRate.WithLabelValues(keyspace, changefeed, brokerID).Add(1)
	}
	if err == nil && metrics.RequestsInFlight != nil {
		metrics.RequestsInFlight.WithLabelValues(keyspace, changefeed, brokerID).Add(1)
	}
}
```
In-flight tracking increments on write but only decrements on E2E success — failed writes leak.
RecordBrokerWrite increments RequestsInFlight when err == nil (Line 137-138). OnBrokerE2E decrements it only when e2e.WriteErr == nil (Line 168-169). If a request is successfully written (err == nil in OnBrokerWrite) but later the E2E callback reports WriteErr != nil (e.g., broker dropped it), the in-flight gauge will never decrement, drifting upward over time.
Consider whether the decrement condition should be broadened (e.g., always decrement in OnBrokerE2E if the corresponding write incremented).
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/metrics_hook.go` around lines 120 - 140,
RecordBrokerWrite increments RequestsInFlight only when err==nil but OnBrokerE2E
only decrements when e2e.WriteErr==nil, causing leaks; update OnBrokerE2E to
always decrement the RequestsInFlight gauge for the same label tuple (keyspace,
changefeed, brokerID) whenever the write was counted (i.e., when
RequestsInFlight != nil and the broker/request labels exist), rather than only
on e2e.WriteErr==nil; locate MetricsHook.OnBrokerE2E and remove or broaden the
e2e.WriteErr condition so RequestsInFlight.WithLabelValues(...).Add(-1) (or
.Dec()) is invoked unconditionally when the corresponding request was previously
incremented.
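One possible shape for the broadened decrement, sketched against franz-go's `kgo.HookBrokerE2E` hook signature. It assumes every request reaching this callback was previously counted by `RecordBrokerWrite`; if socket-level write failures also reach the E2E hook without having been counted, an unconditional decrement would drive the gauge negative and a guard mirroring the increment condition would be needed instead — treat this as an illustration of the review suggestion, not the definitive fix.

```go
// OnBrokerE2E mirrors RecordBrokerWrite: every request counted as in-flight
// on write is removed here regardless of the E2E outcome, so broker-side
// failures cannot make the gauge drift upward.
func (h *MetricsHook) OnBrokerE2E(meta kgo.BrokerMetadata, _ int16, e2e kgo.BrokerE2E) {
	keyspace, changefeed, metrics, bound := h.loadPrometheusMetrics()
	if !bound || metrics.RequestsInFlight == nil {
		return
	}
	brokerID := strconv.Itoa(int(meta.NodeID))
	// Decrement even when e2e.WriteErr or e2e.ReadErr is non-nil; the write
	// side already incremented the gauge for this request (assumption noted above).
	metrics.RequestsInFlight.WithLabelValues(keyspace, changefeed, brokerID).Dec()
}
```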
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@pkg/errors/helper.go`:
- Line 118: The review asks to confirm if temporary Kafka controller
unavailability should cause an unretryable changefeed failure: check the code
paths where ErrKafkaControllerNotAvailable is returned (notably GetBrokerConfig
and the meta.Controller < 0 check) and decide whether it represents a transient
error; if it is transient, change its classification from unretryable to
retryable (or wrap it so callers can distinguish transient vs permanent), update
any error mapping/constants where ErrKafkaControllerNotAvailable is listed
(e.g., remove it from permanent-failure lists), and add/update a
unit/integration test that simulates meta.Controller < 0 to ensure changefeed
initialization now retries with backoff instead of failing fast; also update
comments/docs around ErrKafkaControllerNotAvailable to describe expected retry
behavior.
In `@pkg/sink/kafka/franz/metrics_hook.go`:
- Around line 69-79: The Run method currently returns early if metrics aren't
yet bound, which lets metrics bound later leak; change MetricsHook.Run so it
does not early-return on !bound — instead, simply wait for ctx.Done() and then
call CleanupPrometheusMetrics(); use the existing
loadPrometheusMetrics()/promBound state only for the initial check if needed,
but ensure CleanupPrometheusMetrics() is always invoked on shutdown
(BindPrometheusMetrics may be called after Run starts), so remove the
early-return path that prevents cleanup when BindPrometheusMetrics runs later.
🧹 Nitpick comments (1)
pkg/sink/kafka/franz/admin_client_test.go (1)
27-38: Source-file-scanning test is fragile and non-behavioral. This test reads the Go source file at runtime and does string matching. It will break if the file is renamed, moved, or if the error usage is refactored without changing semantics. A behavioral test that calls `GetBrokerConfig` with a mock/stub returning `Controller < 0` and asserts the error type would be more robust and maintainable; a sketch follows below.
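A minimal sketch of such a behavioral test. The `GetBrokerConfig(ctx, name) (string, error)` signature is inferred from the call sites discussed above, and the error comparison assumes the pingcap error's `Equal` helper used elsewhere in this codebase; `brokerMetadataFetcher` and `newTestAdminClient` are hypothetical seams standing in for whatever injection point the real `AdminClient` exposes.

```go
package franz

import (
	"context"
	"testing"

	"github.com/pingcap/ticdc/pkg/errors"
	"github.com/stretchr/testify/require"
)

// brokerMetadataFetcher is a hypothetical seam: the real AdminClient would
// need some injectable way to return metadata without a live cluster.
type brokerMetadataFetcher interface {
	ControllerID(ctx context.Context) (int32, error)
}

type fakeMetadata struct{ controller int32 }

func (f fakeMetadata) ControllerID(_ context.Context) (int32, error) {
	return f.controller, nil
}

func TestGetBrokerConfigControllerUnavailable(t *testing.T) {
	// Simulate "no controller elected" (Controller < 0) instead of grepping
	// the source file for the error name.
	admin := newTestAdminClient(fakeMetadata{controller: -1}) // hypothetical helper

	_, err := admin.GetBrokerConfig(context.Background(), "message.max.bytes")
	require.Error(t, err)
	require.True(t, errors.ErrKafkaControllerNotAvailable.Equal(err))
}
```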
pkg/errors/helper.go (Outdated)

```go
	ErrSinkURIInvalid,
	ErrKafkaInvalidConfig,
	ErrKafkaControllerNotAvailable,
```
🧩 Analysis chain

🏁 Scripts executed (all against Repository: pingcap/ticdc):

```sh
# Check where ErrKafkaControllerNotAvailable is generated/used (output length 2064)
rg -n 'ErrKafkaControllerNotAvailable' --type=go -C3

# Get more context around the GetBrokerConfig function (output length 971)
rg -n 'GetBrokerConfig|meta.Controller' pkg/sink/kafka/franz/admin_client.go -B5 -A10

# Check where GetBrokerConfig is called (output length 4324)
rg -n 'GetBrokerConfig' --type=go -B2 -A2

# Check how other Kafka errors are classified in changefeedUnRetryableErrors (output length 2547)
rg -n 'changefeedUnRetryableErrors' pkg/errors/helper.go -A50

# Check how errors are handled during changefeed creation and where ShouldFailChangefeed is called (output length 4769)
rg -n 'ShouldFailChangefeed|adjustOptions' --type=go -B2 -A2 | head -80

# Check the context of where GetBrokerConfig is called in options.go - specifically around line 727 (output length 886)
sed -n '715,740p' pkg/sink/kafka/options.go

# Check how errors from adjustOptions are handled in the factory (output length 1212)
sed -n '40,80p' pkg/sink/kafka/sarama_factory.go

# Check if there's any retry logic around changefeed sink creation (output length 220)
rg -n 'retry|Retry' pkg/sink/kafka/sarama_factory.go -B2 -A2

# Look for comments or documentation about why controller unavailability is unretryable (output length 1086)
rg -n 'controller.*not.*available|Controller.*not.*available' --type=go -B3 -A3

# Check the test to understand the intent better (output length 1552)
cat -n pkg/sink/kafka/franz/admin_client_test.go | head -40

# Check if there's any documentation or comments about retry behavior in options.go or factory (output length 655)
sed -n '580,600p' pkg/sink/kafka/options.go

# Check how changefeed creation handles errors and if there's retry logic (output length 3927)
rg -n 'CreateSinkFactory|New.*Factory' --type=go -B3 -A3 | head -60

# Look for any retry or backoff logic around changefeed creation (output length 2200)
rg -n 'backoff|exponential|retry.*changefeed|changefeed.*retry' --type=go -i | head -20
```
Confirm whether temporary controller unavailability should cause permanent changefeed failure.
ErrKafkaControllerNotAvailable is returned when meta.Controller < 0 during broker config lookup in GetBrokerConfig, which is called during changefeed initialization. Controller elections can be temporarily unavailable during broker restarts or failovers. Marking this unretryable causes changefeed creation to fail immediately rather than retry with backoff. Verify this is the intended behavior—whether the changefeed should fail fast on controller unavailability or allow transient recovery with retries.
🤖 Prompt for AI Agents
In `@pkg/errors/helper.go` at line 118, The review asks to confirm if temporary
Kafka controller unavailability should cause an unretryable changefeed failure:
check the code paths where ErrKafkaControllerNotAvailable is returned (notably
GetBrokerConfig and the meta.Controller < 0 check) and decide whether it
represents a transient error; if it is transient, change its classification from
unretryable to retryable (or wrap it so callers can distinguish transient vs
permanent), update any error mapping/constants where
ErrKafkaControllerNotAvailable is listed (e.g., remove it from permanent-failure
lists), and add/update a unit/integration test that simulates meta.Controller <
0 to ensure changefeed initialization now retries with backoff instead of
failing fast; also update comments/docs around ErrKafkaControllerNotAvailable to
describe expected retry behavior.
```go
func (h *MetricsHook) Run(ctx context.Context) {
	_, _, _, bound := h.loadPrometheusMetrics()

	if !bound {
		<-ctx.Done()
		return
	}

	<-ctx.Done()
	h.CleanupPrometheusMetrics()
}
```
Run does not actually wait for binding — metrics will leak if BindPrometheusMetrics is called after Run starts.
If Run is invoked before BindPrometheusMetrics (a likely lifecycle ordering), it sees !bound, blocks on <-ctx.Done(), and returns without calling CleanupPrometheusMetrics. Metrics bound later are never cleaned up.
Consider using a channel or condition variable to wait for binding, or re-check promBound after <-ctx.Done():
Proposed minimal fix
```diff
 func (h *MetricsHook) Run(ctx context.Context) {
-	_, _, _, bound := h.loadPrometheusMetrics()
-
-	if !bound {
-		<-ctx.Done()
-		return
-	}
-
 	<-ctx.Done()
 	h.CleanupPrometheusMetrics()
 }
```

CleanupPrometheusMetrics already guards against the unbound case, so the early-return check is unnecessary and actively harmful.
🤖 Prompt for AI Agents
In `@pkg/sink/kafka/franz/metrics_hook.go` around lines 69 - 79, The Run method
currently returns early if metrics aren't yet bound, which lets metrics bound
later leak; change MetricsHook.Run so it does not early-return on !bound —
instead, simply wait for ctx.Done() and then call CleanupPrometheusMetrics();
use the existing loadPrometheusMetrics()/promBound state only for the initial
check if needed, but ensure CleanupPrometheusMetrics() is always invoked on
shutdown (BindPrometheusMetrics may be called after Run starts), so remove the
early-return path that prevents cleanup when BindPrometheusMetrics runs later.
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
|
/test all |
|
@3AceShowHand: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
Summary
Test Plan
Summary by CodeRabbit
New Features
New `kafka-client` sink URI option to select the Kafka client (default: franz).
Tests
Chores