Skip to content

Commit 7ff1175

Browse files
authored
Merge pull request #218 from JuliaParallel/jps/simple-osproc
Simplify OSProc
2 parents 9c01b65 + c986f6c commit 7ff1175

File tree

6 files changed

+35
-72
lines changed

6 files changed

+35
-72
lines changed

src/Dagger.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ function __init__()
6565
end
6666
@static if VERSION >= v"1.3.0-DEV.573"
6767
for tid in 1:Threads.nthreads()
68-
push!(PROCESSOR_CALLBACKS, proc->ThreadProc(myid(), tid))
68+
push!(PROCESSOR_CALLBACKS, ()->ThreadProc(myid(), tid))
6969
end
7070
else
71-
push!(PROCESSOR_CALLBACKS, proc->ThreadProc(myid(), 1))
71+
push!(PROCESSOR_CALLBACKS, ()->ThreadProc(myid(), 1))
7272
end
7373
end
7474

src/processor.jl

+24-60
Original file line numberDiff line numberDiff line change
@@ -87,88 +87,52 @@ capacity(proc, ::Type{T}) where T =
8787
"""
8888
OSProc <: Processor
8989
90-
Julia CPU (OS) process, identified by Distributed pid. Executes thunks when
91-
threads or other "children" processors are not available, and/or the user has
92-
not opted to use those processors.
90+
Julia CPU (OS) process, identified by Distributed pid. The logical parent of
91+
all processors on a given node, but otherwise does not participate in
92+
computations.
9393
"""
9494
struct OSProc <: Processor
9595
pid::Int
96-
attrs::Dict{Symbol,Any}
97-
children::Vector{Processor}
98-
queue::Vector{Processor}
99-
end
100-
const OSPROC_CACHE = Dict{Int,OSProc}()
101-
OSProc(pid::Int=myid()) = get!(OSPROC_CACHE, pid) do
102-
remotecall_fetch(get_osproc, pid, pid)
96+
function OSProc(pid::Int=myid())
97+
get!(OSPROC_CACHE, pid) do
98+
remotecall_fetch(get_proc_hierarchy, pid)
99+
end
100+
new(pid)
101+
end
103102
end
104-
function get_osproc(pid::Int)
105-
proc = OSProc(pid, Dict{Symbol,Any}(), Processor[], Processor[])
103+
const OSPROC_CACHE = Dict{Int,Vector{Processor}}()
104+
children(proc::OSProc) = OSPROC_CACHE[proc.pid]
105+
function get_proc_hierarchy()
106+
children = Processor[]
106107
for cb in PROCESSOR_CALLBACKS
107108
try
108-
child = Base.invokelatest(cb, proc)
109-
child !== nothing && push!(proc.children, child)
109+
child = Base.invokelatest(cb)
110+
if (child isa Tuple) || (child isa Vector)
111+
append!(children, child)
112+
elseif child !== nothing
113+
push!(children, child)
114+
end
110115
catch err
111116
@error "Error in processor callback" exception=(err,catch_backtrace())
112117
end
113118
end
114-
proc
119+
children
115120
end
116121
function add_callback!(func)
117122
push!(Dagger.PROCESSOR_CALLBACKS, func)
118123
empty!(OSPROC_CACHE)
119124
end
120125
Base.:(==)(proc1::OSProc, proc2::OSProc) = proc1.pid == proc2.pid
121126
iscompatible(proc::OSProc, opts, f, args...) =
122-
any(child->iscompatible(child, opts, f, args...), proc.children)
127+
any(child->iscompatible(child, opts, f, args...), children(proc))
123128
iscompatible_func(proc::OSProc, opts, f) =
124-
any(child->iscompatible_func(child, opts, f), proc.children)
129+
any(child->iscompatible_func(child, opts, f), children(proc))
125130
iscompatible_arg(proc::OSProc, opts, args...) =
126131
any(child->
127132
all(arg->iscompatible_arg(child, opts, arg), args),
128-
proc.children)
133+
children(proc))
129134
get_processors(proc::OSProc) =
130-
vcat((get_processors(child) for child in proc.children)...,)
131-
function choose_processor(options, f, Targs)
132-
osproc = OSProc()
133-
if isempty(osproc.queue)
134-
for child in osproc.children
135-
grandchildren = get_processors(child)
136-
append!(osproc.queue, grandchildren)
137-
end
138-
end
139-
@assert !isempty(osproc.queue)
140-
for i in 1:length(osproc.queue)
141-
proc = popfirst!(osproc.queue)
142-
push!(osproc.queue, proc)
143-
if !iscompatible(proc, options, f, Targs...)
144-
continue
145-
end
146-
if options.proclist === nothing
147-
default_enabled(proc) && return proc
148-
elseif options.proclist isa Function
149-
options.proclist(proc) && return proc
150-
elseif any(p->proc isa p, options.proclist)
151-
return proc
152-
end
153-
end
154-
throw(ProcessorSelectionException(options.proclist, osproc.queue, f, Targs))
155-
end
156-
struct ProcessorSelectionException <: Exception
157-
proclist
158-
procsavail::Vector{Processor}
159-
f
160-
args
161-
end
162-
function Base.show(io::IO, pex::ProcessorSelectionException)
163-
println(io, "(Worker $(myid())) Exhausted all available processor types!")
164-
println(io, " Proclist: $(pex.proclist)")
165-
println(io, " Procs Available: $(pex.procsavail)")
166-
println(io, " Function: $(pex.f)")
167-
print(io, " Arguments: $(pex.args)")
168-
end
169-
170-
execute!(proc::OSProc, f, args...) = f(args...)
171-
default_enabled(proc::OSProc) = true
135+
vcat((get_processors(child) for child in children(proc))...)
172136

173137
"""
174138
ThreadProc <: Processor

src/sch/Sch.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ end
649649
# Under-subscribed, calculate extra utilization and execute thunk
650650
@debug "($(myid())) Using available $to_proc: $extra_util | $(real_util[])/$cap"
651651
extra_util = if extra_util isa MaxUtilization
652-
count(c->typeof(c)===typeof(to_proc), from_proc.children)
652+
count(c->typeof(c)===typeof(to_proc), children(from_proc))
653653
else
654654
extra_util
655655
end

test/processors.jl

+5-6
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ end
3232
end
3333
end
3434
@testset "Opt-in/Opt-out" begin
35-
@test Dagger.default_enabled(OSProc()) == true
3635
@test Dagger.default_enabled(ThreadProc(1,1)) == true
3736
@test Dagger.default_enabled(OptOutProc()) == false
3837
end
@@ -54,12 +53,12 @@ end
5453
end
5554
@testset "Add callback in same world" begin
5655
function addcb()
57-
@everywhere Dagger.add_callback!(cb)
58-
cb = eval(Dagger, :(proc->PathProc(myid())))
59-
opts = ThunkOptions(proclist=[PathProc])
60-
collect(delayed(identity; options=opts)(1.0))
61-
@everywhere pop!(Dagger.PROCESSOR_CALLBACKS)
56+
cb = @eval ()->FakeProc(myid())
57+
@everywhere Dagger.add_callback!($cb)
58+
@test any(x->x isa FakeProc, Dagger.children(OSProc()))
59+
@everywhere pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE)
6260
end
61+
addcb()
6362
end
6463

6564
@testset "Modify workers in Context" begin

test/runtests.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ include("util.jl")
88
include("fakeproc.jl")
99

1010
include("thunk.jl")
11-
include("domain.jl")
12-
include("array.jl")
1311
include("scheduler.jl")
1412
include("processors.jl")
1513
include("ui.jl")
1614
include("checkpoint.jl")
15+
include("domain.jl")
16+
include("array.jl")
1717
try # TODO: Fault tolerance is sometimes unreliable
1818
include("fault-tolerance.jl")
1919
catch

test/scheduler.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ end
150150
@test collect(Context(), a) == 1
151151
end
152152
end
153-
@everywhere Dagger.add_callback!(proc->FakeProc())
153+
@everywhere Dagger.add_callback!(()->FakeProc())
154154
@testset "proclist FakeProc" begin
155155
@test Dagger.iscompatible_arg(FakeProc(), nothing, Int) == true
156156
@test Dagger.iscompatible_arg(FakeProc(), nothing, FakeVal) == true

0 commit comments

Comments
 (0)