Commit fd82c1a: Add HTCondor support (#16)

1 parent: 762a817
File tree: 3 files changed, +129 -2 lines

docs/src/index.md (+2 -2):

```diff
@@ -7,7 +7,7 @@ This Julia package provides some tools to ease multithreaded and distributed pro
 
 Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools adds some machinery to make some aspects of this even easier.
 
-An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful, for example, to add workers to a running Julia session via manually controlled batch jobs. Workers can be started locally ([`OnLocalhost`](@ref)) or via SLURM ([`SlurmRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
+An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful, for example, to add workers to a running Julia session via manually controlled batch jobs. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`SlurmRun`](@ref)), or via HTCondor ([`HTCondorRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
 
 The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy.
@@ -38,7 +38,7 @@ using ParallelProcessingTools, Distributed
 end
 
 runmode = OnLocalhost(n = 4)
-# runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
+# runmode = HTCondorRun(n = 4)
 
 display(worker_start_command(runmode))
```
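The documentation changed above describes the full lifecycle: pick a run mode, start workers, hand work to the pool, stop the workers. A minimal end-to-end sketch of that flow, assuming an HTCondor pool is reachable and that `onworker` accepts a zero-argument closure as the referenced docs suggest; the resource settings and the `sum(rand(...))` payload are made up:

```julia
using ParallelProcessingTools, Distributed

# Run mode added by this commit; the settings here are illustrative only.
runmode = HTCondorRun(n = 4, condor_settings = Dict("request_memory" => "4GB"))

# Submit the batch job; for HTCondor this returns `(nothing, n)`.
_, n = runworkers(runmode)

# Wait for the batch workers to connect to the elastic cluster manager.
while nworkers() < n
    sleep(1)
end

# Send work to pooled workers without exceeding their maximum occupancy.
results = [onworker(() -> sum(rand(10^6))) for _ in 1:nworkers()]

# Release the workers again.
stopworkers()
```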

src/ParallelProcessingTools.jl (+1):

```diff
@@ -41,6 +41,7 @@ include("workerpool.jl")
 include("onworkers.jl")
 include("runworkers.jl")
 include("slurm.jl")
+include("htcondor.jl")
 include("deprecated.jl")
 
 @static if !isdefined(Base, :get_extension)
```

src/htcondor.jl (new file, +126):

````julia
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).


"""
    HTCondorRun(;
        n::Int = 1
        condor_flags::Cmd = _default_condor_flags()
        condor_settings::Dict{String,String} = Dict{String,String}()
        julia_flags::Cmd = _default_julia_flags()
        julia_depot::Vector{String} = DEPOT_PATH
        jobfile_dir = homedir()
        env::Dict{String,String} = Dict{String,String}()
        redirect_output::Bool = true
    )

Mode to add worker processes via HTCondor `condor_submit`.

The HTCondor submit file and the worker steering `.sh` script are stored in
`jobfile_dir`.

Example:

```julia-repl
julia> runmode = HTCondorRun(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))

julia> task = runworkers(runmode)
[ Info: Submitting HTCondor job: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
Submitting job(s)..........
10 job(s) submitted to cluster 3198291.
[ Info: HTCondor job submitted: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
(nothing, 10)

julia> sleep(10)

julia> nworkers()
10
```

Workers can also be started manually: use [`worker_start_command(runmode)`](@ref)
to get the `condor_submit` start command and run it from a separate process.
"""
@with_kw struct HTCondorRun <: DynamicAddProcsMode
    n::Int = 1
    condor_flags::Cmd = _default_condor_flags()
    condor_settings::Dict{String,String} = Dict{String,String}()
    julia_flags::Cmd = _default_julia_flags()
    julia_depot::Vector{String} = DEPOT_PATH
    jobfile_dir = homedir()
    env::Dict{String,String} = Dict{String,String}()
    redirect_output::Bool = true
end
export HTCondorRun

_default_condor_flags() = ``

# Counter used to give each submission from this process a unique batch name.
const _g_condor_nextjlstep = Base.Threads.Atomic{Int}(1)

function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
    flags = runmode.condor_flags
    n_workers = runmode.n
    temp_name = tempname(runmode.jobfile_dir)
    worker_script_path = temp_name * ".sh"
    submit_file_path = temp_name * ".sub"
    _generate_condor_worker_script(worker_script_path, runmode, manager)
    _generate_condor_submit_file(submit_file_path, worker_script_path, runmode)

    # Return the command, how often to run it (once), and the number of workers it starts.
    return `condor_submit $flags $submit_file_path`, 1, n_workers
end

function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager::ElasticManager)
    julia_flags = runmode.julia_flags

    # Reuse the SLURM memory-value parser from slurm.jl ("4GB" -> bytes) and
    # hint half of the requested memory to the Julia GC.
    request_memory = get(runmode.condor_settings, "request_memory", "2GB")
    mem_per_task = _slurm_parse_memoptval(request_memory)

    heap_size_hint_fraction = 0.5
    heap_size_hint_in_MB = isnothing(mem_per_task) ? nothing : ceil(Int, mem_per_task * heap_size_hint_fraction / 1024^2)
    jl_heap_size_hint_flag = isnothing(heap_size_hint_in_MB) ? `` : `--heap-size-hint=$(heap_size_hint_in_MB)M`

    # Workers are started with a single Julia thread.
    jl_threads_flag = `--threads=1`

    # User-supplied flags come last so they can override the generated ones.
    all_julia_flags = `$jl_threads_flag $jl_heap_size_hint_flag $julia_flags`
    worker_cmd = worker_local_startcmd(
        manager;
        julia_flags = all_julia_flags,
        redirect_output = runmode.redirect_output, env = runmode.env
    )
    depot_path = join(runmode.julia_depot, ":")
    open(filename, "w") do io
        write(io,
        """
        export JULIA_DEPOT_PATH='$depot_path'
        $worker_cmd
        """)
    end
end
````
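For orientation, the steering script this writes is only two lines: the depot export and the local worker start command produced by `worker_local_startcmd` (defined elsewhere in the package). A sketch of what reading it back might show, with a made-up depot path and an abbreviated worker command:

```julia-repl
julia> print(read(worker_script_path, String))
export JULIA_DEPOT_PATH='/home/user/.julia'
julia --threads=1 --heap-size-hint=1024M [...]
```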
```julia
function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::HTCondorRun)
    # Unique, human-readable batch name per submitting process and submission.
    jlstep = atomic_add!(_g_condor_nextjlstep, 1)
    jobname = "julia-$(getpid())-$jlstep"
    default_dict = Dict(
        "batch_name" => jobname,
    )
    # Entries in condor_settings override the defaults above.
    condor_settings = merge(default_dict, runmode.condor_settings)

    condor_option_strings = join(["$key=$value" for (key, value) in condor_settings], "\n")
    open(submit_file_path, "w") do io
        write(io,
        """
        executable = /bin/bash
        arguments = $(basename(worker_script_path))
        should_transfer_files = yes
        transfer_input_files = $worker_script_path
        Notification = Error
        $condor_option_strings
        queue $(runmode.n)
        """)
    end
end
```
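With default settings and `n = 4`, the resulting submit file would look roughly like the following; the `jl_XXXXXX` temp name, home directory, and pid are made up:

```julia-repl
julia> print(read(submit_file_path, String))
executable = /bin/bash
arguments = jl_XXXXXX.sh
should_transfer_files = yes
transfer_input_files = /home/user/jl_XXXXXX.sh
Notification = Error
batch_name=julia-12345-1
queue 4
```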
```julia
function runworkers(runmode::HTCondorRun, manager::ElasticManager)
    run_cmd, m, n = worker_start_command(runmode, manager)
    @info "Submitting HTCondor job: $run_cmd"
    process = run(run_cmd)
    @info "HTCondor job submitted: $run_cmd"
    return nothing, n
end
```
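Note that `runworkers` for this run mode returns `(nothing, n)`: once `condor_submit` has run, the job lifecycle is in HTCondor's hands, so there is no local task handle to wait on, and callers check `nworkers()` (as in the docstring example above) to see when the batch workers have actually connected.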
