Skip to content

Commit 1df0045

Browse files
authored
node/meta: do not keep block subscription without meta-containers (#3180)
2 parents 5e77c4c + 501f589 commit 1df0045

File tree

4 files changed

+128
-53
lines changed

4 files changed

+128
-53
lines changed

pkg/services/meta/blocks.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func (m *Meta) handleBlock(b *block.Header) error {
3232
return nil
3333
}
3434

35-
m.m.RLock()
36-
defer m.m.RUnlock()
35+
m.stM.RLock()
36+
defer m.stM.RUnlock()
3737

3838
for _, n := range res.Application {
3939
ev, err := parseObjNotification(n)

pkg/services/meta/meta.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type wsClient interface {
4343

4444
ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error)
4545
ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error)
46+
Unsubscribe(id string) error
4647

4748
Close()
4849
}
@@ -57,13 +58,14 @@ type Meta struct {
5758
cnrH util.Uint160
5859
cLister ContainerLister
5960

60-
m sync.RWMutex
61+
stM sync.RWMutex
6162
storages map[cid.ID]*containerStorage
6263

6364
timeout time.Duration
6465
magicNumber uint32
6566
cliM sync.RWMutex
6667
ws wsClient
68+
blockSubID string
6769
bCh chan *block.Header
6870
cnrDelEv chan *state.ContainedNotificationEvent
6971
cnrPutEv chan *state.ContainedNotificationEvent
@@ -182,15 +184,15 @@ func (m *Meta) Reload(p Parameters) error {
182184
// with [New]. Blocked until context is done.
183185
func (m *Meta) Run(ctx context.Context) error {
184186
defer func() {
185-
m.m.Lock()
187+
m.stM.Lock()
186188
for _, st := range m.storages {
187189
st.m.Lock()
188190
_ = st.db.Close()
189191
st.m.Unlock()
190192
}
191193
maps.Clear(m.storages)
192194

193-
m.m.Unlock()
195+
m.stM.Unlock()
194196
}()
195197

196198
var err error
@@ -206,6 +208,17 @@ func (m *Meta) Run(ctx context.Context) error {
206208
}
207209
m.magicNumber = uint32(v.Protocol.Network)
208210

211+
m.stM.RLock()
212+
hasContainers := len(m.storages) > 0
213+
m.stM.RUnlock()
214+
215+
if hasContainers {
216+
m.blockSubID, err = m.subscribeForBlocks(m.bCh)
217+
if err != nil {
218+
return fmt.Errorf("block subscription: %w", err)
219+
}
220+
}
221+
209222
err = m.subscribeForMeta()
210223
if err != nil {
211224
return fmt.Errorf("subscribe for meta notifications: %w", err)
@@ -229,7 +242,7 @@ func (m *Meta) flusher(ctx context.Context) {
229242
for {
230243
select {
231244
case <-t.C:
232-
m.m.RLock()
245+
m.stM.RLock()
233246

234247
var wg errgroup.Group
235248
wg.SetLimit(1024)
@@ -256,7 +269,7 @@ func (m *Meta) flusher(ctx context.Context) {
256269

257270
err := wg.Wait()
258271

259-
m.m.RUnlock()
272+
m.stM.RUnlock()
260273

261274
if err != nil {
262275
m.l.Error("storage flusher failed", zap.Error(err))

pkg/services/meta/notifications.go

+81-19
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,31 @@ const (
2525
newEpochName = "NewEpoch"
2626
)
2727

28-
func (m *Meta) subscribeForMeta() error {
29-
_, err := m.ws.ReceiveHeadersOfAddedBlocks(nil, m.bCh)
28+
// subscribeForBlocks reqauires [Meta.cliM] to be taken.
29+
func (m *Meta) subscribeForBlocks(ch chan<- *block.Header) (string, error) {
30+
m.l.Debug("subscribe for blocks")
31+
return m.ws.ReceiveHeadersOfAddedBlocks(nil, ch)
32+
}
33+
34+
func (m *Meta) unsubscribeFromBlocks() {
35+
var err error
36+
m.cliM.Lock()
37+
defer m.cliM.Unlock()
38+
39+
err = m.ws.Unsubscribe(m.blockSubID)
3040
if err != nil {
31-
return fmt.Errorf("subscribe for block headers: %w", err)
41+
m.l.Warn("could not unsubscribe from blocks", zap.String("ID", m.blockSubID))
42+
return
3243
}
3344

45+
m.blockSubID = ""
46+
47+
m.l.Debug("successfully unsubscribed from blocks")
48+
}
49+
50+
func (m *Meta) subscribeForMeta() error {
3451
cnrDeleteEv := cnrDeleteName
35-
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrDeleteEv}, m.cnrDelEv)
52+
_, err := m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrDeleteEv}, m.cnrDelEv)
3653
if err != nil {
3754
return fmt.Errorf("subscribe for container removal notifications: %w", err)
3855
}
@@ -84,9 +101,9 @@ func (m *Meta) listenNotifications(ctx context.Context) error {
84101
continue
85102
}
86103

87-
m.m.RLock()
104+
m.stM.RLock()
88105
_, ok = m.storages[ev.cID]
89-
m.m.RUnlock()
106+
m.stM.RUnlock()
90107
if !ok {
91108
l.Debug("skipping container notification", zap.Stringer("inactual container", ev.cID))
92109
continue
@@ -128,16 +145,10 @@ func (m *Meta) listenNotifications(ctx context.Context) error {
128145
continue
129146
}
130147

131-
m.m.Lock()
132-
133-
st, err := storageForContainer(m.rootPath, ev.cID)
148+
err = m.addContainer(ev.cID)
134149
if err != nil {
135-
m.m.Unlock()
136-
return fmt.Errorf("open new storage for %s container: %w", ev.cID, err)
150+
return fmt.Errorf("could not handle new %s container: %w", ev.cID, err)
137151
}
138-
m.storages[ev.cID] = st
139-
140-
m.m.Unlock()
141152

142153
l.Debug("added container storage", zap.Stringer("cID", ev.cID))
143154
case aer, ok := <-m.epochEv:
@@ -184,7 +195,17 @@ func (m *Meta) reconnect(ctx context.Context) error {
184195
return fmt.Errorf("reconnecting to web socket: %w", err)
185196
}
186197

187-
m.bCh = make(chan *block.Header)
198+
m.stM.RLock()
199+
if len(m.storages) > 0 {
200+
m.bCh = make(chan *block.Header)
201+
m.blockSubID, err = m.subscribeForBlocks(m.bCh)
202+
if err != nil {
203+
m.stM.RUnlock()
204+
return fmt.Errorf("subscription for blocks: %w", err)
205+
}
206+
}
207+
m.stM.RUnlock()
208+
188209
m.cnrDelEv = make(chan *state.ContainedNotificationEvent)
189210
m.cnrPutEv = make(chan *state.ContainedNotificationEvent)
190211
m.epochEv = make(chan *state.ContainedNotificationEvent)
@@ -448,8 +469,8 @@ func parseCnrNotification(ev *state.ContainedNotificationEvent) (cnrEvent, error
448469
}
449470

450471
func (m *Meta) dropContainer(cID cid.ID) error {
451-
m.m.Lock()
452-
defer m.m.Unlock()
472+
m.stM.Lock()
473+
defer m.stM.Unlock()
453474

454475
st, ok := m.storages[cID]
455476
if !ok {
@@ -463,6 +484,36 @@ func (m *Meta) dropContainer(cID cid.ID) error {
463484

464485
delete(m.storages, cID)
465486

487+
if len(m.storages) == 0 {
488+
m.unsubscribeFromBlocks()
489+
}
490+
491+
return nil
492+
}
493+
494+
func (m *Meta) addContainer(cID cid.ID) error {
495+
var err error
496+
m.stM.Lock()
497+
defer m.stM.Unlock()
498+
499+
if len(m.storages) == 0 {
500+
m.cliM.Lock()
501+
502+
m.blockSubID, err = m.subscribeForBlocks(m.bCh)
503+
if err != nil {
504+
m.cliM.Unlock()
505+
return fmt.Errorf("blocks subscription: %w", err)
506+
}
507+
508+
m.cliM.Unlock()
509+
}
510+
511+
st, err := storageForContainer(m.rootPath, cID)
512+
if err != nil {
513+
return fmt.Errorf("open new storage for %s container: %w", cID, err)
514+
}
515+
m.storages[cID] = st
516+
466517
return nil
467518
}
468519

@@ -493,8 +544,8 @@ func (m *Meta) handleEpochNotification(e int64) error {
493544
return fmt.Errorf("list containers: %w", err)
494545
}
495546

496-
m.m.Lock()
497-
defer m.m.Unlock()
547+
m.stM.Lock()
548+
defer m.stM.Unlock()
498549

499550
for cID, st := range m.storages {
500551
_, ok := cnrsNetwork[cID]
@@ -520,6 +571,17 @@ func (m *Meta) handleEpochNotification(e int64) error {
520571
m.storages[cID] = st
521572
}
522573

574+
m.cliM.Lock()
575+
defer m.cliM.Unlock()
576+
if len(m.storages) > 0 && m.blockSubID == "" {
577+
m.blockSubID, err = m.subscribeForBlocks(m.bCh)
578+
if err != nil {
579+
return fmt.Errorf("blocks subscription: %w", err)
580+
}
581+
} else if len(m.storages) == 0 && m.blockSubID != "" {
582+
m.unsubscribeFromBlocks()
583+
}
584+
523585
m.l.Debug("handled new epoch successfully", zap.Int64("epoch", e))
524586

525587
return nil

pkg/services/meta/notifications_test.go

+27-27
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,23 @@ func checkDBFiles(t *testing.T, path string, cnrs map[cid.ID]struct{}) {
9999

100100
type testWS struct {
101101
m sync.RWMutex
102+
bCh chan<- *block.Header
102103
notifications []state.ContainedNotificationEvent
103104
err error
104105
}
105106

107+
func (t *testWS) blockCh() chan<- *block.Header {
108+
t.m.RLock()
109+
defer t.m.RUnlock()
110+
111+
return t.bCh
112+
}
113+
114+
func (t *testWS) Unsubscribe(id string) error {
115+
// TODO implement me
116+
panic("not expected for now")
117+
}
118+
106119
func (t *testWS) swapResults(notifications []state.ContainedNotificationEvent, err error) {
107120
t.m.Lock()
108121
defer t.m.Unlock()
@@ -125,7 +138,11 @@ func (t *testWS) GetVersion() (*result.Version, error) {
125138
}
126139

127140
func (t *testWS) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
128-
panic("not expected for now")
141+
t.m.Lock()
142+
t.bCh = rcvr
143+
t.m.Unlock()
144+
145+
return "", nil
129146
}
130147

131148
func (t *testWS) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) {
@@ -197,9 +214,9 @@ func checkObject(t *testing.T, m *Meta, cID cid.ID, oID, firstPart, previousPart
197214
return true
198215
}
199216

200-
m.m.RLock()
217+
m.stM.RLock()
201218
st := m.storages[cID]
202-
m.m.RUnlock()
219+
m.stM.RUnlock()
203220

204221
st.m.RLock()
205222
defer st.m.RUnlock()
@@ -312,33 +329,15 @@ func TestObjectPut(t *testing.T) {
312329
metaStack, err := stackitem.Deserialize(metaRaw)
313330
require.NoError(t, err)
314331

315-
stopTest := make(chan struct{})
316-
t.Cleanup(func() {
317-
close(stopTest)
318-
})
319-
go func() {
320-
var i uint32 = 1
321-
tick := time.NewTicker(100 * time.Millisecond)
322-
for {
323-
select {
324-
case <-tick.C:
325-
m.bCh <- &block.Header{
326-
Index: i,
327-
}
328-
i++
329-
case <-stopTest:
330-
return
331-
}
332-
}
333-
}()
332+
bCH := ws.blockCh()
334333

335334
ws.swapResults(append(ws.notifications, state.ContainedNotificationEvent{
336335
NotificationEvent: state.NotificationEvent{
337336
Name: objPutEvName,
338337
Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(cID[:]), stackitem.Make(oID[:]), metaStack}),
339338
},
340339
}), nil)
341-
m.bCh <- &block.Header{Index: 0}
340+
bCH <- &block.Header{Index: 0}
342341

343342
require.Eventually(t, func() bool {
344343
return checkObject(t, m, cID, oID, fPart, pPart, size, typ, deleted, nil, testVUB, m.magicNumber)
@@ -354,13 +353,14 @@ func TestObjectPut(t *testing.T) {
354353
metaStack, err := stackitem.Deserialize(metaRaw)
355354
require.NoError(t, err)
356355

356+
bCH := ws.blockCh()
357357
ws.swapResults(append(ws.notifications, state.ContainedNotificationEvent{
358358
NotificationEvent: state.NotificationEvent{
359359
Name: objPutEvName,
360360
Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(cID[:]), stackitem.Make(objToDeleteOID[:]), metaStack}),
361361
},
362362
}), nil)
363-
m.bCh <- &block.Header{Index: 0}
363+
bCH <- &block.Header{Index: 0}
364364

365365
require.Eventually(t, func() bool {
366366
return checkObject(t, m, cID, objToDeleteOID, oid.ID{}, oid.ID{}, size, objectsdk.TypeRegular, nil, nil, testVUB, m.magicNumber)
@@ -381,12 +381,12 @@ func TestObjectPut(t *testing.T) {
381381
Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(tsCID[:]), stackitem.Make(tsOID[:]), metaStack}),
382382
},
383383
}), nil)
384-
m.bCh <- &block.Header{Index: 0}
384+
bCH <- &block.Header{Index: 0}
385385

386386
require.Eventually(t, func() bool {
387-
m.m.RLock()
387+
m.stM.RLock()
388388
st := m.storages[tsCID]
389-
m.m.RUnlock()
389+
m.stM.RUnlock()
390390

391391
st.m.RLock()
392392
defer st.m.RUnlock()

0 commit comments

Comments
 (0)