Skip to content

Commit 75b6f5e

Browse files
committed
feat: support grid read write buffer
1 parent 28aa45e commit 75b6f5e

File tree

7 files changed

+286
-230
lines changed

7 files changed

+286
-230
lines changed

gridbuf/readbuf.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright 2025 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gridbuf
16+
17+
import (
18+
"errors"
19+
"sync"
20+
21+
"github.com/bytedance/gopkg/lang/mcache"
22+
)
23+
24+
var (
25+
errReadBufferNotEnough = errors.New("error grid read buffer not enough")
26+
readBufferPool = sync.Pool{
27+
New: func() interface{} {
28+
return &ReadBuffer{
29+
pool: make([][]byte, 0, 16),
30+
}
31+
},
32+
}
33+
)
34+
35+
type ReadBuffer struct {
36+
off int
37+
chunk []byte
38+
chunks [][]byte
39+
pool [][]byte
40+
}
41+
42+
func NewReadBuffer(bufs [][]byte) *ReadBuffer {
43+
rb := readBufferPool.Get().(*ReadBuffer)
44+
rb.chunk = bufs[0]
45+
rb.chunks = bufs[1:]
46+
return rb
47+
}
48+
49+
// ReadN read n bytes from chunk, if chunk is not enough, it will read from next chunks.
50+
//
51+
// MAKE SURE IT CAN BE INLINE:
52+
// `can inline (*XReadBuffer).ReadN with cost 80`
53+
func (b *ReadBuffer) ReadN(n int) (buf []byte) {
54+
buf = b.chunk[b.off:]
55+
if len(buf) < n {
56+
buf = b.readSlow(n)
57+
} else {
58+
b.off += n
59+
}
60+
return
61+
}
62+
63+
func (b *ReadBuffer) readSlow(n int) (buf []byte) {
64+
buf = mcache.Malloc(n)
65+
b.pool = append(b.pool, buf)
66+
var l, m int
67+
if len(b.chunk)-b.off > 0 {
68+
m = copy(buf[l:], b.chunk[b.off:])
69+
l += m
70+
}
71+
for l < n {
72+
if len(b.chunks) == 0 {
73+
panic(errReadBufferNotEnough.Error())
74+
}
75+
b.chunk = b.chunks[0]
76+
b.off = 0
77+
b.chunks = b.chunks[1:]
78+
m = copy(buf[l:], b.chunk)
79+
l += m
80+
}
81+
b.off += m
82+
return
83+
}
84+
85+
// CopyBytes copy bytes from chunk, if chunk is not enough, it will copy from next chunks.
86+
//
87+
// MAKE SURE IT CAN BE INLINE:
88+
// `can inline (*XReadBuffer).CopyBytes with cost 80`
89+
func (b *ReadBuffer) CopyBytes(buf []byte) {
90+
n := copy(buf, b.chunk[b.off:])
91+
if len(buf) > n {
92+
b.copySlow(buf)
93+
} else {
94+
b.off += n
95+
}
96+
}
97+
98+
func (b *ReadBuffer) copySlow(buf []byte) {
99+
m := len(b.chunk) - b.off
100+
l := m
101+
for l < len(buf) {
102+
if len(b.chunks) == 0 {
103+
panic(errReadBufferNotEnough.Error())
104+
}
105+
b.chunk = b.chunks[0]
106+
b.off = 0
107+
b.chunks = b.chunks[1:]
108+
m = copy(buf[l:], b.chunk)
109+
l += m
110+
}
111+
b.off += m
112+
}
113+
114+
func (b *ReadBuffer) Free() {
115+
b.off = 0
116+
b.chunk = nil
117+
b.chunks = nil
118+
for i := range b.pool {
119+
mcache.Free(b.pool[i])
120+
b.pool[i] = nil
121+
}
122+
b.pool = b.pool[:0]
123+
readBufferPool.Put(b)
124+
}

xbuf/readbuf_test.go renamed to gridbuf/readbuf_test.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,18 @@
1-
package xbuf
1+
// Copyright 2025 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gridbuf
216

317
import (
418
"runtime/debug"
@@ -7,7 +21,7 @@ import (
721
)
822

923
func TestReadBuf_Inline(t *testing.T) {
10-
var x *XReadBuffer
24+
var x *ReadBuffer
1125

1226
defer func() {
1327
r := recover()
@@ -36,7 +50,7 @@ func TestReadBuf_Inline(t *testing.T) {
3650
}
3751

3852
func TestReadBuf_CrossPad(t *testing.T) {
39-
tf := func(getBuf func(x *XReadBuffer, n int) []byte) {
53+
tf := func(getBuf func(x *ReadBuffer, n int) []byte) {
4054
ori1 := make([]byte, padLength)
4155
for i := range ori1 {
4256
ori1[i] = 'a'
@@ -46,7 +60,7 @@ func TestReadBuf_CrossPad(t *testing.T) {
4660
ori2[i] = 'b'
4761
}
4862
ori := [][]byte{ori1, ori2}
49-
x := NewXReadBuffer(ori)
63+
x := NewReadBuffer(ori)
5064
defer x.Free()
5165
buf := getBuf(x, padLength-1)
5266
if len(buf) < padLength-1 {
@@ -77,18 +91,18 @@ func TestReadBuf_CrossPad(t *testing.T) {
7791
}
7892
}
7993
}
80-
tf(func(x *XReadBuffer, n int) []byte {
94+
tf(func(x *ReadBuffer, n int) []byte {
8195
return x.ReadN(n)
8296
})
83-
tf(func(x *XReadBuffer, n int) []byte {
97+
tf(func(x *ReadBuffer, n int) []byte {
8498
buf := make([]byte, n)
8599
x.CopyBytes(buf)
86100
return buf
87101
})
88102
}
89103

90104
func TestReadBuf_NoCrossPad(t *testing.T) {
91-
tf := func(getBuf func(x *XReadBuffer, n int) []byte) {
105+
tf := func(getBuf func(x *ReadBuffer, n int) []byte) {
92106
ori1 := make([]byte, padLength/2)
93107
for i := range ori1 {
94108
ori1[i] = 'a'
@@ -98,7 +112,7 @@ func TestReadBuf_NoCrossPad(t *testing.T) {
98112
ori2[i] = 'b'
99113
}
100114
ori := append(ori1, ori2...)
101-
x := NewXReadBuffer([][]byte{ori})
115+
x := NewReadBuffer([][]byte{ori})
102116
defer x.Free()
103117

104118
buf := getBuf(x, padLength/2)
@@ -120,10 +134,10 @@ func TestReadBuf_NoCrossPad(t *testing.T) {
120134
}
121135
}
122136
}
123-
tf(func(x *XReadBuffer, n int) []byte {
137+
tf(func(x *ReadBuffer, n int) []byte {
124138
return x.ReadN(n)
125139
})
126-
tf(func(x *XReadBuffer, n int) []byte {
140+
tf(func(x *ReadBuffer, n int) []byte {
127141
buf := make([]byte, n)
128142
x.CopyBytes(buf)
129143
return buf

gridbuf/writebuf.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2025 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gridbuf
16+
17+
import (
18+
"sync"
19+
20+
"github.com/bytedance/gopkg/lang/mcache"
21+
)
22+
23+
const padLength = 1 << 13
24+
25+
var writeBufferPool = sync.Pool{
26+
New: func() interface{} {
27+
return &WriteBuffer{
28+
chunks: make([][]byte, 0, 16),
29+
pool: make([][]byte, 0, 16),
30+
}
31+
},
32+
}
33+
34+
type WriteBuffer struct {
35+
off int // write offset of chunk
36+
chunk []byte
37+
chunks [][]byte
38+
pool [][]byte
39+
}
40+
41+
func NewWriteBuffer() *WriteBuffer {
42+
return writeBufferPool.Get().(*WriteBuffer)
43+
}
44+
45+
func (b *WriteBuffer) Bytes() [][]byte {
46+
if b.off > 0 {
47+
b.chunks = append(b.chunks, b.chunk[:b.off])
48+
b.chunk = b.chunk[b.off:]
49+
b.off = 0
50+
}
51+
return b.chunks
52+
}
53+
54+
func (b *WriteBuffer) Free() {
55+
b.off = 0
56+
b.chunk = nil
57+
for i := range b.chunks {
58+
b.chunks[i] = nil
59+
}
60+
b.chunks = b.chunks[:0]
61+
for i := range b.pool {
62+
mcache.Free(b.pool[i])
63+
b.pool[i] = nil
64+
}
65+
b.pool = b.pool[:0]
66+
writeBufferPool.Put(b)
67+
}
68+
69+
// MallocN malloc n bytes from chunk, if chunk is not enough, it will grow.
70+
//
71+
// MAKE SURE IT CAN BE INLINE:
72+
// `can inline (*XWriteBuffer).MallocN with cost 79`
73+
func (b *WriteBuffer) MallocN(n int) (buf []byte) {
74+
buf = b.chunk[b.off:]
75+
if len(buf) < n {
76+
buf = b.growSlow(n)
77+
}
78+
b.off += n
79+
return
80+
}
81+
82+
func (b *WriteBuffer) growSlow(n int) []byte {
83+
if b.off > 0 {
84+
b.chunk = b.chunk[:b.off]
85+
b.chunks = append(b.chunks, b.chunk)
86+
b.off = 0
87+
}
88+
// refresh chunk
89+
if n < padLength {
90+
n = padLength
91+
}
92+
buf := mcache.Malloc(n)
93+
buf = buf[:cap(buf)]
94+
b.pool = append(b.pool, buf)
95+
b.chunk = buf
96+
return buf
97+
}
98+
99+
func (b *WriteBuffer) WriteDirect(buf []byte) {
100+
// relink chunks
101+
if b.off > 0 {
102+
b.chunks = append(b.chunks, b.chunk[:b.off])
103+
b.chunk = b.chunk[b.off:]
104+
b.off = 0
105+
}
106+
107+
// write directly
108+
b.chunks = append(b.chunks, buf)
109+
return
110+
}

xbuf/writebuf_test.go renamed to gridbuf/writebuf_test.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
1-
package xbuf
1+
// Copyright 2025 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gridbuf
216

317
import (
418
"runtime/debug"
519
"strings"
620
"testing"
721
)
822

9-
func TestXWriteBuffer_Inline(t *testing.T) {
10-
var b *XWriteBuffer
23+
func TestWriteBuffer_Inline(t *testing.T) {
24+
var b *WriteBuffer
1125

1226
defer func() {
1327
r := recover()
@@ -23,8 +37,8 @@ func TestXWriteBuffer_Inline(t *testing.T) {
2337
b.MallocN(10)
2438
}
2539

26-
func TestXWriteBuffer_CrossPad(t *testing.T) {
27-
b := NewXWriteBuffer()
40+
func TestWriteBuffer_CrossPad(t *testing.T) {
41+
b := NewWriteBuffer()
2842
defer b.Free()
2943
buf := b.MallocN(padLength - 1)
3044
for i := range buf {
@@ -56,8 +70,8 @@ func TestXWriteBuffer_CrossPad(t *testing.T) {
5670
}
5771
}
5872

59-
func TestXWriteBuffer_NoCrossPad(t *testing.T) {
60-
b := NewXWriteBuffer()
73+
func TestWriteBuffer_NoCrossPad(t *testing.T) {
74+
b := NewWriteBuffer()
6175
defer b.Free()
6276
buf := b.MallocN(1024)
6377
for i := range buf {
@@ -84,8 +98,8 @@ func TestXWriteBuffer_NoCrossPad(t *testing.T) {
8498
}
8599
}
86100

87-
func TestXWriteBuffer_WriteDirect(t *testing.T) {
88-
b := NewXWriteBuffer()
101+
func TestWriteBuffer_WriteDirect(t *testing.T) {
102+
b := NewWriteBuffer()
89103
defer b.Free()
90104
buf := b.MallocN(1024)
91105
for i := range buf {

0 commit comments

Comments
 (0)