@@ -269,7 +269,8 @@ def runforever(self):
269269 for group in pgroups :
270270 group .transition (self )
271271
272- self .spawn_dependee_queue ()
272+ self ._spawn_dependee_queue ()
273+ self ._handle_spawn_timeout ()
273274
274275 self .reap ()
275276 self .handle_signal ()
@@ -346,71 +347,85 @@ def handle_signal(self):
346347 def get_state (self ):
347348 return self .options .mood
348349
349- def spawn_dependee_queue (self ):
350+ def _spawn_dependee_queue (self ):
350351 """
351- check for processes that are sheduled but not started
352- if the queue is well ordered just pop and spawn all that are ready,
353- Currently ordering is not considered when adding to queue -
354- therefore just iterate over queue and spawn all ready processes.
352+ Iterate over processes that are not started but added to
353+ process_spawn_dict. Spawn all processes which are ready
354+ (All dependees RUNNING or process without dependees)
355355 """
356356 if self .process_spawn_dict :
357357 for process_name , process_object in list (self .process_spawn_dict .items ()):
358358 if process_object .config .depends_on is not None :
359359 if any ([dependee .state is ProcessStates .FATAL for dependee in
360360 process_object .config .depends_on .values ()]):
361- for process_name , process_object in self .process_spawn_dict .items ():
362- process_object .record_spawnerr (
363- 'Dependee process did not start - set FATAL state for {}'
364- .format (process_name ))
365- process_object .change_state (ProcessStates .FATAL )
366- self .process_spawn_set = set ()
367- self .process_spawn_dict = dict ()
361+ self ._set_fatal_state_and_empty_queue ()
368362 break
369363 if all ([dependee .state is ProcessStates .RUNNING for dependee in
370364 process_object .config .depends_on .values ()]):
371- self .process_started_dict [process_name ] = process_object
372- del self .process_spawn_dict [process_name ]
373- process_object .spawn (self )
374- process_object .notify_timer = 5
365+ self ._spawn_process_from_process_dict ()
375366 else :
376- self .process_started_dict [process_name ] = process_object
377- del self .process_spawn_dict [process_name ]
378- process_object .spawn (self )
379- process_object .notify_timer = 5
367+ self ._spawn_process_from_process_dict ()
368+
369+ def _spawn_process_from_process_dict (self , process_name , process_object ):
370+ self .process_started_dict [process_name ] = process_object
371+ del self .process_spawn_dict [process_name ]
372+ process_object .spawn (self )
373+ process_object .notify_timer = 5
374+
375+ def _set_fatal_state_and_empty_queue (self ):
376+ for process_name , process_object in self .process_spawn_dict .items ():
377+ process_object .record_spawnerr (
378+ 'Dependee process did not start - set FATAL state for {}'
379+ .format (process_name ))
380+ process_object .change_state (ProcessStates .FATAL )
381+ self .process_spawn_set = set ()
382+ self .process_spawn_dict = dict ()
380383
384+ def _handle_spawn_timeout (self ):
385+ """
386+ Log info message each 5 seconds if some process is waiting on a dependee
387+ Timeout if a process needs longer than spawn_timeout (default=60 seconds)
388+ to reach RUNNING
389+ """
381390 # check if any of the processes that was started did not make it and remove RUNNING ones.
382391 if self .process_started_dict :
383- for started_process_name , started_process_object in list (self .process_started_dict .items ()):
384- # if started_process in RUNNING - remove from set
385- if started_process_object .state is ProcessStates .RUNNING :
386- del self .process_started_dict [started_process_name ]
392+ for process_name , process_object in list (self .process_started_dict .items ()):
393+ if process_object .state is ProcessStates .RUNNING :
394+ del self .process_started_dict [process_name ]
387395 # handle timeout error.
388- elif (time .time () - started_process_object .laststart ) >= started_process_object .config .spawn_timeout :
389- msg = (
390- "timeout: dependee process {} in {} state did not reach RUNNING within {} seconds, dependees {} are not spawned"
391- .format (started_process_name ,
392- getProcessStateDescription (started_process_object .state ),
393- started_process_object .config .spawn_timeout ,
394- [process for process in self .process_spawn_dict .keys ()]))
395- started_process_object .config .options .logger .warn (msg )
396- started_process_object .record_spawnerr (
397- 'timeout: Process {} did not reach RUNNING state within {} seconds' .format (
398- started_process_name , started_process_object .config .spawn_timeout ))
399- started_process_object .change_state (ProcessStates .FATAL )
400- for process_name , process_object in self .process_spawn_dict .items ():
401- process_object .record_spawnerr (
402- 'Dependee process did not start - set FATAL state for {}' .format (
403- process_name ))
404- process_object .change_state (ProcessStates .FATAL )
405- self .process_spawn_dict = dict ()
406- self .process_started_dict = dict ()
396+ elif (time .time () - process_object .laststart ) >= process_object .config .spawn_timeout :
397+ self ._timeout_process (process_name , process_object )
407398 # notify user about waiting
408- elif (time .time () - started_process_object .laststart ) >= started_process_object .notify_timer :
409- started_process_object .notify_timer += 5
410- msg = ("waiting for dependee process {} in {} state to be RUNNING"
411- .format (started_process_name ,
412- getProcessStateDescription (started_process_object .state )))
413- started_process_object .config .options .logger .info (msg )
399+ elif (time .time () - process_object .laststart ) >= process_object .notify_timer :
400+ self ._notfiy_user_about_waiting (process_name , process_object )
401+
402+ def _timeout_process (self , process_name , process_object ):
403+ msg = ("""timeout: dependee process {} in {} did not reach RUNNING
404+ within {} seconds, dependees {} are not spawned"""
405+ .format (process_name ,
406+ getProcessStateDescription (process_object .state ),
407+ process_object .config .spawn_timeout ,
408+ [process for process in self .process_spawn_dict .keys ()]))
409+ process_object .config .options .logger .warn (msg )
410+ process_object .record_spawnerr (
411+ 'timeout: Process {} did not reach RUNNING state within {} seconds'
412+ .format (process_name ,
413+ process_object .config .spawn_timeout ))
414+ process_object .change_state (ProcessStates .FATAL )
415+ for process_name , process_object in self .process_spawn_dict .items ():
416+ process_object .record_spawnerr (
417+ 'Dependee process did not start - set FATAL state for {}'
418+ .format (process_name ))
419+ process_object .change_state (ProcessStates .FATAL )
420+ self .process_spawn_dict = dict ()
421+ self .process_started_dict = dict ()
422+
423+ def _notfiy_user_about_waiting (self , process_name , process_object ):
424+ process_object .notify_timer += 5
425+ msg = ("waiting for dependee process {} in {} state to be RUNNING"
426+ .format (process_name ,
427+ getProcessStateDescription (process_object .state )))
428+ process_object .config .options .logger .info (msg )
414429
415430def timeslice (period , when ):
416431 return int (when - (when % period ))
0 commit comments