Skip to content

Commit 32d92c7

Browse files
committed
node/meta: fill full object index in container storage
Make underlying KV database store the same index structure as in the new metabase structure for search V2. Exclude repetitive fields from the database in the old MPT form. Receive object headers via internal object get (head) service. Closes #3139. Signed-off-by: Pavel Karpy <[email protected]>
1 parent 3a57b3f commit 32d92c7

File tree

7 files changed

+435
-106
lines changed

7 files changed

+435
-106
lines changed

cmd/neofs-node/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ type cfgGRPC struct {
441441
}
442442

443443
type cfgMeta struct {
444-
cLister meta.ContainerLister
444+
network meta.NeoFSNetwork
445445
}
446446

447447
type cfgMorph struct {

cmd/neofs-node/meta.go

+32-12
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ import (
1111
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
1212
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
1313
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
14+
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
1415
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1516
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
17+
"github.com/nspcc-dev/neofs-sdk-go/object"
18+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1619
"go.uber.org/zap"
1720
"golang.org/x/sync/errgroup"
1821
)
@@ -22,22 +25,23 @@ func initMeta(c *cfg) {
2225
initMorphComponents(c)
2326
}
2427

25-
c.cfgMeta.cLister = &metaContainerListener{
28+
c.cfgMeta.network = &neofsNetwork{
2629
key: c.binPublicKey,
2730
cnrClient: c.basics.cCli,
2831
containers: c.cfgObject.cnrSource,
2932
network: c.basics.netMapSource,
33+
header: c.cfgObject.getSvc,
3034
}
3135

3236
var err error
3337
p := meta.Parameters{
34-
Logger: c.log.With(zap.String("service", "meta data")),
35-
ContainerLister: c.cfgMeta.cLister,
36-
Timeout: c.applicationConfiguration.fsChain.dialTimeout,
37-
NeoEnpoints: c.applicationConfiguration.fsChain.endpoints,
38-
ContainerHash: c.basics.containerSH,
39-
NetmapHash: c.basics.netmapSH,
40-
RootPath: c.applicationConfiguration.metadata.path,
38+
Logger: c.log.With(zap.String("service", "meta data")),
39+
Network: c.cfgMeta.network,
40+
Timeout: c.applicationConfiguration.fsChain.dialTimeout,
41+
NeoEnpoints: c.applicationConfiguration.fsChain.endpoints,
42+
ContainerHash: c.basics.containerSH,
43+
NetmapHash: c.basics.netmapSH,
44+
RootPath: c.applicationConfiguration.metadata.path,
4145
}
4246
c.shared.metaService, err = meta.New(p)
4347
fatalOnErr(err)
@@ -50,20 +54,36 @@ func initMeta(c *cfg) {
5054
}))
5155
}
5256

53-
type metaContainerListener struct {
57+
type neofsNetwork struct {
5458
key []byte
5559

5660
cnrClient *cntClient.Client
5761
containers container.Source
5862
network netmap.Source
63+
header *getsvc.Service
5964

6065
m sync.RWMutex
6166
prevCnrs []cid.ID
6267
prevNetMap *netmapsdk.NetMap
6368
prevRes map[cid.ID]struct{}
6469
}
6570

66-
func (c *metaContainerListener) IsMineWithMeta(id cid.ID) (bool, error) {
71+
func (c *neofsNetwork) Head(ctx context.Context, cID cid.ID, oID oid.ID) (object.Object, error) {
72+
var hw headerWriter
73+
var addr oid.Address
74+
addr.SetContainer(cID)
75+
addr.SetObject(oID)
76+
77+
var hPrm getsvc.HeadPrm
78+
hPrm.SetHeaderWriter(&hw)
79+
hPrm.WithAddress(addr)
80+
81+
err := c.header.Head(ctx, hPrm)
82+
83+
return *hw.h, err
84+
}
85+
86+
func (c *neofsNetwork) IsMineWithMeta(id cid.ID) (bool, error) {
6787
curEpoch, err := c.network.Epoch()
6888
if err != nil {
6989
return false, fmt.Errorf("read current NeoFS epoch: %w", err)
@@ -75,7 +95,7 @@ func (c *metaContainerListener) IsMineWithMeta(id cid.ID) (bool, error) {
7595
return c.isMineWithMeta(id, networkMap)
7696
}
7797

78-
func (c *metaContainerListener) isMineWithMeta(id cid.ID, networkMap *netmapsdk.NetMap) (bool, error) {
98+
func (c *neofsNetwork) isMineWithMeta(id cid.ID, networkMap *netmapsdk.NetMap) (bool, error) {
7999
cnr, err := c.containers.Get(id)
80100
if err != nil {
81101
return false, fmt.Errorf("read %s container: %w", id, err)
@@ -104,7 +124,7 @@ func (c *metaContainerListener) isMineWithMeta(id cid.ID, networkMap *netmapsdk.
104124
return false, nil
105125
}
106126

107-
func (c *metaContainerListener) List() (map[cid.ID]struct{}, error) {
127+
func (c *neofsNetwork) List() (map[cid.ID]struct{}, error) {
108128
actualContainers, err := c.cnrClient.List(nil)
109129
if err != nil {
110130
return nil, fmt.Errorf("read containers: %w", err)

pkg/services/meta/blocks.go

+22-8
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import (
88
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
99
"github.com/nspcc-dev/neo-go/pkg/neorpc"
1010
"go.uber.org/zap"
11+
"golang.org/x/sync/errgroup"
1112
)
1213

13-
func (m *Meta) handleBlock(b *block.Header) error {
14+
func (m *Meta) handleBlock(ctx context.Context, b *block.Header) error {
1415
h := b.Hash()
1516
ind := b.Index
1617
l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind))
@@ -35,6 +36,11 @@ func (m *Meta) handleBlock(b *block.Header) error {
3536
m.m.RLock()
3637
defer m.m.RUnlock()
3738

39+
var wg errgroup.Group
40+
wg.SetLimit(1024)
41+
ctx, cancel := context.WithTimeout(ctx, m.timeout)
42+
defer cancel()
43+
3844
for _, n := range res.Application {
3945
ev, err := parseObjNotification(n)
4046
if err != nil {
@@ -48,13 +54,21 @@ func (m *Meta) handleBlock(b *block.Header) error {
4854
continue
4955
}
5056

51-
err = m.handleObjectNotification(s, ev)
52-
if err != nil {
53-
l.Error("handling object notification", zap.Error(err))
54-
continue
55-
}
57+
wg.Go(func() error {
58+
err := m.handleObjectNotification(ctx, s, ev)
59+
if err != nil {
60+
return fmt.Errorf("handling %s/%s object notification: %w", ev.cID, ev.oID, err)
61+
}
62+
63+
l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
5664

57-
l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
65+
return nil
66+
})
67+
}
68+
69+
err = wg.Wait()
70+
if err != nil {
71+
l.Error("failed to handle block's notifications", zap.Error(err))
5872
}
5973

6074
for _, st := range m.storages {
@@ -91,7 +105,7 @@ func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) {
91105
case <-ctx.Done():
92106
return
93107
case b := <-buff:
94-
err := m.handleBlock(b)
108+
err := m.handleBlock(ctx, b)
95109
if err != nil {
96110
m.l.Error("block handling failed", zap.Error(err))
97111
continue

0 commit comments

Comments
 (0)