Skip to content

Commit a6fc57d

Browse files
committed
node/meta: deduplicate header verification
It is the same for metabase and metaservice. Index delimiter was also moved. Signed-off-by: Pavel Karpy <[email protected]>
1 parent 1157372 commit a6fc57d

File tree

7 files changed

+67
-79
lines changed

7 files changed

+67
-79
lines changed

Diff for: pkg/core/object/metadata.go

+32
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ package object
22

33
import (
44
"bytes"
5+
"errors"
56
"fmt"
67
"math/big"
78
"slices"
89
"strings"
910

1011
"github.com/nspcc-dev/neofs-sdk-go/client"
12+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1113
"github.com/nspcc-dev/neofs-sdk-go/object"
1214
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1315
"github.com/nspcc-dev/neofs-sdk-go/user"
@@ -165,3 +167,33 @@ func calcMaxUniqueSearchResults(lim uint16, sets [][]client.SearchResultItem) ui
165167
}
166168
return n
167169
}
170+
171+
// AttributeDelimiter is attribute key and value separator used in metadata DB.
172+
var AttributeDelimiter = []byte{0x00}
173+
174+
// VerifyHeaderForMetadata checks whether given header corresponds to metadata
175+
// bucket requirements and limits.
176+
func VerifyHeaderForMetadata(hdr object.Object) error {
177+
if ln := hdr.HeaderLen(); ln > object.MaxHeaderLen {
178+
return fmt.Errorf("header len %d exceeds the limit", ln)
179+
}
180+
if hdr.GetContainerID().IsZero() {
181+
return fmt.Errorf("invalid container: %w", cid.ErrZero)
182+
}
183+
if hdr.Owner().IsZero() {
184+
return fmt.Errorf("invalid owner: %w", user.ErrZeroID)
185+
}
186+
if _, ok := hdr.PayloadChecksum(); !ok {
187+
return errors.New("missing payload checksum")
188+
}
189+
attrs := hdr.Attributes()
190+
for i := range attrs {
191+
if strings.IndexByte(attrs[i].Key(), AttributeDelimiter[0]) >= 0 {
192+
return fmt.Errorf("attribute #%d key contains 0x%02X byte used in sep", i, AttributeDelimiter[0])
193+
}
194+
if strings.IndexByte(attrs[i].Value(), AttributeDelimiter[0]) >= 0 {
195+
return fmt.Errorf("attribute #%d value contains 0x%02X byte used in sep", i, AttributeDelimiter[0])
196+
}
197+
}
198+
return nil
199+
}

Diff for: pkg/local_object_storage/metabase/metadata.go

+19-47
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"math/big"
1111
"slices"
1212
"strconv"
13-
"strings"
1413

1514
"github.com/google/uuid"
1615
"github.com/mr-tron/base58"
@@ -49,33 +48,6 @@ func invalidMetaBucketKeyErr(key []byte, cause error) error {
4948
return fmt.Errorf("invalid meta bucket key (prefix 0x%X): %w", key[0], cause)
5049
}
5150

52-
// checks whether given header corresponds to metadata bucket requirements and
53-
// limits.
54-
func verifyHeaderForMetadata(hdr object.Object) error {
55-
if ln := hdr.HeaderLen(); ln > object.MaxHeaderLen {
56-
return fmt.Errorf("header len %d exceeds the limit", ln)
57-
}
58-
if hdr.GetContainerID().IsZero() {
59-
return fmt.Errorf("invalid container: %w", cid.ErrZero)
60-
}
61-
if hdr.Owner().IsZero() {
62-
return fmt.Errorf("invalid owner: %w", user.ErrZeroID)
63-
}
64-
if _, ok := hdr.PayloadChecksum(); !ok {
65-
return errors.New("missing payload checksum")
66-
}
67-
attrs := hdr.Attributes()
68-
for i := range attrs {
69-
if strings.IndexByte(attrs[i].Key(), attributeDelimiter[0]) >= 0 {
70-
return fmt.Errorf("attribute #%d key contains 0x%02X byte used in sep", i, attributeDelimiter[0])
71-
}
72-
if strings.IndexByte(attrs[i].Value(), attributeDelimiter[0]) >= 0 {
73-
return fmt.Errorf("attribute #%d value contains 0x%02X byte used in sep", i, attributeDelimiter[0])
74-
}
75-
}
76-
return nil
77-
}
78-
7951
// PutMetadataForObject fills object meta-data indexes using bbolt transaction.
8052
// Transaction must be writable. Additional bucket for container's meta-data
8153
// may be created using {255, CID...} form as a key.
@@ -180,21 +152,21 @@ func deleteMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID) error {
180152
pref[0] = metaPrefixIDAttr
181153
c := metaBkt.Cursor()
182154
for kIDAttr, _ := c.Seek(pref); bytes.HasPrefix(kIDAttr, pref); kIDAttr, _ = c.Next() {
183-
sepInd := bytes.LastIndex(kIDAttr, attributeDelimiter)
155+
sepInd := bytes.LastIndex(kIDAttr, objectcore.AttributeDelimiter)
184156
if sepInd < 0 {
185157
return fmt.Errorf("invalid key with prefix 0x%X in meta bucket: missing delimiter", kIDAttr[0])
186158
}
187159
kAttrID := make([]byte, len(kIDAttr)+attributeDelimiterLen)
188160
kAttrID[0] = metaPrefixAttrIDPlain
189161
off := 1 + copy(kAttrID[1:], kIDAttr[1+oid.Size:])
190-
off += copy(kAttrID[off:], attributeDelimiter)
162+
off += copy(kAttrID[off:], objectcore.AttributeDelimiter)
191163
copy(kAttrID[off:], id[:])
192164
ks = append(ks, kIDAttr, kAttrID)
193165
if n, ok := new(big.Int).SetString(string(kIDAttr[sepInd+attributeDelimiterLen:]), 10); ok && intWithinLimits(n) {
194166
kAttrIDInt := make([]byte, sepInd+attributeDelimiterLen+intValLen)
195167
kAttrIDInt[0] = metaPrefixAttrIDInt
196168
off := 1 + copy(kAttrIDInt[1:], kIDAttr[1+oid.Size:sepInd])
197-
off += copy(kAttrIDInt[off:], attributeDelimiter)
169+
off += copy(kAttrIDInt[off:], objectcore.AttributeDelimiter)
198170
putInt(kAttrIDInt[off:off+intValLen], n)
199171
copy(kAttrIDInt[off+intValLen:], id[:])
200172
ks = append(ks, kAttrIDInt)
@@ -307,7 +279,7 @@ func PreprocessSearchQuery(fs object.SearchFilters, attrs []string, cursor strin
307279
if !bytes.Equal(primKeysPrefix[1:1+len(attrs[0])], []byte(attrs[0])) {
308280
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongPrimaryAttribute)
309281
}
310-
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], attributeDelimiter) {
282+
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], objectcore.AttributeDelimiter) {
311283
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongKeyValDelim)
312284
}
313285
if primSeekKey[len(primKeysPrefix)] > 1 {
@@ -321,10 +293,10 @@ func PreprocessSearchQuery(fs object.SearchFilters, attrs []string, cursor strin
321293
if !bytes.Equal(primKeysPrefix[1:1+len(attrs[0])], []byte(attrs[0])) {
322294
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongPrimaryAttribute)
323295
}
324-
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], attributeDelimiter) {
296+
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], objectcore.AttributeDelimiter) {
325297
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongKeyValDelim)
326298
}
327-
if !bytes.Equal(primSeekKey[len(primSeekKey)-oid.Size-attributeDelimiterLen:][:attributeDelimiterLen], attributeDelimiter) {
299+
if !bytes.Equal(primSeekKey[len(primSeekKey)-oid.Size-attributeDelimiterLen:][:attributeDelimiterLen], objectcore.AttributeDelimiter) {
328300
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongValOIDDelim)
329301
}
330302
}
@@ -354,15 +326,15 @@ func PreprocessSearchQuery(fs object.SearchFilters, attrs []string, cursor strin
354326
if objectcore.IsIntegerSearchOp(primMatcher) {
355327
f := fInt[0]
356328
if !f.auto && (primMatcher == object.MatchNumGE || primMatcher == object.MatchNumGT) {
357-
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), attributeDelimiter, f.b)
329+
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), objectcore.AttributeDelimiter, f.b)
358330
primKeysPrefix = primSeekKey[:1+len(attrs[0])+attributeDelimiterLen]
359331
} else {
360-
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), attributeDelimiter)
332+
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), objectcore.AttributeDelimiter)
361333
primKeysPrefix = primSeekKey
362334
}
363335
} else {
364336
// according to the condition above, primValDB is empty for '!=' matcher as it should be
365-
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDPlain}, []byte(attrs[0]), attributeDelimiter, primValDB)
337+
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDPlain}, []byte(attrs[0]), objectcore.AttributeDelimiter, primValDB)
366338
primKeysPrefix = primSeekKey[:1+len(attrs[0])+attributeDelimiterLen]
367339
}
368340
}
@@ -377,7 +349,7 @@ func splitValOID(b []byte) ([]byte, []byte, error) {
377349
}
378350
idOff := len(b) - oid.Size
379351
valLn := idOff - attributeDelimiterLen
380-
if !bytes.Equal(b[valLn:idOff], attributeDelimiter) {
352+
if !bytes.Equal(b[valLn:idOff], objectcore.AttributeDelimiter) {
381353
return nil, nil, errWrongValOIDDelim
382354
}
383355
return b[:valLn], b[idOff:], nil
@@ -800,11 +772,11 @@ func prepareMetaAttrIDKey(buf *keyBuffer, id oid.ID, attr string, valLen int, in
800772
k[0] = metaPrefixAttrIDPlain
801773
}
802774
off := 1 + copy(k[1:], attr)
803-
off += copy(k[off:], attributeDelimiter)
775+
off += copy(k[off:], objectcore.AttributeDelimiter)
804776
valOff := off
805777
off += valLen
806778
if !intAttr {
807-
off += copy(k[off:], attributeDelimiter)
779+
off += copy(k[off:], objectcore.AttributeDelimiter)
808780
}
809781
copy(k[off:], id[:])
810782
return k, valOff
@@ -816,7 +788,7 @@ func prepareMetaIDAttrKey(buf *keyBuffer, id oid.ID, attr string, valLen int) []
816788
k[0] = metaPrefixIDAttr
817789
off := 1 + copy(k[1:], id[:])
818790
off += copy(k[off:], attr)
819-
copy(k[off:], attributeDelimiter)
791+
copy(k[off:], objectcore.AttributeDelimiter)
820792
return k
821793
}
822794

@@ -854,7 +826,7 @@ func (x *metaAttributeSeeker) get(id []byte, attr string) ([]byte, error) {
854826
pref[0] = metaPrefixIDAttr
855827
off := 1 + copy(pref[1:], id)
856828
off += copy(pref[off:], attr)
857-
copy(pref[off:], attributeDelimiter)
829+
copy(pref[off:], objectcore.AttributeDelimiter)
858830
if x.crsr == nil {
859831
x.crsr = x.bkt.Cursor()
860832
}
@@ -916,7 +888,7 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
916888
}
917889
res := make([]byte, len(attr)+attributeDelimiterLen+intValLen+oid.Size)
918890
off := copy(res, attr)
919-
off += copy(res[off:], attributeDelimiter)
891+
off += copy(res[off:], objectcore.AttributeDelimiter)
920892
putInt(res[off:off+intValLen], n)
921893
copy(res[off+intValLen:], lastItem.ID[:])
922894
return res, nil
@@ -933,12 +905,12 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
933905
}
934906
res := make([]byte, len(attr)+attributeDelimiterLen+ln+attributeDelimiterLen+oid.Size)
935907
off := copy(res, attr)
936-
off += copy(res[off:], attributeDelimiter)
908+
off += copy(res[off:], objectcore.AttributeDelimiter)
937909
var err error
938910
if _, err = hex.Decode(res[off:], []byte(lastItemVal)); err != nil {
939911
return nil, fmt.Errorf("decode %q attribute from HEX: %w", attr, err)
940912
}
941-
off += copy(res[off+ln:], attributeDelimiter)
913+
off += copy(res[off+ln:], objectcore.AttributeDelimiter)
942914
copy(res[off:], lastItem.ID[:])
943915
return res, nil
944916
case object.FilterSplitID:
@@ -955,9 +927,9 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
955927
kln := len(attr) + attributeDelimiterLen + len(val) + attributeDelimiterLen + oid.Size
956928
res := make([]byte, kln)
957929
off := copy(res, attr)
958-
off += copy(res[off:], attributeDelimiter)
930+
off += copy(res[off:], objectcore.AttributeDelimiter)
959931
off += copy(res[off:], val)
960-
off += copy(res[off:], attributeDelimiter)
932+
off += copy(res[off:], objectcore.AttributeDelimiter)
961933
copy(res[off:], lastItem.ID[:])
962934
return res, nil
963935
}

Diff for: pkg/local_object_storage/metabase/put.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (db *DB) Put(obj *objectSDK.Object, storageID []byte, binHeader []byte) err
6464
func (db *DB) put(
6565
tx *bbolt.Tx, obj *objectSDK.Object, id []byte,
6666
si *objectSDK.SplitInfo, currEpoch uint64, hdrBin []byte) error {
67-
if err := verifyHeaderForMetadata(*obj); err != nil {
67+
if err := objectCore.VerifyHeaderForMetadata(*obj); err != nil {
6868
return err
6969
}
7070

Diff for: pkg/local_object_storage/metabase/util.go

-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@ import (
1212
"go.etcd.io/bbolt"
1313
)
1414

15-
// object attribute key and value separator used in DB.
16-
var attributeDelimiter = []byte{0x00}
17-
1815
const attributeDelimiterLen = 1
1916

2017
var (

Diff for: pkg/local_object_storage/metabase/util_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package meta
33
import (
44
"testing"
55

6+
"github.com/nspcc-dev/neofs-node/pkg/core/object"
67
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
78
"github.com/stretchr/testify/require"
89
)
@@ -15,7 +16,7 @@ type mockContainers struct {
1516
func (x mockContainers) Exists(cid.ID) (bool, error) { return !x.absent, x.err }
1617

1718
func TestAttributeDelimiter(t *testing.T) {
18-
t.Run("len", func(t *testing.T) { require.Len(t, attributeDelimiter, attributeDelimiterLen) })
19+
t.Run("len", func(t *testing.T) { require.Len(t, object.AttributeDelimiter, attributeDelimiterLen) })
1920
}
2021

2122
func TestKeyBuffer(t *testing.T) {

Diff for: pkg/local_object_storage/metabase/version.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"slices"
1010

1111
objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
12+
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
1213
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
1314
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1415
"github.com/nspcc-dev/neofs-sdk-go/object"
@@ -224,7 +225,7 @@ func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, metaBkt *bbolt.Bucke
224225
zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin))
225226
return false, nil
226227
}
227-
if err := verifyHeaderForMetadata(hdr); err != nil {
228+
if err := objectcore.VerifyHeaderForMetadata(hdr); err != nil {
228229
l.Info("invalid header in the container bucket, ignoring", zap.Error(err),
229230
zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin))
230231
return false, nil
@@ -242,7 +243,7 @@ func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, metaBkt *bbolt.Bucke
242243
return false, fmt.Errorf("put metadata for object %s: %w", oid.ID(id), err)
243244
}
244245
if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put
245-
if err := verifyHeaderForMetadata(hdr); err != nil {
246+
if err := objectcore.VerifyHeaderForMetadata(hdr); err != nil {
246247
l.Info("invalid parent header in the container bucket, ignoring", zap.Error(err),
247248
zap.Stringer("container", cnr), zap.Stringer("child", oid.ID(id)),
248249
zap.Stringer("parent", par.GetID()), zap.Binary("data", bin))

0 commit comments

Comments
 (0)