Add flexible worker management, pools and onworkers #11

Closed
wants to merge 64 commits into from
Changes from all commits
Commits
64 commits
edb2653
Add on_free_worker (Julia >= 1v.9 only)
oschulz Apr 27, 2024
ae69c79
Use collect for Cmd components
oschulz Apr 27, 2024
43ce4d3
Better logging handling for test_onworkers
oschulz Apr 27, 2024
175eb4c
Make on_free_worker throw original exception
oschulz Apr 27, 2024
ad88613
Disable ThreadPinning OS warnings in tests
oschulz Apr 27, 2024
0866c97
Fixes for non-Linux platforms
oschulz Apr 27, 2024
cd91fee
Small fix in _on_free_worker_impl_
oschulz Apr 27, 2024
3eb4f2d
Fix sleep tests on non-Linux OS, esp. OS-X
oschulz Apr 27, 2024
f40c1c4
Add getlabel, isactive, hasfailed and whyfailed
oschulz Apr 27, 2024
9bafd64
Improve start_elastic_workers
oschulz Apr 27, 2024
61edac2
Rename util.jl to waiting.jl
oschulz Apr 28, 2024
8498265
Add wouldwait
oschulz Apr 28, 2024
6d480fb
Add macro wait_while and functions wait_for_any and wait_for_all
oschulz Apr 28, 2024
3869385
Require ArgCheck and Sockets
oschulz Apr 30, 2024
9dd2556
Remove unused _addprocs_slurm
oschulz Apr 30, 2024
d9e3662
Improve wait_for_any and wait_for_all
oschulz Apr 30, 2024
315c084
Support Nothing in state checks
oschulz Apr 30, 2024
8a7f83c
Remove worker scheduler, add new procinit and FlexWorkerPool
oschulz Apr 30, 2024
f6b243a
Set atomic file operations default verbosity to false
oschulz Apr 30, 2024
de678f0
Handle MethodError in onwoker
oschulz Apr 30, 2024
3d6653a
Fix test_states
oschulz Apr 30, 2024
4897aea
Adds inner_exception, original_exception get's a new role
oschulz Apr 30, 2024
35f082f
Adapt to change from original_exception to inner_exception
oschulz Apr 30, 2024
d2a4804
Add macro return_exceptions
oschulz Apr 30, 2024
24800c3
Add function default_cache_dir
oschulz Apr 30, 2024
42c972a
Add function modify_files
oschulz Apr 30, 2024
b3d9b44
Make worker_resources robust
oschulz May 2, 2024
ad4e110
Don't init workers in addworker
oschulz May 2, 2024
a8517c0
Improve take! implementation for FlexWorkerPool
oschulz May 2, 2024
669870a
No greedy worker init, add occupancy info to FlexWorkerPool
oschulz May 2, 2024
45161de
First steps to make printover VSCode-compatible
oschulz May 3, 2024
6cd64bf
Replace watch_show by keep_showing
oschulz May 5, 2024
5ff32af
Improve SLUM option memsize parsing
oschulz May 4, 2024
eb87002
Replace addworkers by runworkers, change a lot
oschulz May 3, 2024
9bb4d37
Rename pool oversubscription to maxoccupancy
oschulz May 6, 2024
b93fe56
Change FlexWorkerPool ctors
oschulz May 6, 2024
ea6f15f
Fix deprecation tests
oschulz May 7, 2024
e66cdad
Add write_worker_start_script
oschulz May 6, 2024
d983669
Relax waiting tests
oschulz May 7, 2024
4983a32
FlexWorkerPool supports nworkers and sorts workers
oschulz May 7, 2024
3c5818c
Fix macro return_exceptions
oschulz May 7, 2024
acda3b9
Add function stopworkers
oschulz May 7, 2024
67f3192
Multiple implementation and test fixes.
oschulz May 7, 2024
7a775eb
Improve write_worker_start_script implementation
oschulz May 7, 2024
ad58cb4
Change worker_start_command and write_worker_start_script
oschulz May 7, 2024
bd6f357
Fix handling of worker_timeout options
oschulz May 7, 2024
1dfc4a4
Add timeout support to waiting functions
oschulz May 8, 2024
deb174e
Use wait_for_any timeout in _on_worker_impl
oschulz May 9, 2024
cb21ec9
Adapt to changes in ElasticManager
oschulz May 9, 2024
8440abf
Use internal modified version of ElasticManager for now
oschulz May 9, 2024
b240451
Make write_worker_start_script return script filename
oschulz May 9, 2024
5225a16
Add testtools
oschulz May 9, 2024
25f942e
Improve onworkers tests
oschulz May 9, 2024
6d8aa75
Move ThreadPinning related code to into an extension
oschulz May 9, 2024
89ef323
Fix tests and enable onworker on Julia v1.6
oschulz May 9, 2024
25b3cb8
FIx global_procinit_level docstring
oschulz May 9, 2024
02e8a30
Remove on_free_worker
oschulz May 9, 2024
21cbf5d
Fix docstring of idle_sleep
oschulz May 9, 2024
ef88a8c
Remove keep_showing
oschulz May 9, 2024
a43d909
Run ensure_procinit before worker_resources
oschulz May 9, 2024
93be1f0
Add package functionality overview to docs
oschulz May 9, 2024
d821382
Relax sleep timing tests on OS-X a lot
oschulz May 9, 2024
a1d6aaa
Deactivate write_worker_start_script tests
oschulz May 9, 2024
4805bb6
Set JULIA_REVISE to off on elastic workers
oschulz May 9, 2024
10 changes: 10 additions & 0 deletions Project.toml
@@ -3,20 +3,30 @@ uuid = "8e8a01fc-6193-5ca1-a2f1-20776dae4199"
version = "0.4.3"

[deps]
ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"
ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042"

[weakdeps]
ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042"

[extensions]
ParallelProcessingToolsThreadPinningExt = "ThreadPinning"

[compat]
ArgCheck = "1, 2"
ClusterManagers = "0.4.6"
Distributed = "1"
LinearAlgebra = "1"
Logging = "1"
Parameters = "0.12, 0.13"
Pkg = "1"
Sockets = "1"
ThreadPinning = "0.7.22"
julia = "1.6"
2 changes: 2 additions & 0 deletions Test/Project.toml
@@ -0,0 +1,2 @@
[deps]
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
142 changes: 117 additions & 25 deletions docs/src/index.md
@@ -3,56 +3,112 @@
This Julia package provides some tools to ease multithreaded and distributed programming.


## Compute cluster management
## Distributed computing

ParallelProcessingTools helps spin-up Julia compute clusters. It currently has support for clusters on localhost and on SLURM (uses `ClusterManagers.ElasticManager` internally).
Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same machine or on different machines. ParallelProcessingTools adds some machinery to make several aspects of this even easier.

On SLURM, `addworkers` will automatically try to perform a sensible thread-pinning (using the [ThreadPinning](https://github.com/carstenbauer/ThreadPinning.jl) package internally).
An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful to add workers to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)) or via SLURM ([`SlurmRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).

The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy.

Since workers can appear and disappear dynamically, initializing them (loading packages, etc.) via the standard `Distributed.@everywhere` macro is problematic, as workers added afterwards won't be initialized. ParallelProcessingTools provides the macro [`@always_everywhere`](@ref) to run code globally on all current processes and to store the code so it can be run again on future new worker processes. Workers that are part of a [`FlexWorkerPool`](@ref) will be updated automatically on `take!` and `onworker`. You can also use [`ensure_procinit`](@ref) to manually update all workers with all `@always_everywhere` code used so far.

The function [`pinthreads_auto`](@ref) (used inside of `@always_everywhere`) provides a convenient way to perform some automatic thread pinning on all processes. Note that it needs to follow an [`import ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl/), and that more complex use cases may require customized thread pinning for best performance.

For example:

```julia
ENV["JULIA_DEBUG"] = "ParallelProcessingTools"
ENV["JULIA_WORKER_TIMEOUT"] = "120"

using ParallelProcessingTools, Distributed

@always_everywhere begin
using Distributions
using ParallelProcessingTools
using Statistics

import ThreadPinning
pinthreads_auto()
end

mode = ParallelProcessingTools.SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
#ParallelProcessingTools.worker_start_command(mode)
runmode = OnLocalhost(n = 4)
# runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)

display(worker_start_command(runmode))

# Add some workers:
addworkers(mode)
# Add some workers and initialize with all `@always_everywhere` code:
old_nprocs = nprocs()
_, n = runworkers(runmode)
@wait_while nprocs() < old_nprocs + n
ensure_procinit()

# List resources:
ParallelProcessingTools.worker_resources()
# Show worker resources:
pool = ppt_worker_pool()
display(pool)
display(worker_resources())

# Confirm that Distributions is loaded on workers:
# Confirm that Distributions is loaded on a worker:
worker = last(workers())
@fetchfrom worker Normal()
@fetchfrom worker mean(rand(100))

# Add some more workers:
addworkers(mode)
Table(ParallelProcessingTools.worker_resources())
# Some more init code
@always_everywhere begin
X = rand(100)
end

# Add even more workers:
addworkers(mode)
Table(ParallelProcessingTools.worker_resources())
# Add some more workers, we won't run `ensure_procinit()` manually this time:
old_nprocs = nprocs()
_, n = runworkers(runmode)
@wait_while nprocs() < old_nprocs + n

# Worker hasn't run @always_everywhere code yet, so it doesn't have `mean`:
worker = last(workers())
display(@return_exceptions @userfriendly_exceptions begin
@fetchfrom worker mean(X)
end)

# Using `take!` on a `FlexWorkerPool` automatically runs init code as necessary:
pid = take!(pool)
try
remotecall_fetch(() -> mean(X), pid)
finally
put!(pool, pid)
end

# `onworker` (using the default `FlexWorkerPool` here) does the same:
onworker(mean, X)

# If we don't need worker processes for a while, let's stop them:
stopworkers()
```

And we can do SLURM batch scripts like this (e.g. "batchtest.jl"):
We can also use SLURM batch scripts, like this (e.g. "batchtest.jl"):

```julia
#!/usr/bin/env -S julia --project=@SOME_JULIA_ENVIRONMENT --threads=8
#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G
#!/usr/bin/env julia
#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G --time=00:15:00

using Pkg; pkg"activate @SOME_JULIA_ENVIRONMENT"

ENV["JULIA_DEBUG"] = "ParallelProcessingTools"
ENV["JULIA_WORKER_TIMEOUT"] = "120"

using ParallelProcessingTools, Distributed

@always_everywhere begin
using ParallelProcessingTools
import ThreadPinning
pinthreads_auto()
end

addworkers(SlurmRun())
resources = ParallelProcessingTools.worker_resources()
show(stdout, MIME"text/plain"(), ParallelProcessingTools.worker_resources())
_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
@wait_while maxtime=240 nprocs() < n + 1

resources = worker_resources()
display(resources)

stopworkers()
```

This should run with a simple
@@ -61,4 +117,40 @@
sbatch -o out.txt batchtest.jl
```

and "out.txt" should then contain a list of the worker resources.
and "out.txt" should then contain debugging output and a list of the worker
resources.


## Multithreading

To test multithreading performance and help debug and optimize multithreaded
code, ParallelProcessingTools provides the utility macro [`@onthreads`](@ref)
to run code explicitly on selected Julia threads (all threads can be
listed using [`allthreads`](@ref)).

You can use the macro [`@critical`](@ref) to prevent code that may suffer from race conditions from running in parallel with other code fenced by `@critical`.

The macro [`@mt_out_of_order`](@ref) is useful to run different pieces of code in parallel on Julia threads.
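A minimal sketch combining these macros (the exact behavior and output order are scheduler-dependent, and the sketch assumes the macros work as described above):

```julia
using ParallelProcessingTools
using Base.Threads: nthreads, threadid

# Run code on all Julia threads; each thread writes only to its own slot,
# so no locking is needed for `results` itself:
results = zeros(Int, nthreads())
@onthreads allthreads() begin
    results[threadid()] = threadid()
end

# Fence code that would otherwise race (e.g. printing) with @critical:
@onthreads allthreads() begin
    @critical println("Hello from thread $(threadid())")
end

# Run independent computations out of order on different threads:
@mt_out_of_order begin
    a = sum(rand(10^6))
    b = maximum(rand(10^6))
end
```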


## Waiting and sleeping

In a parallel computing scenario, on threads, on distributed processes, or both, or when dealing with I/O operations, code often needs to wait, and a timeout mechanism is frequently necessary as well. Julia's standard `wait` function can only wait on a single object and offers no timeout (`waitany`, which requires Julia >= v1.12, can be used to wait for multiple tasks).

ParallelProcessingTools provides a very flexible macro [`@wait_while`](@ref) to wait for custom conditions with an optional timeout, as well as the functions [`wait_for_all`](@ref) and [`wait_for_any`](@ref) that can wait for different kinds of objects, also with an optional timeout.

The functions [`sleep_ns`](@ref) and [`idle_sleep`](@ref) can be used to implement custom scenarios that require precise sleeping for both very short and long intervals.
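A short sketch of these waiting tools (the `maxtime` syntax for `@wait_while` matches the example above; treat the argument forms of `wait_for_all` and `sleep_ns` as assumptions based on the description):

```julia
using ParallelProcessingTools

# Wait until a condition holds, with a timeout in seconds:
t = Threads.@spawn (sleep(0.2); :done)
@wait_while maxtime=5 !istaskdone(t)

# Wait for several waitable objects at once:
tasks = [Threads.@spawn sleep(0.1) for _ in 1:3]
wait_for_all(tasks)

# Sleep precisely for a short interval, given in nanoseconds:
sleep_ns(100_000)  # 100 microseconds
```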


## Exception handling

Exceptions thrown during remote code execution can be complex, nested, and sometimes hard to understand. You can use the functions [`inner_exception`](@ref), [`onlyfirst_exception`](@ref) and [`original_exception`](@ref) to get to the underlying cause of a failure more easily. The macro [`@userfriendly_exceptions`](@ref) automates this to some extent for a given piece of code.

To get an exception "in hand" for further analysis, you can use the macro [`@return_exceptions`](@ref) to make (possibly failing) code return exceptions instead of throwing them.
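For instance, a sketch of unwrapping a remote failure (names as documented above; the exact layering of `RemoteException`/`TaskFailedException` depends on the Julia version):

```julia
using ParallelProcessingTools, Distributed

# Capture the exception instead of letting it propagate:
err = @return_exceptions begin
    remotecall_fetch(() -> error("simulated failure"), myid())
end

if err isa Exception
    # Unwrap nested wrapper exceptions down to the root cause:
    @info "Underlying failure" inner_exception(err)
end
```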


## File I/O

File handling can become more challenging when working in a parallel and possibly distributed fashion. Code or whole workers can crash, resulting in corrupt files, or workers may become disconnected but still write files and clash with restarted code, resulting in race conditions and possibly further corrupt files.

ParallelProcessingTools provides the functions [`create_files`](@ref), [`read_files`](@ref) and [`modify_files`](@ref) to implement atomic file operations, on a best-effort basis (depending on the operating system and underlying file systems).
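A sketch of how these functions might be used. The do-block style, in which the functions pass temporary file names that are moved into place atomically on success, is an assumption based on the description above:

```julia
using ParallelProcessingTools

# Write output atomically (best effort): on success the temporary file is
# renamed to "results.txt", so readers never see a partially written file.
create_files("results.txt") do fname
    write(fname, "all done\n")
end

read_files("results.txt") do fname
    print(read(fname, String))
end
```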
38 changes: 38 additions & 0 deletions ext/ParallelProcessingToolsThreadPinningExt.jl
@@ -0,0 +1,38 @@
module ParallelProcessingToolsThreadPinningExt

import ParallelProcessingTools
import LinearAlgebra
import Distributed
import ThreadPinning

# ThreadPinning.jl does not support all operating systems, currently:
const _threadpinning_supported = isdefined(ThreadPinning, :affinitymask2cpuids)

@static if _threadpinning_supported


function ParallelProcessingTools._pinthreads_auto_impl(::Val{true})
pid = Distributed.myid()
if Distributed.myid() == 1
@debug "On process $pid, leaving Julia threads unpinned"
let n_juliathreads = Threads.nthreads()
if n_juliathreads > 1
LinearAlgebra.BLAS.set_num_threads(n_juliathreads)

end
end
else
@debug "On process $pid, pinning threads according to affinity mask"
let available_cpus = ThreadPinning.affinitymask2cpuids(ThreadPinning.get_affinity_mask())
ThreadPinning.pinthreads(:affinitymask)
LinearAlgebra.BLAS.set_num_threads(length(available_cpus))

end
end
end


ParallelProcessingTools._getcpuids_impl(::Val{true}) = ThreadPinning.getcpuids()



end # if _threadpinning_supported

end # module ParallelProcessingToolsThreadPinningExt
22 changes: 20 additions & 2 deletions src/ParallelProcessingTools.jl
@@ -9,23 +9,41 @@ using Distributed

import LinearAlgebra
import Pkg
import Sockets

import ClusterManagers
import ThreadPinning

using Base: Process
using Logging: @logmsg, LogLevel, Info, Debug

using ArgCheck: @argcheck
using Parameters: @with_kw

# # ToDo: Remove CustomClusterManagers once changes to ElasticManager
# # have been upstreamed.
#using CustomClusterManagers: ElasticManager
include("custom_cluster_managers.jl")
using .CustomClusterManagers: ElasticManager

include("display.jl")
include("waiting.jl")
include("exceptions.jl")
include("states.jl")
include("fileio.jl")
include("threadsafe.jl")
include("threadlocal.jl")
include("onthreads.jl")
include("onprocs.jl")
include("workpartition.jl")
include("addworkers.jl")
include("procinit.jl")
include("workerpool.jl")
include("onworkers.jl")
include("runworkers.jl")
include("slurm.jl")
include("deprecated.jl")

@static if !isdefined(Base, :get_extension)
include("../ext/ParallelProcessingToolsThreadPinningExt.jl")
end

end # module