Skip to content

Commit 18b7406

Browse files
committed
Evict Chunks during finishing
1 parent 988b612 commit 18b7406

File tree

3 files changed

+46
-23
lines changed

3 files changed

+46
-23
lines changed

src/compute.jl

+10-8
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ end
8282
##### Dag utilities #####
8383

8484
"""
85-
dependents(node::Thunk, deps=Dict{Thunk, Set{Thunk}}()) -> Dict{Thunk, Set{Thunk}}
85+
dependents(node::Thunk) -> Dict{Union{Thunk,Chunk}, Set{Thunk}}
8686
8787
Find the set of direct dependents for each task and each input `Chunk` in the DAG.
8888
"""
8989
function dependents(node::Thunk)
90-
deps = Dict{Thunk, Set{Thunk}}()
90+
deps = Dict{Union{Thunk,Chunk}, Set{Thunk}}()
9191
visited = Set{Thunk}()
9292
to_visit = Set{Thunk}()
9393
push!(to_visit, node)
@@ -98,10 +98,12 @@ function dependents(node::Thunk)
9898
deps[next] = Set{Thunk}()
9999
end
100100
for inp in inputs(next)
101-
if inp isa Thunk
102-
s::Set{Thunk} = get!(()->Set{Thunk}(), deps, inp)
101+
if istask(inp) || (inp isa Chunk)
102+
s = get!(()->Set{Thunk}(), deps, inp)
103103
push!(s, next)
104-
!(inp in visited) && push!(to_visit, inp)
104+
if istask(inp) && !(inp in visited)
105+
push!(to_visit, inp)
106+
end
105107
end
106108
end
107109
push!(visited, next)
@@ -110,14 +112,14 @@ function dependents(node::Thunk)
110112
end
111113

112114
"""
113-
noffspring(dpents::Dict{Thunk, Set{Thunk}}) -> Dict{Thunk, Int}
115+
noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) -> Dict{Thunk, Int}
114116
115117
Recursively find the number of tasks dependent on each task in the DAG.
116118
Takes a Dict as returned by [`dependents`](@ref).
117119
"""
118-
function noffspring(dpents::Dict{Thunk, Set{Thunk}})
120+
function noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}})
119121
noff = Dict{Thunk,Int}()
120-
to_visit = collect(keys(dpents))
122+
to_visit = collect(filter(istask, keys(dpents)))
121123
while !isempty(to_visit)
122124
next = popfirst!(to_visit)
123125
haskey(noff, next) && continue

src/sch/Sch.jl

+32-13
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ The internal state-holding struct of the scheduler.
4444
4545
Fields:
4646
- uid::UInt64 - Unique identifier for this scheduler instance
47-
- dependents::OneToMany - The result of calling `dependents` on the DAG
47+
- dependents::Dict{Union{Thunk,Chunk},Set{Thunk}} - The result of calling `dependents` on the DAG
4848
- finished::Set{Thunk} - The set of completed `Thunk`s
4949
- waiting::OneToMany - Map from downstream `Thunk` to upstream `Thunk`s that still need to execute
50-
- waiting_data::OneToMany - Map from upstream `Thunk` to all downstream `Thunk`s, accumulating over time
50+
- waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}} - Map from input `Chunk`/upstream `Thunk` to all unfinished downstream `Thunk`s, to retain caches
5151
- ready::Vector{Thunk} - The list of `Thunk`s that are ready to execute
5252
- cache::Dict{Thunk, Any} - Maps from a finished `Thunk` to its cached result, often a DRef
5353
- running::Set{Thunk} - The set of currently-running `Thunk`s
@@ -67,10 +67,10 @@ Fields:
6767
"""
6868
struct ComputeState
6969
uid::UInt64
70-
dependents::OneToMany
70+
dependents::Dict{Union{Thunk,Chunk},Set{Thunk}}
7171
finished::Set{Thunk}
7272
waiting::OneToMany
73-
waiting_data::OneToMany
73+
waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}
7474
ready::Vector{Thunk}
7575
cache::Dict{Thunk, Any}
7676
running::Set{Thunk}
@@ -94,7 +94,7 @@ function start_state(deps::Dict, node_order, chan)
9494
deps,
9595
Set{Thunk}(),
9696
OneToMany(),
97-
OneToMany(),
97+
Dict{Union{Thunk,Chunk},Set{Thunk}}(),
9898
Vector{Thunk}(undef, 0),
9999
Dict{Thunk, Any}(),
100100
Set{Thunk}(),
@@ -300,7 +300,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
300300
state = start_state(deps, node_order, chan)
301301

302302
# setup thunk_dict mappings
303-
for node in keys(deps)
303+
for node in filter(istask, keys(deps))
304304
state.thunk_dict[node.id] = node
305305
for dep in deps[node]
306306
state.thunk_dict[dep.id] = dep
@@ -395,7 +395,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
395395
# FIXME: Move log start and lock to before error check
396396
@dbg timespan_start(ctx, :finish, thunk_id, master)
397397
lock(state.lock) do
398-
finish_task!(state, node, thunk_failed)
398+
finish_task!(ctx, state, node, thunk_failed)
399399
end
400400
@dbg timespan_end(ctx, :finish, thunk_id, master)
401401

@@ -651,7 +651,7 @@ function pop_with_affinity!(ctx, tasks, proc)
651651
return nothing
652652
end
653653

654-
function finish_task!(state, node, thunk_failed; free=true)
654+
function finish_task!(ctx, state, node, thunk_failed; free=true)
655655
pop!(state.running, node)
656656
if !thunk_failed
657657
push!(state.finished, node)
@@ -683,22 +683,41 @@ function finish_task!(state, node, thunk_failed; free=true)
683683
delete!(state.futures, node)
684684
end
685685
end
686-
# Internal clean-up
687-
for inp in filter(istask, inputs(node))
686+
687+
# Chunk clean-up
688+
to_evict = Set{Chunk}()
689+
for inp in filter(t->istask(t) || (t isa Chunk), inputs(node))
688690
if inp in keys(state.waiting_data)
689691
s = state.waiting_data[inp]
690692
if node in s
691693
pop!(s, node)
692694
end
693695
if free && isempty(s)
694-
if haskey(state.cache, inp)
696+
if istask(inp) && haskey(state.cache, inp)
695697
_node = state.cache[inp]
698+
if _node isa Chunk
699+
push!(to_evict, _node)
700+
end
696701
free!(_node, force=false, cache=(istask(inp) && inp.cache))
697702
pop!(state.cache, inp)
703+
elseif inp isa Chunk
704+
push!(to_evict, inp)
698705
end
699706
end
700707
end
701708
end
709+
if !isempty(to_evict)
710+
@sync for w in map(p->p.pid, procs_to_use(ctx))
711+
@async remote_do(evict_chunks!, w, to_evict)
712+
end
713+
end
714+
end
715+
716+
function evict_chunks!(chunks::Set{Chunk})
717+
for chunk in chunks
718+
haskey(CHUNK_CACHE, chunk) && delete!(CHUNK_CACHE, chunk)
719+
end
720+
nothing
702721
end
703722

704723
fire_task!(ctx, thunk::Thunk, p, state; util=10^9) =
@@ -717,7 +736,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
717736
# cache hit
718737
state.cache[thunk] = data
719738
thunk_failed = thunk in state.errored
720-
finish_task!(state, thunk, thunk_failed; free=false)
739+
finish_task!(ctx, state, thunk, thunk_failed; free=false)
721740
continue
722741
else
723742
# cache miss
@@ -728,7 +747,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
728747
try
729748
result = thunk.options.restore(thunk)
730749
state.cache[thunk] = result
731-
finish_task!(state, thunk, false; free=false)
750+
finish_task!(ctx, state, thunk, false; free=false)
732751
continue
733752
catch err
734753
@error "Thunk restore failed" exception=(err,catch_backtrace())

src/sch/util.jl

+4-2
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ function reschedule_inputs!(state, thunk)
1414
w = get!(()->Set{Thunk}(), state.waiting, thunk)
1515
scheduled = false
1616
for input in thunk.inputs
17+
if istask(input) || (input isa Chunk)
18+
push!(get!(()->Set{Thunk}(), state.waiting_data, input), thunk)
19+
push!(get!(()->Set{Thunk}(), state.dependents, input), thunk)
20+
end
1721
istask(input) || continue
18-
push!(get!(()->Set{Thunk}(), state.waiting_data, input), thunk)
19-
push!(get!(()->Set{Thunk}(), state.dependents, input), thunk)
2022
if input in state.errored
2123
set_failed!(state, input, thunk)
2224
break # TODO: Allow collecting all error'd inputs

0 commit comments

Comments
 (0)