Skip to content

Commit 65953d8

Browse files
authored
Fix more invalidations from overloading == (#36282)
* Improve typing of ProcessGroup.refs * Add type annotation in stacktrace handling * Eliminate boxing in REPL Related to #15276
1 parent ab3c1d2 commit 65953d8

File tree

3 files changed

+26
-24
lines changed

3 files changed

+26
-24
lines changed

src/Distributed.jl

+14
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,20 @@ function _require_callback(mod::Base.PkgId)
8282
end
8383
end
8484

85+
const REF_ID = Ref(1)
86+
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)
87+
88+
struct RRID
89+
whence::Int
90+
id::Int
91+
92+
RRID() = RRID(myid(),next_ref_id())
93+
RRID(whence, id) = new(whence,id)
94+
end
95+
96+
hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
97+
==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id)
98+
8599
include("clusterserialize.jl")
86100
include("cluster.jl") # cluster setup and management, addprocs
87101
include("messages.jl")

src/cluster.jl

+12-12
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ function set_worker_state(w, state)
148148
end
149149

150150
function check_worker_state(w::Worker)
151-
if w.state == W_CREATED
151+
if w.state === W_CREATED
152152
if !isclusterlazy()
153153
if PGRP.topology === :all_to_all
154154
# Since higher pids connect with lower pids, the remote worker
@@ -185,13 +185,13 @@ function exec_conn_func(w::Worker)
185185
end
186186

187187
function wait_for_conn(w)
188-
if w.state == W_CREATED
188+
if w.state === W_CREATED
189189
timeout = worker_timeout() - (time() - w.ct_time)
190190
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
191191

192192
@async (sleep(timeout); notify(w.c_state; all=true))
193193
wait(w.c_state)
194-
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
194+
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
195195
end
196196
nothing
197197
end
@@ -626,7 +626,7 @@ function create_worker(manager, wconfig)
626626
# require the value of config.connect_at which is set only upon connection completion
627627
for jw in PGRP.workers
628628
if (jw.id != 1) && (jw.id < w.id)
629-
(jw.state == W_CREATED) && wait(jw.c_state)
629+
(jw.state === W_CREATED) && wait(jw.c_state)
630630
push!(join_list, jw)
631631
end
632632
end
@@ -649,7 +649,7 @@ function create_worker(manager, wconfig)
649649
end
650650

651651
for wl in wlist
652-
(wl.state == W_CREATED) && wait(wl.c_state)
652+
(wl.state === W_CREATED) && wait(wl.c_state)
653653
push!(join_list, wl)
654654
end
655655
end
@@ -767,7 +767,7 @@ end
767767
mutable struct ProcessGroup
768768
name::AbstractString
769769
workers::Array{Any,1}
770-
refs::Dict # global references
770+
refs::Dict{RRID,Any} # global references
771771
topology::Symbol
772772
lazy::Union{Bool, Nothing}
773773

@@ -851,7 +851,7 @@ function nprocs()
851851
n = length(PGRP.workers)
852852
# filter out workers in the process of being setup/shutdown.
853853
for jw in PGRP.workers
854-
if !isa(jw, LocalProcess) && (jw.state != W_CONNECTED)
854+
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
855855
n = n - 1
856856
end
857857
end
@@ -902,7 +902,7 @@ julia> procs()
902902
function procs()
903903
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
904904
# filter out workers in the process of being setup/shutdown.
905-
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
905+
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
906906
else
907907
return Int[x.id for x in PGRP.workers]
908908
end
@@ -911,7 +911,7 @@ end
911911
function id_in_procs(id) # faster version of `id in procs()`
912912
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
913913
for x in PGRP.workers
914-
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state == W_CONNECTED)
914+
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
915915
return true
916916
end
917917
end
@@ -933,7 +933,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
933933
"""
934934
function procs(pid::Integer)
935935
if myid() == 1
936-
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
936+
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
937937
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
938938
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
939939
else
@@ -1040,11 +1040,11 @@ function _rmprocs(pids, waitfor)
10401040

10411041
start = time_ns()
10421042
while (time_ns() - start) < waitfor*1e9
1043-
all(w -> w.state == W_TERMINATED, rmprocset) && break
1043+
all(w -> w.state === W_TERMINATED, rmprocset) && break
10441044
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
10451045
end
10461046

1047-
unremoved = [wrkr.id for wrkr in filter(w -> w.state != W_TERMINATED, rmprocset)]
1047+
unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
10481048
if length(unremoved) > 0
10491049
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
10501050
throw(ErrorException(estr))

src/messages.jl

-12
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,6 @@
22

33
abstract type AbstractMsg end
44

5-
const REF_ID = Ref(1)
6-
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)
7-
8-
struct RRID
9-
whence::Int
10-
id::Int
11-
12-
RRID() = RRID(myid(),next_ref_id())
13-
RRID(whence, id) = new(whence,id)
14-
end
15-
hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
16-
==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id)
175

186
## Wire format description
197
#

0 commit comments

Comments
 (0)