Skip to content

Commit d5f5e64

Browse files
committed
ir/audit: Use SearchV2 for storage group listing
New search allows to filter out expired storage groups at the listing stage. The code volume is reduced in concert. Closes #3144. Refs #3058. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 59d6877 commit d5f5e64

File tree

8 files changed

+56
-99
lines changed

8 files changed

+56
-99
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Changelog for NeoFS Node
88
### Fixed
99

1010
### Changed
11+
- IR calls `ObjectService.SearchV2` to select SG objects now (#3144)
1112

1213
### Removed
1314

pkg/core/client/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Client interface {
2626
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
2727
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
2828
ObjectSearchInit(ctx context.Context, containerID cid.ID, signer user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
29+
SearchObjects(context.Context, cid.ID, object.SearchFilters, []string, string, neofscrypto.Signer, client.SearchObjectsOptions) ([]client.SearchResultItem, string, error)
2930
ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID oid.ID, offset, length uint64, signer user.Signer, prm client.PrmObjectRange) (*client.ObjectRangeReader, error)
3031
ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error)
3132
AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputationSDK.Trust, prm client.PrmAnnounceLocalTrust) error

pkg/core/storagegroup/storagegroup.go

+3-16
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,6 @@ import (
1010
"github.com/nspcc-dev/neofs-sdk-go/storagegroup"
1111
)
1212

13-
// SearchSGPrm groups the parameters which are formed by Processor to search the storage group objects.
14-
type SearchSGPrm struct {
15-
Context context.Context
16-
17-
Container cid.ID
18-
19-
NodeInfo client.NodeInfo
20-
}
21-
22-
// SearchSGDst groups the target values which Processor expects from SG searching to process.
23-
type SearchSGDst struct {
24-
Objects []oid.ID
25-
}
26-
2713
// GetSGPrm groups parameter of GetSG operation.
2814
type GetSGPrm struct {
2915
Context context.Context
@@ -37,10 +23,11 @@ type GetSGPrm struct {
3723

3824
// SGSource is a storage group information source interface.
3925
type SGSource interface {
40-
// ListSG must list storage group objects in the container. Formed list must be written to destination.
26+
// ListSG must list container's storage group objects not expired at the
27+
// specified current epoch.
4128
//
4229
// Must return any error encountered which did not allow to form the list.
43-
ListSG(*SearchSGDst, SearchSGPrm) error
30+
ListSG(context.Context, client.NodeInfo, cid.ID, uint64) ([]oid.ID, error)
4431

4532
// GetSG must return storage group object for the provided CID, OID,
4633
// container and netmap state.

pkg/innerring/internal/client/client.go

-53
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ import (
88
"io"
99

1010
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
11-
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
1211
"github.com/nspcc-dev/neofs-sdk-go/client"
13-
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1412
"github.com/nspcc-dev/neofs-sdk-go/object"
1513
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1614
"github.com/nspcc-dev/neofs-sdk-go/user"
@@ -33,57 +31,6 @@ func (x *Client) SetPrivateKey(key *ecdsa.PrivateKey) {
3331
x.signer = user.NewAutoIDSigner(*key)
3432
}
3533

36-
// SearchSGPrm groups parameters of SearchSG operation.
37-
type SearchSGPrm struct {
38-
contextPrm
39-
40-
cnrID cid.ID
41-
}
42-
43-
// SetContainerID sets the ID of the container to search for storage groups.
44-
func (x *SearchSGPrm) SetContainerID(id cid.ID) {
45-
x.cnrID = id
46-
}
47-
48-
// SearchSGRes groups the resulting values of SearchSG operation.
49-
type SearchSGRes struct {
50-
cliRes []oid.ID
51-
}
52-
53-
// IDList returns a list of IDs of storage groups in the container.
54-
func (x SearchSGRes) IDList() []oid.ID {
55-
return x.cliRes
56-
}
57-
58-
var sgFilter = storagegroup.SearchQuery()
59-
60-
// SearchSG lists objects of storage group type in the container.
61-
//
62-
// Returns any error which prevented the operation from completing correctly in error return.
63-
func (x Client) SearchSG(prm SearchSGPrm) (*SearchSGRes, error) {
64-
var cliPrm client.PrmObjectSearch
65-
cliPrm.SetFilters(sgFilter)
66-
67-
rdr, err := x.c.ObjectSearchInit(prm.ctx, prm.cnrID, x.signer, cliPrm)
68-
if err != nil {
69-
return nil, fmt.Errorf("init object search: %w", err)
70-
}
71-
72-
var list []oid.ID
73-
74-
err = rdr.Iterate(func(id oid.ID) bool {
75-
list = append(list, id)
76-
return false
77-
})
78-
if err != nil {
79-
return nil, fmt.Errorf("search objects using NeoFS API: %w", err)
80-
}
81-
82-
return &SearchSGRes{
83-
cliRes: list,
84-
}, nil
85-
}
86-
8734
// GetObjectPrm groups parameters of GetObject operation.
8835
type GetObjectPrm struct {
8936
getObjectPrm

pkg/innerring/processors/audit/process.go

+3-15
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,7 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []
118118

119119
ln := len(shuffled)
120120

121-
var (
122-
info clientcore.NodeInfo
123-
prm storagegroup.SearchSGPrm
124-
)
125-
126-
prm.Container = cnr
121+
var info clientcore.NodeInfo
127122

128123
for i := range shuffled { // consider iterating over some part of container
129124
log := ap.log.With(
@@ -141,22 +136,15 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []
141136
}
142137

143138
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
144-
145-
prm.Context = ctx
146-
prm.NodeInfo = info
147-
148-
var dst storagegroup.SearchSGDst
149-
150-
err = ap.sgSrc.ListSG(&dst, prm)
151-
139+
ids, err := ap.sgSrc.ListSG(ctx, info, cnr, ap.epochSrc.EpochCounter())
152140
cancel()
153141

154142
if err != nil {
155143
log.Warn("error in storage group search", zap.Error(err))
156144
continue
157145
}
158146

159-
sg = append(sg, dst.Objects...)
147+
sg = append(sg, ids...)
160148

161149
break // we found storage groups, so break loop
162150
}

pkg/innerring/rpc.go

+27-15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"fmt"
7+
"slices"
8+
"strconv"
79
"sync"
810
"time"
911

@@ -14,7 +16,11 @@ import (
1416
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
1517
"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
1618
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
19+
storagegroupsvc "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
20+
"github.com/nspcc-dev/neofs-sdk-go/client"
1721
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
22+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
23+
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
1824
"github.com/nspcc-dev/neofs-sdk-go/netmap"
1925
"github.com/nspcc-dev/neofs-sdk-go/object"
2026
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -226,23 +232,29 @@ func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (neofsapiclient
226232
return cInternal, nil
227233
}
228234

229-
func (c ClientCache) ListSG(dst *storagegroup2.SearchSGDst, prm storagegroup2.SearchSGPrm) error {
230-
cli, err := c.getWrappedClient(prm.NodeInfo)
235+
func (c ClientCache) ListSG(ctx context.Context, node clientcore.NodeInfo, cnr cid.ID, notExpiredAt uint64) ([]oid.ID, error) {
236+
cli, err := c.Get(node)
231237
if err != nil {
232-
return fmt.Errorf("could not get API client from cache")
238+
return nil, fmt.Errorf("could not get API client from cache: %w", err)
233239
}
234240

235-
var cliPrm neofsapiclient.SearchSGPrm
236-
237-
cliPrm.SetContext(prm.Context)
238-
cliPrm.SetContainerID(prm.Container)
239-
240-
res, err := cli.SearchSG(cliPrm)
241-
if err != nil {
242-
return err
241+
fs := storagegroupsvc.SearchQuery()
242+
fs.AddFilter(object.AttributeExpirationEpoch, strconv.FormatUint(notExpiredAt, 10), object.MatchNumGE)
243+
var opts client.SearchObjectsOptions
244+
var cursor string
245+
var next []client.SearchResultItem
246+
var res []oid.ID
247+
for {
248+
next, cursor, err = cli.SearchObjects(ctx, cnr, fs, nil, cursor, (*neofsecdsa.Signer)(c.key), opts)
249+
if err != nil {
250+
return nil, fmt.Errorf("search objects RPC: %w", err)
251+
}
252+
res = slices.Grow(res, len(res)+len(next))
253+
for i := range next {
254+
res = append(res, next[i].ID)
255+
}
256+
if cursor == "" {
257+
return res, nil
258+
}
243259
}
244-
245-
dst.Objects = res.IDList()
246-
247-
return nil
248260
}

pkg/network/cache/clients.go

+11
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,17 @@ func (x *connections) AnnounceIntermediateTrust(ctx context.Context, epoch uint6
394394
})
395395
}
396396

397+
func (x *connections) SearchObjects(ctx context.Context, cnr cid.ID, fs object.SearchFilters, attrs []string, cursor string,
398+
signer neofscrypto.Signer, opts client.SearchObjectsOptions) ([]client.SearchResultItem, string, error) {
399+
var resItems []client.SearchResultItem
400+
var resCursor string
401+
return resItems, resCursor, x.forEach(ctx, func(ctx context.Context, c *client.Client) error {
402+
var err error
403+
resItems, resCursor, err = c.SearchObjects(ctx, cnr, fs, attrs, cursor, signer, opts)
404+
return err
405+
})
406+
}
407+
397408
func isTempError(err error) bool {
398409
st, ok := status.FromError(err)
399410
return ok && st.Code() == codes.Unavailable

pkg/network/cache/multi.go

+10
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,16 @@ func (x *multiClient) ObjectSearchInit(ctx context.Context, containerID cid.ID,
322322
return
323323
}
324324

325+
func (x *multiClient) SearchObjects(ctx context.Context, cnr cid.ID, fs objectSDK.SearchFilters, attrs []string, cursor string,
326+
signer neofscrypto.Signer, opts client.SearchObjectsOptions) ([]client.SearchResultItem, string, error) {
327+
var res []client.SearchResultItem
328+
return res, cursor, x.iterateClients(ctx, func(c clientcore.Client) error {
329+
var err error
330+
res, cursor, err = c.SearchObjects(ctx, cnr, fs, attrs, cursor, signer, opts)
331+
return err
332+
})
333+
}
334+
325335
func (x *multiClient) AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputationSDK.Trust, prm client.PrmAnnounceLocalTrust) error {
326336
return x.iterateClients(ctx, func(c clientcore.Client) error {
327337
return c.AnnounceLocalTrust(ctx, epoch, trusts, prm)

0 commit comments

Comments
 (0)