Skip to content

Commit fc7f63f

Browse files
authored
start adding tracing spans to internals (#478)
We've adopted `tracing` for diagnostics, but currently, it is just being used as a drop-in replacement for the `log` crate. Ideally, we would want to start emitting more structured diagnostics, using `tracing`'s `Span`s and structured key-value fields. A lot of the logging in `h2` is already written in a style that imitates the formatting of structured key-value logs, but as textual log messages. Migrating the logs to structured `tracing` events therefore is pretty easy to do. I've also started adding spans, mostly in the read path. Finally, I've updated the tests to use `tracing` rather than `env_logger`. The tracing setup happens in a macro, so that a span for each test with the test's name can be generated and entered. This will make the test output easier to read if multiple tests are run concurrently with `--nocapture`. Signed-off-by: Eliza Weisman <[email protected]>
1 parent d3c2bba commit fc7f63f

25 files changed

+369
-263
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ tokio = { version = "0.2", features = ["io-util"] }
4848
bytes = "0.5.2"
4949
http = "0.2"
5050
tracing = { version = "0.1.13", default-features = false, features = ["std", "log"] }
51+
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"]}
5152
fnv = "1.0.5"
5253
slab = "0.4.0"
5354
indexmap = "1.0"

src/client.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ use std::task::{Context, Poll};
149149
use std::time::Duration;
150150
use std::usize;
151151
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
152+
use tracing_futures::Instrument;
152153

153154
/// Initializes new HTTP/2.0 streams on a connection by sending a request.
154155
///
@@ -1115,7 +1116,10 @@ where
11151116
T: AsyncRead + AsyncWrite + Unpin,
11161117
{
11171118
let builder = Builder::new();
1118-
builder.handshake(io).await
1119+
builder
1120+
.handshake(io)
1121+
.instrument(tracing::trace_span!("client_handshake", io = %std::any::type_name::<T>()))
1122+
.await
11191123
}
11201124

11211125
// ===== impl Connection =====
@@ -1438,6 +1442,8 @@ impl Peer {
14381442
impl proto::Peer for Peer {
14391443
type Poll = Response<()>;
14401444

1445+
const NAME: &'static str = "Client";
1446+
14411447
fn r#dyn() -> proto::DynPeer {
14421448
proto::DynPeer::Client
14431449
}

src/codec/framed_read.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ impl<T> FramedRead<T> {
6161

6262
fn decode_frame(&mut self, mut bytes: BytesMut) -> Result<Option<Frame>, RecvError> {
6363
use self::RecvError::*;
64+
let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len());
65+
let _e = span.enter();
6466

6567
tracing::trace!("decoding frame from {}B", bytes.len());
6668

@@ -74,7 +76,7 @@ impl<T> FramedRead<T> {
7476

7577
let kind = head.kind();
7678

77-
tracing::trace!(" -> kind={:?}", kind);
79+
tracing::trace!(frame.kind = ?kind);
7880

7981
macro_rules! header_block {
8082
($frame:ident, $head:ident, $bytes:ident) => ({
@@ -338,6 +340,8 @@ where
338340
type Item = Result<Frame, RecvError>;
339341

340342
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
343+
let span = tracing::trace_span!("FramedRead::poll_next");
344+
let _e = span.enter();
341345
loop {
342346
tracing::trace!("poll");
343347
let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
@@ -346,9 +350,9 @@ where
346350
None => return Poll::Ready(None),
347351
};
348352

349-
tracing::trace!("poll; bytes={}B", bytes.len());
353+
tracing::trace!(read.bytes = bytes.len());
350354
if let Some(frame) = self.decode_frame(bytes)? {
351-
tracing::debug!("received; frame={:?}", frame);
355+
tracing::debug!(?frame, "received");
352356
return Poll::Ready(Some(Ok(frame)));
353357
}
354358
}

src/codec/framed_write.rs

+12-9
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ where
105105
pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
106106
// Ensure that we have enough capacity to accept the write.
107107
assert!(self.has_capacity());
108+
let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item);
109+
let _e = span.enter();
108110

109-
tracing::debug!("send; frame={:?}", item);
111+
tracing::debug!(frame = ?item, "send");
110112

111113
match item {
112114
Frame::Data(mut v) => {
@@ -150,19 +152,19 @@ where
150152
}
151153
Frame::Settings(v) => {
152154
v.encode(self.buf.get_mut());
153-
tracing::trace!("encoded settings; rem={:?}", self.buf.remaining());
155+
tracing::trace!(rem = self.buf.remaining(), "encoded settings");
154156
}
155157
Frame::GoAway(v) => {
156158
v.encode(self.buf.get_mut());
157-
tracing::trace!("encoded go_away; rem={:?}", self.buf.remaining());
159+
tracing::trace!(rem = self.buf.remaining(), "encoded go_away");
158160
}
159161
Frame::Ping(v) => {
160162
v.encode(self.buf.get_mut());
161-
tracing::trace!("encoded ping; rem={:?}", self.buf.remaining());
163+
tracing::trace!(rem = self.buf.remaining(), "encoded ping");
162164
}
163165
Frame::WindowUpdate(v) => {
164166
v.encode(self.buf.get_mut());
165-
tracing::trace!("encoded window_update; rem={:?}", self.buf.remaining());
167+
tracing::trace!(rem = self.buf.remaining(), "encoded window_update");
166168
}
167169

168170
Frame::Priority(_) => {
@@ -174,7 +176,7 @@ where
174176
}
175177
Frame::Reset(v) => {
176178
v.encode(self.buf.get_mut());
177-
tracing::trace!("encoded reset; rem={:?}", self.buf.remaining());
179+
tracing::trace!(rem = self.buf.remaining(), "encoded reset");
178180
}
179181
}
180182

@@ -183,18 +185,19 @@ where
183185

184186
/// Flush buffered data to the wire
185187
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
186-
tracing::trace!("flush");
188+
let span = tracing::trace_span!("FramedWrite::flush");
189+
let _e = span.enter();
187190

188191
loop {
189192
while !self.is_empty() {
190193
match self.next {
191194
Some(Next::Data(ref mut frame)) => {
192-
tracing::trace!(" -> queued data frame");
195+
tracing::trace!(queued_data_frame = true);
193196
let mut buf = (&mut self.buf).chain(frame.payload_mut());
194197
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?;
195198
}
196199
_ => {
197-
tracing::trace!(" -> not a queued data frame");
200+
tracing::trace!(queued_data_frame = false);
198201
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?;
199202
}
200203
}

src/hpack/decoder.rs

+12-13
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ impl Decoder {
183183
self.last_max_update = size;
184184
}
185185

186+
let span = tracing::trace_span!("hpack::decode");
187+
let _e = span.enter();
188+
186189
tracing::trace!("decode");
187190

188191
while let Some(ty) = peek_u8(src) {
@@ -191,14 +194,14 @@ impl Decoder {
191194
// determined from the first byte.
192195
match Representation::load(ty)? {
193196
Indexed => {
194-
tracing::trace!(" Indexed; rem={:?}", src.remaining());
197+
tracing::trace!(rem = src.remaining(), kind = %"Indexed");
195198
can_resize = false;
196199
let entry = self.decode_indexed(src)?;
197200
consume(src);
198201
f(entry);
199202
}
200203
LiteralWithIndexing => {
201-
tracing::trace!(" LiteralWithIndexing; rem={:?}", src.remaining());
204+
tracing::trace!(rem = src.remaining(), kind = %"LiteralWithIndexing");
202205
can_resize = false;
203206
let entry = self.decode_literal(src, true)?;
204207

@@ -209,14 +212,14 @@ impl Decoder {
209212
f(entry);
210213
}
211214
LiteralWithoutIndexing => {
212-
tracing::trace!(" LiteralWithoutIndexing; rem={:?}", src.remaining());
215+
tracing::trace!(rem = src.remaining(), kind = %"LiteralWithoutIndexing");
213216
can_resize = false;
214217
let entry = self.decode_literal(src, false)?;
215218
consume(src);
216219
f(entry);
217220
}
218221
LiteralNeverIndexed => {
219-
tracing::trace!(" LiteralNeverIndexed; rem={:?}", src.remaining());
222+
tracing::trace!(rem = src.remaining(), kind = %"LiteralNeverIndexed");
220223
can_resize = false;
221224
let entry = self.decode_literal(src, false)?;
222225
consume(src);
@@ -226,7 +229,7 @@ impl Decoder {
226229
f(entry);
227230
}
228231
SizeUpdate => {
229-
tracing::trace!(" SizeUpdate; rem={:?}", src.remaining());
232+
tracing::trace!(rem = src.remaining(), kind = %"SizeUpdate");
230233
if !can_resize {
231234
return Err(DecoderError::InvalidMaxDynamicSize);
232235
}
@@ -249,9 +252,9 @@ impl Decoder {
249252
}
250253

251254
tracing::debug!(
252-
"Decoder changed max table size from {} to {}",
253-
self.table.size(),
254-
new_size
255+
from = self.table.size(),
256+
to = new_size,
257+
"Decoder changed max table size"
255258
);
256259

257260
self.table.set_max_size(new_size);
@@ -302,11 +305,7 @@ impl Decoder {
302305
let len = decode_int(buf, 7)?;
303306

304307
if len > buf.remaining() {
305-
tracing::trace!(
306-
"decode_string underflow; len={}; remaining={}",
307-
len,
308-
buf.remaining()
309-
);
308+
tracing::trace!(len, remaining = buf.remaining(), "decode_string underflow",);
310309
return Err(DecoderError::NeedMore(NeedMore::StringUnderflow));
311310
}
312311

src/hpack/encoder.rs

+4
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,11 @@ impl Encoder {
8686
where
8787
I: Iterator<Item = Header<Option<HeaderName>>>,
8888
{
89+
let span = tracing::trace_span!("hpack::encode");
90+
let _e = span.enter();
91+
8992
let pos = position(dst);
93+
tracing::trace!(pos, "encoding at");
9094

9195
if let Err(e) = self.encode_size_updates(dst) {
9296
if e == EncoderError::BufferOverflow {

src/hpack/table.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ impl Table {
597597
}
598598
599599
assert!(dist <= their_dist,
600-
"could not find entry; actual={}; desired={};" +
600+
"could not find entry; actual={}; desired={}" +
601601
"probe={}, dist={}; their_dist={}; index={}; msg={}",
602602
actual, desired, probe, dist, their_dist,
603603
index.wrapping_sub(self.inserted), msg);

src/proto/connection.rs

+28-12
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ where
4444
/// Stream state handler
4545
streams: Streams<B, P>,
4646

47+
/// A `tracing` span tracking the lifetime of the connection.
48+
span: tracing::Span,
49+
4750
/// Client or server
4851
_phantom: PhantomData<P>,
4952
}
@@ -100,6 +103,7 @@ where
100103
ping_pong: PingPong::new(),
101104
settings: Settings::new(config.settings),
102105
streams,
106+
span: tracing::debug_span!("Connection", peer = %P::NAME),
103107
_phantom: PhantomData,
104108
}
105109
}
@@ -121,6 +125,9 @@ where
121125
/// Returns `RecvError` as this may raise errors that are caused by delayed
122126
/// processing of received frames.
123127
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), RecvError>> {
128+
let _e = self.span.enter();
129+
let span = tracing::trace_span!("poll_ready");
130+
let _e = span.enter();
124131
// The order of these calls don't really matter too much
125132
ready!(self.ping_pong.send_pending_pong(cx, &mut self.codec))?;
126133
ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?;
@@ -200,9 +207,18 @@ where
200207

201208
/// Advances the internal state of the connection.
202209
pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), proto::Error>> {
210+
// XXX(eliza): cloning the span is unfortunately necessary here in
211+
// order to placate the borrow checker — `self` is mutably borrowed by
212+
// `poll2`, which means that we can't borrow `self.span` to enter it.
213+
// The clone is just an atomic ref bump.
214+
let span = self.span.clone();
215+
let _e = span.enter();
216+
let span = tracing::trace_span!("poll");
217+
let _e = span.enter();
203218
use crate::codec::RecvError::*;
204219

205220
loop {
221+
tracing::trace!(connection.state = ?self.state);
206222
// TODO: probably clean up this glob of code
207223
match self.state {
208224
// When open, continue to poll a frame
@@ -230,7 +246,7 @@ where
230246
// error. This is handled by setting a GOAWAY frame followed by
231247
// terminating the connection.
232248
Poll::Ready(Err(Connection(e))) => {
233-
tracing::debug!("Connection::poll; connection error={:?}", e);
249+
tracing::debug!(error = ?e, "Connection::poll; connection error");
234250

235251
// We may have already sent a GOAWAY for this error,
236252
// if so, don't send another, just flush and close up.
@@ -250,15 +266,15 @@ where
250266
// This is handled by resetting the frame then trying to read
251267
// another frame.
252268
Poll::Ready(Err(Stream { id, reason })) => {
253-
tracing::trace!("stream error; id={:?}; reason={:?}", id, reason);
269+
tracing::trace!(?id, ?reason, "stream error");
254270
self.streams.send_reset(id, reason);
255271
}
256272
// Attempting to read a frame resulted in an I/O error. All
257273
// active streams must be reset.
258274
//
259275
// TODO: Are I/O errors recoverable?
260276
Poll::Ready(Err(Io(e))) => {
261-
tracing::debug!("Connection::poll; IO error={:?}", e);
277+
tracing::debug!(error = ?e, "Connection::poll; IO error");
262278
let e = e.into();
263279

264280
// Reset all active streams
@@ -317,28 +333,28 @@ where
317333

318334
match ready!(Pin::new(&mut self.codec).poll_next(cx)?) {
319335
Some(Headers(frame)) => {
320-
tracing::trace!("recv HEADERS; frame={:?}", frame);
336+
tracing::trace!(?frame, "recv HEADERS");
321337
self.streams.recv_headers(frame)?;
322338
}
323339
Some(Data(frame)) => {
324-
tracing::trace!("recv DATA; frame={:?}", frame);
340+
tracing::trace!(?frame, "recv DATA");
325341
self.streams.recv_data(frame)?;
326342
}
327343
Some(Reset(frame)) => {
328-
tracing::trace!("recv RST_STREAM; frame={:?}", frame);
344+
tracing::trace!(?frame, "recv RST_STREAM");
329345
self.streams.recv_reset(frame)?;
330346
}
331347
Some(PushPromise(frame)) => {
332-
tracing::trace!("recv PUSH_PROMISE; frame={:?}", frame);
348+
tracing::trace!(?frame, "recv PUSH_PROMISE");
333349
self.streams.recv_push_promise(frame)?;
334350
}
335351
Some(Settings(frame)) => {
336-
tracing::trace!("recv SETTINGS; frame={:?}", frame);
352+
tracing::trace!(?frame, "recv SETTINGS");
337353
self.settings
338354
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
339355
}
340356
Some(GoAway(frame)) => {
341-
tracing::trace!("recv GOAWAY; frame={:?}", frame);
357+
tracing::trace!(?frame, "recv GOAWAY");
342358
// This should prevent starting new streams,
343359
// but should allow continuing to process current streams
344360
// until they are all EOS. Once they are, State should
@@ -347,7 +363,7 @@ where
347363
self.error = Some(frame.reason());
348364
}
349365
Some(Ping(frame)) => {
350-
tracing::trace!("recv PING; frame={:?}", frame);
366+
tracing::trace!(?frame, "recv PING");
351367
let status = self.ping_pong.recv_ping(frame);
352368
if status.is_shutdown() {
353369
assert!(
@@ -360,11 +376,11 @@ where
360376
}
361377
}
362378
Some(WindowUpdate(frame)) => {
363-
tracing::trace!("recv WINDOW_UPDATE; frame={:?}", frame);
379+
tracing::trace!(?frame, "recv WINDOW_UPDATE");
364380
self.streams.recv_window_update(frame)?;
365381
}
366382
Some(Priority(frame)) => {
367-
tracing::trace!("recv PRIORITY; frame={:?}", frame);
383+
tracing::trace!(?frame, "recv PRIORITY");
368384
// TODO: handle
369385
}
370386
None => {

src/proto/peer.rs

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::fmt;
1111
pub(crate) trait Peer {
1212
/// Message type polled from the transport
1313
type Poll: fmt::Debug;
14+
const NAME: &'static str;
1415

1516
fn r#dyn() -> Dyn;
1617

0 commit comments

Comments
 (0)