Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughThe changes introduce stop-table task handling with a short retry window in the request cache's add operation, a new finishPoppedNotSent method for flow control notifications, and systematic support for identifying and processing deregistration tasks throughout the region request worker workflow. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ❌ 3❌ Failed checks (2 warnings, 1 inconclusive)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @lidezhu, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refines the log puller's request handling mechanism, specifically improving the reliability and correctness of "stop-table tasks." It ensures that these critical deregistration requests are always processed by the system, even under contention, and that the internal state of the request cache accurately reflects the lifecycle of these special tasks. Additionally, it introduces a new flag to support BDR mode filtering. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a more explicit mechanism for handling "stop-table tasks" and corrects the pending request count by adding a finishPoppedNotSent method. These changes improve the robustness of table stopping logic. The modifications are logical and are accompanied by corresponding tests. My review includes suggestions to simplify some of the new logic by removing redundancy and duplication, which will enhance code clarity and maintainability.
| if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() { | ||
| // Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here. | ||
| // They are either handled on an existing stream, or can be safely dropped when no stream exists. | ||
| worker.requestCache.finishPoppedNotSent() | ||
| continue | ||
| } | ||
| if !region.regionInfo.isStopped() { | ||
| worker.preFetchForConnecting = new(regionInfo) | ||
| *worker.preFetchForConnecting = region.regionInfo | ||
| return nil | ||
| } else { | ||
| continue | ||
| } |
There was a problem hiding this comment.
The logic here can be simplified. The condition region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() is equivalent to region.regionInfo.isStopped(), because isStopTableTask() implies isStopped(). Also, the subsequent if !region.regionInfo.isStopped() check becomes redundant. The logic can be rewritten as a simple if/else structure for better clarity.
if region.regionInfo.isStopped() {
// Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
// They are either handled on an existing stream, or can be safely dropped when no stream exists.
worker.requestCache.finishPoppedNotSent()
continue
}
worker.preFetchForConnecting = new(regionInfo)
*worker.preFetchForConnecting = region.regionInfo
return nil|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@logservice/logpuller/region_request_worker.go`:
- Around line 91-102: The isStopped() check is redundant because the earlier
guard already filtered out stopped regions; remove the outer if
(!region.regionInfo.isStopped()) and directly set worker.preFetchForConnecting =
new(regionInfo), copy region.regionInfo into *worker.preFetchForConnecting, and
return nil; keep the existing branch that calls
worker.requestCache.finishPoppedNotSent() for isStopTableTask/isStopped and
leave the rest of the loop unchanged so there is no unreachable fall-through.
🧹 Nitpick comments (3)
logservice/logpuller/subscription_client.go (1)
362-363: Consider initializingfilterLoopinsidenewSubscribedSpaninstead of post-hoc assignment.
filterLoopis set on the returned*subscribedSpanright after construction, butnewSubscribedSpanalready accepts all other configuration. Adding abdrMode boolparameter tonewSubscribedSpan(and setting the field there) would keep the struct initialization self-contained and prevent the field from being accidentally forgotten when creating asubscribedSpanelsewhere.logservice/logpuller/region_req_cache.go (2)
128-137: ThemustSucceedguard on lines 130-132 is unreachable.When
mustSucceedis true,forceis also true (sincemustSucceed = force && region.isStopTableTask()). The outer condition at Line 102 (current < c.maxPendingCount || force) always evaluates to true whenforceis true, so execution never reaches this secondselectblock for stop-table tasks. The guard is harmless but dead code.
323-331: Pre-existing:decPendingCounthas a TOCTOU race betweenLoad()andDec().Two concurrent callers can both observe
current=1, both pass thenewCount < 0check, and both callDec(), resulting inpendingCount = -1. The periodic correction inclearStaleRequest(Line 276-278) mitigates this, but aCompareAndSwaploop would be more robust. Not blocking since this is pre-existing code.♻️ Suggested atomic-safe decrement
func (c *requestCache) decPendingCount() { - // Ensure pendingCount doesn't go below 0 - current := c.pendingCount.Load() - newCount := current - int64(1) - if newCount < 0 { - c.pendingCount.Store(0) - return - } - c.pendingCount.Dec() + for { + current := c.pendingCount.Load() + if current <= 0 { + return + } + if c.pendingCount.CompareAndSwap(current, current-1) { + return + } + } }
| if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() { | ||
| // Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here. | ||
| // They are either handled on an existing stream, or can be safely dropped when no stream exists. | ||
| worker.requestCache.finishPoppedNotSent() | ||
| continue | ||
| } | ||
| if !region.regionInfo.isStopped() { | ||
| worker.preFetchForConnecting = new(regionInfo) | ||
| *worker.preFetchForConnecting = region.regionInfo | ||
| return nil | ||
| } else { | ||
| continue | ||
| } | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Dead branch: the if !region.regionInfo.isStopped() at Line 97 is now always true.
After the guard at Line 91 filters out both isStopTableTask() and isStopped() (which is a superset), any execution reaching Line 97 is guaranteed to have isStopped() == false. The conditional is redundant and the implicit else (fall-through past Line 101) is unreachable.
Consider simplifying to remove the stale guard:
♻️ Remove redundant isStopped check
for {
region, err := worker.requestCache.pop(ctx)
if err != nil {
return err
}
if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
// Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
// They are either handled on an existing stream, or can be safely dropped when no stream exists.
worker.requestCache.finishPoppedNotSent()
continue
}
- if !region.regionInfo.isStopped() {
- worker.preFetchForConnecting = new(regionInfo)
- *worker.preFetchForConnecting = region.regionInfo
- return nil
- }
+ worker.preFetchForConnecting = new(regionInfo)
+ *worker.preFetchForConnecting = region.regionInfo
+ return nil
}🤖 Prompt for AI Agents
In `@logservice/logpuller/region_request_worker.go` around lines 91 - 102, The
isStopped() check is redundant because the earlier guard already filtered out
stopped regions; remove the outer if (!region.regionInfo.isStopped()) and
directly set worker.preFetchForConnecting = new(regionInfo), copy
region.regionInfo into *worker.preFetchForConnecting, and return nil; keep the
existing branch that calls worker.requestCache.finishPoppedNotSent() for
isStopTableTask/isStopped and leave the rest of the loop unchanged so there is
no unreachable fall-through.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
Improvements
Tests