-
Notifications
You must be signed in to change notification settings - Fork 38
[WIP] #4194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[WIP] #4194
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,12 +88,16 @@ func newRegionRequestWorker( | |
| if err != nil { | ||
| return err | ||
| } | ||
| if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() { | ||
| // Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here. | ||
| // They are either handled on an existing stream, or can be safely dropped when no stream exists. | ||
| worker.requestCache.finishPoppedNotSent() | ||
| continue | ||
| } | ||
| if !region.regionInfo.isStopped() { | ||
| worker.preFetchForConnecting = new(regionInfo) | ||
| *worker.preFetchForConnecting = region.regionInfo | ||
| return nil | ||
| } else { | ||
| continue | ||
| } | ||
| } | ||
|
Comment on lines
+91
to
102
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Dead branch: the After the guard at Line 91 filters out both Consider simplifying to remove the stale guard: ♻️ Remove redundant isStopped check for {
region, err := worker.requestCache.pop(ctx)
if err != nil {
return err
}
if region.regionInfo.isStopTableTask() || region.regionInfo.isStopped() {
// Stop-table tasks (and unexpected stopped requests) don't need a grpc stream here.
// They are either handled on an existing stream, or can be safely dropped when no stream exists.
worker.requestCache.finishPoppedNotSent()
continue
}
- if !region.regionInfo.isStopped() {
- worker.preFetchForConnecting = new(regionInfo)
- *worker.preFetchForConnecting = region.regionInfo
- return nil
- }
+ worker.preFetchForConnecting = new(regionInfo)
+ *worker.preFetchForConnecting = region.regionInfo
+ return nil
}🤖 Prompt for AI Agents |
||
| } | ||
|
|
@@ -134,7 +138,7 @@ func newRegionRequestWorker( | |
| } | ||
| // The store may fail forever, so we need try to re-schedule all pending regions. | ||
| for _, region := range worker.clearPendingRegions() { | ||
| if region.isStopped() { | ||
| if region.isStopTableTask() { | ||
| // It means it's a special task for stopping the table. | ||
| continue | ||
| } | ||
|
|
@@ -365,7 +369,7 @@ func (s *regionRequestWorker) processRegionSendTask( | |
| zap.Bool("bdrMode", region.filterLoop)) | ||
|
|
||
| // It means it's a special task for stopping the table. | ||
| if region.isStopped() { | ||
| if region.isStopTableTask() { | ||
| req := &cdcpb.ChangeDataRequest{ | ||
| Header: &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()}, | ||
| RequestId: uint64(subID), | ||
|
|
@@ -375,6 +379,7 @@ func (s *regionRequestWorker) processRegionSendTask( | |
| FilterLoop: region.filterLoop, | ||
| } | ||
| if err := doSend(req); err != nil { | ||
| s.requestCache.finishPoppedNotSent() | ||
| return err | ||
| } | ||
| for _, state := range s.takeRegionStates(subID) { | ||
|
|
@@ -384,17 +389,20 @@ func (s *regionRequestWorker) processRegionSendTask( | |
| } | ||
| s.client.pushRegionEventToDS(subID, regionEvent) | ||
| } | ||
| s.requestCache.finishPoppedNotSent() | ||
|
|
||
| } else if region.subscribedSpan.stopped.Load() { | ||
| // It can be skipped directly because there must be no pending states from | ||
| // the stopped subscribedTable, or the special singleRegionInfo for stopping | ||
| // the table will be handled later. | ||
| s.requestCache.finishPoppedNotSent() | ||
| s.client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{})) | ||
| } else { | ||
| state := newRegionFeedState(region, uint64(subID), s) | ||
| state.start() | ||
| s.addRegionState(subID, region.verID.GetID(), state) | ||
| if err := doSend(s.createRegionRequest(region)); err != nil { | ||
| s.requestCache.finishPoppedNotSent() | ||
| return err | ||
| } | ||
| s.requestCache.markSent(regionReq) | ||
|
|
@@ -484,6 +492,7 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo { | |
| if s.preFetchForConnecting != nil { | ||
| region := *s.preFetchForConnecting | ||
| s.preFetchForConnecting = nil | ||
| s.requestCache.finishPoppedNotSent() | ||
| regions = append(regions, region) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic here can be simplified. The condition
region.regionInfo.isStopTableTask() || region.regionInfo.isStopped()is equivalent toregion.regionInfo.isStopped(), becauseisStopTableTask()impliesisStopped(). Also, the subsequentif !region.regionInfo.isStopped()check becomes redundant. The logic can be rewritten as a simple if/else structure for better clarity.