Skip to content

Commit 0d4d6d9

Browse files
authored
Scheduler: Use a "scheduler" task for thread sleep (#57544)
A Julia thread runs Julia's scheduler in the context of the task that is switching out. If no task is found to switch to, the thread sleeps while still holding onto that (possibly completed) task, preventing it from being garbage collected. This recent [Discourse post](https://discourse.julialang.org/t/weird-behaviour-of-gc-with-multithreaded-array-access/125433) illustrates precisely this problem. One solution is for an idle Julia thread to switch to a dedicated "scheduler" task, thereby releasing the old task. This PR uses `OncePerThread` to create a per-thread "scheduler" task (which does nothing but run `wait()` in a loop) and switches to that task when the thread finds itself idle. Other approaches considered and discarded in favor of this one: #57465 and #57543.
1 parent ed3fccc commit 0d4d6d9

File tree

3 files changed

+40
-19
lines changed

3 files changed

+40
-19
lines changed

base/task.jl

+37-13
Original file line numberDiff line numberDiff line change
@@ -1145,6 +1145,16 @@ function throwto(t::Task, @nospecialize exc)
11451145
return try_yieldto(identity)
11461146
end
11471147

1148+
@inline function wait_forever()
1149+
while true
1150+
wait()
1151+
end
1152+
end
1153+
1154+
const get_sched_task = OncePerThread{Task}() do
1155+
@task wait_forever()
1156+
end
1157+
11481158
function ensure_rescheduled(othertask::Task)
11491159
ct = current_task()
11501160
W = workqueue_for(Threads.threadid())
@@ -1181,25 +1191,39 @@ end
11811191

11821192
checktaskempty = Partr.multiq_check_empty
11831193

1184-
@noinline function poptask(W::StickyWorkqueue)
1185-
task = trypoptask(W)
1186-
if !(task isa Task)
1187-
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
1188-
end
1189-
set_next_task(task)
1190-
nothing
1191-
end
1192-
11931194
function wait()
11941195
ct = current_task()
11951196
# [task] user_time -yield-or-done-> wait_time
11961197
record_running_time!(ct)
1198+
# let GC run
11971199
GC.safepoint()
1198-
W = workqueue_for(Threads.threadid())
1199-
poptask(W)
1200-
result = try_yieldto(ensure_rescheduled)
1200+
# check for libuv events
12011201
process_events()
1202-
# return when we come out of the queue
1202+
1203+
# get the next task to run
1204+
result = nothing
1205+
have_result = false
1206+
W = workqueue_for(Threads.threadid())
1207+
task = trypoptask(W)
1208+
if !(task isa Task)
1209+
# No tasks to run; switch to the scheduler task to run the
1210+
# thread sleep logic.
1211+
sched_task = get_sched_task()
1212+
if ct !== sched_task
1213+
result = yieldto(sched_task)
1214+
have_result = true
1215+
else
1216+
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any),
1217+
trypoptask, W, checktaskempty)
1218+
end
1219+
end
1220+
# We may have already switched tasks (via the scheduler task), so
1221+
# only switch if we haven't.
1222+
if !have_result
1223+
@assert task isa Task
1224+
set_next_task(task)
1225+
result = try_yieldto(ensure_rescheduled)
1226+
end
12031227
return result
12041228
end
12051229

stdlib/Sockets/test/runtests.jl

+1-3
Original file line numberDiff line numberDiff line change
@@ -547,14 +547,12 @@ end
547547
fetch(r)
548548
end
549549

550-
let addr = Sockets.InetAddr(ip"127.0.0.1", 4444)
551-
srv = listen(addr)
550+
let addr = Sockets.InetAddr(ip"192.0.2.5", 4444)
552551
s = Sockets.TCPSocket()
553552
Sockets.connect!(s, addr)
554553
r = @async close(s)
555554
@test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s)
556555
fetch(r)
557-
close(srv)
558556
end
559557
end
560558

test/channels.jl

+2-3
Original file line numberDiff line numberDiff line change
@@ -463,15 +463,14 @@ end
463463
cb = first(async.cond.waitq)
464464
@test isopen(async)
465465
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
466-
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
467-
@test isempty(Base.Workqueue)
468466
Base.process_events() # schedule event
469467
Sys.iswindows() && Base.process_events() # schedule event (windows?)
470-
@test length(Base.Workqueue) == 1
471468
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
472469
@test tc[] == 0
473470
yield() # consume event
474471
@test tc[] == 1
472+
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
473+
Base.process_events()
475474
Sys.iswindows() && Base.process_events() # schedule event (windows?)
476475
yield() # consume event
477476
@test tc[] == 2

0 commit comments

Comments
 (0)