
Commit d81f7d9

node/meta: validate objects
* add new "locked by" index to raw indexes
* make MPT keys be placed only after they are put to DB (and validated)
* prohibit removal of non-existent objects
* prohibit removal of locked objects
* prohibit locking of non-existent objects

Closes #3178.

Signed-off-by: Pavel Karpy <[email protected]>
1 parent 96b5b43 commit d81f7d9
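
The centerpiece of the change is the new "locked by" index: lockedIndex already maps a locker to the concatenated IDs of the objects it locks, and the new lockedByIndex inverts that relation, so delete-time validation can answer "is this object locked, and by whom?" with a single point lookup. Below is a minimal map-backed sketch of the two indexes, assuming 32-byte object IDs; the prefix values and the lock/lockerOf helpers are illustrative, not the node's API.

package main

import "fmt"

// illustrative prefix values; the real ones come from the iota
// block in notifications.go
const (
	lockedIndex   byte = 0x08 // locker ID -> concatenated IDs of locked objects
	lockedByIndex byte = 0x0a // locked object ID -> ID of its locker
)

const idSize = 32 // stands in for oid.Size

type id = [idSize]byte

// store stands in for the persistent raw KV storage.
var store = map[string][]byte{}

// lock records that locker protects every object in locked,
// writing both the direct and the reverse index.
func lock(locker id, locked []id) {
	var direct []byte
	for _, o := range locked {
		direct = append(direct, o[:]...)
		// reverse index: one key per locked object, value is the locker
		store[string(append([]byte{lockedByIndex}, o[:]...))] = locker[:]
	}
	store[string(append([]byte{lockedIndex}, locker[:]...))] = direct
}

// lockerOf answers "who locks o?" with a single point lookup.
func lockerOf(o id) ([]byte, bool) {
	v, ok := store[string(append([]byte{lockedByIndex}, o[:]...))]
	return v, ok
}

func main() {
	var locker, obj id
	locker[0], obj[0] = 1, 2
	lock(locker, []id{obj})
	if v, ok := lockerOf(obj); ok {
		fmt.Printf("locked by ID starting with %d\n", v[0])
	}
}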

File tree: 4 files changed, +267 −97 lines changed


pkg/services/meta/blocks.go

Lines changed: 9 additions & 55 deletions
@@ -3,10 +3,8 @@ package meta
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
-	"github.com/nspcc-dev/neo-go/pkg/core/mpt"
 	"github.com/nspcc-dev/neo-go/pkg/neorpc"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -59,62 +57,18 @@ func (m *Meta) handleBlock(ctx context.Context, b *block.Header) error {
 		evsByStorage[st] = append(evsByStorage[st], ev)
 	}
 
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+	var wg errgroup.Group
+	wg.SetLimit(1024)
 
-		for st, ee := range evsByStorage {
-			st.putMPTIndexes(ee)
-		}
-	}()
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-
-		var internalWg errgroup.Group
-		internalWg.SetLimit(1024)
-
-		for st, evs := range evsByStorage {
-			internalWg.Go(func() error {
-				err := st.putRawIndexes(ctx, evs, m.net)
-				if err != nil {
-					l.Error("failed to put raw indexes", zap.String("storage", st.path), zap.Error(err))
-				}
-
-				// do not stop other routines ever
-				return nil
-			})
-		}
-
-		// errors are logged, no errors are returned to WG
-		_ = internalWg.Wait()
-	}()
-
-	wg.Wait()
-
-	for st := range evsByStorage {
-		// TODO: parallelize depending on what can parallelize well
-
-		st.m.Lock()
-
-		root := st.mpt.StateRoot()
-		st.mpt.Store.Put([]byte{rootKey}, root[:])
-		p := st.path
-
-		_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.mptOpsBatch))
-		if err != nil {
-			st.m.Unlock()
-			return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
-		}
-
-		clear(st.mptOpsBatch)
-
-		st.m.Unlock()
-
-		st.mpt.Flush(ind)
+	for st, evs := range evsByStorage {
+		wg.Go(func() error {
+			st.putObjects(ctx, l.With(zap.String("storage", st.path)), ind, evs, m.net)
+			return nil
+		})
 	}
 
+	// errors are logged, no errors are returned to WG
+	_ = wg.Wait()
 	l.Debug("handled block successfully")
 
 	return nil
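
For reference, a self-contained sketch of the bounded fan-out that now drives handleBlock: one errgroup.Group with a concurrency cap replaces the two hand-managed goroutines, and errors stay inside each task so one failing storage cannot abort its siblings. The storages map and the body of the task are placeholders.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// placeholder for evsByStorage: storage path -> its events
	storages := map[string][]int{"a": {1, 2}, "b": {3}}

	var wg errgroup.Group
	wg.SetLimit(1024) // at most 1024 storages processed concurrently

	for path, events := range storages {
		wg.Go(func() error {
			// the real code logs failures and never returns them,
			// so one storage cannot stop the others
			fmt.Println(path, len(events))
			return nil
		})
	}
	_ = wg.Wait() // nil by construction; errors were logged above
}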

pkg/services/meta/containers.go

Lines changed: 135 additions & 35 deletions
@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"maps"
 	"math/big"
 	"os"
 	"path"
@@ -21,7 +22,7 @@ import (
 	objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/version"
-	"golang.org/x/sync/errgroup"
+	"go.uber.org/zap"
 )
 
 type containerStorage struct {
@@ -50,11 +51,46 @@ func (s *containerStorage) drop() error {
 	return nil
 }
 
-func (s *containerStorage) putMPTIndexes(ee []objEvent) {
+type eventWithMptKVs struct {
+	ev            objEvent
+	additionalKVs map[string][]byte
+}
+
+func (s *containerStorage) putObjects(ctx context.Context, l *zap.Logger, bInd uint32, ee []objEvent, net NeoFSNetwork) {
 	s.m.Lock()
 	defer s.m.Unlock()
 
-	for _, e := range ee {
+	// raw indexes are responsible for object validation and only after
+	// object is taken as a valid one, it goes to the slower-on-read MPT
+	// storage via objCh
+	objCh := make(chan eventWithMptKVs, len(ee))
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := s.putRawIndexes(ctx, l, ee, net, objCh)
+		if err != nil {
+			l.Error("failed to put raw indexes", zap.Error(err))
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := s.putMPTIndexes(bInd, objCh)
+		if err != nil {
+			l.Error("failed to put mpt indexes", zap.Error(err))
+		}
+	}()
+	wg.Wait()
+}
+
+// lock should be taken.
+func (s *containerStorage) putMPTIndexes(bInd uint32, ch <-chan eventWithMptKVs) error {
+	for evWithKeys := range ch {
+		maps.Copy(s.mptOpsBatch, evWithKeys.additionalKVs)
+
+		e := evWithKeys.ev
 		commsuffix := e.oID[:]
 
 		// batching that is implemented for MPT ignores key's first byte
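
The hunk above rewires the two stages into a producer/consumer pair: putRawIndexes validates events and sends the survivors through a buffered channel, while putMPTIndexes drains it until close. A stripped-down sketch of that handoff, with ints standing in for objEvent and a sign check standing in for the validation:

package main

import (
	"fmt"
	"sync"
)

func main() {
	events := []int{1, -2, 3} // stand-in for []objEvent

	ch := make(chan int, len(events)) // buffered, like objCh

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // plays the role of putRawIndexes
		defer wg.Done()
		defer close(ch) // signals the consumer that no more work is coming
		for _, e := range events {
			if e < 0 { // plays the role of isOpAllowed / header checks
				continue // invalid events never reach the MPT stage
			}
			ch <- e
		}
	}()
	go func() { // plays the role of putMPTIndexes
		defer wg.Done()
		for e := range ch {
			fmt.Println("indexed", e)
		}
	}()
	wg.Wait()
}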
@@ -77,61 +113,124 @@ func (s *containerStorage) putMPTIndexes(ee []objEvent) {
 			s.mptOpsBatch[string(append([]byte{0, typeIndex}, commsuffix...))] = []byte{byte(e.typ)}
 		}
 	}
+
+	root := s.mpt.StateRoot()
+	s.mpt.Store.Put([]byte{rootKey}, root[:])
+
+	_, err := s.mpt.PutBatch(mpt.MapToMPTBatch(s.mptOpsBatch))
+	if err != nil {
+		return fmt.Errorf("put batch to MPT storage: %w", err)
+	}
+	clear(s.mptOpsBatch)
+
+	s.mpt.Flush(bInd)
+
+	return nil
 }
 
-func (s *containerStorage) putRawIndexes(ctx context.Context, ee []objEvent, net NeoFSNetwork) error {
-	var wg errgroup.Group
-	wg.SetLimit(10)
-	objects := make([]objectsdk.Object, len(ee))
+// lock should be taken.
+func (s *containerStorage) putRawIndexes(ctx context.Context, l *zap.Logger, ee []objEvent, net NeoFSNetwork, res chan<- eventWithMptKVs) (finalErr error) {
+	batch := make(map[string][]byte)
+	defer func() {
+		close(res)
 
-	for i, e := range ee {
-		wg.Go(func() error {
-			h, err := net.Head(ctx, e.cID, e.oID)
+		if finalErr == nil && len(batch) > 0 {
+			err := s.db.PutChangeSet(batch, nil)
 			if err != nil {
-				// TODO define behavior with status (non-network) errors; maybe it is near #3140
-				return fmt.Errorf("HEAD object: %w", err)
+				finalErr = fmt.Errorf("put change set to DB: %w", err)
 			}
+		}
+	}()
 
-			objects[i] = h
-			return nil
-		})
-	}
+	for _, e := range ee {
+		err := isOpAllowed(s.db, e)
+		if err != nil {
+			l.Warn("skip object", zap.Stringer("oid", e.oID), zap.String("reason", err.Error()))
+			continue
+		}
 
-	err := wg.Wait()
-	if err != nil {
-		return err
-	}
+		evWithMpt := eventWithMptKVs{ev: e}
 
-	s.m.Lock()
-	defer s.m.Unlock()
-	batch := make(map[string][]byte)
+		h, err := net.Head(ctx, e.cID, e.oID)
+		if err != nil {
+			// TODO define behavior with status (non-network) errors; maybe it is near #3140
+			return fmt.Errorf("HEAD %s object: %w", e.oID, err)
+		}
 
-	for i, e := range ee {
 		commsuffix := e.oID[:]
 
 		batch[string(append([]byte{oidIndex}, commsuffix...))] = []byte{}
 		if len(e.deletedObjects) > 0 {
 			batch[string(append([]byte{deletedIndex}, commsuffix...))] = e.deletedObjects
-			err = deleteObjectsOps(batch, s.mptOpsBatch, s.db, e.deletedObjects)
+			evWithMpt.additionalKVs, err = deleteObjectsOps(batch, s.db, e.deletedObjects)
 			if err != nil {
-				return fmt.Errorf("cleaning operations for %s object: %w", e.oID, err)
+				l.Error("cleaning deleted object", zap.Stringer("oid", e.oID), zap.Error(err))
+				continue
 			}
 		}
 		if len(e.lockedObjects) > 0 {
 			batch[string(append([]byte{lockedIndex}, commsuffix...))] = e.lockedObjects
+
+			for locked := range slices.Chunk(e.lockedObjects, oid.Size) {
+				batch[string(append([]byte{lockedByIndex}, locked...))] = commsuffix
+			}
 		}
 
-		err = object.VerifyHeaderForMetadata(objects[i])
+		err = object.VerifyHeaderForMetadata(h)
 		if err != nil {
-			return fmt.Errorf("invalid %s header: %w", e.oID, err)
+			l.Error("header verification", zap.Stringer("oid", e.oID), zap.Error(err))
+			continue
 		}
 
-		fillObjectIndex(batch, objects[i])
+		res <- evWithMpt
+
+		fillObjectIndex(batch, h)
 	}
 
-	err = s.db.PutChangeSet(batch, nil)
-	if err != nil {
-		return fmt.Errorf("put change set to DB: %w", err)
+	return finalErr
+}
+
+func isOpAllowed(db storage.Store, e objEvent) error {
+	if len(e.deletedObjects) == 0 && len(e.lockedObjects) == 0 {
+		return nil
+	}
+
+	key := make([]byte, 1+oid.Size)
+
+	for obj := range slices.Chunk(e.deletedObjects, oid.Size) {
+		copy(key[1:], obj)
+
+		// delete object that does not exist
+		key[0] = oidIndex
+		_, err := db.Get(key)
+		if err != nil {
+			if errors.Is(err, storage.ErrKeyNotFound) {
+				return fmt.Errorf("%s object-to-delete is missing", oid.ID(obj))
+			}
+			return fmt.Errorf("%s object-to-delete's presence check: %w", oid.ID(obj), err)
+		}
+
+		// delete object that is locked
+		key[0] = lockedByIndex
+		v, err := db.Get(key)
+		if err != nil {
+			if errors.Is(err, storage.ErrKeyNotFound) {
+				continue
+			}
+			return fmt.Errorf("%s object-to-delete's lock status check: %w", oid.ID(obj), err)
+		}
+		return fmt.Errorf("%s object-to-delete is locked by %s", oid.ID(obj), oid.ID(v))
+	}
+
+	for obj := range slices.Chunk(e.lockedObjects, oid.Size) {
+		copy(key[1:], obj)
+
+		// lock object that does not exist
+		key[0] = oidIndex
+		_, err := db.Get(key)
+		if err != nil {
+			return fmt.Errorf("%s object-to-lock's presence check: %w", oid.ID(obj), err)
+		}
 	}
 
 	return nil
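
isOpAllowed walks the flat byte slices of deleted and locked IDs with slices.Chunk (Go 1.23+) and reuses a single key buffer, rewriting only the prefix byte between the existence probe and the lock probe. A sketch of that probing pattern against a plain map, with illustrative prefix values and 32-byte IDs standing in for the real storage.Store and oid.Size:

package main

import (
	"fmt"
	"slices"
)

const (
	oidIndex      = 0x00 // illustrative prefix values
	lockedByIndex = 0x0a
	idSize        = 32
)

func main() {
	db := map[string][]byte{} // stand-in for storage.Store

	ids := make([]byte, 2*idSize) // two concatenated object IDs
	ids[0], ids[idSize] = 1, 2

	key := make([]byte, 1+idSize) // one buffer reused for every probe
	for obj := range slices.Chunk(ids, idSize) {
		copy(key[1:], obj)

		key[0] = oidIndex // probe 1: does the object exist?
		_, exists := db[string(key)]

		key[0] = lockedByIndex // probe 2: is it locked?
		_, locked := db[string(key)]

		fmt.Println(exists, locked)
	}
}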
@@ -192,8 +291,9 @@ func fillObjectIndex(batch map[string][]byte, h objectsdk.Object) {
 	}
 }
 
-func deleteObjectsOps(dbKV, mptKV map[string][]byte, s storage.Store, objects []byte) error {
+func deleteObjectsOps(dbKV map[string][]byte, s storage.Store, objects []byte) (map[string][]byte, error) {
 	rng := storage.SeekRange{}
+	mptKV := make(map[string][]byte)
 
 	// nil value means "delete" operation
 
@@ -271,11 +371,11 @@ func deleteObjectsOps(dbKV, mptKV map[string][]byte, s storage.Store, objects []
 			return true
 		})
 		if err != nil {
-			return err
+			return nil, err
 		}
 	}
 
-	return nil
+	return mptKV, nil
 }
 
 // lastObjectKey returns the least possible key in sorted DB list that
pkg/services/meta/notifications.go

Lines changed: 4 additions & 1 deletion
@@ -259,7 +259,7 @@ outer:
 }
 
 const (
-	// MPT key prefixes.
+	// MPT-only key prefixes.
 	oidIndex = iota
 	attrIntToOIDIndex
 	attrPlainToOIDIndex
@@ -271,6 +271,9 @@ const (
 	lockedIndex
 	typeIndex
 
+	// storage-only key prefixes.
+	lockedByIndex
+
 	lastEnumIndex
 )
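
The constant block now draws a line between prefixes that live in the MPT (and therefore influence the state root) and storage-only ones: lockedByIndex exists purely in the raw KV DB as a lookup aid. A condensed sketch of such a split enumeration; the middle of the MPT list is elided:

package main

import "fmt"

const (
	// MPT-only key prefixes.
	oidIndex = iota
	attrIntToOIDIndex
	// ... further MPT prefixes elided ...
	lockedIndex
	typeIndex

	// storage-only key prefixes: present in the raw KV DB but
	// deliberately kept out of the MPT and its state root.
	lockedByIndex

	lastEnumIndex // upper bound, useful for validating stored prefixes
)

func main() {
	fmt.Println(lockedByIndex < lastEnumIndex) // true
}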
