Skip to content

Commit c012af6

Browse files
author
Dmitry Rozhkov
authored
Merge pull request #195 from paulhodson/master
server.Stop() shuts down the socket rather than restarting it.
2 parents 363bf49 + 5279dac commit c012af6

File tree

1 file changed

+36
-4
lines changed

1 file changed

+36
-4
lines changed

pkg/deviceplugin/server.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"os"
2222
"path"
2323
"path/filepath"
24+
"sync"
2425
"time"
2526

2627
"github.com/fsnotify/fsnotify"
@@ -32,6 +33,15 @@ import (
3233
"github.com/intel/intel-device-plugins-for-kubernetes/pkg/debug"
3334
)
3435

36+
type serverState int
37+
38+
// Server state
39+
const (
40+
uninitialized serverState = iota
41+
serving
42+
terminating
43+
)
44+
3545
// devicePluginServer maintains a gRPC server satisfying
3646
// pluginapi.PluginInterfaceServer interfaces.
3747
// This internal unexposed interface simplifies unit testing.
@@ -48,6 +58,8 @@ type server struct {
4858
updatesCh chan map[string]DeviceInfo
4959
devices map[string]DeviceInfo
5060
postAllocate func(*pluginapi.AllocateResponse) error
61+
state serverState
62+
stateMutex sync.Mutex
5163
}
5264

5365
// newServer creates a new server satisfying the devicePluginServer interface.
@@ -57,6 +69,7 @@ func newServer(devType string, postAllocate func(*pluginapi.AllocateResponse) er
5769
updatesCh: make(chan map[string]DeviceInfo, 1), // TODO: is 1 needed?
5870
devices: make(map[string]DeviceInfo),
5971
postAllocate: postAllocate,
72+
state: uninitialized,
6073
}
6174
}
6275

@@ -149,6 +162,7 @@ func (srv *server) Stop() error {
149162
if srv.grpcServer == nil {
150163
return errors.New("Can't stop non-existing gRPC server. Calling Stop() before Serve()?")
151164
}
165+
srv.setState(terminating)
152166
srv.grpcServer.Stop()
153167
close(srv.updatesCh)
154168
return nil
@@ -159,12 +173,25 @@ func (srv *server) Update(devices map[string]DeviceInfo) {
159173
srv.updatesCh <- devices
160174
}
161175

176+
func (srv *server) setState(state serverState) {
177+
srv.stateMutex.Lock()
178+
defer srv.stateMutex.Unlock()
179+
srv.state = state
180+
}
181+
182+
func (srv *server) getState() serverState {
183+
srv.stateMutex.Lock()
184+
defer srv.stateMutex.Unlock()
185+
return srv.state
186+
}
187+
162188
// setupAndServe binds given gRPC server to device manager, starts it and registers it with kubelet.
163189
func (srv *server) setupAndServe(namespace string, devicePluginPath string, kubeletSocket string) error {
164190
resourceName := namespace + "/" + srv.devType
165191
pluginPrefix := namespace + "-" + srv.devType
192+
srv.setState(serving)
166193

167-
for {
194+
for srv.getState() == serving {
168195
pluginEndpoint := pluginPrefix + ".sock"
169196
pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
170197

@@ -204,11 +231,16 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube
204231
if err = watchFile(pluginSocket); err != nil {
205232
return err
206233
}
207-
fmt.Printf("Socket %s removed, restarting\n", pluginSocket)
208234

209-
srv.grpcServer.Stop()
210-
os.Remove(pluginSocket)
235+
if srv.getState() == serving {
236+
srv.grpcServer.Stop()
237+
fmt.Printf("Socket %s removed, restarting\n", pluginSocket)
238+
} else {
239+
fmt.Printf("Socket %s shut down\n", pluginSocket)
240+
}
211241
}
242+
243+
return nil
212244
}
213245

214246
func watchFile(file string) error {

0 commit comments

Comments
 (0)