@@ -74,7 +74,17 @@ def name(self):
74
74
return self ._name
75
75
76
76
77
- class SequentialExecutor :
77
class AMSJobScheduler:
    """Base scheduler: opens a Flux handle and builds one job spec per stage.

    The stager job description is not read from ``config`` directly; it is
    produced by calling ``stager_job_generator(config)``.
    """

    def __init__(self, stager_job_generator, config):
        """Create the Flux handle and the exclusive ``JobSpec`` objects.

        Args:
            stager_job_generator: Callable invoked as
                ``stager_job_generator(config)``; its result is handed to
                ``JobSpec`` as the "ams_stager" job description.
            config: Mapping indexed here with the keys "user_app",
                "ml_training" and "ml_pruner", and forwarded unchanged to
                ``stager_job_generator``.
        """
        self._flux_handle = flux.Flux()
        logger.debug("Preparing user app job specification")

        # Each description is looked up immediately before its spec is built,
        # keeping the construction order: user app, training, pruner, stager.
        user_app_descr = config["user_app"]
        self._user_app = JobSpec("user_app", user_app_descr, exclusive=True)

        ml_training_descr = config["ml_training"]
        self._ml_train = JobSpec("ml_training", ml_training_descr, exclusive=True)

        ml_pruner_descr = config["ml_pruner"]
        self._ml_pruner = JobSpec("ml_pruner", ml_pruner_descr, exclusive=True)

        stager_descr = stager_job_generator(config)
        self._stager = JobSpec("ams_stager", stager_descr, exclusive=True)
85
+
86
+
87
+ class AMSSequentialJobScheduler (AMSJobScheduler ):
78
88
def __init__ (self , config ):
79
89
def create_fs_stager_job_descr (user_descr ):
80
90
config = dict ()
@@ -97,7 +107,7 @@ def create_fs_stager_job_descr(user_descr):
97
107
"--class" ,
98
108
user_descr ["stager" ]["pruner_class" ],
99
109
"--load" ,
100
- "./build_borax/examples/prune.py" ,
110
+ user_descr [ "stager" ][ "pruner_path" ] ,
101
111
] + user_descr ["stager" ]["pruner_args" ]
102
112
103
113
config ["resources" ] = {
@@ -110,17 +120,7 @@ def create_fs_stager_job_descr(user_descr):
110
120
111
121
return config
112
122
113
- self ._flux_handle = flux .Flux ()
114
- logger .debug ("Preparing user app job specification" )
115
- self ._user_app = JobSpec ("user_app" , config ["user_app" ], exclusive = True )
116
- self ._ml_train = JobSpec ("ml_training" , config ["ml_training" ], exclusive = True )
117
- self ._ml_pruner = JobSpec ("ml_pruner" , config ["ml_pruner" ], exclusive = True )
118
- self ._stager = JobSpec ("ams_stager" , create_fs_stager_job_descr (config ), exclusive = True )
119
-
120
- # logger.debug("Preparing ml sub selection specification")
121
- # self._ml_subselect = JobSpec("ml_subselect", config["ml_subselect"])
122
- # Build the pruning module
123
- # TODO Add pruner stage here
123
# AMSJobScheduler.__init__ is declared as (stager_job_generator, config).
# Pass the generator first: with the arguments reversed the base class would
# index the function object (config["user_app"]) and raise TypeError.
super().__init__(create_fs_stager_job_descr, config)
124
124
125
125
def execute (self ):
126
126
def execute_and_wait (job_descr , handle ):
@@ -134,16 +134,10 @@ def execute_and_wait(job_descr, handle):
134
134
return False
135
135
return True
136
136
137
- if not execute_and_wait (self ._user_app , self ._flux_handle ):
138
- return False
139
-
140
- if not execute_and_wait (self ._stager , self ._flux_handle ):
141
- return False
142
- if not execute_and_wait (self ._ml_pruner , self ._flux_handle ):
143
- return False
137
+ for step in [self ._user_app , self ._stager , self ._ml_pruner , self ._ml_train ]:
138
+ if not execute_and_wait (step , self ._flux_handle ):
139
+ return False
144
140
145
- if not execute_and_wait (self ._ml_train , self ._flux_handle ):
146
- return False
147
141
return True
148
142
149
143
@@ -161,7 +155,7 @@ def deploy(config):
161
155
# TODO Launch concurrent execution
162
156
pass
163
157
elif config ["execution_mode" ] == "sequential" :
164
- executor = SequentialExecutor (config )
158
+ executor = AMSSequentialJobScheduler (config )
165
159
return executor .execute ()
166
160
167
161
0 commit comments