diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index b32b855574..883eea8eea 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -8,6 +8,7 @@ ## those terms. import std/sequtils +import std/algorithm import pkg/chronos import pkg/libp2p/cid @@ -38,6 +39,7 @@ const DefaultConcurrentDiscRequests = 10 DefaultDiscoveryTimeout = 1.minutes DefaultMinPeersPerBlock = 3 + DefaultMaxPeersPerBlock = 8 DefaultDiscoveryLoopSleep = 3.seconds type DiscoveryEngine* = ref object of RootObj @@ -51,11 +53,32 @@ type DiscoveryEngine* = ref object of RootObj discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle discoveryQueue*: AsyncQueue[Cid] # Discovery queue trackedFutures*: TrackedFutures # Tracked Discovery tasks futures - minPeersPerBlock*: int # Max number of peers with block + minPeersPerBlock*: int # Min number of peers with block + maxPeersPerBlock*: int # Max number of peers with block discoveryLoopSleep: Duration # Discovery loop sleep inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests +proc cleanupExcessPeers(b: DiscoveryEngine, cid: Cid) {.gcsafe, raises: [].} = + var haves = b.peers.peersHave(cid) + let count = haves.len - b.maxPeersPerBlock + if count <= 0: + return + + haves.sort( + proc(a, b: BlockExcPeerCtx): int = + cmp(a.lastExchange, b.lastExchange) + ) + + let toRemove = haves[0 ..< count] + for peer in toRemove: + try: + peer.cleanPresence(BlockAddress.init(cid)) + trace "Removed block presence from peer", cid, peer = peer.id + except CatchableError as exc: + error "Failed to clean presence for peer", + cid, peer = peer.id, error = exc.msg, name = exc.name + proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} = try: while b.discEngineRunning: @@ -78,8 +101,16 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = trace "Discovery request already in progress", cid continue + trace "Running discovery task for cid", cid + let haves = b.peers.peersHave(cid) + if haves.len > b.maxPeersPerBlock: + trace "Cleaning up excess peers", + cid, peers = haves.len, max = b.maxPeersPerBlock + b.cleanupExcessPeers(cid) + continue + if haves.len < b.minPeersPerBlock: let request = b.discovery.find(cid) b.inFlightDiscReqs[cid] = request @@ -156,6 +187,7 @@ proc new*( concurrentDiscReqs = DefaultConcurrentDiscRequests, discoveryLoopSleep = DefaultDiscoveryLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock, + maxPeersPerBlock = DefaultMaxPeersPerBlock, ): DiscoveryEngine = ## Create a discovery engine instance for advertising services ## @@ -171,4 +203,5 @@ proc new*( inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](), discoveryLoopSleep: discoveryLoopSleep, minPeersPerBlock: minPeersPerBlock, + maxPeersPerBlock: maxPeersPerBlock, ) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 0d04fd7fd2..ddbad52cf2 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -12,12 +12,14 @@ import std/sets import std/options import std/algorithm import std/sugar +import std/random import pkg/chronos import pkg/libp2p/[cid, switch, multihash, multicodec] import pkg/metrics import pkg/stint import pkg/questionable +import pkg/stew/shims/sets import ../../rng import ../../stores/blockstore @@ -63,30 +65,59 @@ declareCounter(codex_block_exchange_blocks_sent, "codex blockexchange blocks sen declareCounter( 
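The discovery.nim changes above bound the number of known providers per block: above DefaultMaxPeersPerBlock, cleanupExcessPeers drops the providers with the oldest lastExchange first; below DefaultMinPeersPerBlock, a DHT lookup is queued. A toy restatement of that branch, with a made-up discoveryAction helper and a plain int standing in for the provider set:

# Toy sketch only: mirrors the branch in discoveryTaskLoop, not the real types.
const
  DefaultMinPeersPerBlock = 3
  DefaultMaxPeersPerBlock = 8

proc discoveryAction(peersWithBlock: int): string =
  if peersWithBlock > DefaultMaxPeersPerBlock:
    "trim the oldest providers (cleanupExcessPeers)"
  elif peersWithBlock < DefaultMinPeersPerBlock:
    "queue a DHT lookup for the cid"
  else:
    "leave the provider set alone"

doAssert discoveryAction(12) == "trim the oldest providers (cleanupExcessPeers)"
doAssert discoveryAction(1) == "queue a DHT lookup for the cid"
doAssert discoveryAction(5) == "leave the provider set alone"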
codex_block_exchange_blocks_received, "codex blockexchange blocks received" ) +declareCounter( + codex_block_exchange_spurious_blocks_received, + "codex blockexchange unrequested/duplicate blocks received", +) +declareCounter( + codex_block_exchange_discovery_requests_total, + "Total number of peer discovery requests sent", +) +declareCounter( + codex_block_exchange_peer_timeouts_total, "Total number of peer activity timeouts" +) +declareCounter( + codex_block_exchange_requests_failed_total, + "Total number of block requests that failed after exhausting retries", +) const - DefaultMaxPeersPerRequest* = 10 + # The default max message length of nim-libp2p is 100 megabytes, meaning we can + # in principle fit up to 1600 64k blocks per message, so 20 is well under + # that number. + DefaultMaxBlocksPerMessage = 20 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 + # Don't do more than one discovery request per `DiscoveryRateLimit` seconds. + DiscoveryRateLimit = 3.seconds + DefaultPeerActivityTimeout = 1.minutes + # Match MaxWantListBatchSize to efficiently respond to incoming WantLists + PresenceBatchSize = MaxWantListBatchSize + CleanupBatchSize = 2048 type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} + PeerSelector* = + proc(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx {.gcsafe, raises: [].} BlockExcEngine* = ref object of RootObj localStore*: BlockStore # Local block store for this instance - network*: BlockExcNetwork # Petwork interface + network*: BlockExcNetwork # Network interface peers*: PeerCtxStore # Peers we're currently actively exchanging with taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] - # Peers we're currently processing tasks for + selectPeer*: PeerSelector # Peers we're currently processing tasks for concurrentTasks: int # Number of concurrent peers we're serving at any given time trackedFutures: TrackedFutures # Tracks futures of blockexc tasks blockexcRunning: bool # Indicates if the blockexc task is running + maxBlocksPerMessage: int + # Maximum number of blocks we can squeeze in a single message pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved wallet*: WalletRef # Nitro wallet for micropayments pricing*: ?Pricing # Optional bandwidth pricing discovery*: DiscoveryEngine advertiser*: Advertiser + lastDiscRequest: Moment # time of last discovery request Pricing* = object address*: EthAddress @@ -104,7 +135,6 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} proc start*(self: BlockExcEngine) {.async: (raises: []).} = ## Start the blockexc task ## - await self.discovery.start() await self.advertiser.start() @@ -154,8 +184,145 @@ proc sendWantBlock( ) # we want this remote to send us a block codex_block_exchange_want_block_lists_sent.inc() -proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = - Rng.instance.sample(peers) +proc sendBatchedWantList( + self: BlockExcEngine, + peer: BlockExcPeerCtx, + addresses: seq[BlockAddress], + full: bool, +) {.async: (raises: [CancelledError]).} = + var offset = 0 + while offset < addresses.len: + let batchEnd = min(offset + MaxWantListBatchSize, addresses.len) + let batch = addresses[offset ..< batchEnd] + + trace "Sending want list batch", + peer = peer.id, + batchSize = batch.len, + offset = offset, + total = addresses.len, + full = full + + await self.network.request.sendWantList( + peer.id, batch, full = (full and offset == 0) + ) + for address in batch: + 
peer.lastSentWants.incl(address) + + offset = batchEnd + +proc refreshBlockKnowledge( + self: BlockExcEngine, peer: BlockExcPeerCtx, skipDelta = false, resetBackoff = false +) {.async: (raises: [CancelledError]).} = + if peer.lastSentWants.len > 0: + var toRemove: seq[BlockAddress] + + for address in peer.lastSentWants: + if address notin self.pendingBlocks: + toRemove.add(address) + + if toRemove.len >= CleanupBatchSize: + await idleAsync() + break + + for addr in toRemove: + peer.lastSentWants.excl(addr) + + if self.pendingBlocks.wantListLen == 0: + if peer.lastSentWants.len > 0: + trace "Clearing want list tracking, no pending blocks", peer = peer.id + peer.lastSentWants.clear() + return + + # We send only blocks that the peer hasn't already told us that they already have. + let + peerHave = peer.peerHave + toAsk = toHashSet(self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave)) + + if toAsk.len == 0: + if peer.lastSentWants.len > 0: + trace "Clearing want list tracking, peer has all blocks", peer = peer.id + peer.lastSentWants.clear() + return + + let newWants = toAsk - peer.lastSentWants + + if peer.lastSentWants.len > 0 and not skipDelta: + if newWants.len > 0: + trace "Sending delta want list update", + peer = peer.id, newWants = newWants.len, totalWants = toAsk.len + + await self.sendBatchedWantList(peer, newWants.toSeq, full = false) + + if resetBackoff: + peer.wantsUpdated + else: + trace "No changes in want list, skipping send", peer = peer.id + peer.lastSentWants = toAsk + else: + trace "Sending full want list", peer = peer.id, length = toAsk.len + + await self.sendBatchedWantList(peer, toAsk.toSeq, full = true) + + if resetBackoff: + peer.wantsUpdated + +proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} = + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + + for peer in self.peers.peers.values.toSeq: + # We refresh block knowledge if: + # 1. the peer hasn't been refreshed in a while; + # 2. the list of blocks we care about has changed. + # + # Note that because of (2), it is important that we update our + # want list in the coarsest way possible instead of over many + # small updates. + # + + # In dynamic swarms, staleness will dominate latency. + let + hasNewBlocks = peer.lastRefresh < self.pendingBlocks.lastInclusion + isKnowledgeStale = peer.isKnowledgeStale + + if isKnowledgeStale or hasNewBlocks: + if not peer.refreshInProgress: + peer.refreshRequested() + await self.refreshBlockKnowledge( + peer, skipDelta = isKnowledgeStale, resetBackoff = hasNewBlocks + ) + else: + trace "Not refreshing: peer is up to date", peer = peer.id + + if (Moment.now() - lastIdle) >= runtimeQuota: + try: + await idleAsync() + except CancelledError: + discard + lastIdle = Moment.now() + +proc searchForNewPeers(self: BlockExcEngine, cid: Cid) = + if self.lastDiscRequest + DiscoveryRateLimit < Moment.now(): + trace "Searching for new peers for", cid = cid + codex_block_exchange_discovery_requests_total.inc() + self.lastDiscRequest = Moment.now() # always refresh before calling await! 
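The rate limit applied here reduces to a single timestamp comparison. A minimal standalone sketch using chronos; RateLimiter and tryRequest are made-up stand-ins for the lastDiscRequest field and the check in searchForNewPeers:

import pkg/chronos

const DiscoveryRateLimit = 3.seconds

type RateLimiter = object
  lastRequest: Moment

proc tryRequest(r: var RateLimiter): bool =
  if r.lastRequest + DiscoveryRateLimit < Moment.now():
    # Refresh the timestamp before any await, as the comment above stresses.
    r.lastRequest = Moment.now()
    result = true
  else:
    result = false

var limiter = RateLimiter(lastRequest: Moment.now() - 10.seconds)
doAssert limiter.tryRequest()     # first request goes through
doAssert not limiter.tryRequest() # immediate retry is suppressed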
+ self.discovery.queueFindBlocksReq(@[cid]) + else: + trace "Not searching for new peers, rate limit not expired", cid = cid + +proc evictPeer(self: BlockExcEngine, peer: PeerId) = + ## Cleanup disconnected peer + ## + + trace "Evicting disconnected/departed peer", peer + + let peerCtx = self.peers.get(peer) + if not peerCtx.isNil: + for address in peerCtx.blocksRequested: + self.pendingBlocks.clearRequest(address, peer.some) + + # drop the peer from the peers table + self.peers.remove(peer) proc downloadInternal( self: BlockExcEngine, address: BlockAddress @@ -173,41 +340,133 @@ proc downloadInternal( if self.pendingBlocks.retriesExhausted(address): trace "Error retries exhausted" + codex_block_exchange_requests_failed_total.inc() handle.fail(newException(RetriesExhaustedError, "Error retries exhausted")) break - trace "Running retry handle" let peers = self.peers.getPeersForBlock(address) logScope: peersWith = peers.with.len peersWithout = peers.without.len - trace "Peers for block" - if peers.with.len > 0: - self.pendingBlocks.setInFlight(address, true) - await self.sendWantBlock(@[address], peers.with.randomPeer) - else: - self.pendingBlocks.setInFlight(address, false) + if peers.with.len == 0: + # We know of no peers that have the block. if peers.without.len > 0: - await self.sendWantHave(@[address], peers.without) - self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + # If we have peers connected but none of them have the block, this + # could be because our knowledge about what they have has run stale. + # Tries to refresh it. + await self.refreshBlockKnowledge() + # Also tries to look for new peers for good measure. + # TODO: in the future, peer search and knowledge maintenance should + # be completely decoupled from one another. It is very hard to + # control what happens and how many neighbors we get like this. + self.searchForNewPeers(address.cidOrTreeCid) + + # We now wait for a bit and then retry. If the handle gets completed in the + # meantime (cause the presence handler might have requested the block and + # received it in the meantime), we are done. Retry delays are randomized + # so we don't get all block loops spinning at the same time. + await handle or sleepAsync(secs(rand(self.pendingBlocks.retryInterval.secs))) + if handle.finished: + break + # If we still don't have the block, we'll go for another cycle. + trace "No peers for block, will retry shortly" + continue + + # Once again, it might happen that the block was requested to a peer + # in the meantime. If so, we don't need to do anything. Otherwise, + # we'll be the ones placing the request. + let scheduledPeer = + if not self.pendingBlocks.isRequested(address): + let peer = self.selectPeer(peers.with) + discard self.pendingBlocks.markRequested(address, peer.id) + peer.blockRequestScheduled(address) + trace "Request block from block retry loop" + await self.sendWantBlock(@[address], peer) + peer + else: + let peerId = self.pendingBlocks.getRequestPeer(address).get() + self.peers.get(peerId) + + if scheduledPeer.isNil: + trace "Scheduled peer no longer available, clearing stale request", address + self.pendingBlocks.clearRequest(address) + continue + + # Parks until either the block is received, or the peer times out. + let activityTimer = scheduledPeer.activityTimer() + await handle or activityTimer # TODO: or peerDropped + activityTimer.cancel() - await (handle or sleepAsync(self.pendingBlocks.retryInterval)) + # XXX: we should probably not have this. 
Blocks should be retried + # to infinity unless cancelled by the client. self.pendingBlocks.decRetries(address) if handle.finished: trace "Handle for block finished", failed = handle.failed break + else: + # If the peer timed out, retries immediately. + trace "Peer timed out during block request", peer = scheduledPeer.id + codex_block_exchange_peer_timeouts_total.inc() + await self.network.dropPeer(scheduledPeer.id) + # Evicts peer immediately or we may end up picking it again in the + # next retry. + self.evictPeer(scheduledPeer.id) except CancelledError as exc: trace "Block download cancelled" if not handle.finished: await handle.cancelAndWait() except RetriesExhaustedError as exc: warn "Retries exhausted for block", address, exc = exc.msg + codex_block_exchange_requests_failed_total.inc() if not handle.finished: handle.fail(exc) finally: - self.pendingBlocks.setInFlight(address, false) + self.pendingBlocks.clearRequest(address) + +proc requestBlocks*( + self: BlockExcEngine, addresses: seq[BlockAddress] +): SafeAsyncIter[Block] = + var handles: seq[BlockHandle] + + # Adds all blocks to pendingBlocks before calling the first downloadInternal. This will + # ensure that we don't send incomplete want lists. + for address in addresses: + if address notin self.pendingBlocks: + handles.add(self.pendingBlocks.getWantHandle(address)) + + for address in addresses: + self.trackedFutures.track(self.downloadInternal(address)) + + let totalHandles = handles.len + var completed = 0 + + proc isFinished(): bool = + completed == totalHandles + + proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} = + # Be it success or failure, we're completing this future. + let value = + try: + # FIXME: this is super expensive. We're doing several linear scans, + # not to mention all the copying and callback fumbling in `one`. + let + handle = await one(handles) + i = handles.find(handle) + handles.del(i) + success await handle + except CancelledError as err: + warn "Block request cancelled", addresses, err = err.msg + raise err + except CatchableError as err: + error "Error getting blocks from exchange engine", addresses, err = err.msg + failure err + + inc(completed) + return value + + return SafeAsyncIter[Block].new(genNext, isFinished) proc requestBlock*( self: BlockExcEngine, address: BlockAddress @@ -239,60 +498,64 @@ proc completeBlock*(self: BlockExcEngine, address: BlockAddress, blk: Block) = proc blockPresenceHandler*( self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence] ) {.async: (raises: []).} = - trace "Received block presence from peer", peer, blocks = blocks.mapIt($it) + trace "Received block presence from peer", peer, len = blocks.len let peerCtx = self.peers.get(peer) - ourWantList = toSeq(self.pendingBlocks.wantList) + ourWantList = toHashSet(self.pendingBlocks.wantList.toSeq) if peerCtx.isNil: return + peerCtx.refreshReplied() + for blk in blocks: if presence =? 
Presence.init(blk): peerCtx.setPresence(presence) let peerHave = peerCtx.peerHave - dontWantCids = peerHave.filterIt(it notin ourWantList) + dontWantCids = peerHave - ourWantList if dontWantCids.len > 0: - peerCtx.cleanPresence(dontWantCids) + peerCtx.cleanPresence(dontWantCids.toSeq) let ourWantCids = ourWantList.filterIt( it in peerHave and not self.pendingBlocks.retriesExhausted(it) and - not self.pendingBlocks.isInFlight(it) - ) + self.pendingBlocks.markRequested(it, peer) + ).toSeq for address in ourWantCids: - self.pendingBlocks.setInFlight(address, true) self.pendingBlocks.decRetries(address) + peerCtx.blockRequestScheduled(address) if ourWantCids.len > 0: trace "Peer has blocks in our wantList", peer, wants = ourWantCids + # FIXME: this will result in duplicate requests for blocks if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption: warn "Failed to send wantBlock to peer", peer, err = err.msg + for address in ourWantCids: + self.pendingBlocks.clearRequest(address, peer.some) proc scheduleTasks( self: BlockExcEngine, blocksDelivery: seq[BlockDelivery] ) {.async: (raises: [CancelledError]).} = - let cids = blocksDelivery.mapIt(it.blk.cid) - # schedule any new peers to provide blocks to for p in self.peers: - for c in cids: # for each cid + for blockDelivery in blocksDelivery: # for each cid # schedule a peer if it wants at least one cid # and we have it in our local store - if c in p.peerWantsCids: + if blockDelivery.address in p.wantedBlocks: + let cid = blockDelivery.blk.cid try: - if await (c in self.localStore): + if await (cid in self.localStore): # TODO: the try/except should go away once blockstore tracks exceptions self.scheduleTask(p) break except CancelledError as exc: - warn "Checking local store canceled", cid = c, err = exc.msg + warn "Checking local store canceled", cid = cid, err = exc.msg return except CatchableError as exc: - error "Error checking local store for cid", cid = c, err = exc.msg + error "Error checking local store for cid", cid = cid, err = exc.msg raiseAssert "Unexpected error checking local store for cid" proc cancelBlocks( @@ -301,28 +564,45 @@ proc cancelBlocks( ## Tells neighboring peers that we're no longer interested in a block. ## + let blocksDelivered = toHashSet(addrs) + var scheduledCancellations: Table[PeerId, HashSet[BlockAddress]] + if self.peers.len == 0: return - trace "Sending block request cancellations to peers", - addrs, peers = self.peers.peerIds - - proc processPeer(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} = + proc dispatchCancellations( + entry: tuple[peerId: PeerId, addresses: HashSet[BlockAddress]] + ): Future[PeerId] {.async: (raises: [CancelledError]).} = + trace "Sending block request cancellations to peer", + peer = entry.peerId, addresses = entry.addresses.len await self.network.request.sendWantCancellations( - peer = peerCtx.id, addresses = addrs.filterIt(it in peerCtx) + peer = entry.peerId, addresses = entry.addresses.toSeq ) - return peerCtx + return entry.peerId try: - let (succeededFuts, failedFuts) = await allFinishedFailed[BlockExcPeerCtx]( - toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map( - processPeer - ) + for peerCtx in self.peers.peers.values: + # Do we have pending requests, towards this peer, for any of the blocks + # that were just delivered? + let intersection = peerCtx.blocksRequested.intersection(blocksDelivered) + if intersection.len > 0: + # If so, schedules a cancellation. 
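The per-peer grouping built here is just a set intersection. A toy example with strings standing in for PeerId and BlockAddress (requestedByPeer and cancellations are illustrative names only):

import std/[sets, tables]

# Group per-peer cancellations as the intersection of what was requested from
# that peer and what has just been delivered; peers with no overlap are skipped.
let
  delivered = toHashSet(["blk1", "blk2"])
  requestedByPeer =
    {"peerA": toHashSet(["blk1", "blk3"]), "peerB": toHashSet(["blk4"])}.toTable

var cancellations: Table[string, HashSet[string]]
for peer, requested in requestedByPeer.pairs:
  let overlap = requested.intersection(delivered)
  if overlap.len > 0:
    cancellations[peer] = overlap

doAssert cancellations.len == 1
doAssert cancellations["peerA"] == toHashSet(["blk1"])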
+ scheduledCancellations[peerCtx.id] = intersection + + if scheduledCancellations.len == 0: + return + + let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId]( + toSeq(scheduledCancellations.pairs).map(dispatchCancellations) ) - (await allFinished(succeededFuts)).mapIt(it.read).apply do(peerCtx: BlockExcPeerCtx): - peerCtx.cleanPresence(addrs) + (await allFinished(succeededFuts)).mapIt(it.read).apply do(peerId: PeerId): + let ctx = self.peers.get(peerId) + if not ctx.isNil: + ctx.cleanPresence(addrs) + for address in scheduledCancellations[peerId]: + ctx.blockRequestCancelled(address) if failedFuts.len > 0: warn "Failed to send block request cancellations to peers", peers = failedFuts.len @@ -392,17 +672,31 @@ proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void = return success() proc blocksDeliveryHandler*( - self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] + self: BlockExcEngine, + peer: PeerId, + blocksDelivery: seq[BlockDelivery], + allowSpurious: bool = false, ) {.async: (raises: []).} = trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) var validatedBlocksDelivery: seq[BlockDelivery] + let peerCtx = self.peers.get(peer) + + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + for bd in blocksDelivery: logScope: peer = peer address = bd.address try: + # Unknown peers and unrequested blocks are dropped with a warning. + if not allowSpurious and (peerCtx == nil or not peerCtx.blockReceived(bd.address)): + warn "Dropping unrequested or duplicate block received from peer" + codex_block_exchange_spurious_blocks_received.inc() + continue + if err =? self.validateBlockDelivery(bd).errorOption: warn "Block validation failed", msg = err.msg continue @@ -428,9 +722,17 @@ proc blocksDeliveryHandler*( validatedBlocksDelivery.add(bd) + if (Moment.now() - lastIdle) >= runtimeQuota: + try: + await idleAsync() + except CancelledError: + discard + except CatchableError: + discard + lastIdle = Moment.now() + codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) - let peerCtx = self.peers.get(peer) if peerCtx != nil: if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption: warn "Error paying for blocks", err = err.msg @@ -454,16 +756,17 @@ proc wantListHandler*( presence: seq[BlockPresence] schedulePeer = false + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + try: for e in wantList.entries: - let idx = peerCtx.peerWants.findIt(it.address == e.address) - logScope: peer = peerCtx.id address = e.address wantType = $e.wantType - if idx < 0: # Adding new entry to peer wants + if e.address notin peerCtx.wantedBlocks: # Adding new entry to peer wants let have = try: @@ -474,6 +777,8 @@ proc wantListHandler*( price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) if e.cancel: + # This is sort of expected if we sent the block to the peer, as we have removed + # it from the peer's wantlist ourselves. 
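Several handlers in this file share the same cooperative-yield pattern: do bounded work, then return control to the chronos scheduler roughly every runtimeQuota. A self-contained sketch of just that pattern; processMany is a made-up placeholder and the per-item work is elided:

import std/sequtils
import pkg/chronos

proc processMany(items: seq[int]) {.async.} =
  ## Yield to the event loop roughly every 10 milliseconds so long loops
  ## don't starve other tasks, as the wantList/presence/delivery handlers do.
  let runtimeQuota = 10.milliseconds
  var lastIdle = Moment.now()
  for item in items:
    discard item # stand-in for the real per-item work
    if (Moment.now() - lastIdle) >= runtimeQuota:
      await idleAsync()
      lastIdle = Moment.now()

waitFor processMany(toSeq(1 .. 100_000))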
trace "Received cancelation for untracked block, skipping", address = e.address continue @@ -482,12 +787,14 @@ proc wantListHandler*( case e.wantType of WantType.WantHave: if have: + trace "We HAVE the block", address = e.address presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.Have, price: price ) ) else: + trace "We DON'T HAVE the block", address = e.address if e.sendDontHave: presence.add( BlockPresence( @@ -497,28 +804,35 @@ proc wantListHandler*( codex_block_exchange_want_have_lists_received.inc() of WantType.WantBlock: - peerCtx.peerWants.add(e) + peerCtx.wantedBlocks.incl(e.address) schedulePeer = true codex_block_exchange_want_block_lists_received.inc() else: # Updating existing entry in peer wants # peer doesn't want this block anymore if e.cancel: trace "Canceling want for block", address = e.address - peerCtx.peerWants.del(idx) + peerCtx.wantedBlocks.excl(e.address) trace "Canceled block request", - address = e.address, len = peerCtx.peerWants.len + address = e.address, len = peerCtx.wantedBlocks.len else: + trace "Peer has requested a block more than once", address = e.address if e.wantType == WantType.WantBlock: schedulePeer = true - # peer might want to ask for the same cid with - # different want params - trace "Updating want for block", address = e.address - peerCtx.peerWants[idx] = e # update entry - trace "Updated block request", - address = e.address, len = peerCtx.peerWants.len + if presence.len >= PresenceBatchSize or (Moment.now() - lastIdle) >= runtimeQuota: + if presence.len > 0: + trace "Sending presence batch to remote", items = presence.len + await self.network.request.sendPresence(peer, presence) + presence = @[] + try: + await idleAsync() + except CancelledError: + discard + lastIdle = Moment.now() + + # Send any remaining presence messages if presence.len > 0: - trace "Sending presence to remote", items = presence.mapIt($it).join(",") + trace "Sending final presence to remote", items = presence.len await self.network.request.sendPresence(peer, presence) if schedulePeer: @@ -550,7 +864,7 @@ proc paymentHandler*( else: context.paymentChannel = self.wallet.acceptChannel(payment).option -proc setupPeer*( +proc peerAddedHandler*( self: BlockExcEngine, peer: PeerId ) {.async: (raises: [CancelledError]).} = ## Perform initial setup, such as want @@ -560,88 +874,85 @@ proc setupPeer*( trace "Setting up peer", peer if peer notin self.peers: + let peerCtx = BlockExcPeerCtx(id: peer, activityTimeout: DefaultPeerActivityTimeout) trace "Setting up new peer", peer - self.peers.add(BlockExcPeerCtx(id: peer)) + self.peers.add(peerCtx) trace "Added peer", peers = self.peers.len - - # broadcast our want list, the other peer will do the same - if self.pendingBlocks.wantListLen > 0: - trace "Sending our want list to a peer", peer - let cids = toSeq(self.pendingBlocks.wantList) - await self.network.request.sendWantList(peer, cids, full = true) + await self.refreshBlockKnowledge(peerCtx) if address =? self.pricing .? 
address: trace "Sending account to peer", peer await self.network.request.sendAccount(peer, Account(address: address)) -proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} = - ## Cleanup disconnected peer - ## +proc localLookup( + self: BlockExcEngine, address: BlockAddress +): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} = + if address.leaf: + (await self.localStore.getBlockAndProof(address.treeCid, address.index)).map( + (blkAndProof: (Block, CodexProof)) => + BlockDelivery(address: address, blk: blkAndProof[0], proof: blkAndProof[1].some) + ) + else: + (await self.localStore.getBlock(address)).map( + (blk: Block) => BlockDelivery(address: address, blk: blk, proof: CodexProof.none) + ) - trace "Dropping peer", peer +iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] = + var batch: seq[T] + for element in sequence: + if batch.len == batchSize: + yield batch + batch = @[] + batch.add(element) - # drop the peer from the peers table - self.peers.remove(peer) + if batch.len > 0: + yield batch proc taskHandler*( - self: BlockExcEngine, task: BlockExcPeerCtx + self: BlockExcEngine, peerCtx: BlockExcPeerCtx ) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} = # Send to the peer blocks he wants to get, # if they present in our local store - # TODO: There should be all sorts of accounting of - # bytes sent/received here - - var wantsBlocks = - task.peerWants.filterIt(it.wantType == WantType.WantBlock and not it.inFlight) - - proc updateInFlight(addresses: seq[BlockAddress], inFlight: bool) = - for peerWant in task.peerWants.mitems: - if peerWant.address in addresses: - peerWant.inFlight = inFlight - - if wantsBlocks.len > 0: - # Mark wants as in-flight. - let wantAddresses = wantsBlocks.mapIt(it.address) - updateInFlight(wantAddresses, true) - wantsBlocks.sort(SortOrder.Descending) - - proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = - if e.address.leaf: - (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( - (blkAndProof: (Block, CodexProof)) => - BlockDelivery( - address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some - ) - ) - else: - (await self.localStore.getBlock(e.address)).map( - (blk: Block) => - BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none) - ) + # Blocks that have been sent have already been picked up by other tasks and + # should not be re-sent. + var + wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isBlockSent(it)) + sent: HashSet[BlockAddress] - let - blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup)) - blocksDelivery = blocksDeliveryFut.filterIt(it.completed and it.value.isOk).mapIt: - if bd =? it.value: - bd - else: - raiseAssert "Unexpected error in local lookup" + trace "Running task for peer", peer = peerCtx.id - # All the wants that failed local lookup must be set to not-in-flight again. 
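The splitBatches iterator above is what keeps outgoing deliveries under maxBlocksPerMessage. Its behaviour on a small input, with the iterator copied verbatim so the snippet runs standalone and ints standing in for block addresses:

# Copied from engine.nim above so this snippet is self-contained.
iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] =
  var batch: seq[T]
  for element in sequence:
    if batch.len == batchSize:
      yield batch
      batch = @[]
    batch.add(element)

  if batch.len > 0:
    yield batch

# Five wanted blocks in batches of two: the trailing partial batch is still yielded.
var observed: seq[seq[int]]
for batch in splitBatches(@[1, 2, 3, 4, 5], 2):
  observed.add(batch)
doAssert observed == @[@[1, 2], @[3, 4], @[5]]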
- let - successAddresses = blocksDelivery.mapIt(it.address) - failedAddresses = wantAddresses.filterIt(it notin successAddresses) - updateInFlight(failedAddresses, false) + for wantedBlock in wantedBlocks: + peerCtx.markBlockAsSent(wantedBlock) - if blocksDelivery.len > 0: - trace "Sending blocks to peer", - peer = task.id, blocks = (blocksDelivery.mapIt(it.address)) - await self.network.request.sendBlocksDelivery(task.id, blocksDelivery) + try: + for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage): + var blockDeliveries: seq[BlockDelivery] + for wantedBlock in batch: + # I/O is blocking so looking up blocks sequentially is fine. + without blockDelivery =? await self.localLookup(wantedBlock), err: + error "Error getting block from local store", + err = err.msg, address = wantedBlock + peerCtx.markBlockAsNotSent(wantedBlock) + continue + blockDeliveries.add(blockDelivery) + sent.incl(wantedBlock) - codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) + if blockDeliveries.len == 0: + continue - task.peerWants.keepItIf(it.address notin successAddresses) + await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries) + codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64) + # Drops the batch from the peer's set of wanted blocks; i.e. assumes that after + # we send the blocks, then the peer no longer wants them, so we don't need to + # re-send them. Note that the send might still fail down the line and we will + # have removed those anyway. At that point, we rely on the requester performing + # a retry for the request to succeed. + peerCtx.wantedBlocks.keepItIf(it notin sent) + finally: + # Better safe than sorry: if an exception does happen, we don't want to keep + # those as sent, as it'll effectively prevent the blocks from ever being sent again. 
+ peerCtx.blocksSent.keepItIf(it notin wantedBlocks) proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = ## process tasks @@ -657,6 +968,40 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = info "Exiting blockexc task runner" +proc selectRandom*( + peers: seq[BlockExcPeerCtx] +): BlockExcPeerCtx {.gcsafe, raises: [].} = + if peers.len == 1: + return peers[0] + + proc evalPeerScore(peer: BlockExcPeerCtx): float = + let + loadPenalty = peer.blocksRequested.len.float * 2.0 + successRate = + if peer.exchanged > 0: + peer.exchanged.float / (peer.exchanged + peer.blocksRequested.len).float + else: + 0.5 + failurePenalty = (1.0 - successRate) * 5.0 + return loadPenalty + failurePenalty + + let + scores = peers.mapIt(evalPeerScore(it)) + maxScore = scores.max() + 1.0 + weights = scores.mapIt(maxScore - it) + + var totalWeight = 0.0 + for w in weights: + totalWeight += w + + var r = rand(totalWeight) + for i, weight in weights: + r -= weight + if r <= 0.0: + return peers[i] + + return peers[^1] + proc new*( T: type BlockExcEngine, localStore: BlockStore, @@ -666,7 +1011,9 @@ proc new*( advertiser: Advertiser, peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, + maxBlocksPerMessage = DefaultMaxBlocksPerMessage, concurrentTasks = DefaultConcurrentTasks, + selectPeer: PeerSelector = selectRandom, ): BlockExcEngine = ## Create new block exchange engine instance ## @@ -679,23 +1026,13 @@ proc new*( wallet: wallet, concurrentTasks: concurrentTasks, trackedFutures: TrackedFutures(), + maxBlocksPerMessage: maxBlocksPerMessage, taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), discovery: discovery, advertiser: advertiser, + selectPeer: selectPeer, ) - proc peerEventHandler( - peerId: PeerId, event: PeerEvent - ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = - if event.kind == PeerEventKind.Joined: - await self.setupPeer(peerId) - else: - self.dropPeer(peerId) - - if not isNil(network.switch): - network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) - network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - proc blockWantListHandler( peer: PeerId, wantList: WantList ): Future[void] {.async: (raises: []).} = @@ -721,12 +1058,24 @@ proc new*( ): Future[void] {.async: (raises: []).} = self.paymentHandler(peer, payment) + proc peerAddedHandler( + peer: PeerId + ): Future[void] {.async: (raises: [CancelledError]).} = + await self.peerAddedHandler(peer) + + proc peerDepartedHandler( + peer: PeerId + ): Future[void] {.async: (raises: [CancelledError]).} = + self.evictPeer(peer) + network.handlers = BlockExcHandlers( onWantList: blockWantListHandler, onBlocksDelivery: blocksDeliveryHandler, onPresence: blockPresenceHandler, onAccount: accountHandler, onPayment: paymentHandler, + onPeerJoined: peerAddedHandler, + onPeerDeparted: peerDepartedHandler, ) return self diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 80c8852746..2ff1062cdc 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -34,7 +34,7 @@ declareGauge( const DefaultBlockRetries* = 3000 - DefaultRetryInterval* = 500.millis + DefaultRetryInterval* = 2.seconds type RetriesExhaustedError* = object of CatchableError @@ -42,7 +42,7 @@ type BlockReq* = object handle*: BlockHandle - inFlight*: bool + requested*: ?PeerId blockRetries*: int startTime*: int64 @@ -50,12 +50,13 @@ type blockRetries*: int = 
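The selectRandom scoring introduced in engine.nim above favours peers with fewer outstanding requests and a better exchange record, then draws one peer with probability proportional to the inverted score. A toy restatement; ToyPeer, score and pick are made-up stand-ins for BlockExcPeerCtx and the real procs:

import std/[random, sequtils]

type ToyPeer = tuple[requested, exchanged: int]

proc score(p: ToyPeer): float =
  # Lower is better: penalise outstanding load and a poor success rate.
  let successRate =
    if p.exchanged > 0:
      p.exchanged.float / (p.exchanged + p.requested).float
    else:
      0.5
  result = p.requested.float * 2.0 + (1.0 - successRate) * 5.0

proc pick(peers: seq[ToyPeer]): ToyPeer =
  let
    scores = peers.mapIt(score(it))
    maxScore = scores.max() + 1.0
    weights = scores.mapIt(maxScore - it) # invert: low score => high weight
  var r = rand(weights.foldl(a + b))
  for i, w in weights:
    r -= w
    if r <= 0.0:
      return peers[i]
  peers[^1]

let peersList = @[(requested: 0, exchanged: 10), (requested: 20, exchanged: 1)]
doAssert pick(peersList) in peersList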
DefaultBlockRetries retryInterval*: Duration = DefaultRetryInterval blocks*: Table[BlockAddress, BlockReq] # pending Block requests + lastInclusion*: Moment # time at which we last included a block into our wantlist proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) proc getWantHandle*( - self: PendingBlocksManager, address: BlockAddress, inFlight = false + self: PendingBlocksManager, address: BlockAddress, requested: ?PeerId = PeerId.none ): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} = ## Add an event for a block ## @@ -65,11 +66,13 @@ proc getWantHandle*( do: let blk = BlockReq( handle: newFuture[Block]("pendingBlocks.getWantHandle"), - inFlight: inFlight, + requested: requested, blockRetries: self.blockRetries, startTime: getMonoTime().ticks, ) self.blocks[address] = blk + self.lastInclusion = Moment.now() + let handle = blk.handle proc cleanUpBlock(data: pointer) {.raises: [].} = @@ -86,9 +89,9 @@ proc getWantHandle*( return handle proc getWantHandle*( - self: PendingBlocksManager, cid: Cid, inFlight = false + self: PendingBlocksManager, cid: Cid, requested: ?PeerId = PeerId.none ): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} = - self.getWantHandle(BlockAddress.init(cid), inFlight) + self.getWantHandle(BlockAddress.init(cid), requested) proc completeWantHandle*( self: PendingBlocksManager, address: BlockAddress, blk: Block @@ -121,9 +124,6 @@ proc resolve*( blockReq.handle.complete(bd.blk) codex_block_exchange_retrieval_time_us.set(retrievalDurationUs) - - if retrievalDurationUs > 500000: - warn "High block retrieval time", retrievalDurationUs, address = bd.address else: trace "Block handle already finished", address = bd.address @@ -141,19 +141,40 @@ func retriesExhausted*(self: PendingBlocksManager, address: BlockAddress): bool self.blocks.withValue(address, pending): result = pending[].blockRetries <= 0 -func setInFlight*(self: PendingBlocksManager, address: BlockAddress, inFlight = true) = - ## Set inflight status for a block +func isRequested*(self: PendingBlocksManager, address: BlockAddress): bool = + ## Check if a block has been requested to a peer ## + result = false + self.blocks.withValue(address, pending): + result = pending[].requested.isSome +func getRequestPeer*(self: PendingBlocksManager, address: BlockAddress): ?PeerId = + ## Returns the peer that requested this block + ## + result = PeerId.none self.blocks.withValue(address, pending): - pending[].inFlight = inFlight + result = pending[].requested -func isInFlight*(self: PendingBlocksManager, address: BlockAddress): bool = - ## Check if a block is in flight +proc markRequested*( + self: PendingBlocksManager, address: BlockAddress, peer: PeerId +): bool = + ## Marks this block as having been requested to a peer ## + if self.isRequested(address): + return false + + self.blocks.withValue(address, pending): + pending[].requested = peer.some + return true + +proc clearRequest*( + self: PendingBlocksManager, address: BlockAddress, peer: ?PeerId = PeerId.none +) = self.blocks.withValue(address, pending): - result = pending[].inFlight + if peer.isSome: + assert peer == pending[].requested + pending[].requested = PeerId.none func contains*(self: PendingBlocksManager, cid: Cid): bool = BlockAddress.init(cid) in self.blocks diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index d47541104a..f503a15c42 100644 --- 
a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -44,6 +44,7 @@ type AccountHandler* = proc(peer: PeerId, account: Account) {.gcsafe, async: (raises: []).} PaymentHandler* = proc(peer: PeerId, payment: SignedState) {.gcsafe, async: (raises: []).} + PeerEventHandler* = proc(peer: PeerId) {.gcsafe, async: (raises: [CancelledError]).} BlockExcHandlers* = object onWantList*: WantListHandler @@ -51,6 +52,9 @@ type onPresence*: BlockPresenceHandler onAccount*: AccountHandler onPayment*: PaymentHandler + onPeerJoined*: PeerEventHandler + onPeerDeparted*: PeerEventHandler + onPeerDropped*: PeerEventHandler WantListSender* = proc( id: PeerId, @@ -240,86 +244,104 @@ proc handlePayment( await network.handlers.onPayment(peer.id, payment) proc rpcHandler( - b: BlockExcNetwork, peer: NetworkPeer, msg: Message + self: BlockExcNetwork, peer: NetworkPeer, msg: Message ) {.async: (raises: []).} = ## handle rpc messages ## if msg.wantList.entries.len > 0: - b.trackedFutures.track(b.handleWantList(peer, msg.wantList)) + self.trackedFutures.track(self.handleWantList(peer, msg.wantList)) if msg.payload.len > 0: - b.trackedFutures.track(b.handleBlocksDelivery(peer, msg.payload)) + self.trackedFutures.track(self.handleBlocksDelivery(peer, msg.payload)) if msg.blockPresences.len > 0: - b.trackedFutures.track(b.handleBlockPresence(peer, msg.blockPresences)) + self.trackedFutures.track(self.handleBlockPresence(peer, msg.blockPresences)) if account =? Account.init(msg.account): - b.trackedFutures.track(b.handleAccount(peer, account)) + self.trackedFutures.track(self.handleAccount(peer, account)) if payment =? SignedState.init(msg.payment): - b.trackedFutures.track(b.handlePayment(peer, payment)) + self.trackedFutures.track(self.handlePayment(peer, payment)) -proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = +proc getOrCreatePeer(self: BlockExcNetwork, peer: PeerId): NetworkPeer = ## Creates or retrieves a BlockExcNetwork Peer ## - if peer in b.peers: - return b.peers.getOrDefault(peer, nil) + if peer in self.peers: + return self.peers.getOrDefault(peer, nil) var getConn: ConnProvider = proc(): Future[Connection] {. 
async: (raises: [CancelledError]) .} = try: trace "Getting new connection stream", peer - return await b.switch.dial(peer, Codec) + return await self.switch.dial(peer, Codec) except CancelledError as error: raise error except CatchableError as exc: trace "Unable to connect to blockexc peer", exc = exc.msg - if not isNil(b.getConn): - getConn = b.getConn + if not isNil(self.getConn): + getConn = self.getConn let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async: (raises: []).} = - await b.rpcHandler(p, msg) + await self.rpcHandler(p, msg) # create new pubsub peer let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler) debug "Created new blockexc peer", peer - b.peers[peer] = blockExcPeer + self.peers[peer] = blockExcPeer return blockExcPeer -proc setupPeer*(b: BlockExcNetwork, peer: PeerId) = - ## Perform initial setup, such as want - ## list exchange - ## - - discard b.getOrCreatePeer(peer) - -proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = +proc dialPeer*(self: BlockExcNetwork, peer: PeerRecord) {.async.} = ## Dial a peer ## - if b.isSelf(peer.peerId): + if self.isSelf(peer.peerId): trace "Skipping dialing self", peer = peer.peerId return - if peer.peerId in b.peers: + if peer.peerId in self.peers: trace "Already connected to peer", peer = peer.peerId return - await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) + await self.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) -proc dropPeer*(b: BlockExcNetwork, peer: PeerId) = +proc dropPeer*( + self: BlockExcNetwork, peer: PeerId +) {.async: (raises: [CancelledError]).} = + trace "Dropping peer", peer + + try: + if not self.switch.isNil: + await self.switch.disconnect(peer) + except CatchableError as error: + warn "Error attempting to disconnect from peer", peer = peer, error = error.msg + + if not self.handlers.onPeerDropped.isNil: + await self.handlers.onPeerDropped(peer) + +proc handlePeerJoined*( + self: BlockExcNetwork, peer: PeerId +) {.async: (raises: [CancelledError]).} = + discard self.getOrCreatePeer(peer) + if not self.handlers.onPeerJoined.isNil: + await self.handlers.onPeerJoined(peer) + +proc handlePeerDeparted*( + self: BlockExcNetwork, peer: PeerId +) {.async: (raises: [CancelledError]).} = ## Cleanup disconnected peer ## - trace "Dropping peer", peer - b.peers.del(peer) + trace "Cleaning up departed peer", peer + self.peers.del(peer) + if not self.handlers.onPeerDeparted.isNil: + await self.handlers.onPeerDeparted(peer) -method init*(self: BlockExcNetwork) = +method init*(self: BlockExcNetwork) {.raises: [].} = ## Perform protocol initialization ## @@ -327,9 +349,11 @@ method init*(self: BlockExcNetwork) = peerId: PeerId, event: PeerEvent ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = if event.kind == PeerEventKind.Joined: - self.setupPeer(peerId) + await self.handlePeerJoined(peerId) + elif event.kind == PeerEventKind.Left: + await self.handlePeerDeparted(peerId) else: - self.dropPeer(peerId) + warn "Unknown peer event", event self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 66c3929412..1964a3079c 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -65,7 +65,9 @@ proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} = except CatchableError as err: warn "Exception in 
blockexc read loop", msg = err.msg finally: - trace "Detaching read loop", peer = self.id, connId = conn.oid + warn "Detaching read loop", peer = self.id, connId = conn.oid + if self.sendConn == conn: + self.sendConn = nil await conn.close() proc connect*( @@ -89,7 +91,12 @@ proc send*( return trace "Sending message", peer = self.id, connId = conn.oid - await conn.writeLp(protobufEncode(msg)) + try: + await conn.writeLp(protobufEncode(msg)) + except CatchableError as err: + if self.sendConn == conn: + self.sendConn = nil + raise newException(LPStreamError, "Failed to send message: " & err.msg) func new*( T: type NetworkPeer, diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 7a299b6b32..f9b3d586ee 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -25,28 +25,77 @@ import ../../logutils export payments, nitro +const + MinRefreshInterval = 1.seconds + MaxRefreshBackoff = 36 # 36 seconds + MaxWantListBatchSize* = 1024 # Maximum blocks to send per WantList message + type BlockExcPeerCtx* = ref object of RootObj id*: PeerId blocks*: Table[BlockAddress, Presence] # remote peer have list including price - peerWants*: seq[WantListEntry] # remote peers want lists + wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants exchanged*: int # times peer has exchanged with us - lastExchange*: Moment # last time peer has exchanged with us + refreshInProgress*: bool # indicates if a refresh is in progress + lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has + refreshBackoff*: int = 1 # backoff factor for refresh requests account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id + blocksSent*: HashSet[BlockAddress] # blocks sent to peer + blocksRequested*: HashSet[BlockAddress] # pending block requests to this peer + lastExchange*: Moment # last time peer has sent us a block + activityTimeout*: Duration + lastSentWants*: HashSet[BlockAddress] + # track what wantList we last sent for delta updates + +proc isKnowledgeStale*(self: BlockExcPeerCtx): bool = + let staleness = + self.lastRefresh + self.refreshBackoff * MinRefreshInterval < Moment.now() + + if staleness and self.refreshInProgress: + trace "Cleaning up refresh state", peer = self.id + self.refreshInProgress = false + self.refreshBackoff = 1 + + staleness + +proc isBlockSent*(self: BlockExcPeerCtx, address: BlockAddress): bool = + address in self.blocksSent + +proc markBlockAsSent*(self: BlockExcPeerCtx, address: BlockAddress) = + self.blocksSent.incl(address) + +proc markBlockAsNotSent*(self: BlockExcPeerCtx, address: BlockAddress) = + self.blocksSent.excl(address) + +proc refreshRequested*(self: BlockExcPeerCtx) = + trace "Refresh requested for peer", peer = self.id, backoff = self.refreshBackoff + self.refreshInProgress = true + self.lastRefresh = Moment.now() -proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] = - toSeq(self.blocks.keys) +proc refreshReplied*(self: BlockExcPeerCtx) = + self.refreshInProgress = false + self.lastRefresh = Moment.now() + self.refreshBackoff = min(self.refreshBackoff * 2, MaxRefreshBackoff) -proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] = - self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet +proc havesUpdated(self: BlockExcPeerCtx) = + self.refreshBackoff = 1 -proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] = - self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet +proc wantsUpdated*(self: 
BlockExcPeerCtx) = + self.refreshBackoff = 1 + +proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] = + # XXX: this is ugly an inefficient, but since those will typically + # be used in "joins", it's better to pay the price here and have + # a linear join than to not do it and have a quadratic join. + toHashSet(self.blocks.keys.toSeq) proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool = address in self.blocks func setPresence*(self: BlockExcPeerCtx, presence: Presence) = + if presence.address notin self.blocks: + self.havesUpdated() + self.blocks[presence.address] = presence func cleanPresence*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]) = @@ -63,3 +112,36 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 = price += precense[].price price + +proc blockRequestScheduled*(self: BlockExcPeerCtx, address: BlockAddress) = + ## Adds a block the set of blocks that have been requested to this peer + ## (its request schedule). + if self.blocksRequested.len == 0: + self.lastExchange = Moment.now() + self.blocksRequested.incl(address) + +proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) = + ## Removes a block from the set of blocks that have been requested to this peer + ## (its request schedule). + self.blocksRequested.excl(address) + +proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress): bool = + let wasRequested = address in self.blocksRequested + self.blocksRequested.excl(address) + self.lastExchange = Moment.now() + wasRequested + +proc activityTimer*( + self: BlockExcPeerCtx +): Future[void] {.async: (raises: [CancelledError]).} = + ## This is called by the block exchange when a block is scheduled for this peer. + ## If the peer sends no blocks for a while, it is considered inactive/uncooperative + ## and the peer is dropped. Note that ANY block that the peer sends will reset this + ## timer for all blocks. + ## + while true: + let idleTime = Moment.now() - self.lastExchange + if idleTime > self.activityTimeout: + return + + await sleepAsync(self.activityTimeout - idleTime) diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index ce2506a82f..d2762fc800 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -62,21 +62,23 @@ func len*(self: PeerCtxStore): int = self.peers.len func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it == address)) + toSeq(self.peers.values).filterIt(address in it.peerHave) func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = + # FIXME: this is way slower and can end up leading to unexpected performance loss. toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it.cidOrTreeCid == cid)) func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it == address)) + toSeq(self.peers.values).filterIt(address in it.wantedBlocks) func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid)) + # FIXME: this is way slower and can end up leading to unexpected performance loss. 
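The refresh bookkeeping in peercontext.nim above boils down to: each presence reply doubles refreshBackoff up to MaxRefreshBackoff, and a peer's knowledge counts as stale once lastRefresh + refreshBackoff * MinRefreshInterval has passed. A small standalone check of that arithmetic (isStale is an illustrative helper, not engine code):

import pkg/chronos

# Constants copied from peercontext.nim above.
const
  MinRefreshInterval = 1.seconds
  MaxRefreshBackoff = 36

# Doubling on every reply saturates at the cap after six replies.
var backoff = 1
for _ in 1 .. 6:
  backoff = min(backoff * 2, MaxRefreshBackoff)
doAssert backoff == MaxRefreshBackoff

proc isStale(lastRefresh: Moment, backoff: int): bool =
  lastRefresh + backoff * MinRefreshInterval < Moment.now()

doAssert isStale(Moment.now() - 2.seconds, 1)         # refreshed 2s ago, 1s budget
doAssert not isStale(Moment.now(), MaxRefreshBackoff) # just refreshed, 36s budget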
+ toSeq(self.peers.values).filterIt(it.wantedBlocks.anyIt(it.cidOrTreeCid == cid)) proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = var res: PeersForBlock = (@[], @[]) for peer in self: - if peer.peerHave.anyIt(it == address): + if address in peer: res.with.add(peer) else: res.without.add(peer) diff --git a/codex/blockexchange/protobuf/blockexc.nim b/codex/blockexchange/protobuf/blockexc.nim index 698686810d..a0512cd332 100644 --- a/codex/blockexchange/protobuf/blockexc.nim +++ b/codex/blockexchange/protobuf/blockexc.nim @@ -9,7 +9,6 @@ import std/hashes import std/sequtils -import pkg/stew/endians2 import message @@ -20,13 +19,6 @@ export Wantlist, WantType, WantListEntry export BlockDelivery, BlockPresenceType, BlockPresence export AccountMessage, StateChannelUpdate -proc hash*(a: BlockAddress): Hash = - if a.leaf: - let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE) - hash(data) - else: - hash(a.cid.data.buffer) - proc hash*(e: WantListEntry): Hash = hash(e.address) diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim index 4db8972936..00dbc57b1e 100644 --- a/codex/blockexchange/protobuf/message.nim +++ b/codex/blockexchange/protobuf/message.nim @@ -25,11 +25,15 @@ type WantListEntry* = object address*: BlockAddress + # XXX: I think explicit priority is pointless as the peer will request + # the blocks in the order it wants to receive them, and all we have to + # do is process those in the same order as we send them back. It also + # complicates things for no reason at the moment, as the priority is + # always set to 0. priority*: int32 # The priority (normalized). default to 1 cancel*: bool # Whether this revokes an entry wantType*: WantType # Note: defaults to enum 0, ie Block sendDontHave*: bool # Note: defaults to false - inFlight*: bool # Whether block sending is in progress. Not serialized. 
WantList* = object entries*: seq[WantListEntry] # A list of wantList entries diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 7e13493d84..233d4e8f4c 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -9,6 +9,7 @@ import std/tables import std/sugar +import std/hashes export tables @@ -18,7 +19,7 @@ push: {.upraises: [].} import pkg/libp2p/[cid, multicodec, multihash] -import pkg/stew/byteutils +import pkg/stew/[byteutils, endians2] import pkg/questionable import pkg/questionable/results @@ -67,6 +68,13 @@ proc `$`*(a: BlockAddress): string = else: "cid: " & $a.cid +proc hash*(a: BlockAddress): Hash = + if a.leaf: + let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE) + hash(data) + else: + hash(a.cid.data.buffer) + proc cidOrTreeCid*(a: BlockAddress): Cid = if a.leaf: a.treeCid else: a.cid diff --git a/codex/codex.nim b/codex/codex.nim index 8135746410..4cb6cfb87c 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -211,7 +211,7 @@ proc new*( .withMaxConnections(config.maxPeers) .withAgentVersion(config.agentString) .withSignedPeerRecord(true) - .withTcpTransport({ServerFlags.ReuseAddr}) + .withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}) .build() var diff --git a/codex/node.nim b/codex/node.nim index 1ca471d56f..67145d22f4 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -44,7 +44,7 @@ import ./indexingstrategy import ./utils import ./errors import ./logutils -import ./utils/asynciter +import ./utils/safeasynciter import ./utils/trackedfutures export logutils @@ -52,7 +52,10 @@ export logutils logScope: topics = "codex node" -const DefaultFetchBatch = 10 +const + DefaultFetchBatch = 1024 + MaxOnBatchBlocks = 128 + BatchRefillThreshold = 0.75 # Refill when 75% of window completes type Contracts* = @@ -186,33 +189,61 @@ proc fetchBatched*( # (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i)) # ) - while not iter.finished: - let blockFutures = collect: - for i in 0 ..< batchSize: - if not iter.finished: - let address = BlockAddress.init(cid, iter.next()) - if not (await address in self.networkStore) or fetchLocal: - self.networkStore.getBlock(address) + # Sliding window: maintain batchSize blocks in-flight + let + refillThreshold = int(float(batchSize) * BatchRefillThreshold) + refillSize = max(refillThreshold, 1) + maxCallbackBlocks = min(batchSize, MaxOnBatchBlocks) + + var + blockData: seq[bt.Block] + failedBlocks = 0 + successfulBlocks = 0 + completedInWindow = 0 + + var addresses = newSeqOfCap[BlockAddress](batchSize) + for i in 0 ..< batchSize: + if not iter.finished: + let address = BlockAddress.init(cid, iter.next()) + if fetchLocal or not (await address in self.networkStore): + addresses.add(address) + + var blockResults = await self.networkStore.getBlocks(addresses) - if blockFutures.len == 0: + while not blockResults.finished: + without blk =? await blockResults.next(), err: + inc(failedBlocks) continue - without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err: - trace "Some blocks failed to fetch", err = err.msg - return failure(err) + inc(successfulBlocks) + inc(completedInWindow) - let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value) + if not onBatch.isNil: + blockData.add(blk) + if blockData.len >= maxCallbackBlocks: + if batchErr =? 
(await onBatch(blockData)).errorOption: + return failure(batchErr) + blockData = @[] - let numOfFailedBlocks = blockResults.len - blocks.len - if numOfFailedBlocks > 0: - return - failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")") + if completedInWindow >= refillThreshold and not iter.finished: + var refillAddresses = newSeqOfCap[BlockAddress](refillSize) + for i in 0 ..< refillSize: + if not iter.finished: + let address = BlockAddress.init(cid, iter.next()) + if fetchLocal or not (await address in self.networkStore): + refillAddresses.add(address) - if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption: - return failure(batchErr) + if refillAddresses.len > 0: + blockResults = + chain(blockResults, await self.networkStore.getBlocks(refillAddresses)) + completedInWindow = 0 - if not iter.finished: - await sleepAsync(1.millis) + if failedBlocks > 0: + return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")") + + if not onBatch.isNil and blockData.len > 0: + if batchErr =? (await onBatch(blockData)).errorOption: + return failure(batchErr) success() diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index e436577c1e..78809191f9 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -70,6 +70,14 @@ method completeBlock*( ) {.base, gcsafe.} = discard +method getBlocks*( + self: BlockStore, addresses: seq[BlockAddress] +): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} = + ## Gets a set of blocks from the blockstore. Blocks might + ## be returned in any order. + + raiseAssert("getBlocks not implemented!") + method getBlockAndProof*( self: BlockStore, treeCid: Cid, index: Natural ): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} = diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index ff3fd6df9a..a86e6b1019 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -66,6 +66,21 @@ method getBlock*( trace "Error requesting block from cache", cid, error = exc.msg return failure exc +method getBlocks*( + self: CacheStore, addresses: seq[BlockAddress] +): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} = + var i = 0 + + proc isFinished(): bool = + i == addresses.len + + proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} = + let value = await self.getBlock(addresses[i]) + inc(i) + return value + + return SafeAsyncIter[Block].new(genNext, isFinished) + method getCidAndProof*( self: CacheStore, treeCid: Cid, index: Natural ): Future[?!(Cid, CodexProof)] {.async: (raises: [CancelledError]).} = diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 06b96b778a..d047dc51f6 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -31,6 +31,31 @@ type NetworkStore* = ref object of BlockStore engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store +method getBlocks*( + self: NetworkStore, addresses: seq[BlockAddress] +): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} = + var + localAddresses: seq[BlockAddress] + remoteAddresses: seq[BlockAddress] + + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + + for address in addresses: + if not (await address in self.localStore): + remoteAddresses.add(address) + else: + localAddresses.add(address) + + if (Moment.now() - lastIdle) >= runtimeQuota: + await idleAsync() + lastIdle = Moment.now() + + return 
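For the sliding window introduced in node.nim's fetchBatched above, the refill point is plain arithmetic; with the new defaults it works out as below (a throwaway check, not engine code):

# Throwaway arithmetic check for the sliding-window defaults in node.nim above.
const
  DefaultFetchBatch = 1024
  MaxOnBatchBlocks = 128
  BatchRefillThreshold = 0.75

let
  refillThreshold = int(float(DefaultFetchBatch) * BatchRefillThreshold)
  refillSize = max(refillThreshold, 1)
  maxCallbackBlocks = min(DefaultFetchBatch, MaxOnBatchBlocks)

# The window is topped up with 768 fresh addresses once 768 requests complete,
# and the onBatch callback never sees more than 128 blocks at a time.
doAssert refillThreshold == 768
doAssert refillSize == 768
doAssert maxCallbackBlocks == 128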
chain( + await self.localStore.getBlocks(localAddresses), + self.engine.requestBlocks(remoteAddresses), + ) + method getBlock*( self: NetworkStore, address: BlockAddress ): Future[?!Block] {.async: (raises: [CancelledError]).} = diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index bea2971c71..ad6f03fc70 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -38,6 +38,21 @@ logScope: # BlockStore API ########################################################### +method getBlocks*( + self: RepoStore, addresses: seq[BlockAddress] +): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} = + var i = 0 + + proc isFinished(): bool = + i == addresses.len + + proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} = + let value = await self.getBlock(addresses[i]) + inc(i) + return value + + return SafeAsyncIter[Block].new(genNext, isFinished) + method getBlock*( self: RepoStore, cid: Cid ): Future[?!Block] {.async: (raises: [CancelledError]).} = diff --git a/codex/utils/safeasynciter.nim b/codex/utils/safeasynciter.nim index d582fec3b5..be46cc0f16 100644 --- a/codex/utils/safeasynciter.nim +++ b/codex/utils/safeasynciter.nim @@ -232,3 +232,28 @@ proc empty*[T](_: type SafeAsyncIter[T]): SafeAsyncIter[T] = true SafeAsyncIter[T].new(genNext, isFinished) + +proc chain*[T](iters: seq[SafeAsyncIter[T]]): SafeAsyncIter[T] = + if iters.len == 0: + return SafeAsyncIter[T].empty + + var curIdx = 0 + + proc ensureNext(): void = + while curIdx < iters.len and iters[curIdx].finished: + inc(curIdx) + + proc isFinished(): bool = + curIdx == iters.len + + proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} = + let item = await iters[curIdx].next() + ensureNext() + return item + + ensureNext() + + return SafeAsyncIter[T].new(genNext, isFinished) + +proc chain*[T](iters: varargs[SafeAsyncIter[T]]): SafeAsyncIter[T] = + chain(iters.toSeq) diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index c54a1ffffe..6b36a63044 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -54,7 +54,7 @@ asyncchecksuite "Block Advertising and Discovery": peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() - (manifest, tree) = makeManifestAndTree(blocks).tryGet() + (_, tree, manifest) = makeDataset(blocks).tryGet() manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -172,7 +172,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": break blocks.add(bt.Block.new(chunk).tryGet()) - let (manifest, tree) = makeManifestAndTree(blocks).tryGet() + let (_, tree, manifest) = makeDataset(blocks).tryGet() manifests.add(manifest) mBlocks.add(manifest.asBlock()) trees.add(tree) @@ -216,7 +216,6 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": test "E2E - Should advertise and discover blocks": # Distribute the manifests and trees amongst 1..3 # Ask 0 to download everything without connecting him beforehand - var advertised: Table[Cid, SignedPeerRecord] MockDiscovery(blockexc[1].engine.discovery.discovery).publishBlockProvideHandler = proc( @@ -242,6 +241,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid) ) ], + allowSpurious = true, ) discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) @@ -252,6 +252,7 @@ asyncchecksuite "E2E - Multiple 
Nodes Discovery": blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid) ) ], + allowSpurious = true, ) discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) @@ -262,6 +263,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid) ) ], + allowSpurious = true, ) MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc( @@ -311,6 +313,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid) ) ], + allowSpurious = true, ) discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) @@ -321,6 +324,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid) ) ], + allowSpurious = true, ) discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) @@ -331,6 +335,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid) ) ], + allowSpurious = true, ) MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc( diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index 9efab1a648..df3f8c80e7 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -43,7 +43,7 @@ asyncchecksuite "Test Discovery Engine": blocks.add(bt.Block.new(chunk).tryGet()) - (manifest, tree) = makeManifestAndTree(blocks).tryGet() + (_, tree, manifest) = makeDataset(blocks).tryGet() manifestBlock = manifest.asBlock() blocks.add(manifestBlock) diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 6ab345d14b..7c2a9ed851 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -29,14 +29,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes": nodeCmps1 = generateNodes(1, blocks1).components[0] nodeCmps2 = generateNodes(1, blocks2).components[0] - await allFuturesThrowing( - nodeCmps1.switch.start(), - nodeCmps1.blockDiscovery.start(), - nodeCmps1.engine.start(), - nodeCmps2.switch.start(), - nodeCmps2.blockDiscovery.start(), - nodeCmps2.engine.start(), - ) + await allFuturesThrowing(nodeCmps1.start(), nodeCmps2.start()) # initialize our want lists pendingBlocks1 = @@ -65,14 +58,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes": check isNil(peerCtx2).not teardown: - await allFuturesThrowing( - nodeCmps1.blockDiscovery.stop(), - nodeCmps1.engine.stop(), - nodeCmps1.switch.stop(), - nodeCmps2.blockDiscovery.stop(), - nodeCmps2.engine.stop(), - nodeCmps2.switch.stop(), - ) + await allFuturesThrowing(nodeCmps1.stop(), nodeCmps2.stop()) test "Should exchange blocks on connect": await allFuturesThrowing(allFinished(pendingBlocks1)).wait(10.seconds) @@ -96,17 +82,11 @@ asyncchecksuite "NetworkStore engine - 2 nodes": test "Should send want-have for block": let blk = bt.Block.new("Block 1".toBytes).tryGet() let blkFut = nodeCmps1.pendingBlocks.getWantHandle(blk.cid) - (await nodeCmps2.localStore.putBlock(blk)).tryGet() + peerCtx2.blockRequestScheduled(blk.address) - let entry = WantListEntry( - address: blk.address, - priority: 1, - cancel: false, - wantType: WantType.WantBlock, - sendDontHave: false, - ) + (await nodeCmps2.localStore.putBlock(blk)).tryGet() - 
peerCtx1.peerWants.add(entry) + peerCtx1.wantedBlocks.incl(blk.address) check nodeCmps2.engine.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet() @@ -209,3 +189,38 @@ asyncchecksuite "NetworkStore - multiple nodes": check pendingBlocks1.mapIt(it.read) == blocks[0 .. 3] check pendingBlocks2.mapIt(it.read) == blocks[12 .. 15] + +asyncchecksuite "NetworkStore - dissemination": + var nodes: seq[NodesComponents] + + teardown: + if nodes.len > 0: + await nodes.stop() + + test "Should disseminate blocks across large diameter swarm": + let dataset = makeDataset(await makeRandomBlocks(60 * 256, 256'nb)).tryGet() + + nodes = generateNodes( + 6, + config = NodeConfig( + useRepoStore: false, + findFreePorts: false, + basePort: 8080, + createFullNode: false, + enableBootstrap: false, + enableDiscovery: true, + ), + ) + + await assignBlocks(nodes[0], dataset, 0 .. 9) + await assignBlocks(nodes[1], dataset, 10 .. 19) + await assignBlocks(nodes[2], dataset, 20 .. 29) + await assignBlocks(nodes[3], dataset, 30 .. 39) + await assignBlocks(nodes[4], dataset, 40 .. 49) + await assignBlocks(nodes[5], dataset, 50 .. 59) + + await nodes.start() + await nodes.linearTopology() + + let downloads = nodes.mapIt(downloadDataset(it, dataset)) + await allFuturesThrowing(downloads).wait(30.seconds) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 0541c119fd..1afe214738 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -27,8 +27,6 @@ const NopSendWantCancellationsProc = proc( asyncchecksuite "NetworkStore engine basic": var - rng: Rng - seckey: PrivateKey peerId: PeerId chunker: Chunker wallet: WalletRef @@ -39,9 +37,7 @@ asyncchecksuite "NetworkStore engine basic": done: Future[void] setup: - rng = Rng.instance() - seckey = PrivateKey.random(rng[]).tryGet() - peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet() + peerId = PeerId.example chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb) wallet = WalletRef.example blockDiscovery = Discovery.new() @@ -83,7 +79,7 @@ asyncchecksuite "NetworkStore engine basic": for b in blocks: discard engine.pendingBlocks.getWantHandle(b.cid) - await engine.setupPeer(peerId) + await engine.peerAddedHandler(peerId) await done.wait(100.millis) @@ -111,14 +107,12 @@ asyncchecksuite "NetworkStore engine basic": ) engine.pricing = pricing.some - await engine.setupPeer(peerId) + await engine.peerAddedHandler(peerId) await done.wait(100.millis) asyncchecksuite "NetworkStore engine handlers": var - rng: Rng - seckey: PrivateKey peerId: PeerId chunker: Chunker wallet: WalletRef @@ -134,8 +128,7 @@ asyncchecksuite "NetworkStore engine handlers": blocks: seq[Block] setup: - rng = Rng.instance() - chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) + chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb) while true: let chunk = await chunker.getBytes() @@ -144,8 +137,7 @@ asyncchecksuite "NetworkStore engine handlers": blocks.add(Block.new(chunk).tryGet()) - seckey = PrivateKey.random(rng[]).tryGet() - peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet() + peerId = PeerId.example wallet = WalletRef.example blockDiscovery = Discovery.new() peerStore = PeerCtxStore.new() @@ -174,7 +166,7 @@ asyncchecksuite "NetworkStore engine handlers": let ctx = await engine.taskQueue.pop() check ctx.id == peerId # only `wantBlock` 
scheduled - check ctx.peerWants.mapIt(it.address.cidOrTreeCid) == blocks.mapIt(it.cid) + check ctx.wantedBlocks == blocks.mapIt(it.address).toHashSet let done = handler() await engine.wantListHandler(peerId, wantList) @@ -249,6 +241,9 @@ asyncchecksuite "NetworkStore engine handlers": test "Should store blocks in local store": let pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid)) + for blk in blocks: + peerCtx.blockRequestScheduled(blk.address) + let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) # Install NOP for want list cancellations so they don't cause a crash @@ -274,6 +269,9 @@ asyncchecksuite "NetworkStore engine handlers": (it.address, Presence(address: it.address, price: rand(uint16).u256, have: true)) ).toTable + for blk in blocks: + peerContext.blockRequestScheduled(blk.address) + engine.network = BlockExcNetwork( request: BlockExcRequest( sendPayment: proc( @@ -337,33 +335,44 @@ asyncchecksuite "NetworkStore engine handlers": check a in peerCtx.peerHave check peerCtx.blocks[a].price == price - test "Should send cancellations for received blocks": + test "Should send cancellations for requested blocks only": let - pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid)) - blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) - cancellations = newTable(blocks.mapIt((it.address, newFuture[void]())).toSeq) + pendingPeer = peerId # peer towards which we have pending block requests + pendingPeerCtx = peerCtx + senderPeer = PeerId.example # peer that will actually send the blocks + senderPeerCtx = BlockExcPeerCtx(id: senderPeer) + reqBlocks = @[blocks[0], blocks[4]] # blocks that we requested to pendingPeer + reqBlockAddrs = reqBlocks.mapIt(it.address) + blockHandles = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid)) - peerCtx.blocks = blocks.mapIt( - (it.address, Presence(address: it.address, have: true, price: UInt256.example)) - ).toTable + var cancelled: HashSet[BlockAddress] + + engine.peers.add(senderPeerCtx) + for address in reqBlockAddrs: + pendingPeerCtx.blockRequestScheduled(address) + + for address in blocks.mapIt(it.address): + senderPeerCtx.blockRequestScheduled(address) proc sendWantCancellations( id: PeerId, addresses: seq[BlockAddress] ) {.async: (raises: [CancelledError]).} = + assert id == pendingPeer for address in addresses: - cancellations[address].catch.expect("address should exist").complete() + cancelled.incl(address) engine.network = BlockExcNetwork( request: BlockExcRequest(sendWantCancellations: sendWantCancellations) ) - await engine.blocksDeliveryHandler(peerId, blocksDelivery) - discard await allFinished(pending).wait(100.millis) - await allFuturesThrowing(cancellations.values().toSeq) + let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) + await engine.blocksDeliveryHandler(senderPeer, blocksDelivery) + discard await allFinished(blockHandles).wait(100.millis) + + check cancelled == reqBlockAddrs.toHashSet() asyncchecksuite "Block Download": var - rng: Rng seckey: PrivateKey peerId: PeerId chunker: Chunker @@ -380,8 +389,7 @@ asyncchecksuite "Block Download": blocks: seq[Block] setup: - rng = Rng.instance() - chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) + chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb) while true: let chunk = await chunker.getBytes() @@ -390,8 +398,7 @@ asyncchecksuite "Block Download": blocks.add(Block.new(chunk).tryGet()) - seckey = PrivateKey.random(rng[]).tryGet() - 
peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet() + peerId = PeerId.example wallet = WalletRef.example blockDiscovery = Discovery.new() peerStore = PeerCtxStore.new() @@ -409,13 +416,27 @@ asyncchecksuite "Block Download": localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks ) - peerCtx = BlockExcPeerCtx(id: peerId) + peerCtx = BlockExcPeerCtx(id: peerId, activityTimeout: 100.milliseconds) engine.peers.add(peerCtx) - test "Should exhaust retries": + test "Should reschedule blocks on peer timeout": + let + slowPeer = peerId + fastPeer = PeerId.example + slowPeerCtx = peerCtx + # "Fast" peer has in fact a generous timeout. This should avoid timing issues + # in the test. + fastPeerCtx = BlockExcPeerCtx(id: fastPeer, activityTimeout: 60.seconds) + requestedBlock = blocks[0] + var - retries = 2 - address = BlockAddress.init(blocks[0].cid) + slowPeerWantList = newFuture[void]("slowPeerWantList") + fastPeerWantList = newFuture[void]("fastPeerWantList") + slowPeerDropped = newFuture[void]("slowPeerDropped") + slowPeerBlockRequest = newFuture[void]("slowPeerBlockRequest") + fastPeerBlockRequest = newFuture[void]("fastPeerBlockRequest") + + engine.peers.add(fastPeerCtx) proc sendWantList( id: PeerId, @@ -426,68 +447,63 @@ asyncchecksuite "Block Download": full: bool = false, sendDontHave: bool = false, ) {.async: (raises: [CancelledError]).} = - check wantType == WantHave - check not engine.pendingBlocks.isInFlight(address) - check engine.pendingBlocks.retries(address) == retries - retries -= 1 + check addresses == @[requestedBlock.address] - engine.pendingBlocks.blockRetries = 2 - engine.pendingBlocks.retryInterval = 10.millis - engine.network = - BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList)) + if wantType == WantBlock: + if id == slowPeer: + slowPeerBlockRequest.complete() + else: + fastPeerBlockRequest.complete() - let pending = engine.requestBlock(address) + if wantType == WantHave: + if id == slowPeer: + slowPeerWantList.complete() + else: + fastPeerWantList.complete() - expect RetriesExhaustedError: - discard (await pending).tryGet() + proc onPeerDropped( + peer: PeerId + ): Future[void] {.async: (raises: [CancelledError]).} = + assert peer == slowPeer + slowPeerDropped.complete() - test "Should retry block request": - var - address = BlockAddress.init(blocks[0].cid) - steps = newAsyncEvent() + proc selectPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = + # Looks for the slow peer. 
+ for peer in peers: + if peer.id == slowPeer: + return peer - proc sendWantList( - id: PeerId, - addresses: seq[BlockAddress], - priority: int32 = 0, - cancel: bool = false, - wantType: WantType = WantType.WantHave, - full: bool = false, - sendDontHave: bool = false, - ) {.async: (raises: [CancelledError]).} = - case wantType - of WantHave: - check engine.pendingBlocks.isInFlight(address) == false - check engine.pendingBlocks.retriesExhausted(address) == false - steps.fire() - of WantBlock: - check engine.pendingBlocks.isInFlight(address) == true - check engine.pendingBlocks.retriesExhausted(address) == false - steps.fire() + return peers[0] - engine.pendingBlocks.blockRetries = 10 - engine.pendingBlocks.retryInterval = 10.millis - engine.network = BlockExcNetwork( - request: BlockExcRequest( - sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc - ) - ) + engine.selectPeer = selectPeer + engine.pendingBlocks.retryInterval = 200.milliseconds + engine.network = + BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList)) + engine.network.handlers.onPeerDropped = onPeerDropped - let pending = engine.requestBlock(address) - await steps.wait() + let blockHandle = engine.requestBlock(requestedBlock.address) - # add blocks precense - peerCtx.blocks = blocks.mapIt( - (it.address, Presence(address: it.address, have: true, price: UInt256.example)) - ).toTable + # Waits for the peer to send its want list to both peers. + await slowPeerWantList.wait(5.seconds) + await fastPeerWantList.wait(5.seconds) - steps.clear() - await steps.wait() + let blockPresence = + @[BlockPresence(address: requestedBlock.address, type: BlockPresenceType.Have)] + await engine.blockPresenceHandler(slowPeer, blockPresence) + await engine.blockPresenceHandler(fastPeer, blockPresence) + # Waits for the peer to ask for the block. + await slowPeerBlockRequest.wait(5.seconds) + # Don't reply and wait for the peer to be dropped by timeout. + await slowPeerDropped.wait(5.seconds) + + # The engine should retry and ask the fast peer for the block. + await fastPeerBlockRequest.wait(5.seconds) await engine.blocksDeliveryHandler( - peerId, @[BlockDelivery(blk: blocks[0], address: address)] + fastPeer, @[BlockDelivery(blk: requestedBlock, address: requestedBlock.address)] ) - check (await pending).tryGet() == blocks[0] + + discard await blockHandle.wait(5.seconds) test "Should cancel block request": var @@ -522,8 +538,6 @@ asyncchecksuite "Block Download": asyncchecksuite "Task Handler": var - rng: Rng - seckey: PrivateKey peerId: PeerId chunker: Chunker wallet: WalletRef @@ -541,8 +555,7 @@ asyncchecksuite "Task Handler": blocks: seq[Block] setup: - rng = Rng.instance() - chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256'nb) + chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256'nb) while true: let chunk = await chunker.getBytes() if chunk.len <= 0: @@ -550,8 +563,7 @@ asyncchecksuite "Task Handler": blocks.add(Block.new(chunk).tryGet()) - seckey = PrivateKey.random(rng[]).tryGet() - peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet() + peerId = PeerId.example wallet = WalletRef.example blockDiscovery = Discovery.new() peerStore = PeerCtxStore.new() @@ -571,138 +583,72 @@ asyncchecksuite "Task Handler": peersCtx = @[] for i in 0 .. 
3: - let seckey = PrivateKey.random(rng[]).tryGet() - peers.add(PeerId.init(seckey.getPublicKey().tryGet()).tryGet()) - + peers.add(PeerId.example) peersCtx.add(BlockExcPeerCtx(id: peers[i])) peerStore.add(peersCtx[i]) engine.pricing = Pricing.example.some - test "Should send want-blocks in priority order": + # FIXME: this is disabled for now: I've dropped block priorities to make + # my life easier as I try to optimize the protocol, and also because + # they were not being used anywhere. + # + # test "Should send want-blocks in priority order": + # proc sendBlocksDelivery( + # id: PeerId, blocksDelivery: seq[BlockDelivery] + # ) {.async: (raises: [CancelledError]).} = + # check blocksDelivery.len == 2 + # check: + # blocksDelivery[1].address == blocks[0].address + # blocksDelivery[0].address == blocks[1].address + + # for blk in blocks: + # (await engine.localStore.putBlock(blk)).tryGet() + # engine.network.request.sendBlocksDelivery = sendBlocksDelivery + + # # second block to send by priority + # peersCtx[0].peerWants.add( + # WantListEntry( + # address: blocks[0].address, + # priority: 49, + # cancel: false, + # wantType: WantType.WantBlock, + # sendDontHave: false, + # ) + # ) + + # # first block to send by priority + # peersCtx[0].peerWants.add( + # WantListEntry( + # address: blocks[1].address, + # priority: 50, + # cancel: false, + # wantType: WantType.WantBlock, + # sendDontHave: false, + # ) + # ) + + # await engine.taskHandler(peersCtx[0]) + + test "Should mark outgoing blocks as sent": proc sendBlocksDelivery( id: PeerId, blocksDelivery: seq[BlockDelivery] ) {.async: (raises: [CancelledError]).} = - check blocksDelivery.len == 2 - check: - blocksDelivery[1].address == blocks[0].address - blocksDelivery[0].address == blocks[1].address + let blockAddress = peersCtx[0].wantedBlocks.toSeq[0] + check peersCtx[0].isBlockSent(blockAddress) for blk in blocks: (await engine.localStore.putBlock(blk)).tryGet() engine.network.request.sendBlocksDelivery = sendBlocksDelivery - # second block to send by priority - peersCtx[0].peerWants.add( - WantListEntry( - address: blocks[0].address, - priority: 49, - cancel: false, - wantType: WantType.WantBlock, - sendDontHave: false, - ) - ) - - # first block to send by priority - peersCtx[0].peerWants.add( - WantListEntry( - address: blocks[1].address, - priority: 50, - cancel: false, - wantType: WantType.WantBlock, - sendDontHave: false, - ) - ) + peersCtx[0].wantedBlocks.incl(blocks[0].address) await engine.taskHandler(peersCtx[0]) - test "Should set in-flight for outgoing blocks": - proc sendBlocksDelivery( - id: PeerId, blocksDelivery: seq[BlockDelivery] - ) {.async: (raises: [CancelledError]).} = - check peersCtx[0].peerWants[0].inFlight - - for blk in blocks: - (await engine.localStore.putBlock(blk)).tryGet() - engine.network.request.sendBlocksDelivery = sendBlocksDelivery - - peersCtx[0].peerWants.add( - WantListEntry( - address: blocks[0].address, - priority: 50, - cancel: false, - wantType: WantType.WantBlock, - sendDontHave: false, - inFlight: false, - ) - ) - await engine.taskHandler(peersCtx[0]) + test "Should not mark blocks for which local look fails as sent": + peersCtx[0].wantedBlocks.incl(blocks[0].address) - test "Should clear in-flight when local lookup fails": - peersCtx[0].peerWants.add( - WantListEntry( - address: blocks[0].address, - priority: 50, - cancel: false, - wantType: WantType.WantBlock, - sendDontHave: false, - inFlight: false, - ) - ) await engine.taskHandler(peersCtx[0]) - check not peersCtx[0].peerWants[0].inFlight - 
- test "Should send presence": - let present = blocks - let missing = @[Block.new("missing".toBytes).tryGet()] - let price = (!engine.pricing).price - - proc sendPresence( - id: PeerId, presence: seq[BlockPresence] - ) {.async: (raises: [CancelledError]).} = - check presence.mapIt(!Presence.init(it)) == - @[ - Presence(address: present[0].address, have: true, price: price), - Presence(address: present[1].address, have: true, price: price), - Presence(address: missing[0].address, have: false), - ] - - for blk in blocks: - (await engine.localStore.putBlock(blk)).tryGet() - engine.network.request.sendPresence = sendPresence - - # have block - peersCtx[0].peerWants.add( - WantListEntry( - address: present[0].address, - priority: 1, - cancel: false, - wantType: WantType.WantHave, - sendDontHave: false, - ) - ) - - # have block - peersCtx[0].peerWants.add( - WantListEntry( - address: present[1].address, - priority: 1, - cancel: false, - wantType: WantType.WantHave, - sendDontHave: false, - ) - ) - - # don't have block - peersCtx[0].peerWants.add( - WantListEntry( - address: missing[0].address, - priority: 1, - cancel: false, - wantType: WantType.WantHave, - sendDontHave: false, - ) - ) - - await engine.taskHandler(peersCtx[0]) + let blockAddress = peersCtx[0].wantedBlocks.toSeq[0] + check not peersCtx[0].isBlockSent(blockAddress) diff --git a/tests/codex/blockexchange/testnetwork.nim b/tests/codex/blockexchange/testnetwork.nim index b9a51c9d62..ab19a6ae2c 100644 --- a/tests/codex/blockexchange/testnetwork.nim +++ b/tests/codex/blockexchange/testnetwork.nim @@ -40,7 +40,7 @@ asyncchecksuite "Network - Handlers": done = newFuture[void]() buffer = BufferStream.new() network = BlockExcNetwork.new(switch = newStandardSwitch(), connProvider = getConn) - network.setupPeer(peerId) + await network.handlePeerJoined(peerId) networkPeer = network.peers[peerId] discard await networkPeer.connect() diff --git a/tests/codex/blockexchange/testpeerctxstore.nim b/tests/codex/blockexchange/testpeerctxstore.nim index e2983d101d..f348c1d59e 100644 --- a/tests/codex/blockexchange/testpeerctxstore.nim +++ b/tests/codex/blockexchange/testpeerctxstore.nim @@ -81,8 +81,9 @@ suite "Peer Context Store Peer Selection": ) ) - peerCtxs[0].peerWants = entries - peerCtxs[5].peerWants = entries + for address in addresses: + peerCtxs[0].wantedBlocks.incl(address) + peerCtxs[5].wantedBlocks.incl(address) let peers = store.peersWant(addresses[4]) diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index 52b8a0b8a3..260adbfc0f 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -38,8 +38,7 @@ proc example*(_: type Pricing): Pricing = Pricing(address: EthAddress.example, price: uint32.rand.u256) proc example*(_: type bt.Block, size: int = 4096): bt.Block = - let length = rand(size) - let bytes = newSeqWith(length, rand(uint8)) + let bytes = newSeqWith(size, rand(uint8)) bt.Block.new(bytes).tryGet() proc example*(_: type PeerId): PeerId = diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 898dd16e10..b855f412d9 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -12,13 +12,16 @@ import pkg/codex/rng import pkg/codex/utils import ./helpers/nodeutils +import ./helpers/datasetutils import ./helpers/randomchunker import ./helpers/mockchunker import ./helpers/mockdiscovery import ./helpers/always import ../checktest -export randomchunker, nodeutils, mockdiscovery, mockchunker, always, checktest, manifest +export + randomchunker, nodeutils, datasetutils, mockdiscovery, 
mockchunker, always, checktest, + manifest export libp2p except setup, eventually @@ -46,23 +49,6 @@ proc lenPrefix*(msg: openArray[byte]): seq[byte] = return buf -proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, CodexTree) = - if blocks.len == 0: - return failure("Blocks list was empty") - - let - datasetSize = blocks.mapIt(it.data.len).foldl(a + b) - blockSize = blocks.mapIt(it.data.len).foldl(max(a, b)) - tree = ?CodexTree.init(blocks.mapIt(it.cid)) - treeCid = ?tree.rootCid - manifest = Manifest.new( - treeCid = treeCid, - blockSize = NBytes(blockSize), - datasetSize = NBytes(datasetSize), - ) - - return success((manifest, tree)) - proc makeWantList*( cids: seq[Cid], priority: int = 0, @@ -91,7 +77,7 @@ proc storeDataGetManifest*( (await store.putBlock(blk)).tryGet() let - (manifest, tree) = makeManifestAndTree(blocks).tryGet() + (_, tree, manifest) = makeDataset(blocks).tryGet() treeCid = tree.rootCid.tryGet() for i in 0 ..< tree.leavesCount: @@ -110,19 +96,6 @@ proc storeDataGetManifest*( return await storeDataGetManifest(store, blocks) -proc makeRandomBlocks*( - datasetSize: int, blockSize: NBytes -): Future[seq[Block]] {.async.} = - var chunker = - RandomChunker.new(Rng.instance(), size = datasetSize, chunkSize = blockSize) - - while true: - let chunk = await chunker.getBytes() - if chunk.len <= 0: - break - - result.add(Block.new(chunk).tryGet()) - proc corruptBlocks*( store: BlockStore, manifest: Manifest, blks, bytes: int ): Future[seq[int]] {.async.} = @@ -147,4 +120,5 @@ proc corruptBlocks*( bytePos.add(ii) blk.data[ii] = byte 0 + return pos diff --git a/tests/codex/helpers/datasetutils.nim b/tests/codex/helpers/datasetutils.nim new file mode 100644 index 0000000000..56f26e343b --- /dev/null +++ b/tests/codex/helpers/datasetutils.nim @@ -0,0 +1,45 @@ +import std/random + +import pkg/chronos +import pkg/codex/blocktype as bt +import pkg/codex/merkletree +import pkg/codex/manifest +import pkg/codex/rng + +import ./randomchunker + +type TestDataset* = tuple[blocks: seq[Block], tree: CodexTree, manifest: Manifest] + +proc makeRandomBlock*(size: NBytes): Block = + let bytes = newSeqWith(size.int, rand(uint8)) + Block.new(bytes).tryGet() + +proc makeRandomBlocks*( + datasetSize: int, blockSize: NBytes +): Future[seq[Block]] {.async.} = + var chunker = + RandomChunker.new(Rng.instance(), size = datasetSize, chunkSize = blockSize) + + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + result.add(Block.new(chunk).tryGet()) + +proc makeDataset*(blocks: seq[Block]): ?!TestDataset = + if blocks.len == 0: + return failure("Blocks list was empty") + + let + datasetSize = blocks.mapIt(it.data.len).foldl(a + b) + blockSize = blocks.mapIt(it.data.len).foldl(max(a, b)) + tree = ?CodexTree.init(blocks.mapIt(it.cid)) + treeCid = ?tree.rootCid + manifest = Manifest.new( + treeCid = treeCid, + blockSize = NBytes(blockSize), + datasetSize = NBytes(datasetSize), + ) + + return success((blocks, tree, manifest)) diff --git a/tests/codex/helpers/mockdiscovery.nim b/tests/codex/helpers/mockdiscovery.nim index 4110c57781..69b61d4911 100644 --- a/tests/codex/helpers/mockdiscovery.nim +++ b/tests/codex/helpers/mockdiscovery.nim @@ -70,3 +70,31 @@ method provide*( return await d.publishHostProvideHandler(d, host) + +proc nullDiscovery*(): MockDiscovery = + proc findBlockProvidersHandler( + d: MockDiscovery, cid: Cid + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = + return @[] + + proc publishBlockProvideHandler( + d: MockDiscovery, cid: 
Cid + ): Future[void] {.async: (raises: [CancelledError]).} = + return + + proc findHostProvidersHandler( + d: MockDiscovery, host: ca.Address + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = + return @[] + + proc publishHostProvideHandler( + d: MockDiscovery, host: ca.Address + ): Future[void] {.async: (raises: [CancelledError]).} = + return + + return MockDiscovery( + findBlockProvidersHandler: findBlockProvidersHandler, + publishBlockProvideHandler: publishBlockProvideHandler, + findHostProvidersHandler: findHostProvidersHandler, + publishHostProvideHandler: publishHostProvideHandler, + ) diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 6d2edd461c..12c3835018 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -1,4 +1,5 @@ import std/sequtils +import std/sets import pkg/chronos import pkg/taskpools @@ -12,10 +13,15 @@ import pkg/codex/blockexchange import pkg/codex/systemclock import pkg/codex/nat import pkg/codex/utils/natutils +import pkg/codex/utils/safeasynciter import pkg/codex/slots +import pkg/codex/merkletree +import pkg/codex/manifest import pkg/codex/node +import ./datasetutils +import ./mockdiscovery import ../examples import ../../helpers @@ -58,6 +64,7 @@ type basePort*: int = 8080 createFullNode*: bool = false enableBootstrap*: bool = false + enableDiscovery*: bool = true converter toTuple*( nc: NodesComponents @@ -90,6 +97,36 @@ proc localStores*(cluster: NodesCluster): seq[BlockStore] = proc switches*(cluster: NodesCluster): seq[Switch] = cluster.components.mapIt(it.switch) +proc assignBlocks*( + node: NodesComponents, + dataset: TestDataset, + indices: seq[int], + putMerkleProofs = true, +): Future[void] {.async: (raises: [CatchableError]).} = + let rootCid = dataset.tree.rootCid.tryGet() + + for i in indices: + assert (await node.networkStore.putBlock(dataset.blocks[i])).isOk + if putMerkleProofs: + assert ( + await node.networkStore.putCidAndProof( + rootCid, i, dataset.blocks[i].cid, dataset.tree.getProof(i).tryGet() + ) + ).isOk + +proc assignBlocks*( + node: NodesComponents, + dataset: TestDataset, + indices: HSlice[int, int], + putMerkleProofs = true, +): Future[void] {.async: (raises: [CatchableError]).} = + await assignBlocks(node, dataset, indices.toSeq, putMerkleProofs) + +proc assignBlocks*( + node: NodesComponents, dataset: TestDataset, putMerkleProofs = true +): Future[void] {.async: (raises: [CatchableError]).} = + await assignBlocks(node, dataset, 0 ..< dataset.blocks.len, putMerkleProofs) + proc generateNodes*( num: Natural, blocks: openArray[bt.Block] = [], config: NodeConfig = NodeConfig() ): NodesCluster = @@ -145,13 +182,18 @@ proc generateNodes*( store = RepoStore.new(repoStore.newDb(), mdStore.newDb(), clock = SystemClock.new()) blockDiscoveryStore = bdStore.newDb() - discovery = Discovery.new( - switch.peerInfo.privateKey, - announceAddrs = @[listenAddr], - bindPort = bindPort.Port, - store = blockDiscoveryStore, - bootstrapNodes = bootstrapNodes, - ) + discovery = + if config.enableDiscovery: + Discovery.new( + switch.peerInfo.privateKey, + announceAddrs = @[listenAddr], + bindPort = bindPort.Port, + store = blockDiscoveryStore, + bootstrapNodes = bootstrapNodes, + ) + else: + nullDiscovery() + waitFor store.start() (store.BlockStore, @[bdStore, repoStore, mdStore], discovery) else: @@ -225,6 +267,26 @@ proc generateNodes*( return NodesCluster(components: components, taskpool: taskpool) +proc start*(nodes: NodesComponents) {.async: (raises: 
[CatchableError]).} = + await allFuturesThrowing( + nodes.switch.start(), + #nodes.blockDiscovery.start(), + nodes.engine.start(), + ) + +proc stop*(nodes: NodesComponents) {.async: (raises: [CatchableError]).} = + await allFuturesThrowing( + nodes.switch.stop(), + # nodes.blockDiscovery.stop(), + nodes.engine.stop(), + ) + +proc start*(nodes: seq[NodesComponents]) {.async: (raises: [CatchableError]).} = + await allFuturesThrowing(nodes.mapIt(it.start()).toSeq) + +proc stop*(nodes: seq[NodesComponents]) {.async: (raises: [CatchableError]).} = + await allFuturesThrowing(nodes.mapIt(it.stop()).toSeq) + proc connectNodes*(nodes: seq[Switch]) {.async.} = for dialer in nodes: for node in nodes: @@ -234,6 +296,15 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} = proc connectNodes*(nodes: seq[NodesComponents]) {.async.} = await connectNodes(nodes.mapIt(it.switch)) +proc connectNodes*(nodes: varargs[NodesComponents]): Future[void] = + # varargs can't be captured on closures, and async procs are closures, + # so we have to do this mess + let copy = nodes.toSeq + ( + proc() {.async.} = + await connectNodes(copy.mapIt(it.switch)) + )() + proc connectNodes*(cluster: NodesCluster) {.async.} = await connectNodes(cluster.components) @@ -252,3 +323,26 @@ proc cleanup*(cluster: NodesCluster) {.async.} = await RepoStore(component.localStore).stop() cluster.taskpool.shutdown() + +proc linearTopology*(nodes: seq[NodesComponents]) {.async.} = + for i in 0 .. nodes.len - 2: + await connectNodes(nodes[i], nodes[i + 1]) + +proc downloadDataset*( + node: NodesComponents, dataset: TestDataset +): Future[void] {.async.} = + # This is the same as fetchBatched, but we don't construct CodexNodes so I can't use + # it here. + let requestAddresses = collect: + for i in 0 ..< dataset.manifest.blocksCount: + BlockAddress.init(dataset.manifest.treeCid, i) + + let blockCids = dataset.blocks.mapIt(it.cid).toHashSet() + + var count = 0 + for blockFut in (await node.networkStore.getBlocks(requestAddresses)): + let blk = (await blockFut).tryGet() + assert blk.cid in blockCids, "Unknown block CID: " & $blk.cid + count += 1 + + assert count == dataset.blocks.len, "Incorrect number of blocks downloaded" diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index a3fe7e94df..dc3f7a8fe5 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -82,7 +82,7 @@ asyncchecksuite "Test Node - Basic": ).tryGet() test "Block Batching with corrupted blocks": - let blocks = await makeRandomBlocks(datasetSize = 64.KiBs.int, blockSize = 64.KiBs) + let blocks = await makeRandomBlocks(datasetSize = 65536, blockSize = 64.KiBs) assert blocks.len == 1 let blk = blocks[0] diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index d96078d292..2c778f442c 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -48,6 +48,7 @@ asyncchecksuite "Test Node - Slot Repair": findFreePorts: true, createFullNode: true, enableBootstrap: true, + enableDiscovery: true, ) var manifest: Manifest diff --git a/tests/codex/stores/commonstoretests.nim b/tests/codex/stores/commonstoretests.nim index e4287dd23f..d313277384 100644 --- a/tests/codex/stores/commonstoretests.nim +++ b/tests/codex/stores/commonstoretests.nim @@ -38,8 +38,8 @@ proc commonBlockStoreTests*( newBlock2 = Block.new("2".repeat(100).toBytes()).tryGet() newBlock3 = Block.new("3".repeat(100).toBytes()).tryGet() - (manifest, tree) = - makeManifestAndTree(@[newBlock, newBlock1, newBlock2, 
newBlock3]).tryGet() + (_, tree, manifest) = + makeDataset(@[newBlock, newBlock1, newBlock2, newBlock3]).tryGet() if not isNil(before): await before() diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 69f38711fa..7eb9fd0d39 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -364,9 +364,11 @@ asyncchecksuite "RepoStore": let repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb) - dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) - blk = dataset[0] - (manifest, tree) = makeManifestAndTree(dataset).tryGet() + (blocks, tree, manifest) = makeDataset( + await makeRandomBlocks(datasetSize = 2 * 256, blockSize = 256'nb) + ) + .tryGet() + blk = blocks[0] treeCid = tree.rootCid.tryGet() proof = tree.getProof(0).tryGet() @@ -381,9 +383,11 @@ asyncchecksuite "RepoStore": let repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb) - dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) - blk = dataset[0] - (manifest, tree) = makeManifestAndTree(dataset).tryGet() + (blocks, tree, manifest) = makeDataset( + await makeRandomBlocks(datasetSize = 2 * 256, blockSize = 256'nb) + ) + .tryGet() + blk = blocks[0] treeCid = tree.rootCid.tryGet() proof = tree.getProof(0).tryGet() @@ -406,9 +410,9 @@ asyncchecksuite "RepoStore": let sharedBlock = blockPool[1] let - (manifest1, tree1) = makeManifestAndTree(dataset1).tryGet() + (_, tree1, manifest1) = makeDataset(dataset1).tryGet() treeCid1 = tree1.rootCid.tryGet() - (manifest2, tree2) = makeManifestAndTree(dataset2).tryGet() + (_, tree2, manifest2) = makeDataset(dataset2).tryGet() treeCid2 = tree2.rootCid.tryGet() (await repo.putBlock(sharedBlock)).tryGet() @@ -435,9 +439,9 @@ asyncchecksuite "RepoStore": let repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb) - dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) - blk = dataset[0] - (manifest, tree) = makeManifestAndTree(dataset).tryGet() + blocks = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = blocks[0] + (_, tree, manifest) = makeDataset(blocks).tryGet() treeCid = tree.rootCid.tryGet() proof = tree.getProof(1).tryGet() @@ -455,9 +459,9 @@ asyncchecksuite "RepoStore": let repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb) - dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) - blk = dataset[0] - (manifest, tree) = makeManifestAndTree(dataset).tryGet() + blocks = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = blocks[0] + (_, tree, manifest) = makeDataset(blocks).tryGet() treeCid = tree.rootCid.tryGet() proof = tree.getProof(1).tryGet() diff --git a/tests/codex/testblocktype.nim b/tests/codex/testblocktype.nim new file mode 100644 index 0000000000..b0ea2732f9 --- /dev/null +++ b/tests/codex/testblocktype.nim @@ -0,0 +1,44 @@ +import pkg/unittest2 +import pkg/libp2p/cid + +import pkg/codex/blocktype + +import ./examples + +suite "blocktype": + test "should hash equal non-leaf block addresses onto the same hash": + let + cid1 = Cid.example + nonLeaf1 = BlockAddress.init(cid1) + nonLeaf2 = BlockAddress.init(cid1) + + check nonLeaf1 == nonLeaf2 + check nonLeaf1.hash == nonLeaf2.hash + + test "should hash equal leaf block addresses onto the same hash": + let + cid1 = Cid.example + leaf1 = BlockAddress.init(cid1, 0) + leaf2 = BlockAddress.init(cid1, 0) + + check leaf1 == leaf2 + check leaf1.hash == 
leaf2.hash + + test "should hash different non-leaf block addresses onto different hashes": + let + cid1 = Cid.example + cid2 = Cid.example + nonLeaf1 = BlockAddress.init(cid1) + nonLeaf2 = BlockAddress.init(cid2) + + check nonLeaf1 != nonLeaf2 + check nonLeaf1.hash != nonLeaf2.hash + + test "should hash different leaf block addresses onto different hashes": + let + cid1 = Cid.example + leaf1 = BlockAddress.init(cid1, 0) + leaf2 = BlockAddress.init(cid1, 1) + + check leaf1 != leaf2 + check leaf1.hash != leaf2.hash diff --git a/tests/codex/utils/testsafeasynciter.nim b/tests/codex/utils/testsafeasynciter.nim index 1aeba4d2e5..87b0d84ad9 100644 --- a/tests/codex/utils/testsafeasynciter.nim +++ b/tests/codex/utils/testsafeasynciter.nim @@ -373,7 +373,7 @@ asyncchecksuite "Test SafeAsyncIter": # Now, to make sure that this mechanism works, and to document its # cancellation semantics, this test shows that when the async predicate # function is cancelled, this cancellation has immediate effect, which means - # that `next()` (or more precisely `getNext()` in `mapFilter` function), is + # that `next()` (or more precisely `getNext()` in `mapFilter` function), is # interrupted immediately. If this is the case, the the iterator be interrupted # before `next()` returns this locally captured value from the previous # iteration and this is exactly the reason why at the end of the test @@ -415,3 +415,20 @@ asyncchecksuite "Test SafeAsyncIter": # will not be returned because of the cancellation. collected == @["0", "1"] iter2.finished + + test "should allow chaining": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5) + iter2 = SafeAsyncIter[int].new(5 ..< 10) + iter3 = chain[int](iter1, SafeAsyncIter[int].empty, iter2) + + var collected: seq[int] + + for fut in iter3: + without i =? (await fut), err: + fail() + collected.add(i) + + check: + iter3.finished + collected == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] diff --git a/tests/helpers.nim b/tests/helpers.nim index b48b787edd..742bc10d34 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -7,12 +7,11 @@ import std/sequtils, chronos export multisetup, trackers, templeveldb ### taken from libp2p errorhelpers.nim -proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] = +proc allFuturesThrowing(futs: seq[FutureBase]): Future[void] = # This proc is only meant for use in tests / not suitable for general use. # - Swallowing errors arbitrarily instead of aggregating them is bad design # - It raises `CatchableError` instead of the union of the `futs` errors, # inflating the caller's `raises` list unnecessarily. `macro` could fix it - let futs = @args ( proc() {.async: (raises: [CatchableError]).} = await allFutures(futs) @@ -28,6 +27,9 @@ proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] = raise firstErr )() +proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] = + allFuturesThrowing(@args) + proc allFuturesThrowing*[T](futs: varargs[Future[T]]): Future[void] = allFuturesThrowing(futs.mapIt(FutureBase(it)))
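
A minimal usage sketch of the new SafeAsyncIter `chain` helper, assuming only the public API exercised by the test above (range constructor, `empty`, iteration over futures); it is an illustration, not part of the patch:

import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/codex/utils/safeasynciter

proc chainDemo() {.async.} =
  let
    first = SafeAsyncIter[int].new(0 ..< 3)
    second = SafeAsyncIter[int].new(3 ..< 6)
    combined = chain[int](first, SafeAsyncIter[int].empty, second)

  # `chain` drains each iterator in order, skipping exhausted/empty ones.
  for fut in combined:
    without value =? (await fut), err:
      echo "iteration failed: ", err.msg
      return
    echo value # prints 0 through 5, in order

waitFor chainDemo()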
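
The `hash*(BlockAddress)` overload added to blocktype is what lets addresses act as keys in `std/sets` and `std/tables` (e.g. the `wantedBlocks: HashSet[BlockAddress]` used by the reworked tests). A small sketch of the contract it is expected to satisfy, using only constructors that already appear in this diff; the block contents and leaf indices are arbitrary:

import std/sets
import pkg/stew/byteutils
import pkg/questionable/results
import pkg/codex/blocktype

let
  blk = Block.new("example block".toBytes).tryGet()
  leaf0 = BlockAddress.init(blk.cid, 0) # leaf address: (treeCid, index)
  leaf1 = BlockAddress.init(blk.cid, 1)

var wanted: HashSet[BlockAddress]
wanted.incl(leaf0)
wanted.incl(leaf1)

# Equal addresses hash (and compare) equal; distinct leaves of the same tree do not.
assert BlockAddress.init(blk.cid, 0) in wanted
assert wanted.len == 2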
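
For intuition on the sliding-window refill in `fetchBatched`: with the defaults above (`DefaultFetchBatch = 1024`, `BatchRefillThreshold = 0.75`), a refill of up to 768 addresses is scheduled each time 768 in-flight blocks complete. A self-contained sketch of just that arithmetic, with simulated completions standing in for real block deliveries:

const
  DefaultFetchBatch = 1024
  BatchRefillThreshold = 0.75 # refill once 75% of the window completes

let
  batchSize = DefaultFetchBatch
  refillThreshold = int(float(batchSize) * BatchRefillThreshold) # 768
  refillSize = max(refillThreshold, 1)

var
  completedInWindow = 0
  refills = 0

for _ in 1 .. 4 * batchSize: # simulate 4096 block completions
  inc completedInWindow
  if completedInWindow >= refillThreshold:
    # here fetchBatched would enqueue up to `refillSize` more addresses
    inc refills
    completedInWindow = 0

echo refills, " refills of up to ", refillSize, " addresses" # 5 refills of up to 768 addresses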
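
And a consumption pattern for the new `getBlocks` API, following the same shape as `downloadDataset` in the test helpers. `store` and the `manifest` parameter are assumptions of the sketch rather than anything defined in this patch, and error handling is reduced to counting:

import std/sugar
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/codex/blocktype
import pkg/codex/manifest
import pkg/codex/stores/blockstore
import pkg/codex/utils/safeasynciter

proc fetchAll(store: BlockStore, manifest: Manifest): Future[int] {.async.} =
  ## Requests every leaf of `manifest` through `getBlocks` and counts the
  ## blocks that come back successfully.
  let addresses = collect:
    for i in 0 ..< manifest.blocksCount:
      BlockAddress.init(manifest.treeCid, i)

  var fetched = 0
  for fut in (await store.getBlocks(addresses)):
    without blk =? (await fut), err:
      echo "failed to fetch block: ", err.msg
      continue
    echo "fetched ", blk.cid
    inc fetched

  return fetched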