Skip to content

Commit 0160ebf

Browse files
committed
ir/audit: Use SearchV2 for storage group listing
New search allows to filter out expired storage groups at the listing stage. The code volume is reduced in concert. Closes #3144. Refs #3058. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 2c1f717 commit 0160ebf

File tree

7 files changed

+45
-99
lines changed

7 files changed

+45
-99
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Changelog for NeoFS Node
2929
- Number of cuncurrenly handled notifications from the chain was increased from 10 to 300 for IR (#3068)
3030
- Write-cache size estimations (#3106)
3131
- New network map support solving the limit of ~320 nodes per network
32+
- IR calls `ObjectService.SearchV2` to select SG objects now (#3144)
3233

3334
### Removed
3435
- Drop creating new eacl tables with public keys (#3096)

pkg/core/client/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Client interface {
2626
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
2727
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
2828
ObjectSearchInit(ctx context.Context, containerID cid.ID, signer user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
29+
SearchObjects(context.Context, cid.ID, object.SearchFilters, []string, string, neofscrypto.Signer, client.SearchObjectsOptions) ([]client.SearchResultItem, string, error)
2930
ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID oid.ID, offset, length uint64, signer user.Signer, prm client.PrmObjectRange) (*client.ObjectRangeReader, error)
3031
ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error)
3132
AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputationSDK.Trust, prm client.PrmAnnounceLocalTrust) error

pkg/core/storagegroup/storagegroup.go

+3-16
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,6 @@ import (
1010
"github.com/nspcc-dev/neofs-sdk-go/storagegroup"
1111
)
1212

13-
// SearchSGPrm groups the parameters which are formed by Processor to search the storage group objects.
14-
type SearchSGPrm struct {
15-
Context context.Context
16-
17-
Container cid.ID
18-
19-
NodeInfo client.NodeInfo
20-
}
21-
22-
// SearchSGDst groups the target values which Processor expects from SG searching to process.
23-
type SearchSGDst struct {
24-
Objects []oid.ID
25-
}
26-
2713
// GetSGPrm groups parameter of GetSG operation.
2814
type GetSGPrm struct {
2915
Context context.Context
@@ -37,10 +23,11 @@ type GetSGPrm struct {
3723

3824
// SGSource is a storage group information source interface.
3925
type SGSource interface {
40-
// ListSG must list storage group objects in the container. Formed list must be written to destination.
26+
// ListSG must list container's storage group objects not expired at the
27+
// specified current epoch.
4128
//
4229
// Must return any error encountered which did not allow to form the list.
43-
ListSG(*SearchSGDst, SearchSGPrm) error
30+
ListSG(context.Context, client.NodeInfo, cid.ID, uint64) ([]oid.ID, error)
4431

4532
// GetSG must return storage group object for the provided CID, OID,
4633
// container and netmap state.

pkg/innerring/internal/client/client.go

-53
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ import (
88
"io"
99

1010
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
11-
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
1211
"github.com/nspcc-dev/neofs-sdk-go/client"
13-
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1412
"github.com/nspcc-dev/neofs-sdk-go/object"
1513
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1614
"github.com/nspcc-dev/neofs-sdk-go/user"
@@ -33,57 +31,6 @@ func (x *Client) SetPrivateKey(key *ecdsa.PrivateKey) {
3331
x.signer = user.NewAutoIDSigner(*key)
3432
}
3533

36-
// SearchSGPrm groups parameters of SearchSG operation.
37-
type SearchSGPrm struct {
38-
contextPrm
39-
40-
cnrID cid.ID
41-
}
42-
43-
// SetContainerID sets the ID of the container to search for storage groups.
44-
func (x *SearchSGPrm) SetContainerID(id cid.ID) {
45-
x.cnrID = id
46-
}
47-
48-
// SearchSGRes groups the resulting values of SearchSG operation.
49-
type SearchSGRes struct {
50-
cliRes []oid.ID
51-
}
52-
53-
// IDList returns a list of IDs of storage groups in the container.
54-
func (x SearchSGRes) IDList() []oid.ID {
55-
return x.cliRes
56-
}
57-
58-
var sgFilter = storagegroup.SearchQuery()
59-
60-
// SearchSG lists objects of storage group type in the container.
61-
//
62-
// Returns any error which prevented the operation from completing correctly in error return.
63-
func (x Client) SearchSG(prm SearchSGPrm) (*SearchSGRes, error) {
64-
var cliPrm client.PrmObjectSearch
65-
cliPrm.SetFilters(sgFilter)
66-
67-
rdr, err := x.c.ObjectSearchInit(prm.ctx, prm.cnrID, x.signer, cliPrm)
68-
if err != nil {
69-
return nil, fmt.Errorf("init object search: %w", err)
70-
}
71-
72-
var list []oid.ID
73-
74-
err = rdr.Iterate(func(id oid.ID) bool {
75-
list = append(list, id)
76-
return false
77-
})
78-
if err != nil {
79-
return nil, fmt.Errorf("search objects using NeoFS API: %w", err)
80-
}
81-
82-
return &SearchSGRes{
83-
cliRes: list,
84-
}, nil
85-
}
86-
8734
// GetObjectPrm groups parameters of GetObject operation.
8835
type GetObjectPrm struct {
8936
getObjectPrm

pkg/innerring/processors/audit/process.go

+3-15
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,7 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []
118118

119119
ln := len(shuffled)
120120

121-
var (
122-
info clientcore.NodeInfo
123-
prm storagegroup.SearchSGPrm
124-
)
125-
126-
prm.Container = cnr
121+
var info clientcore.NodeInfo
127122

128123
for i := range shuffled { // consider iterating over some part of container
129124
log := ap.log.With(
@@ -141,22 +136,15 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []
141136
}
142137

143138
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
144-
145-
prm.Context = ctx
146-
prm.NodeInfo = info
147-
148-
var dst storagegroup.SearchSGDst
149-
150-
err = ap.sgSrc.ListSG(&dst, prm)
151-
139+
ids, err := ap.sgSrc.ListSG(ctx, info, cnr, ap.epochSrc.EpochCounter())
152140
cancel()
153141

154142
if err != nil {
155143
log.Warn("error in storage group search", zap.Error(err))
156144
continue
157145
}
158146

159-
sg = append(sg, dst.Objects...)
147+
sg = append(sg, ids...)
160148

161149
break // we found storage groups, so break loop
162150
}

pkg/innerring/rpc.go

+27-15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"fmt"
7+
"slices"
8+
"strconv"
79
"sync"
810
"time"
911

@@ -14,7 +16,11 @@ import (
1416
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
1517
"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
1618
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
19+
storagegroupsvc "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
20+
"github.com/nspcc-dev/neofs-sdk-go/client"
1721
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
22+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
23+
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
1824
"github.com/nspcc-dev/neofs-sdk-go/netmap"
1925
"github.com/nspcc-dev/neofs-sdk-go/object"
2026
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -226,23 +232,29 @@ func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (neofsapiclient
226232
return cInternal, nil
227233
}
228234

229-
func (c ClientCache) ListSG(dst *storagegroup2.SearchSGDst, prm storagegroup2.SearchSGPrm) error {
230-
cli, err := c.getWrappedClient(prm.NodeInfo)
235+
func (c ClientCache) ListSG(ctx context.Context, node clientcore.NodeInfo, cnr cid.ID, notExpiredAt uint64) ([]oid.ID, error) {
236+
cli, err := c.Get(node)
231237
if err != nil {
232-
return fmt.Errorf("could not get API client from cache")
238+
return nil, fmt.Errorf("could not get API client from cache: %w", err)
233239
}
234240

235-
var cliPrm neofsapiclient.SearchSGPrm
236-
237-
cliPrm.SetContext(prm.Context)
238-
cliPrm.SetContainerID(prm.Container)
239-
240-
res, err := cli.SearchSG(cliPrm)
241-
if err != nil {
242-
return err
241+
fs := storagegroupsvc.SearchQuery()
242+
fs.AddFilter(object.AttributeExpirationEpoch, strconv.FormatUint(notExpiredAt, 10), object.MatchNumGE)
243+
var opts client.SearchObjectsOptions
244+
var cursor string
245+
var next []client.SearchResultItem
246+
var res []oid.ID
247+
for {
248+
next, cursor, err = cli.SearchObjects(ctx, cnr, fs, nil, cursor, (*neofsecdsa.Signer)(c.key), opts)
249+
if err != nil {
250+
return nil, fmt.Errorf("search objects RPC: %w", err)
251+
}
252+
res = slices.Grow(res, len(res)+len(next))
253+
for i := range next {
254+
res = append(res, next[i].ID)
255+
}
256+
if cursor == "" {
257+
return res, nil
258+
}
243259
}
244-
245-
dst.Objects = res.IDList()
246-
247-
return nil
248260
}

pkg/network/cache/multi.go

+10
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,16 @@ func (x *multiClient) ObjectSearchInit(ctx context.Context, containerID cid.ID,
323323
return
324324
}
325325

326+
func (x *multiClient) SearchObjects(ctx context.Context, cnr cid.ID, fs objectSDK.SearchFilters, attrs []string, cursor string,
327+
signer neofscrypto.Signer, opts client.SearchObjectsOptions) ([]client.SearchResultItem, string, error) {
328+
var res []client.SearchResultItem
329+
return res, cursor, x.iterateClients(ctx, func(c clientcore.Client) error {
330+
var err error
331+
res, cursor, err = c.SearchObjects(ctx, cnr, fs, attrs, cursor, signer, opts)
332+
return err
333+
})
334+
}
335+
326336
func (x *multiClient) AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputationSDK.Trust, prm client.PrmAnnounceLocalTrust) error {
327337
return x.iterateClients(ctx, func(c clientcore.Client) error {
328338
return c.AnnounceLocalTrust(ctx, epoch, trusts, prm)

0 commit comments

Comments
 (0)