Skip to content

Commit 3e4a0e7

Browse files
authored
fix #35937, serializing TaskFailedException in Distributed (#35943)
1 parent 4ea22b5 commit 3e4a0e7

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

src/clusterserialize.jl

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
using Serialization: serialize_cycle, deserialize_cycle, writetag,
44
serialize_typename, deserialize_typename,
5-
TYPENAME_TAG, reset_state, serialize_type
5+
TYPENAME_TAG, TASK_TAG, reset_state, serialize_type
66
using Serialization.__deserialized_types__
77

88
import Serialization: object_number, lookup_object_number, remember_object
@@ -102,6 +102,26 @@ function serialize(s::ClusterSerializer, t::Core.TypeName)
102102
nothing
103103
end
104104

105+
function serialize(s::ClusterSerializer, t::Task)
106+
serialize_cycle(s, t) && return
107+
if istaskstarted(t) && !istaskdone(t)
108+
error("cannot serialize a running Task")
109+
end
110+
writetag(s.io, TASK_TAG)
111+
serialize(s, t.code)
112+
serialize(s, t.storage)
113+
bt = t.backtrace
114+
if bt !== nothing
115+
if !isa(bt, Vector{Any})
116+
bt = Base.process_backtrace(bt, 100)
117+
end
118+
serialize(s, bt)
119+
end
120+
serialize(s, t.state)
121+
serialize(s, t.result)
122+
serialize(s, t.exception)
123+
end
124+
105125
function serialize(s::ClusterSerializer, g::GlobalRef)
106126
# Record if required and then invoke the default GlobalRef serializer.
107127
sym = g.name
@@ -231,6 +251,23 @@ function deserialize(s::ClusterSerializer, t::Type{<:CapturedException})
231251
return CapturedException(capex, bt)
232252
end
233253

254+
function deserialize(s::ClusterSerializer, ::Type{Task})
255+
t = Task(nothing)
256+
deserialize_cycle(s, t)
257+
t.code = deserialize(s)
258+
t.storage = deserialize(s)
259+
state_or_bt = deserialize(s)
260+
if state_or_bt isa Symbol
261+
t.state = state_or_bt
262+
else
263+
t.backtrace = state_or_bt
264+
t.state = deserialize(s)
265+
end
266+
t.result = deserialize(s)
267+
t.exception = deserialize(s)
268+
t
269+
end
270+
234271
"""
235272
clear!(syms, pids=workers(); mod=Main)
236273

test/distributed_exec.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,6 +1682,19 @@ let (h, t) = Distributed.head_and_tail(Int[], 0)
16821682
@test collect(t) == []
16831683
end
16841684

1685+
# issue #35937
1686+
let e
1687+
try
1688+
pmap(1) do _
1689+
wait(@async error(42))
1690+
end
1691+
catch ex
1692+
e = ex
1693+
end
1694+
# check that the inner TaskFailedException is correctly formed & can be printed
1695+
@test sprint(showerror, e) isa String
1696+
end
1697+
16851698
include("splitrange.jl")
16861699

16871700
# Run topology tests last after removing all workers, since a given

0 commit comments

Comments
 (0)