Skip to content

Commit 160f959

Browse files
committed
Mostly fix fault tolerance
1 parent a71c1f0 commit 160f959

File tree

3 files changed

+61
-73
lines changed

3 files changed

+61
-73
lines changed

src/sch/Sch.jl

+7-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Fields:
5151
- ready::Vector{Thunk} - The list of `Thunk`s that are ready to execute
5252
- cache::Dict{Thunk, Any} - Maps from a finished `Thunk` to it's cached result, often a DRef
5353
- running::Set{Thunk} - The set of currently-running `Thunk`s
54+
- running_on::Dict{Thunk,OSProc} - Map from `Thunk` to the OS process executing it
5455
- thunk_dict::Dict{Int, Any} - Maps from thunk IDs to a `Thunk`
5556
- node_order::Any - Function that returns the order of a thunk
5657
- worker_pressure::Dict{Int,Dict{Type,UInt}} - Cache of worker pressure
@@ -74,6 +75,7 @@ struct ComputeState
7475
ready::Vector{Thunk}
7576
cache::Dict{Thunk, Any}
7677
running::Set{Thunk}
78+
running_on::Dict{Thunk,OSProc}
7779
thunk_dict::Dict{Int, Any}
7880
node_order::Any
7981
worker_pressure::Dict{Int,Dict{Type,UInt}}
@@ -98,6 +100,7 @@ function start_state(deps::Dict, node_order, chan)
98100
Vector{Thunk}(undef, 0),
99101
Dict{Thunk, Any}(),
100102
Set{Thunk}(),
103+
Dict{Thunk,OSProc}(),
101104
Dict{Int, Thunk}(),
102105
node_order,
103106
Dict{Int,Dict{Type,UInt}}(),
@@ -389,7 +392,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
389392
thunk_failed = false
390393
if res isa Exception
391394
if unwrap_nested_exception(res) isa Union{ProcessExitedException, Base.IOError}
392-
@warn "Worker $(pid) died on thunk $thunk_id, rescheduling work"
395+
@warn "Worker $(pid) died, rescheduling work"
393396

394397
# Remove dead worker from procs list
395398
remove_dead_proc!(ctx, state, gproc)
@@ -640,6 +643,7 @@ function remove_dead_proc!(ctx, state, proc, options=ctx.options)
640643
delete!(state.worker_capacity, proc.pid)
641644
delete!(state.worker_loadavg, proc.pid)
642645
delete!(state.worker_chans, proc.pid)
646+
state.procs_cache_list[] = nothing
643647
end
644648

645649
function pop_with_affinity!(ctx, tasks, proc)
@@ -684,6 +688,7 @@ end
684688

685689
function finish_task!(ctx, state, node, thunk_failed; free=true)
686690
pop!(state.running, node)
691+
delete!(state.running_on, node)
687692
if !thunk_failed
688693
push!(state.finished, node)
689694
else
@@ -759,6 +764,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
759764
to_send = []
760765
for (thunk, util) in thunks
761766
push!(state.running, thunk)
767+
state.running_on[thunk] = gproc
762768
if thunk.cache && thunk.cache_ref !== nothing
763769
# the result might be already cached
764770
data = unrelease(thunk.cache_ref) # ask worker to keep the data around

src/sch/fault-handler.jl

+48-70
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
handle_fault(...)
33
44
An internal function to handle a worker dying or being killed by the OS.
5-
Attempts to determine which `Thunk`s require rescheduling based on a
6-
"deadlist", and then corrects the scheduler's internal `ComputeState` struct
7-
to recover from the fault.
5+
Attempts to determine which `Thunk`s were running on (or had their results
6+
cached on) the dead worker, and stores them in a "deadlist". It uses this
7+
deadlist to correct the scheduler's internal `ComputeState` struct to recover
8+
from the fault.
89
910
Note: The logic for this functionality is not currently perfectly robust to
1011
all failure modes, and is only really intended as a last-ditch attempt to
@@ -13,100 +14,72 @@ of DAGs, it *may* cause a `KeyError` or other failures in the scheduler due to
1314
the complexity of getting the internal state back to a consistent and proper
1415
state.
1516
"""
16-
function handle_fault(ctx, state, oldproc)
17-
# Find thunks whose results were cached on the dead worker and place them
18-
# on what's called a "deadlist". This structure will direct the recovery
19-
# of the scheduler's state.
17+
function handle_fault(ctx, state, deadproc)
18+
@assert !isempty(procs(ctx)) "No workers left for fault handling!"
19+
2020
deadlist = Thunk[]
21+
22+
# Evict cache entries that were stored on the worker
2123
for t in keys(state.cache)
2224
v = state.cache[t]
23-
if v isa Chunk && v.handle isa DRef && v.handle.owner == oldproc.pid
25+
if v isa Chunk && v.handle isa DRef && v.handle.owner == deadproc.pid
2426
push!(deadlist, t)
25-
# Any inputs to dead cached thunks must be rescheduled
26-
function bfs!(deadlist, t)
27-
for input in t.inputs
28-
istask(input) || continue
29-
!(input in deadlist) && push!(deadlist, input)
30-
bfs!(deadlist, input)
31-
end
32-
end
33-
bfs!(deadlist, t)
27+
pop!(state.cache, t)
3428
end
3529
end
36-
# TODO: Find *all* thunks who were actively running on the dead worker
37-
# TODO: Set thunk.cache to nothing
30+
# Remove thunks that were running on the worker
31+
for t in collect(keys(state.running_on))
32+
pid = state.running_on[t].pid
33+
if pid == deadproc.pid
34+
push!(deadlist, t)
35+
delete!(state.running_on, t)
36+
end
37+
end
38+
# Clear thunk.cache_ref
39+
for t in deadlist
40+
t.cache_ref = nothing
41+
end
3842

39-
# Empty cache of dead thunks
40-
for ct in keys(state.cache)
41-
if ct in deadlist
42-
delete!(state.cache, ct)
43+
# Remove thunks from state.ready that have inputs on the deadlist
44+
for idx in length(state.ready):-1:1
45+
rt = state.ready[idx]
46+
if any((input in deadlist) for input in rt.inputs)
47+
deleteat!(state.ready, idx)
4348
end
4449
end
4550

51+
#=
4652
function fix_waitdicts!(state, deadlist, t::Thunk; isleaf=false)
4753
waiting, waiting_data = state.waiting, state.waiting_data
48-
if !(t in keys(waiting))
49-
waiting[t] = Set{Thunk}()
50-
end
5154
if !isleaf
5255
# If we aren't a leaf thunk, then we may still need to recover
5356
# further into the DAG
57+
if !haskey(waiting, t)
58+
waiting[t] = Set{Thunk}()
59+
end
5460
for input in t.inputs
5561
istask(input) || continue
56-
@assert haskey(waiting, t) "Error: $t not in state.waiting"
57-
push!(waiting[t], input)
62+
will_reschedule = !haskey(state.cache, input)
63+
if will_reschedule
64+
push!(waiting[t], input)
65+
end
5866
push!(waiting_data[input], t)
59-
isleaf = !(input in deadlist)
60-
fix_waitdicts!(state, deadlist, input; isleaf=isleaf)
67+
if will_reschedule
68+
isleaf = !(input in deadlist)
69+
fix_waitdicts!(state, deadlist, input; isleaf=isleaf)
70+
end
6171
end
6272
end
6373
if isempty(waiting[t])
6474
delete!(waiting, t)
6575
end
6676
end
67-
68-
# Add state.waiting deps back to state.waiting
69-
for ot in keys(state.waiting)
70-
fix_waitdicts!(state, deadlist, ot)
71-
end
72-
73-
#fix_waitdicts!(state, deadlist, thunk)
74-
75-
# Remove thunks from state.ready that have inputs on the deadlist
76-
for idx in length(state.ready):-1:1
77-
rt = state.ready[idx]
78-
if any((input in deadlist) for input in rt.inputs)
79-
deleteat!(state.ready, idx)
80-
end
81-
end
82-
83-
# Remove dead thunks from state.running, and add state.running
84-
# deps back to state.waiting
85-
wasrunning = copy(state.running)
86-
empty!(state.running)
87-
while !isempty(wasrunning)
88-
temp = pop!(wasrunning)
89-
if temp isa Thunk
90-
if !(temp in deadlist)
91-
push!(state.running, temp)
92-
end
93-
fix_waitdicts!(state, deadlist, temp)
94-
elseif temp isa Vector
95-
newtemp = []
96-
for t in temp
97-
fix_waitdicts!(state, deadlist, t)
98-
if !(t in deadlist)
99-
push!(newtemp, t)
100-
end
101-
end
102-
isempty(newtemp) || push!(state.running, newtemp)
103-
else
104-
throw("Unexpected type in recovery: $temp")
105-
end
77+
# Fixup state.waiting and state.waiting_data
78+
for t in deadlist
79+
fix_waitdicts!(state, deadlist, t)
10680
end
10781
10882
# Reschedule inputs from deadlist
109-
@assert !isempty(procs(ctx)) "No workers left for fault handling!"
11083
while length(deadlist) > 0
11184
dt = popfirst!(deadlist)
11285
if any((input in deadlist) for input in dt.inputs)
@@ -115,5 +88,10 @@ function handle_fault(ctx, state, oldproc)
11588
end
11689
push!(state.ready, dt)
11790
end
91+
=#
92+
seen = Dict{Thunk,Bool}()
93+
for t in deadlist
94+
reschedule_inputs!(state, t, seen)
95+
end
11896
schedule!(ctx, state)
11997
end

src/sch/util.jl

+6-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ unwrap_nested_exception(err::RemoteException) =
1010
unwrap_nested_exception(err) = err
1111

1212
"Prepares the scheduler to schedule `thunk`."
13-
function reschedule_inputs!(state, thunk)
13+
function reschedule_inputs!(state, thunk, seen=Dict{Thunk,Bool}())
14+
haskey(seen, thunk) && return seen[thunk]
1415
w = get!(()->Set{Thunk}(), state.waiting, thunk)
1516
scheduled = false
1617
for input in thunk.inputs
@@ -26,14 +27,17 @@ function reschedule_inputs!(state, thunk)
2627
haskey(state.cache, input) && continue
2728
if (input in state.running) ||
2829
(input in state.ready) ||
29-
reschedule_inputs!(state, input)
30+
reschedule_inputs!(state, input, seen)
3031
push!(w, input)
3132
scheduled = true
33+
else
34+
error("Failed to reschedule $(input.id) for $(thunk.id)")
3235
end
3336
end
3437
if isempty(w) && !(thunk in state.errored)
3538
# Inputs are ready
3639
push!(state.ready, thunk)
40+
delete!(state.waiting, thunk)
3741
return true
3842
else
3943
return scheduled

0 commit comments

Comments
 (0)