Skip to content

Commit 74b015e

Browse files
committed
add Topic option to skip publishing to partial message-capable peers
1 parent 07af472 commit 74b015e

File tree

4 files changed

+143
-5
lines changed

4 files changed

+143
-5
lines changed

gossipsub.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1206,6 +1206,13 @@ func (gs *GossipSubRouter) PublishBatch(messages []*Message, opts *BatchPublishO
12061206
}
12071207
}
12081208

1209+
func (gs *GossipSubRouter) skipPartialMessageCapablePeers(topicID string) bool {
1210+
if t, ok := gs.p.myTopics[topicID]; ok {
1211+
return t.skipPublishingToPartialMessageCapablePeers
1212+
}
1213+
return false
1214+
}
1215+
12091216
func (gs *GossipSubRouter) Publish(msg *Message) {
12101217
for p, rpc := range gs.rpcs(msg) {
12111218
gs.sendRPC(p, rpc, false)
@@ -1282,8 +1289,9 @@ func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
12821289
}
12831290

12841291
out := rpcWithMessages(msg.Message)
1292+
skipPMPeers := gs.skipPartialMessageCapablePeers(msg.GetTopic())
12851293
for pid := range tosend {
1286-
if pid == from || pid == peer.ID(msg.GetFrom()) {
1294+
if pid == from || pid == peer.ID(msg.GetFrom()) || (skipPMPeers && gs.extensions.peerExtensions[pid].PartialMessages) {
12871295
continue
12881296
}
12891297

gossipsub_test.go

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,27 @@ func TestGossipSubParamsValidate(t *testing.T) {
6262
}
6363
}
6464

65+
func getGossipsubsOptFn(ctx context.Context, hs []host.Host, optFn func(int, host.Host) []Option) []*PubSub {
66+
var psubs []*PubSub
67+
for i, h := range hs {
68+
opts := optFn(i, h)
69+
psubs = append(psubs, getGossipsub(ctx, h, opts...))
70+
}
71+
return psubs
72+
}
73+
6574
func TestSparseGossipsub(t *testing.T) {
6675
ctx, cancel := context.WithCancel(context.Background())
6776
defer cancel()
6877
hosts := getDefaultHosts(t, 20)
6978

70-
psubs := getGossipsubs(ctx, hosts)
79+
psubs := getGossipsubsOptFn(ctx, hosts, func(i int, h host.Host) []Option {
80+
lh := slog.NewJSONHandler(os.Stdout, nil)
81+
logger := slog.New(lh.WithAttrs([]slog.Attr{slog.String("id", h.ID().String())}))
82+
return []Option{
83+
WithRPCLogger(logger),
84+
}
85+
})
7186

7287
var msgs []*Subscription
7388
for _, ps := range psubs {
@@ -4356,3 +4371,100 @@ outer:
43564371
}
43574372
}
43584373

4374+
func TestSkipPublishingToPeersWithPartialMessageSupport(t *testing.T) {
4375+
topicName := "test-topic"
4376+
4377+
// 3 hosts.
4378+
// hosts[0]: Publisher. Supports partial messages
4379+
// hosts[1]: Subscriber. Supports partial messages
4380+
// hosts[2]: Alternate publisher. Does not support partial messages. Only
4381+
// connected to hosts[0]
4382+
hosts := getDefaultHosts(t, 3)
4383+
4384+
partialExt := make([]*partialmessages.PartialMessageExtension, 2)
4385+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
4386+
4387+
for i := range partialExt {
4388+
partialExt[i] = &partialmessages.PartialMessageExtension{
4389+
Logger: logger,
4390+
ValidateRPC: func(from peer.ID, rpc *pb.PartialMessagesExtension) error {
4391+
return nil
4392+
},
4393+
EagerIWantLimitPerHeartbeat: 0,
4394+
IWantLimitPerHeartbeat: 1,
4395+
NewPartialMessage: func(topic string, groupID []byte) (partialmessages.PartialMessage, error) {
4396+
return &minimalTestPartialMessage{
4397+
Group: groupID,
4398+
onExtended: func(m *minimalTestPartialMessage) {
4399+
t.Logf("Received new part and extended partial message")
4400+
},
4401+
}, nil
4402+
},
4403+
}
4404+
}
4405+
4406+
psubs := make([]*PubSub, 0, len(hosts)-1)
4407+
for i, h := range hosts[:2] {
4408+
psub := getGossipsub(context.Background(), h, WithPartialMessagesExtension(partialExt[i]))
4409+
psubs = append(psubs, psub)
4410+
}
4411+
4412+
nonPartialPubsub := getGossipsub(context.Background(), hosts[2])
4413+
4414+
denseConnect(t, hosts[:2])
4415+
time.Sleep(2 * time.Second)
4416+
4417+
// Connect nonPartialPubsub to the publisher
4418+
connect(t, hosts[0], hosts[2])
4419+
4420+
var topics []*Topic
4421+
var subs []*Subscription
4422+
for _, psub := range psubs {
4423+
topic, err := psub.Join(topicName, WithSkipPublishingToPartialMessageCapablePeers())
4424+
if err != nil {
4425+
t.Fatal(err)
4426+
}
4427+
topics = append(topics, topic)
4428+
s, err := topic.Subscribe()
4429+
if err != nil {
4430+
t.Fatal(err)
4431+
}
4432+
subs = append(subs, s)
4433+
}
4434+
4435+
topicForNonPartial, err := nonPartialPubsub.Join(topicName)
4436+
if err != nil {
4437+
t.Fatal(err)
4438+
}
4439+
4440+
// Wait for subscriptions to propagate
4441+
time.Sleep(time.Second)
4442+
4443+
topics[0].Publish(context.Background(), []byte("Hello"))
4444+
4445+
// Publish from another peer, the publisher (psub[0]) should not forward this to psub[1].
4446+
topicForNonPartial.Publish(context.Background(), []byte("from non-partial"))
4447+
4448+
recvdMessage := make(chan struct{}, 1)
4449+
ctx, cancel := context.WithCancel(context.Background())
4450+
defer cancel()
4451+
go func() {
4452+
msg, err := subs[1].Next(ctx)
4453+
if err == context.Canceled {
4454+
return
4455+
}
4456+
if err != nil {
4457+
t.Log(err)
4458+
t.Fail()
4459+
return
4460+
}
4461+
t.Log("Received msg", string(msg.Data))
4462+
recvdMessage <- struct{}{}
4463+
}()
4464+
4465+
select {
4466+
case <-recvdMessage:
4467+
t.Fatal("Received message")
4468+
case <-time.After(2 * time.Second):
4469+
}
4470+
}

pubsub.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,7 +1365,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
13651365
continue
13661366
}
13671367

1368-
msg := &Message{pmsg, "", rpc.from, nil, false}
1368+
msg := &Message{Message: pmsg, ID: "", ReceivedFrom: rpc.from, ValidatorData: nil, Local: false}
13691369
if p.shouldPush(msg) {
13701370
toPush = append(toPush, msg)
13711371
}
@@ -1504,7 +1504,16 @@ type rmTopicReq struct {
15041504
resp chan error
15051505
}
15061506

1507-
type TopicOptions struct{}
1507+
type TopicOptions struct {
1508+
SkipPublishingToPartialMessageCapablePeers bool
1509+
}
1510+
1511+
func WithSkipPublishingToPartialMessageCapablePeers() TopicOpt {
1512+
return func(t *Topic) error {
1513+
t.skipPublishingToPartialMessageCapablePeers = true
1514+
return nil
1515+
}
1516+
}
15081517

15091518
type TopicOpt func(t *Topic) error
15101519

topic.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type Topic struct {
3232

3333
mux sync.RWMutex
3434
closed bool
35+
36+
skipPublishingToPartialMessageCapablePeers bool
3537
}
3638

3739
// String returns the topic associated with t
@@ -348,7 +350,14 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes
348350
}
349351
}
350352

351-
msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local}
353+
msg := &Message{
354+
Message: m,
355+
ID: "",
356+
ReceivedFrom: t.p.host.ID(),
357+
ValidatorData: pub.validatorData,
358+
Local: pub.local,
359+
}
360+
352361
select {
353362
case t.p.eval <- func() {
354363
t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg})

0 commit comments

Comments
 (0)