Commit d5d2eb6

STASH clusters

1 parent 484bc9a

4 files changed: +805, -0 lines changed

docs/src/index.md (+61)

@@ -1,3 +1,64 @@
# ParallelProcessingTools.jl

This Julia package provides some tools to ease multithreaded and distributed programming.


## Compute cluster management

ParallelProcessingTools helps spin up Julia compute clusters. It currently supports clusters on localhost and on SLURM (using `ClusterManagers.ElasticManager` internally).

On SLURM, `addworkers` will automatically try to perform sensible thread pinning (using the [ThreadPinning](https://github.com/carstenbauer/ThreadPinning.jl) package internally).

```julia
using ParallelProcessingTools, Distributed

@always_everywhere begin
    using Distributions
end

mode = ParallelProcessingTools.SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
#ParallelProcessingTools.worker_start_command(mode)

# Add some workers:
addworkers(mode)

# List resources:
ParallelProcessingTools.worker_resources()

# Confirm that Distributions is loaded on workers:
worker = last(workers())
@fetchfrom worker Normal()

# Add some more workers:
addworkers(mode)
Table(ParallelProcessingTools.worker_resources())

# Add even more workers:
addworkers(mode)
Table(ParallelProcessingTools.worker_resources())
```

SLURM batch scripts can be written like this (e.g. "batchtest.jl"):

```julia
#!/usr/bin/env -S julia --project=@SOME_JULIA_ENVIRONMENT --threads=8
#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G

using ParallelProcessingTools, Distributed

@always_everywhere begin
    using ParallelProcessingTools
end

addworkers()
resources = ParallelProcessingTools.worker_resources()
show(stdout, MIME"text/plain"(), resources)
```

This should run with a simple

```shell
sbatch -o out.txt batchtest.jl
```

and "out.txt" should then contain a list of the worker resources.

src/ParallelProcessingTools.jl (+2)

@@ -15,6 +15,8 @@ include("threadlocal.jl")
include("onthreads.jl")
include("onprocs.jl")
include("workpartition.jl")
include("workers.jl")
include("slurm.jl")
include("deprecated.jl")

end # module

src/slurm.jl (+267)

@@ -0,0 +1,267 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).


"""
    ParallelProcessingTools.SlurmRun(;
        slurm_flags::Cmd = {defaults}
        julia_flags::Cmd = {defaults}
        dir = pwd()
        user_start::Bool = false
        timeout::Real = 60
    )

Mode to add worker processes via SLURM `srun`.

`srun` and Julia worker `julia` command line flags are inferred from SLURM
environment variables (e.g. when inside of an `salloc` or batch job), as
well as from `slurm_flags` and `julia_flags`.

Workers are started with their current directory set to `dir`.

Example:

```julia
mode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
addworkers(mode)
```

If `user_start` is `true`, the SLURM srun command will not be run
automatically; instead it will be logged via `@info` and the user is
responsible for running it. The srun command can also be retrieved via
[`worker_start_command(mode)`](@ref).
"""
@kwdef struct SlurmRun <: ElasticAddProcsMode
    slurm_flags::Cmd = _default_slurm_flags()
    julia_flags::Cmd = _default_julia_flags()
    dir = pwd()
    user_start::Bool = false
    timeout::Real = 60
end
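
A minimal sketch of the manual-start workflow described in the docstring, assuming the one-argument `worker_start_command(mode)` method referenced there is provided elsewhere in the package (e.g. in `workers.jl`, whose diff is not shown here):

```julia
using ParallelProcessingTools

# Configure a SLURM run mode, but don't let addworkers launch srun itself:
mode = ParallelProcessingTools.SlurmRun(
    slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`,
    user_start = true
)

# Inspect the srun command that would start the workers:
ParallelProcessingTools.worker_start_command(mode)

# This now only logs the srun command via @info and waits (up to
# mode.timeout seconds) for the user to run it in a separate shell:
addworkers(mode)
```
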
const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1)

function worker_start_command(mode::SlurmRun, manager::ClusterManagers.ElasticManager)
    slurm_flags = mode.slurm_flags
    julia_flags = mode.julia_flags
    dir = mode.dir

    tc = _get_slurm_taskconf(slurm_flags, ENV)

    n_workers = _slurm_nworkers(tc)
    mem_per_task = _slurm_mem_per_task(tc)

    heap_size_hint_fraction = 0.5
    heap_size_hint_in_MB = isnothing(mem_per_task) ? nothing : ceil(Int, mem_per_task * heap_size_hint_fraction / 1024^2)
    jl_heap_size_hint_flag = isnothing(heap_size_hint_in_MB) ? `` : `--heap-size-hint=$(heap_size_hint_in_MB)M`

    jl_threads_flag = isnothing(tc.cpus_per_task) ? `` : `--threads=$(tc.cpus_per_task)`

    additional_julia_flags = `$jl_threads_flag $jl_heap_size_hint_flag $julia_flags`
    jlstep = atomic_add!(_g_slurm_nextjlstep, 1)
    jobname = "julia-$(getpid())-$jlstep"

    worker_cmd = elastic_localworker_startcmd(manager; julia_flags = `$julia_flags $additional_julia_flags`)

    return `srun --job-name=$jobname --chdir=$dir $slurm_flags $worker_cmd`, n_workers
end
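
For orientation, a worked example of the heap-size-hint heuristic in `worker_start_command` above; the flag values are illustrative (they match the docs example, not anything fixed by the code):

```julia
# With --cpus-per-task=8 and --mem-per-cpu=8G, each task gets 8 * 8 GiB:
mem_per_task = 8 * 8 * 1024^3   # 64 GiB in bytes

# The worker is hinted to use about half of that:
heap_size_hint_fraction = 0.5
heap_size_hint_in_MB = ceil(Int, mem_per_task * heap_size_hint_fraction / 1024^2)
# heap_size_hint_in_MB == 32768, i.e. the worker gets `--heap-size-hint=32768M`
```
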
function _slurm_nworkers(tc::NamedTuple)
    if !isnothing(tc.n_tasks)
        tc.n_tasks
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.ntasks_per_node)
        tc.n_nodes * tc.ntasks_per_node
    else
        throw(ArgumentError("Could not infer number of tasks/processes from SLURM environment and flags."))
    end
end

function _slurm_mem_per_task(tc::NamedTuple)
    if !isnothing(tc.cpus_per_task) && !isnothing(tc.mem_per_cpu)
        tc.cpus_per_task * tc.mem_per_cpu
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.ntasks_per_node)
        div(tc.mem_per_node, tc.ntasks_per_node)
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.n_tasks)
        div(tc.n_nodes * tc.mem_per_node, tc.n_tasks)
    else
        nothing
    end
end


function ParallelProcessingTools.start_elastic_workers(mode::SlurmRun, manager::ClusterManagers.ElasticManager)
    srun_cmd, n_workers = worker_start_command(mode, manager)
    if mode.user_start
        @info "To add Julia worker processes (I'll wait for them), run: $srun_cmd"
    else
        @info "Starting SLURM job: $srun_cmd"
        srun_proc = open(srun_cmd)
    end
    return n_workers
end

function worker_init_code(::SlurmRun)
    quote
        import ParallelProcessingTools
        ParallelProcessingTools.pinthreads_auto()
    end
end

elastic_addprocs_timeout(mode::SlurmRun) = mode.timeout


function _default_slurm_flags()
    # `srun` in `salloc`/`sbatch` doesn't seem to always pick up
    # SLURM_CPUS_PER_TASK, resulting in incorrect thread pinning. So we'll
    # set `--cpus-per-task` explicitly:
    haskey(ENV, "SLURM_CPUS_PER_TASK") ? `--cpus-per-task=$(ENV["SLURM_CPUS_PER_TASK"])` : ``
end

const _slurm_memunits = IdDict{Char,Int}('K' => 1024^1, 'M' => 1024^2, 'G' => 1024^3, 'T' => 1024^4)

const _slurm_memsize_regex = r"^([0-9]+)([KMGT])?$"
function _slurm_parse_memoptval(memsize::AbstractString)
    s = strip(memsize)
    m = match(_slurm_memsize_regex, s)
    if isnothing(m)
        throw(ArgumentError("Invalid SLURM memory size specification \"$s\""))
    else
        value = parse(Int, m.captures[1])
        unitchar = only(something(m.captures[2], 'M'))
        unitmult = _slurm_memunits[unitchar]
        return value * unitmult
    end
end
_slurm_parse_memoptval(::Nothing) = nothing

_slurm_parse_intoptval(value::AbstractString) = parse(Int, value)
_slurm_parse_intoptval(::Nothing) = nothing
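
For illustration, the behavior of the internal memory-size parser defined above (input values are examples only; the unit suffix defaults to 'M', and K/M/G/T are powers of 1024):

```julia
ParallelProcessingTools._slurm_parse_memoptval("8G")     # 8 * 1024^3 bytes
ParallelProcessingTools._slurm_parse_memoptval("4096")   # 4096 * 1024^2 bytes (default unit 'M')
ParallelProcessingTools._slurm_parse_memoptval(nothing)  # nothing
```
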
function _slurm_parse_shortopt(opt::Char, args::Vector{String}, i::Int, default)
    if i <= lastindex(args)
        arg = args[i]
        if arg == "-$opt"
            if i < lastindex(args) && !startswith(args[i+1], "-")
                return args[i+1], i+2
            else
                throw(ArgumentError("Missing value for option \"-$opt\""))
            end
        elseif startswith(arg, "-$opt")
            if length(arg) > 2
                return arg[begin+2:end], i+1
            else
                throw(ArgumentError("Missing value for option \"-$opt\""))
            end
        else
            return default, i
        end
    else
        return default, i
    end
end

function _slurm_parse_longopt(opt::String, args::Vector{String}, i::Int, default)
    if i <= lastindex(args)
        arg = args[i]
        if arg == "--$opt"
            if i < lastindex(args) && !startswith(args[i+1], "-")
                return args[i+1], i+2
            else
                throw(ArgumentError("Missing value for option \"--$opt\""))
            end
        elseif startswith(arg, "--$opt=")
            if length(arg) > length(opt) + 3
                return arg[begin+length(opt)+3:end], i+1
            else
                throw(ArgumentError("Missing value for option \"--$opt\""))
            end
        else
            return default, i
        end
    else
        return default, i
    end
end

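
To make the two parsing paths concrete, a short sketch of how the internal option helpers above handle separated and attached values (the argument vector is illustrative):

```julia
args = ["-n", "4", "-c8", "--mem-per-cpu=8G"]

ParallelProcessingTools._slurm_parse_shortopt('n', args, 1, nothing)          # ("4", 3)
ParallelProcessingTools._slurm_parse_shortopt('c', args, 3, nothing)          # ("8", 4)
ParallelProcessingTools._slurm_parse_longopt("mem-per-cpu", args, 4, nothing) # ("8G", 5)
```
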
function _get_slurm_taskconf(slurmflags::Cmd, env::AbstractDict{String,String})
    n_tasks = get(env, "SLURM_NTASKS", nothing)
    cpus_per_task = get(env, "SLURM_CPUS_PER_TASK", nothing)
    mem_per_cpu = get(env, "SLURM_MEM_PER_CPU", nothing)
    n_nodes = get(env, "SLURM_JOB_NUM_NODES", nothing)
    ntasks_per_node = get(env, "SLURM_NTASKS_PER_NODE", nothing)
    mem_per_node = get(env, "SLURM_MEM_PER_NODE", nothing)

    args = slurmflags.exec
    i::Int = firstindex(args)
    while i <= lastindex(args)
        last_i = i
        n_tasks, i = _slurm_parse_shortopt('n', args, i, n_tasks)
        n_tasks, i = _slurm_parse_longopt("ntasks", args, i, n_tasks)
        cpus_per_task, i = _slurm_parse_shortopt('c', args, i, cpus_per_task)
        cpus_per_task, i = _slurm_parse_longopt("cpus-per-task", args, i, cpus_per_task)
        mem_per_cpu, i = _slurm_parse_longopt("mem-per-cpu", args, i, mem_per_cpu)
        n_nodes, i = _slurm_parse_shortopt('N', args, i, n_nodes)
        n_nodes, i = _slurm_parse_longopt("nodes", args, i, n_nodes)
        mem_per_node, i = _slurm_parse_longopt("mem", args, i, mem_per_node)
        ntasks_per_node, i = _slurm_parse_longopt("ntasks-per-node", args, i, ntasks_per_node)

        if last_i == i
            i += 1
        end
    end

    return (
        n_tasks = _slurm_parse_intoptval(n_tasks),
        cpus_per_task = _slurm_parse_intoptval(cpus_per_task),
        mem_per_cpu = _slurm_parse_memoptval(mem_per_cpu),
        n_nodes = _slurm_parse_intoptval(n_nodes),
        ntasks_per_node = _slurm_parse_intoptval(ntasks_per_node),
        mem_per_node = _slurm_parse_memoptval(mem_per_node),
    )
end
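
Putting the pieces together, a sketch of what `_get_slurm_taskconf` returns for the flags used in the docs example, assuming none of the SLURM environment variables are set:

```julia
tc = ParallelProcessingTools._get_slurm_taskconf(
    `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`,
    Dict{String,String}()
)
# tc == (
#     n_tasks = 4, cpus_per_task = 8, mem_per_cpu = 8 * 1024^3,
#     n_nodes = nothing, ntasks_per_node = nothing, mem_per_node = nothing,
# )
```
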
function _addprocs_slurm(; kwargs...)
    slurm_ntasks = parse(Int, ENV["SLURM_NTASKS"])
    slurm_ntasks > 1 || throw(ErrorException("Invalid nprocs=$slurm_ntasks inferred from SLURM environment"))
    _addprocs_slurm(slurm_ntasks; kwargs...)
end

function _addprocs_slurm(
    nprocs::Int;
    job_file_loc::AbstractString = joinpath(homedir(), "slurm-julia-output"),
    retry_delays::AbstractVector{<:Real} = [1, 1, 2, 2, 4, 5, 5, 10, 10, 10, 10, 20, 20, 20]
)
    lock(_g_processops_lock)
    try
        @info "Adding $nprocs Julia processes via SLURM"

        julia_project = dirname(Pkg.project().path)
        slurm_ntasks = nprocs
        slurm_nthreads = parse(Int, ENV["SLURM_CPUS_PER_TASK"])
        slurm_mem_per_cpu = parse(Int, ENV["SLURM_MEM_PER_CPU"]) * 1024^2
        slurm_mem_per_task = slurm_nthreads * slurm_mem_per_cpu

        cluster_manager = ClusterManagers.SlurmManager(slurm_ntasks, retry_delays)
        worker_timeout = round(Int, max(sum(cluster_manager.retry_delays), 60))
        ENV["JULIA_WORKER_TIMEOUT"] = "$worker_timeout"

        mkpath(job_file_loc)
        new_workers = Distributed.addprocs(
            cluster_manager, job_file_loc = job_file_loc,
            exeflags = `--project=$julia_project --threads=$slurm_nthreads --heap-size-hint=$(slurm_mem_per_task÷2)`,
            cpus_per_task = "$slurm_nthreads", mem_per_cpu="$(slurm_mem_per_cpu >> 30)G", # time="0:10:00",
            mem_bind = "local", cpu_bind="cores",
        )

        @info "Configuring $nprocs new Julia worker processes"

        _run_always_everywhere_code(new_workers)
        pinthreads_distributed(new_workers)

        @info "Added $(length(new_workers)) Julia worker processes via SLURM"
    finally
        unlock(_g_processops_lock)
    end
end
