@@ -6,6 +6,8 @@ const DecisionEngine = require('./decision-engine')
66const Notifications = require ( './notifications' )
77const logger = require ( './utils' ) . logger
88const Stats = require ( './stats' )
9+ const AbortController = require ( 'abort-controller' )
10+ const anySignal = require ( 'any-signal' )
911
1012const defaultOptions = {
1113 statsEnabled : false ,
@@ -101,9 +103,10 @@ class Bitswap {
101103 this . _log ( 'received block' )
102104
103105 const has = await this . blockstore . has ( block . cid )
106+
104107 this . _updateReceiveCounters ( peerId . toB58String ( ) , block , has )
105108
106- if ( has || ! wasWanted ) {
109+ if ( ! wasWanted ) {
107110 return
108111 }
109112
@@ -176,65 +179,88 @@ class Bitswap {
176179 * blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
177180 *
178181 * @param {CID } cid
182+ * @param {Object } options
183+ * @param {AbortSignal } options.abortSignal
179184 * @returns {Promise<Block> }
180185 */
181- async get ( cid ) {
182- for await ( const block of this . getMany ( [ cid ] ) ) {
183- return block
186+ async get ( cid , options = { } ) {
187+ const fetchFromNetwork = ( cid , options ) => {
188+ // add it to the want list - n.b. later we will abort the AbortSignal
189+ // so no need to remove the blocks from the wantlist after we have it
190+ this . wm . wantBlocks ( [ cid ] , options )
191+
192+ return this . notifications . wantBlock ( cid , options )
184193 }
185- }
186194
187- /**
188- * Fetch a a list of blocks by cid. If the blocks are in the local
189- * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
190- *
191- * @param {Iterable<CID> } cids
192- * @returns {Promise<AsyncIterator<Block>> }
193- */
194- async * getMany ( cids ) {
195- let pendingStart = cids . length
196- const wantList = [ ]
197195 let promptedNetwork = false
198196
199- const fetchFromNetwork = async ( cid ) => {
200- wantList . push ( cid )
197+ const loadOrFetchFromNetwork = async ( cid , options ) => {
198+ try {
199+ // have to await here as we want to handle ERR_NOT_FOUND
200+ const block = await this . blockstore . get ( cid , options )
201201
202- const blockP = this . notifications . wantBlock ( cid )
202+ return block
203+ } catch ( err ) {
204+ if ( err . code !== 'ERR_NOT_FOUND' ) {
205+ throw err
206+ }
203207
204- if ( ! pendingStart ) {
205- this . wm . wantBlocks ( wantList )
206- }
208+ if ( ! promptedNetwork ) {
209+ promptedNetwork = true
207210
208- const block = await blockP
209- this . wm . cancelWants ( [ cid ] )
211+ this . network . findAndConnect ( cid )
212+ . catch ( ( err ) => this . _log . error ( err ) )
213+ }
210214
211- return block
215+ // we don't have the block locally so fetch it from the network
216+ return fetchFromNetwork ( cid , options )
217+ }
212218 }
213219
214- for ( const cid of cids ) {
215- const has = await this . blockstore . has ( cid )
216- pendingStart --
217- if ( has ) {
218- if ( ! pendingStart ) {
219- this . wm . wantBlocks ( wantList )
220- }
221- yield this . blockstore . get ( cid )
220+ // depending on implementation it's possible for blocks to come in while
221+ // we do the async operations to get them from the blockstore leading to
222+ // a race condition, so register for incoming block notifications as well
223+ // as trying to get it from the datastore
224+ const controller = new AbortController ( )
225+ const signal = anySignal ( [ options . signal , controller . signal ] )
226+
227+ const block = await Promise . race ( [
228+ this . notifications . wantBlock ( cid , {
229+ signal
230+ } ) ,
231+ loadOrFetchFromNetwork ( cid , {
232+ signal
233+ } )
234+ ] )
222235
223- continue
224- }
236+ // since we have the block we can now remove our listener
237+ controller . abort ( )
225238
226- if ( ! promptedNetwork ) {
227- promptedNetwork = true
228- this . network . findAndConnect ( cids [ 0 ] ) . catch ( ( err ) => this . _log . error ( err ) )
229- }
239+ return block
240+ }
230241
231- // we don't have the block locally so fetch it from the network
232- yield fetchFromNetwork ( cid )
242+ /**
243+ * Fetch a a list of blocks by cid. If the blocks are in the local
244+ * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
245+ *
246+ * @param {AsyncIterator<CID> } cids
247+ * @param {Object } options
248+ * @param {AbortSignal } options.abortSignal
249+ * @returns {Promise<AsyncIterator<Block>> }
250+ */
251+ async * getMany ( cids , options = { } ) {
252+ for await ( const cid of cids ) {
253+ yield this . get ( cid , options )
233254 }
234255 }
235256
236257 /**
237- * Removes the given CIDs from the wantlist independent of any ref counts
258+ * Removes the given CIDs from the wantlist independent of any ref counts.
259+ *
260+ * This will cause all outstanding promises for a given block to reject.
261+ *
262+ * If you want to cancel the want for a block without doing that, pass an
263+ * AbortSignal in to `.get` or `.getMany` and abort it.
238264 *
239265 * @param {Iterable<CID> } cids
240266 * @returns {void }
@@ -249,7 +275,9 @@ class Bitswap {
249275 }
250276
251277 /**
252- * Removes the given keys from the want list
278+ * Removes the given keys from the want list. This may cause pending promises
279+ * for blocks to never resolve. If you wish these promises to abort instead
280+ * call `unwant(cids)` instead.
253281 *
254282 * @param {Iterable<CID> } cids
255283 * @returns {void }
@@ -268,46 +296,40 @@ class Bitswap {
268296 * @param {Block } block
269297 * @returns {Promise<void> }
270298 */
271- async put ( block ) { // eslint-disable-line require-await
272- return this . putMany ( [ block ] )
299+ async put ( block ) {
300+ await this . blockstore . put ( block )
301+ this . _sendHaveBlockNotifications ( block )
273302 }
274303
275304 /**
276305 * Put the given blocks to the underlying blockstore and
277306 * send it to nodes that have it them their wantlist.
278307 *
279- * @param {AsyncIterable<Block>|Iterable<Block> } blocks
280- * @returns {Promise<void > }
308+ * @param {AsyncIterable<Block> } blocks
309+ * @returns {AsyncIterable<Block > }
281310 */
282- async putMany ( blocks ) { // eslint-disable-line require-await
283- const self = this
284-
285- // Add any new blocks to the blockstore
286- const newBlocks = [ ]
287- await this . blockstore . putMany ( async function * ( ) {
288- for await ( const block of blocks ) {
289- if ( await self . blockstore . has ( block . cid ) ) {
290- continue
291- }
292-
293- yield block
294- newBlocks . push ( block )
295- }
296- } ( ) )
297-
298- // Notify engine that we have new blocks
299- this . engine . receivedBlocks ( newBlocks )
311+ async * putMany ( blocks ) {
312+ for await ( const block of this . blockstore . putMany ( blocks ) ) {
313+ this . _sendHaveBlockNotifications ( block )
300314
301- // Notify listeners that we have received the new blocks
302- for ( const block of newBlocks ) {
303- this . notifications . hasBlock ( block )
304- // Note: Don't wait for provide to finish before returning
305- this . network . provide ( block . cid ) . catch ( ( err ) => {
306- self . _log . error ( 'Failed to provide: %s' , err . message )
307- } )
315+ yield block
308316 }
309317 }
310318
319+ /**
320+ * Sends notifications about the arrival of a block
321+ *
322+ * @param {Block } block
323+ */
324+ _sendHaveBlockNotifications ( block ) {
325+ this . notifications . hasBlock ( block )
326+ this . engine . receivedBlocks ( [ block ] )
327+ // Note: Don't wait for provide to finish before returning
328+ this . network . provide ( block . cid ) . catch ( ( err ) => {
329+ this . _log . error ( 'Failed to provide: %s' , err . message )
330+ } )
331+ }
332+
311333 /**
312334 * Get the current list of wants.
313335 *
0 commit comments