@lidezhu (Collaborator) commented Feb 8, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of dropped requests to prevent resource tracking inconsistencies.
    • Enhanced cleanup mechanisms for failed region requests to avoid resource leaks.
    • Strengthened error recovery paths during request processing.
  • Tests

    • Expanded test coverage for error handling and resource cleanup scenarios.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Feb 8, 2026
ti-chi-bot bot commented Feb 8, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign lidezhu for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Feb 8, 2026
@gemini-code-assist

Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request focuses on enhancing the accuracy and reliability of the requestCache within the logpuller service by addressing potential discrepancies in the pendingCount. It introduces a dedicated mechanism to account for requests that are initiated but not fully sent or tracked, thereby preventing resource leaks and ensuring the system's state remains consistent. The changes also include improvements to the count correction logic and new tests to validate these critical behaviors.

Highlights

  • Pending Request Count Management: Introduced a new markDropped() method in requestCache to explicitly decrement the pending request count when requests are not sent or fully processed, preventing resource leaks.
  • Robust Error Handling: Integrated markDropped() calls across regionRequestWorker to ensure correct pendingCount updates in various scenarios, including send failures, processing stop tasks, and clearing pre-fetched regions.
  • Improved Count Correction Logic: Refined the clearStaleRequest() method to more accurately correct the pendingCount, now accounting for 'in-flight' requests to avoid false corrections.
  • Enhanced Test Coverage: Added new unit tests to verify that the pendingCount is correctly managed and does not leak under error conditions or when handling stop tasks.
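
A minimal sketch of the accounting pattern described in these highlights, assuming a simplified stand-in for requestCache. The type `pendingTracker`, its fields, and the `trySend` helper are hypothetical names for illustration; only `markDropped` and `pendingCount` are named in the PR, and the real implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errSendFailed is a placeholder error for the demo.
var errSendFailed = errors.New("send failed")

// pendingTracker is a hypothetical, reduced model of requestCache: it keeps
// only the pending counter and a "space available" signal.
type pendingTracker struct {
	pendingCount   atomic.Int64
	spaceAvailable chan struct{}
}

func newPendingTracker() *pendingTracker {
	return &pendingTracker{spaceAvailable: make(chan struct{}, 1)}
}

// add records one request that has been queued.
func (p *pendingTracker) add() { p.pendingCount.Add(1) }

// markDropped releases the slot of a request that was popped but never sent.
// It never lets the counter go below zero.
func (p *pendingTracker) markDropped() {
	for {
		cur := p.pendingCount.Load()
		if cur <= 0 {
			return
		}
		if p.pendingCount.CompareAndSwap(cur, cur-1) {
			break
		}
	}
	// Non-blocking notification that a slot is free again.
	select {
	case p.spaceAvailable <- struct{}{}:
	default:
	}
}

// trySend runs send and drops the pending slot if the send fails.
// On success the slot stays occupied, because the request is now in flight.
func (p *pendingTracker) trySend(send func() error) error {
	if err := send(); err != nil {
		p.markDropped()
		return err
	}
	return nil
}

func main() {
	p := newPendingTracker()
	p.add()
	err := p.trySend(func() error { return errSendFailed })
	fmt.Println("error:", err, "pending:", p.pendingCount.Load())
}
```

The point of the sketch is that every exit path for a popped request either keeps the slot (sent) or releases it (dropped), so the counter cannot drift upward on failures.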

Changelog
  • logservice/logpuller/region_req_cache.go
    • Added markDropped() method to requestCache for handling requests that are not sent or tracked.
    • Modified clearStaleRequest() to improve pendingCount correction, now considering in-flight requests.
  • logservice/logpuller/region_request_worker.go
    • Added calls to requestCache.markDropped() in newRegionRequestWorker when a pre-fetched region is not sent.
    • Added calls to requestCache.markDropped() in processRegionSendTask for send failures, stop tasks, and region failures.
    • Added calls to requestCache.markDropped() in clearPendingRegions when pre-fetched regions are cleared.
  • logservice/logpuller/region_request_worker_test.go
    • Added necessary imports for new test cases, including context, errors, io, time, cdcpb, metapb, heartbeatpb, regionlock, tikv, and metadata.
    • Introduced fakeEventFeedV2Client for mocking gRPC client behavior in tests.
    • Added TestRegionRequestWorkerSendErrorDoesNotLeakPendingCount to verify pending count handling on send errors.
    • Added TestRegionRequestWorkerStopTaskDoesNotLeakPendingCount to verify pending count handling for stop tasks.
Activity
  • The pull request is currently marked as '[WIP]', indicating it is a work in progress and may not yet be ready for final review.
coderabbitai bot commented Feb 8, 2026

Walkthrough

This PR fixes pending count leaks in the region request cache by introducing a markDropped() method and invoking it in multiple code paths where region requests are finished. The request cache also gains normalization logic to reconcile pending counts with actual in-flight requests.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Request Cache Updates (logservice/logpuller/region_req_cache.go) | Added markDropped() method to decrement the pending count and signal space availability. Updated clearStaleRequest() to normalize the pending count against the actual request count, tolerating a pending count that exceeds the actual count by at most 1 (one in-flight request) and logging any larger mismatch before correcting it. |
| Region Request Worker (logservice/logpuller/region_request_worker.go) | Integrated markDropped() calls across multiple error/stop code paths to prevent count leaks: the pre-fetch loop when encountering stopped regions, send task processing (stop requests, already-stopped regions, send errors), and pending region clearing. |
| Test Coverage (logservice/logpuller/region_request_worker_test.go) | Added a fakeEventFeedV2Client mock implementation and two tests validating pending count cleanup: one for send errors and one for stop-path cancellations via context hooks. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 A count that leaked, now fixed with care,
Dropped requests marked with space to spare,
Each path now calls to clean the state,
No pending ghosts that haunt our fate!
📊✨

🚥 Pre-merge checks | ❌ 3
❌ Failed checks (2 warnings, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | The description is entirely a template with placeholder values (Issue Number: close #xxx, empty sections). Required fields like 'What is changed and how it works?' and 'Release note' are unfilled. | Complete all required sections: link to actual issue, describe changes and implementation details, answer questions about compatibility and documentation, and provide a release note. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The title '[WIP]' is vague and generic, using a work-in-progress marker that does not convey meaningful information about the actual changes made to the codebase. | Replace '[WIP]' with a clear, specific title that describes the main change, such as 'Fix pending count leak in region request cache' or similar. |

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a robust mechanism to prevent pendingCount leaks in the requestCache by adding a markDropped method. This method is correctly applied in various scenarios where a region request is popped but not sent, such as for stop tasks or when a send operation fails. The logic for correcting the pending count in clearStaleRequest has also been improved to be more accurate while tolerating in-flight requests. The accompanying tests are well-designed and validate the fixes effectively. Overall, the changes are of high quality and correctly address the potential for counter inconsistency.

ti-chi-bot bot commented Feb 8, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache.go (1)

323-331: ⚠️ Potential issue | 🔴 Critical

decPendingCount has a TOCTOU race that can drive pendingCount negative.

Load() → check → Dec() is not atomic. Two concurrent markDropped calls (or any concurrent decPendingCount callers) can both read the same value and both decrement, violating the >= 0 invariant. Use a CAS loop instead.

🔒 Proposed fix: atomic CAS loop
 func (c *requestCache) decPendingCount() {
-	// Ensure pendingCount doesn't go below 0
-	current := c.pendingCount.Load()
-	newCount := current - int64(1)
-	if newCount < 0 {
-		c.pendingCount.Store(0)
-		return
-	}
-	c.pendingCount.Dec()
+	for {
+		current := c.pendingCount.Load()
+		if current <= 0 {
+			return
+		}
+		if c.pendingCount.CAS(current, current-1) {
+			return
+		}
+	}
 }
🤖 Fix all issues with AI agents
In `@logservice/logpuller/region_req_cache.go`:
- Around line 270-278: The correction in clearStaleRequest is racy because it
reads len(c.pendingQueue) and c.pendingCount separately without a consistent
lock; fix it by making the comparison and potential fix under a single
synchronization point (e.g., acquire sentRequests.Lock around reading
len(c.pendingQueue) and pendingCount or otherwise atomically snapshot both
values), and change the behavior when the delta is 1 to only log a warning
instead of blindly calling c.pendingCount.Store(actualReqCount); ensure
references to clearStaleRequest, c.pendingQueue, c.pendingCount,
sentRequests.Lock, and the markDropped/add code paths are considered so
concurrent updates won't cause incorrect corrections.
🧹 Nitpick comments (2)
logservice/logpuller/region_request_worker.go (1)

397-406: addRegionState before doSend leaves an orphaned state on send failure.

At line 400, addRegionState registers the state, then if doSend fails at line 401, the function returns without removing it. The state is eventually cleaned up by clearRegionStates in the outer retry loop, so this isn't a functional bug — but it means the region briefly exists in requestedRegions despite never being sent. If a stale event arrives for this region ID during that window, it would be dispatched to a state that was never actually subscribed on TiKV.

Consider swapping the order: call doSend first, then addRegionState + markSent only on success.

♻️ Proposed reorder
 		} else {
 			state := newRegionFeedState(region, uint64(subID), s)
 			state.start()
-			s.addRegionState(subID, region.verID.GetID(), state)
 			if err := doSend(s.createRegionRequest(region)); err != nil {
+				state.markStopped(&sendRequestToStoreErr{})
 				s.requestCache.markDropped()
 				return err
 			}
+			s.addRegionState(subID, region.verID.GetID(), state)
 			s.requestCache.markSent(regionReq)
 		}
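
The ordering argument can be sketched in isolation. The following is a simplified, single-goroutine model, not the worker's actual code: `registry`, `sendThenRegister`, and `errSend` are hypothetical names, the map stands in for requestedRegions, and the integer stands in for pendingCount.

```go
package main

import (
	"errors"
	"fmt"
)

// errSend is a placeholder error for the demo.
var errSend = errors.New("send failed")

// registry is a hypothetical model of the worker's bookkeeping.
type registry struct {
	states  map[uint64]string // region ID -> state, stands in for requestedRegions
	pending int               // stands in for pendingCount
}

func newRegistry(pending int) *registry {
	return &registry{states: make(map[uint64]string), pending: pending}
}

// sendThenRegister sends first and registers the region state only on
// success, so a failed send leaves no orphaned entry behind.
func (r *registry) sendThenRegister(regionID uint64, send func() error) error {
	if err := send(); err != nil {
		r.pending-- // counterpart of markDropped in this model
		return err
	}
	r.states[regionID] = "subscribed"
	return nil
}

func main() {
	r := newRegistry(1)
	err := r.sendThenRegister(42, func() error { return errSend })
	_, registered := r.states[42]
	fmt.Println("error:", err, "registered:", registered, "pending:", r.pending)
}
```

One trade-off worth checking before adopting this order in the real worker: if a response for the region could arrive between the send and the registration, it would find no state to dispatch to.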
logservice/logpuller/region_request_worker_test.go (1)

154-162: Consider adding a timeout when reading from errCh to prevent a hung test.

If processRegionSendTask unexpectedly blocks after the context is canceled, this test will hang indefinitely at line 160. A select with a deadline (similar to the sendCalled wait above) would make the failure mode explicit.

♻️ Suggested timeout guard
-	err := <-errCh
-	require.Error(t, err)
-	require.Equal(t, int64(0), worker.requestCache.pendingCount.Load())
+	select {
+	case err := <-errCh:
+		require.Error(t, err)
+		require.Equal(t, int64(0), worker.requestCache.pendingCount.Load())
+	case <-time.After(5 * time.Second):
+		t.Fatal("processRegionSendTask did not return in time")
+	}

Comment on lines +270 to +278
	actualReqCount := int64(reqCount) + int64(len(c.pendingQueue))
	pendingCount := c.pendingCount.Load()
	// One request can be "in flight" (popped from pendingQueue but not yet marked sent),
	// so we tolerate a small mismatch to avoid false corrections.
	if pendingCount < actualReqCount || pendingCount-actualReqCount > 1 {
		log.Info("region worker pending request count is not equal to actual region request count, correct it",
			zap.Int("pendingCount", int(pendingCount)),
			zap.Int64("actualReqCount", actualReqCount))
		c.pendingCount.Store(actualReqCount)
⚠️ Potential issue | 🟡 Minor

Normalization logic is reasonable but the correction itself is racy.

clearStaleRequest reads pendingCount (line 271) and len(c.pendingQueue) (line 270) without holding any synchronization that covers both the atomic counter and the channel length together. A concurrent markDropped/add call between these two reads can cause an incorrect correction. Since sentRequests.Lock is held, only the queue side is unprotected.

Additionally, the tolerance comment says "up to 1" for an in-flight request, but if pendingCount < actualReqCount (line 274, first branch) the correction blindly stores actualReqCount, which may be stale by then. Given this runs on a 10-second timer and is a safety net rather than primary accounting, the impact is low, but consider logging a warning rather than auto-correcting when the mismatch is small.
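
The tolerance rule in the quoted snippet can be expressed as a pure function, which makes the boundary cases easy to test apart from any locking concerns. This is a sketch only; `classify` and the `mismatch` constants are hypothetical names, and the three-way split is one possible way to separate "log a warning" from "correct the counter".

```go
package main

import "fmt"

// mismatch describes how pendingCount relates to the actual request count.
type mismatch int

const (
	consistent      mismatch = iota // counts are equal
	withinTolerance                 // pending exceeds actual by exactly one in-flight request
	needsCorrection                 // pending is lower than actual, or higher by more than one
)

// classify mirrors the condition in the snippet:
// correction happens when pendingCount < actualReqCount or
// pendingCount-actualReqCount > 1.
func classify(pendingCount, actualReqCount int64) mismatch {
	switch diff := pendingCount - actualReqCount; {
	case diff == 0:
		return consistent
	case diff == 1:
		return withinTolerance
	default:
		return needsCorrection
	}
}

func main() {
	cases := [][2]int64{{5, 5}, {6, 5}, {7, 5}, {4, 5}}
	for _, c := range cases {
		fmt.Printf("pending=%d actual=%d -> %d\n", c[0], c[1], classify(c[0], c[1]))
	}
}
```

Note that the tolerance is one-sided: a pending count below the actual count is always treated as needing correction.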
