Background
The broadcast queue's per-peer dispatch uses a tokio::sync::oneshot::Sender<()> (completion_tx) to coordinate semaphore-permit release. Today the queue treats any signal on this oneshot as "successfully delivered" — including the cases where the StreamSend dispatch dropped the message (channel closed, channel full / timeout, no connection) and signaled completion only to release the permit.
Caller code (broadcast_queue.rs:444-518):
```rust
match tokio::time::timeout(STREAM_COMPLETION_TIMEOUT, completion_rx).await {
    // "completed successfully": refresh_peer_interest + update_peer_summary
    Ok(Ok(())) => { /* ... */ }
    // "completion channel dropped (task ended)"
    Ok(Err(_)) => { /* ... */ }
    // "completion timed out"
    Err(_) => { /* ... */ }
}
```
After the Ok(Ok(())) match arm, send_ok = send_result.is_ok(). That is true whenever bridge.send enqueued the StreamSend dispatch, not whether the peer actually received it. Downstream:
- refresh_peer_interest refreshes the peer's interest TTL based on a send that may not have happened.
- update_peer_summary caches a summary for the peer, defeating the next summary-mismatch round that should have detected the drop.
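A minimal, std-only model of the conflation described above follows. It assumes std::sync::mpsc::sync_channel(1) as a stand-in for tokio::sync::oneshot so the sketch runs without tokio; DispatchResult, dispatch_today and consumer_sees_success are hypothetical names, not code from p2p_protoc.rs or broadcast_queue.rs.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-ins for the ways a StreamSend dispatch can end.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchResult {
    Sent,
    ChannelClosed,
    TimedOut,
    NoConnection,
}

/// Producer side as it behaves today (modelled): the completion signal exists
/// only to release the semaphore permit, so every path sends the same `()`.
pub fn dispatch_today(result: DispatchResult, completion_tx: SyncSender<()>) {
    // Whether the message actually went out is known here...
    let _delivered = result == DispatchResult::Sent;
    // ...but it is not part of the payload.
    let _ = completion_tx.send(());
}

/// Consumer side (modelled): any `Ok(())` is read as "completed successfully".
pub fn consumer_sees_success(result: DispatchResult) -> bool {
    let (completion_tx, completion_rx): (SyncSender<()>, Receiver<()>) = sync_channel(1);
    dispatch_today(result, completion_tx);
    completion_rx.recv().is_ok()
}
```

All four variants make consumer_sees_success return true: a dropped message is indistinguishable from a delivered one on the consumer side, which is what lets refresh_peer_interest and update_peer_summary run after a send that never happened.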
Pre-existing vs PR #4231
This is a pre-existing semantic conflation. PR #4231 widens the trigger: the StreamSend timeout path now fires completion_tx to release the permit immediately instead of waiting out STREAM_COMPLETION_TIMEOUT (120 s). That makes the silent-loss surface larger but does not introduce it, since the channel-closed path already behaved this way.
The Codex external reviewer flagged this as P2 on PR #4231. The big-picture and skeptical reviewers agreed it is worth fixing but out of scope for that PR.
Proposed fix
Change the oneshot's payload from () to a delivery-outcome enum:
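```rust
// Payload of completion_tx (replaces `()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeliveryOutcome {
    Delivered,
    Dropped { reason: DropReason },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DropReason { Timeout, ChannelClosed, NoConnection }
```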
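A sketch of how the dispatch side might choose a variant, under stated assumptions: the per-peer stream channel is modelled as a std::sync::mpsc::SyncSender<Vec<u8>>, a missing connection as None, and classify_dispatch is a hypothetical helper. TrySendError::Full stands in for the channel-full / timeout path and TrySendError::Disconnected for the channel-closed path. The real StreamSend dispatch is async and tokio-based, and "Delivered" here only means the per-peer channel accepted the message.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropReason { Timeout, ChannelClosed, NoConnection }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryOutcome {
    Delivered,
    Dropped { reason: DropReason },
}

/// Hypothetical: decide which outcome to report on completion_tx.
/// `conn` is None when there is no connection to the peer.
pub fn classify_dispatch(conn: Option<&SyncSender<Vec<u8>>>, msg: Vec<u8>) -> DeliveryOutcome {
    let Some(tx) = conn else {
        return DeliveryOutcome::Dropped { reason: DropReason::NoConnection };
    };
    match tx.try_send(msg) {
        Ok(()) => DeliveryOutcome::Delivered,
        // Channel full: in the real code this is where the send would wait and time out.
        Err(TrySendError::Full(_)) => DeliveryOutcome::Dropped { reason: DropReason::Timeout },
        // Receiver gone: the per-peer stream channel is closed.
        Err(TrySendError::Disconnected(_)) => {
            DeliveryOutcome::Dropped { reason: DropReason::ChannelClosed }
        }
    }
}
```

Mapping Full directly to Timeout is a simplification; it marks the point where an async send would start waiting rather than the timeout itself.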
broadcast_queue.rs then:
- On Delivered → refresh interest + update summary cache (as today)
- On Dropped { ... } → release permit but do NOT update summary cache; ideally trigger an immediate retry path or at least a counter
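A consumer-side sketch of the two bullets above. PeerState and on_completion are hypothetical; the summary cache is a peer-id map, the interest TTL refresh is reduced to a counter, and the permit release is assumed to happen in the caller regardless of outcome.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DropReason { Timeout, ChannelClosed, NoConnection }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryOutcome {
    Delivered,
    Dropped { reason: DropReason },
}

/// Hypothetical bookkeeping standing in for the real interest and summary caches.
#[derive(Default)]
pub struct PeerState {
    pub summary_cache: HashMap<u64, Vec<u8>>,  // peer id -> last summary cached
    pub interest_refreshes: HashMap<u64, u32>, // peer id -> number of TTL refreshes
    pub dropped: HashMap<DropReason, u64>,     // drop counter per reason
}

impl PeerState {
    /// Interpret the completion signal. Only `Delivered` touches the
    /// interest and summary caches.
    pub fn on_completion(&mut self, peer: u64, summary: &[u8], outcome: DeliveryOutcome) {
        match outcome {
            DeliveryOutcome::Delivered => {
                *self.interest_refreshes.entry(peer).or_insert(0) += 1;
                self.summary_cache.insert(peer, summary.to_vec());
            }
            DeliveryOutcome::Dropped { reason } => {
                // Leave the summary cache alone so the next summary-mismatch
                // round still sees this peer as out of date.
                *self.dropped.entry(reason).or_insert(0) += 1;
            }
        }
    }
}
```

Leaving the cache untouched on Dropped is what preserves the summary-mismatch signal; the per-reason counter covers the "at least a counter" fallback.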
Touchpoints: p2p_protoc.rs (4 sites: the StreamSend dispatch and the StreamSend per-peer task handling; PipeStream has no completion_tx today), broadcast_queue.rs (consumer interpretation), and any other callers of send_stream_with_completion.
Risk
Medium. This is a semantic API change to an internal channel and touches the per-peer send path. It needs a regression test showing that an UPDATE dropped in StreamSend triggers a subsequent retry rather than caching a stale summary.
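The regression test's core assertion can be modelled with a toy summary cache, assuming summaries are reduced to a u32 and a hypothetical peers_needing_resend stands in for the summary-mismatch round. The hypothetical legacy flag in apply_completion reproduces today's behavior of caching on any completion signal.

```rust
use std::collections::HashMap;

/// Hypothetical summary-mismatch round: a peer needs a resend when its cached
/// summary differs from the current one.
pub fn peers_needing_resend(cache: &HashMap<u64, u32>, peers: &[u64], current: u32) -> Vec<u64> {
    peers
        .iter()
        .copied()
        .filter(|p| cache.get(p) != Some(&current))
        .collect()
}

/// Applies one completion signal. `delivered` is what the channel reports;
/// `legacy = true` caches on any signal, as the queue does today.
pub fn apply_completion(cache: &mut HashMap<u64, u32>, peer: u64, summary: u32, delivered: bool, legacy: bool) {
    if delivered || legacy {
        cache.insert(peer, summary);
    }
}
```

With legacy caching, peer 2's dropped UPDATE leaves no mismatch and is never retried; with the proposed behavior, peer 2 shows up in the next round.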
[AI-assisted - Claude]