Skip to content
Open

[WIP] #4194

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions logservice/logpuller/region_req_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ func newRequestCache(maxPendingCount int) *requestCache {
}

// add adds a new region request to the cache
// It blocks if pendingCount >= maxPendingCount until there's space or ctx is cancelled
// It returns (false, nil) when it fails to add within a short retry window so that the caller
// can reschedule the request (used by non-stop region requests).
func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (bool, error) {
start := time.Now()
ticker := time.NewTicker(addReqRetryInterval)
defer ticker.Stop()
addReqRetryLimit := addReqRetryLimit
retryRemaining := addReqRetryLimit
mustSucceed := force && region.isStopTableTask()

for {
current := c.pendingCount.Load()
Expand All @@ -111,8 +113,11 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
case <-c.spaceAvailable:
continue
case <-ticker.C:
addReqRetryLimit--
if addReqRetryLimit <= 0 {
if mustSucceed {
continue
}
retryRemaining--
if retryRemaining <= 0 {
return false, nil
}
continue
Expand All @@ -122,8 +127,11 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
// Wait for space to become available
select {
case <-ticker.C:
addReqRetryLimit--
if addReqRetryLimit <= 0 {
if mustSucceed {
continue
}
retryRemaining--
if retryRemaining <= 0 {
return false, nil
}
continue
Expand All @@ -135,6 +143,16 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
}
}

// finishPoppedNotSent marks a popped request as finished when it will never be
// tracked by sentRequests (callers invoke it when a popped request is dropped
// or its send fails — see region_request_worker.go).
// It releases the request's pending slot and wakes at most one waiter blocked
// in add waiting for space.
func (c *requestCache) finishPoppedNotSent() {
	// Decrement first so a waiter woken by the signal below observes the
	// already-updated pending count.
	c.decPendingCount()
	// Non-blocking notify: if no goroutine is waiting (or the signal slot
	// is already occupied), drop the signal rather than block.
	select {
	case c.spaceAvailable <- struct{}{}:
	default:
	}
}

// pop gets the next pending request, returns nil if queue is empty
func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
select {
Expand Down
65 changes: 65 additions & 0 deletions logservice/logpuller/region_req_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,71 @@ func TestRequestCacheAdd_ContextCancellation(t *testing.T) {
require.Equal(t, context.Canceled, err)
}

// TestRequestCacheAdd_StopTableTaskMustSucceed verifies that a forced
// stop-table task never fails fast when the cache is full: add must keep
// retrying until space appears, instead of returning (false, nil) after the
// normal retry limit.
func TestRequestCacheAdd_StopTableTaskMustSucceed(t *testing.T) {
	cache := newRequestCache(1)
	ctx := context.Background()

	// Fill the cache to make the channel full.
	region1 := createTestRegionInfo(1, 1)
	ok, err := cache.add(ctx, region1, false)
	require.True(t, ok)
	require.NoError(t, err)

	// A regionInfo with only subscribedSpan set (no lockedRangeState, zero
	// verID) is treated as a stop-table task by isStopTableTask.
	stopTask := regionInfo{subscribedSpan: region1.subscribedSpan}

	// Safety net: bound the forced add so a regression cannot hang the test.
	ctx2, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	// addOK/addErr are written only by the goroutine; the close(done) /
	// <-done pair provides the happens-before edge for reading them below.
	done := make(chan struct{})
	var addOK bool
	var addErr error
	go func() {
		addOK, addErr = cache.add(ctx2, stopTask, true)
		close(done)
	}()

	// The stop-table task should block (not fail fast) while the channel is full.
	select {
	case <-done:
		t.Fatal("stop-table task should not fail fast when the channel is full")
	case <-time.After(10 * time.Millisecond):
	}

	// Make space and ensure it can be added.
	// NOTE(review): this assumes pop frees the capacity add is waiting on
	// (rather than finishPoppedNotSent) — confirm against requestCache internals.
	_, err = cache.pop(ctx)
	require.NoError(t, err)

	select {
	case <-done:
	case <-time.After(200 * time.Millisecond):
		t.Fatal("stop-table task should be added after space becomes available")
	}
	require.True(t, addOK)
	require.NoError(t, addErr)

	// The queued request must still be recognized as the stop-table task.
	req, err := cache.pop(ctx)
	require.NoError(t, err)
	require.True(t, req.regionInfo.isStopTableTask())
}

// TestRequestCacheFinishPoppedNotSent checks that a popped request which is
// never handed to sentRequests still releases its pending slot when
// finishPoppedNotSent is called.
func TestRequestCacheFinishPoppedNotSent(t *testing.T) {
	ctx := context.Background()
	cache := newRequestCache(10)

	// Enqueue one request; the pending count reflects it immediately.
	info := createTestRegionInfo(1, 1)
	added, err := cache.add(ctx, info, false)
	require.NoError(t, err)
	require.True(t, added)
	require.Equal(t, 1, cache.getPendingCount())

	// Popping alone must NOT decrement the pending count.
	_, err = cache.pop(ctx)
	require.NoError(t, err)
	require.Equal(t, 1, cache.getPendingCount())

	// Explicitly finishing the popped-but-unsent request frees the slot.
	cache.finishPoppedNotSent()
	require.Equal(t, 0, cache.getPendingCount())
}

func TestRequestCacheAdd_RetryLimitExceeded(t *testing.T) {
cache := newRequestCache(1)
ctx := context.Background()
Expand Down
17 changes: 13 additions & 4 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,16 @@ func newRegionRequestWorker(
if err != nil {
return err
}
if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
// Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
// They are either handled on an existing stream, or can be safely dropped when no stream exists.
worker.requestCache.finishPoppedNotSent()
continue
}
if !region.regionInfo.isStopped() {
worker.preFetchForConnecting = new(regionInfo)
*worker.preFetchForConnecting = region.regionInfo
return nil
} else {
continue
}
Comment on lines +91 to 101

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic here can be simplified. The condition region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() is equivalent to region.regionInfo.isStopped(), because isStopTableTask() implies isStopped(). Also, the subsequent if !region.regionInfo.isStopped() check becomes redundant. The logic can be rewritten as a simple if/else structure for better clarity.

            if region.regionInfo.isStopped() {
				// Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
				// They are either handled on an existing stream, or can be safely dropped when no stream exists.
				worker.requestCache.finishPoppedNotSent()
				continue
			}
			worker.preFetchForConnecting = new(regionInfo)
			*worker.preFetchForConnecting = region.regionInfo
			return nil

}
Comment on lines +91 to 102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Dead branch: the if !region.regionInfo.isStopped() at Line 97 is now always true.

After the guard at Line 91 filters out both isStopTableTask() and isStopped() (which is a superset), any execution reaching Line 97 is guaranteed to have isStopped() == false. The conditional is redundant and the implicit else (fall-through past Line 101) is unreachable.

Consider simplifying to remove the stale guard:

♻️ Remove redundant isStopped check
 		for {
 			region, err := worker.requestCache.pop(ctx)
 			if err != nil {
 				return err
 			}
 			if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
 				// Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
 				// They are either handled on an existing stream, or can be safely dropped when no stream exists.
 				worker.requestCache.finishPoppedNotSent()
 				continue
 			}
-			if !region.regionInfo.isStopped() {
-				worker.preFetchForConnecting = new(regionInfo)
-				*worker.preFetchForConnecting = region.regionInfo
-				return nil
-			}
+			worker.preFetchForConnecting = new(regionInfo)
+			*worker.preFetchForConnecting = region.regionInfo
+			return nil
 		}
🤖 Prompt for AI Agents
In `@logservice/logpuller/region_request_worker.go` around lines 91 - 102, The
isStopped() check is redundant because the earlier guard already filtered out
stopped regions; remove the outer if (!region.regionInfo.isStopped()) and
directly set worker.preFetchForConnecting = new(regionInfo), copy
region.regionInfo into *worker.preFetchForConnecting, and return nil; keep the
existing branch that calls worker.requestCache.finishPoppedNotSent() for
isStopTableTask/isStopped and leave the rest of the loop unchanged so there is
no unreachable fall-through.

}
Expand Down Expand Up @@ -134,7 +138,7 @@ func newRegionRequestWorker(
}
// The store may fail forever, so we need try to re-schedule all pending regions.
for _, region := range worker.clearPendingRegions() {
if region.isStopped() {
if region.isStopTableTask() {
// It means it's a special task for stopping the table.
continue
}
Expand Down Expand Up @@ -365,7 +369,7 @@ func (s *regionRequestWorker) processRegionSendTask(
zap.Bool("bdrMode", region.filterLoop))

// It means it's a special task for stopping the table.
if region.isStopped() {
if region.isStopTableTask() {
req := &cdcpb.ChangeDataRequest{
Header: &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()},
RequestId: uint64(subID),
Expand All @@ -375,6 +379,7 @@ func (s *regionRequestWorker) processRegionSendTask(
FilterLoop: region.filterLoop,
}
if err := doSend(req); err != nil {
s.requestCache.finishPoppedNotSent()
return err
}
for _, state := range s.takeRegionStates(subID) {
Expand All @@ -384,17 +389,20 @@ func (s *regionRequestWorker) processRegionSendTask(
}
s.client.pushRegionEventToDS(subID, regionEvent)
}
s.requestCache.finishPoppedNotSent()

} else if region.subscribedSpan.stopped.Load() {
// It can be skipped directly because there must be no pending states from
// the stopped subscribedTable, or the special singleRegionInfo for stopping
// the table will be handled later.
s.requestCache.finishPoppedNotSent()
s.client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
} else {
state := newRegionFeedState(region, uint64(subID), s)
state.start()
s.addRegionState(subID, region.verID.GetID(), state)
if err := doSend(s.createRegionRequest(region)); err != nil {
s.requestCache.finishPoppedNotSent()
return err
}
s.requestCache.markSent(regionReq)
Expand Down Expand Up @@ -484,6 +492,7 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
if s.preFetchForConnecting != nil {
region := *s.preFetchForConnecting
s.preFetchForConnecting = nil
s.requestCache.finishPoppedNotSent()
regions = append(regions, region)
}

Expand Down
6 changes: 6 additions & 0 deletions logservice/logpuller/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func (s *regionInfo) isStopped() bool {
return s.lockedRangeState == nil
}

// isStopTableTask reports whether s is the sentinel regionInfo used to
// deregister a subscription from all TiKV stores. Such a task carries no
// locked range state and no real region verID (its region ID is zero).
func (s *regionInfo) isStopTableTask() bool {
	if s.lockedRangeState != nil {
		// A real, tracked region always has a locked range state.
		return false
	}
	return s.verID.GetID() == 0
}

func newRegionInfo(
verID tikv.RegionVerID,
span heartbeatpb.TableSpan,
Expand Down
6 changes: 5 additions & 1 deletion logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type subscribedSpan struct {

// The target span
span heartbeatpb.TableSpan
// Whether to filter out the value written by cdc itself.
// It should be `true` in BDR mode.
filterLoop bool
// The range lock of the span,
// it is used to prevent duplicate requests to the same region range,
// and it also used to calculate this table's resolvedTs.
Expand Down Expand Up @@ -357,6 +360,7 @@ func (s *subscriptionClient) Subscribe(
}

rt := s.newSubscribedSpan(subID, span, startTs, consumeKVEvents, advanceResolvedTs, advanceInterval)
rt.filterLoop = bdrMode
s.totalSpans.Lock()
s.totalSpans.spanMap[subID] = rt
s.totalSpans.Unlock()
Expand Down Expand Up @@ -473,7 +477,7 @@ func (s *subscriptionClient) setTableStopped(rt *subscribedSpan) {
// Then send a special singleRegionInfo to regionRouter to deregister the table
// from all TiKV instances.
if rt.stopped.CompareAndSwap(false, true) {
s.regionTaskQueue.Push(NewRegionPriorityTask(TaskHighPrior, regionInfo{subscribedSpan: rt}, s.pdClock.CurrentTS()))
s.regionTaskQueue.Push(NewRegionPriorityTask(TaskHighPrior, regionInfo{subscribedSpan: rt, filterLoop: rt.filterLoop}, s.pdClock.CurrentTS()))
if rt.rangeLock.Stop() {
s.onTableDrained(rt)
}
Expand Down