Skip to content

Commit 18e5264

Browse files
committed
meta: handle container events via blocks
Do not subscribe for containers additionally, use blocks as a single source of notifications. Now meta service is always in one of the two mutually exclusive state: 1. IDLE: subscribed to new containers event and not subscribed to blocks 2. ACTIVE: subscribed to blocks and not subscribed to new containers Refs #3175, closes #3259. Signed-off-by: Pavel Karpy <[email protected]>
1 parent 1cc30df commit 18e5264

File tree

4 files changed

+145
-77
lines changed

4 files changed

+145
-77
lines changed

pkg/services/meta/blocks.go

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ func (m *Meta) handleBlock(ctx context.Context, b *block.Header) error {
1717
l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind))
1818
l.Debug("handling block")
1919

20-
evName := objPutEvName
2120
m.cliM.RLock()
2221
res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{
2322
Contract: &m.cnrH,
24-
Name: &evName,
2523
})
2624
if err != nil {
2725
m.cliM.RUnlock()
@@ -34,30 +32,88 @@ func (m *Meta) handleBlock(ctx context.Context, b *block.Header) error {
3432
return nil
3533
}
3634

37-
m.stM.RLock()
38-
defer m.stM.RUnlock()
39-
4035
evsByStorage := make(map[*containerStorage][]objEvent)
4136
for _, n := range res.Application {
42-
ev, err := parseObjNotification(n)
43-
if err != nil {
44-
l.Error("invalid object notification received", zap.Error(err))
45-
continue
46-
}
37+
l := m.l.With(zap.Stringer("tx", n.Container))
4738

48-
if magic := uint32(ev.network.Uint64()); magic != m.magicNumber {
49-
l.Warn("skipping object notification with wrong magic number", zap.Uint32("expected", m.magicNumber), zap.Uint32("got", magic))
50-
}
39+
switch n.Name {
40+
case objPutEvName:
41+
ev, err := parseObjNotification(n)
42+
if err != nil {
43+
l.Error("invalid object notification received", zap.Error(err))
44+
continue
45+
}
46+
47+
if magic := uint32(ev.network.Uint64()); magic != m.magicNumber {
48+
l.Warn("skipping object notification with wrong magic number", zap.Uint32("expected", m.magicNumber), zap.Uint32("got", magic))
49+
continue
50+
}
51+
52+
m.stM.RLock()
53+
st, ok := m.storages[ev.cID]
54+
m.stM.RUnlock()
55+
if !ok {
56+
l.Debug("skipping object notification", zap.Stringer("container", ev.cID))
57+
continue
58+
}
59+
60+
m.l.Debug("received object notification", zap.Stringer("address", oid.NewAddress(ev.cID, ev.oID)))
61+
62+
evsByStorage[st] = append(evsByStorage[st], ev)
63+
case cnrDeleteName:
64+
ev, err := parseCnrNotification(n)
65+
if err != nil {
66+
l.Error("invalid container removal notification received", zap.Error(err))
67+
continue
68+
}
69+
70+
m.stM.RLock()
71+
_, ok := m.storages[ev.cID]
72+
m.stM.RUnlock()
73+
if !ok {
74+
l.Debug("skipping container notification", zap.Stringer("container", ev.cID))
75+
continue
76+
}
5177

52-
st, ok := m.storages[ev.cID]
53-
if !ok {
54-
l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID))
78+
err = m.dropContainer(ev.cID)
79+
if err != nil {
80+
l.Error("deleting container failed", zap.Error(err))
81+
continue
82+
}
83+
84+
l.Debug("deleted container", zap.Stringer("cID", ev.cID))
85+
case cnrPutName:
86+
ev, err := parseCnrNotification(n)
87+
if err != nil {
88+
l.Error("invalid container notification received", zap.Error(err))
89+
continue
90+
}
91+
92+
ok, err := m.net.IsMineWithMeta(ev.cID)
93+
if err != nil {
94+
l.Error("can't get container data", zap.Error(err))
95+
continue
96+
}
97+
if !ok {
98+
l.Debug("skip new inactual container", zap.Stringer("cid", ev.cID))
99+
continue
100+
}
101+
102+
err = m.addContainer(ev.cID)
103+
if err != nil {
104+
l.Error("could not handle new container", zap.Stringer("cID", ev.cID), zap.Error(err))
105+
continue
106+
}
107+
108+
l.Debug("added container storage", zap.Stringer("cID", ev.cID))
109+
default:
110+
l.Debug("skip notification", zap.String("event name", n.Name))
55111
continue
56112
}
113+
}
57114

58-
m.l.Debug("received object notification", zap.Stringer("address", oid.NewAddress(ev.cID, ev.oID)))
59-
60-
evsByStorage[st] = append(evsByStorage[st], ev)
115+
if len(evsByStorage) == 0 {
116+
return nil
61117
}
62118

63119
var wg errgroup.Group

pkg/services/meta/meta.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ type Meta struct {
7979
cliM sync.RWMutex
8080
ws wsClient
8181
blockSubID string
82+
cnrSubID string
8283
bCh chan *block.Header
83-
cnrDelEv chan *state.ContainedNotificationEvent
8484
cnrPutEv chan *state.ContainedNotificationEvent
8585
epochEv chan *state.ContainedNotificationEvent
8686

@@ -206,7 +206,6 @@ func New(p Parameters) (*Meta, error) {
206206
endpoints: p.NeoEnpoints,
207207
timeout: p.Timeout,
208208
bCh: make(chan *block.Header, notificationBuffSize),
209-
cnrDelEv: make(chan *state.ContainedNotificationEvent, notificationBuffSize),
210209
cnrPutEv: make(chan *state.ContainedNotificationEvent, notificationBuffSize),
211210
epochEv: make(chan *state.ContainedNotificationEvent, notificationBuffSize),
212211
blockBuff: make(chan *block.Header, blockBuffSize),
@@ -262,9 +261,14 @@ func (m *Meta) Run(ctx context.Context) error {
262261
if err != nil {
263262
return fmt.Errorf("block subscription: %w", err)
264263
}
264+
} else {
265+
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
266+
if err != nil {
267+
return fmt.Errorf("new container subscription: %w", err)
268+
}
265269
}
266270

267-
err = m.subscribeForMeta()
271+
err = m.subscribeEvents()
268272
if err != nil {
269273
return fmt.Errorf("subscribe for meta notifications: %w", err)
270274
}

pkg/services/meta/notifications.go

Lines changed: 63 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,32 @@ func (m *Meta) unsubscribeFromBlocks() {
4848
m.l.Debug("successfully unsubscribed from blocks")
4949
}
5050

51-
func (m *Meta) subscribeForMeta() error {
52-
cnrDeleteEv := cnrDeleteName
53-
_, err := m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrDeleteEv}, m.cnrDelEv)
54-
if err != nil {
55-
return fmt.Errorf("subscribe for container removal notifications: %w", err)
56-
}
51+
// subscribeForNewContainers requires [Meta.cliM] to be taken.
52+
func (m *Meta) subscribeForNewContainers(ch chan<- *state.ContainedNotificationEvent) (string, error) {
53+
m.l.Debug("subscribing for containers")
5754

5855
cnrPutEv := cnrPutName
59-
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrPutEv}, m.cnrPutEv)
56+
return m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrPutEv}, ch)
57+
}
58+
59+
// unsubscribeFromNewContainers requires [Meta.cliM] to be taken.
60+
func (m *Meta) unsubscribeFromNewContainers() {
61+
m.l.Debug("unsubscribing from containers")
62+
63+
err := m.ws.Unsubscribe(m.cnrSubID)
6064
if err != nil {
61-
return fmt.Errorf("subscribe for container addition notifications: %w", err)
65+
m.l.Error("could not unsubscribe from containers", zap.Error(err))
66+
return
6267
}
6368

69+
m.cnrSubID = ""
70+
71+
m.l.Debug("successfully unsubscribed from containers")
72+
}
73+
74+
func (m *Meta) subscribeEvents() error {
6475
epochEv := newEpochName
65-
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.netmapH, Name: &epochEv}, m.epochEv)
76+
_, err := m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.netmapH, Name: &epochEv}, m.epochEv)
6677
if err != nil {
6778
return fmt.Errorf("subscribe for epoch notifications: %w", err)
6879
}
@@ -84,7 +95,7 @@ func (m *Meta) listenNotifications(ctx context.Context) error {
8495
}
8596

8697
m.blockBuff <- h
87-
case aer, ok := <-m.cnrDelEv:
98+
case aer, ok := <-m.cnrPutEv:
8899
if !ok {
89100
err := m.reconnect(ctx)
90101
if err != nil {
@@ -94,44 +105,17 @@ func (m *Meta) listenNotifications(ctx context.Context) error {
94105
continue
95106
}
96107

97-
l := m.l.With(zap.Stringer("notification container", aer.Container))
98-
99-
ev, err := parseCnrNotification(aer)
100-
if err != nil {
101-
l.Error("invalid container notification received", zap.Error(err))
102-
continue
103-
}
104-
105-
m.stM.RLock()
106-
_, ok = m.storages[ev.cID]
107-
m.stM.RUnlock()
108-
if !ok {
109-
l.Debug("skipping container notification", zap.Stringer("inactual container", ev.cID))
110-
continue
111-
}
112-
113-
go func() {
114-
err = m.dropContainer(ev.cID)
115-
if err != nil {
116-
l.Error("deleting container failed", zap.Error(err))
117-
return
118-
}
119-
120-
l.Debug("deleted container", zap.Stringer("cID", ev.cID))
121-
}()
122-
case aer, ok := <-m.cnrPutEv:
123-
if !ok {
124-
err := m.reconnect(ctx)
125-
if err != nil {
126-
return err
127-
}
128-
108+
m.cliM.RLock()
109+
alreadyListenToContainers := m.blockSubID != ""
110+
m.cliM.RUnlock()
111+
if alreadyListenToContainers {
112+
// container will be handled
129113
continue
130114
}
131115

132116
l := m.l.With(zap.Stringer("notification container", aer.Container))
133117

134-
ev, err := parseCnrNotification(aer)
118+
ev, err := parseCnrNotification(*aer)
135119
if err != nil {
136120
l.Error("invalid container notification received", zap.Error(err))
137121
continue
@@ -204,14 +188,19 @@ func (m *Meta) reconnect(ctx context.Context) error {
204188
m.stM.RUnlock()
205189
return fmt.Errorf("subscription for blocks: %w", err)
206190
}
191+
} else {
192+
m.cnrPutEv = make(chan *state.ContainedNotificationEvent, notificationBuffSize)
193+
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
194+
if err != nil {
195+
m.stM.RUnlock()
196+
return fmt.Errorf("subscription for containers: %w", err)
197+
}
207198
}
208199
m.stM.RUnlock()
209200

210-
m.cnrDelEv = make(chan *state.ContainedNotificationEvent, notificationBuffSize)
211-
m.cnrPutEv = make(chan *state.ContainedNotificationEvent, notificationBuffSize)
212201
m.epochEv = make(chan *state.ContainedNotificationEvent, notificationBuffSize)
213202

214-
err = m.subscribeForMeta()
203+
err = m.subscribeEvents()
215204
if err != nil {
216205
return fmt.Errorf("subscribe for meta notifications: %w", err)
217206
}
@@ -430,7 +419,7 @@ type cnrEvent struct {
430419
cID cid.ID
431420
}
432421

433-
func parseCnrNotification(ev *state.ContainedNotificationEvent) (cnrEvent, error) {
422+
func parseCnrNotification(ev state.ContainedNotificationEvent) (cnrEvent, error) {
434423
var res cnrEvent
435424

436425
arr, ok := ev.Item.Value().([]stackitem.Item)
@@ -481,7 +470,11 @@ func (m *Meta) dropContainer(cID cid.ID) error {
481470
if len(m.storages) == 0 {
482471
m.cliM.Lock()
483472
m.unsubscribeFromBlocks()
473+
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
484474
m.cliM.Unlock()
475+
if err != nil {
476+
return fmt.Errorf("subscribing for new containers: %w", err)
477+
}
485478
}
486479

487480
return nil
@@ -500,6 +493,7 @@ func (m *Meta) addContainer(cID cid.ID) error {
500493
m.cliM.Unlock()
501494
return fmt.Errorf("blocks subscription: %w", err)
502495
}
496+
m.unsubscribeFromNewContainers()
503497

504498
m.cliM.Unlock()
505499
}
@@ -573,14 +567,29 @@ func (m *Meta) handleEpochNotification(e uint64) error {
573567
m.stM.Unlock()
574568

575569
m.cliM.Lock()
576-
if len(m.storages) > 0 && m.blockSubID == "" {
577-
m.blockSubID, err = m.subscribeForBlocks(m.bCh)
578-
if err != nil {
579-
m.cliM.Unlock()
580-
return fmt.Errorf("blocks subscription: %w", err)
570+
if len(m.storages) > 0 {
571+
if m.blockSubID == "" {
572+
m.blockSubID, err = m.subscribeForBlocks(m.bCh)
573+
if err != nil {
574+
m.cliM.Unlock()
575+
return fmt.Errorf("blocks subscription: %w", err)
576+
}
577+
}
578+
if m.cnrSubID != "" {
579+
m.unsubscribeFromBlocks()
580+
}
581+
} else {
582+
if m.blockSubID != "" {
583+
m.unsubscribeFromBlocks()
584+
}
585+
586+
if m.cnrSubID == "" {
587+
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
588+
if err != nil {
589+
m.cliM.Unlock()
590+
return fmt.Errorf("containers subscription: %w", err)
591+
}
581592
}
582-
} else if len(m.storages) == 0 && m.blockSubID != "" {
583-
m.unsubscribeFromBlocks()
584593
}
585594
m.cliM.Unlock()
586595

pkg/services/meta/notifications_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ func createAndRunTestMeta(t *testing.T, ws wsClient, network NeoFSNetwork) (*Met
202202
rootPath: t.TempDir(),
203203
magicNumber: 102938475,
204204
bCh: make(chan *block.Header),
205-
cnrDelEv: make(chan *state.ContainedNotificationEvent),
206205
cnrPutEv: make(chan *state.ContainedNotificationEvent),
207206
epochEv: make(chan *state.ContainedNotificationEvent),
208207
blockBuff: make(chan *block.Header, blockBuffSize),

0 commit comments

Comments
 (0)