Skip to content

Commit 8239e14

Browse files
author
Michael Hammann
committed
feature: implement depends_on parameter to spawn processes only when all dependees are in RUNNING state
1 parent 8dd528c commit 8239e14

File tree

4 files changed

+159
-13
lines changed

4 files changed

+159
-13
lines changed

supervisor/options.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,8 @@ def get(section, opt, *args, **kwargs):
930930
serverurl = get(section, 'serverurl', None)
931931
if serverurl and serverurl.strip().upper() == 'AUTO':
932932
serverurl = None
933+
depends_on = get(section, 'depends_on', None)
934+
spawn_timeout = int(get(section, 'spawn_timeout', 60))
933935

934936
# find uid from "user" option
935937
user = get(section, 'user', None)
@@ -1055,7 +1057,10 @@ def get(section, opt, *args, **kwargs):
10551057
exitcodes=exitcodes,
10561058
redirect_stderr=redirect_stderr,
10571059
environment=environment,
1058-
serverurl=serverurl)
1060+
serverurl=serverurl,
1061+
depends_on=depends_on,
1062+
spawn_timeout=spawn_timeout,
1063+
)
10591064

10601065
programs.append(pconfig)
10611066

@@ -1873,7 +1878,8 @@ class ProcessConfig(Config):
18731878
'stderr_events_enabled', 'stderr_syslog',
18741879
'stopsignal', 'stopwaitsecs', 'stopasgroup', 'killasgroup',
18751880
'exitcodes', 'redirect_stderr' ]
1876-
optional_param_names = [ 'environment', 'serverurl' ]
1881+
optional_param_names = [ 'environment', 'serverurl',
1882+
'depends_on', 'spawn_timeout' ]
18771883

18781884
def __init__(self, options, **params):
18791885
self.options = options

supervisor/process.py

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,29 @@ def record_spawnerr(self, msg):
188188
self.spawnerr = msg
189189
self.config.options.logger.info("spawnerr: %s" % msg)
190190

191-
def spawn(self):
191+
def queue_all_dependee_processes(self, supervisor):
192+
if (self.config.name not in supervisor.process_spawn_dict.keys() and
193+
self.config.name not in supervisor.process_started_dict.keys()):
194+
supervisor.process_spawn_dict[self.config.name] = self
195+
if self.config.depends_on is not None:
196+
for dependee in self.config.depends_on.values():
197+
if dependee.state is not ProcessStates.RUNNING and dependee.state is not ProcessStates.STARTING:
198+
if (dependee.config.name not in supervisor.process_spawn_dict.keys() and
199+
dependee.config.name not in supervisor.process_started_dict.keys()):
200+
supervisor.process_spawn_dict[dependee.config.name] = dependee
201+
dependee.queue_all_dependee_processes(supervisor)
202+
203+
def spawn(self, supervisor=None):
192204
"""Start the subprocess. It must not be running already.
193205
194206
Return the process id. If the fork() call fails, return None.
195207
"""
208+
if self.config.depends_on is not None:
209+
if any([dependee.state is not ProcessStates.RUNNING for dependee in
210+
self.config.depends_on.values()]):
211+
self.queue_all_dependee_processes(supervisor)
212+
return
213+
196214
options = self.config.options
197215
processname = as_string(self.config.name)
198216

@@ -647,7 +665,7 @@ def __repr__(self):
647665
def get_state(self):
648666
return self.state
649667

650-
def transition(self):
668+
def transition(self, supervisor=None):
651669
now = time.time()
652670
state = self.state
653671

@@ -659,22 +677,22 @@ def transition(self):
659677
# dont start any processes if supervisor is shutting down
660678
if state == ProcessStates.EXITED:
661679
if self.config.autorestart:
680+
# STOPPED -> STARTING
662681
if self.config.autorestart is RestartUnconditionally:
663682
# EXITED -> STARTING
664-
self.spawn()
683+
self.spawn(supervisor)
665684
else: # autorestart is RestartWhenExitUnexpected
666685
if self.exitstatus not in self.config.exitcodes:
667686
# EXITED -> STARTING
668-
self.spawn()
687+
self.spawn(supervisor)
669688
elif state == ProcessStates.STOPPED and not self.laststart:
670689
if self.config.autostart:
671-
# STOPPED -> STARTING
672-
self.spawn()
690+
self.spawn(supervisor)
673691
elif state == ProcessStates.BACKOFF:
674692
if self.backoff <= self.config.startretries:
675693
if now > self.delay:
676694
# BACKOFF -> STARTING
677-
self.spawn()
695+
self.spawn(supervisor)
678696

679697
processname = as_string(self.config.name)
680698
if state == ProcessStates.STARTING:
@@ -836,9 +854,9 @@ def before_remove(self):
836854
pass
837855

838856
class ProcessGroup(ProcessGroupBase):
839-
def transition(self):
857+
def transition(self, supervisor=None):
840858
for proc in self.processes.values():
841-
proc.transition()
859+
proc.transition(supervisor)
842860

843861
class FastCGIProcessGroup(ProcessGroup):
844862

supervisor/rpcinterface.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ def startProcess(self, name, wait=True):
281281
@return boolean result Always true unless error
282282
283283
"""
284+
## check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state
285+
group, process = self._getGroupAndProcess(name)
286+
284287
self._update('startProcess')
285288
group, process = self._getGroupAndProcess(name)
286289
if process is None:
@@ -303,7 +306,11 @@ def startProcess(self, name, wait=True):
303306
raise RPCError(Faults.FAILED,
304307
"%s is in an unknown process state" % name)
305308

306-
process.spawn()
309+
process.spawn(self.supervisord)
310+
# if process has dependees, return succesfull start -
311+
# errors will be handled in main loop and inside process.spawn()
312+
if process.config.depends_on is not None:
313+
return True
307314

308315
# We call reap() in order to more quickly obtain the side effects of
309316
# process.finish(), which reap() eventually ends up calling. This
@@ -592,6 +599,8 @@ def getAllConfigInfo(self):
592599
'stderr_logfile_backups': pconfig.stderr_logfile_backups,
593600
'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
594601
'stderr_syslog': pconfig.stderr_syslog,
602+
'depends_on': pconfig.depends_on,
603+
'spawn_timeout': pconfig.spawn_timeout,
595604
}
596605
# no support for these types in xml-rpc
597606
d.update((k, 'auto') for k, v in d.items() if v is Automatic)

supervisor/supervisord.py

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
from supervisor import events
4545
from supervisor.states import SupervisorStates
4646
from supervisor.states import getProcessStateDescription
47+
from supervisor.graphutils import Graph
48+
49+
from supervisor.states import ProcessStates
4750

4851
class Supervisor:
4952
stopping = False # set after we detect that we are handling a stop request
@@ -55,6 +58,8 @@ def __init__(self, options):
5558
self.options = options
5659
self.process_groups = {}
5760
self.ticks = {}
61+
self.process_spawn_dict = dict()
62+
self.process_started_dict = dict()
5863

5964
def main(self):
6065
if not self.options.first:
@@ -84,6 +89,29 @@ def run(self):
8489
try:
8590
for config in self.options.process_group_configs:
8691
self.add_process_group(config)
92+
# add processes to directed graph, to check for dependency cycles
93+
g = Graph(len(self.options.process_group_configs))
94+
# replace depends_on string with actual process object
95+
for config in (self.options.process_group_configs):
96+
# check dependencies for all programs in group:
97+
for conf in enumerate(config.process_configs):
98+
if config.process_configs[conf[0]].depends_on is not None:
99+
process_dict=dict({})
100+
# split to get all processes in case there are multiple dependencies
101+
dependent_processes = (config.process_configs[conf[0]].depends_on).split()
102+
for process in dependent_processes:
103+
# this can be of form group:process or simply process
104+
try:
105+
dependent_group, dependent_process=process.split(":")
106+
except:
107+
dependent_group=dependent_process=process
108+
g.addEdge(config.process_configs[conf[0]].name, dependent_process)
109+
process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process]
110+
config.process_configs[conf[0]].depends_on = process_dict
111+
# check for cyclical process dependencies
112+
if g.cyclic() == 1:
113+
raise AttributeError('Process config contains dependeny cycle(s)! Check config files again!')
114+
87115
self.options.openhttpservers(self)
88116
self.options.setsignals()
89117
if (not self.options.nodaemon) and self.options.first:
@@ -239,7 +267,10 @@ def runforever(self):
239267
combined_map[fd].handle_error()
240268

241269
for group in pgroups:
242-
group.transition()
270+
group.transition(self)
271+
272+
self._spawn_dependee_queue()
273+
self._handle_spawn_timeout()
243274

244275
self.reap()
245276
self.handle_signal()
@@ -316,6 +347,88 @@ def handle_signal(self):
316347
def get_state(self):
317348
return self.options.mood
318349

350+
def _spawn_dependee_queue(self):
351+
"""
352+
Iterate over processes that are not started but added to
353+
process_spawn_dict. Spawn all processes which are ready
354+
(All dependees RUNNING or process without dependees)
355+
"""
356+
if self.process_spawn_dict:
357+
for process_name, process_object in list(self.process_spawn_dict.items()):
358+
if process_object.config.depends_on is not None:
359+
if any([dependee.state is ProcessStates.FATAL for dependee in
360+
process_object.config.depends_on.values()]):
361+
self._set_fatal_state_and_empty_queue()
362+
break
363+
if all([dependee.state is ProcessStates.RUNNING for dependee in
364+
process_object.config.depends_on.values()]):
365+
self._spawn_process_from_process_dict(process_name, process_object)
366+
else:
367+
self._spawn_process_from_process_dict(process_name, process_object)
368+
369+
def _spawn_process_from_process_dict(self, process_name, process_object):
370+
self.process_started_dict[process_name] = process_object
371+
del self.process_spawn_dict[process_name]
372+
# only spawn if the process is not running yet (could be started in the meanwhile)
373+
if (process_object.state is not ProcessStates.STARTING and
374+
process_object.state is not ProcessStates.RUNNING):
375+
process_object.spawn(self)
376+
process_object.notify_timer = 5
377+
378+
def _set_fatal_state_and_empty_queue(self):
379+
for process_name, process_object in self.process_spawn_dict.items():
380+
process_object.record_spawnerr(
381+
'Dependee process did not start - set FATAL state for {}'
382+
.format(process_name))
383+
process_object.change_state(ProcessStates.FATAL)
384+
self.process_spawn_set = set()
385+
self.process_spawn_dict = dict()
386+
387+
def _handle_spawn_timeout(self):
388+
"""
389+
Log info message each 5 seconds if some process is waiting on a dependee
390+
Timeout if a process needs longer than spawn_timeout (default=60 seconds)
391+
to reach RUNNING
392+
"""
393+
# check if any of the processes that was started did not make it and remove RUNNING ones.
394+
if self.process_started_dict:
395+
for process_name, process_object in list(self.process_started_dict.items()):
396+
if process_object.state is ProcessStates.RUNNING:
397+
del self.process_started_dict[process_name]
398+
# handle timeout error.
399+
elif (time.time() - process_object.laststart) >= process_object.config.spawn_timeout:
400+
self._timeout_process(process_name, process_object)
401+
# notify user about waiting
402+
elif (time.time() - process_object.laststart) >= process_object.notify_timer:
403+
self._notfiy_user_about_waiting(process_name, process_object)
404+
405+
def _timeout_process(self, process_name, process_object):
406+
msg = ("timeout: dependee process {} in {} did not reach RUNNING within {} seconds, dependees {} are not spawned"
407+
.format(process_name,
408+
getProcessStateDescription(process_object.state),
409+
process_object.config.spawn_timeout,
410+
[process for process in self.process_spawn_dict.keys()]))
411+
process_object.config.options.logger.warn(msg)
412+
process_object.record_spawnerr(
413+
'timeout: Process {} did not reach RUNNING state within {} seconds'
414+
.format(process_name,
415+
process_object.config.spawn_timeout))
416+
process_object.change_state(ProcessStates.FATAL)
417+
for process_name, process_object in self.process_spawn_dict.items():
418+
process_object.record_spawnerr(
419+
'Dependee process did not start - set FATAL state for {}'
420+
.format(process_name))
421+
process_object.change_state(ProcessStates.FATAL)
422+
self.process_spawn_dict = dict()
423+
self.process_started_dict = dict()
424+
425+
def _notfiy_user_about_waiting(self, process_name, process_object):
426+
process_object.notify_timer += 5
427+
msg = ("waiting for dependee process {} in {} state to be RUNNING"
428+
.format(process_name,
429+
getProcessStateDescription(process_object.state)))
430+
process_object.config.options.logger.info(msg)
431+
319432
def timeslice(period, when):
320433
return int(when - (when % period))
321434

0 commit comments

Comments
 (0)