
Conversation

@3AceShowHand
Collaborator

@3AceShowHand 3AceShowHand commented Feb 4, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Per-area event batching with configurable per-area count and byte limits; batches flush on limits and allow a single oversized event.
  • API / Configuration

    • New API/runtime fields to set and expose event-collector batch count and batch bytes; area-level overrides applied when dispatchers are prepared.
  • Tests

    • Added/updated tests validating per-area batching, byte-limit behavior (including oversized events), lifecycle, and config mapping.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Feb 4, 2026
@gemini-code-assist

Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant enhancement to the dynamic stream's event queue by adding byte-based batching capabilities. Previously, event batching was primarily governed by the number of events. With this change, users can now configure a maximum byte size for event batches, providing finer control over memory consumption and improving the stability and efficiency of event processing, especially when dealing with events of varying sizes. This ensures that batches remain within manageable memory limits, preventing potential resource exhaustion.

Highlights

  • Byte-based Batching: Implemented a new mechanism to limit event batches not only by count but also by their total byte size, configurable via the BatchBytes option.
  • Enhanced popEvents Logic: The popEvents function in event_queue.go now incorporates checks to ensure that the accumulated byte size of events in a batch does not exceed the specified BatchBytes limit, breaking the batching process if the limit is met or would be exceeded by the next event.
  • New Unit Tests: Added event_queue_batch_bytes_test.go with dedicated unit tests to validate the correctness of the new byte-based batching logic, including edge cases where a single event might be larger than the BatchBytes limit.
  • Documentation Clarity: Updated the comment for the BatchBytes field in the Option struct within interfaces.go to clearly state that a value of <= 0 disables bytes-based batching.
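The pop logic described in these highlights can be sketched as follows. This is a minimal illustration, not the PR's actual code: `popBatch` and `sizedEvent` are hypothetical names, and the real `popEvents` operates on the queue's internal structures rather than a plain slice. The sketch captures the two behaviors the highlights call out: the batch stops before the byte limit would be exceeded, and a single oversized first event is still emitted so it can never get stuck.

```go
package main

import "fmt"

// sizedEvent is an illustrative stand-in for an event plus its byte size.
type sizedEvent struct {
	id   int
	size int
}

// popBatch pops events until either batchCount is reached or adding the
// next event would exceed batchBytes. A batchBytes value <= 0 disables
// byte-based batching, and the first event is always taken even if it is
// larger than batchBytes on its own.
func popBatch(pending []sizedEvent, batchCount, batchBytes int) (batch, rest []sizedEvent) {
	usedBytes := 0
	for i, ev := range pending {
		if len(batch) >= batchCount {
			return batch, pending[i:]
		}
		// Only break on the byte limit once the batch is non-empty,
		// so an oversized first event still goes out alone.
		if batchBytes > 0 && len(batch) > 0 && usedBytes+ev.size > batchBytes {
			return batch, pending[i:]
		}
		batch = append(batch, ev)
		usedBytes += ev.size
	}
	return batch, nil
}

func main() {
	events := []sizedEvent{{1, 40}, {2, 40}, {3, 40}}
	batch, rest := popBatch(events, 10, 100)
	fmt.Println(len(batch), len(rest)) // the third event would push bytes past 100

	big, _ := popBatch([]sizedEvent{{1, 200}}, 10, 100)
	fmt.Println(len(big)) // oversized first event is still popped
}
```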


Changelog
  • utils/dynstream/event_queue.go
    • Introduced batchBytes and bytesBatchingEnabled variables to control byte-based batching.
    • Modified the popEvents function to incorporate byte-size checks, stopping batch accumulation if the BatchBytes limit is reached or would be exceeded by the next event.
  • utils/dynstream/event_queue_batch_bytes_test.go
    • Added a new test file to cover the byte-based batching logic.
    • Included tests for scenarios where batching stops before exceeding the byte limit and where a single event larger than the limit is correctly handled.
  • utils/dynstream/interfaces.go
    • Updated the documentation comment for the BatchBytes field in the Option struct to clarify its behavior (<= 0 disables byte-based batching).
Activity
  • The pull request description includes a template for problem description, changes, and checklist, but specific details were not filled out by the author.
  • An issue number placeholder close #xxx is present, indicating an issue was intended to be linked but not specified.
  • No specific comments or reviews have been provided in the context.

@coderabbitai

coderabbitai bot commented Feb 4, 2026

📝 Walkthrough

Walkthrough

Adds a generic batcher and per-area batch-config store; integrates per-area batching into dynamic streams and event queues, updates constructor signatures and pop/processing flow, and propagates batchCount/batchBytes through API, config, dispatcher, event collector, and tests.

Changes

  • Batching primitives: utils/dynstream/batcher.go, utils/dynstream/area_batch_config_store.go
    Introduce internal batcher and batchConfig types plus an areaBatchConfigStore with an atomic copy-on-write map, a ref-counted lifecycle, and per-area override APIs.
  • Event queue batching: utils/dynstream/event_queue.go, utils/dynstream/event_queue_batch_bytes_test.go, utils/dynstream/event_queue_per_area_batch_config_test.go
    eventQueue gains a batchConfigStore; the newEventQueue signature is extended; popEvents now accepts a batcher, uses per-area limits, and returns batched events and bytes; tests added for byte limits and per-area batch counts.
  • Stream wiring & runtime: utils/dynstream/stream.go, utils/dynstream/parallel_dynamic_stream.go, utils/dynstream/event_queue_test.go, utils/dynstream/stream_test.go, utils/dynstream/stream_bench_test.go
    Propagate the areaBatchConfigStore through newStream/newEventQueue, initialize and use a batcher in handleLoop, call store hooks on the path add/remove path, and update call sites and tests to the new signatures.
  • Dynstream interfaces: utils/dynstream/interfaces.go
    Add SetAreaBatchConfig to DynamicStream, change Option's BatchCount/BatchBytes to uint64, and add a NewBatchConfig helper (a duplicate definition is present in the diff).
  • Area store tests: utils/dynstream/area_batch_config_store_test.go
    Add tests validating noop behavior without paths, setting/clearing overrides, and cleaning overrides on path removal.
  • API & config plumbing: api/v2/model.go, api/v2/model_test.go, pkg/config/replica_config.go, pkg/config/changefeed.go, pkg/config/changefeed_batch_config_test.go
    Expose EventCollectorBatchCount/EventCollectorBatchBytes in the API and internal replica config, add defaults, propagate them through conversions, and add tests for mapping behavior.
  • Dispatcher & eventcollector wiring: downstreamadapter/dispatcher/*, downstreamadapter/eventcollector/*, downstreamadapter/dispatchermanager/*
    Add SharedInfo fields and accessors for batch settings; pass batchCount/batchBytes into NewSharedInfo; add interface methods and mock implementations in tests; apply per-changefeed batch config via ds.SetAreaBatchConfig during PrepareAddDispatcher.
  • Tests & minor updates: utils/dynstream/*_test.go, downstreamadapter/*_test.go, pkg/config/*_test.go
    Update tests to the new constructors/signatures, construct and use batchers in tests, add mocks/helpers to satisfy the new interfaces, and add validation tests for zero batchCount.
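The "atomic copy-on-write map, ref-counted lifecycle" summarized above can be sketched like this. All names here (`areaStore`, `get`, `set`, `onAddPath`, `onRemovePath`) are illustrative stand-ins, not the PR's actual API: readers load an immutable map atomically, writers copy-and-swap under a mutex, and an area's override is dropped once its last path is removed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type batchConfig struct {
	batchCount uint64
	batchBytes uint64
}

type areaStore struct {
	defaults  batchConfig
	mu        sync.Mutex   // serializes writers only
	overrides atomic.Value // holds an immutable map[string]batchConfig
	refs      map[string]int
}

func newAreaStore(def batchConfig) *areaStore {
	s := &areaStore{defaults: def, refs: map[string]int{}}
	s.overrides.Store(map[string]batchConfig{})
	return s
}

// get is lock-free for readers: it sees either the old or the new map.
func (s *areaStore) get(area string) batchConfig {
	if c, ok := s.overrides.Load().(map[string]batchConfig)[area]; ok {
		return c
	}
	return s.defaults
}

// set copies the current map, applies the override, and swaps it in.
func (s *areaStore) set(area string, c batchConfig) {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.overrides.Load().(map[string]batchConfig)
	next := make(map[string]batchConfig, len(old)+1)
	for k, v := range old {
		next[k] = v
	}
	next[area] = c
	s.overrides.Store(next)
}

func (s *areaStore) onAddPath(area string) { s.mu.Lock(); s.refs[area]++; s.mu.Unlock() }

// onRemovePath drops the override once the last path of the area is gone.
func (s *areaStore) onRemovePath(area string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs[area]--
	if s.refs[area] <= 0 {
		delete(s.refs, area)
		old := s.overrides.Load().(map[string]batchConfig)
		next := make(map[string]batchConfig, len(old))
		for k, v := range old {
			if k != area {
				next[k] = v
			}
		}
		s.overrides.Store(next)
	}
}

func main() {
	s := newAreaStore(batchConfig{batchCount: 4096})
	s.onAddPath("cf-1")
	s.set("cf-1", batchConfig{batchCount: 128, batchBytes: 1 << 20})
	fmt.Println(s.get("cf-1").batchCount) // override in effect
	s.onRemovePath("cf-1")
	fmt.Println(s.get("cf-1").batchCount) // back to the default
}
```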

Sequence Diagram

sequenceDiagram
    participant PDS as parallelDynamicStream
    participant S as stream
    participant EQ as eventQueue
    participant ABCS as areaBatchConfigStore
    participant B as batcher

    PDS->>ABCS: newAreaBatchConfigStore(defaultConfig)
    PDS->>S: newStream(..., batchConfigStore)
    S->>EQ: newEventQueue(..., batchConfigStore)

    PDS->>ABCS: onAddPath(area)
    ABCS-->>PDS: refCount++

    S->>B: newBatcher(defaultConfig)
    loop handleLoop
        S->>EQ: popEvents(&B)
        EQ->>ABCS: getBatchConfig(path.area)
        ABCS-->>EQ: batchConfig
        EQ->>B: addEvent(event, bytes)
        EQ->>B: isFull?
        alt full
            B->>EQ: flush() -> batchedEvents, nBytes
            EQ-->>S: batchedEvents, pathInfo, nBytes
        else not full
            EQ-->>S: partial/none
        end
        S->>B: reset()
    end

    PDS->>ABCS: setAreaBatchConfig(area, count, bytes)
    ABCS->>ABCS: copy-on-write update
    PDS->>ABCS: onRemovePath(area)
    ABCS-->>PDS: refCount--, maybe remove override

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

lgtm, approved

Suggested reviewers

  • flowbehappy
  • asddongmen
  • bufferflies
  • lidezhu

Poem

🐰 I hopped through queues with bytes and counts,
Per-area rules tucked in atomic mounts,
Batches filled and flushed with steady beat,
Paths kept true — one hop, one treat,
A rabbit cheers: batching complete! 🥕

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
  • Description check (⚠️ Warning): The PR description consists entirely of the template with all placeholder values unchanged, including 'Issue Number: close #xxx', unchecked test options, and unanswered questions about compatibility and documentation. Resolution: fill in the concrete issue number, specify which tests were added, answer the compatibility and documentation questions, and provide a meaningful release note instead of leaving template placeholders.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 21.74%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
  • Title check (✅ Passed): The PR title 'ds: support batch also consider the batch bytes' clearly summarizes the main change: adding byte-based batch size limits alongside count-based batching.




@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces support for byte-based batching in the dynamic stream's event queue, in addition to the existing count-based batching. The changes in event_queue.go correctly implement the logic to stop batching when a byte limit is reached, including handling edge cases where the first event itself exceeds the limit. A new test file event_queue_batch_bytes_test.go is added with good test cases that verify the new functionality. The update to the comment for BatchBytes in interfaces.go clarifies its behavior. Overall, the implementation is solid. I have one suggestion to improve the maintainability of the new test code by reducing duplication.

Comment on lines 9 to 27
func TestEventQueueBatchBytesStopsBeforeExceedingLimit(t *testing.T) {
	handler := &mockHandler{}
	option := Option{
		BatchCount: 10,
		BatchBytes: 100,
	}

	q := newEventQueue[int, string, *mockEvent, any, *mockHandler](option, handler)
	pi := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", "path1", nil)
	q.initPath(pi)

	appendEvent := func(id int, size int) {
		q.appendEvent(eventWrap[int, string, *mockEvent, any, *mockHandler]{
			event:     &mockEvent{id: id, path: "path1"},
			pathInfo:  pi,
			eventSize: size,
			eventType: EventType{DataGroup: 1, Property: BatchableData},
		})
	}


medium

There is significant code duplication in the setup logic across the test functions in this file. TestEventQueueBatchBytesStopsBeforeExceedingLimit and TestEventQueueBatchBytesAllowsFirstEventLargerThanLimit both have nearly identical blocks for creating the event queue, path info, and the appendEvent helper. To improve maintainability and reduce redundancy, consider extracting this common setup into a test helper function. This helper could accept t *testing.T and option Option as parameters and return the necessary components for the tests.
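The suggested extraction could look roughly like this. It is a simplified sketch, not the package's actual code: `newTestQueue` and the trimmed-down `queue` type are hypothetical stand-ins for the real `newEventQueue`/`newPathInfo` plumbing, and the point is only the shape of the helper, which builds the queue, registers the path, and hands back an append closure bound to it.

```go
package main

import "fmt"

// Option mirrors the two batching knobs the tests vary.
type Option struct {
	BatchCount int
	BatchBytes int
}

// queue is a stand-in for the real event queue; events just records
// appended event IDs so the helper's behavior is observable.
type queue struct {
	opt    Option
	events []int
}

// newTestQueue centralizes the duplicated setup: construct the queue for
// the given Option, register the test path, and return an appendEvent
// helper bound to that path, just as each test currently does inline.
func newTestQueue(opt Option) (*queue, func(id, size int)) {
	q := &queue{opt: opt}
	appendEvent := func(id, size int) {
		q.events = append(q.events, id)
	}
	return q, appendEvent
}

func main() {
	q, appendEvent := newTestQueue(Option{BatchCount: 10, BatchBytes: 100})
	appendEvent(1, 60)
	appendEvent(2, 60)
	fmt.Println(len(q.events))
}
```

Each test then calls the helper with its own Option and keeps only the assertions that differ.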


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@utils/dynstream/event_queue_batch_bytes_test.go`:
- Around lines 1-7: Add the project's canonical copyright header comment block above the `package dynstream` line in event_queue_batch_bytes_test.go so the file carries the required notice, preserving the rest of the imports and test code.
🧹 Nitpick comments (1)
utils/dynstream/event_queue_batch_bytes_test.go (1)

45-79: Good edge case coverage for oversized first event.

This test correctly validates the important behavior that a first event larger than BatchBytes is still processed, preventing potential deadlocks where events could never be popped.

Consider adding tests for:

  • Disabled batching (BatchBytes <= 0) to verify it falls back to count-only batching
  • Events that exactly hit the limit (e.g., BatchBytes=100, single event of 100 bytes)

@ti-chi-bot ti-chi-bot bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Feb 4, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
utils/dynstream/event_queue_test.go (1)

16-20: ⚠️ Potential issue | 🟠 Major

Run make fmt to fix gci formatting (CI failing).

The pipeline indicates gci rewrote this file. Please run make fmt and commit the formatted output to unblock CI.
As per coding guidelines, Use gofmt to keep Go code clean; run make fmt before pushing to format code with gci, gofumports, and shfmt, plus log-style checks.

utils/dynstream/stream.go (1)

16-26: ⚠️ Potential issue | 🟠 Major

Run make fmt to fix gci formatting (CI failing).

The pipeline indicates gci rewrote this file. Please run make fmt and commit the formatted output to unblock CI.
As per coding guidelines, Use gofmt to keep Go code clean; run make fmt before pushing to format code with gci, gofumports, and shfmt, plus log-style checks.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
utils/dynstream/event_queue_test.go (1)

16-20: ⚠️ Potential issue | 🟡 Minor

Fix gci formatting in this file.

The pipeline reports gci formatting changes; please re-run the formatter so the import block (Line 16 and surrounding) is in the expected order. As per coding guidelines, use gofmt to keep Go code clean; run make fmt before pushing to format code with gci, gofumports, and shfmt, plus log-style checks.

utils/dynstream/stream.go (1)

16-26: ⚠️ Potential issue | 🟡 Minor

Fix gci import ordering to pass CI.

The pipeline reports that gci formatting changed the file. Run make fmt or gci write to apply the required import ordering before merging.

As per coding guidelines: "Use gofmt to keep Go code clean; run make fmt before pushing to format code with gci, gofumports, and shfmt".

🧹 Nitpick comments (1)
utils/dynstream/batcher.go (1)

21-28: Normalize negative batchBytes in newBatchConfig.

Line 21 currently keeps negative batchBytes values, while other call sites clamp them. Normalizing here makes the config canonical and avoids subtle mismatches when comparing to defaults.

♻️ Suggested fix
 func newBatchConfig(batchCount int, batchBytes int) batchConfig {
 	if batchCount <= 0 {
 		batchCount = 1
 	}
+	if batchBytes < 0 {
+		batchBytes = 0
+	}
 	return batchConfig{
 		batchCount: batchCount,
 		batchBytes: batchBytes,
 	}
 }

@ti-chi-bot

ti-chi-bot bot commented Feb 5, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@api/v2/model_test.go`:
- Around lines 36-37: Now that the API fields are *uint64, supply uint64-typed literals for EventCollectorBatchCount and EventCollectorBatchBytes: replace util.AddressOf(4096) and util.AddressOf(64 * 1024 * 1024) with util.AddressOf(uint64(4096)) and util.AddressOf(uint64(64 * 1024 * 1024)) so the values match the *uint64 type the fields expect.

In `@api/v2/model.go`:
- Around lines 216-217: The API ReplicaConfig declares EventCollectorBatchCount and EventCollectorBatchBytes as *int while the internal config.ReplicaConfig uses *uint64, causing type mismatches. Change the two API field types in api/v2/model.go to *uint64 to match pkg/config/replica_config.go, adjusting marshaling/validation as needed. Explicit conversions at the ReplicaConfig conversion site are an alternative, but matching the types is the recommended fix.
🧹 Nitpick comments (3)
pkg/config/changefeed_batch_config_test.go (2)

33-33: Remove or address the TODO comment.

The comment // todo: why this test, what's the purpose? appears to be a placeholder that should be removed or replaced with a meaningful description. The test's purpose is clear: it validates that when EventCollectorBatchCount and EventCollectorBatchBytes are not explicitly set (nil), ToChangefeedConfig returns zero values.

♻️ Suggested fix
-// todo: why this test, what's the purpose?
+// TestChangeFeedInfoToChangefeedConfig_EventCollectorBatchConfigDefaultZero verifies that
+// unset (nil) batch config fields default to zero values in the output ChangefeedConfig.
 func TestChangeFeedInfoToChangefeedConfig_EventCollectorBatchConfigDefaultZero(t *testing.T) {

29-30: Consider explicit uint64 type in assertions for clarity.

While require.Equal handles the comparison, using explicit uint64 literals would make the expected types clearer and avoid potential confusion:

require.Equal(t, uint64(4096), cfg.EventCollectorBatchCount)
require.Equal(t, uint64(64*1024*1024), cfg.EventCollectorBatchBytes)
// and
require.Equal(t, uint64(0), cfg.EventCollectorBatchCount)
require.Equal(t, uint64(0), cfg.EventCollectorBatchBytes)

Also applies to: 50-51

pkg/config/replica_config.go (1)

42-45: Track or resolve the TODO before merging.

The comment indicates uncertainty about the default value for defaultEventCollectorBatchBytes. While 0 (no limit) is a safe default, consider:

  1. Defining a reasonable default based on expected event sizes and memory constraints
  2. Creating an issue to track this if the decision is deferred

The comment "may change this to a reasonable value" suggests this needs further consideration.

Would you like me to open an issue to track determining an appropriate default value for defaultEventCollectorBatchBytes?


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@utils/dynstream/area_batch_config_store.go`:
- Around lines 80-83: The check for negative batchBytes is dead code because batchConfig.batchBytes is a uint64. Delete the unreachable if block that inspects newConfig.batchBytes after calling newBatchConfig and use the created newConfig directly.

In `@utils/dynstream/interfaces.go`:
- Around lines 283-292: NewBatchConfig returns (int, int) while SetAreaBatchConfig expects (uint64, uint64). Change NewBatchConfig to return (uint64, uint64), keep its existing normalization (treat batchCount <= 0 as 1 and batchBytes < 0 as 0), convert the values to uint64 before returning so callers can pass the results directly to SetAreaBatchConfig, and remove any now-redundant conversions at call sites.
🧹 Nitpick comments (1)
downstreamadapter/dispatchermanager/dispatcher_manager.go (1)

312-330: Consider documenting the "0 means use default" behavior.

The logic treats 0 in config as "use default" rather than an explicit value. This is fine for batchBytes where 0 means disabled, but for batchCount, this means there's no way to explicitly configure a batch count of 0 (unbatched).

If this is intentional (batch count should never be 0), consider adding a brief comment explaining this design choice. Otherwise, users might be confused when setting EventCollectorBatchCount: 0 doesn't disable batching.

💡 Optional: Add clarifying comment
 func (e *DispatcherManager) getEventCollectorBatchCountAndBytes() (uint64, uint64) {
 	var (
+		// Default batch count of 4096; config value of 0 is treated as "use default"
 		batchCount uint64 = 4096
+		// Default batch bytes of 0 (disabled); config value of 0 keeps it disabled
 		batchBytes uint64 = 0
 	)
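The resolution being discussed can be sketched as follows, using the defaults the comment above mentions (4096 events, byte batching disabled). The function and config type here are simplified stand-ins for getEventCollectorBatchCountAndBytes and the real changefeed config, not the actual implementation.

```go
package main

import "fmt"

// changefeedConfig is a trimmed-down stand-in for the real config.
type changefeedConfig struct {
	EventCollectorBatchCount uint64
	EventCollectorBatchBytes uint64
}

// resolveBatchLimits applies the "0 means use default" rule: a zero
// batch count falls back to 4096, and a zero batch bytes keeps byte
// batching disabled. Note the asymmetry the review points out: there is
// no way to express "batch count 0" explicitly under this scheme.
func resolveBatchLimits(cfg changefeedConfig) (count, bytes uint64) {
	count, bytes = 4096, 0 // defaults
	if cfg.EventCollectorBatchCount > 0 {
		count = cfg.EventCollectorBatchCount
	}
	if cfg.EventCollectorBatchBytes > 0 {
		bytes = cfg.EventCollectorBatchBytes
	}
	return count, bytes
}

func main() {
	c, b := resolveBatchLimits(changefeedConfig{})
	fmt.Println(c, b) // unset config falls back to the defaults
}
```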

Comment on lines 283 to 292
// NewBatchConfig normalizes batching parameters and returns values that can be passed to SetAreaBatchConfig.
func NewBatchConfig(batchCount int, batchBytes int) (int, int) {
	if batchCount <= 0 {
		batchCount = 1
	}
	if batchBytes < 0 {
		batchBytes = 0
	}
	return batchCount, batchBytes
}

⚠️ Potential issue | 🟡 Minor

Type mismatch between NewBatchConfig return type and SetAreaBatchConfig parameters.

The function NewBatchConfig returns (int, int) but the SetAreaBatchConfig method (line 199) expects (uint64, uint64). This inconsistency will require callers to perform type conversion, which defeats the purpose of this helper function.

Consider changing the return type to match the interface:

🔧 Proposed fix
-func NewBatchConfig(batchCount int, batchBytes int) (int, int) {
+func NewBatchConfig(batchCount int, batchBytes int) (uint64, uint64) {
 	if batchCount <= 0 {
 		batchCount = 1
 	}
 	if batchBytes < 0 {
 		batchBytes = 0
 	}
-	return batchCount, batchBytes
+	return uint64(batchCount), uint64(batchBytes)
 }

@ti-chi-bot

ti-chi-bot bot commented Feb 6, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@pkg/config/changefeed.go`:
- Around lines 281-282: VerifyAndComplete does not backfill defaults for info.Config.EventCollectorBatchCount and info.Config.EventCollectorBatchBytes, so ChangeFeedInfo objects persisted before these fields existed end up with nil pointers and later fail ValidateAndAdjust (which disallows zero). In VerifyAndComplete, when these fields are nil, assign the same non-nil defaults that validation uses (pointer values, e.g. &defaultCount, &defaultBytes) so subsequent util.GetOrZero calls and ValidateAndAdjust see valid non-zero values.
🧹 Nitpick comments (4)
pkg/config/main_test.go (1)

23-27: LGTM — minor consistency note on IgnoreAnyFunction vs IgnoreTopFunction.

The change correctly passes a goleak ignore option to SetUpLeakTest, matching the existing variadic signature. The existing codebase example in pkg/leakutil/leak_helper_test.go uses goleak.IgnoreTopFunction, which is more precise (matches only when the function is at the top of the stack). IgnoreAnyFunction is more permissive and could mask a real leak if inWorker appears elsewhere in the stack. Consider using IgnoreTopFunction for consistency and precision, unless you've confirmed that inWorker doesn't always appear at the top.

♻️ Optional: use IgnoreTopFunction for consistency
 	leakutil.SetUpLeakTest(m,
-		goleak.IgnoreAnyFunction("github.com/godbus/dbus.(*Conn).inWorker"),
+		goleak.IgnoreTopFunction("github.com/godbus/dbus.(*Conn).inWorker"),
 	)
downstreamadapter/dispatcher/basic_dispatcher_info.go (1)

77-110: Consider a config struct to tame the growing parameter list.

NewSharedInfo now takes 15 parameters. While functional, this is getting unwieldy and error-prone (easy to swap positional arguments of the same type). A dedicated options/config struct would improve readability. Not blocking, but worth considering in a follow-up.
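The suggested refactor could look roughly like this. SharedInfoConfig and its fields are hypothetical illustrations (the real NewSharedInfo parameter list is not reproduced here); the point is that named fields make same-typed values like the two batch limits impossible to swap silently, unlike adjacent positional uint64 arguments.

```go
package main

import "fmt"

// SharedInfoConfig gathers what are currently positional parameters into
// named fields; the field set here is illustrative only.
type SharedInfoConfig struct {
	ChangefeedID string
	TimeZone     string
	BatchCount   uint64
	BatchBytes   uint64
	// ...remaining fields elided
}

// SharedInfo is a stand-in for the real dispatcher SharedInfo.
type SharedInfo struct {
	cfg SharedInfoConfig
}

// NewSharedInfoFromConfig replaces a long positional constructor with a
// single struct argument; call sites name every value they set.
func NewSharedInfoFromConfig(cfg SharedInfoConfig) *SharedInfo {
	return &SharedInfo{cfg: cfg}
}

func main() {
	// BatchCount and BatchBytes are both uint64, but naming them at the
	// call site removes any ambiguity about which is which.
	si := NewSharedInfoFromConfig(SharedInfoConfig{
		ChangefeedID: "cf-1",
		BatchCount:   4096,
		BatchBytes:   64 * 1024 * 1024,
	})
	fmt.Println(si.cfg.ChangefeedID, si.cfg.BatchCount)
}
```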

pkg/config/replica_config_test.go (1)

197-208: Good coverage for the zero-count rejection path.

Consider adding a companion test that verifies a valid positive EventCollectorBatchCount passes ValidateAndAdjust without error, and one for nil (unset) to confirm that case is also accepted. This would provide more complete coverage of the validation logic.

pkg/config/replica_config.go (1)

326-329: Validation only covers EventCollectorBatchCount; consider also validating EventCollectorBatchBytes.

Currently only EventCollectorBatchCount is validated (must be >0 when set). There's no validation for EventCollectorBatchBytes. While 0 correctly means "disabled," a negative value isn't possible with uint64, so the main risk is a user explicitly setting it to some nonsensical value. The asymmetry is fine for now, but you might want to add a brief comment explaining why EventCollectorBatchBytes doesn't need the same zero-check (i.e., 0 = disabled).
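The asymmetry described above can be sketched like this. The type and method here are simplified stand-ins for the real ReplicaConfig.ValidateAndAdjust: a set-but-zero EventCollectorBatchCount is rejected, while EventCollectorBatchBytes needs no such check because 0 legitimately means "byte batching disabled" and uint64 already rules out negatives.

```go
package main

import (
	"errors"
	"fmt"
)

// replicaConfig is a trimmed-down stand-in for the real config struct.
type replicaConfig struct {
	EventCollectorBatchCount *uint64
	EventCollectorBatchBytes *uint64 // 0 = disabled, so every value is valid
}

// validate rejects an explicitly set batch count of zero; batch bytes is
// deliberately left unchecked, mirroring the asymmetry under discussion.
func (c *replicaConfig) validate() error {
	if c.EventCollectorBatchCount != nil && *c.EventCollectorBatchCount == 0 {
		return errors.New("event-collector-batch-count must be greater than 0 when set")
	}
	return nil
}

func main() {
	zero := uint64(0)

	bad := replicaConfig{EventCollectorBatchCount: &zero}
	fmt.Println(bad.validate() != nil) // zero count is rejected

	ok := replicaConfig{EventCollectorBatchBytes: &zero}
	fmt.Println(ok.validate() == nil) // zero bytes (disabled) is accepted
}
```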

Comment on lines +281 to +282
EventCollectorBatchCount: util.GetOrZero(info.Config.EventCollectorBatchCount),
EventCollectorBatchBytes: util.GetOrZero(info.Config.EventCollectorBatchBytes),

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify if VerifyAndComplete or FixIncompatible handles EventCollector defaults
rg -n 'EventCollectorBatch' pkg/config/changefeed.go
echo "---"
# Check if there's any other place defaults are applied for these fields
rg -n 'EventCollectorBatch' --type=go -g '!*_test.go'

Repository: pingcap/ticdc

Length of output: 4772


🏁 Script executed:

#!/bin/bash
# Get the VerifyAndComplete function
sed -n '420,470p' pkg/config/changefeed.go | cat -n
echo "=== Checking ChangeFeedInfo structure ==="
sed -n '1,100p' pkg/config/changefeed.go | cat -n


Length of output: 5477


🏁 Script executed:

#!/bin/bash
# Get the full ChangeFeedInfo structure
rg -n 'type ChangeFeedInfo struct' pkg/config/changefeed.go -A 20
echo "=== Checking util.GetOrZero ==="
rg -n 'func GetOrZero' --type=go
echo "=== Check lines around 281-282 for context ==="
sed -n '270,290p' pkg/config/changefeed.go | cat -n


Length of output: 2765


🏁 Script executed:

#!/bin/bash
# Look for ValidateAndAdjust or validation related to EventCollectorBatchCount
rg -n 'ValidateAndAdjust|EventCollectorBatchCount == 0' pkg/config/replica_config.go -B 2 -A 2
echo "=== Check util.GetOrZero implementation ==="
cat -n pkg/util/pointer.go


Length of output: 1823


🏁 Script executed:

#!/bin/bash
# Check where ToChangefeedConfig is called to understand the flow
rg -n 'ToChangefeedConfig' --type=go | head -20
echo "=== Check if there are any other places where EventCollectorBatch defaults are set ==="
rg -n 'EventCollectorBatchCount.*=' pkg/config/changefeed.go
rg -n 'EventCollectorBatchCount.*=' pkg/config/replica_config.go


Length of output: 1698


🏁 Script executed:

#!/bin/bash
# Check the test file for EventCollectorBatchConfig
cat -n pkg/config/changefeed_batch_config_test.go

Repository: pingcap/ticdc

Length of output: 1921


🏁 Script executed:

#!/bin/bash
# Check where ValidateAndAdjust is called in relation to VerifyAndComplete and ToChangefeedConfig
rg -n 'ValidateAndAdjust\|VerifyAndComplete' api/v2/changefeed.go -B 2 -A 2
echo "=== Check if ValidateAndAdjust is called on ReplicaConfig in the flow ==="
rg -n 'ValidateAndAdjust' maintainer/maintainer.go -B 2 -A 2 | head -40


Length of output: 130


🏁 Script executed:

#!/bin/bash
# Check the changefeed creation flow more carefully
rg -n 'VerifyAndComplete' api/v2/changefeed.go -B 3 -A 3
echo "=== Check maintainer flow ==="
rg -n 'VerifyAndComplete\|ValidateAndAdjust' maintainer/maintainer.go -B 1 -A 1 | head -50


Length of output: 86


🏁 Script executed:

#!/bin/bash
# Search broadly for where ValidateAndAdjust is called
rg -n '\.ValidateAndAdjust\(' --type=go -B 2 -A 2 | grep -A 4 -B 4 'ReplicaConfig\|info.Config'
echo "=== Check server api for changefeed creation ==="
rg -n 'VerifyAndComplete' --type=go -B 3 -A 3 | head -60


Length of output: 6580


🏁 Script executed:

#!/bin/bash
# Let me trace the complete flow - check what happens with nil values
# Check if there's any backfill happening in the dispatchermanager when values are 0/nil
sed -n '314,330p' downstreamadapter/dispatchermanager/dispatcher_manager.go | cat -n
echo "=== Check what GetDefaultReplicaConfig returns for EventCollectorBatch fields ==="
grep -n 'GetDefaultReplicaConfig' pkg/config/replica_config.go -A 30 | head -50


Length of output: 2225


🏁 Script executed:

#!/bin/bash
# Check what defaultReplicaConfig is initialized with
grep -n 'var defaultReplicaConfig' pkg/config/replica_config.go -A 40 | head -50

Repository: pingcap/ticdc

Length of output: 2338


🏁 Script executed:

#!/bin/bash
# Double-check the type of EventCollectorBatchCount in ChangefeedConfig
rg -n 'type ChangefeedConfig struct' -A 50 pkg/config/changefeed.go | head -60


Length of output: 2969


VerifyAndComplete does not backfill defaults for EventCollectorBatchCount and EventCollectorBatchBytes fields.

When loading a ChangeFeedInfo persisted before these fields existed, info.Config.EventCollectorBatchCount and info.Config.EventCollectorBatchBytes will be nil. util.GetOrZero() converts these to 0, but ValidateAndAdjust() explicitly rejects zero values for EventCollectorBatchCount (line 326).

VerifyAndComplete() backfills defaults for other pointer fields (e.g., Integrity, ChangefeedErrorStuckDuration, ActiveActiveProgressInterval), but omits the EventCollectorBatch fields. Add backfill to maintain consistency:

Proposed fix in VerifyAndComplete
 	if util.GetOrZero(info.Config.MemoryQuota) == uint64(0) {
 		info.fixMemoryQuota()
 	}
+
+	if info.Config.EventCollectorBatchCount == nil {
+		info.Config.EventCollectorBatchCount = util.AddressOf(DefaultEventCollectorBatchCount)
+	}
+	if info.Config.EventCollectorBatchBytes == nil {
+		info.Config.EventCollectorBatchBytes = util.AddressOf(DefaultEventCollectorBatchBytes)
+	}
 
 	if info.Config.Integrity == nil {
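The failure mode and the proposed backfill can be reproduced in miniature; every name below is a stand-in for the real config types, not the actual VerifyAndComplete code:

```go
package main

import (
	"errors"
	"fmt"
)

const defaultBatchCount uint64 = 128 // stand-in for the package-level default

// getOrZero mimics util.GetOrZero for a single pointer type.
func getOrZero(p *uint64) uint64 {
	if p == nil {
		return 0
	}
	return *p
}

// validate mimics the zero-rejection in ValidateAndAdjust.
func validate(count uint64) error {
	if count == 0 {
		return errors.New("batch count must be > 0")
	}
	return nil
}

func main() {
	// Old changefeed loaded from storage: the field did not exist yet.
	var persisted *uint64

	// Without backfill: nil collapses to 0, which validation rejects.
	fmt.Println(validate(getOrZero(persisted)))

	// With backfill, mirroring the proposed VerifyAndComplete change.
	if persisted == nil {
		d := defaultBatchCount
		persisted = &d
	}
	fmt.Println(validate(getOrZero(persisted)))
}
```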
🤖 Prompt for AI Agents
In `@pkg/config/changefeed.go` around lines 281 - 282, VerifyAndComplete is not
backfilling defaults for info.Config.EventCollectorBatchCount and
info.Config.EventCollectorBatchBytes, so persisted ChangeFeedInfo objects from
before those fields existed end up with nil pointers and later fail
ValidateAndAdjust (which disallows zero). In VerifyAndComplete (method
VerifyAndComplete on ChangeFeedInfo) set info.Config.EventCollectorBatchCount
and info.Config.EventCollectorBatchBytes to the same non-nil default values used
by validation (the defaults used by ValidateAndAdjust or the package-level
defaults for event collector batch size/count) when they are nil, so subsequent
util.GetOrZero calls and ValidateAndAdjust see valid non-zero values; ensure you
assign pointer types (e.g., &defaultCount, &defaultBytes) to info.Config.*
fields.


Labels

do-not-merge/needs-linked-issue
release-note: Denotes a PR that will be considered when it comes time to generate release notes.
size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.
