Skip to content

Commit fbb695f

Browse files
committed
feat(node): implement sliding window for block prefetching
Now fetchBatched maintains a sliding window of batchSize blocks in-flight. When 75% complete, adds next chunk (batchSize/2) to maintain constant window size. This ensures blocks are already pending or have been fetched when StoreStream needs them. Part of #974 Signed-off-by: Chrysostomos Nanakos <[email protected]>
1 parent d508a61 commit fbb695f

File tree

3 files changed

+58
-44
lines changed

3 files changed

+58
-44
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,17 @@ declareCounter(
8282
)
8383

8484
const
85-
DefaultMaxPeersPerRequest* = 10
8685
# The default max message length of nim-libp2p is 100 megabytes, meaning we can
8786
# in principle fit up to 1600 64k blocks per message, so 20 is well under
8887
# that number.
8988
DefaultMaxBlocksPerMessage = 20
9089
DefaultTaskQueueSize = 100
9190
DefaultConcurrentTasks = 10
9291
# Don't do more than one discovery request per `DiscoveryRateLimit` seconds.
93-
DiscoveryRateLimit = 1.seconds
92+
DiscoveryRateLimit = 3.seconds
9493
DefaultPeerActivityTimeout = 1.minutes
95-
PresenceBatchSize = 1024
94+
# Match MaxWantListBatchSize to efficiently respond to incoming WantLists
95+
PresenceBatchSize = MaxWantListBatchSize
9696

9797
type
9898
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}

codex/blockexchange/engine/pendingblocks.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ declareGauge(
3434

3535
const
3636
DefaultBlockRetries* = 3000
37-
DefaultRetryInterval* = 1.seconds
37+
DefaultRetryInterval* = 2.seconds
3838

3939
type
4040
RetriesExhaustedError* = object of CatchableError

codex/node.nim

Lines changed: 54 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ logScope:
5353
topics = "codex node"
5454

5555
const
56-
DefaultFetchBatch = 2048
56+
DefaultFetchBatch = 1024
5757
MaxOnBatchBlocks = 128
58+
BatchRefillThreshold = 0.75 # Refill when 75% of window completes
5859

5960
type
6061
Contracts* =
@@ -188,48 +189,61 @@ proc fetchBatched*(
188189
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
189190
# )
190191

191-
var addresses = newSeqOfCap[BlockAddress](batchSize)
192-
while not iter.finished:
193-
addresses.setLen(0)
194-
for i in 0 ..< batchSize:
195-
if not iter.finished:
196-
let address = BlockAddress.init(cid, iter.next())
197-
if fetchLocal or not (await address in self.networkStore):
198-
addresses.add(address)
199-
200-
let blockResults = await self.networkStore.getBlocks(addresses)
201-
202-
var
203-
successfulBlocks = 0
204-
failedBlocks = 0
205-
blockData: seq[bt.Block]
206-
207-
for res in blockResults:
208-
without blk =? await res:
209-
inc(failedBlocks)
210-
continue
211-
212-
inc(successfulBlocks)
213-
214-
# Only retains block data in memory if there's
215-
# a callback.
216-
if not onBatch.isNil:
217-
blockData.add(blk)
218-
219-
if blockData.len >= MaxOnBatchBlocks:
220-
if batchErr =? (await onBatch(blockData)).errorOption:
221-
return failure(batchErr)
222-
blockData = @[]
223-
224-
if failedBlocks > 0:
225-
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
192+
# Sliding window: maintain batchSize blocks in-flight
193+
let
194+
refillThreshold = int(float(batchSize) * BatchRefillThreshold)
195+
refillSize = max(refillThreshold, 1)
196+
maxCallbackBlocks = min(batchSize, MaxOnBatchBlocks)
226197

227-
if not onBatch.isNil and blockData.len > 0:
228-
if batchErr =? (await onBatch(blockData)).errorOption:
229-
return failure(batchErr)
198+
var
199+
blockData: seq[bt.Block]
200+
failedBlocks = 0
201+
successfulBlocks = 0
202+
completedInWindow = 0
230203

204+
var addresses = newSeqOfCap[BlockAddress](batchSize)
205+
for i in 0 ..< batchSize:
231206
if not iter.finished:
232-
await idleAsync()
207+
let address = BlockAddress.init(cid, iter.next())
208+
if fetchLocal or not (await address in self.networkStore):
209+
addresses.add(address)
210+
211+
var blockResults = await self.networkStore.getBlocks(addresses)
212+
213+
while not blockResults.finished:
214+
without blk =? await blockResults.next(), err:
215+
inc(failedBlocks)
216+
continue
217+
218+
inc(successfulBlocks)
219+
inc(completedInWindow)
220+
221+
if not onBatch.isNil:
222+
blockData.add(blk)
223+
if blockData.len >= maxCallbackBlocks:
224+
if batchErr =? (await onBatch(blockData)).errorOption:
225+
return failure(batchErr)
226+
blockData = @[]
227+
228+
if completedInWindow >= refillThreshold and not iter.finished:
229+
var refillAddresses = newSeqOfCap[BlockAddress](refillSize)
230+
for i in 0 ..< refillSize:
231+
if not iter.finished:
232+
let address = BlockAddress.init(cid, iter.next())
233+
if fetchLocal or not (await address in self.networkStore):
234+
refillAddresses.add(address)
235+
236+
if refillAddresses.len > 0:
237+
blockResults =
238+
chain(blockResults, await self.networkStore.getBlocks(refillAddresses))
239+
completedInWindow = 0
240+
241+
if failedBlocks > 0:
242+
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
243+
244+
if not onBatch.isNil and blockData.len > 0:
245+
if batchErr =? (await onBatch(blockData)).errorOption:
246+
return failure(batchErr)
233247

234248
success()
235249

0 commit comments

Comments
 (0)