Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed fixes #25

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ jobs:
- '1.9'
os:
- ubuntu-latest
- macOS-latest
- windows-latest
arch:
- x64
- x86
exclude:
include:
- os: macOS-latest
arch: x86
arch: aarch64
exclude:
- os: windows-latest # Killing workers doesn't work on windows in 1.9
version: '1.9'

Expand All @@ -58,7 +58,7 @@ jobs:
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: 4
JULIA_NUM_THREADS: 4,4
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v5
with:
Expand Down
7 changes: 6 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
Aqua = "0.8"
Distributed = "1"
LibSSH = "0.7"
LinearAlgebra = "1"
Random = "1"
Serialization = "1"
Sockets = "1"
Test = "1"
julia = "1.9"

[extras]
Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
LibSSH = "00483490-30f8-4353-8aba-35b82f51f4d0"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["LinearAlgebra", "Test", "LibSSH", "Distributed"]
test = ["Aqua", "Distributed", "LibSSH", "LinearAlgebra", "Test"]
13 changes: 4 additions & 9 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -777,22 +777,17 @@ function redirect_output_from_additional_worker(pid, port)
end

function check_master_connect()
timeout = worker_timeout() * 1e9
# If we do not have at least process 1 connect to us within timeout
# we log an error and exit, unless we're running on valgrind
if ccall(:jl_running_on_valgrind,Cint,()) != 0
return
end

errormonitor(
@async begin
start = time_ns()
while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
sleep(1.0)
end

if !haskey(map_pid_wrkr, 1)
print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n")
Threads.@spawn begin
timeout = worker_timeout()
if timedwait(() -> haskey(map_pid_wrkr, 1), timeout) === :timed_out
print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
exit(1)
end
end
Expand Down
15 changes: 13 additions & 2 deletions src/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,21 @@ function deserialize_global_from_main(s::ClusterSerializer, sym)
return nothing
end
end
Core.eval(Main, Expr(:global, sym))

if sym_isconst
ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v)
@static if VERSION >= v"1.12"
# Note that the post-lowering const form is not allowed in value
# position, so there needs to be a dummy `nothing` argument to drop the
# return value.
Core.eval(Main, Expr(:block,
Expr(:const, GlobalRef(Main, sym), v),
nothing))
else
Core.eval(Main, Expr(:global, sym))
ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v)
end
else
Core.eval(Main, Expr(:global, sym))
invokelatest(setglobal!, Main, sym, v)
end
return nothing
Expand Down
14 changes: 11 additions & 3 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ addprocs([

* `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String`
holding one flag, or a collection of strings, with one element per flag.
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.

* `topology`: Specifies how the workers connect to each other. Sending a message between
unconnected workers results in an error.
Expand Down Expand Up @@ -767,7 +767,8 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
nothing
end

function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
# profile_wait = 6 is 1s for profile, 5s for the report to show
# First, try sending `exit()` to the remote over the usual control channels
remote_do(exit, pid)

Expand All @@ -776,7 +777,14 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou

# Check to see if our child exited, and if not, send an actual kill signal
if !process_exited(config.process)
@warn("Failed to gracefully kill worker $(pid), sending SIGQUIT")
@warn "Failed to gracefully kill worker $(pid)"
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
if profile_sig !== nothing
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
kill(config.process, profile_sig[2])
sleep(profile_wait)
end
@warn("Sending SIGQUIT to worker $(pid)")
kill(config.process, Base.SIGQUIT)

sleep(term_timeout)
Expand Down
27 changes: 27 additions & 0 deletions src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool,

t = Threads.@spawn try
wait(x)
catch # just wait, ignore errors here
finally
put!(pool, worker)
end
Expand Down Expand Up @@ -400,3 +401,29 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
put!(pool, worker)
end
end

# Specialization for remotecall. We have to wait for the Future it returns
# before putting the worker back in the pool.
function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...)
    worker = take!(pool)

    # Look up the remote reference cached for `f` on this worker; if there is
    # none yet, pair `f` with a fresh RemoteChannel so the callee can cache it.
    cached = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker)))
    if cached isa Tuple
        # First use of this (worker, f) pair — record the channel in the cache.
        pool.map_obj2ref[(worker, f)] = cached[2]
    end

    future = try
        rc_f(exec_from_cache, worker, cached, args...; kwargs...)
    catch
        # Issuing the remote call failed; hand the worker back immediately.
        put!(pool, worker)
        rethrow()
    end

    # Only return the worker to the pool once the underlying call finishes.
    waiter = Threads.@spawn Threads.threadpool() try
        wait(future)
    catch # just wait, ignore errors here
    finally
        put!(pool, worker)
    end
    errormonitor(waiter)

    return future
end
51 changes: 29 additions & 22 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ end
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))
wp = WorkerPool(2:3)
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
@test fetch(remotecall(myid, wp)) in wp.workers
@test_throws RemoteException fetch(remotecall(error, wp))

# wait on worker pool
wp = WorkerPool(2:2)
Expand All @@ -747,6 +749,8 @@ end
# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(x->x, wp, 1:100)
@test fetch(remotecall(myid, wp)) in wp.workers
@test_throws RemoteException fetch(remotecall(error, wp))

clear!(wp)
@test length(wp.map_obj2ref) == 0
Expand Down Expand Up @@ -1017,15 +1021,19 @@ f16091b = () -> 1
# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
# keep the worker out of the pool until the underlying remotecall has
# finished.
remotechan = RemoteChannel(wrkr1)
pool = WorkerPool([wrkr1])
put_future = remotecall(() -> wait(remotechan), pool)
@test !isready(pool)
put!(remotechan, 1)
wait(put_future)
# The task that waits on the future to put it back into the pool runs
# asynchronously so we use timedwait() to check when the worker is back in.
@test timedwait(() -> isready(pool), 10) === :ok
for PoolType in (WorkerPool, CachingPool)
let
remotechan = RemoteChannel(wrkr1)
pool = PoolType([wrkr1])
put_future = remotecall(() -> wait(remotechan), pool)
@test !isready(pool)
put!(remotechan, 1)
wait(put_future)
# The task that waits on the future to put it back into the pool runs
# asynchronously so we use timedwait() to check when the worker is back in.
@test timedwait(() -> isready(pool), 10) === :ok
end
end

# Test calling @everywhere from a module not defined on the workers
LocalBar.bar()
Expand Down Expand Up @@ -1707,18 +1715,17 @@ end
end

# Ensure that the code has indeed been successfully executed everywhere
@test all(in(results), procs())
return all(in(results), procs())
end

# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
@assert nprocs() == 1
addprocs_with_testenv(4; lazy=false)
if ccall(:jl_has_so_reuseport, Int32, ()) == 1
reuseport_tests()
else
@info "SO_REUSEPORT is unsupported, skipping reuseport tests"
end

skip_reuseexport = ccall(:jl_has_so_reuseport, Int32, ()) != 1
skip_reuseexport && @debug "SO_REUSEPORT support missing, reuseport_tests skipped"
@test reuseport_tests() skip = skip_reuseexport
end

@testset "Even more various individual issues" begin
Expand Down Expand Up @@ -1848,19 +1855,19 @@ end
end
"""
cmd = setenv(`$(julia) --project=$(project) -e $(testcode * extracode)`, env)
@test success(cmd)
@test success(pipeline(cmd; stdout, stderr))
# JULIA_PROJECT
cmd = setenv(`$(julia) -e $(testcode * extracode)`,
(env["JULIA_PROJECT"] = project; env))
@test success(cmd)
@test success(pipeline(cmd; stdout, stderr))
# Pkg.activate(...)
activateish = """
Base.ACTIVE_PROJECT[] = $(repr(project))
using DistributedNext
addprocs(1)
"""
cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env)
@test success(cmd)
@test success(pipeline(cmd; stdout, stderr))
# JULIA_(LOAD|DEPOT)_PATH
shufflecode = """
d = reverse(DEPOT_PATH)
Expand All @@ -1879,7 +1886,7 @@ end
end
"""
cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env)
@test success(cmd)
@test success(pipeline(cmd; stdout, stderr))
# Mismatch when shuffling after proc addition. Note that the use of
# `addcode` mimics the behaviour of -p1 as the first worker is started
# before `shufflecode` executes.
Expand All @@ -1891,7 +1898,7 @@ end
end
"""
cmd = setenv(`$(julia) -e $(failcode)`, env)
@test success(cmd)
@test success(pipeline(cmd; stdout, stderr))

# Hideous hack to double escape path separators on Windows so that it gets
# interpolated into the string (and then Cmd) correctly.
Expand All @@ -1918,7 +1925,7 @@ end
end
"""
cmd = setenv(`$(julia) -e $(envcode)`, env)
@test success(cmd)
@test success(pipeline(cmd; stdout, stderr))
end end
end

Expand All @@ -1935,7 +1942,7 @@ include("splitrange.jl")

# Next, ensure we get a log message when a worker does not cleanly exit
w = only(addprocs(1))
@test_logs (:warn, r"sending SIGQUIT") begin
@test_logs (:warn, r"Sending SIGQUIT") match_mode=:any begin
remote_do(w) do
# Cause the 'exit()' message that `rmprocs()` sends to do nothing
Core.eval(Base, :(exit() = nothing))
Expand Down
6 changes: 6 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test
import DistributedNext
import Aqua

# Run the distributed test outside of the main driver since it needs its own
# set of dedicated workers.
Expand All @@ -22,3 +24,7 @@ include("distributed_exec.jl")
include("managers.jl")

include("distributed_stdlib_detection.jl")

@testset "Aqua" begin
Aqua.test_all(DistributedNext)
end
Loading