Skip to content

Commit 5a9e733

Browse files
authored
Fix/meta/full index (#3177)
2 parents ea6b99c + 1c2d1a6 commit 5a9e733

File tree

13 files changed

+626
-222
lines changed

13 files changed

+626
-222
lines changed

Diff for: cmd/neofs-node/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ type cfgGRPC struct {
441441
}
442442

443443
type cfgMeta struct {
444-
cLister meta.ContainerLister
444+
network meta.NeoFSNetwork
445445
}
446446

447447
type cfgMorph struct {

Diff for: cmd/neofs-node/meta.go

+28-12
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ import (
1111
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
1212
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
1313
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
14+
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
1415
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1516
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
17+
"github.com/nspcc-dev/neofs-sdk-go/object"
18+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1619
"go.uber.org/zap"
1720
"golang.org/x/sync/errgroup"
1821
)
@@ -22,22 +25,23 @@ func initMeta(c *cfg) {
2225
initMorphComponents(c)
2326
}
2427

25-
c.cfgMeta.cLister = &metaContainerListener{
28+
c.cfgMeta.network = &neofsNetwork{
2629
key: c.binPublicKey,
2730
cnrClient: c.basics.cCli,
2831
containers: c.cfgObject.cnrSource,
2932
network: c.basics.netMapSource,
33+
header: c.cfgObject.getSvc,
3034
}
3135

3236
var err error
3337
p := meta.Parameters{
34-
Logger: c.log.With(zap.String("service", "meta data")),
35-
ContainerLister: c.cfgMeta.cLister,
36-
Timeout: c.applicationConfiguration.fsChain.dialTimeout,
37-
NeoEnpoints: c.applicationConfiguration.fsChain.endpoints,
38-
ContainerHash: c.basics.containerSH,
39-
NetmapHash: c.basics.netmapSH,
40-
RootPath: c.applicationConfiguration.metadata.path,
38+
Logger: c.log.With(zap.String("service", "meta data")),
39+
Network: c.cfgMeta.network,
40+
Timeout: c.applicationConfiguration.fsChain.dialTimeout,
41+
NeoEnpoints: c.applicationConfiguration.fsChain.endpoints,
42+
ContainerHash: c.basics.containerSH,
43+
NetmapHash: c.basics.netmapSH,
44+
RootPath: c.applicationConfiguration.metadata.path,
4145
}
4246
c.shared.metaService, err = meta.New(p)
4347
fatalOnErr(err)
@@ -50,20 +54,32 @@ func initMeta(c *cfg) {
5054
}))
5155
}
5256

53-
type metaContainerListener struct {
57+
type neofsNetwork struct {
5458
key []byte
5559

5660
cnrClient *cntClient.Client
5761
containers container.Source
5862
network netmap.Source
63+
header *getsvc.Service
5964

6065
m sync.RWMutex
6166
prevCnrs []cid.ID
6267
prevNetMap *netmapsdk.NetMap
6368
prevRes map[cid.ID]struct{}
6469
}
6570

66-
func (c *metaContainerListener) IsMineWithMeta(id cid.ID) (bool, error) {
71+
func (c *neofsNetwork) Head(ctx context.Context, cID cid.ID, oID oid.ID) (object.Object, error) {
72+
var hw headerWriter
73+
var hPrm getsvc.HeadPrm
74+
hPrm.SetHeaderWriter(&hw)
75+
hPrm.WithAddress(oid.NewAddress(cID, oID))
76+
77+
err := c.header.Head(ctx, hPrm)
78+
79+
return *hw.h, err
80+
}
81+
82+
func (c *neofsNetwork) IsMineWithMeta(id cid.ID) (bool, error) {
6783
curEpoch, err := c.network.Epoch()
6884
if err != nil {
6985
return false, fmt.Errorf("read current NeoFS epoch: %w", err)
@@ -75,7 +91,7 @@ func (c *metaContainerListener) IsMineWithMeta(id cid.ID) (bool, error) {
7591
return c.isMineWithMeta(id, networkMap)
7692
}
7793

78-
func (c *metaContainerListener) isMineWithMeta(id cid.ID, networkMap *netmapsdk.NetMap) (bool, error) {
94+
func (c *neofsNetwork) isMineWithMeta(id cid.ID, networkMap *netmapsdk.NetMap) (bool, error) {
7995
cnr, err := c.containers.Get(id)
8096
if err != nil {
8197
return false, fmt.Errorf("read %s container: %w", id, err)
@@ -104,7 +120,7 @@ func (c *metaContainerListener) isMineWithMeta(id cid.ID, networkMap *netmapsdk.
104120
return false, nil
105121
}
106122

107-
func (c *metaContainerListener) List() (map[cid.ID]struct{}, error) {
123+
func (c *neofsNetwork) List() (map[cid.ID]struct{}, error) {
108124
actualContainers, err := c.cnrClient.List(nil)
109125
if err != nil {
110126
return nil, fmt.Errorf("read containers: %w", err)

Diff for: pkg/core/object/metadata.go

+32
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ package object
22

33
import (
44
"bytes"
5+
"errors"
56
"fmt"
67
"math/big"
78
"slices"
89
"strings"
910

1011
"github.com/nspcc-dev/neofs-sdk-go/client"
12+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1113
"github.com/nspcc-dev/neofs-sdk-go/object"
1214
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1315
"github.com/nspcc-dev/neofs-sdk-go/user"
@@ -165,3 +167,33 @@ func calcMaxUniqueSearchResults(lim uint16, sets [][]client.SearchResultItem) ui
165167
}
166168
return n
167169
}
170+
171+
// AttributeDelimiter is attribute key and value separator used in metadata DB.
172+
var AttributeDelimiter = []byte{0x00}
173+
174+
// VerifyHeaderForMetadata checks whether given header corresponds to metadata
175+
// bucket requirements and limits.
176+
func VerifyHeaderForMetadata(hdr object.Object) error {
177+
if ln := hdr.HeaderLen(); ln > object.MaxHeaderLen {
178+
return fmt.Errorf("header len %d exceeds the limit", ln)
179+
}
180+
if hdr.GetContainerID().IsZero() {
181+
return fmt.Errorf("invalid container: %w", cid.ErrZero)
182+
}
183+
if hdr.Owner().IsZero() {
184+
return fmt.Errorf("invalid owner: %w", user.ErrZeroID)
185+
}
186+
if _, ok := hdr.PayloadChecksum(); !ok {
187+
return errors.New("missing payload checksum")
188+
}
189+
attrs := hdr.Attributes()
190+
for i := range attrs {
191+
if strings.IndexByte(attrs[i].Key(), AttributeDelimiter[0]) >= 0 {
192+
return fmt.Errorf("attribute #%d key contains 0x%02X byte used in sep", i, AttributeDelimiter[0])
193+
}
194+
if strings.IndexByte(attrs[i].Value(), AttributeDelimiter[0]) >= 0 {
195+
return fmt.Errorf("attribute #%d value contains 0x%02X byte used in sep", i, AttributeDelimiter[0])
196+
}
197+
}
198+
return nil
199+
}

Diff for: pkg/local_object_storage/metabase/metadata.go

+23-49
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"math/big"
1111
"slices"
1212
"strconv"
13-
"strings"
1413

1514
"github.com/google/uuid"
1615
"github.com/mr-tron/base58"
@@ -49,35 +48,10 @@ func invalidMetaBucketKeyErr(key []byte, cause error) error {
4948
return fmt.Errorf("invalid meta bucket key (prefix 0x%X): %w", key[0], cause)
5049
}
5150

52-
// checks whether given header corresponds to metadata bucket requirements and
53-
// limits.
54-
func verifyHeaderForMetadata(hdr object.Object) error {
55-
if ln := hdr.HeaderLen(); ln > object.MaxHeaderLen {
56-
return fmt.Errorf("header len %d exceeds the limit", ln)
57-
}
58-
if hdr.GetContainerID().IsZero() {
59-
return fmt.Errorf("invalid container: %w", cid.ErrZero)
60-
}
61-
if hdr.Owner().IsZero() {
62-
return fmt.Errorf("invalid owner: %w", user.ErrZeroID)
63-
}
64-
if _, ok := hdr.PayloadChecksum(); !ok {
65-
return errors.New("missing payload checksum")
66-
}
67-
attrs := hdr.Attributes()
68-
for i := range attrs {
69-
if strings.IndexByte(attrs[i].Key(), attributeDelimiter[0]) >= 0 {
70-
return fmt.Errorf("attribute #%d key contains 0x%02X byte used in sep", i, attributeDelimiter[0])
71-
}
72-
if strings.IndexByte(attrs[i].Value(), attributeDelimiter[0]) >= 0 {
73-
return fmt.Errorf("attribute #%d value contains 0x%02X byte used in sep", i, attributeDelimiter[0])
74-
}
75-
}
76-
return nil
77-
}
78-
79-
// returns BoltDB errors only.
80-
func putMetadataForObject(tx *bbolt.Tx, hdr object.Object, hasParent, phy bool) error {
51+
// PutMetadataForObject fills object meta-data indexes using bbolt transaction.
52+
// Transaction must be writable. Additional bucket for container's meta-data
53+
// may be created using {255, CID...} form as a key.
54+
func PutMetadataForObject(tx *bbolt.Tx, hdr object.Object, hasParent, phy bool) error {
8155
metaBkt, err := tx.CreateBucketIfNotExists(metaBucketKey(hdr.GetContainerID()))
8256
if err != nil {
8357
return fmt.Errorf("create meta bucket for container: %w", err)
@@ -178,21 +152,21 @@ func deleteMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID) error {
178152
pref[0] = metaPrefixIDAttr
179153
c := metaBkt.Cursor()
180154
for kIDAttr, _ := c.Seek(pref); bytes.HasPrefix(kIDAttr, pref); kIDAttr, _ = c.Next() {
181-
sepInd := bytes.LastIndex(kIDAttr, attributeDelimiter)
155+
sepInd := bytes.LastIndex(kIDAttr, objectcore.AttributeDelimiter)
182156
if sepInd < 0 {
183157
return fmt.Errorf("invalid key with prefix 0x%X in meta bucket: missing delimiter", kIDAttr[0])
184158
}
185159
kAttrID := make([]byte, len(kIDAttr)+attributeDelimiterLen)
186160
kAttrID[0] = metaPrefixAttrIDPlain
187161
off := 1 + copy(kAttrID[1:], kIDAttr[1+oid.Size:])
188-
off += copy(kAttrID[off:], attributeDelimiter)
162+
off += copy(kAttrID[off:], objectcore.AttributeDelimiter)
189163
copy(kAttrID[off:], id[:])
190164
ks = append(ks, kIDAttr, kAttrID)
191165
if n, ok := new(big.Int).SetString(string(kIDAttr[sepInd+attributeDelimiterLen:]), 10); ok && intWithinLimits(n) {
192166
kAttrIDInt := make([]byte, sepInd+attributeDelimiterLen+intValLen)
193167
kAttrIDInt[0] = metaPrefixAttrIDInt
194168
off := 1 + copy(kAttrIDInt[1:], kIDAttr[1+oid.Size:sepInd])
195-
off += copy(kAttrIDInt[off:], attributeDelimiter)
169+
off += copy(kAttrIDInt[off:], objectcore.AttributeDelimiter)
196170
putInt(kAttrIDInt[off:off+intValLen], n)
197171
copy(kAttrIDInt[off+intValLen:], id[:])
198172
ks = append(ks, kAttrIDInt)
@@ -305,7 +279,7 @@ func PreprocessSearchQuery(fs object.SearchFilters, attrs []string, cursor strin
305279
if !bytes.Equal(primKeysPrefix[1:1+len(attrs[0])], []byte(attrs[0])) {
306280
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongPrimaryAttribute)
307281
}
308-
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], attributeDelimiter) {
282+
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], objectcore.AttributeDelimiter) {
309283
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongKeyValDelim)
310284
}
311285
if primSeekKey[len(primKeysPrefix)] > 1 {
@@ -319,10 +293,10 @@ func PreprocessSearchQuery(fs object.SearchFilters, attrs []string, cursor strin
319293
if !bytes.Equal(primKeysPrefix[1:1+len(attrs[0])], []byte(attrs[0])) {
320294
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongPrimaryAttribute)
321295
}
322-
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], attributeDelimiter) {
296+
if !bytes.Equal(primKeysPrefix[1+len(attrs[0]):], objectcore.AttributeDelimiter) {
323297
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongKeyValDelim)
324298
}
325-
if !bytes.Equal(primSeekKey[len(primSeekKey)-oid.Size-attributeDelimiterLen:][:attributeDelimiterLen], attributeDelimiter) {
299+
if !bytes.Equal(primSeekKey[len(primSeekKey)-oid.Size-attributeDelimiterLen:][:attributeDelimiterLen], objectcore.AttributeDelimiter) {
326300
return nil, nil, fmt.Errorf("%w: %w", errInvalidCursor, errWrongValOIDDelim)
327301
}
328302
}
@@ -352,15 +326,15 @@ func PreprocessSearchQuery(fs object.SearchFilters, attrs []string, cursor strin
352326
if objectcore.IsIntegerSearchOp(primMatcher) {
353327
f := fInt[0]
354328
if !f.auto && (primMatcher == object.MatchNumGE || primMatcher == object.MatchNumGT) {
355-
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), attributeDelimiter, f.b)
329+
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), objectcore.AttributeDelimiter, f.b)
356330
primKeysPrefix = primSeekKey[:1+len(attrs[0])+attributeDelimiterLen]
357331
} else {
358-
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), attributeDelimiter)
332+
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDInt}, []byte(attrs[0]), objectcore.AttributeDelimiter)
359333
primKeysPrefix = primSeekKey
360334
}
361335
} else {
362336
// according to the condition above, primValDB is empty for '!=' matcher as it should be
363-
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDPlain}, []byte(attrs[0]), attributeDelimiter, primValDB)
337+
primSeekKey = slices.Concat([]byte{metaPrefixAttrIDPlain}, []byte(attrs[0]), objectcore.AttributeDelimiter, primValDB)
364338
primKeysPrefix = primSeekKey[:1+len(attrs[0])+attributeDelimiterLen]
365339
}
366340
}
@@ -375,7 +349,7 @@ func splitValOID(b []byte) ([]byte, []byte, error) {
375349
}
376350
idOff := len(b) - oid.Size
377351
valLn := idOff - attributeDelimiterLen
378-
if !bytes.Equal(b[valLn:idOff], attributeDelimiter) {
352+
if !bytes.Equal(b[valLn:idOff], objectcore.AttributeDelimiter) {
379353
return nil, nil, errWrongValOIDDelim
380354
}
381355
return b[:valLn], b[idOff:], nil
@@ -798,11 +772,11 @@ func prepareMetaAttrIDKey(buf *keyBuffer, id oid.ID, attr string, valLen int, in
798772
k[0] = metaPrefixAttrIDPlain
799773
}
800774
off := 1 + copy(k[1:], attr)
801-
off += copy(k[off:], attributeDelimiter)
775+
off += copy(k[off:], objectcore.AttributeDelimiter)
802776
valOff := off
803777
off += valLen
804778
if !intAttr {
805-
off += copy(k[off:], attributeDelimiter)
779+
off += copy(k[off:], objectcore.AttributeDelimiter)
806780
}
807781
copy(k[off:], id[:])
808782
return k, valOff
@@ -814,7 +788,7 @@ func prepareMetaIDAttrKey(buf *keyBuffer, id oid.ID, attr string, valLen int) []
814788
k[0] = metaPrefixIDAttr
815789
off := 1 + copy(k[1:], id[:])
816790
off += copy(k[off:], attr)
817-
copy(k[off:], attributeDelimiter)
791+
copy(k[off:], objectcore.AttributeDelimiter)
818792
return k
819793
}
820794

@@ -852,7 +826,7 @@ func (x *metaAttributeSeeker) get(id []byte, attr string) ([]byte, error) {
852826
pref[0] = metaPrefixIDAttr
853827
off := 1 + copy(pref[1:], id)
854828
off += copy(pref[off:], attr)
855-
copy(pref[off:], attributeDelimiter)
829+
copy(pref[off:], objectcore.AttributeDelimiter)
856830
if x.crsr == nil {
857831
x.crsr = x.bkt.Cursor()
858832
}
@@ -914,7 +888,7 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
914888
}
915889
res := make([]byte, len(attr)+attributeDelimiterLen+intValLen+oid.Size)
916890
off := copy(res, attr)
917-
off += copy(res[off:], attributeDelimiter)
891+
off += copy(res[off:], objectcore.AttributeDelimiter)
918892
putInt(res[off:off+intValLen], n)
919893
copy(res[off+intValLen:], lastItem.ID[:])
920894
return res, nil
@@ -931,12 +905,12 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
931905
}
932906
res := make([]byte, len(attr)+attributeDelimiterLen+ln+attributeDelimiterLen+oid.Size)
933907
off := copy(res, attr)
934-
off += copy(res[off:], attributeDelimiter)
908+
off += copy(res[off:], objectcore.AttributeDelimiter)
935909
var err error
936910
if _, err = hex.Decode(res[off:], []byte(lastItemVal)); err != nil {
937911
return nil, fmt.Errorf("decode %q attribute from HEX: %w", attr, err)
938912
}
939-
off += copy(res[off+ln:], attributeDelimiter)
913+
off += copy(res[off+ln:], objectcore.AttributeDelimiter)
940914
copy(res[off:], lastItem.ID[:])
941915
return res, nil
942916
case object.FilterSplitID:
@@ -953,9 +927,9 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
953927
kln := len(attr) + attributeDelimiterLen + len(val) + attributeDelimiterLen + oid.Size
954928
res := make([]byte, kln)
955929
off := copy(res, attr)
956-
off += copy(res[off:], attributeDelimiter)
930+
off += copy(res[off:], objectcore.AttributeDelimiter)
957931
off += copy(res[off:], val)
958-
off += copy(res[off:], attributeDelimiter)
932+
off += copy(res[off:], objectcore.AttributeDelimiter)
959933
copy(res[off:], lastItem.ID[:])
960934
return res, nil
961935
}

Diff for: pkg/local_object_storage/metabase/put.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (db *DB) Put(obj *objectSDK.Object, storageID []byte, binHeader []byte) err
6464
func (db *DB) put(
6565
tx *bbolt.Tx, obj *objectSDK.Object, id []byte,
6666
si *objectSDK.SplitInfo, currEpoch uint64, hdrBin []byte) error {
67-
if err := verifyHeaderForMetadata(*obj); err != nil {
67+
if err := objectCore.VerifyHeaderForMetadata(*obj); err != nil {
6868
return err
6969
}
7070

@@ -152,7 +152,7 @@ func (db *DB) put(
152152
}
153153
}
154154

155-
if err := putMetadataForObject(tx, *obj, par != nil, !isParent); err != nil {
155+
if err := PutMetadataForObject(tx, *obj, par != nil, !isParent); err != nil {
156156
return fmt.Errorf("put metadata: %w", err)
157157
}
158158

Diff for: pkg/local_object_storage/metabase/util.go

-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@ import (
1212
"go.etcd.io/bbolt"
1313
)
1414

15-
// object attribute key and value separator used in DB.
16-
var attributeDelimiter = []byte{0x00}
17-
1815
const attributeDelimiterLen = 1
1916

2017
var (

Diff for: pkg/local_object_storage/metabase/util_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package meta
33
import (
44
"testing"
55

6+
"github.com/nspcc-dev/neofs-node/pkg/core/object"
67
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
78
"github.com/stretchr/testify/require"
89
)
@@ -15,7 +16,7 @@ type mockContainers struct {
1516
func (x mockContainers) Exists(cid.ID) (bool, error) { return !x.absent, x.err }
1617

1718
func TestAttributeDelimiter(t *testing.T) {
18-
t.Run("len", func(t *testing.T) { require.Len(t, attributeDelimiter, attributeDelimiterLen) })
19+
t.Run("len", func(t *testing.T) { require.Len(t, object.AttributeDelimiter, attributeDelimiterLen) })
1920
}
2021

2122
func TestKeyBuffer(t *testing.T) {

0 commit comments

Comments
 (0)