
Commit 06e5889

Initial support of workflow deployment tool

1 parent 223d32e · commit 06e5889
13 files changed: +1220 -235 lines

pyproject.toml (+3 -1)

@@ -25,17 +25,19 @@ AMSDBStage = "ams_wf.AMSDBStage:main"
 AMSOrchestrator = "ams_wf.AMSOrchestrator:main"
 AMSStore = "ams_wf.AMSStore:main"
 AMSTrain = "ams_wf.AMSTrain:main"
+AMSDeploy = "ams_wf.AMSDeploy:main"
 
 [project.urls]
 "Homepage" = "https://github.com/LLNL/AMS/"
 
 [tool.setuptools]
-package-dir = {"" = "src/AMSWorkflow"}
+package-dir = {"" = "src/AMSWorkflow/"}
 packages = ["ams_wf", "ams"]
 
 # Black formatting
 [tool.black]
 line-length = 120
+preview = true
 include = '\.pyi?$'
 exclude = '''
 /(
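
The new AMSDeploy entry point makes the deployment tool installable as a console script, like the other ams_wf tools. A minimal sketch of the shape setuptools expects, assuming ams_wf/AMSDeploy.py follows the same "module:main" convention as AMSTrain and AMSStore (the CLI option shown is illustrative, not taken from this commit):

    # ams_wf/AMSDeploy.py -- hypothetical skeleton for the new console script
    import argparse


    def main():
        # Illustrative CLI only; the real AMSDeploy options are not shown in this commit.
        parser = argparse.ArgumentParser(description="Deploy an AMS workflow")
        parser.add_argument("--nnodes", type=int, default=None, help="nodes for the flux allocation")
        args = parser.parse_args()
        # ... bootstrap flux and schedule the workflow jobs here ...


    if __name__ == "__main__":
        main()

After installation, setuptools generates an AMSDeploy executable that imports ams_wf.AMSDeploy and calls main().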

scripts/bootstrap_flux.sh (+1 -2)

@@ -136,8 +136,7 @@ if [[ "$MACHINE" == "lassen" ]] ; then
     module load pmi-shim
 
     PMIX_MCA_gds="^ds12,ds21" \
-        jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${FLUX_NODES} \
-        --bind=none --smpiargs="-disable_gpu_hooks" \
+        jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${FLUX_NODES} --bind=none --smpiargs="-disable_gpu_hooks" \
         flux start -o,-S,log-filename=$FLUX_LOG -v $FLUX_SLEEP_WRAPPER $FLUX_SERVER &
 elif [[ "$MACHINE" == "pascal" || "$MACHINE" == "ruby" ]] ; then
     srun -n ${FLUX_NODES} -N ${FLUX_NODES} --pty --mpi=none --mpibind=off \

scripts/rmq_add_secrets.sh (+1 -1)

@@ -66,4 +66,4 @@ else
     echo "[$(date +'%m%d%Y-%T')@$(hostname)] Added secrets successfully."
 fi
 
-# check_cmd oc logout
+# check_cmd oc logout

(The removed and added lines are textually identical; the change is most likely whitespace-only, e.g. adding a trailing newline at the end of the file.)

src/AMSWorkflow/ams/deploy_tools.py (new file, +96)

import os
import select
import subprocess as sp
import tempfile
from enum import Enum


class RootSched(Enum):
    SLURM = 1
    LSF = 2


def _run_daemon(cmd, shell=False):
    print(f"Going to run {cmd}")
    proc = sp.Popen(
        cmd,
        shell=shell,
        stdin=None,
        stdout=sp.PIPE,
        stderr=sp.PIPE,
        bufsize=1,
        text=True,
        close_fds=True,
    )
    return proc


def _read_flux_uri(proc, timeout=5):
    """
    Reads lines from the flux start command's stdout and returns the first one
    that contains the ssh URI.

    :param proc: The process from which to read stdout.
    :param timeout: The maximum time (in seconds) to wait for flux to write to stdout.
    """

    # Time already spent waiting for I/O
    total_wait_time = 0
    poll_interval = 0.5  # Poll interval in seconds

    while total_wait_time < timeout:
        # Check whether there is data to read from stdout
        ready_to_read = select.select([proc.stdout], [], [], poll_interval)[0]
        if ready_to_read:
            first_line = proc.stdout.readline()
            if "ssh" in first_line:
                return first_line
        total_wait_time += poll_interval
    return None


def spawn_rmq_broker(flux_uri):
    # TODO: We need to implement this; the current specification is limited.
    # We probably need access to flux to spawn a daemon inside the flux allocation.
    raise NotImplementedError("spawn_rmq_broker is not implemented, spawn it manually and provide the credentials")


def start_flux(scheduler, nnodes=None):
    def bootstrap_with_slurm(nnodes):
        def generate_sleep_script():
            # The generated script echoes the ssh-reachable URI of the local flux
            # instance, then sleeps forever to keep the instance alive.
            script_fn = tempfile.NamedTemporaryFile(prefix="ams_flux_bootstrap", suffix=".sh", delete=False, mode="w")
            script = "\n".join(
                [
                    "#!/usr/bin/env bash",
                    "echo \"ssh://$(hostname)$(flux getattr local-uri | sed -e 's!local://!!')\"",
                    "sleep inf",
                ]
            )
            script_fn.write(script)
            script_fn.close()
            os.chmod(script_fn.name, 0o777)
            return script_fn.name

        if nnodes is None:
            nnodes = os.environ.get("SLURM_NNODES", None)

        bootstrap_cmd = f"srun -N {nnodes} -n {nnodes} --pty --mpi=none --mpibind=off flux start"
        script = generate_sleep_script()
        print(f"Script Name is {script}")

        # Run the sleep script under "flux start" and harvest the URI it prints.
        daemon = _run_daemon(f'{bootstrap_cmd} "{script}"', shell=True)
        flux_uri = _read_flux_uri(daemon, timeout=10)
        print("Got flux uri: ", flux_uri)
        if flux_uri is None:
            print("Fatal Error, Cannot read flux")
            daemon.terminate()
            raise RuntimeError("Cannot Get FLUX URI")

        return daemon, flux_uri, script

    if scheduler == RootSched.SLURM:
        return bootstrap_with_slurm(nnodes)

    raise NotImplementedError("We only support bootstrapping through SLURM")
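
The tuple returned by start_flux gives the caller everything needed to attach to, and later tear down, the bootstrapped instance. A minimal usage sketch, assuming the code runs inside a SLURM allocation (the node count and cleanup are illustrative, not part of this commit):

    # Hypothetical driver; not part of this commit.
    from ams.deploy_tools import RootSched, start_flux

    daemon, flux_uri, script = start_flux(RootSched.SLURM, nnodes=2)
    try:
        print(f"Flux instance reachable at {flux_uri.strip()}")
        # e.g. open a handle with flux.Flux(flux_uri.strip()) and submit jobs
    finally:
        daemon.terminate()  # stops srun/flux; the sleep script keeps it alive until then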

src/AMSWorkflow/ams/job_types.py (new file, +163)

import os
import shutil
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import ClassVar, Dict, List, Optional
from warnings import warn

import flux.job as fjob
from flux.job import JobspecV1

from ams.loader import load_class


@dataclass(kw_only=True)
class BaseJob:
    """
    Class modeling a job scheduled by AMS. There are five types of jobs
    (Physics, Stager, Training, RMQServer and TrainingDispatcher).
    """

    name: str
    executable: str
    nodes: int
    tasks_per_node: int
    args: List[str] = field(default_factory=list)
    exclusive: bool = True
    cores_per_task: int = 1
    environ: Dict[str, str] = field(default_factory=dict)
    orderId: ClassVar[int] = 0
    gpus_per_task: Optional[int] = None
    stdout: Optional[str] = None
    stderr: Optional[str] = None

    def _construct_command(self):
        command = [self.executable] + self.args
        return command

    def _construct_environ(self, forward_environ):
        environ = self.environ
        if forward_environ is not None:
            if not isinstance(forward_environ, type(os.environ)) and not isinstance(forward_environ, dict):
                raise TypeError(f"Unsupported forward_environ type ({type(forward_environ)})")
            for k, v in forward_environ.items():
                if k in environ:
                    warn(f"Key {k} already exists in environment ({environ[k]}), ignoring forwarded value ({v})")
                else:
                    environ[k] = v
        return environ

    def _construct_redirect_paths(self, redirectDir):
        stdDir = Path.cwd()
        if redirectDir is not None:
            stdDir = Path(redirectDir)

        if self.stdout is None:
            stdout = f"{stdDir}/{self.name}_{BaseJob.orderId}.out"
        else:
            stdout = f"{stdDir}/{self.stdout}_{BaseJob.orderId}.out"

        if self.stderr is None:
            stderr = f"{stdDir}/{self.name}_{BaseJob.orderId}.err"
        else:
            stderr = f"{stdDir}/{self.stderr}_{BaseJob.orderId}.err"

        BaseJob.orderId += 1

        return stdout, stderr

    def schedule(self, flux_handle, forward_environ=None, redirectDir=None, pre_signed=False, waitable=True):
        jobspec = JobspecV1.from_command(
            command=self._construct_command(),
            num_tasks=self.tasks_per_node * self.nodes,
            num_nodes=self.nodes,
            cores_per_task=self.cores_per_task,
            gpus_per_task=self.gpus_per_task,
            exclusive=self.exclusive,
        )

        stdout, stderr = self._construct_redirect_paths(redirectDir)
        environ = self._construct_environ(forward_environ)
        jobspec.environment = environ
        jobspec.stdout = stdout
        jobspec.stderr = stderr

        return jobspec, fjob.submit(flux_handle, jobspec, pre_signed=pre_signed, waitable=waitable)


@dataclass(kw_only=True)
class PhysicsJob(BaseJob):
    def _verify(self):
        is_executable = shutil.which(self.executable) is not None
        is_path = Path(self.executable).is_file()
        return is_executable or is_path

    def __post_init__(self):
        if not self._verify():
            raise RuntimeError(
                f"[PhysicsJob] {self.executable} is neither an executable file nor a system command"
            )


@dataclass(kw_only=True, init=False)
class Stager(BaseJob):
    def _get_stager_default_cores(self):
        """
        We need the following cores:
          1 RMQ client to receive messages
          1 process to store to the filesystem
          1 process to make data public to kosh
        """
        return 3

    def _verify(self, pruner_path, pruner_cls):
        assert Path(pruner_path).is_file(), "Path to Pruner class should exist"
        user_class = load_class(pruner_path, pruner_cls)
        print(f"Loaded Pruner Class {user_class.__name__}")

    def __init__(
        self,
        name: str,
        num_cores: int,
        db_path: str,
        pruner_cls: str,
        pruner_path: str,
        pruner_args: List[str],
        num_gpus: Optional[int],
        **kwargs,
    ):
        executable = sys.executable

        self._verify(pruner_path, pruner_cls)

        # TODO: Here we access both the stager arguments and the pruner arguments. This is an
        # opportunity to emit an early error message, but it would require extending argparse
        # or something similar. Noting for future reference.
        cli_arguments = [
            "-m",
            "ams_wf.AMSDBStage",
            "-db",
            db_path,
            "--policy",
            "process",
            "--dest",
            str(Path(db_path) / Path("candidates")),
            "--db-type",
            "dhdf5",
            "--store",
            "-m",
            "fs",
            "--class",
            pruner_cls,
        ]
        cli_arguments += pruner_args

        num_cores = self._get_stager_default_cores() + num_cores
        super().__init__(
            name=name,
            executable=executable,
            nodes=1,
            tasks_per_node=1,
            cores_per_task=num_cores,
            args=cli_arguments,
            gpus_per_task=num_gpus,
            **kwargs,
        )
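
Scheduling goes through an open flux handle: schedule() builds a JobspecV1, attaches the environment and stdout/stderr redirection, and submits the job as waitable. A minimal driver sketch, assuming a flux instance is already running (the binary and paths are illustrative, not part of this commit):

    # Hypothetical usage; not part of this commit.
    import flux
    import flux.job as fjob

    from ams.job_types import PhysicsJob

    handle = flux.Flux()  # connect to the enclosing flux instance

    job = PhysicsJob(
        name="physics",
        executable="/usr/bin/hostname",  # stand-in for the real physics binary
        nodes=1,
        tasks_per_node=1,
    )
    jobspec, jobid = job.schedule(handle, redirectDir="/tmp")
    print(fjob.wait(handle, jobid))  # allowed because schedule() submits with waitable=True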
