From 1ac26dbc55e7b5e9b9823c01f13e1ec2fb41fa8a Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Fri, 18 Apr 2025 17:35:43 +0300 Subject: [PATCH] services/object: use SearchV2 for search objects Closes #3143. Signed-off-by: Andrey Butusov --- CHANGELOG.md | 1 + cmd/neofs-node/object.go | 51 ++++--- pkg/services/object/server.go | 146 ++++++++++--------- pkg/services/object/tombstone/verify.go | 42 ++---- pkg/services/object/tombstone/verify_test.go | 17 ++- 5 files changed, 134 insertions(+), 123 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8425ebc43a..699b6593ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ Changelog for NeoFS Node - Storage Nodes do not accept REPLICATE with a header that exceeds the limit (#3297) - Search API is served from SearchV2 indexes now (#3316) - Blobstor can be of exactly one type, with no substorages (#3330) +- SN uses SearchV2 to verify tombstones (#3312) ### Removed - SN `apiclient.allow_external` config (#3235) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b0d2d35ccb..7222b668fe 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -40,8 +40,10 @@ import ( objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" + protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session" apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/version" "go.uber.org/zap" ) @@ -244,6 +246,7 @@ func initObjectService(c *cfg) { mNumber, err := c.shared.basics.cli.MagicNumber() fatalOnErr(err) + os := &objectSource{get: sGet} sPut := putsvc.NewService(&transport{clients: putConstructor}, c, putsvc.WithNetworkMagic(mNumber), putsvc.WithKeyStorage(keyStorage), @@ -257,7 +260,7 @@ func initObjectService(c *cfg) { putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), putsvc.WithSplitChainVerifier(split.NewVerifier(sGet)), - putsvc.WithTombstoneVerifier(tombstone.NewVerifier(objectSource{sGet, sSearch})), + putsvc.WithTombstoneVerifier(tombstone.NewVerifier(os)), ) sDelete := deletesvc.New( @@ -314,6 +317,7 @@ func initObjectService(c *cfg) { keys: keyStorage, } server := objectService.New(objSvc, mNumber, fsChain, storage, c.metaService, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor) + os.server = server for _, srv := range c.cfgGRPC.servers { protoobject.RegisterObjectServiceServer(srv, server) @@ -639,16 +643,9 @@ func (x storageForObjectService) GetSessionPrivateKey(usr user.ID, uid uuid.UUID type objectSource struct { get *getsvc.Service - search *searchsvc.Service -} - -type searchWriter struct { - ids []oid.ID -} - -func (w *searchWriter) WriteIDs(ids []oid.ID) error { - w.ids = append(w.ids, ids...) - return nil + server interface { + ProcessSearch(ctx context.Context, req *protoobject.SearchV2Request) ([]client.SearchResultItem, []byte, error) + } } func (o objectSource) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { @@ -664,20 +661,30 @@ func (o objectSource) Head(ctx context.Context, addr oid.Address) (*objectSDK.Ob return hw.h, err } -func (o objectSource) Search(ctx context.Context, cnr cid.ID, filters objectSDK.SearchFilters) ([]oid.ID, error) { - var sw searchWriter - - var sPrm searchsvc.Prm - sPrm.SetWriter(&sw) - sPrm.WithSearchFilters(filters) - sPrm.WithContainerID(cnr) - - err := o.search.Search(ctx, sPrm) +func (o objectSource) SearchOne(ctx context.Context, cnr cid.ID, filters objectSDK.SearchFilters) (oid.ID, error) { + var id oid.ID + res, _, err := o.server.ProcessSearch(ctx, &protoobject.SearchV2Request{ + Body: &protoobject.SearchV2Request_Body{ + ContainerId: cnr.ProtoMessage(), + Version: 1, + Filters: filters.ProtoMessage(), + Cursor: "", + Count: 1, + Attributes: nil, + }, + MetaHeader: &protosession.RequestMetaHeader{ + Version: version.Current().ProtoMessage(), + Ttl: 2, + }, + }) if err != nil { - return nil, err + return id, err } - return sw.ids, nil + if len(res) == 1 { + return res[0].ID, nil + } + return id, nil } // IsLocalNodePublicKey checks whether given binary-encoded public key is diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 27d332e1e8..dd840d1343 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -177,7 +177,11 @@ const ( maxObjAddrRespAmount = maxRespDataChunkSize / addrMsgSize // each address is about 72 bytes ) -type server struct { +// Server represents Object Service server that provides object manipulation +// operations including Get, Put, Head, Range, Delete, Search, and Replicate. +// The server enforces access control, verifies requests, and handles data storage +// and retrieval operations. +type Server struct { handlers Handlers fsChain FSChain storage Storage @@ -192,13 +196,13 @@ type server struct { } // New provides protoobject.ObjectServiceServer for the given parameters. -func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, metaSvc *metasvc.Meta, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor, cs searchsvc.ClientConstructor) protoobject.ObjectServiceServer { +func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, metaSvc *metasvc.Meta, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor, cs searchsvc.ClientConstructor) *Server { // TODO: configurable capacity sp, err := ants.NewPool(100, ants.WithNonblocking(true)) if err != nil { panic(fmt.Errorf("create ants pool: %w", err)) // fails on invalid input only } - return &server{ + return &Server{ handlers: hs, fsChain: fsChain, storage: st, @@ -213,7 +217,7 @@ func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, metaSvc * } } -func (s *server) pushOpExecResult(op stat.Method, err error, startedAt time.Time) { +func (s *Server) pushOpExecResult(op stat.Method, err error, startedAt time.Time) { s.metrics.HandleOpExecResult(op, err == nil, time.Since(startedAt)) } @@ -221,7 +225,7 @@ func newCurrentProtoVersionMessage() *refs.Version { return version.Current().ProtoMessage() } -func (s *server) makeResponseMetaHeader(st *protostatus.Status) *protosession.ResponseMetaHeader { +func (s *Server) makeResponseMetaHeader(st *protostatus.Status) *protosession.ResponseMetaHeader { return &protosession.ResponseMetaHeader{ Version: newCurrentProtoVersionMessage(), Epoch: s.fsChain.CurrentEpoch(), @@ -229,12 +233,12 @@ func (s *server) makeResponseMetaHeader(st *protostatus.Status) *protosession.Re } } -func (s *server) sendPutResponse(stream protoobject.ObjectService_PutServer, resp *protoobject.PutResponse) error { +func (s *Server) sendPutResponse(stream protoobject.ObjectService_PutServer, resp *protoobject.PutResponse) error { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return stream.SendAndClose(resp) } -func (s *server) sendStatusPutResponse(stream protoobject.ObjectService_PutServer, err error) error { +func (s *Server) sendStatusPutResponse(stream protoobject.ObjectService_PutServer, err error) error { return s.sendPutResponse(stream, &protoobject.PutResponse{ MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), }) @@ -405,7 +409,7 @@ func (x *putStream) close() (*protoobject.PutResponse, error) { }, nil } -func (s *server) Put(gStream protoobject.ObjectService_PutServer) error { +func (s *Server) Put(gStream protoobject.ObjectService_PutServer) error { t := time.Now() stream, err := s.handlers.Put(gStream.Context()) @@ -472,12 +476,12 @@ func (s *server) Put(gStream protoobject.ObjectService_PutServer) error { } } -func (s *server) signDeleteResponse(resp *protoobject.DeleteResponse) *protoobject.DeleteResponse { +func (s *Server) signDeleteResponse(resp *protoobject.DeleteResponse) *protoobject.DeleteResponse { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return resp } -func (s *server) makeStatusDeleteResponse(err error) *protoobject.DeleteResponse { +func (s *Server) makeStatusDeleteResponse(err error) *protoobject.DeleteResponse { return s.signDeleteResponse(&protoobject.DeleteResponse{ MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), }) @@ -489,7 +493,7 @@ func (x *deleteResponseBody) SetAddress(addr oid.Address) { x.Tombstone = addr.ProtoMessage() } -func (s *server) Delete(ctx context.Context, req *protoobject.DeleteRequest) (*protoobject.DeleteResponse, error) { +func (s *Server) Delete(ctx context.Context, req *protoobject.DeleteRequest) (*protoobject.DeleteResponse, error) { var ( err error t = time.Now() @@ -547,12 +551,12 @@ func (s *server) Delete(ctx context.Context, req *protoobject.DeleteRequest) (*p return s.signDeleteResponse(&protoobject.DeleteResponse{Body: &rb}), nil } -func (s *server) signHeadResponse(resp *protoobject.HeadResponse) *protoobject.HeadResponse { +func (s *Server) signHeadResponse(resp *protoobject.HeadResponse) *protoobject.HeadResponse { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return resp } -func (s *server) makeStatusHeadResponse(err error) *protoobject.HeadResponse { +func (s *Server) makeStatusHeadResponse(err error) *protoobject.HeadResponse { var splitErr *object.SplitInfoError if errors.As(err, &splitErr) { return s.signHeadResponse(&protoobject.HeadResponse{ @@ -568,7 +572,7 @@ func (s *server) makeStatusHeadResponse(err error) *protoobject.HeadResponse { }) } -func (s *server) Head(ctx context.Context, req *protoobject.HeadRequest) (*protoobject.HeadResponse, error) { +func (s *Server) Head(ctx context.Context, req *protoobject.HeadRequest) (*protoobject.HeadResponse, error) { var ( err error t = time.Now() @@ -794,19 +798,19 @@ func getHeaderFromRemoteNode(ctx context.Context, conn *grpc.ClientConn, nodePub return obj, nil } -func (s *server) signHashResponse(resp *protoobject.GetRangeHashResponse) *protoobject.GetRangeHashResponse { +func (s *Server) signHashResponse(resp *protoobject.GetRangeHashResponse) *protoobject.GetRangeHashResponse { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return resp } -func (s *server) makeStatusHashResponse(err error) *protoobject.GetRangeHashResponse { +func (s *Server) makeStatusHashResponse(err error) *protoobject.GetRangeHashResponse { return s.signHashResponse(&protoobject.GetRangeHashResponse{ MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), }) } // GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service. -func (s *server) GetRangeHash(ctx context.Context, req *protoobject.GetRangeHashRequest) (*protoobject.GetRangeHashResponse, error) { +func (s *Server) GetRangeHash(ctx context.Context, req *protoobject.GetRangeHashRequest) (*protoobject.GetRangeHashResponse, error) { var ( err error t = time.Now() @@ -958,12 +962,12 @@ func getHashesFromRemoteNode(ctx context.Context, conn *grpc.ClientConn, nodePub return resp.GetBody().GetHashList(), nil } -func (s *server) sendGetResponse(stream protoobject.ObjectService_GetServer, resp *protoobject.GetResponse) error { +func (s *Server) sendGetResponse(stream protoobject.ObjectService_GetServer, resp *protoobject.GetResponse) error { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return stream.Send(resp) } -func (s *server) sendStatusGetResponse(stream protoobject.ObjectService_GetServer, err error) error { +func (s *Server) sendStatusGetResponse(stream protoobject.ObjectService_GetServer, err error) error { var splitErr *object.SplitInfoError if errors.As(err, &splitErr) { return s.sendGetResponse(stream, &protoobject.GetResponse{ @@ -981,7 +985,7 @@ func (s *server) sendStatusGetResponse(stream protoobject.ObjectService_GetServe type getStream struct { base protoobject.ObjectService_GetServer - srv *server + srv *Server reqInfo aclsvc.RequestInfo } @@ -1019,7 +1023,7 @@ func (s *getStream) WriteChunk(chunk []byte) error { return nil } -func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectService_GetServer) error { +func (s *Server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectService_GetServer) error { var ( err error t = time.Now() @@ -1211,12 +1215,12 @@ func continueGetFromRemoteNode(ctx context.Context, conn *grpc.ClientConn, nodeP } } -func (s *server) sendRangeResponse(stream protoobject.ObjectService_GetRangeServer, resp *protoobject.GetRangeResponse) error { +func (s *Server) sendRangeResponse(stream protoobject.ObjectService_GetRangeServer, resp *protoobject.GetRangeResponse) error { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return stream.Send(resp) } -func (s *server) sendStatusRangeResponse(stream protoobject.ObjectService_GetRangeServer, err error) error { +func (s *Server) sendStatusRangeResponse(stream protoobject.ObjectService_GetRangeServer, err error) error { var splitErr *object.SplitInfoError if errors.As(err, &splitErr) { return s.sendRangeResponse(stream, &protoobject.GetRangeResponse{ @@ -1234,7 +1238,7 @@ func (s *server) sendStatusRangeResponse(stream protoobject.ObjectService_GetRan type rangeStream struct { base protoobject.ObjectService_GetRangeServer - srv *server + srv *Server reqInfo aclsvc.RequestInfo } @@ -1259,7 +1263,7 @@ func (s *rangeStream) WriteChunk(chunk []byte) error { return nil } -func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject.ObjectService_GetRangeServer) error { +func (s *Server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject.ObjectService_GetRangeServer) error { var ( err error t = time.Now() @@ -1432,12 +1436,12 @@ func continueRangeFromRemoteNode(ctx context.Context, conn *grpc.ClientConn, nod } } -func (s *server) sendSearchResponse(stream protoobject.ObjectService_SearchServer, resp *protoobject.SearchResponse) error { +func (s *Server) sendSearchResponse(stream protoobject.ObjectService_SearchServer, resp *protoobject.SearchResponse) error { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return stream.Send(resp) } -func (s *server) sendStatusSearchResponse(stream protoobject.ObjectService_SearchServer, err error) error { +func (s *Server) sendStatusSearchResponse(stream protoobject.ObjectService_SearchServer, err error) error { return s.sendSearchResponse(stream, &protoobject.SearchResponse{ MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), }) @@ -1445,7 +1449,7 @@ func (s *server) sendStatusSearchResponse(stream protoobject.ObjectService_Searc type searchStream struct { base protoobject.ObjectService_SearchServer - srv *server + srv *Server reqInfo aclsvc.RequestInfo } @@ -1479,7 +1483,7 @@ func (s *searchStream) WriteIDs(ids []oid.ID) error { return nil } -func (s *server) Search(req *protoobject.SearchRequest, gStream protoobject.ObjectService_SearchServer) error { +func (s *Server) Search(req *protoobject.SearchRequest, gStream protoobject.ObjectService_SearchServer) error { var ( err error t = time.Now() @@ -1626,7 +1630,7 @@ func searchOnRemoteNode(ctx context.Context, conn *grpc.ClientConn, nodePub []by } // Replicate serves neo.fs.v2.object.ObjectService/Replicate RPC. -func (s *server) Replicate(_ context.Context, req *protoobject.ReplicateRequest) (*protoobject.ReplicateResponse, error) { +func (s *Server) Replicate(_ context.Context, req *protoobject.ReplicateRequest) (*protoobject.ReplicateResponse, error) { if req.Object == nil { return &protoobject.ReplicateResponse{Status: &protostatus.Status{ Code: codeInternal, Message: "binary object field is missing/empty", @@ -1795,18 +1799,18 @@ func (s *server) Replicate(_ context.Context, req *protoobject.ReplicateRequest) return resp, nil } -func (s *server) signSearchResponse(resp *protoobject.SearchV2Response) *protoobject.SearchV2Response { +func (s *Server) signSearchResponse(resp *protoobject.SearchV2Response) *protoobject.SearchV2Response { resp.VerifyHeader = util.SignResponse(&s.signer, resp) return resp } -func (s *server) makeStatusSearchResponse(err error) *protoobject.SearchV2Response { +func (s *Server) makeStatusSearchResponse(err error) *protoobject.SearchV2Response { return s.signSearchResponse(&protoobject.SearchV2Response{ MetaHeader: s.makeResponseMetaHeader(apistatus.FromError(err)), }) } -func (s *server) SearchV2(ctx context.Context, req *protoobject.SearchV2Request) (*protoobject.SearchV2Response, error) { +func (s *Server) SearchV2(ctx context.Context, req *protoobject.SearchV2Request) (*protoobject.SearchV2Response, error) { var ( err error t = time.Now() @@ -1858,7 +1862,7 @@ func verifySearchFilter(f *protoobject.SearchFilter) error { return nil } -func (s *server) processSearchRequest(ctx context.Context, req *protoobject.SearchV2Request) (*protoobject.SearchV2Response_Body, error) { +func (s *Server) processSearchRequest(ctx context.Context, req *protoobject.SearchV2Request) (*protoobject.SearchV2Response_Body, error) { body := req.GetBody() if body == nil { return nil, errors.New("missing body") @@ -1866,10 +1870,6 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear if body.ContainerId == nil { return nil, errors.New("missing container ID") } - var cID cid.ID - if err := cID.FromProtoMessage(body.ContainerId); err != nil { - return nil, fmt.Errorf("invalid container ID: %w", err) - } if body.Version != 1 { return nil, errors.New("unsupported query version") } @@ -1900,25 +1900,53 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear if len(body.Attributes) > 0 && (len(body.Filters) == 0 || body.Filters[0].Key != body.Attributes[0]) { return nil, errors.New("primary attribute must be filtered 1st") } + + res, newCursor, err := s.ProcessSearch(ctx, req) + if err != nil { + return nil, err + } + + resBody := &protoobject.SearchV2Response_Body{ + Result: make([]*protoobject.SearchV2Response_OIDWithMeta, len(res)), + } + for i := range res { + resBody.Result[i] = &protoobject.SearchV2Response_OIDWithMeta{ + Id: res[i].ID.ProtoMessage(), + Attributes: res[i].Attributes, + } + } + if newCursor != nil { + resBody.Cursor = base64.StdEncoding.EncodeToString(newCursor) + } + return resBody, nil +} + +func (s *Server) ProcessSearch(ctx context.Context, req *protoobject.SearchV2Request) ([]sdkclient.SearchResultItem, []byte, error) { ttl := req.MetaHeader.GetTtl() if ttl == 0 { - return nil, errors.New("zero TTL") + return nil, nil, errors.New("zero TTL") } + + body := req.GetBody() var fs object.SearchFilters if err := fs.FromProtoMessage(body.Filters); err != nil { - return nil, fmt.Errorf("invalid filters: %w", err) + return nil, nil, fmt.Errorf("invalid filters: %w", err) } ofs, cursor, err := objectcore.PreprocessSearchQuery(fs, body.Attributes, body.Cursor) if err != nil { if errors.Is(err, objectcore.ErrUnreachableQuery) { - return nil, nil + return nil, nil, nil } - return nil, err + return nil, nil, err } + var cID cid.ID + if err = cID.FromProtoMessage(body.ContainerId); err != nil { + return nil, nil, fmt.Errorf("invalid container ID: %w", err) + } cnr, err := s.fsChain.Get(cID) if err != nil { - return nil, fmt.Errorf("fetching container: %w", err) + return nil, nil, fmt.Errorf("fetching container: %w", err) } var handleWithMetaService bool const metaOnChainAttr = "__NEOFS__METAINFO_CONSISTENCY" @@ -1934,12 +1962,12 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear switch { case ttl == 1: if res, newCursor, err = s.storage.SearchObjects(cID, ofs, body.Attributes, cursor, count); err != nil { - return nil, err + return nil, nil, err } case handleWithMetaService: res, newCursor, err = s.meta.Search(cID, ofs, body.Attributes, cursor, count) if err != nil { - return nil, err + return nil, nil, err } default: var signed bool @@ -1973,7 +2001,7 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear } if !signed { req.MetaHeader = &protosession.RequestMetaHeader{Ttl: 1, Origin: req.MetaHeader} - if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(s.signer), req, nil); err != nil { + if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer[*protoobject.SearchV2Request_Body](neofsecdsa.Signer(s.signer), req, nil); err != nil { resErr = fmt.Errorf("sign request: %w", err) return false } @@ -1995,44 +2023,32 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear err = resErr } if err != nil { - return nil, err + return nil, nil, err } var ( firstAttr string firstFilter *object.SearchFilter ) if len(body.Attributes) > 0 { - firstAttr = body.Filters[0].Key + firstAttr = fs[0].Header() firstFilter = &ofs[0].SearchFilter } cmpInt := firstAttr != "" && objectcore.IsIntegerSearchOp(fs[0].Operation()) var more bool if res, more, err = objectcore.MergeSearchResults(count, firstAttr, cmpInt, sets, mores); err != nil { - return nil, fmt.Errorf("merge results from container nodes: %w", err) + return nil, nil, fmt.Errorf("merge results from container nodes: %w", err) } if more { if newCursor, err = objectcore.CalculateCursor(firstFilter, res[len(res)-1]); err != nil { - return nil, fmt.Errorf("recalculate cursor: %w", err) + return nil, nil, fmt.Errorf("recalculate cursor: %w", err) } } } - resBody := &protoobject.SearchV2Response_Body{ - Result: make([]*protoobject.SearchV2Response_OIDWithMeta, len(res)), - } - for i := range res { - resBody.Result[i] = &protoobject.SearchV2Response_OIDWithMeta{ - Id: res[i].ID.ProtoMessage(), - Attributes: res[i].Attributes, - } - } - if newCursor != nil { - resBody.Cursor = base64.StdEncoding.EncodeToString(newCursor) - } - return resBody, nil + return res, newCursor, nil } -func (s *server) searchOnRemoteNode(ctx context.Context, node sdknetmap.NodeInfo, req *protoobject.SearchV2Request) ([]sdkclient.SearchResultItem, bool, error) { +func (s *Server) searchOnRemoteNode(ctx context.Context, node sdknetmap.NodeInfo, req *protoobject.SearchV2Request) ([]sdkclient.SearchResultItem, bool, error) { // TODO: copy-pasted from old search implementation, consider deduplicating in // the client constructor var endpoints network.AddressGroup @@ -2127,7 +2143,7 @@ func objectFromMessage(gMsg *protoobject.Object) (*object.Object, error) { return obj, nil } -func (s *server) metaInfoSignature(o object.Object) ([]byte, error) { +func (s *Server) metaInfoSignature(o object.Object) ([]byte, error) { firstObj := o.GetFirstID() if o.HasParent() && firstObj.IsZero() { // object itself is the first one diff --git a/pkg/services/object/tombstone/verify.go b/pkg/services/object/tombstone/verify.go index 033f886681..b3d008b7b1 100644 --- a/pkg/services/object/tombstone/verify.go +++ b/pkg/services/object/tombstone/verify.go @@ -24,9 +24,10 @@ type ObjectSource interface { // for removal. Head(ctx context.Context, addr oid.Address) (*object.Object, error) - // Search returns objects that satisfy provided search filters and + // SearchOne returns objects ID that satisfy provided search filters with limit 1 and // any error that does not allow processing operation. - Search(ctx context.Context, cnr cid.ID, filter object.SearchFilters) ([]oid.ID, error) + // If an object is not found, it returns zero ID. + SearchOne(ctx context.Context, cnr cid.ID, filter object.SearchFilters) (oid.ID, error) } // Verifier implements [object.TombVerifier] interface. @@ -131,29 +132,15 @@ func (v *Verifier) verifyMember(ctx context.Context, cnr cid.ID, member oid.ID) func (v *Verifier) verifyV1Child(ctx context.Context, cnr cid.ID, sID object.SplitID) error { filters := object.SearchFilters{} filters.AddSplitIDFilter(object.MatchStringEqual, sID) + filters.AddPayloadSizeFilter(object.MatchStringEqual, 0) - ids, err := v.objs.Search(ctx, cnr, filters) + id, err := v.objs.SearchOne(ctx, cnr, filters) if err != nil { return fmt.Errorf("searching objects: %w", err) } - var addr oid.Address - addr.SetContainer(cnr) - - for _, child := range ids { - addr.SetObject(child) - - header, err := v.objs.Head(ctx, addr) - if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) { // see similar call - return nil - } - return fmt.Errorf("heading %s object that was searched: %w", addr, err) - } - - if len(header.Children()) != 0 { - return fmt.Errorf("found link object %s", addr) - } + if !id.IsZero() { + return fmt.Errorf("found link object %s", id) } return nil @@ -164,20 +151,13 @@ func (v *Verifier) verifyV2Child(ctx context.Context, cnr cid.ID, firstObject oi filters.AddFirstSplitObjectFilter(object.MatchStringEqual, firstObject) filters.AddTypeFilter(object.MatchStringEqual, object.TypeLink) - ids, err := v.objs.Search(ctx, cnr, filters) + id, err := v.objs.SearchOne(ctx, cnr, filters) if err != nil { return fmt.Errorf("searching objects: %w", err) } - switch len(ids) { - case 0: - // no link object, child can be deleted - return nil - case 1: - return fmt.Errorf("found link object %s", ids[0]) - default: - // more than one link object somehow, sad but - // nothing can be done here - return errors.New("link object was found") + if !id.IsZero() { + return fmt.Errorf("found link object %s", id) } + return nil } diff --git a/pkg/services/object/tombstone/verify_test.go b/pkg/services/object/tombstone/verify_test.go index 8ae8e00107..38619bc4cb 100644 --- a/pkg/services/object/tombstone/verify_test.go +++ b/pkg/services/object/tombstone/verify_test.go @@ -31,13 +31,14 @@ func (t *testObjectSource) Head(_ context.Context, addr oid.Address) (*object.Ob return res.h, res.err } -func (t *testObjectSource) Search(_ context.Context, _ cid.ID, ff object.SearchFilters) ([]oid.ID, error) { +func (t *testObjectSource) SearchOne(_ context.Context, _ cid.ID, ff object.SearchFilters) (oid.ID, error) { + var id oid.ID f := ff[0] switch f.Header() { case object.FilterSplitID: if t.searchV1 == nil { - return nil, nil + return id, nil } var splitID object.SplitID @@ -46,10 +47,13 @@ func (t *testObjectSource) Search(_ context.Context, _ cid.ID, ff object.SearchF panic(err) } - return t.searchV1[splitID], nil + if len(t.searchV1[splitID]) == 1 { + return t.searchV1[splitID][0], nil + } + return id, nil case object.FilterFirstSplitObject: if t.searchV2 == nil { - return nil, nil + return id, nil } var firstObject oid.ID @@ -58,7 +62,10 @@ func (t *testObjectSource) Search(_ context.Context, _ cid.ID, ff object.SearchF panic(err) } - return t.searchV2[firstObject], nil + if len(t.searchV2[firstObject]) == 1 { + return t.searchV2[firstObject][0], nil + } + return id, nil default: panic("unexpected search call") }