Skip to content

Commit a1d0543

Browse files
committed
add Topic option to skip publishing to partial message-capable peers
1 parent e8d2979 commit a1d0543

File tree

4 files changed

+137
-4
lines changed

4 files changed

+137
-4
lines changed

gossipsub.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1295,6 +1295,13 @@ func (gs *GossipSubRouter) PublishBatch(messages []*Message, opts *BatchPublishO
12951295
}
12961296
}
12971297

1298+
func (gs *GossipSubRouter) skipPartialMessageCapablePeers(topicID string) bool {
1299+
if t, ok := gs.p.myTopics[topicID]; ok {
1300+
return t.skipPublishingToPartialMessageCapablePeers
1301+
}
1302+
return false
1303+
}
1304+
12981305
func (gs *GossipSubRouter) Publish(msg *Message) {
12991306
for p, rpc := range gs.rpcs(msg) {
13001307
gs.sendRPC(p, rpc, false)
@@ -1371,8 +1378,9 @@ func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
13711378
}
13721379

13731380
out := rpcWithMessages(msg.Message)
1381+
skipPMPeers := gs.skipPartialMessageCapablePeers(msg.GetTopic())
13741382
for pid := range tosend {
1375-
if pid == from || pid == peer.ID(msg.GetFrom()) {
1383+
if pid == from || pid == peer.ID(msg.GetFrom()) || (skipPMPeers && gs.extensions.peerExtensions[pid].PartialMessages) {
13761384
continue
13771385
}
13781386

gossipsub_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ func TestGossipSubBootstrapParamsValidate(t *testing.T) {
9393
}
9494
}
9595

96+
func getGossipsubsOptFn(ctx context.Context, hs []host.Host, optFn func(int, host.Host) []Option) []*PubSub {
97+
var psubs []*PubSub
98+
for i, h := range hs {
99+
opts := optFn(i, h)
100+
psubs = append(psubs, getGossipsub(ctx, h, opts...))
101+
}
102+
return psubs
103+
}
104+
96105
func TestSparseGossipsub(t *testing.T) {
97106
ctx, cancel := context.WithCancel(context.Background())
98107
defer cancel()
@@ -4655,3 +4664,101 @@ outer:
46554664
t.Errorf("Expected no missing parts, got %v", missing)
46564665
}
46574666
}
4667+
4668+
func TestSkipPublishingToPeersWithPartialMessageSupport(t *testing.T) {
4669+
topicName := "test-topic"
4670+
4671+
// 3 hosts.
4672+
// hosts[0]: Publisher. Supports partial messages
4673+
// hosts[1]: Subscriber. Supports partial messages
4674+
// hosts[2]: Alternate publisher. Does not support partial messages. Only
4675+
// connected to hosts[0]
4676+
hosts := getDefaultHosts(t, 3)
4677+
4678+
partialExt := make([]*partialmessages.PartialMessageExtension, 2)
4679+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
4680+
4681+
for i := range partialExt {
4682+
partialExt[i] = &partialmessages.PartialMessageExtension{
4683+
Logger: logger,
4684+
ValidateRPC: func(from peer.ID, rpc *pb.PartialMessagesExtension) error {
4685+
return nil
4686+
},
4687+
EagerIWantLimitPerHeartbeat: 0,
4688+
IWantLimitPerHeartbeat: 1,
4689+
NewPartialMessage: func(topic string, groupID []byte) (partialmessages.PartialMessage, error) {
4690+
return &minimalTestPartialMessage{
4691+
Group: groupID,
4692+
onExtended: func(m *minimalTestPartialMessage) {
4693+
t.Logf("Received new part and extended partial message")
4694+
},
4695+
}, nil
4696+
},
4697+
}
4698+
}
4699+
4700+
psubs := make([]*PubSub, 0, len(hosts)-1)
4701+
for i, h := range hosts[:2] {
4702+
psub := getGossipsub(context.Background(), h, WithPartialMessagesExtension(partialExt[i]))
4703+
psubs = append(psubs, psub)
4704+
}
4705+
4706+
nonPartialPubsub := getGossipsub(context.Background(), hosts[2])
4707+
4708+
denseConnect(t, hosts[:2])
4709+
time.Sleep(2 * time.Second)
4710+
4711+
// Connect nonPartialPubsub to the publisher
4712+
connect(t, hosts[0], hosts[2])
4713+
4714+
var topics []*Topic
4715+
var subs []*Subscription
4716+
for _, psub := range psubs {
4717+
topic, err := psub.Join(topicName, WithSkipPublishingToPartialMessageCapablePeers())
4718+
if err != nil {
4719+
t.Fatal(err)
4720+
}
4721+
topics = append(topics, topic)
4722+
s, err := topic.Subscribe()
4723+
if err != nil {
4724+
t.Fatal(err)
4725+
}
4726+
subs = append(subs, s)
4727+
}
4728+
4729+
topicForNonPartial, err := nonPartialPubsub.Join(topicName)
4730+
if err != nil {
4731+
t.Fatal(err)
4732+
}
4733+
4734+
// Wait for subscriptions to propagate
4735+
time.Sleep(time.Second)
4736+
4737+
topics[0].Publish(context.Background(), []byte("Hello"))
4738+
4739+
// Publish from another peer, the publisher (psub[0]) should not forward this to psub[1].
4740+
topicForNonPartial.Publish(context.Background(), []byte("from non-partial"))
4741+
4742+
recvdMessage := make(chan struct{}, 1)
4743+
ctx, cancel := context.WithCancel(context.Background())
4744+
defer cancel()
4745+
go func() {
4746+
msg, err := subs[1].Next(ctx)
4747+
if err == context.Canceled {
4748+
return
4749+
}
4750+
if err != nil {
4751+
t.Log(err)
4752+
t.Fail()
4753+
return
4754+
}
4755+
t.Log("Received msg", string(msg.Data))
4756+
recvdMessage <- struct{}{}
4757+
}()
4758+
4759+
select {
4760+
case <-recvdMessage:
4761+
t.Fatal("Received message")
4762+
case <-time.After(2 * time.Second):
4763+
}
4764+
}

pubsub.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
13761376
continue
13771377
}
13781378

1379-
msg := &Message{pmsg, "", rpc.from, nil, false}
1379+
msg := &Message{Message: pmsg, ID: "", ReceivedFrom: rpc.from, ValidatorData: nil, Local: false}
13801380
if p.shouldPush(msg) {
13811381
toPush = append(toPush, msg)
13821382
}
@@ -1515,7 +1515,16 @@ type rmTopicReq struct {
15151515
resp chan error
15161516
}
15171517

1518-
type TopicOptions struct{}
1518+
type TopicOptions struct {
1519+
SkipPublishingToPartialMessageCapablePeers bool
1520+
}
1521+
1522+
func WithSkipPublishingToPartialMessageCapablePeers() TopicOpt {
1523+
return func(t *Topic) error {
1524+
t.skipPublishingToPartialMessageCapablePeers = true
1525+
return nil
1526+
}
1527+
}
15191528

15201529
type TopicOpt func(t *Topic) error
15211530

topic.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type Topic struct {
3232

3333
mux sync.RWMutex
3434
closed bool
35+
36+
skipPublishingToPartialMessageCapablePeers bool
3537
}
3638

3739
// String returns the topic associated with t
@@ -348,7 +350,14 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes
348350
}
349351
}
350352

351-
msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local}
353+
msg := &Message{
354+
Message: m,
355+
ID: "",
356+
ReceivedFrom: t.p.host.ID(),
357+
ValidatorData: pub.validatorData,
358+
Local: pub.local,
359+
}
360+
352361
select {
353362
case t.p.eval <- func() {
354363
t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg})

0 commit comments

Comments
 (0)