[WIP] #4170
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request focuses on enhancing the accuracy and reliability of the pending request accounting in the requestCache.
📝 Walkthrough

This PR fixes pending count leaks in the region request cache by introducing a markDropped method.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Code Review
This pull request introduces a robust mechanism to prevent pendingCount leaks in the requestCache by adding a markDropped method. This method is correctly applied in various scenarios where a region request is popped but not sent, such as for stop tasks or when a send operation fails. The logic for correcting the pending count in clearStaleRequest has also been improved to be more accurate while tolerating in-flight requests. The accompanying tests are well-designed and validate the fixes effectively. Overall, the changes are of high quality and correctly address the potential for counter inconsistency.
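To make the accounting concrete, here is a minimal hedged sketch of the pattern the review describes (the markDropped name comes from the PR, but the struct, the queue type, and the add helper below are simplified stand-ins rather than the actual implementation):

```go
// A hedged sketch of the drop-side accounting described above, not the
// PR's code: every request added to the queue must release its pending
// count exactly once, whether it is eventually sent or dropped unsent.
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

type requestCache struct {
	pendingQueue chan int      // simplified stand-in for the real queue
	pendingCount *atomic.Int64 // requests the cache is still accountable for
}

func (c *requestCache) add(req int) {
	c.pendingQueue <- req
	c.pendingCount.Inc()
}

// markDropped releases the pending count for a request that was popped
// from the queue but will never be sent (stop task, send failure), so the
// counter does not leak.
func (c *requestCache) markDropped() {
	c.pendingCount.Dec()
}

func main() {
	c := &requestCache{
		pendingQueue: make(chan int, 8),
		pendingCount: atomic.NewInt64(0),
	}
	c.add(1)
	req := <-c.pendingQueue // popped, but suppose the send then fails
	_ = req
	c.markDropped()
	fmt.Println(c.pendingCount.Load()) // prints 0: nothing leaked
}
```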
[FORMAT CHECKER NOTIFICATION]

Notice: 📖 For more info, you can check the "Contribute Code" section in the development guide.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache.go (1)
323-331: ⚠️ Potential issue | 🔴 Critical
`decPendingCount` has a TOCTOU race that can drive `pendingCount` negative.

`Load()` → check → `Dec()` is not atomic. Two concurrent `markDropped` calls (or any concurrent `decPendingCount` callers) can both read the same value and both decrement, violating the `>= 0` invariant. Use a CAS loop instead.

🔒 Proposed fix: atomic CAS loop
```diff
 func (c *requestCache) decPendingCount() {
-	// Ensure pendingCount doesn't go below 0
-	current := c.pendingCount.Load()
-	newCount := current - int64(1)
-	if newCount < 0 {
-		c.pendingCount.Store(0)
-		return
-	}
-	c.pendingCount.Dec()
+	for {
+		current := c.pendingCount.Load()
+		if current <= 0 {
+			return
+		}
+		if c.pendingCount.CAS(current, current-1) {
+			return
+		}
+	}
 }
```
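For illustration, a minimal sketch of a concurrent test for that invariant (assuming the counter is a go.uber.org/atomic Int64, inferred from the Load/Store/Dec/CAS calls in the diff; the test name, goroutine count, and placement are hypothetical):

```go
// A hedged sketch, not part of the PR: exercises the CAS-loop pattern
// above under contention.
package logpuller

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
)

func TestDecPendingCountStaysNonNegative(t *testing.T) {
	pending := atomic.NewInt64(1) // only one request is actually pending

	dec := func() {
		for {
			current := pending.Load()
			if current <= 0 {
				return // nothing left to release
			}
			if pending.CAS(current, current-1) {
				return // our decrement applied to the value we observed
			}
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // several goroutines race to drop the same request
		wg.Add(1)
		go func() {
			defer wg.Done()
			dec()
		}()
	}
	wg.Wait()

	// With the CAS loop, extra callers are no-ops instead of pushing the
	// counter below zero.
	require.Equal(t, int64(0), pending.Load())
}
```

Under the old Load-then-Dec pattern, several of these goroutines could pass the check on the same observed value and drive the counter below zero; the CAS loop retries instead.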
🤖 Fix all issues with AI agents
In `@logservice/logpuller/region_req_cache.go`:
- Around line 270-278: The correction in clearStaleRequest is racy because it
reads len(c.pendingQueue) and c.pendingCount separately without a consistent
lock; fix it by making the comparison and potential fix under a single
synchronization point (e.g., acquire sentRequests.Lock around reading
len(c.pendingQueue) and pendingCount or otherwise atomically snapshot both
values), and change the behavior when the delta is 1 to only log a warning
instead of blindly calling c.pendingCount.Store(actualReqCount); ensure
references to clearStaleRequest, c.pendingQueue, c.pendingCount,
sentRequests.Lock, and the markDropped/add code paths are considered so
concurrent updates won't cause incorrect corrections.
🧹 Nitpick comments (2)
logservice/logpuller/region_request_worker.go (1)
397-406: `addRegionState` before `doSend` leaves an orphaned state on send failure.

At line 400, `addRegionState` registers the state, then if `doSend` fails at line 401, the function returns without removing it. The state is eventually cleaned up by `clearRegionStates` in the outer retry loop, so this isn't a functional bug, but it means the region briefly exists in `requestedRegions` despite never being sent. If a stale event arrives for this region ID during that window, it would be dispatched to a state that was never actually subscribed on TiKV.

Consider swapping the order: call `doSend` first, then `addRegionState` + `markSent` only on success.

♻️ Proposed reorder
```diff
 } else {
 	state := newRegionFeedState(region, uint64(subID), s)
 	state.start()
-	s.addRegionState(subID, region.verID.GetID(), state)
 	if err := doSend(s.createRegionRequest(region)); err != nil {
+		state.markStopped(&sendRequestToStoreErr{})
 		s.requestCache.markDropped()
 		return err
 	}
+	s.addRegionState(subID, region.verID.GetID(), state)
 	s.requestCache.markSent(regionReq)
 }
```

logservice/logpuller/region_request_worker_test.go (1)
154-162: Consider adding a timeout when reading from `errCh` to prevent a hung test.

If `processRegionSendTask` unexpectedly blocks after the context is canceled, this test will hang indefinitely at line 160. A `select` with a deadline (similar to the `sendCalled` wait above) would make the failure mode explicit.

♻️ Suggested timeout guard
```diff
-	err := <-errCh
-	require.Error(t, err)
-	require.Equal(t, int64(0), worker.requestCache.pendingCount.Load())
+	select {
+	case err := <-errCh:
+		require.Error(t, err)
+		require.Equal(t, int64(0), worker.requestCache.pendingCount.Load())
+	case <-time.After(5 * time.Second):
+		t.Fatal("processRegionSendTask did not return in time")
+	}
```
```go
	actualReqCount := int64(reqCount) + int64(len(c.pendingQueue))
	pendingCount := c.pendingCount.Load()
	// One request can be "in flight" (popped from pendingQueue but not yet marked sent),
	// so we tolerate a small mismatch to avoid false corrections.
	if pendingCount < actualReqCount || pendingCount-actualReqCount > 1 {
		log.Info("region worker pending request count is not equal to actual region request count, correct it",
			zap.Int("pendingCount", int(pendingCount)),
			zap.Int64("actualReqCount", actualReqCount))
		c.pendingCount.Store(actualReqCount)
```
Normalization logic is reasonable but the correction itself is racy.
clearStaleRequest reads pendingCount (line 271) and len(c.pendingQueue) (line 270) without holding any synchronization that covers both the atomic counter and the channel length together. A concurrent markDropped/add call between these two reads can cause an incorrect correction. Since sentRequests.Lock is held, only the queue side is unprotected.
Additionally, the tolerance comment says "up to 1" for an in-flight request, but if pendingCount < actualReqCount (line 274, first branch) the correction blindly stores actualReqCount, which may be stale by then. Given this runs on a 10-second timer and is a safety net rather than primary accounting, the impact is low, but consider logging a warning rather than auto-correcting when the mismatch is small.
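For illustration, a hedged sketch of the locked-snapshot idea (the field names mirror the ones quoted above, but the stand-in struct, the method name, and the assumption that the sentRequests mutex also guards the add/markDropped paths are mine, not the PR's):

```go
// A hedged sketch, not the PR's implementation. The struct below is a
// stand-in for requestCache: the field names mirror the ones quoted in
// the review (sentRequests, pendingQueue, pendingCount), but their exact
// types and the method name are assumptions.
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type regionReq struct{}

type requestCache struct {
	sentRequests struct {
		sync.Mutex
		// sent-request bookkeeping elided
	}
	pendingQueue chan *regionReq
	pendingCount *atomic.Int64
}

// normalizePendingCount (hypothetical name) takes both readings under one
// lock so a concurrent add/markDropped cannot slip between them, and only
// warns on a mismatch instead of overwriting the counter with a snapshot
// that may already be stale.
func (c *requestCache) normalizePendingCount(sentReqCount int) {
	c.sentRequests.Lock() // assumes add/markDropped also take this lock
	defer c.sentRequests.Unlock()

	actualReqCount := int64(sentReqCount) + int64(len(c.pendingQueue))
	pendingCount := c.pendingCount.Load()

	// Tolerate one in-flight request (popped but not yet marked sent).
	if pendingCount < actualReqCount || pendingCount-actualReqCount > 1 {
		fmt.Printf("pending count mismatch: pending=%d actual=%d\n",
			pendingCount, actualReqCount)
	}
}

func main() {
	c := &requestCache{
		pendingQueue: make(chan *regionReq, 16),
		pendingCount: atomic.NewInt64(0),
	}
	c.pendingQueue <- &regionReq{}
	c.normalizePendingCount(0) // reports pending=0 actual=1 without "fixing" it
}
```

Whether taking the sentRequests lock on the add/markDropped side is acceptable depends on the real locking scheme, which this sketch does not claim to know.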
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Bug Fixes
Tests