
Commit 7e918e3

node/meta: validate objects
* add new "locked by" index to raw indexes
* make MPT keys be placed only after they are put to DB (and validated)
* prohibit removal of non-existent objects
* prohibit removal of locked objects
* prohibit locking of non-existent objects

Closes #3178.

Signed-off-by: Pavel Karpy <[email protected]>
1 parent: 42a076e
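The "locked by" index introduced here is a reverse mapping: the key is the new lockedByIndex prefix byte followed by the locked object's ID, and the stored value is the ID of the LOCK object holding it (see the containers.go diff below). A minimal sketch of that key layout, assuming 32-byte object IDs; the helper name lockedByKey and the concrete prefix value are illustrative, not part of the commit:

package main

import "fmt"

const (
	oidSize       = 32 // NeoFS object IDs are 32 bytes
	lockedByIndex = 10 // illustrative value; the real prefix comes from the iota block in notifications.go
)

// lockedByKey builds the reverse-index key: one prefix byte followed by
// the locked object's ID. The stored value is the locker's object ID,
// which lets a single Get answer "is this object locked, and by whom?".
func lockedByKey(locked [oidSize]byte) []byte {
	return append([]byte{lockedByIndex}, locked[:]...)
}

func main() {
	var locked [oidSize]byte
	fmt.Println(len(lockedByKey(locked))) // 33: 1 prefix byte + 32 ID bytes
}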

File tree: 4 files changed (+265 −97)

pkg/services/meta/blocks.go (+9 −55)
@@ -3,10 +3,8 @@ package meta
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
-	"github.com/nspcc-dev/neo-go/pkg/core/mpt"
 	"github.com/nspcc-dev/neo-go/pkg/neorpc"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -59,62 +57,18 @@ func (m *Meta) handleBlock(ctx context.Context, b *block.Header) error {
 		evsByStorage[st] = append(evsByStorage[st], ev)
 	}
 
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+	var wg errgroup.Group
+	wg.SetLimit(1024)
 
-		for st, ee := range evsByStorage {
-			st.putMPTIndexes(ee)
-		}
-	}()
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-
-		var internalWg errgroup.Group
-		internalWg.SetLimit(1024)
-
-		for st, evs := range evsByStorage {
-			internalWg.Go(func() error {
-				err := st.putRawIndexes(ctx, evs, m.net)
-				if err != nil {
-					l.Error("failed to put raw indexes", zap.String("storage", st.path), zap.Error(err))
-				}
-
-				// do not stop other routines ever
-				return nil
-			})
-		}
-
-		// errors are logged, no errors are returned to WG
-		_ = internalWg.Wait()
-	}()
-
-	wg.Wait()
-
-	for st := range evsByStorage {
-		// TODO: parallelize depending on what can parallelize well
-
-		st.m.Lock()
-
-		root := st.mpt.StateRoot()
-		st.mpt.Store.Put([]byte{rootKey}, root[:])
-		p := st.path
-
-		_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.mptOpsBatch))
-		if err != nil {
-			st.m.Unlock()
-			return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
-		}
-
-		clear(st.mptOpsBatch)
-
-		st.m.Unlock()
-
-		st.mpt.Flush(ind)
+	for st, evs := range evsByStorage {
+		wg.Go(func() error {
+			st.putObjects(ctx, l.With(zap.String("storage", st.path)), ind, evs, m.net)
+			return nil
+		})
 	}
 
+	// errors are logged, no errors are returned to WG
+	_ = wg.Wait()
 	l.Debug("handled block successfully")
 
 	return nil

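For reference, handleBlock now fans out per-storage work through a single bounded errgroup whose tasks always return nil, so one storage's failure can never cancel the others. A self-contained sketch of that pattern; the process helper and storage names are illustrative stand-ins for st.putObjects:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func process(storage string) {
	// in the commit, per-storage errors are logged here, not returned
	fmt.Println("processed", storage)
}

func main() {
	storages := []string{"storage-a", "storage-b", "storage-c"}

	var wg errgroup.Group
	wg.SetLimit(1024) // bound the number of concurrently running tasks, as handleBlock does

	for _, st := range storages {
		wg.Go(func() error {
			process(st)
			return nil // never propagate errors, so every storage gets handled
		})
	}

	// tasks only ever return nil, so Wait cannot fail by construction
	_ = wg.Wait()
}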
pkg/services/meta/containers.go (+133 −35)
@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"maps"
 	"math/big"
 	"os"
 	"path"
@@ -21,7 +22,7 @@ import (
 	objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/version"
-	"golang.org/x/sync/errgroup"
+	"go.uber.org/zap"
 )
 
 type containerStorage struct {
@@ -50,11 +51,46 @@ func (s *containerStorage) drop() error {
 	return nil
 }
 
-func (s *containerStorage) putMPTIndexes(ee []objEvent) {
+type eventWithMptKVs struct {
+	ev            objEvent
+	additionalKVs map[string][]byte
+}
+
+func (s *containerStorage) putObjects(ctx context.Context, l *zap.Logger, bInd uint32, ee []objEvent, net NeoFSNetwork) {
 	s.m.Lock()
 	defer s.m.Unlock()
 
-	for _, e := range ee {
+	// raw indexes are responsible for object validation and only after
+	// object is taken as a valid one, it goes to the slower-on-read MPT
+	// storage via objCh
+	objCh := make(chan eventWithMptKVs, len(ee))
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := s.putRawIndexes(ctx, l, ee, net, objCh)
+		if err != nil {
+			l.Error("failed to put raw indexes", zap.Error(err))
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := s.putMPTIndexes(bInd, objCh)
+		if err != nil {
+			l.Error("failed to put mpt indexes", zap.Error(err))
+		}
+	}()
+	wg.Wait()
+}
+
+// lock should be taken.
+func (s *containerStorage) putMPTIndexes(bInd uint32, ch <-chan eventWithMptKVs) error {
+	for evWithKeys := range ch {
+		maps.Copy(s.mptOpsBatch, evWithKeys.additionalKVs)
+
+		e := evWithKeys.ev
 		commsuffix := e.oID[:]
 
 		// batching that is implemented for MPT ignores key's first byte
@@ -77,61 +113,122 @@
 			s.mptOpsBatch[string(append([]byte{0, typeIndex}, commsuffix...))] = []byte{byte(e.typ)}
 		}
 	}
+
+	root := s.mpt.StateRoot()
+	s.mpt.Store.Put([]byte{rootKey}, root[:])
+
+	_, err := s.mpt.PutBatch(mpt.MapToMPTBatch(s.mptOpsBatch))
+	if err != nil {
+		return fmt.Errorf("put batch to MPT storage: %w", err)
+	}
+	clear(s.mptOpsBatch)
+
+	s.mpt.Flush(bInd)
+
+	return nil
 }
 
-func (s *containerStorage) putRawIndexes(ctx context.Context, ee []objEvent, net NeoFSNetwork) error {
-	var wg errgroup.Group
-	wg.SetLimit(10)
-	objects := make([]objectsdk.Object, len(ee))
+// lock should be taken.
+func (s *containerStorage) putRawIndexes(ctx context.Context, l *zap.Logger, ee []objEvent, net NeoFSNetwork, res chan<- eventWithMptKVs) error {
+	batch := make(map[string][]byte)
+	var finalErr error
+	defer func() {
+		close(res)
 
-	for i, e := range ee {
-		wg.Go(func() error {
-			h, err := net.Head(ctx, e.cID, e.oID)
+		if len(batch) > 0 {
+			err := s.db.PutChangeSet(batch, nil)
 			if err != nil {
-				// TODO define behavior with status (non-network) errors; maybe it is near #3140
-				return fmt.Errorf("HEAD object: %w", err)
+				finalErr = fmt.Errorf("put change set to DB: %w", err)
 			}
+		}
+	}()
 
-			objects[i] = h
-			return nil
-		})
-	}
+	for _, e := range ee {
+		err := isOpAllowed(s.db, e)
+		if err != nil {
+			l.Warn("skip object", zap.Stringer("oid", e.oID), zap.String("reason", err.Error()))
+			continue
+		}
 
-	err := wg.Wait()
-	if err != nil {
-		return err
-	}
+		evWithMtp := eventWithMptKVs{ev: e}
 
-	s.m.Lock()
-	defer s.m.Unlock()
-	batch := make(map[string][]byte)
+		h, err := net.Head(ctx, e.cID, e.oID)
+		if err != nil {
+			// TODO define behavior with status (non-network) errors; maybe it is near #3140
+			return fmt.Errorf("HEAD %s object: %w", e.oID, err)
+		}
 
-	for i, e := range ee {
 		commsuffix := e.oID[:]
 
 		batch[string(append([]byte{oidIndex}, commsuffix...))] = []byte{}
 		if len(e.deletedObjects) > 0 {
 			batch[string(append([]byte{deletedIndex}, commsuffix...))] = e.deletedObjects
-			err = deleteObjectsOps(batch, s.mptOpsBatch, s.db, e.deletedObjects)
+			evWithMtp.additionalKVs, err = deleteObjectsOps(batch, s.db, e.deletedObjects)
 			if err != nil {
-				return fmt.Errorf("cleaning operations for %s object: %w", e.oID, err)
+				l.Error("cleaning deleted object", zap.Stringer("oid", e.oID), zap.Error(err))
+				continue
 			}
 		}
 		if len(e.lockedObjects) > 0 {
 			batch[string(append([]byte{lockedIndex}, commsuffix...))] = e.lockedObjects
+
+			for locked := range slices.Chunk(e.lockedObjects, oid.Size) {
+				batch[string(append([]byte{lockedByIndex}, locked...))] = commsuffix
+			}
 		}
 
-		err = object.VerifyHeaderForMetadata(objects[i])
+		err = object.VerifyHeaderForMetadata(h)
 		if err != nil {
-			return fmt.Errorf("invalid %s header: %w", e.oID, err)
+			l.Error("header verification", zap.Stringer("oid", e.oID), zap.Error(err))
+			continue
 		}
 
-		fillObjectIndex(batch, objects[i])
+		res <- evWithMtp
+
+		fillObjectIndex(batch, h)
 	}
 
-	err = s.db.PutChangeSet(batch, nil)
-	if err != nil {
-		return fmt.Errorf("put change set to DB: %w", err)
+	return finalErr
+}
+
+func isOpAllowed(db storage.Store, e objEvent) error {
+	if len(e.deletedObjects) == 0 && len(e.lockedObjects) == 0 {
+		return nil
+	}
+
+	key := make([]byte, 1+oid.Size)
+
+	for obj := range slices.Chunk(e.deletedObjects, oid.Size) {
+		copy(key[1:], obj)
+
+		// delete object that does not exist
+		key[0] = oidIndex
+		_, err := db.Get(key)
+		if err != nil {
+			return fmt.Errorf("%s object-to-delete's presence check: %w", oid.ID(obj), err)
+		}

+		// delete object that is locked
+		key[0] = lockedByIndex
+		v, err := db.Get(key)
+		if err != nil {
+			if errors.Is(err, storage.ErrKeyNotFound) {
+				continue
+			}
+			return fmt.Errorf("%s object-to-delete's lock status check: %w", oid.ID(obj), err)
+		}
+		return fmt.Errorf("%s object-to-delete is locked by %s", oid.ID(obj), oid.ID(v))
+	}
+
+	for obj := range slices.Chunk(e.lockedObjects, oid.Size) {
+		copy(key[1:], obj)
+
+		// lock object that does not exist
+		key[0] = oidIndex
+		_, err := db.Get(key)
+		if err != nil {
+			return fmt.Errorf("%s object-to-lock's presence check: %w", oid.ID(obj), err)
+		}
 	}
 
 	return nil
@@ -192,8 +289,9 @@ func fillObjectIndex(batch map[string][]byte, h objectsdk.Object) {
 	}
 }
 
-func deleteObjectsOps(dbKV, mptKV map[string][]byte, s storage.Store, objects []byte) error {
+func deleteObjectsOps(dbKV map[string][]byte, s storage.Store, objects []byte) (map[string][]byte, error) {
 	rng := storage.SeekRange{}
+	mptKV := make(map[string][]byte)
 
 	// nil value means "delete" operation
 
@@ -271,11 +369,11 @@
 			return true
 		})
 		if err != nil {
-			return err
+			return nil, err
 		}
 	}
 
-	return nil
+	return mptKV, nil
 }
 
 // lastObjectKey returns the least possible key in sorted DB list that
pkg/services/meta/notifications.go (+4 −1)
@@ -259,7 +259,7 @@ outer:
 	}
 
 const (
-	// MPT key prefixes.
+	// MPT-only key prefixes.
 	oidIndex = iota
 	attrIntToOIDIndex
 	attrPlainToOIDIndex
@@ -271,6 +271,9 @@ const (
 	lockedIndex
 	typeIndex
 
+	// storage-only key prefixes.
+	lockedByIndex
+
 	lastEnumIndex
 )
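All prefixes share one iota enumeration, but only the first group is ever written to the MPT; lockedByIndex lives in the raw storage alone (MPT batching ignores the key's first byte, per the comment in containers.go, so storage-only prefixes must never reach it). A sketch of how such prefixed keys are assembled; this enum is a trimmed illustration, so the concrete values do not match the real ones:

package main

import "fmt"

const (
	// MPT-only key prefixes (illustrative subset).
	oidIndex = iota
	deletedIndex
	lockedIndex

	// storage-only key prefixes.
	lockedByIndex // never written to the MPT

	lastEnumIndex
)

// indexKey selects an index with its prefix byte, matching the
// append([]byte{prefix}, oid...) calls throughout the commit.
func indexKey(prefix byte, id []byte) []byte {
	return append([]byte{prefix}, id...)
}

func main() {
	id := make([]byte, 32)
	fmt.Println(len(indexKey(lockedByIndex, id))) // 33 bytes
}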

0 commit comments

Comments
 (0)