Skip to content

Commit 2264eee

Browse files
chore(snownet): reuse RingBuffer (firezone#3725)
Previously, we eagerly created a channel bind message and then buffered it if we didn't have an allocation. That resulted in some duplicated checks once we did end up sending the message. To avoid this, we remove the dedicated `BufferedChannelBindings` struct and instead use the newly added `RingBuffer`. Whilst we are at it, we also increase the number of buffered messages to avoid dropping them too early.
1 parent c09ba08 commit 2264eee

File tree

2 files changed

+45
-90
lines changed

2 files changed

+45
-90
lines changed

rust/connlib/snownet/src/allocation.rs

+33-90
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct Allocation {
5858
timed_out_requests: RingBuffer<TransactionId>,
5959

6060
channel_bindings: ChannelBindings,
61-
buffered_channel_bindings: BufferedChannelBindings,
61+
buffered_channel_bindings: RingBuffer<SocketAddr>,
6262

6363
last_now: Instant,
6464

@@ -113,7 +113,7 @@ impl Allocation {
113113
allocation_lifetime: Default::default(),
114114
channel_bindings: Default::default(),
115115
last_now: now,
116-
buffered_channel_bindings: Default::default(),
116+
buffered_channel_bindings: RingBuffer::new(100),
117117
backoff: backoff::new(now, REQUEST_TIMEOUT),
118118
timed_out_requests: RingBuffer::new(100),
119119
};
@@ -299,6 +299,11 @@ impl Allocation {
299299
.attributes()
300300
.find_map(relay_candidate(|s| s.is_ipv6()));
301301

302+
if maybe_ip4_relay_candidate.is_none() && maybe_ip6_relay_candidate.is_none() {
303+
tracing::warn!("Relay sent a successful allocate response without addresses");
304+
return true;
305+
}
306+
302307
self.allocation_lifetime = Some((now, lifetime));
303308
update_candidate(
304309
maybe_srflx_candidate,
@@ -324,18 +329,12 @@ impl Allocation {
324329
"Updated candidates of allocation"
325330
);
326331

327-
while let Some(buffered) = self.buffered_channel_bindings.pop_front() {
328-
let Some(peer) = buffered.get_attribute::<XorPeerAddress>() else {
329-
debug_assert!(false, "channel binding must have peer address");
330-
continue;
331-
};
332-
333-
if !self.can_relay_to(peer.address()) {
334-
tracing::debug!("Allocation cannot relay to this IP version");
335-
continue;
336-
}
337-
338-
self.authenticate_and_queue(buffered);
332+
while let Some(peer) = self.buffered_channel_bindings.pop() {
333+
debug_assert!(
334+
self.has_allocation(),
335+
"We just received a successful allocation response"
336+
);
337+
self.bind_channel(peer, now);
339338
}
340339
}
341340
REFRESH => {
@@ -501,17 +500,10 @@ impl Allocation {
501500
return;
502501
}
503502

504-
let Some(channel) = self.channel_bindings.new_channel_to_peer(peer, now) else {
505-
tracing::warn!("All channels are exhausted");
506-
return;
507-
};
508-
509-
let msg = make_channel_bind_request(peer, channel);
510-
511503
if !self.has_allocation() {
512504
tracing::debug!("No allocation yet, buffering channel binding");
513505

514-
self.buffered_channel_bindings.push_back(msg);
506+
self.buffered_channel_bindings.push(peer);
515507
return;
516508
}
517509

@@ -520,7 +512,12 @@ impl Allocation {
520512
return;
521513
}
522514

523-
self.authenticate_and_queue(msg);
515+
let Some(channel) = self.channel_bindings.new_channel_to_peer(peer, now) else {
516+
tracing::warn!("All channels are exhausted");
517+
return;
518+
};
519+
520+
self.authenticate_and_queue(make_channel_bind_request(peer, channel));
524521
}
525522

526523
pub fn encode_to_slice(
@@ -627,17 +624,18 @@ impl Allocation {
627624
}
628625

629626
fn channel_binding_in_flight_by_peer(&self, peer: SocketAddr) -> bool {
630-
let sent_requests = self.sent_requests.values().map(|(r, _, _)| r);
631-
let buffered = self.buffered_channel_bindings.inner.iter();
632-
633-
sent_requests.chain(buffered).any(|message| {
634-
let is_binding = message.method() == CHANNEL_BIND;
635-
let is_for_peer = message
636-
.get_attribute::<XorPeerAddress>()
637-
.is_some_and(|n| n.address() == peer);
627+
let sent_requests = self
628+
.sent_requests
629+
.values()
630+
.map(|(r, _, _)| r)
631+
.filter(|message| message.method() == CHANNEL_BIND)
632+
.filter_map(|message| message.get_attribute::<XorPeerAddress>())
633+
.map(|a| a.address());
634+
let buffered = self.buffered_channel_bindings.iter().copied();
638635

639-
is_binding && is_for_peer
640-
})
636+
sent_requests
637+
.chain(buffered)
638+
.any(|buffered| buffered == peer)
641639
}
642640

643641
fn allocate_in_flight(&self) -> bool {
@@ -1050,39 +1048,6 @@ impl Channel {
10501048
}
10511049
}
10521050

1053-
#[derive(Debug, Default)]
1054-
struct BufferedChannelBindings {
1055-
inner: VecDeque<Message<Attribute>>,
1056-
}
1057-
1058-
impl BufferedChannelBindings {
1059-
/// Adds a new `CHANNEL-BIND` message to this buffer.
1060-
///
1061-
/// The buffer has a fixed size of 10 to avoid unbounded memory growth.
1062-
/// All prior messages are cleared once we outgrow the buffer.
1063-
/// Very likely, we buffer `CHANNEL-BIND` messages only for a brief period of time.
1064-
/// However, it might also happen that we can only re-connect to a TURN server after an extended period of downtime.
1065-
/// Chances are that we don't need any of the old channels any more, and that the new ones are much more relevant.
1066-
fn push_back(&mut self, msg: Message<Attribute>) {
1067-
debug_assert_eq!(msg.method(), CHANNEL_BIND);
1068-
1069-
if self.inner.len() == 10 {
1070-
tracing::debug!("Clearing buffered channel-data messages");
1071-
self.inner.clear()
1072-
}
1073-
1074-
self.inner.push_back(msg);
1075-
}
1076-
1077-
fn pop_front(&mut self) -> Option<Message<Attribute>> {
1078-
self.inner.pop_front()
1079-
}
1080-
1081-
fn clear(&mut self) {
1082-
self.inner.clear()
1083-
}
1084-
}
1085-
10861051
#[cfg(test)]
10871052
mod tests {
10881053
use super::*;
@@ -1391,28 +1356,6 @@ mod tests {
13911356
assert!(expected_backoffs.is_empty())
13921357
}
13931358

1394-
#[test]
1395-
fn discards_old_channel_bindings_once_we_outgrow_buffer() {
1396-
let mut buffered_channel_bindings = BufferedChannelBindings::default();
1397-
1398-
for c in 0..11 {
1399-
buffered_channel_bindings.push_back(make_channel_bind_request(
1400-
PEER1,
1401-
ChannelBindings::FIRST_CHANNEL + c,
1402-
));
1403-
}
1404-
1405-
let msg = buffered_channel_bindings.pop_front().unwrap();
1406-
assert!(
1407-
buffered_channel_bindings.pop_front().is_none(),
1408-
"no more messages"
1409-
);
1410-
assert_eq!(
1411-
msg.get_attribute::<ChannelNumber>().unwrap().value(),
1412-
ChannelBindings::FIRST_CHANNEL + 10
1413-
);
1414-
}
1415-
14161359
#[test]
14171360
fn given_no_ip6_allocation_does_not_attempt_to_bind_channel_to_ip6_address() {
14181361
let mut allocation =
@@ -1801,10 +1744,10 @@ mod tests {
18011744
let channel_bind_peer_2 = allocation.next_message().unwrap();
18021745

18031746
assert_eq!(channel_bind_peer_1.method(), CHANNEL_BIND);
1804-
assert_eq!(peer_address(&channel_bind_peer_1), PEER1);
1747+
assert_eq!(peer_address(&channel_bind_peer_1), PEER2_IP4);
18051748

18061749
assert_eq!(channel_bind_peer_2.method(), CHANNEL_BIND);
1807-
assert_eq!(peer_address(&channel_bind_peer_2), PEER2_IP4);
1750+
assert_eq!(peer_address(&channel_bind_peer_2), PEER1);
18081751
}
18091752

18101753
#[test]

rust/connlib/snownet/src/ringbuffer.rs

+12
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ impl<T: PartialEq> RingBuffer<T> {
2424
initial_len != self.buffer.len()
2525
}
2626

27+
pub fn pop(&mut self) -> Option<T> {
28+
self.buffer.pop()
29+
}
30+
31+
pub fn clear(&mut self) {
32+
self.buffer.clear();
33+
}
34+
35+
pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
36+
self.buffer.iter()
37+
}
38+
2739
#[cfg(test)]
2840
fn inner(&self) -> &[T] {
2941
self.buffer.as_slice()

0 commit comments

Comments
 (0)