Skip to content

Commit 5a53b50

Browse files
committed
Cache chunks per-process
1 parent 9b90bce commit 5a53b50

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

src/chunks.jl

+3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ function unrelease(c::Chunk{<:Any,DRef})
6363
end
6464
unrelease(c::Chunk) = c
6565

66+
Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
67+
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x)
68+
6669
collect_remote(chunk::Chunk, network) =
6770
move(chunk.processor, OSProc(), poolget(chunk.handle; network=network); network=network)
6871
function collect(ctx::Context, chunk::Chunk; options=nothing)

src/sch/Sch.jl

+13-1
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,8 @@ function Base.show(io::IO, se::SchedulingException)
441441
print(io, "SchedulingException ($(se.reason))")
442442
end
443443

444+
const CHUNK_CACHE = Dict{Chunk,Any}()
445+
444446
function schedule!(ctx, state, procs=procs_to_use(ctx))
445447
lock(state.lock) do
446448
safepoint(state)
@@ -787,7 +789,17 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
787789
fetch_report.(map(Iterators.zip(data,ids)) do (x, id)
788790
@async begin
789791
@dbg timespan_start(ctx, :move, (thunk_id, id), (f, id))
790-
x = move(to_proc, x; network=network)
792+
x = if x isa Chunk
793+
if haskey(CHUNK_CACHE, x)
794+
CHUNK_CACHE[x]
795+
else
796+
_x = move(to_proc, x; network=network)
797+
CHUNK_CACHE[x] = _x
798+
_x
799+
end
800+
else
801+
move(to_proc, x; network=network)
802+
end
791803
@dbg timespan_end(ctx, :move, (thunk_id, id), (f, id))
792804
return x
793805
end

0 commit comments

Comments
 (0)