From ba70cec503eb89abb07f3721ba96101ddef4f104 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Wed, 26 Feb 2025 17:04:00 -0500 Subject: [PATCH 1/6] Use a scheduler task for thread sleep --- base/task.jl | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/base/task.jl b/base/task.jl index 951e980ee903c..2629b1c1fe84f 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1145,6 +1145,16 @@ function throwto(t::Task, @nospecialize exc) return try_yieldto(identity) end +@inline function wait_forever() + while true + wait() + end +end + +const get_sched_task = OncePerThread{Task}() do + @task wait_forever() +end + function ensure_rescheduled(othertask::Task) ct = current_task() W = workqueue_for(Threads.threadid()) @@ -1184,7 +1194,17 @@ checktaskempty = Partr.multiq_check_empty @noinline function poptask(W::StickyWorkqueue) task = trypoptask(W) if !(task isa Task) - task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty) + process_events() + task = trypoptask(W) + end + if !(task isa Task) + sched_task = get_sched_task() + if current_task() === sched_task + task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty) + else + yieldto(sched_task) + task = current_task() + end end set_next_task(task) nothing From 8de180616f0809eb98762c22be73ba02765c4d70 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Fri, 28 Feb 2025 12:47:26 -0500 Subject: [PATCH 2/6] Rearrange the code to make things cleaner --- base/task.jl | 47 ++++++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/base/task.jl b/base/task.jl index 2629b1c1fe84f..a9f4ed2333494 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1191,35 +1191,44 @@ end checktaskempty = Partr.multiq_check_empty -@noinline function poptask(W::StickyWorkqueue) +function wait() + ct = current_task() + # [task] user_time -yield-or-done-> wait_time + record_running_time!(ct) + # let GC run + GC.safepoint() + # check for libuv events + process_events() + + # get the next task to run + result = nothing + have_result = false + W = workqueue_for(Threads.threadid()) task = trypoptask(W) if !(task isa Task) + # didn't find a task to run, try again process_events() task = trypoptask(W) end if !(task isa Task) + # No tasks to run; switch to the scheduler task to run the + # thread sleep logic. sched_task = get_sched_task() - if current_task() === sched_task - task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty) + if ct !== sched_task + result = yieldto(sched_task) + have_result = true else - yieldto(sched_task) - task = current_task() + task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), + trypoptask, W, checktaskempty) end end - set_next_task(task) - nothing -end - -function wait() - ct = current_task() - # [task] user_time -yield-or-done-> wait_time - record_running_time!(ct) - GC.safepoint() - W = workqueue_for(Threads.threadid()) - poptask(W) - result = try_yieldto(ensure_rescheduled) - process_events() - # return when we come out of the queue + # We may have already switched tasks (via the scheduler task), so + # only switch if we haven't. + if !have_result + @assert task isa Task + set_next_task(task) + result = try_yieldto(ensure_rescheduled) + end return result end From 3bcde1bc4dfc67be95b835bc7ee32e2509a2ee9b Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Mon, 24 Mar 2025 19:15:12 +0000 Subject: [PATCH 3/6] Drop extra process_events call --- base/task.jl | 5 ----- 1 file changed, 5 deletions(-) diff --git a/base/task.jl b/base/task.jl index a9f4ed2333494..8cfb959536b2a 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1205,11 +1205,6 @@ function wait() have_result = false W = workqueue_for(Threads.threadid()) task = trypoptask(W) - if !(task isa Task) - # didn't find a task to run, try again - process_events() - task = trypoptask(W) - end if !(task isa Task) # No tasks to run; switch to the scheduler task to run the # thread sleep logic. From 97d5a37a292cacf5fedc57d80e95fefb72a4790e Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Mon, 24 Mar 2025 20:43:43 +0000 Subject: [PATCH 4/6] Fix channels tests This small group of tests is written with assumptions about when and how the libuv event loop is run. As this PR changes this behavior, the tests needed adjusting. --- test/channels.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/channels.jl b/test/channels.jl index f646b41cfa1a0..708455eac610d 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -463,7 +463,6 @@ end cb = first(async.cond.waitq) @test isopen(async) ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) @test isempty(Base.Workqueue) Base.process_events() # schedule event Sys.iswindows() && Base.process_events() # schedule event (windows?) @@ -472,6 +471,8 @@ end @test tc[] == 0 yield() # consume event @test tc[] == 1 + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() Sys.iswindows() && Base.process_events() # schedule event (windows?) yield() # consume event @test tc[] == 2 From 9c8c57e0671d1af8a89a22072bebaf875f1d01cf Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Wed, 26 Mar 2025 16:58:14 +0000 Subject: [PATCH 5/6] Fix Sockets test Previously, this test depended on scheduler behavior, which is slightly changed in this PR. Changed the test to connect to a non-routable IP address so that it no longer depends on task ordering. --- stdlib/Sockets/test/runtests.jl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index 26f95d4ce1819..5a822315ba2cd 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -547,14 +547,12 @@ end fetch(r) end - let addr = Sockets.InetAddr(ip"127.0.0.1", 4444) - srv = listen(addr) + let addr = Sockets.InetAddr(ip"192.0.2.5", 4444) s = Sockets.TCPSocket() Sockets.connect!(s, addr) r = @async close(s) @test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s) fetch(r) - close(srv) end end From 192f1228f0b137226f4a34d429a232efcb7c6090 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Wed, 26 Mar 2025 17:14:52 -0400 Subject: [PATCH 6/6] Remove bad tests for `Workqueue` contents --- test/channels.jl | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/channels.jl b/test/channels.jl index 708455eac610d..721eb478bd13a 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -463,10 +463,8 @@ end cb = first(async.cond.waitq) @test isopen(async) ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - @test isempty(Base.Workqueue) Base.process_events() # schedule event Sys.iswindows() && Base.process_events() # schedule event (windows?) - @test length(Base.Workqueue) == 1 ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) @test tc[] == 0 yield() # consume event