Skip to content

Scheduler: Use a "scheduler" task for thread sleep #57544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,16 @@ function throwto(t::Task, @nospecialize exc)
return try_yieldto(identity)
end

@inline function wait_forever()
while true
wait()
end
end

const get_sched_task = OncePerThread{Task}() do
@task wait_forever()
end

function ensure_rescheduled(othertask::Task)
ct = current_task()
W = workqueue_for(Threads.threadid())
Expand Down Expand Up @@ -1181,25 +1191,39 @@ end

checktaskempty = Partr.multiq_check_empty

@noinline function poptask(W::StickyWorkqueue)
task = trypoptask(W)
if !(task isa Task)
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
end
set_next_task(task)
nothing
end

function wait()
ct = current_task()
# [task] user_time -yield-or-done-> wait_time
record_running_time!(ct)
# let GC run
GC.safepoint()
W = workqueue_for(Threads.threadid())
poptask(W)
result = try_yieldto(ensure_rescheduled)
# check for libuv events
process_events()
# return when we come out of the queue

# get the next task to run
result = nothing
have_result = false
W = workqueue_for(Threads.threadid())
task = trypoptask(W)
if !(task isa Task)
# No tasks to run; switch to the scheduler task to run the
# thread sleep logic.
sched_task = get_sched_task()
if ct !== sched_task
result = yieldto(sched_task)
have_result = true
else
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any),
trypoptask, W, checktaskempty)
end
end
# We may have already switched tasks (via the scheduler task), so
# only switch if we haven't.
if !have_result
@assert task isa Task
Copy link
Contributor

@nsajko nsajko Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason this doesn't just typeassert? Just curious.

Suggested change
@assert task isa Task
task = task::Task

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No specific reason. Seems like it should be a typeassert, actually.

set_next_task(task)
result = try_yieldto(ensure_rescheduled)
end
return result
end

Expand Down
4 changes: 1 addition & 3 deletions stdlib/Sockets/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -547,14 +547,12 @@ end
fetch(r)
end

let addr = Sockets.InetAddr(ip"127.0.0.1", 4444)
srv = listen(addr)
let addr = Sockets.InetAddr(ip"192.0.2.5", 4444)
s = Sockets.TCPSocket()
Sockets.connect!(s, addr)
r = @async close(s)
@test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s)
fetch(r)
close(srv)
end
end

Expand Down
5 changes: 2 additions & 3 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -463,15 +463,14 @@ end
cb = first(async.cond.waitq)
@test isopen(async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
@test isempty(Base.Workqueue)
Base.process_events() # schedule event
Sys.iswindows() && Base.process_events() # schedule event (windows?)
@test length(Base.Workqueue) == 1
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
@test tc[] == 0
yield() # consume event
@test tc[] == 1
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
Base.process_events()
Sys.iswindows() && Base.process_events() # schedule event (windows?)
yield() # consume event
@test tc[] == 2
Expand Down