Skip to content

Commit 5e77c4c

Browse files
Parallelize WC flush process (#3179)
2 parents fc63648 + 77cca37 commit 5e77c4c

File tree

6 files changed

+193
-55
lines changed

6 files changed

+193
-55
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Changelog for NeoFS Node
4040
- `ObjectService.GetRange(Hash)` ops now handle zero ranges as full payload (#3071)
4141
- Add some GAS to system fee of notary transactions (#3176)
4242
- IR now applies sane limits to containers' storage policies recently added to the protocol (#3075)
43+
- Make write-cache flush objects in parallel (#3179)
4344

4445
### Removed
4546
- Drop creating new eacl tables with public keys (#3096)

pkg/local_object_storage/writecache/flush.go

+98-54
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,73 @@ import (
1818
const (
1919
// defaultFlushInterval is default time interval between successive flushes.
2020
defaultFlushInterval = 10 * time.Second
21+
// defaultWorkerCount is the default number of workers that flush objects.
22+
defaultWorkerCount = 20
2123
)
2224

2325
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
2426
func (c *cache) runFlushLoop() {
27+
for i := range c.workersCount {
28+
c.wg.Add(1)
29+
go c.flushWorker(i)
30+
}
31+
2532
c.wg.Add(1)
26-
go func() {
27-
defer c.wg.Done()
28-
tick := time.NewTicker(defaultFlushInterval)
29-
for {
30-
select {
31-
case <-tick.C:
32-
c.modeMtx.RLock()
33-
if c.readOnly() {
34-
c.modeMtx.RUnlock()
35-
break
36-
}
33+
go c.flushScheduler()
34+
}
3735

38-
_ = c.flush(true)
36+
func (c *cache) flushScheduler() {
37+
defer c.wg.Done()
38+
ticker := time.NewTicker(defaultFlushInterval)
39+
defer ticker.Stop()
3940

41+
for {
42+
select {
43+
case <-ticker.C:
44+
c.modeMtx.RLock()
45+
if c.readOnly() {
4046
c.modeMtx.RUnlock()
41-
case <-c.closeCh:
47+
continue
48+
}
49+
c.modeMtx.RUnlock()
50+
err := c.fsTree.IterateAddresses(func(addr oid.Address) error {
51+
select {
52+
case c.flushCh <- addr:
53+
return nil
54+
case <-c.closeCh:
55+
return errors.New("closed during iteration")
56+
}
57+
}, true)
58+
if err != nil {
59+
c.log.Warn("iteration failed", zap.Error(err))
60+
}
61+
case <-c.closeCh:
62+
return
63+
}
64+
}
65+
}
66+
67+
func (c *cache) flushWorker(id int) {
68+
defer c.wg.Done()
69+
for {
70+
select {
71+
case addr, ok := <-c.flushCh:
72+
if !ok {
4273
return
4374
}
75+
c.modeMtx.RLock()
76+
if c.readOnly() {
77+
c.modeMtx.RUnlock()
78+
continue
79+
}
80+
if err := c.flushSingle(addr, true); err != nil {
81+
c.log.Warn("flush failed", zap.Int("worker", id), zap.Error(err))
82+
}
83+
c.modeMtx.RUnlock()
84+
case <-c.closeCh:
85+
return
4486
}
45-
}()
87+
}
4688
}
4789

4890
func (c *cache) reportFlushError(msg string, addr string, err error) {
@@ -55,56 +97,52 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
5597
}
5698
}
5799

58-
func (c *cache) flush(ignoreErrors bool) error {
59-
var addrHandler = func(addr oid.Address) error {
60-
sAddr := addr.EncodeToString()
61-
62-
data, err := c.fsTree.GetBytes(addr)
63-
if err != nil {
64-
if errors.As(err, new(apistatus.ObjectNotFound)) {
65-
// an object can be removed b/w iterating over it
66-
// and reading its payload; not an error
67-
return nil
68-
}
100+
func (c *cache) flushSingle(addr oid.Address, ignoreErrors bool) error {
101+
sAddr := addr.EncodeToString()
69102

70-
c.reportFlushError("can't read a file", sAddr, err)
71-
if ignoreErrors {
72-
return nil
73-
}
74-
return err
103+
data, err := c.fsTree.GetBytes(addr)
104+
if err != nil {
105+
if errors.As(err, new(apistatus.ObjectNotFound)) {
106+
// an object can be removed b/w iterating over it
107+
// and reading its payload; not an error
108+
return nil
75109
}
76110

77-
var obj object.Object
78-
err = obj.Unmarshal(data)
79-
if err != nil {
80-
c.reportFlushError("can't unmarshal an object", sAddr, err)
81-
if ignoreErrors {
82-
return nil
83-
}
84-
return err
111+
c.reportFlushError("can't read a file", sAddr, err)
112+
if ignoreErrors {
113+
return nil
85114
}
115+
return err
116+
}
86117

87-
err = c.flushObject(&obj, data)
88-
if err != nil {
89-
return err
118+
var obj object.Object
119+
err = obj.Unmarshal(data)
120+
if err != nil {
121+
c.reportFlushError("can't unmarshal an object", sAddr, err)
122+
if ignoreErrors {
123+
return nil
90124
}
125+
return err
126+
}
91127

92-
err = c.fsTree.Delete(addr)
93-
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
94-
c.log.Error("can't remove object from write-cache", zap.Error(err))
95-
} else if err == nil {
96-
storagelog.Write(c.log,
97-
storagelog.AddressField(addr),
98-
storagelog.StorageTypeField(wcStorageType),
99-
storagelog.OpField("DELETE"),
100-
)
101-
c.objCounters.Delete(addr)
102-
}
128+
err = c.flushObject(&obj, data)
129+
if err != nil {
130+
return err
131+
}
103132

104-
return nil
133+
err = c.fsTree.Delete(addr)
134+
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
135+
c.log.Error("can't remove object from write-cache", zap.Error(err))
136+
} else if err == nil {
137+
storagelog.Write(c.log,
138+
storagelog.AddressField(addr),
139+
storagelog.StorageTypeField(wcStorageType),
140+
storagelog.OpField("DELETE"),
141+
)
142+
c.objCounters.Delete(addr)
105143
}
106144

107-
return c.fsTree.IterateAddresses(addrHandler, ignoreErrors)
145+
return nil
108146
}
109147

110148
// flushObject is used to write object directly to the main storage.
@@ -150,3 +188,9 @@ func (c *cache) Flush(ignoreErrors bool) error {
150188

151189
return c.flush(ignoreErrors)
152190
}
191+
192+
func (c *cache) flush(ignoreErrors bool) error {
193+
return c.fsTree.IterateAddresses(func(addr oid.Address) error {
194+
return c.flushSingle(addr, ignoreErrors)
195+
}, ignoreErrors)
196+
}

pkg/local_object_storage/writecache/flush_test.go

+75
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package writecache
22

33
import (
4+
"fmt"
45
"os"
56
"path/filepath"
67
"sync/atomic"
78
"testing"
9+
"time"
810

911
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
1012
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
@@ -204,6 +206,79 @@ func TestFlush(t *testing.T) {
204206
})
205207
}
206208

209+
func TestFlushPerformance(t *testing.T) {
210+
objectCounts := []int{100, 1000}
211+
workerCounts := []int{1, 4, 16}
212+
213+
for _, objCount := range objectCounts {
214+
for _, workerCount := range workerCounts {
215+
t.Run(fmt.Sprintf("objects=%d_workers=%d", objCount, workerCount), func(t *testing.T) {
216+
t.Parallel()
217+
wc, bs, mb := newCache(t, WithFlushWorkersCount(workerCount))
218+
defer wc.Close()
219+
220+
objects := make([]objectPair, objCount)
221+
for i := range objects {
222+
objects[i] = putObject(t, wc, 1+(i%2)*1024)
223+
}
224+
for _, obj := range objects {
225+
_, err := wc.Get(obj.addr)
226+
require.NoError(t, err)
227+
}
228+
require.NoError(t, wc.Close())
229+
230+
require.NoError(t, bs.SetMode(mode.ReadWrite))
231+
require.NoError(t, mb.SetMode(mode.ReadWrite))
232+
233+
require.NoError(t, wc.Open(false))
234+
require.NoError(t, wc.Init())
235+
start := time.Now()
236+
waitForFlush(t, wc, objects)
237+
duration := time.Since(start)
238+
239+
for i := range objects {
240+
id, err := mb.StorageID(objects[i].addr)
241+
require.NoError(t, err)
242+
243+
res, err := bs.Get(objects[i].addr, id)
244+
require.NoError(t, err)
245+
require.Equal(t, objects[i].obj, res)
246+
}
247+
require.Equal(t, uint64(0), wc.(*cache).objCounters.Size())
248+
for _, obj := range objects {
249+
_, err := wc.Get(obj.addr)
250+
require.Error(t, err)
251+
}
252+
253+
t.Logf("Flush took %v for %d objects with %d workers", duration-defaultFlushInterval, objCount, workerCount)
254+
})
255+
}
256+
}
257+
}
258+
259+
func waitForFlush(t *testing.T, wc Cache, objects []objectPair) {
260+
timeout := time.After(60 * time.Second)
261+
ticker := time.NewTicker(100 * time.Millisecond)
262+
defer ticker.Stop()
263+
264+
for {
265+
select {
266+
case <-ticker.C:
267+
cachedCount := 0
268+
for _, obj := range objects {
269+
if _, err := wc.Get(obj.addr); err == nil {
270+
cachedCount++
271+
}
272+
}
273+
if cachedCount == 0 {
274+
return
275+
}
276+
case <-timeout:
277+
t.Fatalf("Flush did not complete within 60 seconds, %d objects still cached", len(objects))
278+
}
279+
}
280+
}
281+
207282
func putObject(t *testing.T, c Cache, size int) objectPair {
208283
obj, data := newObject(t, size)
209284

pkg/local_object_storage/writecache/options.go

+11
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type options struct {
3838
// maxCacheSize is the maximum total size of all objects saved in cache.
3939
// 1 GiB by default.
4040
maxCacheSize uint64
41+
// workersCount is the number of workers flushing objects in parallel.
42+
workersCount int
4143
// objCounters contains object list along with sizes and overall size of cache.
4244
objCounters counters
4345
// noSync is true iff FSTree allows unsynchronized writes.
@@ -83,6 +85,15 @@ func WithMaxObjectSize(sz uint64) Option {
8385
}
8486
}
8587

88+
// WithFlushWorkersCount sets the number of workers that flush objects in parallel.
89+
func WithFlushWorkersCount(c int) Option {
90+
return func(o *options) {
91+
if c > 0 {
92+
o.workersCount = c
93+
}
94+
}
95+
}
96+
8697
// WithMaxCacheSize sets maximum write-cache size in bytes.
8798
func WithMaxCacheSize(sz uint64) Option {
8899
return func(o *options) {

pkg/local_object_storage/writecache/state.go

+3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ func (x *counters) Size() uint64 {
3737

3838
func (c *cache) initCounters() error {
3939
var sizeHandler = func(addr oid.Address, size uint64) error {
40+
if _, ok := c.objCounters.objMap[addr]; ok {
41+
return nil
42+
}
4043
c.objCounters.Add(addr, size)
4144
return nil
4245
}

pkg/local_object_storage/writecache/writecache.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ type cache struct {
6363
// whether object should be compressed.
6464
compressFlags map[string]struct{}
6565

66+
// flushCh is a channel with objects to flush.
67+
flushCh chan oid.Address
6668
// closeCh is close channel.
6769
closeCh chan struct{}
6870
// wg is a wait group for flush workers.
@@ -92,7 +94,8 @@ var (
9294
// New creates new writecache instance.
9395
func New(opts ...Option) Cache {
9496
c := &cache{
95-
mode: mode.ReadWrite,
97+
flushCh: make(chan oid.Address),
98+
mode: mode.ReadWrite,
9699

97100
compressFlags: make(map[string]struct{}),
98101
options: options{
@@ -102,6 +105,7 @@ func New(opts ...Option) Cache {
102105
objCounters: counters{
103106
objMap: make(map[oid.Address]uint64),
104107
},
108+
workersCount: defaultWorkerCount,
105109
},
106110
}
107111

0 commit comments

Comments
 (0)