@@ -60,6 +60,7 @@ def __init__(self, options):
6060 self .ticks = {}
6161 self .process_spawn_dict = dict ()
6262 self .process_started_dict = dict ()
63+ self .abort_queing = None
6364
6465 def main (self ):
6566 if not self .options .first :
@@ -90,7 +91,7 @@ def run(self):
9091 for config in self .options .process_group_configs :
9192 self .add_process_group (config )
9293 # add processes to directed graph, to check for dependency cycles
93- g = Graph (len (self .options .process_group_configs ))
94+ self . g = Graph (len (self .options .process_group_configs ))
9495 # replace depends_on string with actual process object
9596 for config in (self .options .process_group_configs ):
9697 # check dependencies for all programs in group:
@@ -105,11 +106,11 @@ def run(self):
105106 dependent_group , dependent_process = process .split (":" )
106107 except :
107108 dependent_group = dependent_process = process
108- g .addEdge (config .process_configs [conf [0 ]].name , dependent_process )
109+ self . g .addEdge (config .process_configs [conf [0 ]].name , dependent_process )
109110 process_dict [dependent_process ] = self .process_groups [dependent_group ].processes [dependent_process ]
110111 config .process_configs [conf [0 ]].depends_on = process_dict
111112 # check for cyclical process dependencies
112- if g .cyclic () == 1 :
113+ if self . g .cyclic () == 1 :
113114 raise AttributeError ('Process config contains dependeny cycle(s)! Check config files again!' )
114115
115116 self .options .openhttpservers (self )
@@ -356,16 +357,23 @@ def _spawn_dependee_queue(self):
356357 if self .process_spawn_dict :
357358 for process_name , process_object in list (self .process_spawn_dict .items ()):
358359 if process_object .config .depends_on is not None :
359- if any ([dependee .state is ProcessStates .FATAL for dependee in
360- process_object .config .depends_on .values ()]):
361- self ._set_fatal_state_and_empty_queue ()
360+ if self ._any_dependee_failed (process_object ):
361+ self ._empty_queue ()
362362 break
363- if all ([dependee .state is ProcessStates .RUNNING for dependee in
364- process_object .config .depends_on .values ()]):
363+ if self ._all_dependees_running (process_object ):
365364 self ._spawn_process_from_process_dict (process_name , process_object )
366365 else :
367366 self ._spawn_process_from_process_dict (process_name , process_object )
368367
368+ def _any_dependee_failed (self , process_object ):
369+ return any ([dependee .state is ProcessStates .BACKOFF or dependee .state
370+ is ProcessStates .FATAL for dependee in
371+ process_object .config .depends_on .values ()])
372+
373+ def _all_dependees_running (self , process_object ):
374+ return all ([dependee .state is ProcessStates .RUNNING for dependee in
375+ process_object .config .depends_on .values ()])
376+
369377 def _spawn_process_from_process_dict (self , process_name , process_object ):
370378 self .process_started_dict [process_name ] = process_object
371379 del self .process_spawn_dict [process_name ]
@@ -375,12 +383,7 @@ def _spawn_process_from_process_dict(self, process_name, process_object):
375383 process_object .spawn (self )
376384 process_object .notify_timer = 5
377385
378- def _set_fatal_state_and_empty_queue (self ):
379- for process_name , process_object in self .process_spawn_dict .items ():
380- process_object .record_spawnerr (
381- 'Dependee process did not start - set FATAL state for {}'
382- .format (process_name ))
383- process_object .change_state (ProcessStates .FATAL )
386+ def _empty_queue (self ):
384387 self .process_spawn_set = set ()
385388 self .process_spawn_dict = dict ()
386389
@@ -390,37 +393,55 @@ def _handle_spawn_timeout(self):
390393 Timeout if a process needs longer than spawn_timeout (default=60 seconds)
391394 to reach RUNNING
392395 """
393- # check if any of the processes that was started did not make it and remove RUNNING ones.
396+ # check if any of the processes which was started did not make it and
397+ # remove RUNNING processes from the process_started_dict.
398+ if self .abort_queing is not None :
399+ self .abort_queing .change_state (ProcessStates .FATAL )
400+ self .abort_queing = None
394401 if self .process_started_dict :
395402 for process_name , process_object in list (self .process_started_dict .items ()):
396403 if process_object .state is ProcessStates .RUNNING :
397404 del self .process_started_dict [process_name ]
398405 # handle timeout error.
399406 elif (time .time () - process_object .laststart ) >= process_object .config .spawn_timeout :
400407 self ._timeout_process (process_name , process_object )
408+ del self .process_started_dict [process_name ]
409+ break
401410 # notify user about waiting
402411 elif (time .time () - process_object .laststart ) >= process_object .notify_timer :
403412 self ._notfiy_user_about_waiting (process_name , process_object )
404413
405414 def _timeout_process (self , process_name , process_object ):
406- msg = ("timeout: dependee process {} in {} did not reach RUNNING within {} seconds, dependees {} are not spawned"
415+ msg = ("timeout: dependee process {} in {} did not reach RUNNING within"
416+ " {} seconds, checking now if dependees {} can be spawned"
407417 .format (process_name ,
408418 getProcessStateDescription (process_object .state ),
409419 process_object .config .spawn_timeout ,
410420 [process for process in self .process_spawn_dict .keys ()]))
411421 process_object .config .options .logger .warn (msg )
412- process_object .record_spawnerr (
413- 'timeout: Process {} did not reach RUNNING state within {} seconds'
414- . format ( process_name ,
415- process_object . config . spawn_timeout ))
416- process_object . change_state ( ProcessStates . FATAL )
422+ process_object .stop ()
423+ # keep track of the process that timed out - will be set to FATAL in
424+ # the next iteration
425+ self . abort_queing = process_object
426+ keys_to_remove = []
417427 for process_name , process_object in self .process_spawn_dict .items ():
418- process_object .record_spawnerr (
419- 'Dependee process did not start - set FATAL state for {}'
420- .format (process_name ))
421- process_object .change_state (ProcessStates .FATAL )
422- self .process_spawn_dict = dict ()
423- self .process_started_dict = dict ()
428+ if self .g .connected (process_name , self .abort_queing .config .name ):
429+ keys_to_remove .append (process_name )
430+ msg = ("{} will not be spawned, because {} did not "
431+ "successfully start" .format (process_name , self .abort_queing .config .name ))
432+ process_object .config .options .logger .warn (msg )
433+ for key in keys_to_remove :
434+ del self .process_spawn_dict [key ]
435+ keys_to_remove = []
436+ for process_name , process_object in self .process_started_dict .items ():
437+ if self .g .connected (process_name , self .abort_queing .config .name ):
438+ keys_to_remove .append (process_name )
439+ msg = ("stopping {}, because {} did not successfully "
440+ "start" .format (process_name , self .abort_queing .config .name ))
441+ process_object .config .options .logger .warn (msg )
442+ process_object .stop ()
443+ for key in keys_to_remove :
444+ del self .process_started_dict [key ]
424445
425446 def _notfiy_user_about_waiting (self , process_name , process_object ):
426447 process_object .notify_timer += 5
0 commit comments