@@ -5,8 +5,10 @@ import (
5
5
"errors"
6
6
"fmt"
7
7
"maps"
8
+ "math/big"
8
9
"os"
9
10
"path"
11
+ "strconv"
10
12
"sync"
11
13
12
14
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
@@ -16,6 +18,8 @@ import (
16
18
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
17
19
objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
18
20
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
21
+ "github.com/nspcc-dev/neofs-sdk-go/user"
22
+ "github.com/nspcc-dev/neofs-sdk-go/version"
19
23
)
20
24
21
25
type containerStorage struct {
@@ -44,46 +48,110 @@ func (s *containerStorage) drop() error {
44
48
return nil
45
49
}
46
50
47
- func (s * containerStorage ) putObject (e objEvent ) error {
51
+ func (s * containerStorage ) putObject (e objEvent , h objectsdk. Object ) error {
48
52
s .m .Lock ()
49
53
defer s .m .Unlock ()
50
54
51
55
newKVs := make (map [string ][]byte )
56
+ commonKVs := make (map [string ][]byte ) // for MPT and raw index storage
52
57
commsuffix := e .oID [:]
53
58
54
59
// batching that is implemented for MPT ignores key's first byte
55
60
56
- newKVs [string (append ([]byte {0 , oidIndex }, commsuffix ... ))] = []byte {}
61
+ commonKVs [string (append ([]byte {0 , oidIndex }, commsuffix ... ))] = []byte {}
62
+ if len (e .deletedObjects ) > 0 {
63
+ commonKVs [string (append ([]byte {0 , deletedIndex }, commsuffix ... ))] = e .deletedObjects
64
+ maps .Copy (commonKVs , deleteObjectsOps (s .db , e .deletedObjects ))
65
+ }
66
+ if len (e .lockedObjects ) > 0 {
67
+ commonKVs [string (append ([]byte {0 , lockedIndex }, commsuffix ... ))] = e .lockedObjects
68
+ }
57
69
newKVs [string (append ([]byte {0 , sizeIndex }, commsuffix ... ))] = e .size .Bytes ()
58
70
if len (e .firstObject ) > 0 {
59
71
newKVs [string (append ([]byte {0 , firstPartIndex }, commsuffix ... ))] = e .firstObject
60
72
}
61
73
if len (e .prevObject ) > 0 {
62
74
newKVs [string (append ([]byte {0 , previousPartIndex }, commsuffix ... ))] = e .prevObject
63
75
}
64
- if len (e .deletedObjects ) > 0 {
65
- newKVs [string (append ([]byte {0 , deletedIndex }, commsuffix ... ))] = e .deletedObjects
66
- maps .Copy (newKVs , deleteObjectsOps (s .db , e .deletedObjects ))
67
- }
68
- if len (e .lockedObjects ) > 0 {
69
- newKVs [string (append ([]byte {0 , lockedIndex }, commsuffix ... ))] = e .lockedObjects
70
- }
71
76
if e .typ != objectsdk .TypeRegular {
72
77
newKVs [string (append ([]byte {0 , typeIndex }, commsuffix ... ))] = []byte {byte (e .typ )}
73
78
}
74
79
75
80
if s .opsBatch == nil {
76
81
s .opsBatch = make (map [string ][]byte )
77
82
}
83
+ maps .Copy (s .opsBatch , commonKVs )
78
84
maps .Copy (s .opsBatch , newKVs )
79
85
80
- err := s .db .PutChangeSet (mptToStoreBatch (newKVs ), nil )
86
+ fullIndex := mptToStoreBatch (commonKVs )
87
+ err := fillObjectIndex (fullIndex , h )
88
+ if err != nil {
89
+ return fmt .Errorf ("filling full header index: %w" , err )
90
+ }
91
+
92
+ err = s .db .PutChangeSet (fullIndex , nil )
81
93
if err != nil {
82
94
return fmt .Errorf ("put MPT KVs to the raw storage manually: %w" , err )
83
95
}
84
96
85
- // TODO: receive full object header and store its non-MPT parts in
86
- // persistent object storage
97
+ return nil
98
+ }
99
+
100
+ const binPropertyMarker = "1" // ROOT, PHY, etc.
101
+
102
+ func fillObjectIndex (batch map [string ][]byte , h objectsdk.Object ) error {
103
+ id := h .GetID ()
104
+ typ := h .Type ()
105
+ owner := h .Owner ()
106
+ if owner .IsZero () {
107
+ return fmt .Errorf ("invalid owner: %w" , user .ErrZeroID )
108
+ }
109
+ creationEpoch := h .CreationEpoch ()
110
+ pSize := h .PayloadSize ()
111
+ fPart := h .GetFirstID ()
112
+ prevPart := h .GetPreviousID ()
113
+ hasParent := h .Parent () != nil
114
+ phy := hasParent || (fPart .IsZero () && prevPart .IsZero ())
115
+ pldHash , ok := h .PayloadChecksum ()
116
+ if ! ok {
117
+ return errors .New ("missing payload checksum" )
118
+ }
119
+ var ver version.Version
120
+ if v := h .Version (); v != nil {
121
+ ver = * v
122
+ }
123
+ var pldHmmHash []byte
124
+ if hash , ok := h .PayloadHomomorphicHash (); ok {
125
+ pldHmmHash = hash .Value ()
126
+ }
127
+
128
+ putPlainAttribute (batch , id , objectsdk .FilterVersion , ver .String ())
129
+ putPlainAttribute (batch , id , objectsdk .FilterOwnerID , string (owner [:]))
130
+ putPlainAttribute (batch , id , objectsdk .FilterType , typ .String ())
131
+ putIntAttribute (batch , id , objectsdk .FilterCreationEpoch , strconv .FormatUint (creationEpoch , 10 ), new (big.Int ).SetUint64 (creationEpoch ))
132
+ putIntAttribute (batch , id , objectsdk .FilterPayloadSize , strconv .FormatUint (pSize , 10 ), new (big.Int ).SetUint64 (pSize ))
133
+ putPlainAttribute (batch , id , objectsdk .FilterPayloadChecksum , string (pldHash .Value ()))
134
+ putPlainAttribute (batch , id , objectsdk .FilterPayloadHomomorphicHash , string (pldHmmHash ))
135
+ if ! fPart .IsZero () {
136
+ putPlainAttribute (batch , id , objectsdk .FilterFirstSplitObject , string (fPart [:]))
137
+ }
138
+ if ! prevPart .IsZero () {
139
+ putPlainAttribute (batch , id , objectsdk .FilterParentID , string (prevPart [:]))
140
+ }
141
+ if ! hasParent && typ == objectsdk .TypeRegular {
142
+ putPlainAttribute (batch , id , objectsdk .FilterRoot , binPropertyMarker )
143
+ }
144
+ if phy {
145
+ putPlainAttribute (batch , id , objectsdk .FilterPhysical , binPropertyMarker )
146
+ }
147
+ for _ , a := range h .Attributes () {
148
+ ak , av := a .Key (), a .Value ()
149
+ if n , isInt := parseInt (av ); isInt {
150
+ putIntAttribute (batch , id , ak , av , n )
151
+ } else {
152
+ putPlainAttribute (batch , id , ak , av )
153
+ }
154
+ }
87
155
88
156
return nil
89
157
}
@@ -158,3 +226,64 @@ func storageForContainer(rootPath string, cID cid.ID) (*containerStorage, error)
158
226
db : st ,
159
227
}, nil
160
228
}
229
+
230
+ // object attribute key and value separator used in DB.
231
+ var attributeDelimiter = []byte {0x00 }
232
+
233
+ const (
234
+ intValLen = 33 // prefix byte for sign + fixed256 in metaPrefixAttrIDInt
235
+ attributeDelimiterLen = 1
236
+ )
237
+
238
+ func parseInt (s string ) (* big.Int , bool ) {
239
+ return new (big.Int ).SetString (s , 10 )
240
+ }
241
+
242
+ func putPlainAttribute (batch map [string ][]byte , id oid.ID , k , v string ) {
243
+ resKey := make ([]byte , 0 , 1 + 2 * attributeDelimiterLen + oid .Size + len (k )+ len (v ))
244
+
245
+ // PREFIX_OID_ATTR_DELIM_VAL
246
+ resKey = append (resKey , oidToAttrIndex )
247
+ resKey = append (resKey , id [:]... )
248
+ resKey = append (resKey , k ... )
249
+ resKey = append (resKey , attributeDelimiter ... )
250
+ resKey = append (resKey , v ... )
251
+
252
+ batch [string (resKey )] = []byte {}
253
+ resKey = resKey [:0 ]
254
+
255
+ // PREFIX_ATTR_DELIM_VAL_DELIM_OID
256
+ resKey = append (resKey , attrToPlainIndex )
257
+ resKey = append (resKey , k ... )
258
+ resKey = append (resKey , attributeDelimiter ... )
259
+ resKey = append (resKey , v ... )
260
+ resKey = append (resKey , attributeDelimiter ... )
261
+ resKey = append (resKey , id [:]... )
262
+ }
263
+
264
+ func putIntAttribute (batch map [string ][]byte , id oid.ID , k , vRaw string , vParsed * big.Int ) {
265
+ putPlainAttribute (batch , id , k , vRaw )
266
+
267
+ resKey := make ([]byte , 0 , 1 + len (k )+ attributeDelimiterLen + intValLen + oid .Size )
268
+
269
+ // PREFIX_ATTR_DELIM_VAL_OID
270
+ resKey = append (resKey , attrToIntIndex )
271
+ resKey = append (resKey , k ... )
272
+ resKey = append (resKey , attributeDelimiter ... )
273
+
274
+ neg := vParsed .Sign () < 0
275
+ if neg {
276
+ resKey = append (resKey , 0 )
277
+ } else {
278
+ resKey = append (resKey , 1 )
279
+ }
280
+ vParsed .FillBytes (resKey [len (resKey ) : len (resKey )+ intValLen ])
281
+ if neg {
282
+ for i := range resKey [1 :] {
283
+ resKey [1 + i ] = ^ resKey [1 + i ]
284
+ }
285
+ }
286
+
287
+ resKey = append (resKey , id [:]... )
288
+ batch [string (resKey )] = []byte {}
289
+ }
0 commit comments