From bdadc0caa026aa6b0306868a9f07c2baa137ab02 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 21 Jan 2025 19:46:46 +0100 Subject: [PATCH 01/11] Use timedwait() in check_master_connect() --- src/cluster.jl | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index 3ca82d9..f91d589 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -777,7 +777,6 @@ function redirect_output_from_additional_worker(pid, port) end function check_master_connect() - timeout = worker_timeout() * 1e9 # If we do not have at least process 1 connect to us within timeout # we log an error and exit, unless we're running on valgrind if ccall(:jl_running_on_valgrind,Cint,()) != 0 @@ -785,14 +784,10 @@ function check_master_connect() end errormonitor( - @async begin - start = time_ns() - while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout - sleep(1.0) - end - - if !haskey(map_pid_wrkr, 1) - print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n") + Threads.@spawn begin + timeout = worker_timeout() + if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out + print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n") exit(1) end end From dd65d221e12b592333aab007bd155b034822f6e8 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Wed, 22 Jan 2025 09:36:01 -0500 Subject: [PATCH 02/11] fix macos ci setup for apple silicon runners --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d8f14f5..bf0ffae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,14 +28,14 @@ jobs: - '1.9' os: - ubuntu-latest - - macOS-latest - windows-latest arch: - x64 - x86 - exclude: + include: - os: macOS-latest - arch: x86 + arch: aarch64 + exclude: - os: windows-latest # Killing workers doesn't work on windows in 1.9 version: '1.9' From 6515ec2bb21492c2e21b9ad8085014910657eb84 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Fri, 24 Jan 2025 16:46:42 -0500 Subject: [PATCH 03/11] use proper test skip mechanism for SO_REUSEPORT --- test/distributed_exec.jl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 23fba89..5e10e86 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1707,18 +1707,17 @@ end end # Ensure that the code has indeed been successfully executed everywhere - @test all(in(results), procs()) + return all(in(results), procs()) end # Test that the client port is reused. SO_REUSEPORT may not be supported on # all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX @assert nprocs() == 1 addprocs_with_testenv(4; lazy=false) - if ccall(:jl_has_so_reuseport, Int32, ()) == 1 - reuseport_tests() - else - @info "SO_REUSEPORT is unsupported, skipping reuseport tests" - end + + skip_reuseexport = ccall(:jl_has_so_reuseport, Int32, ()) != 1 + skip_reuseexport && @debug "SO_REUSEPORT support missing, reuseport_tests skipped" + @test reuseport_tests() skip = skip_reuseexport end @testset "Even more various individual issues" begin From cb93991ba4950440bb2b27096921bd19678dafd2 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Fri, 24 Jan 2025 21:36:05 -0500 Subject: [PATCH 04/11] sig profile hanging workers before SIGKILL --- src/managers.jl | 14 +++++++++++--- test/distributed_exec.jl | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/managers.jl b/src/managers.jl index fccc5ea..ab79abe 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -126,7 +126,7 @@ addprocs([ * `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String` holding one flag, or a collection of strings, with one element per flag. - E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`. + E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`. * `topology`: Specifies how the workers connect to each other. Sending a message between unconnected workers results in an error. @@ -767,7 +767,8 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig) nothing end -function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15) +function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15) + # profile_wait = 6 is 1s for profile, 5s for the report to show # First, try sending `exit()` to the remote over the usual control channels remote_do(exit, pid) @@ -776,7 +777,14 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou # Check to see if our child exited, and if not, send an actual kill signal if !process_exited(config.process) - @warn("Failed to gracefully kill worker $(pid), sending SIGQUIT") + @warn "Failed to gracefully kill worker $(pid)" + profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10) + if profile_sig !== nothing + @warn("Sending profile $(profile_sig[1]) to worker $(pid)") + kill(config.process, profile_sig[2]) + sleep(profile_wait) + end + @warn("Sending SIGQUIT to worker $(pid)") kill(config.process, Base.SIGQUIT) sleep(term_timeout) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 5e10e86..0507657 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1934,7 +1934,7 @@ include("splitrange.jl") # Next, ensure we get a log message when a worker does not cleanly exit w = only(addprocs(1)) - @test_logs (:warn, r"sending SIGQUIT") begin + @test_logs (:warn, r"Sending SIGQUIT") match_mode=:any begin remote_do(w) do # Cause the 'exit()' message that `rmprocs()` sends to do nothing Core.eval(Base, :(exit() = nothing)) From ab3a9ff118b6af13553fd7b1a1051e6b492e7e81 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Wed, 26 Feb 2025 21:18:12 +0100 Subject: [PATCH 05/11] Use an additional 4 interactive threads in CI --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bf0ffae..9aad012 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,7 +58,7 @@ jobs: - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 env: - JULIA_NUM_THREADS: 4 + JULIA_NUM_THREADS: 4,4 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v5 with: From 9843937d01be1dbb74f5155e1cbcd01d84c530f2 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Sat, 25 Jan 2025 09:58:40 -0500 Subject: [PATCH 06/11] monitor sub-subprocesses for error stdio --- test/distributed_exec.jl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 0507657..11cb733 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1847,11 +1847,11 @@ end end """ cmd = setenv(`$(julia) --project=$(project) -e $(testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # JULIA_PROJECT cmd = setenv(`$(julia) -e $(testcode * extracode)`, (env["JULIA_PROJECT"] = project; env)) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Pkg.activate(...) activateish = """ Base.ACTIVE_PROJECT[] = $(repr(project)) @@ -1859,7 +1859,7 @@ end addprocs(1) """ cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # JULIA_(LOAD|DEPOT)_PATH shufflecode = """ d = reverse(DEPOT_PATH) @@ -1878,7 +1878,7 @@ end end """ cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Mismatch when shuffling after proc addition. Note that the use of # `addcode` mimics the behaviour of -p1 as the first worker is started # before `shufflecode` executes. @@ -1890,7 +1890,7 @@ end end """ cmd = setenv(`$(julia) -e $(failcode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Hideous hack to double escape path separators on Windows so that it gets # interpolated into the string (and then Cmd) correctly. @@ -1917,7 +1917,7 @@ end end """ cmd = setenv(`$(julia) -e $(envcode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) end end end From b41f62af811fc68f98c3bba7b47c35c6187b3791 Mon Sep 17 00:00:00 2001 From: Keno Fischer Date: Sat, 25 Jan 2025 04:53:48 +0000 Subject: [PATCH 07/11] Don't use internal jl_set_const to create constants The internal function `jl_set_const` is allowed during bootstrap only and ignores world age partition. This would give incorrect results after JuliaLang/julia#57150. Just eval the constant definition directly, which has well defined semantics. --- src/clusterserialize.jl | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/clusterserialize.jl b/src/clusterserialize.jl index d2f09e7..347ca9c 100644 --- a/src/clusterserialize.jl +++ b/src/clusterserialize.jl @@ -167,10 +167,21 @@ function deserialize_global_from_main(s::ClusterSerializer, sym) return nothing end end - Core.eval(Main, Expr(:global, sym)) + if sym_isconst - ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v) + @static if VERSION >= v"1.12" + # Note that the post-lowering const form is not allowed in value + # position, so there needs to be a dummy `nothing` argument to drop the + # return value. + Core.eval(Main, Expr(:block, + Expr(:const, GlobalRef(Main, sym), v), + nothing)) + else + Core.eval(Main, Expr(:global, sym)) + ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v) + end else + Core.eval(Main, Expr(:global, sym)) invokelatest(setglobal!, Main, sym, v) end return nothing From 23dd9a914ebd00fe14cbf5ae166df1209e7d3ed4 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Sun, 26 Jan 2025 12:56:42 -0500 Subject: [PATCH 08/11] Add CachingPool method for remotecall --- src/workerpool.jl | 25 +++++++++++++++++++++++++ test/distributed_exec.jl | 26 +++++++++++++++++--------- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/workerpool.jl b/src/workerpool.jl index 6d03bc9..44c588b 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -400,3 +400,28 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...) put!(pool, worker) end end + +# Specialization for remotecall. We have to wait for the Future it returns +# before putting the worker back in the pool. +function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...) + worker = take!(pool) + f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker))) + isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker + + local x + try + x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...) + catch + put!(pool, worker) + rethrow() + end + + t = Threads.@spawn Threads.threadpool() try + wait(x) + finally + put!(pool, worker) + end + errormonitor(t) + + return x +end diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 11cb733..e6338f1 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -722,6 +722,8 @@ end @test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp))) wp = WorkerPool(2:3) @test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3] + @test fetch(remotecall(myid, wp)) in wp.workers + @test_throws RemoteException fetch(remotecall(error, wp)) # wait on worker pool wp = WorkerPool(2:2) @@ -747,6 +749,8 @@ end # CachingPool tests wp = CachingPool(workers()) @test [1:100...] == pmap(x->x, wp, 1:100) + @test fetch(remotecall(myid, wp)) in wp.workers + @test_throws RemoteException fetch(remotecall(error, wp)) clear!(wp) @test length(wp.map_obj2ref) == 0 @@ -1017,15 +1021,19 @@ f16091b = () -> 1 # Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should # keep the worker out of the pool until the underlying remotecall has # finished. - remotechan = RemoteChannel(wrkr1) - pool = WorkerPool([wrkr1]) - put_future = remotecall(() -> wait(remotechan), pool) - @test !isready(pool) - put!(remotechan, 1) - wait(put_future) - # The task that waits on the future to put it back into the pool runs - # asynchronously so we use timedwait() to check when the worker is back in. - @test timedwait(() -> isready(pool), 10) === :ok + for PoolType in (WorkerPool, CachingPool) + let + remotechan = RemoteChannel(wrkr1) + pool = PoolType([wrkr1]) + put_future = remotecall(() -> wait(remotechan), pool) + @test !isready(pool) + put!(remotechan, 1) + wait(put_future) + # The task that waits on the future to put it back into the pool runs + # asynchronously so we use timedwait() to check when the worker is back in. + @test timedwait(() -> isready(pool), 10) === :ok + end + end # Test calling @everywhere from a module not defined on the workers LocalBar.bar() From ea4ee00029ab3a482a970ce0fdb0337aad212883 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Sun, 26 Jan 2025 19:57:25 -0500 Subject: [PATCH 09/11] add Aqua tests --- Project.toml | 7 ++++++- test/runtests.jl | 6 ++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 26447ea..b5d8749 100644 --- a/Project.toml +++ b/Project.toml @@ -8,17 +8,22 @@ Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" [compat] +Aqua = "0.8" Distributed = "1" +LibSSH = "0.7" +LinearAlgebra = "1" Random = "1" Serialization = "1" Sockets = "1" +Test = "1" julia = "1.9" [extras] +Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" LibSSH = "00483490-30f8-4353-8aba-35b82f51f4d0" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["LinearAlgebra", "Test", "LibSSH", "Distributed"] +test = ["Aqua", "Distributed", "LibSSH", "LinearAlgebra", "Test"] diff --git a/test/runtests.jl b/test/runtests.jl index 5eea288..0e7441d 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,6 +1,8 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license using Test +import DistributedNext +import Aqua # Run the distributed test outside of the main driver since it needs its own # set of dedicated workers. @@ -22,3 +24,7 @@ include("distributed_exec.jl") include("managers.jl") include("distributed_stdlib_detection.jl") + +@testset "Aqua" begin + Aqua.test_all(DistributedNext) +end From 4f836adc47f6e80e2b6f50b9a48eee1bb33c2b7b Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Sun, 26 Jan 2025 21:50:49 -0500 Subject: [PATCH 10/11] fix both remotecall_pool remotecall waits to ignore errors --- src/workerpool.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/workerpool.jl b/src/workerpool.jl index 44c588b..b28c726 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -149,6 +149,7 @@ function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, t = Threads.@spawn try wait(x) + catch # just wait, ignore errors here finally put!(pool, worker) end @@ -418,6 +419,7 @@ function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args... t = Threads.@spawn Threads.threadpool() try wait(x) + catch # just wait, ignore errors here finally put!(pool, worker) end From 29f6eb62449b32ade259a264e6333fb4c6a6c666 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Mon, 27 Jan 2025 07:04:02 -0500 Subject: [PATCH 11/11] fix inverted timedwait logic --- src/cluster.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.jl b/src/cluster.jl index f91d589..1f52db7 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -786,7 +786,7 @@ function check_master_connect() errormonitor( Threads.@spawn begin timeout = worker_timeout() - if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out + if timedwait(() -> haskey(map_pid_wrkr, 1), timeout) === :timed_out print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n") exit(1) end