8 changes: 8 additions & 0 deletions pkg/agent/clientset.go
@@ -255,6 +255,14 @@ func (cs *ClientSet) sync() {
}

func (cs *ClientSet) connectOnce() error {
// Skip establishing new connections if draining
Reviewer comment (Contributor): Ensure that cs.drainCh is properly initialized in NewClientSet (or wherever ClientSet is created). If it is nil, this case will block forever (if it were a lone case) or always be skipped (in a select with a default), effectively disabling the check. I believe it is set up by SetupSignalHandler, but given the distance between the two it would be good to be sure.

select {
case <-cs.drainCh:
klog.V(2).InfoS("Skipping connectOnce - agent is draining")
return nil
default:
}
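The reviewer's nil-channel concern can be demonstrated in isolation: a receive from a nil channel is never ready, so in a select with a default branch the drain case can never fire. A minimal sketch, where drainCheck is a hypothetical helper mirroring the select above (not part of the PR):

```go
package main

import "fmt"

// drainCheck mirrors the select in connectOnce: it reports whether the
// drain channel has been closed, without blocking.
func drainCheck(drainCh chan struct{}) bool {
	select {
	case <-drainCh:
		return true
	default:
		// A receive on a nil channel is never ready, so a nil drainCh
		// always lands here and the drain check is silently a no-op.
		return false
	}
}

func main() {
	var nilCh chan struct{} // never initialized
	fmt.Println(drainCheck(nilCh)) // false: the drain case can never fire

	closedCh := make(chan struct{})
	close(closedCh)
	fmt.Println(drainCheck(closedCh)) // true: a closed channel is always ready
}
```

This is why the reviewer suggests verifying the initialization path rather than relying on the select alone: a forgotten `make(chan struct{})` fails silently instead of loudly.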

serverCount := cs.determineServerCount()

// If not in syncForever mode, we only connect if we have fewer connections than the server count.
56 changes: 51 additions & 5 deletions pkg/server/backend_manager.go
@@ -49,6 +49,25 @@ type Backend struct {
// cached from conn.Context()
id string
idents header.Identifiers

// draining indicates if this backend is draining and should not accept new connections
draining bool
// mu protects draining field
Reviewer comment (Contributor): Consider using sync/atomic (e.g., atomic.Bool or an int32) for the draining field. This would allow IsDraining() to be lock-free, reducing the overhead inside the O(N) loop in GetRandomBackend, which runs under the global s.mu lock.
mu sync.RWMutex
}

// IsDraining returns true if the backend is draining
func (b *Backend) IsDraining() bool {
b.mu.RLock()
defer b.mu.RUnlock()
return b.draining
}

// SetDraining marks the backend as draining
func (b *Backend) SetDraining() {
Reviewer comment (Contributor): Verify lock ordering: GetRandomBackend holds s.mu (Lock) and calls IsDraining (which takes b.mu RLock), while SetDraining takes b.mu Lock. Ensure no code path attempts to acquire s.mu while holding b.mu, to avoid potential deadlocks.

b.mu.Lock()
defer b.mu.Unlock()
b.draining = true
}
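The atomic.Bool variant the reviewer suggests could look like the sketch below (requires Go 1.19+). The Backend type is trimmed to the draining flag only; the real struct carries more fields:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Backend is a trimmed stand-in for the real type; atomic.Bool replaces
// both the draining field and the mutex that guarded it.
type Backend struct {
	draining atomic.Bool
}

// IsDraining is a single atomic load: lock-free, so the O(N) scan in
// GetRandomBackend adds no per-backend mutex traffic while holding s.mu.
func (b *Backend) IsDraining() bool {
	return b.draining.Load()
}

// SetDraining marks the backend as draining.
func (b *Backend) SetDraining() {
	b.draining.Store(true)
}

func main() {
	b := &Backend{}
	fmt.Println(b.IsDraining()) // false: zero value is not draining
	b.SetDraining()
	fmt.Println(b.IsDraining()) // true
}
```

A side benefit is that it also removes the lock-ordering question raised below: with no b.mu involved in the draining path, there is one less pair of locks to order against s.mu.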

func (b *Backend) Send(p *client.Packet) error {
@@ -346,9 +365,36 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
if len(s.backends) == 0 {
return nil, &ErrNotFound{}
}
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
return s.backends[agentID][0], nil

var firstDrainingBackend *Backend

// Start at a random agent and check each agent in sequence
Reviewer comment (Contributor): The complexity here changes from O(1) to O(N). While likely acceptable given the typical number of agents, please ensure this doesn't introduce latency spikes in large clusters.

startIdx := s.random.Intn(len(s.agentIDs))
Reviewer comment (Contributor): This loop changes the complexity of GetRandomBackend from O(1) to O(N) while holding the DefaultBackendStorage exclusive lock (s.mu). In clusters with thousands of agents, this extends the critical section and could impact throughput. If performance becomes an issue, consider maintaining a separate list/set of non-draining agents.

for i := 0; i < len(s.agentIDs); i++ {
// Wrap around using modulo
currentIdx := (startIdx + i) % len(s.agentIDs)
agentID := s.agentIDs[currentIdx]
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
backend := s.backends[agentID][0]

if !backend.IsDraining() {
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
return backend, nil
}

// Keep track of first draining backend as fallback
if firstDrainingBackend == nil {
Reviewer comment (Contributor): The fallback logic is critical here. It would be great to see a unit test in pkg/server/backend_manager_test.go ensuring that:

  1. Non-draining backends are prioritized.
  2. Draining backends are returned if no others are available (fallback).
firstDrainingBackend = backend
}
}

// All agents are draining, use one as fallback
if firstDrainingBackend != nil {
agentID := firstDrainingBackend.id
Reviewer comment (Contributor): Inconsistency in log levels: GetRandomBackend logs the fallback event at V(2), whereas DestHostBackendManager logs a similar fallback event at V(4). https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md suggests that V2 is for significant changes in state. If we had just detected that we ran out of non-draining backends I would agree with V2; as it is, I would go with V3.
klog.V(2).InfoS("No non-draining backends available, using draining backend as fallback", "agentID", agentID)
return firstDrainingBackend, nil
}

return nil, &ErrNotFound{}
}
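The test the reviewer asks for could be sketched as below. Since the real DefaultBackendStorage constructors aren't shown in this diff, the sketch uses a hypothetical pickBackend helper and a minimal backend struct that mirror the selection loop above; the real test in pkg/server/backend_manager_test.go would exercise GetRandomBackend directly:

```go
package main

import "fmt"

// backend is an illustrative stand-in for *server.Backend.
type backend struct {
	id       string
	draining bool
}

// pickBackend mirrors the selection loop in GetRandomBackend: prefer
// the first non-draining backend starting from startIdx (wrapping
// around), and fall back to a draining one only if all are draining.
func pickBackend(backends []*backend, startIdx int) *backend {
	if len(backends) == 0 {
		return nil
	}
	var firstDraining *backend
	for i := 0; i < len(backends); i++ {
		b := backends[(startIdx+i)%len(backends)]
		if !b.draining {
			return b
		}
		if firstDraining == nil {
			firstDraining = b
		}
	}
	return firstDraining
}

func main() {
	bs := []*backend{{id: "a", draining: true}, {id: "b", draining: false}}
	fmt.Println(pickBackend(bs, 0).id) // "b": the draining "a" is skipped
}
```

The two cases the reviewer names map directly onto this: a mixed slice must never return a draining backend, and an all-draining slice must still return one rather than ErrNotFound.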
21 changes: 19 additions & 2 deletions pkg/server/desthost_backend_manager.go
@@ -79,8 +79,25 @@ func (dibm *DestHostBackendManager) Backend(ctx context.Context) (*Backend, error)
if destHost != "" {
bes, exist := dibm.backends[destHost]
if exist && len(bes) > 0 {
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
return dibm.backends[destHost][0], nil
var firstDrainingBackend *Backend

// Find a non-draining backend for this destination host
for _, backend := range bes {
if !backend.IsDraining() {
Reviewer comment (Contributor): Same as DefaultBackendStorage: adding a test case for this logic in DestHostBackendManager would be beneficial to ensure the preference and fallback behavior work correctly.
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
return backend, nil
}
// Keep track of first draining backend as fallback
if firstDrainingBackend == nil {
firstDrainingBackend = backend
}
}

// All backends for this destination are draining, use one as fallback
if firstDrainingBackend != nil {
klog.V(4).InfoS("All backends for destination host are draining, using one as fallback", "destHost", destHost)
return firstDrainingBackend, nil
}
}
}
return nil, &ErrNotFound{}
2 changes: 2 additions & 0 deletions pkg/server/server.go
@@ -999,6 +999,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh

case client.PacketType_DRAIN:
klog.V(2).InfoS("agent is draining", "agentID", agentID)
backend.SetDraining()
klog.V(2).InfoS("marked backend as draining, will not route new requests to this agent", "agentID", agentID)
default:
klog.V(5).InfoS("Ignoring unrecognized packet from backend", "packet", pkt, "agentID", agentID)
}