ds: support batch also consider the batch bytes #4128
Conversation
Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant enhancement to the dynamic stream's event queue by adding byte-based batching capabilities. Previously, event batching was primarily governed by the number of events. With this change, users can now configure a maximum byte size for event batches, providing finer control over memory consumption and improving the stability and efficiency of event processing, especially when dealing with events of varying sizes. This ensures that batches remain within manageable memory limits, preventing potential resource exhaustion.
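To make the two knobs concrete, here is a minimal, illustrative sketch; `Option` below is a trimmed stand-in (the real `dynstream.Option` carries more fields), and the values are borrowed from the tests quoted further down.

```go
package main

import "fmt"

// Option is a trimmed stand-in for the dynamic stream's option struct;
// the real dynstream.Option has more fields than shown here.
type Option struct {
	BatchCount int // maximum number of events per batch
	BatchBytes int // maximum accumulated event bytes per batch; 0 disables the byte limit
}

func main() {
	opt := Option{BatchCount: 10, BatchBytes: 100}
	fmt.Printf("a batch closes after %d events or %d bytes, whichever comes first\n",
		opt.BatchCount, opt.BatchBytes)
}
```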
📝 Walkthrough

Adds a generic batcher and per-area batch-config store; integrates per-area batching into dynamic streams and event queues, updates constructor signatures and pop/processing flow, and propagates batchCount/batchBytes through API, config, dispatcher, event collector, and tests.
Sequence Diagram

```mermaid
sequenceDiagram
participant PDS as parallelDynamicStream
participant S as stream
participant EQ as eventQueue
participant ABCS as areaBatchConfigStore
participant B as batcher
PDS->>ABCS: newAreaBatchConfigStore(defaultConfig)
PDS->>S: newStream(..., batchConfigStore)
S->>EQ: newEventQueue(..., batchConfigStore)
PDS->>ABCS: onAddPath(area)
ABCS-->>PDS: refCount++
S->>B: newBatcher(defaultConfig)
loop handleLoop
S->>EQ: popEvents(&B)
EQ->>ABCS: getBatchConfig(path.area)
ABCS-->>EQ: batchConfig
EQ->>B: addEvent(event, bytes)
EQ->>B: isFull?
alt full
B->>EQ: flush() -> batchedEvents, nBytes
EQ-->>S: batchedEvents, pathInfo, nBytes
else not full
EQ-->>S: partial/none
end
S->>B: reset()
end
PDS->>ABCS: setAreaBatchConfig(area, count, bytes)
ABCS->>ABCS: copy-on-write update
PDS->>ABCS: onRemovePath(area)
ABCS-->>PDS: refCount--, maybe remove override
```
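A rough Go sketch of the batcher role shown in the diagram. The method names (`addEvent`, `isFull`, `flush`) follow the diagram, but the fields and internals are assumptions, not the actual implementation; note that this simplified version checks fullness after adding an event, whereas the real queue stops before the byte limit would be exceeded (see the review discussion below).

```go
package main

import "fmt"

// batcher is a simplified stand-in for the generic batcher in the diagram above.
type batcher struct {
	batchCount int   // close the batch after this many events
	batchBytes int   // close the batch once this many bytes accumulate; 0 disables the limit
	events     []int // event payloads are just ints in this sketch
	nBytes     int
}

func (b *batcher) addEvent(ev, size int) {
	b.events = append(b.events, ev)
	b.nBytes += size
}

// isFull mirrors the "isFull?" check in the diagram: either limit closes the batch.
func (b *batcher) isFull() bool {
	return len(b.events) >= b.batchCount ||
		(b.batchBytes > 0 && b.nBytes >= b.batchBytes)
}

// flush hands back the accumulated batch and byte count, then resets the batcher.
func (b *batcher) flush() ([]int, int) {
	events, n := b.events, b.nBytes
	b.events, b.nBytes = nil, 0
	return events, n
}

func main() {
	b := &batcher{batchCount: 4, batchBytes: 100}
	for i, size := range []int{30, 30, 50, 10} {
		b.addEvent(i, size)
		if b.isFull() {
			events, n := b.flush()
			fmt.Printf("flushed %d events (%d bytes)\n", len(events), n)
		}
	}
}
```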
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (warnings)
Code Review
This pull request introduces support for byte-based batching in the dynamic stream's event queue, in addition to the existing count-based batching. The changes in event_queue.go correctly implement the logic to stop batching when a byte limit is reached, including handling edge cases where the first event itself exceeds the limit. A new test file event_queue_batch_bytes_test.go is added with good test cases that verify the new functionality. The update to the comment for BatchBytes in interfaces.go clarifies its behavior. Overall, the implementation is solid. I have one suggestion to improve the maintainability of the new test code by reducing duplication.
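A small illustration of the decision the review describes, distilled into a standalone function (this is not the code in event_queue.go, just the described behavior): the queue stops adding to a batch before the byte limit would be exceeded, but never refuses the very first event.

```go
package main

import "fmt"

// shouldStop reports whether the next event must not be added to the current batch.
// It never rejects the first event, even if that single event exceeds batchBytes,
// so oversized events can still make progress instead of blocking the queue.
func shouldStop(batchedCount, batchedBytes, nextSize, batchBytes int) bool {
	if batchBytes <= 0 { // byte-based batching disabled
		return false
	}
	if batchedCount == 0 { // the first event is always accepted
		return false
	}
	return batchedBytes+nextSize > batchBytes
}

func main() {
	fmt.Println(shouldStop(0, 0, 500, 100)) // false: an oversized first event is still processed
	fmt.Println(shouldStop(2, 90, 20, 100)) // true: 90+20 would exceed the 100-byte limit
}
```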
```go
func TestEventQueueBatchBytesStopsBeforeExceedingLimit(t *testing.T) {
	handler := &mockHandler{}
	option := Option{
		BatchCount: 10,
		BatchBytes: 100,
	}

	q := newEventQueue[int, string, *mockEvent, any, *mockHandler](option, handler)
	pi := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", "path1", nil)
	q.initPath(pi)

	appendEvent := func(id int, size int) {
		q.appendEvent(eventWrap[int, string, *mockEvent, any, *mockHandler]{
			event:     &mockEvent{id: id, path: "path1"},
			pathInfo:  pi,
			eventSize: size,
			eventType: EventType{DataGroup: 1, Property: BatchableData},
		})
	}
```
There is significant code duplication in the setup logic across the test functions in this file. TestEventQueueBatchBytesStopsBeforeExceedingLimit and TestEventQueueBatchBytesAllowsFirstEventLargerThanLimit both have nearly identical blocks for creating the event queue, path info, and the appendEvent helper. To improve maintainability and reduce redundancy, consider extracting this common setup into a test helper function. This helper could accept t *testing.T and option Option as parameters and return the necessary components for the tests.
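One possible shape for such a helper, assuming `newEventQueue` and `newPathInfo` return pointers (adjust the return types to the actual signatures); everything here simply consolidates the setup shown above, and the helper name is hypothetical.

```go
// newBatchBytesTestQueue is a hypothetical helper that consolidates the duplicated
// setup: it builds the queue and path info and returns an appendEvent closure.
func newBatchBytesTestQueue(t *testing.T, option Option) (
	*eventQueue[int, string, *mockEvent, any, *mockHandler],
	*pathInfo[int, string, *mockEvent, any, *mockHandler],
	func(id int, size int),
) {
	t.Helper()
	handler := &mockHandler{}
	q := newEventQueue[int, string, *mockEvent, any, *mockHandler](option, handler)
	pi := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", "path1", nil)
	q.initPath(pi)
	appendEvent := func(id int, size int) {
		q.appendEvent(eventWrap[int, string, *mockEvent, any, *mockHandler]{
			event:     &mockEvent{id: id, path: "path1"},
			pathInfo:  pi,
			eventSize: size,
			eventType: EventType{DataGroup: 1, Property: BatchableData},
		})
	}
	return q, pi, appendEvent
}
```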
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@utils/dynstream/event_queue_batch_bytes_test.go`:
- Around line 1-7: Add the standard copyright header to the top of the test file
before the package declaration in event_queue_batch_bytes_test.go (the file that
starts with "package dynstream") so that all source files include the required
notice; open the file containing package dynstream and insert the project's
canonical copyright header comment block above the "package dynstream" line,
preserving the rest of the imports and test code.
🧹 Nitpick comments (1)
utils/dynstream/event_queue_batch_bytes_test.go (1)
45-79: Good edge case coverage for oversized first event.

This test correctly validates the important behavior that a first event larger than `BatchBytes` is still processed, preventing potential deadlocks where events could never be popped. Consider adding tests for:

- Disabled batching (`BatchBytes <= 0`) to verify it falls back to count-only batching
- Events that exactly hit the limit (e.g., `BatchBytes = 100`, single event of 100 bytes)
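Hypothetical shells for those two cases, reusing the helper sketched earlier; the pop-and-assert steps are left as comments because they would follow the same pattern as the existing tests, and the test names are illustrative.

```go
func TestEventQueueBatchBytesDisabledFallsBackToCount(t *testing.T) {
	// BatchBytes = 0 disables byte-based batching; only BatchCount should matter.
	_, _, appendEvent := newBatchBytesTestQueue(t, Option{BatchCount: 3, BatchBytes: 0})
	for i := 0; i < 3; i++ {
		appendEvent(i, 1000) // large sizes must be ignored when the byte limit is off
	}
	// pop once and assert all 3 events come back as a single batch
}

func TestEventQueueBatchBytesExactLimit(t *testing.T) {
	// A single event of exactly BatchBytes should form a complete batch on its own.
	_, _, appendEvent := newBatchBytesTestQueue(t, Option{BatchCount: 10, BatchBytes: 100})
	appendEvent(1, 100)
	// pop once and assert the event is returned immediately
}
```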
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
utils/dynstream/event_queue_test.go (1)

16-20: ⚠️ Potential issue | 🟠 Major

Run `make fmt` to fix gci formatting (CI failing). The pipeline indicates gci rewrote this file. Please run `make fmt` and commit the formatted output to unblock CI. As per coding guidelines, use `gofmt` to keep Go code clean; run `make fmt` before pushing to format code with `gci`, `gofumports`, and `shfmt`, plus log-style checks.

utils/dynstream/stream.go (1)

16-26: ⚠️ Potential issue | 🟠 Major

Run `make fmt` to fix gci formatting (CI failing). The pipeline indicates gci rewrote this file. Please run `make fmt` and commit the formatted output to unblock CI. As per coding guidelines, use `gofmt` to keep Go code clean; run `make fmt` before pushing to format code with `gci`, `gofumports`, and `shfmt`, plus log-style checks.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
utils/dynstream/event_queue_test.go (1)

16-20: ⚠️ Potential issue | 🟡 Minor

Fix gci formatting in this file. The pipeline reports gci formatting changes; please re-run the formatter so the import block (line 16 and surrounding) is in the expected order. As per coding guidelines, use `gofmt` to keep Go code clean; run `make fmt` before pushing to format code with `gci`, `gofumports`, and `shfmt`, plus log-style checks.

utils/dynstream/stream.go (1)

16-26: ⚠️ Potential issue | 🟡 Minor

Fix gci import ordering to pass CI. The pipeline reports that gci formatting changed the file. Run `make fmt` or `gci write` to apply the required import ordering before merging. As per coding guidelines: "Use `gofmt` to keep Go code clean; run `make fmt` before pushing to format code with `gci`, `gofumports`, and `shfmt`".
🧹 Nitpick comments (1)
utils/dynstream/batcher.go (1)
21-28: Normalize negative `batchBytes` in `newBatchConfig`.

Line 21 currently keeps negative `batchBytes` values, while other call sites clamp them. Normalizing here makes the config canonical and avoids subtle mismatches when comparing to defaults.

♻️ Suggested fix

```diff
 func newBatchConfig(batchCount int, batchBytes int) batchConfig {
 	if batchCount <= 0 {
 		batchCount = 1
 	}
+	if batchBytes < 0 {
+		batchBytes = 0
+	}
 	return batchConfig{
 		batchCount: batchCount,
 		batchBytes: batchBytes,
 	}
 }
```
[APPROVALNOTIFIER] This PR is NOT APPROVED.

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@api/v2/model_test.go`:
- Around line 36-37: Update the test to supply uint64-typed literals for
EventCollectorBatchCount and EventCollectorBatchBytes now that the API fields
are *uint64; replace usages like util.AddressOf(4096) and util.AddressOf(64 *
1024 * 1024) with util.AddressOf(uint64(4096)) and util.AddressOf(uint64(64 *
1024 * 1024)) (or use explicit suffixed literals like 4096 as uint64) so the
values match the *uint64 type expected by EventCollectorBatchCount and
EventCollectorBatchBytes.
In `@api/v2/model.go`:
- Around line 216-217: The API ReplicaConfig declares EventCollectorBatchCount
and EventCollectorBatchBytes as *int but the internal config.ReplicaConfig uses
*uint64, causing type mismatches; update the API fields to *uint64
(EventCollectorBatchCount and EventCollectorBatchBytes in api/v2/model.go) so
they match pkg/config/replica_config.go, and adjust any marshaling/validation if
needed to accept uint64 values (alternatively, if you prefer conversions, add
explicit conversions where ReplicaConfig is converted to config.ReplicaConfig,
but the recommended fix is changing the two API field types to *uint64 to keep
types consistent).
🧹 Nitpick comments (3)
pkg/config/changefeed_batch_config_test.go (2)
33-33: Remove or address the TODO comment.

The comment `// todo: why this test, what's the purpose?` appears to be a placeholder that should be removed or replaced with a meaningful description. The test's purpose is clear: it validates that when `EventCollectorBatchCount` and `EventCollectorBatchBytes` are not explicitly set (nil), `ToChangefeedConfig` returns zero values.

♻️ Suggested fix

```diff
-// todo: why this test, what's the purpose?
+// TestChangeFeedInfoToChangefeedConfig_EventCollectorBatchConfigDefaultZero verifies that
+// unset (nil) batch config fields default to zero values in the output ChangefeedConfig.
 func TestChangeFeedInfoToChangefeedConfig_EventCollectorBatchConfigDefaultZero(t *testing.T) {
```
29-30: Consider explicit uint64 type in assertions for clarity.

While `require.Equal` handles the comparison, using explicit `uint64` literals would make the expected types clearer and avoid potential confusion:

```go
require.Equal(t, uint64(4096), cfg.EventCollectorBatchCount)
require.Equal(t, uint64(64*1024*1024), cfg.EventCollectorBatchBytes)
// and
require.Equal(t, uint64(0), cfg.EventCollectorBatchCount)
require.Equal(t, uint64(0), cfg.EventCollectorBatchBytes)
```

Also applies to: 50-51
pkg/config/replica_config.go (1)
42-45: Track or resolve the TODO before merging.

The comment indicates uncertainty about the default value for `defaultEventCollectorBatchBytes`. While `0` (no limit) is a safe default, consider:

- Defining a reasonable default based on expected event sizes and memory constraints
- Creating an issue to track this if the decision is deferred

The comment "may change this to a reasonable value" suggests this needs further consideration. Would you like me to open an issue to track determining an appropriate default value for `defaultEventCollectorBatchBytes`?
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@utils/dynstream/area_batch_config_store.go`:
- Around line 80-83: The check for negative batchBytes is dead code because
batchConfig.batchBytes is a uint64; remove the unreachable condition and
assignment in the area_batch_config_store logic—specifically delete the if block
that inspects newConfig.batchBytes in the code that calls newBatchConfig
(referencing newBatchConfig and batchConfig.batchBytes) so the created newConfig
is used directly without the impossible negative check.
In `@utils/dynstream/interfaces.go`:
- Around line 283-292: NewBatchConfig currently returns (int, int) while
SetAreaBatchConfig expects (uint64, uint64); change NewBatchConfig's signature
to return (uint64, uint64), normalize using its existing checks (treat
batchCount<=0 as 1 and batchBytes<0 as 0) and cast/convert values to uint64
before returning so callers can pass results directly to SetAreaBatchConfig;
update any call sites that relied on the old int signature to remove explicit
conversions if present.
🧹 Nitpick comments (1)
downstreamadapter/dispatchermanager/dispatcher_manager.go (1)
312-330: Consider documenting the "0 means use default" behavior.

The logic treats `0` in config as "use default" rather than an explicit value. This is fine for `batchBytes` where 0 means disabled, but for `batchCount`, this means there's no way to explicitly configure a batch count of 0 (unbatched). If this is intentional (batch count should never be 0), consider adding a brief comment explaining this design choice. Otherwise, users might be confused when setting `EventCollectorBatchCount: 0` doesn't disable batching.

💡 Optional: Add clarifying comment

```diff
 func (e *DispatcherManager) getEventCollectorBatchCountAndBytes() (uint64, uint64) {
 	var (
+		// Default batch count of 4096; config value of 0 is treated as "use default"
 		batchCount uint64 = 4096
+		// Default batch bytes of 0 (disabled); config value of 0 keeps it disabled
 		batchBytes uint64 = 0
 	)
```
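A standalone sketch of that fallback behavior, written as a free function rather than the real `getEventCollectorBatchCountAndBytes` method; the defaults are taken from the snippet above, everything else is illustrative.

```go
package main

import "fmt"

// batchCountAndBytes illustrates the "0 means use default" semantics discussed above.
func batchCountAndBytes(cfgCount, cfgBytes uint64) (uint64, uint64) {
	var (
		batchCount uint64 = 4096 // default; a config value of 0 falls back to this
		batchBytes uint64 = 0    // default 0 keeps byte-based batching disabled
	)
	if cfgCount != 0 {
		batchCount = cfgCount
	}
	if cfgBytes != 0 {
		batchBytes = cfgBytes
	}
	return batchCount, batchBytes
}

func main() {
	fmt.Println(batchCountAndBytes(0, 0))         // 4096 0: both unset, defaults win
	fmt.Println(batchCountAndBytes(1024, 64<<20)) // 1024 67108864: explicit values pass through
}
```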
```go
// NewBatchConfig normalizes batching parameters and returns values that can be passed to SetAreaBatchConfig.
func NewBatchConfig(batchCount int, batchBytes int) (int, int) {
	if batchCount <= 0 {
		batchCount = 1
	}
	if batchBytes < 0 {
		batchBytes = 0
	}
	return batchCount, batchBytes
}
```
Type mismatch between NewBatchConfig return type and SetAreaBatchConfig parameters.
The function NewBatchConfig returns (int, int) but the SetAreaBatchConfig method (line 199) expects (uint64, uint64). This inconsistency will require callers to perform type conversion, which defeats the purpose of this helper function.
Consider changing the return type to match the interface:
🔧 Proposed fix
```diff
-func NewBatchConfig(batchCount int, batchBytes int) (int, int) {
+func NewBatchConfig(batchCount int, batchBytes int) (uint64, uint64) {
 	if batchCount <= 0 {
 		batchCount = 1
 	}
 	if batchBytes < 0 {
 		batchBytes = 0
 	}
-	return batchCount, batchBytes
+	return uint64(batchCount), uint64(batchBytes)
 }
```
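With that change, a caller could pass the normalized values straight through; a tiny illustrative fragment, where `ds` and `area` are placeholders for whatever dynamic stream instance and area ID the caller already holds.

```go
// Placeholder identifiers: ds exposes SetAreaBatchConfig, area is an area ID.
count, bytes := dynstream.NewBatchConfig(0, -1) // normalized to (1, 0)
ds.SetAreaBatchConfig(area, count, bytes)
```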
🤖 Prompt for AI Agents
In `@utils/dynstream/interfaces.go` around lines 283 - 292, NewBatchConfig
currently returns (int, int) while SetAreaBatchConfig expects (uint64, uint64);
change NewBatchConfig's signature to return (uint64, uint64), normalize using
its existing checks (treat batchCount<=0 as 1 and batchBytes<0 as 0) and
cast/convert values to uint64 before returning so callers can pass results
directly to SetAreaBatchConfig; update any call sites that relied on the old int
signature to remove explicit conversions if present.
[FORMAT CHECKER NOTIFICATION] 📖 For more info, you can check the "Contribute Code" section in the development guide.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/config/changefeed.go`:
- Around line 281-282: VerifyAndComplete is not backfilling defaults for
info.Config.EventCollectorBatchCount and info.Config.EventCollectorBatchBytes,
so persisted ChangeFeedInfo objects from before those fields existed end up with
nil pointers and later fail ValidateAndAdjust (which disallows zero). In
VerifyAndComplete (method VerifyAndComplete on ChangeFeedInfo) set
info.Config.EventCollectorBatchCount and info.Config.EventCollectorBatchBytes to
the same non-nil default values used by validation (the defaults used by
ValidateAndAdjust or the package-level defaults for event collector batch
size/count) when they are nil, so subsequent util.GetOrZero calls and
ValidateAndAdjust see valid non-zero values; ensure you assign pointer types
(e.g., &defaultCount, &defaultBytes) to info.Config.* fields.
🧹 Nitpick comments (4)
pkg/config/main_test.go (1)
23-27: LGTM — minor consistency note on `IgnoreAnyFunction` vs `IgnoreTopFunction`.

The change correctly passes a goleak ignore option to `SetUpLeakTest`, matching the existing variadic signature. The existing codebase example in `pkg/leakutil/leak_helper_test.go` uses `goleak.IgnoreTopFunction`, which is more precise (matches only when the function is at the top of the stack). `IgnoreAnyFunction` is more permissive and could mask a real leak if `inWorker` appears elsewhere in the stack. Consider using `IgnoreTopFunction` for consistency and precision, unless you've confirmed that `inWorker` doesn't always appear at the top.

♻️ Optional: use IgnoreTopFunction for consistency

```diff
 leakutil.SetUpLeakTest(m,
-	goleak.IgnoreAnyFunction("github.com/godbus/dbus.(*Conn).inWorker"),
+	goleak.IgnoreTopFunction("github.com/godbus/dbus.(*Conn).inWorker"),
 )
```

downstreamadapter/dispatcher/basic_dispatcher_info.go (1)
77-110: Consider a config struct to tame the growing parameter list.

`NewSharedInfo` now takes 15 parameters. While functional, this is getting unwieldy and error-prone (easy to swap positional arguments of the same type). A dedicated options/config struct would improve readability. Not blocking, but worth considering in a follow-up.
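Sketched with hypothetical names, that follow-up could look something like the pattern below; only two fields are shown, and the real parameters of `NewSharedInfo` are not reproduced here.

```go
package main

import "fmt"

// sharedInfoOptions is a hypothetical options struct illustrating the pattern;
// each positional parameter of NewSharedInfo would become a named field.
type sharedInfoOptions struct {
	BatchCount uint64
	BatchBytes uint64
	// ...the remaining parameters would be added here as named fields.
}

// newSharedInfoFromOptions stands in for a constructor taking the options struct.
func newSharedInfoFromOptions(opts sharedInfoOptions) string {
	return fmt.Sprintf("batchCount=%d batchBytes=%d", opts.BatchCount, opts.BatchBytes)
}

func main() {
	// Named fields make the call site self-describing and prevent swapped arguments.
	fmt.Println(newSharedInfoFromOptions(sharedInfoOptions{
		BatchCount: 4096,
		BatchBytes: 0,
	}))
}
```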
pkg/config/replica_config_test.go (1)

197-208: Good coverage for the zero-count rejection path.

Consider adding a companion test that verifies a valid positive `EventCollectorBatchCount` passes `ValidateAndAdjust` without error, and one for `nil` (unset) to confirm that case is also accepted. This would provide more complete coverage of the validation logic.
pkg/config/replica_config.go (1)

326-329: Validation only covers `EventCollectorBatchCount`; consider also validating `EventCollectorBatchBytes`.

Currently only `EventCollectorBatchCount` is validated (must be >0 when set). There's no validation for `EventCollectorBatchBytes`. While `0` correctly means "disabled," a negative value isn't possible with `uint64`, so the main risk is a user explicitly setting it to some nonsensical value. The asymmetry is fine for now, but you might want to add a brief comment explaining why `EventCollectorBatchBytes` doesn't need the same zero-check (i.e., 0 = disabled).
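A self-contained stand-in showing the asymmetry and where the suggested comment would live; the real `ReplicaConfig` and its error handling are richer than this, and the error message is illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// replicaBatchFields is a minimal stand-in for the two fields under discussion.
type replicaBatchFields struct {
	EventCollectorBatchCount *uint64
	EventCollectorBatchBytes *uint64
}

// validate mirrors the asymmetry described above: count must be > 0 when set,
// while bytes needs no zero-check because it is a uint64 and 0 simply means
// byte-based batching is disabled.
func (c *replicaBatchFields) validate() error {
	if c.EventCollectorBatchCount != nil && *c.EventCollectorBatchCount == 0 {
		return errors.New("event collector batch count must be greater than 0 when set")
	}
	return nil
}

func main() {
	zero := uint64(0)
	fmt.Println((&replicaBatchFields{EventCollectorBatchCount: &zero}).validate()) // error
	fmt.Println((&replicaBatchFields{EventCollectorBatchBytes: &zero}).validate()) // <nil>
}
```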
```go
EventCollectorBatchCount: util.GetOrZero(info.Config.EventCollectorBatchCount),
EventCollectorBatchBytes: util.GetOrZero(info.Config.EventCollectorBatchBytes),
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Verify if VerifyAndComplete or FixIncompatible handles EventCollector defaults
rg -n 'EventCollectorBatch' pkg/config/changefeed.go
echo "---"
# Check if there's any other place defaults are applied for these fields
rg -n 'EventCollectorBatch' --type=go -g '!*_test.go'
```

Repository: pingcap/ticdc
Length of output: 4772

🏁 Script executed:

```bash
#!/bin/bash
# Get the VerifyAndComplete function
sed -n '420,470p' pkg/config/changefeed.go | cat -n
echo "=== Checking ChangeFeedInfo structure ==="
sed -n '1,100p' pkg/config/changefeed.go | cat -n
```

Repository: pingcap/ticdc
Length of output: 5477

🏁 Script executed:

```bash
#!/bin/bash
# Get the full ChangeFeedInfo structure
rg -n 'type ChangeFeedInfo struct' pkg/config/changefeed.go -A 20
echo "=== Checking util.GetOrZero ==="
rg -n 'func GetOrZero' --type=go
echo "=== Check lines around 281-282 for context ==="
sed -n '270,290p' pkg/config/changefeed.go | cat -n
```

Repository: pingcap/ticdc
Length of output: 2765

🏁 Script executed:

```bash
#!/bin/bash
# Look for ValidateAndAdjust or validation related to EventCollectorBatchCount
rg -n 'ValidateAndAdjust|EventCollectorBatchCount == 0' pkg/config/replica_config.go -B 2 -A 2
echo "=== Check util.GetOrZero implementation ==="
cat -n pkg/util/pointer.go
```

Repository: pingcap/ticdc
Length of output: 1823

🏁 Script executed:

```bash
#!/bin/bash
# Check where ToChangefeedConfig is called to understand the flow
rg -n 'ToChangefeedConfig' --type=go | head -20
echo "=== Check if there are any other places where EventCollectorBatch defaults are set ==="
rg -n 'EventCollectorBatchCount.*=' pkg/config/changefeed.go
rg -n 'EventCollectorBatchCount.*=' pkg/config/replica_config.go
```

Repository: pingcap/ticdc
Length of output: 1698

🏁 Script executed:

```bash
#!/bin/bash
# Check the test file for EventCollectorBatchConfig
cat -n pkg/config/changefeed_batch_config_test.go
```

Repository: pingcap/ticdc
Length of output: 1921

🏁 Script executed:

```bash
#!/bin/bash
# Check where ValidateAndAdjust is called in relation to VerifyAndComplete and ToChangefeedConfig
rg -n 'ValidateAndAdjust\|VerifyAndComplete' api/v2/changefeed.go -B 2 -A 2
echo "=== Check if ValidateAndAdjust is called on ReplicaConfig in the flow ==="
rg -n 'ValidateAndAdjust' maintainer/maintainer.go -B 2 -A 2 | head -40
```

Repository: pingcap/ticdc
Length of output: 130

🏁 Script executed:

```bash
#!/bin/bash
# Check the changefeed creation flow more carefully
rg -n 'VerifyAndComplete' api/v2/changefeed.go -B 3 -A 3
echo "=== Check maintainer flow ==="
rg -n 'VerifyAndComplete\|ValidateAndAdjust' maintainer/maintainer.go -B 1 -A 1 | head -50
```

Repository: pingcap/ticdc
Length of output: 86

🏁 Script executed:

```bash
#!/bin/bash
# Search broadly for where ValidateAndAdjust is called
rg -n '\.ValidateAndAdjust\(' --type=go -B 2 -A 2 | grep -A 4 -B 4 'ReplicaConfig\|info.Config'
echo "=== Check server api for changefeed creation ==="
rg -n 'VerifyAndComplete' --type=go -B 3 -A 3 | head -60
```

Repository: pingcap/ticdc
Length of output: 6580

🏁 Script executed:

```bash
#!/bin/bash
# Let me trace the complete flow - check what happens with nil values
# Check if there's any backfill happening in the dispatchermanager when values are 0/nil
sed -n '314,330p' downstreamadapter/dispatchermanager/dispatcher_manager.go | cat -n
echo "=== Check what GetDefaultReplicaConfig returns for EventCollectorBatch fields ==="
grep -n 'GetDefaultReplicaConfig' pkg/config/replica_config.go -A 30 | head -50
```

Repository: pingcap/ticdc
Length of output: 2225

🏁 Script executed:

```bash
#!/bin/bash
# Check what defaultReplicaConfig is initialized with
grep -n 'var defaultReplicaConfig' pkg/config/replica_config.go -A 40 | head -50
```

Repository: pingcap/ticdc
Length of output: 2338

🏁 Script executed:

```bash
#!/bin/bash
# Double-check the type of EventCollectorBatchCount in ChangefeedConfig
rg -n 'type ChangefeedConfig struct' -A 50 pkg/config/changefeed.go | head -60
```

Repository: pingcap/ticdc
Length of output: 2969
VerifyAndComplete does not backfill defaults for EventCollectorBatchCount and EventCollectorBatchBytes fields.
When loading a ChangeFeedInfo persisted before these fields existed, info.Config.EventCollectorBatchCount and info.Config.EventCollectorBatchBytes will be nil. util.GetOrZero() converts these to 0, but ValidateAndAdjust() explicitly rejects zero values for EventCollectorBatchCount (line 326).
VerifyAndComplete() backfills defaults for other pointer fields (e.g., Integrity, ChangefeedErrorStuckDuration, ActiveActiveProgressInterval), but omits the EventCollectorBatch fields. Add backfill to maintain consistency:
Proposed fix in VerifyAndComplete
```diff
 	if util.GetOrZero(info.Config.MemoryQuota) == uint64(0) {
 		info.fixMemoryQuota()
 	}
+
+	if info.Config.EventCollectorBatchCount == nil {
+		info.Config.EventCollectorBatchCount = util.AddressOf(DefaultEventCollectorBatchCount)
+	}
+	if info.Config.EventCollectorBatchBytes == nil {
+		info.Config.EventCollectorBatchBytes = util.AddressOf(DefaultEventCollectorBatchBytes)
+	}
 	if info.Config.Integrity == nil {
```

🤖 Prompt for AI Agents
In `@pkg/config/changefeed.go` around lines 281 - 282, VerifyAndComplete is not
backfilling defaults for info.Config.EventCollectorBatchCount and
info.Config.EventCollectorBatchBytes, so persisted ChangeFeedInfo objects from
before those fields existed end up with nil pointers and later fail
ValidateAndAdjust (which disallows zero). In VerifyAndComplete (method
VerifyAndComplete on ChangeFeedInfo) set info.Config.EventCollectorBatchCount
and info.Config.EventCollectorBatchBytes to the same non-nil default values used
by validation (the defaults used by ValidateAndAdjust or the package-level
defaults for event collector batch size/count) when they are nil, so subsequent
util.GetOrZero calls and ValidateAndAdjust see valid non-zero values; ensure you
assign pointer types (e.g., &defaultCount, &defaultBytes) to info.Config.*
fields.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
API / Configuration
Tests