diff --git a/base/task.jl b/base/task.jl index 951e980ee903c..8cfb959536b2a 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1145,6 +1145,16 @@ function throwto(t::Task, @nospecialize exc) return try_yieldto(identity) end +@inline function wait_forever() + while true + wait() + end +end + +const get_sched_task = OncePerThread{Task}() do + @task wait_forever() +end + function ensure_rescheduled(othertask::Task) ct = current_task() W = workqueue_for(Threads.threadid()) @@ -1181,25 +1191,39 @@ end checktaskempty = Partr.multiq_check_empty -@noinline function poptask(W::StickyWorkqueue) - task = trypoptask(W) - if !(task isa Task) - task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty) - end - set_next_task(task) - nothing -end - function wait() ct = current_task() # [task] user_time -yield-or-done-> wait_time record_running_time!(ct) + # let GC run GC.safepoint() - W = workqueue_for(Threads.threadid()) - poptask(W) - result = try_yieldto(ensure_rescheduled) + # check for libuv events process_events() - # return when we come out of the queue + + # get the next task to run + result = nothing + have_result = false + W = workqueue_for(Threads.threadid()) + task = trypoptask(W) + if !(task isa Task) + # No tasks to run; switch to the scheduler task to run the + # thread sleep logic. + sched_task = get_sched_task() + if ct !== sched_task + result = yieldto(sched_task) + have_result = true + else + task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), + trypoptask, W, checktaskempty) + end + end + # We may have already switched tasks (via the scheduler task), so + # only switch if we haven't. + if !have_result + @assert task isa Task + set_next_task(task) + result = try_yieldto(ensure_rescheduled) + end return result end diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index 26f95d4ce1819..5a822315ba2cd 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -547,14 +547,12 @@ end fetch(r) end - let addr = Sockets.InetAddr(ip"127.0.0.1", 4444) - srv = listen(addr) + let addr = Sockets.InetAddr(ip"192.0.2.5", 4444) s = Sockets.TCPSocket() Sockets.connect!(s, addr) r = @async close(s) @test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s) fetch(r) - close(srv) end end diff --git a/test/channels.jl b/test/channels.jl index f646b41cfa1a0..721eb478bd13a 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -463,15 +463,14 @@ end cb = first(async.cond.waitq) @test isopen(async) ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - @test isempty(Base.Workqueue) Base.process_events() # schedule event Sys.iswindows() && Base.process_events() # schedule event (windows?) - @test length(Base.Workqueue) == 1 ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) @test tc[] == 0 yield() # consume event @test tc[] == 1 + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() Sys.iswindows() && Base.process_events() # schedule event (windows?) yield() # consume event @test tc[] == 2