Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sch: Use W1T1 if profitable #561

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,16 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))

inputs = map(last, collect_task_inputs(state, task))
opts = populate_defaults(opts, chunktype(task.f), map(chunktype, inputs))
local_procs, costs = estimate_task_costs(state, local_procs, task, inputs)
local_procs, costs = estimate_task_costs(state, local_procs, task, inputs; sig)
scheduled = false

# Move our corresponding ThreadProc to be the last considered
# Move our corresponding ThreadProc to be the last considered,
# if the task is expected to run for longer than the time it takes to
# schedule it onto another worker (estimated at 1ms).
if length(local_procs) > 1
sch_threadproc = Dagger.ThreadProc(myid(), Threads.threadid())
sch_thread_idx = findfirst(proc->proc==sch_threadproc, local_procs)
if sch_thread_idx !== nothing
if sch_thread_idx !== nothing && costs[sch_threadproc] > 1_000_000 # 1ms
deleteat!(local_procs, sch_thread_idx)
push!(local_procs, sch_threadproc)
end
Expand All @@ -786,7 +788,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
state.worker_time_pressure[gproc.pid][proc] =
get(state.worker_time_pressure[gproc.pid], proc, 0) +
est_time_util
@dagdebug task :schedule "Scheduling to $gproc -> $proc"
@dagdebug task :schedule "Scheduling to $gproc -> $proc (cost: $(costs[proc]))"
@goto pop_task
end
end
Expand Down
22 changes: 18 additions & 4 deletions src/sch/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ current estimated per-processor compute pressure, and transfer costs for each
`Chunk` argument to `task`. Returns `(procs, costs)`, with `procs` sorted in
order of ascending cost.
"""
function estimate_task_costs(state, procs, task, inputs)
function estimate_task_costs(state, procs, task, inputs; sig=nothing)
tx_rate = state.transfer_rate[]

# Find all Chunks
Expand All @@ -473,9 +473,17 @@ function estimate_task_costs(state, procs, task, inputs)
end
end

# Estimate the cost of executing the task itself
if sig === nothing
sig = signature(task.f, inputs)
end
est_time_util = get(state.signature_time_cost, sig, 1000^3)

# Estimate total cost for executing this task on each candidate processor
costs = Dict{Processor,Float64}()
for proc in procs
chunks_filt = Iterators.filter(c->get_parent(processor(c))!=get_parent(proc), chunks)
gproc = get_parent(proc)
chunks_filt = Iterators.filter(c->get_parent(processor(c))!=gproc, chunks)

# Estimate network transfer costs based on data size
# N.B. `affinity(x)` really means "data size of `x`"
Expand All @@ -485,8 +493,14 @@ function estimate_task_costs(state, procs, task, inputs)
tx_cost = impute_sum(affinity(chunk)[2] for chunk in chunks_filt)

# Estimate total cost to move data and get task running after currently-scheduled tasks
est_time_util = get(state.worker_time_pressure[get_parent(proc).pid], proc, 0)
costs[proc] = est_time_util + (tx_cost/tx_rate)
est_business = get(state.worker_time_pressure[get_parent(proc).pid], proc, 0)

# Add fixed cost for cross-worker task transfer (esimated at 1ms)
# TODO: Actually estimate/benchmark this
task_xfer_cost = gproc.pid != myid() ? 1_000_000 : 0 # 1ms

# Compute final cost
costs[proc] = est_time_util + est_business + (tx_cost/tx_rate) + task_xfer_cost
end

# Shuffle procs around, so equally-costly procs are equally considered
Expand Down