From 507c7aa5f3e37bf64e776e6be9f2299b2243c1e8 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Wed, 11 Feb 2026 21:39:52 +0800
Subject: [PATCH] logpuller: fix pending request accounting and never drop
 stop-table tasks

A stop-table task must eventually reach TiKV to deregister its
subscription, so requestCache.add now retries indefinitely for such tasks
instead of giving up after addReqRetryLimit attempts. Popped requests that
will never be tracked by sentRequests now release their pendingCount slot
through the new finishPoppedNotSent helper, and the stop-table task now
carries the subscription's filterLoop flag.

---
 logservice/logpuller/region_req_cache.go      | 30 +++++++--
 logservice/logpuller/region_req_cache_test.go | 65 +++++++++++++++++++
 logservice/logpuller/region_request_worker.go | 17 +++--
 logservice/logpuller/region_state.go          |  6 ++
 logservice/logpuller/subscription_client.go   |  6 +-
 5 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go
index 2a01d78026..7d66f969d3 100644
--- a/logservice/logpuller/region_req_cache.go
+++ b/logservice/logpuller/region_req_cache.go
@@ -88,12 +88,14 @@ func newRequestCache(maxPendingCount int) *requestCache {
 }
 
 // add adds a new region request to the cache
-// It blocks if pendingCount >= maxPendingCount until there's space or ctx is cancelled
+// It returns (false, nil) when it fails to add within a short retry window so that the caller
+// can reschedule the request (used by non-stop region requests).
 func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (bool, error) {
 	start := time.Now()
 	ticker := time.NewTicker(addReqRetryInterval)
 	defer ticker.Stop()
-	addReqRetryLimit := addReqRetryLimit
+	retryRemaining := addReqRetryLimit
+	mustSucceed := force && region.isStopTableTask()
 
 	for {
 		current := c.pendingCount.Load()
@@ -111,8 +113,11 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
 		case <-c.spaceAvailable:
 			continue
 		case <-ticker.C:
-			addReqRetryLimit--
-			if addReqRetryLimit <= 0 {
+			if mustSucceed {
+				continue
+			}
+			retryRemaining--
+			if retryRemaining <= 0 {
 				return false, nil
 			}
 			continue
@@ -122,8 +127,11 @@
 		// Wait for space to become available
 		select {
 		case <-ticker.C:
-			addReqRetryLimit--
-			if addReqRetryLimit <= 0 {
+			if mustSucceed {
+				continue
+			}
+			retryRemaining--
+			if retryRemaining <= 0 {
 				return false, nil
 			}
 			continue
@@ -135,6 +143,16 @@
 	}
 }
 
+// finishPoppedNotSent marks a popped request as finished when it won't be tracked by sentRequests.
+// It decrements pendingCount and notifies waiters.
+func (c *requestCache) finishPoppedNotSent() {
+	c.decPendingCount()
+	select {
+	case c.spaceAvailable <- struct{}{}:
+	default:
+	}
+}
+
 // pop gets the next pending request, returns nil if queue is empty
 func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
 	select {
diff --git a/logservice/logpuller/region_req_cache_test.go b/logservice/logpuller/region_req_cache_test.go
index 76e725fa8e..4a468c8a84 100644
--- a/logservice/logpuller/region_req_cache_test.go
+++ b/logservice/logpuller/region_req_cache_test.go
@@ -125,6 +125,71 @@ func TestRequestCacheAdd_ContextCancellation(t *testing.T) {
 	require.Equal(t, context.Canceled, err)
 }
 
+func TestRequestCacheAdd_StopTableTaskMustSucceed(t *testing.T) {
+	cache := newRequestCache(1)
+	ctx := context.Background()
+
+	// Fill the cache to make the channel full.
+	region1 := createTestRegionInfo(1, 1)
+	ok, err := cache.add(ctx, region1, false)
+	require.True(t, ok)
+	require.NoError(t, err)
+
+	stopTask := regionInfo{subscribedSpan: region1.subscribedSpan}
+
+	ctx2, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+
+	done := make(chan struct{})
+	var addOK bool
+	var addErr error
+	go func() {
+		addOK, addErr = cache.add(ctx2, stopTask, true)
+		close(done)
+	}()
+
+	// The stop-table task should block (not fail fast) while the channel is full.
+	select {
+	case <-done:
+		t.Fatal("stop-table task should not fail fast when the channel is full")
+	case <-time.After(10 * time.Millisecond):
+	}
+
+	// Make space and ensure it can be added.
+	_, err = cache.pop(ctx)
+	require.NoError(t, err)
+
+	select {
+	case <-done:
+	case <-time.After(200 * time.Millisecond):
+		t.Fatal("stop-table task should be added after space becomes available")
+	}
+	require.True(t, addOK)
+	require.NoError(t, addErr)
+
+	req, err := cache.pop(ctx)
+	require.NoError(t, err)
+	require.True(t, req.regionInfo.isStopTableTask())
+}
+
+func TestRequestCacheFinishPoppedNotSent(t *testing.T) {
+	cache := newRequestCache(10)
+	ctx := context.Background()
+
+	region := createTestRegionInfo(1, 1)
+	ok, err := cache.add(ctx, region, false)
+	require.True(t, ok)
+	require.NoError(t, err)
+	require.Equal(t, 1, cache.getPendingCount())
+
+	_, err = cache.pop(ctx)
+	require.NoError(t, err)
+	require.Equal(t, 1, cache.getPendingCount())
+
+	cache.finishPoppedNotSent()
+	require.Equal(t, 0, cache.getPendingCount())
+}
+
 func TestRequestCacheAdd_RetryLimitExceeded(t *testing.T) {
 	cache := newRequestCache(1)
 	ctx := context.Background()
diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go
index 35d5037038..062df0dd2c 100644
--- a/logservice/logpuller/region_request_worker.go
+++ b/logservice/logpuller/region_request_worker.go
@@ -88,12 +88,16 @@ func newRegionRequestWorker(
 		if err != nil {
 			return err
 		}
+		if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
+			// Stop-table tasks (and unexpectedly stopped requests) don't need a gRPC stream here.
+			// They are either handled on an existing stream, or can be safely dropped when no stream exists.
+			worker.requestCache.finishPoppedNotSent()
+			continue
+		}
 		if !region.regionInfo.isStopped() {
 			worker.preFetchForConnecting = new(regionInfo)
 			*worker.preFetchForConnecting = region.regionInfo
 			return nil
-		} else {
-			continue
 		}
 	}
 }
@@ -134,7 +138,7 @@
 		}
 		// The store may fail forever, so we need to try to re-schedule all pending regions.
 		for _, region := range worker.clearPendingRegions() {
-			if region.isStopped() {
+			if region.isStopTableTask() {
 				// It means it's a special task for stopping the table.
 				continue
 			}
@@ -365,7 +369,7 @@ func (s *regionRequestWorker) processRegionSendTask(
 			zap.Bool("bdrMode", region.filterLoop))
 
 		// It means it's a special task for stopping the table.
-		if region.isStopped() {
+		if region.isStopTableTask() {
 			req := &cdcpb.ChangeDataRequest{
 				Header:    &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()},
 				RequestId: uint64(subID),
@@ -375,6 +379,7 @@
 				FilterLoop: region.filterLoop,
 			}
 			if err := doSend(req); err != nil {
+				s.requestCache.finishPoppedNotSent()
 				return err
 			}
 			for _, state := range s.takeRegionStates(subID) {
@@ -384,17 +389,20 @@
 				}
 				s.client.pushRegionEventToDS(subID, regionEvent)
 			}
+			s.requestCache.finishPoppedNotSent()
 		} else if region.subscribedSpan.stopped.Load() {
 			// It can be skipped directly because there must be no pending states from
 			// the stopped subscribedTable, or the special singleRegionInfo for stopping
 			// the table will be handled later.
+			s.requestCache.finishPoppedNotSent()
 			s.client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
 		} else {
 			state := newRegionFeedState(region, uint64(subID), s)
 			state.start()
 			s.addRegionState(subID, region.verID.GetID(), state)
 
			if err := doSend(s.createRegionRequest(region)); err != nil {
+				s.requestCache.finishPoppedNotSent()
 				return err
 			}
 			s.requestCache.markSent(regionReq)
@@ -484,6 +492,7 @@
 	if s.preFetchForConnecting != nil {
 		region := *s.preFetchForConnecting
 		s.preFetchForConnecting = nil
+		s.requestCache.finishPoppedNotSent()
 		regions = append(regions, region)
 	}
 
diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go
index e9c21a7aad..8b56a6e8be 100644
--- a/logservice/logpuller/region_state.go
+++ b/logservice/logpuller/region_state.go
@@ -53,6 +53,12 @@ func (s *regionInfo) isStopped() bool {
 	return s.lockedRangeState == nil
 }
 
+func (s *regionInfo) isStopTableTask() bool {
+	// A stop-table task is a special regionInfo used to deregister a subscription
+	// from all TiKV stores. It has no locked range state and carries no region verID.
+	return s.lockedRangeState == nil && s.verID.GetID() == 0
+}
+
 func newRegionInfo(
 	verID tikv.RegionVerID,
 	span heartbeatpb.TableSpan,
diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 1722fb9dfc..d377028130 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -112,6 +112,9 @@ type subscribedSpan struct {
 	// The target span
 	span heartbeatpb.TableSpan
 
+	// Whether to filter out the values written by TiCDC itself.
+	// It should be `true` in BDR mode.
+	filterLoop bool
 	// The range lock of the span,
 	// it is used to prevent duplicate requests to the same region range,
 	// and it is also used to calculate this table's resolvedTs.
@@ -357,6 +360,7 @@ func (s *subscriptionClient) Subscribe(
 	}
 
 	rt := s.newSubscribedSpan(subID, span, startTs, consumeKVEvents, advanceResolvedTs, advanceInterval)
+	rt.filterLoop = bdrMode
 	s.totalSpans.Lock()
 	s.totalSpans.spanMap[subID] = rt
 	s.totalSpans.Unlock()
@@ -473,7 +477,7 @@ func (s *subscriptionClient) setTableStopped(rt *subscribedSpan) {
 	// Then send a special singleRegionInfo to regionRouter to deregister the table
 	// from all TiKV instances.
if rt.stopped.CompareAndSwap(false, true) { - s.regionTaskQueue.Push(NewRegionPriorityTask(TaskHighPrior, regionInfo{subscribedSpan: rt}, s.pdClock.CurrentTS())) + s.regionTaskQueue.Push(NewRegionPriorityTask(TaskHighPrior, regionInfo{subscribedSpan: rt, filterLoop: rt.filterLoop}, s.pdClock.CurrentTS())) if rt.rangeLock.Stop() { s.onTableDrained(rt) }
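
Reviewer note, not part of the patch: the sketch below is a minimal model of
the pendingCount contract that finishPoppedNotSent restores. demoCache and its
methods are simplified stand-ins (assumptions for illustration), not the real
requestCache API. The invariant it shows: every added request holds a slot
until either markSent (the request becomes tracked by sentRequests) or
finishPoppedNotSent (popped but handled inline, like a stop-table task)
releases it; pop alone never releases the slot.

	package main

	import "fmt"

	// demoCache is a toy stand-in for requestCache.
	type demoCache struct {
		pending int      // slots held by added-but-unfinished requests
		queue   []string // pending requests in FIFO order
	}

	// add takes a slot and enqueues the request.
	func (c *demoCache) add(req string) { c.pending++; c.queue = append(c.queue, req) }

	// pop hands out the next request but deliberately keeps its slot held.
	func (c *demoCache) pop() string { req := c.queue[0]; c.queue = c.queue[1:]; return req }

	func (c *demoCache) markSent()            { c.pending-- } // request now tracked elsewhere
	func (c *demoCache) finishPoppedNotSent() { c.pending-- } // popped, never tracked

	func main() {
		c := &demoCache{}
		c.add("stop-table task")
		req := c.pop()
		fmt.Println(req, c.pending) // "stop-table task 1": pop alone would leak the slot
		c.finishPoppedNotSent()     // the fix: release the slot once handled inline
		fmt.Println(c.pending)      // 0
	}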