Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 12fe7d3

Browse files
mergify[bot]behzadnouri
authored andcommitted
v1.18: reverts back in SocketAddr dedup in retransmit stage (backport of #1106) (#1225)
reverts back in SocketAddr dedup in retransmit stage (#1106) This was erronously deemed as unnecessary and removed in: anza-xyz#864 The commit partially reverts #864 and adds back socket-addr dedup. (cherry picked from commit fbe1dbc) Co-authored-by: behzad nouri <[email protected]>
1 parent ddaf56d commit 12fe7d3

File tree

1 file changed

+21
-5
lines changed

1 file changed

+21
-5
lines changed

turbine/src/cluster_nodes.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ pub struct ClusterNodesCache<T> {
8585
pub struct RetransmitPeers<'a> {
8686
root_distance: usize, // distance from the root node
8787
children: Vec<&'a Node>,
88+
// Maps tvu addresses to the first node
89+
// in the shuffle with the same address.
90+
addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
8891
}
8992

9093
impl Node {
@@ -173,13 +176,16 @@ impl ClusterNodes<RetransmitStage> {
173176
let RetransmitPeers {
174177
root_distance,
175178
children,
179+
addrs,
176180
} = self.get_retransmit_peers(slot_leader, shred, fanout)?;
177181
let protocol = get_broadcast_protocol(shred);
178-
let peers = children
179-
.into_iter()
180-
.filter_map(|node| node.contact_info()?.tvu(protocol).ok())
181-
.collect();
182-
Ok((root_distance, peers))
182+
let peers = children.into_iter().filter_map(|node| {
183+
node.contact_info()?
184+
.tvu(protocol)
185+
.ok()
186+
.filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
187+
});
188+
Ok((root_distance, peers.collect()))
183189
}
184190

185191
pub fn get_retransmit_peers(
@@ -199,10 +205,19 @@ impl ClusterNodes<RetransmitStage> {
199205
if let Some(index) = self.index.get(slot_leader) {
200206
weighted_shuffle.remove_index(*index);
201207
}
208+
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
202209
let mut rng = get_seeded_rng(slot_leader, shred);
210+
let protocol = get_broadcast_protocol(shred);
203211
let nodes: Vec<_> = weighted_shuffle
204212
.shuffle(&mut rng)
205213
.map(|index| &self.nodes[index])
214+
.inspect(|node| {
215+
if let Some(node) = node.contact_info() {
216+
if let Ok(addr) = node.tvu(protocol) {
217+
addrs.entry(addr).or_insert(*node.pubkey());
218+
}
219+
}
220+
})
206221
.collect();
207222
let self_index = nodes
208223
.iter()
@@ -221,6 +236,7 @@ impl ClusterNodes<RetransmitStage> {
221236
Ok(RetransmitPeers {
222237
root_distance,
223238
children: peers.collect(),
239+
addrs,
224240
})
225241
}
226242

0 commit comments

Comments
 (0)