Skip to content

Commit b231e93

Browse files
committed
WIP, just for reference
1 parent aa35f04 commit b231e93

File tree

1 file changed

+89
-11
lines changed

1 file changed

+89
-11
lines changed

src/AMSWorkflow/ams_wf/ams_deploy.py

+89-11
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@
99
import flux
1010
from flux.job import JobspecV1
1111
import flux.job as fjob
12-
12+
import signal
1313
from ams.store import CreateStore, AMSDataStore
1414

1515
logger = logging.getLogger(__name__)
@@ -84,6 +84,91 @@ def __init__(self, stager_job_generator, config):
8484
self._stager = JobSpec("ams_stager", stager_job_generator(config), exclusive=True)
8585

8686

87+
class AMSConcurrentJobScheduler(AMSJobScheduler):
    """Scheduler that runs the RabbitMQ stager concurrently with the user application.

    The stager job is started first, then the user application. Once the user
    application terminates, the stager is sent SIGTERM and waited for.
    """

    def __init__(self, config):
        """Build the scheduler from the user supplied workflow description.

        Args:
            config: Workflow description. The keys read here are
                ``config["db"]["path"]`` and, under ``config["stager"]``,
                ``rmq`` (with ``rabbitmq-cert`` and ``rabbitmq-outbound-queue``),
                ``pruner_class``, ``pruner_path`` and ``pruner_args``.
        """

        def create_rmq_stager_job_descr(user_descr):
            """Return the job description (executable, arguments, resources) of the RMQ stager."""
            # TODO: This is SUPER ugly and not to mention
            # potentially buggy. We will need to clean this up
            # once we have all pieces in places (including AMSlib json initialization)
            # NOTE: the credentials file is written into the current working directory.
            rmq_config_path = Path("rmq_config.json").resolve()
            with open(rmq_config_path, "w") as fd:
                json.dump(user_descr["stager"]["rmq"], fd, indent=6)

            db_path = user_descr["db"]["path"]
            stager_descr = user_descr["stager"]
            rmq_descr = stager_descr["rmq"]

            # Named 'job_descr' so it does not shadow the constructor's 'config' argument.
            job_descr = dict()
            job_descr["executable"] = sys.executable
            job_descr["arguments"] = [
                "-m",
                "ams_wf.AMSDBStage",
                "-db",
                db_path,
                "--policy",
                "process",
                "--dest",
                str(Path(db_path) / Path("candidates")),
                "--db-type",
                "dhdf5",
                "--store",
                "--mechanism",
                "network",
                "--class",
                stager_descr["pruner_class"],
                "--cert",
                rmq_descr["rabbitmq-cert"],
                "--creds",
                str(rmq_config_path),
                "--queue",
                rmq_descr["rabbitmq-outbound-queue"],
                "--load",
                stager_descr["pruner_path"],
            ] + stager_descr["pruner_args"]

            job_descr["resources"] = {
                "num_nodes": 1,
                "num_processes_per_node": 1,
                "num_tasks": 1,
                "cores_per_task": 5,
                "gpus_per_task": 0,
            }

            return job_descr

        super().__init__(create_rmq_stager_job_descr, config)

    def _terminate_stager(self, stager_id):
        """Send SIGTERM to the stager job and block until it has terminated."""
        # stager handles SIGTERM, kill it
        kill_status = fjob.kill_async(self._flux_handle, jobid=stager_id, signum=signal.SIGTERM)
        logger.debug("Waiting for job to be killed")
        logger.debug(f"Kill request returned: {kill_status.get()}")
        fjob.wait(self._flux_handle, jobid=stager_id)

    def execute(self):
        """Run the stager and the user application concurrently.

        Returns:
            True when the user application finished successfully, False otherwise.
            The exit status of the stager is not taken into account, as the
            stager is always terminated through SIGTERM.
        """
        # We start stager first
        logger.debug("Start stager")
        stager_id = self._stager.start(self._flux_handle)
        logger.debug(f"Stager job id is {stager_id}")

        try:
            logger.debug("Start user app")
            user_app_id = self._user_app.start(self._flux_handle)
            logger.debug(f"User App job id is {user_app_id}")

            # We are actively waiting for main application to terminate
            logger.debug("Wait for user application")
            result = fjob.wait(self._flux_handle, jobid=user_app_id)
        finally:
            # Never leave the stager running, even when starting or waiting
            # for the user application raised.
            self._terminate_stager(stager_id)

        if not result.success:
            logger.critical(f"Unsuccessfull Job Execution: {self._user_app.name}")
            logger.debug(f"Error code of failed job {result.jobid} is {result.errstr}")
            logger.debug(f"stdout is redirected to: {self._user_app.stdout}")
            logger.debug(f"stderr is redirected to: {self._user_app.stderr}")
            return False

        return True
170+
171+
87172
class AMSSequentialJobScheduler(AMSJobScheduler):
88173
def __init__(self, config):
89174
def create_fs_stager_job_descr(user_descr):
@@ -120,7 +205,7 @@ def create_fs_stager_job_descr(user_descr):
120205

121206
return config
122207

123-
super().__init__(config, create_fs_stager_job_descr)
208+
super().__init__(create_fs_stager_job_descr, config)
124209

125210
def execute(self):
126211
def execute_and_wait(job_descr, handle):
@@ -152,11 +237,10 @@ def deploy(config):
152237
# the server is up and running
153238
logger.info(f"")
154239
if config["execution_mode"] == "concurrent":
155-
# TODO Launch concurrent execution
156-
pass
240+
executor = AMSConcurrentJobScheduler(config)
157241
elif config["execution_mode"] == "sequential":
158242
executor = AMSSequentialJobScheduler(config)
159-
return executor.execute()
243+
return executor.execute()
160244

161245

162246
def bootstrap(cmd, scheduler, flux_log):
@@ -241,10 +325,6 @@ def validate_step_field(level, config):
241325
if config["stager"]["mode"] == "filesystem":
242326
logger.critical("Database is concurrent but the stager polls data from filesystem")
243327
return False
244-
elif config["stager"]["mode"] == "rmq":
245-
if "num_clients" not in config["stager"]:
246-
logger.critical("When stager set in mode 'rmq' you need to define the number of rmq clients")
247-
return False
248328

249329
if config["stager"]["mode"] == "rmq":
250330
rmq_config = config["stager"]["rmq"]
@@ -432,8 +512,6 @@ def main():
432512
ret = not args.func(args)
433513
return ret
434514

435-
sys.exit(main())
436-
437515

438516
if __name__ == "__main__":
439517
sys.exit(main())

0 commit comments

Comments
 (0)