Skip to content

Commit b157b2e

Browse files
committed
feat(blockexchange): implement delta WantList updates with batching
Implements delta-based WantList updates to reduce network traffic during block exchange. Only sends newly added blocks instead of resending the entire WantList on every refresh. Also some network related fixes: - Add TCP_NODELAY flag to prevent Nagle's algorithm delays - Clear sendConn on stream reset to allow garbage collection - Improve error handling in NetworkPeer.send() Part of #974 Signed-off-by: Chrysostomos Nanakos <[email protected]>
1 parent 5f3f203 commit b157b2e

File tree

4 files changed

+71
-14
lines changed

4 files changed

+71
-14
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,65 @@ proc sendWantBlock(
187187
proc refreshBlockKnowledge(
188188
self: BlockExcEngine, peer: BlockExcPeerCtx
189189
) {.async: (raises: [CancelledError]).} =
190-
if self.pendingBlocks.wantListLen > 0:
191-
# We send only blocks that the peer hasn't already told us that they already have.
192-
let
193-
peerHave = peer.peerHave
194-
toAsk = self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave)
190+
if self.pendingBlocks.wantListLen == 0:
191+
if peer.lastSentWants.len > 0:
192+
trace "Clearing want list tracking, no pending blocks", peer = peer.id
193+
peer.lastSentWants.clear()
194+
return
195+
196+
# We send only blocks that the peer hasn't already told us that they already have.
197+
let
198+
peerHave = peer.peerHave
199+
toAsk = toHashSet(self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave))
200+
201+
if toAsk.len == 0:
202+
if peer.lastSentWants.len > 0:
203+
trace "Clearing want list tracking, peer has all blocks", peer = peer.id
204+
peer.lastSentWants.clear()
205+
return
206+
207+
let newWants = toAsk - peer.lastSentWants
208+
209+
if peer.lastSentWants.len > 0:
210+
if newWants.len > 0:
211+
trace "Sending delta want list update",
212+
peer = peer.id, newWants = newWants.len, totalWants = toAsk.len
213+
214+
let newWantsSeq = newWants.toSeq
215+
var offset = 0
216+
while offset < newWantsSeq.len:
217+
let batchEnd = min(offset + MaxWantListBatchSize, newWantsSeq.len)
218+
let batch = newWantsSeq[offset ..< batchEnd]
219+
220+
trace "Sending want list batch",
221+
peer = peer.id,
222+
batchSize = batch.len,
223+
offset = offset,
224+
total = newWantsSeq.len
225+
226+
await self.network.request.sendWantList(peer.id, batch, full = false)
227+
offset = batchEnd
228+
else:
229+
trace "No changes in want list, skipping send", peer = peer.id
230+
231+
peer.lastSentWants = toAsk
232+
else:
233+
trace "Sending full want list",
234+
peer = peer.id, length = toAsk.len, reason = "first_send"
235+
236+
let toAskSeq = toAsk.toSeq
237+
var offset = 0
238+
while offset < toAskSeq.len:
239+
let batchEnd = min(offset + MaxWantListBatchSize, toAskSeq.len)
240+
let batch = toAskSeq[offset ..< batchEnd]
241+
242+
trace "Sending full want list batch",
243+
peer = peer.id, batchSize = batch.len, offset = offset, total = toAskSeq.len
244+
245+
await self.network.request.sendWantList(peer.id, batch, full = (offset == 0))
246+
offset = batchEnd
195247

196-
if toAsk.len > 0:
197-
trace "Sending want list to a peer", peer = peer.id, length = toAsk.len
198-
await self.network.request.sendWantList(peer.id, toAsk, full = true)
248+
peer.lastSentWants = toAsk
199249

200250
proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
201251
let runtimeQuota = 10.milliseconds
@@ -215,9 +265,6 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
215265
if peer.isKnowledgeStale or peer.lastRefresh < self.pendingBlocks.lastInclusion:
216266
if not peer.refreshInProgress:
217267
peer.refreshRequested()
218-
# TODO: optimize this by keeping track of what was sent and sending deltas.
219-
# This should allow us to run much more frequent refreshes, and be way more
220-
# efficient about it.
221268
await self.refreshBlockKnowledge(peer)
222269
else:
223270
trace "Not refreshing: peer is up to date", peer = peer.id

codex/blockexchange/network/networkpeer.nim

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} =
6565
except CatchableError as err:
6666
warn "Exception in blockexc read loop", msg = err.msg
6767
finally:
68-
trace "Detaching read loop", peer = self.id, connId = conn.oid
68+
warn "Detaching read loop", peer = self.id, connId = conn.oid
69+
if self.sendConn == conn:
70+
self.sendConn = nil
6971
await conn.close()
7072

7173
proc connect*(
@@ -89,7 +91,12 @@ proc send*(
8991
return
9092

9193
trace "Sending message", peer = self.id, connId = conn.oid
92-
await conn.writeLp(protobufEncode(msg))
94+
try:
95+
await conn.writeLp(protobufEncode(msg))
96+
except CatchableError as err:
97+
if self.sendConn == conn:
98+
self.sendConn = nil
99+
raise newException(LPStreamError, "Failed to send message: " & err.msg)
93100

94101
func new*(
95102
T: type NetworkPeer,

codex/blockexchange/peers/peercontext.nim

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export payments, nitro
2929
const
3030
MinRefreshInterval = 1.seconds
3131
MaxRefreshBackoff = 36 # 36 seconds
32+
MaxWantListBatchSize* = 1024 # Maximum blocks to send per WantList message
3233

3334
type BlockExcPeerCtx* = ref object of RootObj
3435
id*: PeerId
@@ -44,6 +45,8 @@ type BlockExcPeerCtx* = ref object of RootObj
4445
blocksRequested*: HashSet[BlockAddress] # pending block requests to this peer
4546
lastExchange*: Moment # last time peer has sent us a block
4647
activityTimeout*: Duration
48+
lastSentWants*: HashSet[BlockAddress]
49+
# track what wantList we last sent for delta updates
4750

4851
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
4952
let staleness =

codex/codex.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ proc new*(
211211
.withMaxConnections(config.maxPeers)
212212
.withAgentVersion(config.agentString)
213213
.withSignedPeerRecord(true)
214-
.withTcpTransport({ServerFlags.ReuseAddr})
214+
.withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
215215
.build()
216216

217217
var

0 commit comments

Comments
 (0)