Skip to content

Commit 69294bb

Browse files
committed
refactor: change local reset counter to use type system more
1 parent 97704b5 commit 69294bb

File tree

3 files changed

+96
-32
lines changed

3 files changed

+96
-32
lines changed

src/proto/connection.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -436,24 +436,7 @@ where
436436
// error. This is handled by setting a GOAWAY frame followed by
437437
// terminating the connection.
438438
Err(Error::GoAway(debug_data, reason, initiator)) => {
439-
let e = Error::GoAway(debug_data.clone(), reason, initiator);
440-
tracing::debug!(error = ?e, "Connection::poll; connection error");
441-
442-
// We may have already sent a GOAWAY for this error,
443-
// if so, don't send another, just flush and close up.
444-
if self
445-
.go_away
446-
.going_away()
447-
.map_or(false, |frame| frame.reason() == reason)
448-
{
449-
tracing::trace!(" -> already going away");
450-
*self.state = State::Closing(reason, initiator);
451-
return Ok(());
452-
}
453-
454-
// Reset all active streams
455-
self.streams.handle_error(e);
456-
self.go_away_now_data(reason, debug_data);
439+
self.handle_go_away(reason, debug_data, initiator);
457440
Ok(())
458441
}
459442
// Attempting to read a frame resulted in a stream level error.
@@ -462,7 +445,12 @@ where
462445
Err(Error::Reset(id, reason, initiator)) => {
463446
debug_assert_eq!(initiator, Initiator::Library);
464447
tracing::trace!(?id, ?reason, "stream error");
465-
self.streams.send_reset(id, reason);
448+
match self.streams.send_reset(id, reason) {
449+
Ok(()) => (),
450+
Err(crate::proto::error::GoAway { debug_data, reason }) => {
451+
self.handle_go_away(reason, debug_data, Initiator::Library);
452+
}
453+
}
466454
Ok(())
467455
}
468456
// Attempting to read a frame resulted in an I/O error. All
@@ -498,6 +486,27 @@ where
498486
}
499487
}
500488

489+
fn handle_go_away(&mut self, reason: Reason, debug_data: Bytes, initiator: Initiator) {
490+
let e = Error::GoAway(debug_data.clone(), reason, initiator);
491+
tracing::debug!(error = ?e, "Connection::poll; connection error");
492+
493+
// We may have already sent a GOAWAY for this error,
494+
// if so, don't send another, just flush and close up.
495+
if self
496+
.go_away
497+
.going_away()
498+
.map_or(false, |frame| frame.reason() == reason)
499+
{
500+
tracing::trace!(" -> already going away");
501+
*self.state = State::Closing(reason, initiator);
502+
return;
503+
}
504+
505+
// Reset all active streams
506+
self.streams.handle_error(e);
507+
self.go_away_now_data(reason, debug_data);
508+
}
509+
501510
fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
502511
use crate::frame::Frame::*;
503512
match frame {

src/proto/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ pub enum Error {
1313
Io(io::ErrorKind, Option<String>),
1414
}
1515

16+
pub struct GoAway {
17+
pub debug_data: Bytes,
18+
pub reason: Reason,
19+
}
20+
1621
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1722
pub enum Initiator {
1823
User,
@@ -60,6 +65,10 @@ impl Initiator {
6065
Self::Remote => false,
6166
}
6267
}
68+
69+
pub(crate) fn is_library(&self) -> bool {
70+
matches!(self, Self::Library)
71+
}
6372
}
6473

6574
impl fmt::Display for Error {

src/proto/streams/streams.rs

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,11 @@ impl<B> DynStreams<'_, B> {
388388
me.recv_eof(self.send_buffer, clear_pending_accept)
389389
}
390390

391-
pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
391+
pub fn send_reset(
392+
&mut self,
393+
id: StreamId,
394+
reason: Reason,
395+
) -> Result<(), crate::proto::error::GoAway> {
392396
let mut me = self.inner.lock().unwrap();
393397
me.send_reset(self.send_buffer, id, reason)
394398
}
@@ -659,15 +663,23 @@ impl Inner {
659663
// The remote may send window updates for streams that the local now
660664
// considers closed. It's ok...
661665
if let Some(mut stream) = self.store.find_mut(&id) {
662-
// This result is ignored as there is nothing to do when there
663-
// is an error. The stream is reset by the function on error and
664-
// the error is informational.
665-
let _ = self.actions.send.recv_stream_window_update(
666-
frame.size_increment(),
666+
let res = self
667+
.actions
668+
.send
669+
.recv_stream_window_update(
670+
frame.size_increment(),
671+
send_buffer,
672+
&mut stream,
673+
&mut self.counts,
674+
&mut self.actions.task,
675+
)
676+
.map_err(|reason| Error::library_reset(id, reason));
677+
678+
return self.actions.reset_on_recv_stream_err(
667679
send_buffer,
668680
&mut stream,
669681
&mut self.counts,
670-
&mut self.actions.task,
682+
res,
671683
);
672684
} else {
673685
self.actions
@@ -904,7 +916,12 @@ impl Inner {
904916
Poll::Ready(Ok(()))
905917
}
906918

907-
fn send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason) {
919+
fn send_reset<B>(
920+
&mut self,
921+
send_buffer: &SendBuffer<B>,
922+
id: StreamId,
923+
reason: Reason,
924+
) -> Result<(), crate::proto::error::GoAway> {
908925
let key = match self.store.find_entry(id) {
909926
Entry::Occupied(e) => e.key(),
910927
Entry::Vacant(e) => {
@@ -945,7 +962,7 @@ impl Inner {
945962
Initiator::Library,
946963
&mut self.counts,
947964
send_buffer,
948-
);
965+
)
949966
}
950967
}
951968

@@ -1117,8 +1134,20 @@ impl<B> StreamRef<B> {
11171134
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
11181135
let send_buffer = &mut *send_buffer;
11191136

1120-
me.actions
1121-
.send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer);
1137+
match me
1138+
.actions
1139+
.send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer)
1140+
{
1141+
Ok(()) => (),
1142+
Err(crate::proto::error::GoAway { .. }) => {
1143+
// this should never happen, because Initiator::User resets do
1144+
// not count toward the local limit.
1145+
// we could perhaps make this state impossible, if we made the
1146+
// initiator argument a generic, and so this could return
1147+
// Infallible instead of an impossible GoAway, but oh well.
1148+
unreachable!("Initiator::User should not error sending reset");
1149+
}
1150+
}
11221151
}
11231152

11241153
pub fn send_response(
@@ -1541,8 +1570,23 @@ impl Actions {
15411570
initiator: Initiator,
15421571
counts: &mut Counts,
15431572
send_buffer: &mut Buffer<Frame<B>>,
1544-
) {
1573+
) -> Result<(), crate::proto::error::GoAway> {
15451574
counts.transition(stream, |counts, stream| {
1575+
if initiator.is_library() {
1576+
if counts.can_inc_num_local_error_resets() {
1577+
counts.inc_num_local_error_resets();
1578+
} else {
1579+
tracing::warn!(
1580+
"locally-reset streams reached limit ({:?})",
1581+
counts.max_local_error_resets().unwrap(),
1582+
);
1583+
return Err(crate::proto::error::GoAway {
1584+
reason: Reason::ENHANCE_YOUR_CALM,
1585+
debug_data: "too_many_internal_resets".into(),
1586+
});
1587+
}
1588+
}
1589+
15461590
self.send.send_reset(
15471591
reason,
15481592
initiator,
@@ -1554,7 +1598,9 @@ impl Actions {
15541598
self.recv.enqueue_reset_expiration(stream, counts);
15551599
// if a RecvStream is parked, ensure it's notified
15561600
stream.notify_recv();
1557-
});
1601+
1602+
Ok(())
1603+
})
15581604
}
15591605

15601606
fn reset_on_recv_stream_err<B>(

0 commit comments

Comments
 (0)