Skip to content

Commit 2c1f717

Browse files
authored
feat/Process object notification from chain (#3085)
2 parents d50f062 + dcbd80d commit 2c1f717

File tree

15 files changed

+1605
-1
lines changed

15 files changed

+1605
-1
lines changed

cmd/neofs-node/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree"
2727
fschainconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/fschain"
2828
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
29+
metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta"
2930
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
3031
objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
3132
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
@@ -46,6 +47,7 @@ import (
4647
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
4748
"github.com/nspcc-dev/neofs-node/pkg/services/control"
4849
controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server"
50+
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
4951
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
5052
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
5153
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
@@ -94,6 +96,10 @@ type applicationConfiguration struct {
9496
timestamp bool
9597
}
9698

99+
metadata struct {
100+
path string
101+
}
102+
97103
engine struct {
98104
errorThreshold uint32
99105
shardPoolSize uint32
@@ -159,6 +165,10 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
159165
a.policer.replicationCooldown = policerconfig.ReplicationCooldown(c)
160166
a.policer.objectBatchSize = policerconfig.ObjectBatchSize(c)
161167

168+
// Meta data
169+
170+
a.metadata.path = metaconfig.Path(c)
171+
162172
// Storage Engine
163173

164174
a.engine.errorThreshold = engineconfig.ShardErrorThreshold(c)
@@ -396,6 +406,7 @@ type cfg struct {
396406
// configuration of the internal
397407
// services
398408
cfgGRPC cfgGRPC
409+
cfgMeta cfgMeta
399410
cfgMorph cfgMorph
400411
cfgContainer cfgContainer
401412
cfgNodeInfo cfgNodeInfo
@@ -427,6 +438,10 @@ type cfgGRPC struct {
427438
servers []*grpc.Server
428439
}
429440

441+
type cfgMeta struct {
442+
cLister meta.ContainerLister
443+
}
444+
430445
type cfgMorph struct {
431446
client *client.Client
432447

cmd/neofs-node/config/internal/validate/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ type valideConfig struct {
2323
ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"`
2424
} `mapstructure:"prometheus"`
2525

26+
Meta struct {
27+
Path string `mapstructure:"path"`
28+
} `mapstructure:"metadata"`
29+
2630
Node struct {
2731
Wallet struct {
2832
Path string `mapstructure:"path"`

cmd/neofs-node/config/meta/config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package metaconfig
2+
3+
import (
4+
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
5+
)
6+
7+
const (
8+
subsection = "metadata"
9+
)
10+
11+
// Path returns the value of "path" config parameter
12+
// from "metadata" section.
13+
//
14+
// Returns empty string if the value is missing or invalid.
15+
func Path(c *config.Config) string {
16+
return config.StringSafe(c.Sub(subsection), "path")
17+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package metaconfig_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
7+
metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta"
8+
configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestLoggerSection_Level(t *testing.T) {
13+
t.Run("defaults", func(t *testing.T) {
14+
emptyConfig := configtest.EmptyConfig()
15+
require.Equal(t, "", metaconfig.Path(emptyConfig))
16+
})
17+
18+
const path = "../../../../config/example/node"
19+
20+
var fileConfigTest = func(c *config.Config) {
21+
require.Equal(t, "path/to/meta", metaconfig.Path(c))
22+
}
23+
24+
configtest.ForEachFileType(path, fileConfigTest)
25+
26+
t.Run("ENV", func(t *testing.T) {
27+
configtest.ForEnvFileType(path, fileConfigTest)
28+
})
29+
}

cmd/neofs-node/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ func initApp(c *cfg) {
136136
initAndLog(c, "session", initSessionService)
137137
initAndLog(c, "reputation", initReputationService)
138138
initAndLog(c, "object", initObjectService)
139+
initAndLog(c, "meta", initMeta)
139140
initAndLog(c, "tree", initTreeService)
140141

141142
initAndLog(c, "morph notifications", listenMorphNotifications)

cmd/neofs-node/meta.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"slices"
8+
"sync"
9+
10+
"github.com/nspcc-dev/neofs-node/pkg/core/container"
11+
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
12+
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
13+
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
14+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
15+
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
16+
"go.uber.org/zap"
17+
"golang.org/x/sync/errgroup"
18+
)
19+
20+
func initMeta(c *cfg) {
21+
if c.cfgMorph.client == nil {
22+
initMorphComponents(c)
23+
}
24+
25+
c.cfgMeta.cLister = &containerListener{
26+
key: c.binPublicKey,
27+
cnrClient: c.basics.cCli,
28+
containers: c.cfgObject.cnrSource,
29+
network: c.basics.netMapSource,
30+
}
31+
32+
m, err := meta.New(c.log.With(zap.String("service", "meta data")),
33+
c.cfgMeta.cLister,
34+
c.applicationConfiguration.fsChain.dialTimeout,
35+
c.applicationConfiguration.fsChain.endpoints,
36+
c.basics.containerSH,
37+
c.basics.netmapSH,
38+
c.applicationConfiguration.metadata.path)
39+
fatalOnErr(err)
40+
41+
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
42+
err = m.Run(ctx)
43+
if err != nil {
44+
c.internalErr <- fmt.Errorf("meta data service error: %w", err)
45+
}
46+
}))
47+
}
48+
49+
type containerListener struct {
50+
key []byte
51+
52+
cnrClient *cntClient.Client
53+
containers container.Source
54+
network netmap.Source
55+
56+
m sync.RWMutex
57+
prevCnrs []cid.ID
58+
prevNetMap *netmapsdk.NetMap
59+
prevRes map[cid.ID]struct{}
60+
}
61+
62+
func (c *containerListener) List() (map[cid.ID]struct{}, error) {
63+
actualContainers, err := c.cnrClient.List(nil)
64+
if err != nil {
65+
return nil, fmt.Errorf("read containers: %w", err)
66+
}
67+
curEpoch, err := c.network.Epoch()
68+
if err != nil {
69+
return nil, fmt.Errorf("read current NeoFS epoch: %w", err)
70+
}
71+
networkMap, err := c.network.GetNetMapByEpoch(curEpoch)
72+
if err != nil {
73+
return nil, fmt.Errorf("read network map at the current epoch #%d: %w", curEpoch, err)
74+
}
75+
76+
c.m.RLock()
77+
if c.prevNetMap != nil && c.prevCnrs != nil {
78+
cnrsSame := slices.Equal(c.prevCnrs, actualContainers)
79+
if !cnrsSame {
80+
c.m.RUnlock()
81+
return c.prevRes, nil
82+
}
83+
netmapSame := slices.EqualFunc(c.prevNetMap.Nodes(), networkMap.Nodes(), func(n1 netmapsdk.NodeInfo, n2 netmapsdk.NodeInfo) bool {
84+
return bytes.Equal(n1.PublicKey(), n2.PublicKey())
85+
})
86+
if netmapSame {
87+
c.m.RUnlock()
88+
return c.prevRes, nil
89+
}
90+
}
91+
c.m.RUnlock()
92+
93+
var locM sync.Mutex
94+
res := make(map[cid.ID]struct{})
95+
var wg errgroup.Group
96+
for _, cID := range actualContainers {
97+
wg.Go(func() error {
98+
cnr, err := c.containers.Get(cID)
99+
if err != nil {
100+
return fmt.Errorf("read %s container: %w", cID, err)
101+
}
102+
103+
nodeSets, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cID)
104+
if err != nil {
105+
return fmt.Errorf("apply container storage policy to %s container: %w", cID, err)
106+
}
107+
108+
for _, nodeSet := range nodeSets {
109+
for _, node := range nodeSet {
110+
if bytes.Equal(node.PublicKey(), c.key) {
111+
locM.Lock()
112+
res[cID] = struct{}{}
113+
locM.Unlock()
114+
return nil
115+
}
116+
}
117+
}
118+
119+
return nil
120+
})
121+
}
122+
123+
err = wg.Wait()
124+
if err != nil {
125+
return nil, err
126+
}
127+
128+
c.m.Lock()
129+
c.prevCnrs = actualContainers
130+
c.prevNetMap = networkMap
131+
c.prevRes = res
132+
c.m.Unlock()
133+
134+
return res, nil
135+
}

config/example/node.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ NEOFS_NODE_RELAY=true
2222
NEOFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions
2323
NEOFS_NODE_PERSISTENT_STATE_PATH=/state
2424

25+
# Meta data section
26+
NEOFS_METADATA_PATH=path/to/meta
27+
2528
# Tree service section
2629
NEOFS_TREE_ENABLED=true
2730
NEOFS_TREE_CACHE_SIZE=15

config/example/node.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
"path": "/state"
3838
}
3939
},
40+
"metadata": {
41+
"path": "path/to/meta"
42+
},
4043
"grpc": [
4144
{
4245
"endpoint": "s01.neofs.devenv:8080",

config/example/node.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ object:
105105
put:
106106
pool_size_remote: 100 # number of async workers for remote PUT operations
107107

108+
metadata:
109+
path: path/to/meta # path to meta data storages, required
110+
108111
storage:
109112
# note: shard configuration can be omitted for relay node (see `node.relay`)
110113
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations

docs/storage-node-configuration.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ There are some custom types used for brevity:
2525
| `replicator` | [Replicator service configuration](#replicator-section) |
2626
| `storage` | [Storage engine configuration](#storage-section) |
2727
| `grpc` | [gRPC configuration](#grpc-section) |
28+
| `metadata` | [Meta service configuration](#meta-section) |
2829
| `node` | [Node configuration](#node-section) |
2930
| `object` | [Object service configuration](#object-section) |
3031
| `tree` | [Tree service configuration](#tree-section) |
@@ -300,6 +301,17 @@ pilorama:
300301

301302

302303

304+
# `metadata` section
305+
306+
```yaml
307+
metadata:
308+
path: path/to/meta
309+
```
310+
311+
| Parameter | Type | Default value | Description |
312+
|-------------------|------------|---------------|---------------------------------------|
313+
| `path` | `string` | | Path to meta data storages, required. |
314+
303315
# `node` section
304316

305317
```yaml

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ require (
3030
github.com/stretchr/testify v1.9.0
3131
go.etcd.io/bbolt v1.3.11
3232
go.uber.org/zap v1.27.0
33+
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948
3334
golang.org/x/net v0.28.0
3435
golang.org/x/sync v0.10.0
3536
golang.org/x/sys v0.28.0
@@ -93,7 +94,6 @@ require (
9394
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
9495
go.uber.org/multierr v1.11.0 // indirect
9596
golang.org/x/crypto v0.31.0 // indirect
96-
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
9797
golang.org/x/text v0.21.0 // indirect
9898
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
9999
gopkg.in/ini.v1 v1.67.0 // indirect

0 commit comments

Comments
 (0)