Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
6a057a2
Add missing test
markspanbroek Mar 25, 2021
b8e5f19
Fix warnings
markspanbroek Apr 1, 2021
9fb50a3
Remove duplicate requirement
markspanbroek Apr 1, 2021
3daa26f
Add package nim-nitro
markspanbroek Apr 1, 2021
8e1995a
Workaround no longer necessary
markspanbroek Apr 1, 2021
bd68f2c
Remove deprecated libp2p import
markspanbroek Apr 6, 2021
e55ac4e
Add nitro wallet to BitswapNetwork
markspanbroek Apr 7, 2021
59e19ef
Add package questionable
markspanbroek Apr 7, 2021
3a9066a
protobuf message for bandwidth pricing
markspanbroek Apr 7, 2021
70473aa
Add package upraises
markspanbroek Apr 7, 2021
ac2c7c2
Ensure that encoding of pricing doesn't raise
markspanbroek Apr 7, 2021
c78b460
protobuf message for state channel updates
markspanbroek Apr 7, 2021
8465660
Fix tests on 32 bit platforms
markspanbroek Apr 7, 2021
06766f2
Move pricing and payments into bitswap protobuf message
markspanbroek Apr 8, 2021
c5ee2d3
Fix typo
markspanbroek Apr 8, 2021
acaa372
Add handler for pricing messages
markspanbroek Apr 8, 2021
0491558
Add handler for payment messages
markspanbroek Apr 8, 2021
1afdf7a
Broadcast pricing
markspanbroek Apr 8, 2021
d0aec00
Broadcast payment
markspanbroek Apr 8, 2021
ba41405
Exchange pricing when connecting to new peer
markspanbroek Apr 8, 2021
a95e4b0
Remove dead code
markspanbroek Apr 8, 2021
6eabd82
Revert "Add nitro wallet to BitswapNetwork"
markspanbroek Apr 8, 2021
835357b
Add nitro wallet to BitswapEngine
markspanbroek Apr 8, 2021
670b9fb
Move peer context into its own module
markspanbroek Apr 12, 2021
fd5d370
Update to latest versions of nitro and questionable
markspanbroek Apr 12, 2021
b75eed2
Add proc to engine that pays peers for bytes
markspanbroek Apr 14, 2021
6535fd8
Engine sends payments for received blocks
markspanbroek Apr 14, 2021
a1deed8
Fix build failure on 32 bit platforms
markspanbroek Apr 14, 2021
56995dc
Update to latest versions of nitro and questionable
markspanbroek Apr 15, 2021
254bca9
Use reference semantics for wallets
markspanbroek Apr 19, 2021
4b716d2
Receive payments for blocks that were sent
markspanbroek Apr 19, 2021
10cef2d
Hard-code asset address
markspanbroek Apr 22, 2021
4a21d30
Add pricing to block presence messages
markspanbroek Apr 26, 2021
ddbe304
Add block prices to peer context
markspanbroek Apr 26, 2021
a7698e1
Update to version 0.9.1 of questionable
markspanbroek May 10, 2021
fae61fb
Simplify test
markspanbroek May 10, 2021
57894f0
Simplify
markspanbroek May 10, 2021
6bc537d
Send block prices
markspanbroek May 10, 2021
5488ef0
Pay per-block price instead of per-peer price
markspanbroek May 10, 2021
66d5172
Remove debt ratio
markspanbroek May 10, 2021
b3fbe89
Remove double bookkeeping in peerHave and peerPrices
markspanbroek May 10, 2021
8ef6787
Replace pricing exchange by account exchange
markspanbroek May 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dagger.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ requires "nim >= 1.2.6",
"secp256k1",
"stew#head",
"protobufserialization >= 0.2.0 & < 0.3.0",
"asynctest >= 0.2.1 & < 0.3.0",
"stew"
"https://github.com/status-im/nim-nitro >= 0.2.0 & < 0.3.0",
"questionable >= 0.5.0 & < 0.6.0",
"upraises >= 0.1.0 & < 0.2.0",
"asynctest >= 0.2.1 & < 0.3.0"
24 changes: 6 additions & 18 deletions dagger/bitswap.nim
Original file line number Diff line number Diff line change
Expand Up @@ -104,31 +104,15 @@ method putBlocks*(b: Bitswap, blocks: seq[bt.Block]) =
proc new*(
T: type Bitswap,
localStore: BlockStore,
wallet: Wallet,
network: BitswapNetwork,
concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest): T =

proc sendWantList(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
network.broadcastWantList(
id, cids, priority, cancel,
wantType, full, sendDontHave)

proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} =
network.broadcastBlocks(id, blocks)

proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} =
network.broadcastBlockPresence(id, presence)

let engine = BitswapEngine.new(
localStore = localStore,
wallet = wallet,
peersPerRequest = peersPerRequest,
request = network.request,
)
Expand Down Expand Up @@ -174,10 +158,14 @@ proc new*(
blocks: seq[bt.Block]) {.gcsafe.} =
engine.blocksHandler(peer, blocks)

proc pricingHandler(peer: PeerId, pricing: Pricing) =
engine.pricingHandler(peer, pricing)

network.handlers = BitswapHandlers(
onWantList: blockWantListHandler,
onBlocks: blocksHandler,
onPresence: blockPresenceHandler,
onPricing: pricingHandler
)

return b
38 changes: 17 additions & 21 deletions dagger/bitswap/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import ../utils/asyncheapqueue

import ./network
import ./pendingblocks
import ./peercontext

export peercontext

logScope:
topics = "dagger bitswap engine"
Expand All @@ -34,15 +37,6 @@ type
TaskHandler* = proc(task: BitswapPeerCtx): Future[void] {.gcsafe.}
TaskScheduler* = proc(task: BitswapPeerCtx): bool {.gcsafe.}

BitswapPeerCtx* = ref object of RootObj
id*: PeerID
peerHave*: seq[Cid] # remote peers have lists
peerWants*: seq[Entry] # remote peers want lists
bytesSent*: int # bytes sent to remote
bytesRecv*: int # bytes received from remote
exchanged*: int # times peer has exchanged with us
lastExchange*: Moment # last time peer has exchanged with us

BitswapEngine* = ref object of RootObj
localStore*: BlockStore # where we localStore blocks for this instance
peers*: seq[BitswapPeerCtx] # peers we're currently actively exchanging with
Expand All @@ -51,25 +45,15 @@ type
peersPerRequest: int # max number of peers to request from
scheduleTask*: TaskScheduler # schedule a new task with the task runner
request*: BitswapRequest # bitswap network requests
wallet*: Wallet # nitro wallet for micropayments
pricing*: ?Pricing # optional bandwidth pricing

proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool =
## Convenience method to check for entry prepense
##

a.anyIt( it.cid == b )

proc contains*(a: openArray[BitswapPeerCtx], b: PeerID): bool =
## Convenience method to check for peer prepense
##

a.anyIt( it.id == b )

proc debtRatio*(b: BitswapPeerCtx): float =
b.bytesSent / (b.bytesRecv + 1)

proc `<`*(a, b: BitswapPeerCtx): bool =
a.debtRatio < b.debtRatio

proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx =
## Get the peer's context
##
Expand Down Expand Up @@ -259,6 +243,13 @@ proc wantListHandler*(
if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer

proc pricingHandler*(engine: BitswapEngine, peer: PeerID, pricing: Pricing) =
let context = engine.getPeerCtx(peer)
if context.isNil:
return

context.pricing = pricing.some

proc setupPeer*(b: BitswapEngine, peer: PeerID) =
## Perform initial setup, such as want
## list exchange
Expand All @@ -274,6 +265,9 @@ proc setupPeer*(b: BitswapEngine, peer: PeerID) =
if b.wantList.len > 0:
b.request.sendWantList(peer, b.wantList, full = true)

if b.pricing.isSome:
b.request.sendPricing(peer, b.pricing.get)

proc dropPeer*(b: BitswapEngine, peer: PeerID) =
## Cleanup disconnected peer
##
Expand Down Expand Up @@ -329,6 +323,7 @@ proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} =
proc new*(
T: type BitswapEngine,
localStore: BlockStore,
wallet: Wallet,
request: BitswapRequest = BitswapRequest(),
scheduleTask: TaskScheduler = nil,
peersPerRequest = DefaultMaxPeersPerRequest): T =
Expand All @@ -343,6 +338,7 @@ proc new*(
peersPerRequest: peersPerRequest,
scheduleTask: taskScheduler,
request: request,
wallet: wallet
)

return b
50 changes: 50 additions & 0 deletions dagger/bitswap/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import pkg/libp2p

import ../blocktype as bt
import ./protobuf/bitswap as pb
import ./protobuf/payments
import ./networkpeer

export pb, networkpeer
export payments

logScope:
topics = "dagger bitswap network"
Expand All @@ -29,11 +31,15 @@ type
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.}
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.}
PricingHandler* = proc(peer: PeerID, pricing: Pricing) {.gcsafe.}
PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}

BitswapHandlers* = object
onWantList*: WantListHandler
onBlocks*: BlocksHandler
onPresence*: BlockPresenceHandler
onPricing*: PricingHandler
onPayment*: PaymentHandler

WantListBroadcaster* = proc(
id: PeerID,
Expand All @@ -46,11 +52,13 @@ type

BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.}
PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.}
PricingBroadcaster* = proc(peer: PeerID, pricing: Pricing) {.gcsafe.}

BitswapRequest* = object
sendWantList*: WantListBroadcaster
sendBlocks*: BlocksBroadcaster
sendPresence*: PresenceBroadcaster
sendPricing*: PricingBroadcaster

BitswapNetwork* = ref object of LPProtocol
peers*: Table[PeerID, NetworkPeer]
Expand Down Expand Up @@ -192,6 +200,38 @@ proc broadcastBlockPresence*(
trace "Sending presence to peer", peer = id
asyncSpawn b.peers[id].send(Message(blockPresences: presence))

proc handlePricing(network: BitswapNetwork,
peer: NetworkPeer,
pricing: Pricing) =
if network.handlers.onPricing.isNil:
return
network.handlers.onPricing(peer.id, pricing)

proc broadcastPricing*(network: BitswapNetwork,
id: PeerId,
pricing: Pricing) =
if id notin network.peers:
return

let message = Message(pricing: PricingMessage.init(pricing))
asyncSpawn network.peers[id].send(message)

proc broadcastPayment*(network: BitswapNetwork,
id: PeerId,
payment: SignedState) =
if id notin network.peers:
return

let message = Message(payment: StateChannelUpdate.init(payment))
asyncSpawn network.peers[id].send(message)

proc handlePayment(network: BitswapNetwork,
peer: NetworkPeer,
payment: SignedState) =
if network.handlers.onPayment.isNil:
return
network.handlers.onPayment(peer.id, payment)

proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
try:
if msg.wantlist.entries.len > 0:
Expand All @@ -206,6 +246,12 @@ proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
if msg.blockPresences.len > 0:
b.handleBlockPresence(peer, msg.blockPresences)

if pricing =? Pricing.init(msg.pricing):
b.handlePricing(peer, pricing)

if payment =? SignedState.init(msg.payment):
b.handlePayment(peer, payment)

except CatchableError as exc:
trace "Exception in bitswap rpc handler", exc = exc.msg

Expand Down Expand Up @@ -299,10 +345,14 @@ proc new*(
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} =
b.broadcastBlockPresence(id, presence)

proc sendPricing(id: PeerID, pricing: Pricing) =
b.broadcastPricing(id, pricing)

b.request = BitswapRequest(
sendWantList: sendWantList,
sendBlocks: sendBlocks,
sendPresence: sendPresence,
sendPricing: sendPricing
)

b.init()
Expand Down
30 changes: 30 additions & 0 deletions dagger/bitswap/peercontext.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import std/sequtils
import pkg/libp2p
import pkg/chronos
import pkg/questionable
import ./protobuf/bitswap
import ./protobuf/payments

type
BitswapPeerCtx* = ref object of RootObj
id*: PeerID
peerHave*: seq[Cid] # remote peers have lists
peerWants*: seq[Entry] # remote peers want lists
bytesSent*: int # bytes sent to remote
bytesRecv*: int # bytes received from remote
exchanged*: int # times peer has exchanged with us
lastExchange*: Moment # last time peer has exchanged with us
pricing*: ?Pricing # optional bandwidth price for this peer

proc contains*(a: openArray[BitswapPeerCtx], b: PeerID): bool =
## Convenience method to check for peer prepense
##

a.anyIt( it.id == b )

proc debtRatio*(b: BitswapPeerCtx): float =
b.bytesSent / (b.bytesRecv + 1)

proc `<`*(a, b: BitswapPeerCtx): bool =
a.debtRatio < b.debtRatio

1 change: 1 addition & 0 deletions dagger/bitswap/protobuf/bitswap.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import_proto3 "message.proto"
export Message
export Wantlist, WantType, Entry
export Block, BlockPresenceType, BlockPresence
export PricingMessage, StateChannelUpdate

proc hash*(e: Entry): Hash =
hash(e.`block`)
Expand Down
12 changes: 12 additions & 0 deletions dagger/bitswap/protobuf/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,21 @@ message Message {
BlockPresenceType type = 2;
}

message PricingMessage {
bytes address = 1; // Ethereum address to which payments should be made
bytes asset = 2; // Asset (coin) with which to pay
bytes price = 3; // Amount of assets to pay per byte (UInt256)
}

message StateChannelUpdate {
bytes update = 1; // Signed Nitro state, serialized as JSON
}

Wantlist wantlist = 1;
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
repeated BlockPresence blockPresences = 4;
int32 pendingBytes = 5;
PricingMessage pricing = 6;
StateChannelUpdate payment = 7;
}
53 changes: 53 additions & 0 deletions dagger/bitswap/protobuf/payments.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import pkg/protobuf_serialization
import pkg/stew/byteutils
import pkg/nitro
import pkg/questionable
import pkg/upraises
import ./bitswap

export PricingMessage
export StateChannelUpdate

export nitro

push: {.upraises: [].}

type
Pricing* = object
address*: EthAddress
asset*: EthAddress
price*: UInt256

func init*(_: type PricingMessage, pricing: Pricing): PricingMessage =
PricingMessage(
address: @(pricing.address.toArray),
asset: @(pricing.asset.toArray),
price: @(pricing.price.toBytesBE)
)

func parse(_: type EthAddress, bytes: seq[byte]): ?EthAddress =
var address: array[20, byte]
if bytes.len != address.len:
return EthAddress.none
for i in 0..<address.len:
address[i] = bytes[i]
EthAddress(address).some

func parse(_: type UInt256, bytes: seq[byte]): ?UInt256 =
if bytes.len > 32:
return UInt256.none
UInt256.fromBytesBE(bytes).some

func init*(_: type Pricing, message: PricingMessage): ?Pricing =
let address = EthAddress.parse(message.address)
let asset = EThAddress.parse(message.asset)
let price = UInt256.parse(message.price)
if address.isNone or asset.isNone or price.isNone:
return Pricing.none
Pricing(address: address.get, asset: asset.get, price: price.get).some

func init*(_: type StateChannelUpdate, state: SignedState): StateChannelUpdate =
StateChannelUpdate(update: state.toJson.toBytes)

proc init*(_: type SignedState, update: StateChannelUpdate): ?SignedState =
SignedState.fromJson(string.fromBytes(update.update))
Loading