-
Notifications
You must be signed in to change notification settings - Fork 204
implement server agent draining #795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,25 @@ type Backend struct { | |
| // cached from conn.Context() | ||
| id string | ||
| idents header.Identifiers | ||
|
|
||
| // draining indicates if this backend is draining and should not accept new connections | ||
| draining bool | ||
| // mu protects draining field | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Consider using |
||
| mu sync.RWMutex | ||
| } | ||
|
|
||
| // IsDraining returns true if the backend is draining | ||
| func (b *Backend) IsDraining() bool { | ||
| b.mu.RLock() | ||
| defer b.mu.RUnlock() | ||
| return b.draining | ||
| } | ||
|
|
||
| // SetDraining marks the backend as draining | ||
| func (b *Backend) SetDraining() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Verify lock ordering: |
||
| b.mu.Lock() | ||
| defer b.mu.Unlock() | ||
| b.draining = true | ||
| } | ||
|
|
||
| func (b *Backend) Send(p *client.Packet) error { | ||
|
|
@@ -346,9 +365,36 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) { | |
| if len(s.backends) == 0 { | ||
| return nil, &ErrNotFound{} | ||
| } | ||
| agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))] | ||
| klog.V(3).InfoS("Pick agent as backend", "agentID", agentID) | ||
| // always return the first connection to an agent, because the agent | ||
| // will close later connections if there are multiple. | ||
| return s.backends[agentID][0], nil | ||
|
|
||
| var firstDrainingBackend *Backend | ||
|
|
||
| // Start at a random agent and check each agent in sequence | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The complexity here changes from O(1) to O(N). While likely acceptable given the typical number of agents, please ensure this doesn't introduce latency spikes in large clusters. |
||
| startIdx := s.random.Intn(len(s.agentIDs)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This loop changes the complexity of |
||
| for i := 0; i < len(s.agentIDs); i++ { | ||
| // Wrap around using modulo | ||
| currentIdx := (startIdx + i) % len(s.agentIDs) | ||
| agentID := s.agentIDs[currentIdx] | ||
| // always return the first connection to an agent, because the agent | ||
| // will close later connections if there are multiple. | ||
| backend := s.backends[agentID][0] | ||
|
|
||
| if !backend.IsDraining() { | ||
| klog.V(3).InfoS("Pick agent as backend", "agentID", agentID) | ||
| return backend, nil | ||
| } | ||
|
|
||
| // Keep track of first draining backend as fallback | ||
| if firstDrainingBackend == nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The fallback logic is critical here. It would be great to see a unit test in
|
||
| firstDrainingBackend = backend | ||
| } | ||
| } | ||
|
|
||
| // All agents are draining, use one as fallback | ||
| if firstDrainingBackend != nil { | ||
| agentID := firstDrainingBackend.id | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Inconsistency in log levels: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md suggests that V2 is for significant changes in state. If we detected at this point that we had run out of non-draining backends, I would agree with V2. As it is, I would go more for V3. |
||
| klog.V(2).InfoS("No non-draining backends available, using draining backend as fallback", "agentID", agentID) | ||
| return firstDrainingBackend, nil | ||
| } | ||
|
|
||
| return nil, &ErrNotFound{} | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,8 +79,25 @@ func (dibm *DestHostBackendManager) Backend(ctx context.Context) (*Backend, erro | |
| if destHost != "" { | ||
| bes, exist := dibm.backends[destHost] | ||
| if exist && len(bes) > 0 { | ||
| klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost) | ||
| return dibm.backends[destHost][0], nil | ||
| var firstDrainingBackend *Backend | ||
|
|
||
| // Find a non-draining backend for this destination host | ||
| for _, backend := range bes { | ||
| if !backend.IsDraining() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same as |
||
| klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost) | ||
| return backend, nil | ||
| } | ||
| // Keep track of first draining backend as fallback | ||
| if firstDrainingBackend == nil { | ||
| firstDrainingBackend = backend | ||
| } | ||
| } | ||
|
|
||
| // All backends for this destination are draining, use one as fallback | ||
| if firstDrainingBackend != nil { | ||
| klog.V(4).InfoS("All backends for destination host are draining, using one as fallback", "destHost", destHost) | ||
| return firstDrainingBackend, nil | ||
| } | ||
| } | ||
| } | ||
| return nil, &ErrNotFound{} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure that `cs.drainCh` is properly initialized in `NewClientSet` (or wherever `ClientSet` is created). If it is `nil`, this case will block (if it were a lone case) or be skipped (in a `select` with `default`), effectively disabling the check. I believe it is set up by `SetupSignalHandler`, but given the distance between them it would be good to be sure.