From 7ceb9deea1554fd152e4db3bb15a2184c8ee4b09 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 18:17:25 +0100 Subject: [PATCH 01/12] forward: Add SessionContext skeleton --- network/forward/forward.go | 1 + network/forward/forward_test.go | 32 ++++++++++++++++++++++++++++++++ network/forward/types.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 network/forward/forward.go create mode 100644 network/forward/forward_test.go create mode 100644 network/forward/types.go diff --git a/network/forward/forward.go b/network/forward/forward.go new file mode 100644 index 0000000000..281a83ed0b --- /dev/null +++ b/network/forward/forward.go @@ -0,0 +1 @@ +package forward diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go new file mode 100644 index 0000000000..3f073c852b --- /dev/null +++ b/network/forward/forward_test.go @@ -0,0 +1,32 @@ +package forward + +import ( + "testing" + + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/network/capability" + "github.com/ethersphere/swarm/pot" +) + +func TestGet(t *testing.T) { + addr := make([]byte, 32) + kadParams := network.NewKadParams() + kad := network.NewKademlia(addr, kadParams) + cp := capability.NewCapability(4, 2) + kad.RegisterCapabilityIndex("foo", *cp) + sctx := NewSessionContext("foo") + fwd := New(sctx) + + bytesFar := pot.NewAddressFromString("10000000") + bytesNear := pot.NewAddressFromString("00000001") + addrFar := network.NewBzzAddr(bytesFar, []byte{}) + addrNear := network.NewBzzAddr(bytesNear, []byte{}) + addrFar.Capabilities.Add(cp) + addrNear.Capabilities.Add(cp) + peerFar := network.NewPeer(&network.BzzPeer{BzzAddr: addrFar}, kad) + peerNear := network.NewPeer(&network.BzzPeer{BzzAddr: addrNear}, kad) + kad.Register(addrFar) + kad.Register(addrNear) + kad.On(peerFar) + kad.On(peerNear) +} diff --git a/network/forward/types.go b/network/forward/types.go new file mode 100644 index 0000000000..b2013c8e59 --- /dev/null +++ b/network/forward/types.go @@ -0,0 +1,29 @@ +package forward + +var ( + sessionId = 0 +) + +type ForwardPeer struct { +} + +type SessionInterface interface { + Subscribe() <-chan ForwardPeer + Get(numberOfPeers int) ([]ForwardPeer, error) + Close() +} + +// also implements context.Context +type SessionContext struct { + CapabilityIndex string + SessionId int +} + +func NewSessionContext(cpidx string) SessionContext { + sctx := SessionContext{ + CapabilityIndex: cpidx, + SessionId: sessionId, + } + sessionId++ + return sctx +} From 538af94e1cadd18ae5a2828a02549ffbc7e75109 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 19:21:08 +0100 Subject: [PATCH 02/12] forward: Add Session constructor and pivot test --- network/forward/forward.go | 34 ++++++++++++++++ network/forward/forward_test.go | 71 +++++++++++++++++++++++++-------- network/forward/types.go | 57 +++++++++++++++++++++++--- 3 files changed, 140 insertions(+), 22 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 281a83ed0b..86ca37aaf0 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -1 +1,35 @@ package forward + +import ( + "context" + + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/network" +) + +type Session struct { + sessionContext context.Context + kademlia *network.Kademlia + pivot []byte +} + +func New(sctx context.Context, kad *network.Kademlia) *Session { + s := &Session{ + sessionContext: sctx, + kademlia: kad, + } + addr := sctx.Value("address") + log.Trace("addr", "addr", addr) + if addr == nil { + s.pivot = kad.BaseAddr() + } else { + s.pivot = addr.([]byte) + } + return s +} + +func (s *Session) Get(numPeers int) ([]ForwardPeer, error) { + var result []ForwardPeer + + return result, nil +} diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index 3f073c852b..e2433e6e16 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -1,32 +1,69 @@ package forward import ( + "bytes" "testing" "github.com/ethersphere/swarm/network" - "github.com/ethersphere/swarm/network/capability" "github.com/ethersphere/swarm/pot" + "github.com/ethersphere/swarm/testutil" ) -func TestGet(t *testing.T) { +func init() { + testutil.Init() +} + +func TestNew(t *testing.T) { + addr := make([]byte, 32) + addr[31] = 0x01 kadParams := network.NewKadParams() kad := network.NewKademlia(addr, kadParams) - cp := capability.NewCapability(4, 2) - kad.RegisterCapabilityIndex("foo", *cp) - sctx := NewSessionContext("foo") - fwd := New(sctx) - bytesFar := pot.NewAddressFromString("10000000") + sctx := NewSessionContext() + fwdBase := New(sctx, kad) + if !bytes.Equal(fwdBase.pivot, addr) { + t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) + } + bytesNear := pot.NewAddressFromString("00000001") - addrFar := network.NewBzzAddr(bytesFar, []byte{}) - addrNear := network.NewBzzAddr(bytesNear, []byte{}) - addrFar.Capabilities.Add(cp) - addrNear.Capabilities.Add(cp) - peerFar := network.NewPeer(&network.BzzPeer{BzzAddr: addrFar}, kad) - peerNear := network.NewPeer(&network.BzzPeer{BzzAddr: addrNear}, kad) - kad.Register(addrFar) - kad.Register(addrNear) - kad.On(peerFar) - kad.On(peerNear) + sctx.SetAddress(bytesNear) + fwdExplicit := New(sctx, kad) + if !bytes.Equal(fwdExplicit.pivot, bytesNear) { + t.Fatalf("pivot explicit; expected %x, got %x", bytesNear, fwdExplicit.pivot) + } } + +//func TestGet() { +//addr := make([]byte, 32) +// kadParams := network.NewKadParams() +// kad := network.NewKademlia(addr, kadParams) +// cp := capability.NewCapability(4, 2) +// kad.RegisterCapabilityIndex("foo", *cp) +// +// bytesFar := pot.NewAddressFromString("10000000") +// bytesNear := pot.NewAddressFromString("00000001") +// addrFar := network.NewBzzAddr(bytesFar, []byte{}) +// addrNear := network.NewBzzAddr(bytesNear, []byte{}) +// addrFar.Capabilities.Add(cp) +// addrNear.Capabilities.Add(cp) +// peerFar := network.NewPeer(&network.BzzPeer{BzzAddr: addrFar}, kad) +// peerNear := network.NewPeer(&network.BzzPeer{BzzAddr: addrNear}, kad) +// kad.Register(addrFar) +// kad.Register(addrNear) +// kad.On(peerFar) +// kad.On(peerNear) +// +// +//resultNear, err := fwdBase.Get(1) +// if err != nil { +// t.Fatal(err) +// } +// if len(resultNear) != 1 { +// t.Fatalf("peer missing, expected %d, got %d", 1, len(resultNear)) +// } +// if !bytes.Equal(resultNear[0].Address(), addrNear.Address()) { +// t.Fatalf("peer mismatch, expected %x, got %x", addrNear.Address(), resultNear[0].Address()) +// } + +//} diff --git a/network/forward/types.go b/network/forward/types.go index b2013c8e59..ea17cd5b02 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -1,10 +1,18 @@ package forward +import ( + "time" + + "github.com/ethersphere/swarm/network" +) + var ( sessionId = 0 + zeroTime = time.Unix(0, 0) ) type ForwardPeer struct { + *network.BzzPeer } type SessionInterface interface { @@ -17,13 +25,52 @@ type SessionInterface interface { type SessionContext struct { CapabilityIndex string SessionId int + Address []byte } -func NewSessionContext(cpidx string) SessionContext { - sctx := SessionContext{ - CapabilityIndex: cpidx, - SessionId: sessionId, - } +func NewSessionContext() *SessionContext { + sctx := newSessionContext("", sessionId, nil) sessionId++ return sctx } + +func newSessionContext(capabilityIndex string, sessionId int, addr []byte) *SessionContext { + return &SessionContext{ + CapabilityIndex: capabilityIndex, + SessionId: sessionId, + Address: addr, + } +} + +func (c *SessionContext) Deadline() (time.Time, bool) { + return zeroTime, false +} + +func (c *SessionContext) Done() <-chan struct{} { + return nil +} + +func (c *SessionContext) Err() error { + return nil +} + +func (c *SessionContext) Value(k interface{}) interface{} { + ks, ok := k.(string) + if !ok { + return nil + } + switch ks { + case "address": + if c.Address == nil { + return nil + } + return c.Address + case "id": + return c.SessionId + } + return nil +} + +func (c *SessionContext) SetAddress(addr []byte) { + c.Address = addr +} From 667603ce960e748f70a55e1f57e70e2ec9a8e57b Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 19:35:24 +0100 Subject: [PATCH 03/12] network: Create Session from context --- network/forward/forward.go | 25 +++++++++++++++---------- network/forward/forward_test.go | 18 ++++++++++++++++-- network/forward/types.go | 9 +++++++++ 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 86ca37aaf0..2ea2577fe5 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -1,30 +1,35 @@ package forward import ( - "context" - - "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/network" ) type Session struct { - sessionContext context.Context - kademlia *network.Kademlia - pivot []byte + kademlia *network.Kademlia + pivot []byte + id int + capabilityIndex string } -func New(sctx context.Context, kad *network.Kademlia) *Session { +func NewFromContext(sctx *SessionContext, kad *network.Kademlia) *Session { s := &Session{ - sessionContext: sctx, - kademlia: kad, + kademlia: kad, } + + s.id = sctx.Value("id").(int) + addr := sctx.Value("address") - log.Trace("addr", "addr", addr) if addr == nil { s.pivot = kad.BaseAddr() } else { s.pivot = addr.([]byte) } + + capabilityIndex := sctx.Value("capability") + if capabilityIndex != nil { + s.capabilityIndex = capabilityIndex.(string) + } + return s } diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index e2433e6e16..6d362364e3 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -20,18 +20,32 @@ func TestNew(t *testing.T) { kadParams := network.NewKadParams() kad := network.NewKademlia(addr, kadParams) + sessionId = 42 sctx := NewSessionContext() - fwdBase := New(sctx, kad) + fwdBase := NewFromContext(sctx, kad) if !bytes.Equal(fwdBase.pivot, addr) { t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) } + if fwdBase.id != 42 { + t.Fatalf("sessionId; expected %d, got %d", 42, fwdBase.id) + } bytesNear := pot.NewAddressFromString("00000001") + capabilityIndex := "foo" + sctx = NewSessionContext() + sctx.SetCapability(capabilityIndex) sctx.SetAddress(bytesNear) - fwdExplicit := New(sctx, kad) + fwdExplicit := NewFromContext(sctx, kad) if !bytes.Equal(fwdExplicit.pivot, bytesNear) { t.Fatalf("pivot explicit; expected %x, got %x", bytesNear, fwdExplicit.pivot) } + + if sctx.CapabilityIndex != capabilityIndex { + t.Fatalf("capability; expected %s, got %s", capabilityIndex, fwdExplicit.capabilityIndex) + } + if fwdExplicit.id != 43 { + t.Fatalf("sessionId; expected %d, got %d", 43, fwdExplicit.id) + } } //func TestGet() { diff --git a/network/forward/types.go b/network/forward/types.go index ea17cd5b02..04aa734c52 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -65,6 +65,11 @@ func (c *SessionContext) Value(k interface{}) interface{} { return nil } return c.Address + case "capability": + if c.CapabilityIndex == "" { + return nil + } + return c.CapabilityIndex case "id": return c.SessionId } @@ -74,3 +79,7 @@ func (c *SessionContext) Value(k interface{}) interface{} { func (c *SessionContext) SetAddress(addr []byte) { c.Address = addr } + +func (c *SessionContext) SetCapability(capabilityIndex string) { + c.CapabilityIndex = capabilityIndex +} From 694c36f95028f01522483c9310faaefb1f76b5ae Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 19:46:17 +0100 Subject: [PATCH 04/12] forward: Make explicit constructor args instead of context --- network/forward/forward.go | 48 +++++++++++++++++++++++---------- network/forward/forward_test.go | 15 ++++------- network/forward/types.go | 4 +-- 3 files changed, 40 insertions(+), 27 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 2ea2577fe5..b7ff0d9fa2 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -4,6 +4,11 @@ import ( "github.com/ethersphere/swarm/network" ) +var ( + sessionId = 0 + sessions []*Session +) + type Session struct { kademlia *network.Kademlia pivot []byte @@ -11,28 +16,43 @@ type Session struct { capabilityIndex string } -func NewFromContext(sctx *SessionContext, kad *network.Kademlia) *Session { +func New(kad *network.Kademlia, capabilityIndex string, pivot []byte) *Session { s := &Session{ - kademlia: kad, + kademlia: kad, + id: sessionId, + capabilityIndex: capabilityIndex, } - - s.id = sctx.Value("id").(int) - - addr := sctx.Value("address") - if addr == nil { + if pivot == nil { s.pivot = kad.BaseAddr() } else { - s.pivot = addr.([]byte) + s.pivot = pivot } - - capabilityIndex := sctx.Value("capability") - if capabilityIndex != nil { - s.capabilityIndex = capabilityIndex.(string) - } - + sessionId++ return s } +//func NewFromContext(sctx *SessionContext, kad *network.Kademlia) *Session { +// s := &Session{ +// kademlia: kad, +// } +// +// s.id = sctx.Value("id").(int) +// +// addr := sctx.Value("address") +// if addr == nil { +// s.pivot = kad.BaseAddr() +// } else { +// s.pivot = addr.([]byte) +// } +// +// capabilityIndex := sctx.Value("capability") +// if capabilityIndex != nil { +// s.capabilityIndex = capabilityIndex.(string) +// } +// +// return s +//} + func (s *Session) Get(numPeers int) ([]ForwardPeer, error) { var result []ForwardPeer diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index 6d362364e3..93308f92f7 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -21,8 +21,7 @@ func TestNew(t *testing.T) { kad := network.NewKademlia(addr, kadParams) sessionId = 42 - sctx := NewSessionContext() - fwdBase := NewFromContext(sctx, kad) + fwdBase := New(kad, "", nil) if !bytes.Equal(fwdBase.pivot, addr) { t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) } @@ -32,20 +31,16 @@ func TestNew(t *testing.T) { bytesNear := pot.NewAddressFromString("00000001") capabilityIndex := "foo" - sctx = NewSessionContext() - sctx.SetCapability(capabilityIndex) - sctx.SetAddress(bytesNear) - fwdExplicit := NewFromContext(sctx, kad) + fwdExplicit := New(kad, capabilityIndex, bytesNear) if !bytes.Equal(fwdExplicit.pivot, bytesNear) { t.Fatalf("pivot explicit; expected %x, got %x", bytesNear, fwdExplicit.pivot) } - - if sctx.CapabilityIndex != capabilityIndex { - t.Fatalf("capability; expected %s, got %s", capabilityIndex, fwdExplicit.capabilityIndex) - } if fwdExplicit.id != 43 { t.Fatalf("sessionId; expected %d, got %d", 43, fwdExplicit.id) } + if fwdExplicit.capabilityIndex != capabilityIndex { + t.Fatalf("capabilityindex, expected %s, got %s", capabilityIndex, fwdExplicit.capabilityIndex) + } } //func TestGet() { diff --git a/network/forward/types.go b/network/forward/types.go index 04aa734c52..324a701552 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -7,8 +7,7 @@ import ( ) var ( - sessionId = 0 - zeroTime = time.Unix(0, 0) + zeroTime = time.Unix(0, 0) ) type ForwardPeer struct { @@ -30,7 +29,6 @@ type SessionContext struct { func NewSessionContext() *SessionContext { sctx := newSessionContext("", sessionId, nil) - sessionId++ return sctx } From b25cdaab4f90d4d36d153632860e78f6ad662857 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 19:52:57 +0100 Subject: [PATCH 05/12] forward: Introduce SessionManager --- network/forward/forward.go | 30 +++++++++++++++++++++--------- network/forward/forward_test.go | 13 ++++++++----- network/forward/types.go | 5 ----- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index b7ff0d9fa2..bbfad6e887 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -1,12 +1,9 @@ package forward import ( - "github.com/ethersphere/swarm/network" -) + "sync" -var ( - sessionId = 0 - sessions []*Session + "github.com/ethersphere/swarm/network" ) type Session struct { @@ -16,10 +13,26 @@ type Session struct { capabilityIndex string } -func New(kad *network.Kademlia, capabilityIndex string, pivot []byte) *Session { +type SessionManager struct { + sessions []*Session + mu sync.Mutex +} + +func NewSessionManager() *SessionManager { + return &SessionManager{} +} + +func (m *SessionManager) add(s *Session) *Session { + m.mu.Lock() + defer m.mu.Unlock() + s.id = len(m.sessions) + m.sessions = append(m.sessions, s) + return s +} + +func (m *SessionManager) New(kad *network.Kademlia, capabilityIndex string, pivot []byte) *Session { s := &Session{ kademlia: kad, - id: sessionId, capabilityIndex: capabilityIndex, } if pivot == nil { @@ -27,8 +40,7 @@ func New(kad *network.Kademlia, capabilityIndex string, pivot []byte) *Session { } else { s.pivot = pivot } - sessionId++ - return s + return m.add(s) } //func NewFromContext(sctx *SessionContext, kad *network.Kademlia) *Session { diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index 93308f92f7..4f4f26c1a7 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -20,27 +20,30 @@ func TestNew(t *testing.T) { kadParams := network.NewKadParams() kad := network.NewKademlia(addr, kadParams) - sessionId = 42 - fwdBase := New(kad, "", nil) + mgr := NewSessionManager() + fwdBase := mgr.New(kad, "", nil) if !bytes.Equal(fwdBase.pivot, addr) { t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) } - if fwdBase.id != 42 { + if fwdBase.id != 0 { t.Fatalf("sessionId; expected %d, got %d", 42, fwdBase.id) } bytesNear := pot.NewAddressFromString("00000001") capabilityIndex := "foo" - fwdExplicit := New(kad, capabilityIndex, bytesNear) + fwdExplicit := mgr.New(kad, capabilityIndex, bytesNear) if !bytes.Equal(fwdExplicit.pivot, bytesNear) { t.Fatalf("pivot explicit; expected %x, got %x", bytesNear, fwdExplicit.pivot) } - if fwdExplicit.id != 43 { + if fwdExplicit.id != 1 { t.Fatalf("sessionId; expected %d, got %d", 43, fwdExplicit.id) } if fwdExplicit.capabilityIndex != capabilityIndex { t.Fatalf("capabilityindex, expected %s, got %s", capabilityIndex, fwdExplicit.capabilityIndex) } + if len(mgr.sessions) != 2 { + t.Fatalf("sessions array; expected %d, got %d", 2, len(mgr.sessions)) + } } //func TestGet() { diff --git a/network/forward/types.go b/network/forward/types.go index 324a701552..2c8546441c 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -27,11 +27,6 @@ type SessionContext struct { Address []byte } -func NewSessionContext() *SessionContext { - sctx := newSessionContext("", sessionId, nil) - return sctx -} - func newSessionContext(capabilityIndex string, sessionId int, addr []byte) *SessionContext { return &SessionContext{ CapabilityIndex: capabilityIndex, From fa8435228122cf420ab2193af3ae2300b4d3b993 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 20:07:32 +0100 Subject: [PATCH 06/12] forward: Add create and retrieve by context in manager --- network/forward/forward.go | 59 +++++++++++++++++++++------------ network/forward/forward_test.go | 37 +++++++++++++++++++++ network/forward/types.go | 2 +- 3 files changed, 76 insertions(+), 22 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index bbfad6e887..3907fbabfd 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -1,6 +1,7 @@ package forward import ( + "fmt" "sync" "github.com/ethersphere/swarm/network" @@ -43,27 +44,43 @@ func (m *SessionManager) New(kad *network.Kademlia, capabilityIndex string, pivo return m.add(s) } -//func NewFromContext(sctx *SessionContext, kad *network.Kademlia) *Session { -// s := &Session{ -// kademlia: kad, -// } -// -// s.id = sctx.Value("id").(int) -// -// addr := sctx.Value("address") -// if addr == nil { -// s.pivot = kad.BaseAddr() -// } else { -// s.pivot = addr.([]byte) -// } -// -// capabilityIndex := sctx.Value("capability") -// if capabilityIndex != nil { -// s.capabilityIndex = capabilityIndex.(string) -// } -// -// return s -//} +func (m *SessionManager) ToContext(id int) (*SessionContext, error) { + if id >= len(m.sessions) { + return nil, fmt.Errorf("No such session %d (max %d)", id, len(m.sessions)) + } + s := m.sessions[id] + return &SessionContext{ + CapabilityIndex: s.capabilityIndex, + SessionId: s.id, + Address: s.pivot, + }, nil +} + +func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { + + sessionId := sctx.Value("id") + if sessionId != nil { + id := sessionId.(int) + if id < len(m.sessions) { + return m.sessions[id], nil + } + } + return nil, nil + // + // addr := sctx.Value("address") + // if addr == nil { + // s.pivot = kad.BaseAddr() + // } else { + // s.pivot = addr.([]byte) + // } + // + // capabilityIndex := sctx.Value("capability") + // if capabilityIndex != nil { + // s.capabilityIndex = capabilityIndex.(string) + // } + // + // return s +} func (s *Session) Get(numPeers int) ([]ForwardPeer, error) { var result []ForwardPeer diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index 4f4f26c1a7..242a669b06 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -46,6 +46,43 @@ func TestNew(t *testing.T) { } } +func TestManagerContext(t *testing.T) { + addr := make([]byte, 32) + addr[31] = 0x01 + kadParams := network.NewKadParams() + kad := network.NewKademlia(addr, kadParams) + + mgr := NewSessionManager() + _ = mgr.New(kad, "", nil) + fwdOne := mgr.New(kad, "", nil) + sctx := NewSessionContext(fwdOne.id, "", nil) + fwdRetrieved, err := mgr.FromContext(sctx) + if err != nil { + t.Fatal(err) + } + if fwdRetrieved != fwdOne { + t.Fatalf("fromcontext; expected %p, got %p", fwdOne, fwdRetrieved) + } + + newAddr := make([]byte, 32) + newAddr[31] = 0x02 + fwdTwo := mgr.New(kad, "foo", newAddr) + sctx, err = mgr.ToContext(2) + if err != nil { + t.Fatal(err) + } + if fwdTwo.id != sctx.SessionId { + t.Fatalf("to context id; expected %d, got %d", fwdTwo.id, sctx.SessionId) + } + if fwdTwo.capabilityIndex != sctx.CapabilityIndex { + t.Fatalf("to context id; expected %s, got %s", fwdTwo.capabilityIndex, sctx.CapabilityIndex) + } + if !bytes.Equal(fwdTwo.pivot, sctx.Address) { + t.Fatalf("to context id; expected %x, got %x", fwdTwo.pivot, sctx.Address) + } + +} + //func TestGet() { //addr := make([]byte, 32) // kadParams := network.NewKadParams() diff --git a/network/forward/types.go b/network/forward/types.go index 2c8546441c..acdf5c4aeb 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -27,7 +27,7 @@ type SessionContext struct { Address []byte } -func newSessionContext(capabilityIndex string, sessionId int, addr []byte) *SessionContext { +func NewSessionContext(sessionId int, capabilityIndex string, addr []byte) *SessionContext { return &SessionContext{ CapabilityIndex: capabilityIndex, SessionId: sessionId, From 806ec836df33c0e7cbb94c56f33c89ddbeb250bb Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 22:33:58 +0100 Subject: [PATCH 07/12] forward: Add missing files --- network/forward/forward.go | 58 +++++++++++++++------------------ network/forward/forward_test.go | 40 +++++++++++++++-------- network/forward/types.go | 3 +- 3 files changed, 54 insertions(+), 47 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 3907fbabfd..5bf69458cd 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/network" ) @@ -15,29 +16,34 @@ type Session struct { } type SessionManager struct { - sessions []*Session + sessions map[int]*Session + kademlia *network.Kademlia + lastId int // starts at 1 to make create from context easier mu sync.Mutex } -func NewSessionManager() *SessionManager { - return &SessionManager{} +func NewSessionManager(kademlia *network.Kademlia) *SessionManager { + return &SessionManager{ + sessions: make(map[int]*Session), + kademlia: kademlia, + } } func (m *SessionManager) add(s *Session) *Session { m.mu.Lock() defer m.mu.Unlock() - s.id = len(m.sessions) - m.sessions = append(m.sessions, s) + m.lastId++ + log.Trace("adding session", "id", m.lastId) + m.sessions[m.lastId] = s return s } -func (m *SessionManager) New(kad *network.Kademlia, capabilityIndex string, pivot []byte) *Session { +func (m *SessionManager) New(capabilityIndex string, pivot []byte) *Session { s := &Session{ - kademlia: kad, capabilityIndex: capabilityIndex, } if pivot == nil { - s.pivot = kad.BaseAddr() + s.pivot = m.kademlia.BaseAddr() } else { s.pivot = pivot } @@ -45,10 +51,10 @@ func (m *SessionManager) New(kad *network.Kademlia, capabilityIndex string, pivo } func (m *SessionManager) ToContext(id int) (*SessionContext, error) { - if id >= len(m.sessions) { - return nil, fmt.Errorf("No such session %d (max %d)", id, len(m.sessions)) + s, ok := m.sessions[id] + if !ok { + return nil, fmt.Errorf("No such session %d", id) } - s := m.sessions[id] return &SessionContext{ CapabilityIndex: s.capabilityIndex, SessionId: s.id, @@ -58,28 +64,18 @@ func (m *SessionManager) ToContext(id int) (*SessionContext, error) { func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { - sessionId := sctx.Value("id") - if sessionId != nil { - id := sessionId.(int) - if id < len(m.sessions) { - return m.sessions[id], nil + sessionId, ok := sctx.Value("id").(int) + if ok { + s, ok := m.sessions[sessionId] + if !ok { + return nil, fmt.Errorf("No such session %d", sessionId) } + return s, nil } - return nil, nil - // - // addr := sctx.Value("address") - // if addr == nil { - // s.pivot = kad.BaseAddr() - // } else { - // s.pivot = addr.([]byte) - // } - // - // capabilityIndex := sctx.Value("capability") - // if capabilityIndex != nil { - // s.capabilityIndex = capabilityIndex.(string) - // } - // - // return s + + addr, _ := sctx.Value("address").([]byte) + capabilityIndex, _ := sctx.Value("capability").(string) + return m.New(capabilityIndex, addr), nil } func (s *Session) Get(numPeers int) ([]ForwardPeer, error) { diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index 242a669b06..65ffe6df74 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -20,8 +20,8 @@ func TestNew(t *testing.T) { kadParams := network.NewKadParams() kad := network.NewKademlia(addr, kadParams) - mgr := NewSessionManager() - fwdBase := mgr.New(kad, "", nil) + mgr := NewSessionManager(kad) + fwdBase := mgr.New("", nil) if !bytes.Equal(fwdBase.pivot, addr) { t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) } @@ -31,7 +31,7 @@ func TestNew(t *testing.T) { bytesNear := pot.NewAddressFromString("00000001") capabilityIndex := "foo" - fwdExplicit := mgr.New(kad, capabilityIndex, bytesNear) + fwdExplicit := mgr.New(capabilityIndex, bytesNear) if !bytes.Equal(fwdExplicit.pivot, bytesNear) { t.Fatalf("pivot explicit; expected %x, got %x", bytesNear, fwdExplicit.pivot) } @@ -52,22 +52,20 @@ func TestManagerContext(t *testing.T) { kadParams := network.NewKadParams() kad := network.NewKademlia(addr, kadParams) - mgr := NewSessionManager() - _ = mgr.New(kad, "", nil) - fwdOne := mgr.New(kad, "", nil) - sctx := NewSessionContext(fwdOne.id, "", nil) - fwdRetrieved, err := mgr.FromContext(sctx) - if err != nil { - t.Fatal(err) + mgr := NewSessionManager(kad) + _ = mgr.New("", nil) // id 1 + fwdOne := mgr.New("", nil) // id 2 + if len(mgr.sessions) != 2 { + t.Fatalf("mgr session length; expected 2, got %d", len(mgr.sessions)) } - if fwdRetrieved != fwdOne { - t.Fatalf("fromcontext; expected %p, got %p", fwdOne, fwdRetrieved) + if mgr.sessions[2] != fwdOne { + t.Fatalf("fromcontext; expected %p, got %p", fwdOne, mgr.sessions[2]) } newAddr := make([]byte, 32) newAddr[31] = 0x02 - fwdTwo := mgr.New(kad, "foo", newAddr) - sctx, err = mgr.ToContext(2) + fwdTwo := mgr.New("foo", newAddr) // id 3 + sctx, err := mgr.ToContext(3) if err != nil { t.Fatal(err) } @@ -81,6 +79,20 @@ func TestManagerContext(t *testing.T) { t.Fatalf("to context id; expected %x, got %x", fwdTwo.pivot, sctx.Address) } + sctx = NewSessionContext("bar", newAddr) + fwdThree, err := mgr.FromContext(sctx) + if err != nil { + t.Fatal(err) + } + if fwdThree.id != 3 { + t.Fatalf("from new context id; expected %d, got %d", 3, fwdThree.id) + } + if fwdThree.capabilityIndex != sctx.CapabilityIndex { + t.Fatalf("to context id; expected %s, got %s", fwdThree.capabilityIndex, sctx.CapabilityIndex) + } + if !bytes.Equal(fwdThree.pivot, sctx.Address) { + t.Fatalf("to context id; expected %x, got %x", fwdThree.pivot, sctx.Address) + } } //func TestGet() { diff --git a/network/forward/types.go b/network/forward/types.go index acdf5c4aeb..aef8259444 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -27,10 +27,9 @@ type SessionContext struct { Address []byte } -func NewSessionContext(sessionId int, capabilityIndex string, addr []byte) *SessionContext { +func NewSessionContext(capabilityIndex string, addr []byte) *SessionContext { return &SessionContext{ CapabilityIndex: capabilityIndex, - SessionId: sessionId, Address: addr, } } From 27152c134c117c6477c90569ef344fad9f1ab306 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 18 Dec 2019 10:21:56 +0100 Subject: [PATCH 08/12] forward: Implement Get call (without state) --- network/forward/forward.go | 13 ++++- network/forward/forward_test.go | 97 ++++++++++++++++++--------------- network/forward/types.go | 2 +- 3 files changed, 65 insertions(+), 47 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 5bf69458cd..db2b92c285 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -34,6 +34,7 @@ func (m *SessionManager) add(s *Session) *Session { defer m.mu.Unlock() m.lastId++ log.Trace("adding session", "id", m.lastId) + s.id = m.lastId m.sessions[m.lastId] = s return s } @@ -41,6 +42,7 @@ func (m *SessionManager) add(s *Session) *Session { func (m *SessionManager) New(capabilityIndex string, pivot []byte) *Session { s := &Session{ capabilityIndex: capabilityIndex, + kademlia: m.kademlia, } if pivot == nil { s.pivot = m.kademlia.BaseAddr() @@ -81,5 +83,14 @@ func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { func (s *Session) Get(numPeers int) ([]ForwardPeer, error) { var result []ForwardPeer - return result, nil + i := 0 + err := s.kademlia.EachConnFiltered(s.pivot, s.capabilityIndex, 255, func(p *network.Peer, po int) bool { + result = append(result, ForwardPeer{Peer: p}) + i++ + if i == numPeers { + return false + } + return true + }) + return result, err } diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index 65ffe6df74..a37c8df259 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/network/capability" "github.com/ethersphere/swarm/pot" "github.com/ethersphere/swarm/testutil" ) @@ -25,8 +26,8 @@ func TestNew(t *testing.T) { if !bytes.Equal(fwdBase.pivot, addr) { t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) } - if fwdBase.id != 0 { - t.Fatalf("sessionId; expected %d, got %d", 42, fwdBase.id) + if fwdBase.id != 1 { + t.Fatalf("sessionId; expected %d, got %d", 1, fwdBase.id) } bytesNear := pot.NewAddressFromString("00000001") @@ -35,8 +36,8 @@ func TestNew(t *testing.T) { if !bytes.Equal(fwdExplicit.pivot, bytesNear) { t.Fatalf("pivot explicit; expected %x, got %x", bytesNear, fwdExplicit.pivot) } - if fwdExplicit.id != 1 { - t.Fatalf("sessionId; expected %d, got %d", 43, fwdExplicit.id) + if fwdExplicit.id != 2 { + t.Fatalf("sessionId; expected %d, got %d", 2, fwdExplicit.id) } if fwdExplicit.capabilityIndex != capabilityIndex { t.Fatalf("capabilityindex, expected %s, got %s", capabilityIndex, fwdExplicit.capabilityIndex) @@ -79,52 +80,58 @@ func TestManagerContext(t *testing.T) { t.Fatalf("to context id; expected %x, got %x", fwdTwo.pivot, sctx.Address) } - sctx = NewSessionContext("bar", newAddr) + sctx = NewSessionContext("", nil) + sctx.SessionId = 3 fwdThree, err := mgr.FromContext(sctx) if err != nil { t.Fatal(err) } - if fwdThree.id != 3 { - t.Fatalf("from new context id; expected %d, got %d", 3, fwdThree.id) - } - if fwdThree.capabilityIndex != sctx.CapabilityIndex { - t.Fatalf("to context id; expected %s, got %s", fwdThree.capabilityIndex, sctx.CapabilityIndex) - } - if !bytes.Equal(fwdThree.pivot, sctx.Address) { - t.Fatalf("to context id; expected %x, got %x", fwdThree.pivot, sctx.Address) + if fwdThree != fwdTwo { + t.Fatalf("from new context; expected %p, got %p", fwdTwo, fwdThree) } } -//func TestGet() { -//addr := make([]byte, 32) -// kadParams := network.NewKadParams() -// kad := network.NewKademlia(addr, kadParams) -// cp := capability.NewCapability(4, 2) -// kad.RegisterCapabilityIndex("foo", *cp) -// -// bytesFar := pot.NewAddressFromString("10000000") -// bytesNear := pot.NewAddressFromString("00000001") -// addrFar := network.NewBzzAddr(bytesFar, []byte{}) -// addrNear := network.NewBzzAddr(bytesNear, []byte{}) -// addrFar.Capabilities.Add(cp) -// addrNear.Capabilities.Add(cp) -// peerFar := network.NewPeer(&network.BzzPeer{BzzAddr: addrFar}, kad) -// peerNear := network.NewPeer(&network.BzzPeer{BzzAddr: addrNear}, kad) -// kad.Register(addrFar) -// kad.Register(addrNear) -// kad.On(peerFar) -// kad.On(peerNear) -// -// -//resultNear, err := fwdBase.Get(1) -// if err != nil { -// t.Fatal(err) -// } -// if len(resultNear) != 1 { -// t.Fatalf("peer missing, expected %d, got %d", 1, len(resultNear)) -// } -// if !bytes.Equal(resultNear[0].Address(), addrNear.Address()) { -// t.Fatalf("peer mismatch, expected %x, got %x", addrNear.Address(), resultNear[0].Address()) -// } +func TestGet(t *testing.T) { + bytesOwn := pot.NewAddressFromString("00000000") + kadParams := network.NewKadParams() + kad := network.NewKademlia(bytesOwn, kadParams) + cp := capability.NewCapability(4, 2) + kad.RegisterCapabilityIndex("foo", *cp) + + bytesFar := pot.NewAddressFromString("10000000") + bytesNear := pot.NewAddressFromString("00000001") + addrFar := network.NewBzzAddr(bytesFar, []byte{}) + addrNear := network.NewBzzAddr(bytesNear, []byte{}) + addrFar.Capabilities.Add(cp) + addrNear.Capabilities.Add(cp) + peerFar := network.NewPeer(&network.BzzPeer{BzzAddr: addrFar}, kad) + peerNear := network.NewPeer(&network.BzzPeer{BzzAddr: addrNear}, kad) + kad.Register(addrFar) + kad.Register(addrNear) + kad.On(peerFar) + kad.On(peerNear) + + mgr := NewSessionManager(kad) + fwd := mgr.New("foo", nil) + p, err := fwd.Get(1) + if err != nil { + t.Fatal(err) + } + if len(p) != 1 { + t.Fatalf("get first count; expected 1, got %d", len(p)) + } + if !bytes.Equal(p[0].Address(), bytesNear) { + t.Fatalf("get first address; expected %x, got %x", bytesNear, p[0].Address()) + } -//} + p, err = fwd.Get(1) + if err != nil { + t.Fatal(err) + } + if len(p) != 1 { + t.Fatalf("get peers count; expected 1, got %d", len(p)) + } + if !bytes.Equal(p[0].Address(), bytesFar) { + t.Fatalf("get second address; expected %x, got %x", bytesFar, p[0].Address()) + } +} diff --git a/network/forward/types.go b/network/forward/types.go index aef8259444..702ba6dc28 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -11,7 +11,7 @@ var ( ) type ForwardPeer struct { - *network.BzzPeer + *network.Peer } type SessionInterface interface { From 60eae052e237ee1e11d5042b499f43dc7450cf24 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 18 Dec 2019 10:40:23 +0100 Subject: [PATCH 09/12] forward: Implement cursor for Get --- network/forward/forward.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index db2b92c285..362895bb9c 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -13,6 +13,9 @@ type Session struct { pivot []byte id int capabilityIndex string + loaded bool + cache []ForwardPeer + last int } type SessionManager struct { @@ -49,6 +52,7 @@ func (m *SessionManager) New(capabilityIndex string, pivot []byte) *Session { } else { s.pivot = pivot } + _ = s.load() // handle error! return m.add(s) } @@ -80,17 +84,24 @@ func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { return m.New(capabilityIndex, addr), nil } -func (s *Session) Get(numPeers int) ([]ForwardPeer, error) { - var result []ForwardPeer +func (s *Session) Get(numPeers int) ([]*ForwardPeer, error) { + var result []*ForwardPeer + target := s.last + numPeers + if target > len(s.cache) { + target = len(s.cache) + } + for i := s.last; i < target; i++ { + result = append(result, &s.cache[i]) + s.last++ + } + return result, nil +} - i := 0 +func (s *Session) load() error { + s.cache = []ForwardPeer{} err := s.kademlia.EachConnFiltered(s.pivot, s.capabilityIndex, 255, func(p *network.Peer, po int) bool { - result = append(result, ForwardPeer{Peer: p}) - i++ - if i == numPeers { - return false - } + s.cache = append(s.cache, ForwardPeer{Peer: p}) return true }) - return result, err + return err } From 0740f3a91b8ad0aa1d49239dec62740acce53291 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 18 Dec 2019 13:36:31 +0100 Subject: [PATCH 10/12] pot, network: Add load balancer, asynchronous blocking iterator --- network/forward/forward.go | 45 +++++++++++++++++++------------ network/forward/forward_test.go | 22 ++++++++++++--- network/kademlia_load_balancer.go | 6 +++++ pot/address.go | 4 +++ 4 files changed, 56 insertions(+), 21 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 362895bb9c..23360a7baa 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -9,23 +9,22 @@ import ( ) type Session struct { - kademlia *network.Kademlia + kademlia *network.KademliaLoadBalancer pivot []byte id int capabilityIndex string - loaded bool - cache []ForwardPeer - last int + nextC chan struct{} + getC chan *ForwardPeer } type SessionManager struct { sessions map[int]*Session - kademlia *network.Kademlia + kademlia *network.KademliaLoadBalancer lastId int // starts at 1 to make create from context easier mu sync.Mutex } -func NewSessionManager(kademlia *network.Kademlia) *SessionManager { +func NewSessionManager(kademlia *network.KademliaLoadBalancer) *SessionManager { return &SessionManager{ sessions: make(map[int]*Session), kademlia: kademlia, @@ -46,13 +45,15 @@ func (m *SessionManager) New(capabilityIndex string, pivot []byte) *Session { s := &Session{ capabilityIndex: capabilityIndex, kademlia: m.kademlia, + nextC: make(chan struct{}), + getC: make(chan *ForwardPeer), } if pivot == nil { s.pivot = m.kademlia.BaseAddr() } else { s.pivot = pivot } - _ = s.load() // handle error! + go s.load() return m.add(s) } @@ -86,22 +87,32 @@ func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { func (s *Session) Get(numPeers int) ([]*ForwardPeer, error) { var result []*ForwardPeer - target := s.last + numPeers - if target > len(s.cache) { - target = len(s.cache) - } - for i := s.last; i < target; i++ { - result = append(result, &s.cache[i]) - s.last++ + for i := 0; i < numPeers; i++ { + s.nextC <- struct{}{} + p, ok := <-s.getC + if !ok { + break + } + result = append(result, p) } return result, nil } func (s *Session) load() error { - s.cache = []ForwardPeer{} - err := s.kademlia.EachConnFiltered(s.pivot, s.capabilityIndex, 255, func(p *network.Peer, po int) bool { - s.cache = append(s.cache, ForwardPeer{Peer: p}) + err := s.kademlia.EachBinFiltered(s.pivot, s.capabilityIndex, func(bin network.LBBin) bool { + for _, p := range bin.LBPeers { + _, ok := <-s.nextC + if !ok { + return false + } + s.getC <- &ForwardPeer{Peer: p.Peer} + } return true }) + close(s.getC) return err } + +func (s *Session) Close() { + close(s.nextC) +} diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index a37c8df259..d2fcdcbf1b 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -4,6 +4,7 @@ import ( "bytes" "testing" + "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/network/capability" "github.com/ethersphere/swarm/pot" @@ -20,9 +21,12 @@ func TestNew(t *testing.T) { addr[31] = 0x01 kadParams := network.NewKadParams() kad := network.NewKademlia(addr, kadParams) + kadLB := network.NewKademliaLoadBalancer(kad, false) + defer kadLB.Stop() - mgr := NewSessionManager(kad) + mgr := NewSessionManager(kadLB) fwdBase := mgr.New("", nil) + defer fwdBase.Close() if !bytes.Equal(fwdBase.pivot, addr) { t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) } @@ -52,10 +56,14 @@ func TestManagerContext(t *testing.T) { addr[31] = 0x01 kadParams := network.NewKadParams() kad := network.NewKademlia(addr, kadParams) + kadLB := network.NewKademliaLoadBalancer(kad, false) + defer kadLB.Stop() - mgr := NewSessionManager(kad) - _ = mgr.New("", nil) // id 1 + mgr := NewSessionManager(kadLB) + fwdVoid := mgr.New("", nil) // id 1 + defer fwdVoid.Close() fwdOne := mgr.New("", nil) // id 2 + defer fwdOne.Close() if len(mgr.sessions) != 2 { t.Fatalf("mgr session length; expected 2, got %d", len(mgr.sessions)) } @@ -66,6 +74,7 @@ func TestManagerContext(t *testing.T) { newAddr := make([]byte, 32) newAddr[31] = 0x02 fwdTwo := mgr.New("foo", newAddr) // id 3 + defer fwdTwo.Close() sctx, err := mgr.ToContext(3) if err != nil { t.Fatal(err) @@ -95,6 +104,8 @@ func TestGet(t *testing.T) { bytesOwn := pot.NewAddressFromString("00000000") kadParams := network.NewKadParams() kad := network.NewKademlia(bytesOwn, kadParams) + kadLB := network.NewKademliaLoadBalancer(kad, false) + defer kadLB.Stop() cp := capability.NewCapability(4, 2) kad.RegisterCapabilityIndex("foo", *cp) @@ -111,7 +122,7 @@ func TestGet(t *testing.T) { kad.On(peerFar) kad.On(peerNear) - mgr := NewSessionManager(kad) + mgr := NewSessionManager(kadLB) fwd := mgr.New("foo", nil) p, err := fwd.Get(1) if err != nil { @@ -134,4 +145,7 @@ func TestGet(t *testing.T) { if !bytes.Equal(p[0].Address(), bytesFar) { t.Fatalf("get second address; expected %x, got %x", bytesFar, p[0].Address()) } + log.Trace("peer", "peer", p) + + fwd.Close() } diff --git a/network/kademlia_load_balancer.go b/network/kademlia_load_balancer.go index a710554e60..35d8c44f33 100644 --- a/network/kademlia_load_balancer.go +++ b/network/kademlia_load_balancer.go @@ -24,6 +24,7 @@ import ( ) // KademliaBackend is the required interface of KademliaLoadBalancer. +// TODO: Consider if KademliaLoadBalancer itself should implement KademliaBackend type KademliaBackend interface { SubscribeToPeerChanges() *pubsubchannel.Subscription BaseAddr() []byte @@ -124,6 +125,11 @@ func (klb *KademliaLoadBalancer) EachBinDesc(base []byte, consumeBin LBBinConsum }) } +// BaseAddr returns the base address of the underlying kademlia backend +func (klb *KademliaLoadBalancer) BaseAddr() []byte { + return klb.kademlia.BaseAddr() +} + func (klb *KademliaLoadBalancer) peerBinToPeerList(bin *PeerBin) []LBPeer { resources := make([]resourceusestats.Resource, bin.Size) var i int diff --git a/pot/address.go b/pot/address.go index cc88c35d37..1ec916c2d5 100644 --- a/pot/address.go +++ b/pot/address.go @@ -79,6 +79,10 @@ func (a Address) Bytes() []byte { return a[:] } +func (a Address) Address() []byte { + return a[:] +} + // Distance returns the distance between address x and address y as a (comparable) big integer using the distance metric defined in the swarm specification // Fails if not all addresses are of equal length func Distance(x, y []byte) (*big.Int, error) { From c42f7f8c67a3694b34bc25ff2c83db8844862ded Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 18 Dec 2019 14:05:04 +0100 Subject: [PATCH 11/12] forward: Add comments and simplify --- network/forward/forward.go | 140 +++++++++++++++++++++----------- network/forward/forward_test.go | 26 +++--- network/forward/types.go | 29 +++---- 3 files changed, 116 insertions(+), 79 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 23360a7baa..9d077f29d3 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -1,6 +1,7 @@ package forward import ( + "errors" "fmt" "sync" @@ -8,15 +9,67 @@ import ( "github.com/ethersphere/swarm/network" ) +var ( + NoMorePeers = errors.New("no more peers") +) + +// Session encapsulates one single peer iteration query type Session struct { - kademlia *network.KademliaLoadBalancer - pivot []byte + kademlia *network.KademliaLoadBalancer // kademlia backend + base []byte // id int capabilityIndex string nextC chan struct{} - getC chan *ForwardPeer + getC chan *network.Peer +} + +// Id returns the session id +func (s *Session) Id() int { + return s.id +} + +// Get returns up to numPeers peers from the current position of the iterator +// If no further peers are available a NoMorePeers error will be returned +func (s *Session) Get(numPeers int) ([]*network.Peer, error) { + var result []*network.Peer + select { + case <-s.getC: + return result, NoMorePeers + default: + } + for i := 0; i < numPeers; i++ { + s.nextC <- struct{}{} + p, ok := <-s.getC + if !ok { + break + } + result = append(result, p) + } + return result, nil +} + +// starts the iterator and blocks for request for next peer through Get() +func (s *Session) load() error { + err := s.kademlia.EachBinFiltered(s.base, s.capabilityIndex, func(bin network.LBBin) bool { + for _, p := range bin.LBPeers { + _, ok := <-s.nextC + if !ok { + return false + } + s.getC <- p.Peer + } + return true + }) + close(s.getC) + return err +} + +// frees resources +func (s *Session) destroy() { + close(s.nextC) } +// SessionManager is the Session object factory type SessionManager struct { sessions map[int]*Session kademlia *network.KademliaLoadBalancer @@ -24,6 +77,9 @@ type SessionManager struct { mu sync.Mutex } +// NewSessionManager is the SessionManager constructor +// Sessions created with the SessionManager will use the provided kademlia backend +// TODO: argument should be network.KademliaBackend, but needs KademliaLoadBalancer to implement this func NewSessionManager(kademlia *network.KademliaLoadBalancer) *SessionManager { return &SessionManager{ sessions: make(map[int]*Session), @@ -31,32 +87,36 @@ func NewSessionManager(kademlia *network.KademliaLoadBalancer) *SessionManager { } } -func (m *SessionManager) add(s *Session) *Session { - m.mu.Lock() - defer m.mu.Unlock() - m.lastId++ - log.Trace("adding session", "id", m.lastId) - s.id = m.lastId - m.sessions[m.lastId] = s - return s -} - -func (m *SessionManager) New(capabilityIndex string, pivot []byte) *Session { +// New creates a new Session object with the given capabilityindex and base address +// if capabilityIndex is empty, the global kademlia database will be used +// if base is nil, the kademlia base address will be used as comparator for the iteration +func (m *SessionManager) New(capabilityIndex string, base []byte) *Session { s := &Session{ capabilityIndex: capabilityIndex, kademlia: m.kademlia, nextC: make(chan struct{}), - getC: make(chan *ForwardPeer), + getC: make(chan *network.Peer), } - if pivot == nil { - s.pivot = m.kademlia.BaseAddr() + if base == nil { + s.base = m.kademlia.BaseAddr() } else { - s.pivot = pivot + s.base = base } go s.load() return m.add(s) } +// Reap frees the Session object resources and removes it from the session index +func (m *SessionManager) Reap(sessionId int) { + s, ok := m.sessions[sessionId] + if !ok { + return + } + s.destroy() +} + +// ToContext creates a SessionContext from the existing Session matching the provided id +// if the session does not exist an error is returned func (m *SessionManager) ToContext(id int) (*SessionContext, error) { s, ok := m.sessions[id] if !ok { @@ -65,10 +125,13 @@ func (m *SessionManager) ToContext(id int) (*SessionContext, error) { return &SessionContext{ CapabilityIndex: s.capabilityIndex, SessionId: s.id, - Address: s.pivot, + Address: s.base, }, nil } +// FromContext retrieves or creates a Session from a provided context +// If the context has the "id" value set, the corresponding Session is returned, or error if it does not exist +// Otherwise, a new Session is created and returned, optionally with the "address" and/or "capability" values provided in the context func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { sessionId, ok := sctx.Value("id").(int) @@ -85,34 +148,13 @@ func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { return m.New(capabilityIndex, addr), nil } -func (s *Session) Get(numPeers int) ([]*ForwardPeer, error) { - var result []*ForwardPeer - for i := 0; i < numPeers; i++ { - s.nextC <- struct{}{} - p, ok := <-s.getC - if !ok { - break - } - result = append(result, p) - } - return result, nil -} - -func (s *Session) load() error { - err := s.kademlia.EachBinFiltered(s.pivot, s.capabilityIndex, func(bin network.LBBin) bool { - for _, p := range bin.LBPeers { - _, ok := <-s.nextC - if !ok { - return false - } - s.getC <- &ForwardPeer{Peer: p.Peer} - } - return true - }) - close(s.getC) - return err -} - -func (s *Session) Close() { - close(s.nextC) +// adds a new session to the sessionmanager +func (m *SessionManager) add(s *Session) *Session { + m.mu.Lock() + defer m.mu.Unlock() + m.lastId++ + log.Trace("adding session", "id", m.lastId) + s.id = m.lastId + m.sessions[m.lastId] = s + return s } diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go index d2fcdcbf1b..70f84b8796 100644 --- a/network/forward/forward_test.go +++ b/network/forward/forward_test.go @@ -15,8 +15,8 @@ func init() { testutil.Init() } +// TestNew tests that the SessionManager constructor creates Session object with expected values func TestNew(t *testing.T) { - addr := make([]byte, 32) addr[31] = 0x01 kadParams := network.NewKadParams() @@ -26,9 +26,9 @@ func TestNew(t *testing.T) { mgr := NewSessionManager(kadLB) fwdBase := mgr.New("", nil) - defer fwdBase.Close() - if !bytes.Equal(fwdBase.pivot, addr) { - t.Fatalf("pivot base; expected %x, got %x", addr, fwdBase.pivot) + defer mgr.Reap(fwdBase.Id()) + if !bytes.Equal(fwdBase.base, addr) { + t.Fatalf("base base; expected %x, got %x", addr, fwdBase.base) } if fwdBase.id != 1 { t.Fatalf("sessionId; expected %d, got %d", 1, fwdBase.id) @@ -37,8 +37,8 @@ func TestNew(t *testing.T) { bytesNear := pot.NewAddressFromString("00000001") capabilityIndex := "foo" fwdExplicit := mgr.New(capabilityIndex, bytesNear) - if !bytes.Equal(fwdExplicit.pivot, bytesNear) { - t.Fatalf("pivot explicit; expected %x, got %x", bytesNear, fwdExplicit.pivot) + if !bytes.Equal(fwdExplicit.base, bytesNear) { + t.Fatalf("base explicit; expected %x, got %x", bytesNear, fwdExplicit.base) } if fwdExplicit.id != 2 { t.Fatalf("sessionId; expected %d, got %d", 2, fwdExplicit.id) @@ -51,6 +51,7 @@ func TestNew(t *testing.T) { } } +// TestManagerContext tests that the SessionManager's context translations creates Session objects with expected values, and retrieves existing matching Session objects func TestManagerContext(t *testing.T) { addr := make([]byte, 32) addr[31] = 0x01 @@ -61,9 +62,9 @@ func TestManagerContext(t *testing.T) { mgr := NewSessionManager(kadLB) fwdVoid := mgr.New("", nil) // id 1 - defer fwdVoid.Close() + defer mgr.Reap(fwdVoid.Id()) fwdOne := mgr.New("", nil) // id 2 - defer fwdOne.Close() + defer mgr.Reap(fwdOne.Id()) if len(mgr.sessions) != 2 { t.Fatalf("mgr session length; expected 2, got %d", len(mgr.sessions)) } @@ -74,7 +75,7 @@ func TestManagerContext(t *testing.T) { newAddr := make([]byte, 32) newAddr[31] = 0x02 fwdTwo := mgr.New("foo", newAddr) // id 3 - defer fwdTwo.Close() + defer mgr.Reap(fwdTwo.Id()) sctx, err := mgr.ToContext(3) if err != nil { t.Fatal(err) @@ -85,8 +86,8 @@ func TestManagerContext(t *testing.T) { if fwdTwo.capabilityIndex != sctx.CapabilityIndex { t.Fatalf("to context id; expected %s, got %s", fwdTwo.capabilityIndex, sctx.CapabilityIndex) } - if !bytes.Equal(fwdTwo.pivot, sctx.Address) { - t.Fatalf("to context id; expected %x, got %x", fwdTwo.pivot, sctx.Address) + if !bytes.Equal(fwdTwo.base, sctx.Address) { + t.Fatalf("to context id; expected %x, got %x", fwdTwo.base, sctx.Address) } sctx = NewSessionContext("", nil) @@ -100,6 +101,7 @@ func TestManagerContext(t *testing.T) { } } +// TestGet verifies that the synchronous Get method retrieves peers in the correct order func TestGet(t *testing.T) { bytesOwn := pot.NewAddressFromString("00000000") kadParams := network.NewKadParams() @@ -124,6 +126,7 @@ func TestGet(t *testing.T) { mgr := NewSessionManager(kadLB) fwd := mgr.New("foo", nil) + defer mgr.Reap(fwd.Id()) p, err := fwd.Get(1) if err != nil { t.Fatal(err) @@ -147,5 +150,4 @@ func TestGet(t *testing.T) { } log.Trace("peer", "peer", p) - fwd.Close() } diff --git a/network/forward/types.go b/network/forward/types.go index 702ba6dc28..ec5a6597c7 100644 --- a/network/forward/types.go +++ b/network/forward/types.go @@ -10,42 +10,43 @@ var ( zeroTime = time.Unix(0, 0) ) -type ForwardPeer struct { - *network.Peer -} - +// SessionInterface provides an interface for an individual session object type SessionInterface interface { - Subscribe() <-chan ForwardPeer - Get(numberOfPeers int) ([]ForwardPeer, error) - Close() + Subscribe() <-chan *network.Peer + Get(numberOfPeers int) ([]*network.Peer, error) } -// also implements context.Context +// SessionContext is a context.Context that can be used to reference existing sessions or create new sessions type SessionContext struct { CapabilityIndex string SessionId int Address []byte } -func NewSessionContext(capabilityIndex string, addr []byte) *SessionContext { +// NewSessionContext creates a new SessionContext with the provided capabilityIndex and base address +func NewSessionContext(capabilityIndex string, base []byte) *SessionContext { return &SessionContext{ CapabilityIndex: capabilityIndex, - Address: addr, + Address: base, } } +// Deadline implements context.Context func (c *SessionContext) Deadline() (time.Time, bool) { return zeroTime, false } +// Done implements context.Context func (c *SessionContext) Done() <-chan struct{} { return nil } +// Err implements context.Context func (c *SessionContext) Err() error { return nil } +// Value implements context.Context func (c *SessionContext) Value(k interface{}) interface{} { ks, ok := k.(string) if !ok { @@ -67,11 +68,3 @@ func (c *SessionContext) Value(k interface{}) interface{} { } return nil } - -func (c *SessionContext) SetAddress(addr []byte) { - c.Address = addr -} - -func (c *SessionContext) SetCapability(capabilityIndex string) { - c.CapabilityIndex = capabilityIndex -} From 299ca7b4abf0a7f7b19d07ca9c0e0382fd92ce57 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 18 Dec 2019 14:16:15 +0100 Subject: [PATCH 12/12] forward: Add missing member comments --- network/forward/forward.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/network/forward/forward.go b/network/forward/forward.go index 9d077f29d3..28094f4d3d 100644 --- a/network/forward/forward.go +++ b/network/forward/forward.go @@ -16,11 +16,11 @@ var ( // Session encapsulates one single peer iteration query type Session struct { kademlia *network.KademliaLoadBalancer // kademlia backend - base []byte // - id int - capabilityIndex string - nextC chan struct{} - getC chan *network.Peer + base []byte // base address to use for iteration + id int // id of session + capabilityIndex string // kademlia capabilityIndex in use + nextC chan struct{} // triggered to output one single peer from iterator + getC chan *network.Peer // receives peer from iterator } // Id returns the session id @@ -71,10 +71,10 @@ func (s *Session) destroy() { // SessionManager is the Session object factory type SessionManager struct { - sessions map[int]*Session - kademlia *network.KademliaLoadBalancer - lastId int // starts at 1 to make create from context easier - mu sync.Mutex + kademlia *network.KademliaLoadBalancer // underlying kademlia backend + sessions map[int]*Session // index of active sessions, mapped by session id + lastId int // last assigned id for session, starts at 1 to make create from context easier + mu sync.Mutex // protects sessions map } // NewSessionManager is the SessionManager constructor