From c97ac25966c18b86a5552e5a1fc1ef75c20576ee Mon Sep 17 00:00:00 2001 From: Dr Washington Sanchez Date: Tue, 16 Apr 2019 10:20:10 +1000 Subject: [PATCH 1/6] tweak: Remove pause in message retrieval during startup --- mobile/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mobile/node.go b/mobile/node.go index 1ab091a016..d055d733a3 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -485,7 +485,7 @@ func (n *Node) start() error { PR := rep.NewPointerRepublisher(n.OpenBazaarNode.DHT, n.OpenBazaarNode.Datastore, n.OpenBazaarNode.PushNodes, n.OpenBazaarNode.IsModerator) go PR.Run() n.OpenBazaarNode.PointerRepublisher = PR - MR.Wait() + // MR.Wait() n.OpenBazaarNode.PublishLock.Unlock() publishUnlocked = true From e01532feb5d8a3d86257c99b2930fb220b7e716b Mon Sep 17 00:00:00 2001 From: Dr Washington Sanchez Date: Tue, 16 Apr 2019 10:25:29 +1000 Subject: [PATCH 2/6] tweak: Add timeout to prevent lengthy query failures This significantly reduces the time it takes to publish a record to IPNS. --- .../go-libp2p-kad-dht/lookup.go | 3 +++ .../go-libp2p-kad-dht/lookup.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go b/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go index bc02a43aed..d8416ab585 100644 --- a/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go +++ b/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket" pb "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/pb" @@ -65,6 +66,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee // since the query doesnt actually pass our context down // we have to hack this here. whyrusleeping isnt a huge fan of goprocess parent := ctx + ctx, _ = context.WithTimeout(ctx, time.Second*3) query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { // For DHT query command notif.PublishQueryEvent(parent, ¬if.QueryEvent{ @@ -72,6 +74,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee ID: p, }) + ctx, _ = context.WithTimeout(ctx, time.Second*3) pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) if err != nil { logger.Debugf("error getting closer peers: %s", err) diff --git a/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go b/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go index 0cc7e0dc59..6f52b98d1a 100644 --- a/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go +++ b/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket" cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid" @@ -65,6 +66,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee // since the query doesnt actually pass our context down // we have to hack this here. whyrusleeping isnt a huge fan of goprocess parent := ctx + ctx, _ = context.WithTimeout(ctx, time.Second*3) query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { // For DHT query command notif.PublishQueryEvent(parent, ¬if.QueryEvent{ @@ -72,6 +74,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee ID: p, }) + ctx, _ = context.WithTimeout(ctx, time.Second*3) pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) if err != nil { logger.Debugf("error getting closer peers: %s", err) From c519cd452a39f2e270e354048a275fa4848730e3 Mon Sep 17 00:00:00 2001 From: Dr Washington Sanchez Date: Fri, 3 Apr 2020 11:42:00 +1000 Subject: [PATCH 3/6] feat: Attempt fetch of offline messages once per session --- mobile/node.go | 1 + net/retriever/retriever.go | 25 ++++++++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/mobile/node.go b/mobile/node.go index d055d733a3..5030e302b9 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -480,6 +480,7 @@ func (n *Node) start() error { SendAck: n.OpenBazaarNode.SendOfflineAck, SendError: n.OpenBazaarNode.SendError, }) + go MR.ResetPointerList() go MR.Run() n.OpenBazaarNode.MessageRetriever = MR PR := rep.NewPointerRepublisher(n.OpenBazaarNode.DHT, n.OpenBazaarNode.Datastore, n.OpenBazaarNode.PushNodes, n.OpenBazaarNode.IsModerator) diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index a7399a28dc..cbe765eab9 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -29,7 +29,11 @@ import ( const DefaultPointerPrefixLength = 14 -var log = logging.MustGetLogger("retriever") +var ( + // Initialize a clear pointerList for the DHT on start + pointerList = []string{} + log = logging.MustGetLogger("retriever") +) type MRConfig struct { Db repo.Datastore @@ -66,6 +70,20 @@ type offlineMessage struct { env pb.Envelope } +func stringInSlice(str string, list []string) bool { + for _, v := range list { + if v == str { + return true + } + } + return false +} + +// Reset on startup +func (m *MessageRetriever) ResetPointerList() { + pointerList = []string{} +} + func NewMessageRetriever(cfg MRConfig) *MessageRetriever { var client *http.Client if cfg.Dialer != nil { @@ -100,8 +118,8 @@ func (m *MessageRetriever) Run() { peers := time.NewTicker(time.Minute) defer dht.Stop() defer peers.Stop() - go m.fetchPointersFromDHT() go m.fetchPointersFromPushNodes() + go m.fetchPointersFromDHT() for { select { case <-dht.C: @@ -159,7 +177,8 @@ func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) { inFlight := make(map[string]bool) // Iterate over the pointers, adding 1 to the waitgroup for each pointer found for p := range peerOut { - if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !inFlight[p.Addrs[0].String()] { + if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !stringInSlice(p.Addrs[0].String(), pointerList) && !inFlight[p.Addrs[0].String()] { + pointerList = append(pointerList, p.Addrs[0].String()) inFlight[p.Addrs[0].String()] = true log.Debugf("Found pointer with location %s", p.Addrs[0].String()) // IPFS From 0fd53d2d6d562681f8cb34fc0308e0ff201183a7 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Fri, 3 Apr 2020 14:30:49 -0400 Subject: [PATCH 4/6] Increase logging and wait for DHT to bootstrap --- ipfs/pointers.go | 1 + net/retriever/retriever.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/ipfs/pointers.go b/ipfs/pointers.go index c84284d88d..f79afcf5bc 100644 --- a/ipfs/pointers.go +++ b/ipfs/pointers.go @@ -89,6 +89,7 @@ func PutPointerToPeer(dht *routing.IpfsDHT, ctx context.Context, peer peer.ID, p func GetPointersFromPeer(dht *routing.IpfsDHT, ctx context.Context, p peer.ID, key *cid.Cid) ([]*ps.PeerInfo, error) { pmes := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, key.Bytes(), 0) + log.Debugf("Fetching pointers from: %v\n", p.Pretty()) resp, err := dht.SendRequest(ctx, p, pmes) if err != nil { return []*ps.PeerInfo{}, err diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index cbe765eab9..2a341e1446 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -234,12 +234,15 @@ func (m *MessageRetriever) getPointersFromDataPeersRoutine(peerOut chan ps.PeerI wg.Add(1) go func(pid peer.ID) { defer wg.Done() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*35) defer cancel() + time.Sleep(time.Second*15) provs, err := ipfs.GetPointersFromPeer(m.routing, ctx, pid, &k) if err != nil { + log.Errorf("Could not get pointers from push node because: %v", err) return } + log.Debugf("Successfully queried %s for pointers", pid.Pretty()) for _, pi := range provs { peerOut <- *pi } From 11843cb111a97ffdcbcf60f5a7bc051c6db9af12 Mon Sep 17 00:00:00 2001 From: Dr Washington Sanchez Date: Sun, 5 Apr 2020 21:07:26 +1000 Subject: [PATCH 5/6] Move debug line --- net/retriever/retriever.go | 1 + 1 file changed, 1 insertion(+) diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index 2a341e1446..c97bae8a7e 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -179,6 +179,7 @@ func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) { for p := range peerOut { if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !stringInSlice(p.Addrs[0].String(), pointerList) && !inFlight[p.Addrs[0].String()] { pointerList = append(pointerList, p.Addrs[0].String()) + log.Debugf("Looking for pointer [%v] at %v\n", p.ID.Pretty(), p.Addrs) inFlight[p.Addrs[0].String()] = true log.Debugf("Found pointer with location %s", p.Addrs[0].String()) // IPFS From 431911aacfecf41f043c223e085b5c4d95a6cde6 Mon Sep 17 00:00:00 2001 From: Dr Washington Sanchez Date: Sun, 5 Apr 2020 21:44:13 +1000 Subject: [PATCH 6/6] tweak: Extend ticker time for push nodes --- net/retriever/retriever.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index c97bae8a7e..c4c69dce3e 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -237,7 +237,7 @@ func (m *MessageRetriever) getPointersFromDataPeersRoutine(peerOut chan ps.PeerI defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), time.Second*35) defer cancel() - time.Sleep(time.Second*15) + time.Sleep(time.Second * 15) provs, err := ipfs.GetPointersFromPeer(m.routing, ctx, pid, &k) if err != nil { log.Errorf("Could not get pointers from push node because: %v", err)