[WIP]#4194

Open
lidezhu wants to merge 1 commit into master from ldz/fix-log-puller0211

lidezhu commented Feb 11, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

Release Notes

  • Improvements

    • Enhanced retry logic for request enqueuing with configurable retry windows and improved failure handling.
    • Improved handling of table stop operations to wait for available space rather than failing immediately.
    • Better flow control for pending requests with new tracking mechanisms.
  • Tests

    • Added comprehensive tests for table stop task behavior and request cache operations.

ti-chi-bot bot added the do-not-merge/needs-linked-issue and release-note labels Feb 11, 2026

ti-chi-bot bot commented Feb 11, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign lidezhu for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

ti-chi-bot bot added the size/L label Feb 11, 2026
lidezhu changed the title from "fix log puller" to "[WIP]" Feb 11, 2026
ti-chi-bot bot added the do-not-merge/work-in-progress label Feb 11, 2026

coderabbitai bot commented Feb 11, 2026

Walkthrough

The changes introduce stop-table task handling with a short retry window in the request cache's add operation, a new finishPoppedNotSent method for flow control notifications, and systematic support for identifying and processing deregistration tasks throughout the region request worker workflow.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Retry Window and Flow Control (logservice/logpuller/region_req_cache.go) | Introduces retryRemaining counter driven by addReqRetryInterval and mustSucceed guard based on force and region.isStopTableTask(). Adds finishPoppedNotSent() method to decrement pending count and notify waiters when popped requests won't be tracked. |
| Stop-Table Task Identification (logservice/logpuller/region_state.go) | Adds isStopTableTask() helper method that identifies deregistration tasks as regionInfo with nil lockedRangeState and verID having ID 0. |
| Subscription Configuration (logservice/logpuller/subscription_client.go) | Introduces filterLoop field to subscribedSpan struct, initialized from bdrMode parameter, and propagates it through regionInfo when pushing stop-table tasks. |
| Worker Flow Integration (logservice/logpuller/region_request_worker.go) | Integrates stop-table task handling across waitForPreFetching, newRegionRequestWorker, and processRegionSendTask by checking isStopTableTask() and invoking finishPoppedNotSent() at appropriate points to skip grpc stream establishment and manage flow control. |
| Test Coverage (logservice/logpuller/region_req_cache_test.go) | Adds TestRequestCacheAdd_StopTableTaskMustSucceed to validate stop-table tasks' blocking behavior and proper enqueuing; adds TestRequestCacheFinishPoppedNotSent to verify pending count reset after pop and markSent sequence. |
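
The identification rule described for region_state.go can be sketched roughly as follows. This is only an illustration: regionVerID and lockedRangeState here are simplified, hypothetical stand-ins for the real logpuller types, and the method body is inferred from the summary above rather than copied from the PR.

```go
package main

import "fmt"

// regionVerID and lockedRangeState are simplified stand-ins (assumptions),
// modeling only what the check needs.
type regionVerID struct{ id uint64 }

type lockedRangeState struct{}

type regionInfo struct {
	verID            regionVerID
	lockedRangeState *lockedRangeState
}

// isStopTableTask follows the rule stated in the summary: a deregistration
// task has no locked range state and a verID whose ID is 0.
func (r regionInfo) isStopTableTask() bool {
	return r.lockedRangeState == nil && r.verID.id == 0
}

func main() {
	stopTask := regionInfo{}
	stoppedRegion := regionInfo{verID: regionVerID{id: 42}}
	active := regionInfo{verID: regionVerID{id: 42}, lockedRangeState: &lockedRangeState{}}
	fmt.Println(stopTask.isStopTableTask(), stoppedRegion.isStopTableTask(), active.isStopTableTask())
}
```

Under this model a region that merely lost its locked range state (non-zero ID) is not treated as a stop-table task, which is the distinction the summary draws.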

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 A cache that remembers when to retry,
When tables must stop, we won't pass them by,
With finishPoppedNotSent to set spirits free,
Flow control dancing like carrots and tea! 🥕✨

🚥 Pre-merge checks | ❌ 3
❌ Failed checks (2 warnings, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The PR description contains only the template structure with unfilled placeholders; the issue number, problem statement, implementation details, test results, and release notes are all missing or incomplete. | Complete the description by filling in the issue number, explaining the problem being solved, detailing the implementation, confirming which tests were added/run, and providing a meaningful release note. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The pull request title '[WIP]' is vague and generic, providing no meaningful information about the actual changes made to the codebase. | Replace '[WIP]' with a descriptive title that summarizes the main change, such as 'Add stop-table task retry logic and flow control to region request cache' or similar. |

@gemini-code-assist

Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refines the log puller's request handling mechanism, specifically improving the reliability and correctness of "stop-table tasks." It ensures that these critical deregistration requests are always processed by the system, even under contention, and that the internal state of the request cache accurately reflects the lifecycle of these special tasks. Additionally, it introduces a new flag to support BDR mode filtering.

Highlights

  • Enhanced Stop-Table Task Handling: Introduced a distinct isStopTableTask method to specifically identify special deregistration requests, differentiating them from general stopped regions.
  • Guaranteed Request Cache Addition for Stop-Table Tasks: Modified the request cache's add method to ensure that 'stop-table tasks' are eventually added, even under high load, by retrying indefinitely until space is available or the context is cancelled.
  • Accurate Pending Count Management: Implemented a new finishPoppedNotSent method in the request cache to correctly decrement pending counts for requests that are popped but do not result in a gRPC stream being established (e.g., stop-table tasks).
  • Refined Region Request Worker Logic: Updated the region request worker to leverage isStopTableTask for specific processing and to call finishPoppedNotSent for tasks that do not require a gRPC stream, ensuring accurate cache state.
  • BDR Mode Filtering Support: Added a filterLoop flag to the subscribedSpan struct and ensured its propagation to stop-table tasks, likely to support filtering in BDR (Bidirectional Replication) mode.

Changelog
  • logservice/logpuller/region_req_cache.go
    • Changed add function's retry logic to ensure "stop-table tasks" always succeed.
    • Added finishPoppedNotSent method to manage pending request count for unsent requests.
  • logservice/logpuller/region_req_cache_test.go
    • Added unit tests for the new "stop-table task" retry behavior and finishPoppedNotSent method.
  • logservice/logpuller/region_request_worker.go
    • Updated request processing to differentiate and correctly handle "stop-table tasks" and other stopped regions.
    • Integrated finishPoppedNotSent calls to maintain accurate request cache state.
  • logservice/logpuller/region_state.go
    • Introduced isStopTableTask method to specifically identify stop-table requests.
  • logservice/logpuller/subscription_client.go
    • Added filterLoop field to subscribedSpan and propagated it to stop-table tasks.
Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a more explicit mechanism for handling "stop-table tasks" and corrects the pending request count by adding a finishPoppedNotSent method. These changes improve the robustness of table stopping logic. The modifications are logical and are accompanied by corresponding tests. My review includes suggestions to simplify some of the new logic by removing redundancy and duplication, which will enhance code clarity and maintainability.

Comment on lines +91 to 101
    if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
        // Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
        // They are either handled on an existing stream, or can be safely dropped when no stream exists.
        worker.requestCache.finishPoppedNotSent()
        continue
    }
    if !region.regionInfo.isStopped() {
        worker.preFetchForConnecting = new(regionInfo)
        *worker.preFetchForConnecting = region.regionInfo
        return nil
    } else {
        continue
    }

medium

The logic here can be simplified. The condition region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() is equivalent to region.regionInfo.isStopped(), because isStopTableTask() implies isStopped(). Also, the subsequent if !region.regionInfo.isStopped() check becomes redundant. The logic can be rewritten as a simple if/else structure for better clarity.

    if region.regionInfo.isStopped() {
        // Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
        // They are either handled on an existing stream, or can be safely dropped when no stream exists.
        worker.requestCache.finishPoppedNotSent()
        continue
    }
    worker.preFetchForConnecting = new(regionInfo)
    *worker.preFetchForConnecting = region.regionInfo
    return nil


ti-chi-bot bot commented Feb 11, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@logservice/logpuller/region_request_worker.go`:
- Around line 91-102: The isStopped() check is redundant because the earlier
guard already filtered out stopped regions; remove the outer if
(!region.regionInfo.isStopped()) and directly set worker.preFetchForConnecting =
new(regionInfo), copy region.regionInfo into *worker.preFetchForConnecting, and
return nil; keep the existing branch that calls
worker.requestCache.finishPoppedNotSent() for isStopTableTask/isStopped and
leave the rest of the loop unchanged so there is no unreachable fall-through.
🧹 Nitpick comments (3)
logservice/logpuller/subscription_client.go (1)

362-363: Consider initializing filterLoop inside newSubscribedSpan instead of post-hoc assignment.

filterLoop is set on the returned *subscribedSpan right after construction, but newSubscribedSpan already accepts all other configuration. Adding a bdrMode bool parameter to newSubscribedSpan (and setting the field there) would keep the struct initialization self-contained and prevent the field from being accidentally forgotten when creating a subscribedSpan elsewhere.
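
A minimal sketch of what this nitpick proposes, assuming a much smaller struct than the real one. The subID field and the two-argument signature are hypothetical; the actual newSubscribedSpan takes other parameters that are not shown on this page.

```go
package main

import "fmt"

// subscribedSpan is reduced to the fields needed for the illustration.
type subscribedSpan struct {
	subID      uint64
	filterLoop bool
}

// newSubscribedSpan sets filterLoop at construction time, so no caller can
// forget the follow-up assignment. The signature is an assumption.
func newSubscribedSpan(subID uint64, bdrMode bool) *subscribedSpan {
	return &subscribedSpan{
		subID:      subID,
		filterLoop: bdrMode,
	}
}

func main() {
	span := newSubscribedSpan(1, true)
	fmt.Println(span.filterLoop)
}
```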

logservice/logpuller/region_req_cache.go (2)

128-137: The mustSucceed guard on lines 130-132 is unreachable.

When mustSucceed is true, force is also true (since mustSucceed = force && region.isStopTableTask()). The outer condition at Line 102 (current < c.maxPendingCount || force) always evaluates to true when force is true, so execution never reaches this second select block for stop-table tasks. The guard is harmless but dead code.


323-331: Pre-existing: decPendingCount has a TOCTOU race between Load() and Dec().

Two concurrent callers can both observe current=1, both pass the newCount < 0 check, and both call Dec(), resulting in pendingCount = -1. The periodic correction in clearStaleRequest (Lines 276-278) mitigates this, but a CompareAndSwap loop would be more robust. Not blocking since this is pre-existing code.

♻️ Suggested atomic-safe decrement
 func (c *requestCache) decPendingCount() {
-	// Ensure pendingCount doesn't go below 0
-	current := c.pendingCount.Load()
-	newCount := current - int64(1)
-	if newCount < 0 {
-		c.pendingCount.Store(0)
-		return
-	}
-	c.pendingCount.Dec()
+	for {
+		current := c.pendingCount.Load()
+		if current <= 0 {
+			return
+		}
+		if c.pendingCount.CompareAndSwap(current, current-1) {
+			return
+		}
+	}
 }

Comment on lines +91 to 102
    if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
        // Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
        // They are either handled on an existing stream, or can be safely dropped when no stream exists.
        worker.requestCache.finishPoppedNotSent()
        continue
    }
    if !region.regionInfo.isStopped() {
        worker.preFetchForConnecting = new(regionInfo)
        *worker.preFetchForConnecting = region.regionInfo
        return nil
    } else {
        continue
    }
}

🛠️ Refactor suggestion | 🟠 Major

Dead branch: the if !region.regionInfo.isStopped() at Line 97 is now always true.

After the guard at Line 91 filters out both isStopTableTask() and isStopped() (which is a superset), any execution reaching Line 97 is guaranteed to have isStopped() == false. The conditional is redundant and the implicit else (fall-through past Line 101) is unreachable.

Consider simplifying to remove the stale guard:

♻️ Remove redundant isStopped check
 		for {
 			region, err := worker.requestCache.pop(ctx)
 			if err != nil {
 				return err
 			}
 			if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
 				// Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
 				// They are either handled on an existing stream, or can be safely dropped when no stream exists.
 				worker.requestCache.finishPoppedNotSent()
 				continue
 			}
-			if !region.regionInfo.isStopped() {
-				worker.preFetchForConnecting = new(regionInfo)
-				*worker.preFetchForConnecting = region.regionInfo
-				return nil
-			}
+			worker.preFetchForConnecting = new(regionInfo)
+			*worker.preFetchForConnecting = region.regionInfo
+			return nil
 		}

Labels

  • do-not-merge/needs-linked-issue
  • do-not-merge/work-in-progress: Indicates that a PR should not merge because it is a work in progress.
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/L: Denotes a PR that changes 100-499 lines, ignoring generated files.

1 participant