Skip to content

Commit 9da207e

Browse files
thepeterstoneAndrew Poydence
authored and
Andrew Poydence
committed
Add deterministic routing
Signed-off-by: Andrew Poydence <[email protected]>
1 parent f037b97 commit 9da207e

File tree

4 files changed

+179
-33
lines changed

4 files changed

+179
-33
lines changed

internal/node/node.go

+29-9
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,26 @@ package node
22

33
type Node struct {
44
children map[uint64]*Node
5-
subscriptions map[string][]SubscriptionEnvelope
5+
subscriptions map[string]subscriptionInfo
66
shards map[int64]string
77
rand func(int64) int64
88
}
99

10+
type subscriptionInfo struct {
11+
deterministicRoutingCount int
12+
envelopes []SubscriptionEnvelope
13+
}
14+
1015
type SubscriptionEnvelope struct {
1116
Subscription func(interface{})
1217
id int64
18+
dName string
1319
}
1420

1521
func New(int63n func(n int64) int64) *Node {
1622
return &Node{
1723
children: make(map[uint64]*Node),
18-
subscriptions: make(map[string][]SubscriptionEnvelope),
24+
subscriptions: make(map[string]subscriptionInfo),
1925
shards: make(map[int64]string),
2026
rand: int63n,
2127
}
@@ -59,16 +65,24 @@ func (n *Node) ChildLen() int {
5965
return len(n.children)
6066
}
6167

62-
func (n *Node) AddSubscription(s func(interface{}), shardID string) int64 {
68+
func (n *Node) AddSubscription(s func(interface{}), shardID, deterministicRoutingName string) int64 {
6369
if n == nil {
6470
return 0
6571
}
6672

6773
id := n.createAndSetID(shardID)
68-
n.subscriptions[shardID] = append(n.subscriptions[shardID], SubscriptionEnvelope{
74+
75+
si := n.subscriptions[shardID]
76+
si.envelopes = append(si.envelopes, SubscriptionEnvelope{
6977
Subscription: s,
7078
id: id,
79+
dName: deterministicRoutingName,
7180
})
81+
if deterministicRoutingName != "" {
82+
si.deterministicRoutingCount++
83+
}
84+
n.subscriptions[shardID] = si
85+
7286
return id
7387
}
7488

@@ -85,15 +99,21 @@ func (n *Node) DeleteSubscription(id int64) {
8599
delete(n.shards, id)
86100

87101
s := n.subscriptions[shardID]
88-
for i, ss := range s {
102+
for i, ss := range s.envelopes {
89103
if ss.id != id {
90104
continue
91105
}
92106

93-
n.subscriptions[shardID] = append(s[:i], s[i+1:]...)
107+
if ss.dName != "" {
108+
s.deterministicRoutingCount--
109+
}
110+
111+
s.envelopes = append(s.envelopes[:i], s.envelopes[i+1:]...)
112+
break
94113
}
114+
n.subscriptions[shardID] = s
95115

96-
if len(n.subscriptions[shardID]) == 0 {
116+
if len(n.subscriptions[shardID].envelopes) == 0 {
97117
delete(n.subscriptions, shardID)
98118
}
99119
}
@@ -102,13 +122,13 @@ func (n *Node) SubscriptionLen() int {
102122
return len(n.shards)
103123
}
104124

105-
func (n *Node) ForEachSubscription(f func(shardID string, s []SubscriptionEnvelope)) {
125+
func (n *Node) ForEachSubscription(f func(shardID string, isDeterministic bool, s []SubscriptionEnvelope)) {
106126
if n == nil {
107127
return
108128
}
109129

110130
for shardID, s := range n.subscriptions {
111-
f(shardID, s)
131+
f(shardID, s.deterministicRoutingCount > 0, s.envelopes)
112132
}
113133
}
114134

internal/node/node_test.go

+26-6
Original file line numberDiff line numberDiff line change
@@ -50,26 +50,46 @@ func TestNode(t *testing.T) {
5050
})
5151

5252
o.Spec("returns all subscriptions", func(t TN) {
53-
id1 := t.n.AddSubscription(func(interface{}) {}, "")
53+
id1 := t.n.AddSubscription(func(interface{}) {}, "", "")
5454

55-
t.n.AddSubscription(func(interface{}) {}, "")
56-
t.n.AddSubscription(func(interface{}) {}, "")
55+
t.n.AddSubscription(func(interface{}) {}, "", "")
56+
t.n.AddSubscription(func(interface{}) {}, "", "")
5757
t.n.DeleteSubscription(id1)
5858

5959
var ss []func(interface{})
60-
t.n.ForEachSubscription(func(id string, s []node.SubscriptionEnvelope) {
60+
t.n.ForEachSubscription(func(id string, isD bool, s []node.SubscriptionEnvelope) {
6161
for _, x := range s {
6262
ss = append(ss, x.Subscription)
6363
}
64+
Expect(t, isD).To(Equal(false))
6465
})
6566
Expect(t, ss).To(HaveLen(2))
6667
Expect(t, t.n.SubscriptionLen()).To(Equal(2))
6768
})
6869

70+
o.Spec("returns is deterministic if a single route has deterministic name", func(t TN) {
71+
t.n.AddSubscription(func(interface{}) {}, "a", "")
72+
t.n.AddSubscription(func(interface{}) {}, "a", "some-name")
73+
74+
t.n.ForEachSubscription(func(id string, isD bool, s []node.SubscriptionEnvelope) {
75+
Expect(t, isD).To(Equal(true))
76+
})
77+
})
78+
79+
o.Spec("returns is not deterministic if all deterministic names have been deleted", func(t TN) {
80+
t.n.AddSubscription(func(interface{}) {}, "a", "")
81+
id := t.n.AddSubscription(func(interface{}) {}, "a", "some-name")
82+
t.n.DeleteSubscription(id)
83+
84+
t.n.ForEachSubscription(func(id string, isD bool, s []node.SubscriptionEnvelope) {
85+
Expect(t, isD).To(Equal(false))
86+
})
87+
})
88+
6989
o.Spec("it handles ID collisions", func(t TN) {
7090
n := node.New(func(int64) int64 { return 0 })
71-
id1 := n.AddSubscription(func(interface{}) {}, "")
72-
id2 := n.AddSubscription(func(interface{}) {}, "")
91+
id1 := n.AddSubscription(func(interface{}) {}, "", "")
92+
id2 := n.AddSubscription(func(interface{}) {}, "", "")
7393

7494
Expect(t, id1).To(Not(Equal(id2)))
7595
})

pubsub.go

+51-18
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,32 @@ import (
2424
// of PubSub's methods safe to access concurrently. PubSub should be
2525
// constructed with New().
2626
type PubSub struct {
27-
mu rlocker
28-
n *node.Node
29-
rand func(n int64) int64
27+
mu rlocker
28+
n *node.Node
29+
rand func(n int64) int64
30+
deterministicRoutingHasher func(interface{}) uint64
3031
}
3132

3233
// New constructs a new PubSub.
3334
func New(opts ...PubSubOption) *PubSub {
34-
p := &PubSub{
35+
s := &PubSub{
3536
mu: &sync.RWMutex{},
3637
rand: rand.Int63n,
3738
}
3839

3940
for _, o := range opts {
40-
o.configure(p)
41+
o.configure(s)
4142
}
4243

43-
p.n = node.New(p.rand)
44+
if s.deterministicRoutingHasher == nil {
45+
s.deterministicRoutingHasher = func(_ interface{}) uint64 {
46+
return uint64(s.rand(0x7FFFFFFFFFFFFFFF))
47+
}
48+
}
4449

45-
return p
50+
s.n = node.New(s.rand)
51+
52+
return s
4653
}
4754

4855
// PubSubOption is used to configure a PubSub.
@@ -52,25 +59,34 @@ type PubSubOption interface {
5259

5360
type pubsubConfigFunc func(*PubSub)
5461

55-
func (f pubsubConfigFunc) configure(p *PubSub) {
56-
f(p)
62+
func (f pubsubConfigFunc) configure(s *PubSub) {
63+
f(s)
5764
}
5865

5966
// WithNoMutex configures a PubSub that does not have any internal mutexes.
6067
// This is useful if more complex or custom locking is required. For example,
6168
// if a subscription needs to subscribe while being published to.
6269
func WithNoMutex() PubSubOption {
63-
return pubsubConfigFunc(func(p *PubSub) {
64-
p.mu = nopLock{}
70+
return pubsubConfigFunc(func(s *PubSub) {
71+
s.mu = nopLock{}
6572
})
6673
}
6774

6875
// WithRand configures a PubSub that will use the given function to make
6976
// sharding decisions. The given function has to match the symantics of
7077
// math/rand.Int63n.
7178
func WithRand(int63 func(max int64) int64) PubSubOption {
72-
return pubsubConfigFunc(func(p *PubSub) {
73-
p.rand = int63
79+
return pubsubConfigFunc(func(s *PubSub) {
80+
s.rand = int63
81+
})
82+
}
83+
84+
// WithDeterministicHashing configures a PubSub that will use the given
85+
// function to hash each published data point. The hash is used only for a
86+
// subscription that has set its deterministic routing name.
87+
func WithDeterministicHashing(hashFunction func(interface{}) uint64) PubSubOption {
88+
return pubsubConfigFunc(func(s *PubSub) {
89+
s.deterministicRoutingHasher = hashFunction
7490
})
7591
}
7692

@@ -106,9 +122,19 @@ func WithPath(path []uint64) SubscribeOption {
106122
})
107123
}
108124

125+
// WithDeterministicRouting configures a subscription to have a deterministic
126+
// routing name. A PubSub configured to use deterministic hashing will use
127+
// this name and the subscription's shard ID to maintain consistent routing.
128+
func WithDeterministicRouting(name string) SubscribeOption {
129+
return subscribeConfigFunc(func(c *subscribeConfig) {
130+
c.deterministicRoutingName = name
131+
})
132+
}
133+
109134
type subscribeConfig struct {
110-
shardID string
111-
path []uint64
135+
shardID string
136+
deterministicRoutingName string
137+
path []uint64
112138
}
113139

114140
type subscribeConfigFunc func(*subscribeConfig)
@@ -133,7 +159,7 @@ func (s *PubSub) Subscribe(sub Subscription, opts ...SubscribeOption) Unsubscrib
133159
for _, p := range c.path {
134160
n = n.AddChild(p)
135161
}
136-
id := n.AddSubscription(sub, c.shardID)
162+
id := n.AddSubscription(sub, c.shardID, c.deterministicRoutingName)
137163

138164
return func() {
139165
s.mu.Lock()
@@ -261,15 +287,15 @@ func (s *PubSub) traversePublish(d, next interface{}, a TreeTraverser, n *node.N
261287
if n == nil {
262288
return
263289
}
264-
n.ForEachSubscription(func(shardID string, ss []node.SubscriptionEnvelope) {
290+
n.ForEachSubscription(func(shardID string, isDeterministic bool, ss []node.SubscriptionEnvelope) {
265291
if shardID == "" {
266292
for _, x := range ss {
267293
x.Subscription(d)
268294
}
269295
return
270296
}
271297

272-
idx := s.rand(int64(len(ss)))
298+
idx := s.determineIdx(d, len(ss), isDeterministic)
273299
ss[idx].Subscription(d)
274300
})
275301

@@ -291,6 +317,13 @@ func (s *PubSub) traversePublish(d, next interface{}, a TreeTraverser, n *node.N
291317
}
292318
}
293319

320+
func (s *PubSub) determineIdx(d interface{}, l int, isDeterministic bool) int64 {
321+
if isDeterministic {
322+
return int64(s.deterministicRoutingHasher(d) % uint64(l))
323+
}
324+
return s.rand(int64(l))
325+
}
326+
294327
// rlocker is used to hold either a real sync.RWMutex or a nop lock.
295328
// This is used to turn off locking.
296329
type rlocker interface {

pubsub_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,79 @@ func TestPubSubWithShardID(t *testing.T) {
172172
})
173173
}
174174

175+
func TestPubSubWithShardingScheme(t *testing.T) {
176+
t.Parallel()
177+
o := onpar.New()
178+
defer o.Run(t)
179+
o.BeforeEach(func(t *testing.T) TPS {
180+
s, f := newSpySubscrption()
181+
182+
return TPS{
183+
T: t,
184+
subscription: s,
185+
sub: f,
186+
p: pubsub.New(pubsub.WithDeterministicHashing(func(data interface{}) uint64 {
187+
return uint64(data.(*testStruct).a)
188+
})),
189+
}
190+
})
191+
192+
o.Spec("it splits data between same shardIDs", func(t TPS) {
193+
sub1, f1 := newSpySubscrption()
194+
sub2, f2 := newSpySubscrption()
195+
sub3, f3 := newSpySubscrption()
196+
sub4, f4 := newSpySubscrption()
197+
sub5, f5 := newSpySubscrption()
198+
199+
t.p.Subscribe(f1,
200+
pubsub.WithShardID("1"),
201+
pubsub.WithDeterministicRouting("black"),
202+
pubsub.WithPath(testStructTravCreatePath(&testStructFilter{})),
203+
)
204+
205+
t.p.Subscribe(f2,
206+
pubsub.WithShardID("1"),
207+
pubsub.WithDeterministicRouting("blue"),
208+
pubsub.WithPath(testStructTravCreatePath(&testStructFilter{})),
209+
)
210+
t.p.Subscribe(f3,
211+
pubsub.WithShardID("2"),
212+
pubsub.WithPath(testStructTravCreatePath(&testStructFilter{})),
213+
)
214+
215+
t.p.Subscribe(f4,
216+
pubsub.WithPath(testStructTravCreatePath(&testStructFilter{})),
217+
)
218+
219+
t.p.Subscribe(f5,
220+
pubsub.WithPath(testStructTravCreatePath(&testStructFilter{})),
221+
)
222+
223+
for i := 0; i < 100; i++ {
224+
t.p.Publish(&testStruct{a: 1, b: 2}, testStructTravTraverse)
225+
t.p.Publish(&testStruct{a: 2, b: 3}, testStructTravTraverse)
226+
}
227+
228+
Expect(t, len(sub1.data)).To(Equal(100))
229+
Expect(t, len(sub2.data)).To(Equal(100))
230+
231+
Expect(t, len(sub3.data)).To(Equal(200))
232+
Expect(t, len(sub4.data)).To(Equal(200))
233+
Expect(t, len(sub5.data)).To(Equal(200))
234+
235+
Expect(t, sub1.data[0]).To(Or(
236+
Equal(&testStruct{a: 1, b: 2}),
237+
Equal(&testStruct{a: 2, b: 3}),
238+
))
239+
240+
Expect(t, sub2.data[0]).To(Or(
241+
Equal(&testStruct{a: 1, b: 2}),
242+
Equal(&testStruct{a: 2, b: 3}),
243+
))
244+
Expect(t, sub1.data[0]).To(Not(Equal(sub2.data[0])))
245+
})
246+
}
247+
175248
type spySubscription struct {
176249
mu sync.Mutex
177250
data []interface{}

0 commit comments

Comments
 (0)