Commit 7422a3c
Refactor (including API changes), add documentation
1 parent 9a986ea commit 7422a3c

18 files changed, +702 −103 lines changed

.appveyor.yml

+2
@@ -1,6 +1,7 @@
 # Documentation: https://github.com/JuliaCI/Appveyor.jl
 
 environment:
+  JULIA_NUM_THREADS: 4
   matrix:
   - julia_version: 0.7
   - julia_version: 1.0
@@ -33,6 +34,7 @@ install:
 build_script:
   - echo "%JL_BUILD_SCRIPT%"
   - C:\julia\bin\julia -e "%JL_BUILD_SCRIPT%"
+# - C:\julia\bin\julia -e '@info "Running Julia with $(Base.Threads.nthreads()) threads"'
 
 test_script:
   - echo "%JL_TEST_SCRIPT%"

.travis.yml

+6
@@ -11,6 +11,9 @@ julia:
   - 1.0
   - nightly
 
+env:
+  - JULIA_NUM_THREADS=4
+
 ## uncomment the following lines to allow failures on nightly julia
 ## (tests will run but not make your overall status red)
 #matrix:
@@ -21,6 +24,9 @@ julia:
 notifications:
   email: false
 
+before_script:
+  - julia -e '@info "Running Julia with $(Base.Threads.nthreads()) threads"'
+
 after_success:
   # push coverage results to Codecov
   - julia -e 'import Pkg; Pkg.add("Coverage"); using Coverage; Codecov.submit(Codecov.process_folder())'

README.md

+169
@@ -1 +1,170 @@
# ParallelProcessingTools.jl

[![Build Status](https://travis-ci.com/oschulz/ParallelProcessingTools.jl.svg?branch=master)](https://travis-ci.com/oschulz/ParallelProcessingTools.jl)
[![Codecov](https://codecov.io/gh/oschulz/ParallelProcessingTools.jl/branch/master/graph/badge.svg)](https://codecov.io/gh/oschulz/ParallelProcessingTools.jl)

This Julia package provides tools that ease multithreaded and distributed programming, especially for more complex use cases and when using multiple processes with multiple threads on each process.

This package follows the SPMD (Single Program Multiple Data) paradigm (like, e.g., MPI, CUDA, OpenCL and `DistributedArrays.SPMD`): the same code runs on every execution unit (process or thread), and the code itself is responsible for figuring out which part of the data it should process. This differs from the approach of `Base.Threads.@threads` and `Distributed.@distributed`. SPMD is more appropriate for complex cases that the latter do not handle well, e.g. because some initial setup is required on each execution unit, the iteration scheme over the data is more complex, or control over SIMD processing is required.

This package also implements thread-local variables and tooling to handle non-thread-safe code.

Note: Some features currently may not work on Windows.


## Work partitions

`workpart` partitions an `AbstractArray` across a specified set of workers (i.e., processes or threads). E.g.

```julia
A = rand(100)
workpart(A, 4:7, 5) == view(A, 26:50)
```

returns the view into the array that worker `5` out of the set of workers `4:7` will be responsible for. The intended usage is

```julia
using Distributed, Base.Threads
@everywhere data = rand(1000)
@everywhere procsel = workers()
@onprocs procsel begin
    sub_A = workpart(data, procsel, myid())
    threadsel = allthreads()
    @onthreads threadsel begin
        # ... some initialization, create local buffers, etc.
        idxs = workpart(eachindex(sub_A), threadsel, threadid())
        for i in idxs
            # ... sub_A[i] ...
        end
    end
end
```

See below for a full example.

If `data` is a `DistributedArrays.DArray`, then `DistributedArrays.localpart(data)` should be used instead of `workpart(data, workers(), myid())`.
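
For illustration, here is a minimal sketch of the `DArray` case (assuming `DistributedArrays` is installed and the workers have the required packages loaded):

```julia
using Distributed; addprocs(2)
@everywhere using ParallelProcessingTools, DistributedArrays

data = drandn(10^6)  # a DArray, distributed across the workers
# Each worker processes only its locally stored chunk:
@onprocs procs(data) sum(localpart(data))
```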


## Thread-safety

Use `@critical` to mark non-thread-safe code, e.g. for logging. For example,

```julia
@onthreads allthreads() begin
    @critical @info Base.Threads.threadid()
end
```

would crash Julia without `@critical`, because `@info` is not thread-safe.

Note: This doesn't always work for multithreaded code on other processes yet.


## Thread-local variables

A thread-local variable can be created and initialized via

```julia
tl = ThreadLocal(0.0)
```

The API is similar to `Ref`: `tl[]` gets the value of `tl` for the current thread, `tl[] = 4.2` sets the value for the current thread. `getallvalues(tl)` returns the values for all threads as a vector; it can only be called from single-threaded code.
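
As a quick illustration of that API (the output depends on the number of threads Julia was started with):

```julia
using ParallelProcessingTools, Base.Threads

tl = ThreadLocal(0.0)
@onthreads allthreads() begin
    tl[] = tl[] + threadid()  # each thread reads and writes only its own slot
end
getallvalues(tl)  # one value per thread, e.g. [1.0, 2.0, 3.0, 4.0]
```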


## Multithreaded code execution

The macro `@onthreads threadsel expr` will run the code in `expr` on the threads in `threadsel` (typically a range of thread IDs). For convenience, the package exports `allthreads() = 1:nthreads()`. Here's a simple example of how to use thread-local variables and `@onthreads` to sum up numbers in parallel:

```julia
tlsum = ThreadLocal(0.0)
data = rand(100)
@onthreads allthreads() begin
    tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid()))
end
sum(getallvalues(tlsum)) ≈ sum(data)
```

`@onthreads` forwards exceptions thrown by the code in `expr` to the caller (in contrast to `Base.Threads.@threads`, which will currently print an exception but not forward it, so that when using `@threads`, program execution simply continues after a failure in multithreaded code).

Note: Julia can currently run only one function on multiple threads at the same time (this restriction is likely to disappear in the future). So even if `threadsel` does not include all threads, the remaining threads will be idle but blocked and cannot be used to run other code in parallel. However, the ability to run on a subset of the available threads is still useful for measuring the scaling behavior of multithreaded code (without restarting Julia with a different value of `$JULIA_NUM_THREADS`), as sketched below.
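
A minimal sketch of such a scaling measurement (the use of `@elapsed` for timing here is illustrative, not part of this package):

```julia
using ParallelProcessingTools, Base.Threads

data = rand(10^7)
for n in 1:nthreads()
    threadsel = 1:n
    t = @elapsed @onthreads threadsel begin
        sum(workpart(data, threadsel, threadid()))  # result discarded, timing only
    end
    @info "Partial sums on $n thread(s) took $t s"
end
```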


## Multiprocess code execution

The macro `@onprocs procsel expr` will run the code in `expr` on the processes in `procsel` (typically an array of process IDs). `@onprocs` returns a vector with the result of `expr` on each process and will wait until all the results are available (but may of course be wrapped in `@async`). A simple example to get the process ID on each worker process:

```julia
using Distributed
addprocs(2)
workers() == @onprocs workers() myid()
```
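
Since `@onprocs` blocks until all results are available, a call can be overlapped with other work by wrapping it in `@async`, e.g. (a minimal sketch):

```julia
t = @async @onprocs workers() sum(rand(10^6))
# ... do other work on the main process ...
results = fetch(t)  # one result per worker
```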

Note: If the data can be expressed in terms of a `DistributedArrays.DArray`, it may be more appropriate and convenient to use the multiprocess execution tooling available in the package `DistributedArrays` (possibly combined with `ParallelProcessingTools.@onthreads`).


## Creating multithreaded workers

Julia currently doesn't provide an easy way to start multithreaded worker instances. `ParallelProcessingTools` provides a script `mtjulia.sh` (currently Linux-only) that will start Julia with `$JULIA_NUM_THREADS` set to a suitable value for each worker host (currently the number of physical cores on one NUMA node). `mtjulia_exe()` returns the absolute path to `mtjulia.sh`, so multithreaded workers can be spawned (via SSH) like this:

```julia
addprocs(["hostname1", ...], exename = mtjulia_exe())
```


## Example use case

As a simple real-world use case, let's histogram distributed data on multiple processes and threads.

Set up a cluster of multithreaded workers and load the required packages:

```julia
using Distributed, ParallelProcessingTools
addprocs(["hostname1", ...], exename = mtjulia_exe())
@everywhere using ParallelProcessingTools, Base.Threads,
    DistributedArrays, Statistics, StatsBase
```

Create some distributed data and check how the data is distributed:

```julia
data = drandn(10^8)
procsel = procs(data)
@onprocs procsel size(localpart(data))
```

Check the number of threads on each worker holding a part of the data:

```julia
@onprocs procsel nthreads()
```

Create histograms in parallel on all threads of all workers and merge:

```julia
proc_hists = @onprocs procsel begin
    local_data = localpart(data)
    tl_hist = ThreadLocal(Histogram((-6:0.1:6,), :left))
    @onthreads allthreads() begin
        data_for_this_thread = workpart(local_data, allthreads(), threadid())
        append!(tl_hist[], data_for_this_thread)
    end
    merged_hist = merge(getallvalues(tl_hist)...)
end
final_hist = merge(proc_hists...)
```

Check the result:

```julia
sum(final_hist.weights) ≈ length(data)

using Plots
plot(final_hist)
```

Note: This example is meant to show how to combine the features of this package. The multi-process part of this particular use case can be written in a simpler way using functionality from `DistributedArrays`.
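
For comparison, a sketch of that simpler multi-process-only variant (per-worker histograms without per-thread accumulation; the exact formulation is an assumption, not code from this package):

```julia
proc_hists = map(procs(data)) do pid
    remotecall_fetch(pid) do
        # append! fills the (initially empty) histogram and returns it:
        append!(Histogram((-6:0.1:6,), :left), localpart(data))
    end
end
final_hist = merge(proc_hists...)
```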

bin/mtjulia.sh

+52
@@ -0,0 +1,52 @@
#!/bin/bash -e

phys_cores() {
    lscpu -p | grep -v '^#' | cut -d ',' -f 2 | sort | uniq | wc -l
}

phys_cores_node() {
    lscpu -p | grep '^[0-9]\+,[0-9]\+,[0-9]\+,0,' | cut -d ',' -f 2 | sort | uniq | wc -l
}


get_num_threads() {
    ARG="$1"
    case "${ARG}" in
        [0-9]*) echo "${ARG}" ;;
        phys_cores_node) phys_cores_node ;;
        phys_cores) phys_cores ;;
        nproc) nproc ;;
        auto)
            if [ `nproc` -le 4 ] ; then
                nproc
            else
                phys_cores_node
            fi
            ;;
        *)
            echo "ERROR: Invalid argument \"${ARG}\" for get_num_threads." >&2
            return 1
            ;;
    esac
}


NTHREADS="auto"
PRECMD=""
# while getopts j:p: opt
# do
#     case "$opt" in
#         j) NTHREADS="$OPTARG" ;;
#         p) PRECMD="$OPTARG" ;;
#     esac
# done
# shift `expr $OPTIND - 1`


NTHREADS=`get_num_threads "${NTHREADS}"`
# echo "INFO: Setting environment variables for ${NTHREADS} threads." >&2

export OMP_NUM_THREADS="${NTHREADS}"
export JULIA_NUM_THREADS="${NTHREADS}"

${PRECMD} julia "$@"

src/ParallelProcessingTools.jl

+3 −1
@@ -5,10 +5,12 @@ __precompile__(true)
 module ParallelProcessingTools
 
 using Base.Threads
+using Distributed
 
 include("threadsafe.jl")
 include("threadlocal.jl")
-include("threadexec.jl")
+include("onthreads.jl")
+include("onprocs.jl")
 include("workpartition.jl")
 
 end # module

src/onprocs.jl

+45
@@ -0,0 +1,45 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).


_run_on_procs(f, procsel::Integer) = remotecall_fetch(f, procsel)

_run_on_procs(f, procsel::AbstractArray) =
    fetch.([remotecall(f, pid) for pid in procsel])


"""
    @onprocs procsel expr

Executes `expr` in parallel on all processes in `procsel`. Waits until
all processes are done. Returns all results as a vector (or as a single
scalar value, if `procsel` itself is a scalar).

Example:

```julia
using Distributed
addprocs(2)
workers() == @onprocs workers() myid()
```
"""
macro onprocs(procsel, expr)
    f = esc(:(()->($expr)))
    quote
        let procsel = $(esc(procsel)), f = $f
            _run_on_procs(f, procsel)
        end
    end
end
export @onprocs


function mtjulia_exe()
    if Sys.islinux()
        joinpath(@__DIR__, "..", "bin", "mtjulia.sh")
    else
        # No equivalent for "mtjulia.sh" implemented for non-Linux systems yet,
        # return default exename:
        joinpath(Sys.BINDIR, Base.julia_exename())
    end
end
export mtjulia_exe
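
A small usage sketch of the scalar form mentioned in the docstring (assuming a worker has been added):

```julia
using Distributed, ParallelProcessingTools
addprocs(1)
pid = first(workers())
@onprocs pid myid()  # scalar `procsel`, so a single scalar result: the worker's pid
```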
