Skip to content

Commit a5b4650

Browse files
committed
Use inheritance to re-use code.
1 parent 78ed657 commit a5b4650

File tree

1 file changed

+17
-23
lines changed

1 file changed

+17
-23
lines changed

src/AMSWorkflow/ams_wf/ams_deploy.py

+17-23
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,17 @@ def name(self):
7474
return self._name
7575

7676

77-
class SequentialExecutor:
77+
class AMSJobScheduler:
78+
def __init__(self, stager_job_generator, config):
79+
self._flux_handle = flux.Flux()
80+
logger.debug("Preparing user app job specification")
81+
self._user_app = JobSpec("user_app", config["user_app"], exclusive=True)
82+
self._ml_train = JobSpec("ml_training", config["ml_training"], exclusive=True)
83+
self._ml_pruner = JobSpec("ml_pruner", config["ml_pruner"], exclusive=True)
84+
self._stager = JobSpec("ams_stager", stager_job_generator(config), exclusive=True)
85+
86+
87+
class AMSSequentialJobScheduler(AMSJobScheduler):
7888
def __init__(self, config):
7989
def create_fs_stager_job_descr(user_descr):
8090
config = dict()
@@ -97,7 +107,7 @@ def create_fs_stager_job_descr(user_descr):
97107
"--class",
98108
user_descr["stager"]["pruner_class"],
99109
"--load",
100-
"./build_borax/examples/prune.py",
110+
user_descr["stager"]["pruner_path"],
101111
] + user_descr["stager"]["pruner_args"]
102112

103113
config["resources"] = {
@@ -110,17 +120,7 @@ def create_fs_stager_job_descr(user_descr):
110120

111121
return config
112122

113-
self._flux_handle = flux.Flux()
114-
logger.debug("Preparing user app job specification")
115-
self._user_app = JobSpec("user_app", config["user_app"], exclusive=True)
116-
self._ml_train = JobSpec("ml_training", config["ml_training"], exclusive=True)
117-
self._ml_pruner = JobSpec("ml_pruner", config["ml_pruner"], exclusive=True)
118-
self._stager = JobSpec("ams_stager", create_fs_stager_job_descr(config), exclusive=True)
119-
120-
# logger.debug("Preparing ml sub selection specification")
121-
# self._ml_subselect = JobSpec("ml_subselect", config["ml_subselect"])
122-
# Build the pruning module
123-
# TODO Add pruner stage here
123+
super().__init__(config, create_fs_stager_job_descr)
124124

125125
def execute(self):
126126
def execute_and_wait(job_descr, handle):
@@ -134,16 +134,10 @@ def execute_and_wait(job_descr, handle):
134134
return False
135135
return True
136136

137-
if not execute_and_wait(self._user_app, self._flux_handle):
138-
return False
139-
140-
if not execute_and_wait(self._stager, self._flux_handle):
141-
return False
142-
if not execute_and_wait(self._ml_pruner, self._flux_handle):
143-
return False
137+
for step in [self._user_app, self._stager, self._ml_pruner, self._ml_train]:
138+
if not execute_and_wait(step, self._flux_handle):
139+
return False
144140

145-
if not execute_and_wait(self._ml_train, self._flux_handle):
146-
return False
147141
return True
148142

149143

@@ -161,7 +155,7 @@ def deploy(config):
161155
# TODO Launch concurrent execution
162156
pass
163157
elif config["execution_mode"] == "sequential":
164-
executor = SequentialExecutor(config)
158+
executor = AMSSequentialJobScheduler(config)
165159
return executor.execute()
166160

167161

0 commit comments

Comments
 (0)