Skip to content
Open
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CHANGELOG

## Unreleased

- [IMPROVEMENT] Implement buffer and map pooling to reduce allocations and memory usage

[v1.8.0](https://github.com/graph-gophers/graphql-go/releases/tag/v1.8.0) Release v1.8.0

* [FEATURE] Added `DecodeSelectedFieldArgs` helper function to decode argument values for any (nested) selected field path directly from a resolver context, enabling efficient multi-level prefetching without per-resolver argument reflection. This enables selective, multi‑level batching (Category → Products → Reviews) by loading only requested fields, mitigating N+1 issues despite complex filters or pagination.
Expand Down
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,18 @@ type Tracer interface {
}
```


### [Examples](https://github.com/graph-gophers/graphql-go/wiki/Examples)

## Testing

### Run All Tests

```bash
go test ./... -count=1
```

### Run Memory Benchmarks

```bash
go test -run=^$ -bench='BenchmarkMemory.*' -benchmem .
```
39 changes: 29 additions & 10 deletions internal/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type extensionser interface {
}

func (r *Request) Execute(ctx context.Context, s *resolvable.Schema, op *ast.OperationDefinition) ([]byte, []*errors.QueryError) {
var out bytes.Buffer
out := getBuffer()
defer putBuffer(out)

func() {
defer r.handlePanic(ctx)
sels := selected.ApplyOperation(&r.Request, s, op)
Expand All @@ -63,14 +65,14 @@ func (r *Request) Execute(ctx context.Context, s *resolvable.Schema, op *ast.Ope
return
}

r.execSelections(ctx, sels, nil, s, resolver, &out, op.Type == query.Mutation)
r.execSelections(ctx, sels, nil, s, resolver, out, op.Type == query.Mutation)
}()

if err := ctx.Err(); err != nil {
return nil, []*errors.QueryError{errors.Errorf("%s", err)}
}

return out.Bytes(), r.Errs
return copyBuffer(out), r.Errs
}

type fieldToValidate struct {
Expand All @@ -97,7 +99,9 @@ func (r *Request) execSelections(ctx context.Context, sels []selected.Selection,
async := !serially && selected.HasAsyncSel(sels)

var fields []*fieldToExec
collectFieldsToResolve(sels, s, resolver, &fields, make(map[string]*fieldToExec))
fieldMap := getFieldMap()
collectFieldsToResolve(sels, s, resolver, &fields, fieldMap)
putFieldMap(fieldMap)

if async {
var wg sync.WaitGroup
Expand All @@ -106,14 +110,14 @@ func (r *Request) execSelections(ctx context.Context, sels []selected.Selection,
go func(f *fieldToExec) {
defer wg.Done()
defer r.handlePanic(ctx)
f.out = new(bytes.Buffer)
f.out = getBuffer()
execFieldSelection(ctx, r, s, f, &pathSegment{path, f.field.Alias}, true)
}(f)
}
wg.Wait()
} else {
for _, f := range fields {
f.out = new(bytes.Buffer)
f.out = getBuffer()
execFieldSelection(ctx, r, s, f, &pathSegment{path, f.field.Alias}, true)
}
}
Expand All @@ -126,6 +130,9 @@ func (r *Request) execSelections(ctx context.Context, sels []selected.Selection,
if _, ok := f.field.Type.(*ast.NonNull); ok && resolvedToNull(f.out) {
out.Reset()
out.Write([]byte("null"))
for _, field := range fields {
putBuffer(field.out)
}
return
}

Expand All @@ -139,6 +146,10 @@ func (r *Request) execSelections(ctx context.Context, sels []selected.Selection,
out.Write(f.out.Bytes())
}
out.WriteByte('}')

for _, f := range fields {
putBuffer(f.out)
}
}

func collectFieldsToResolve(sels []selected.Selection, s *resolvable.Schema, resolver reflect.Value, fields *[]*fieldToExec, fieldByAlias map[string]*fieldToExec) {
Expand Down Expand Up @@ -334,7 +345,15 @@ func (r *Request) execSelectionSet(ctx context.Context, sels []selected.Selectio

func (r *Request) execList(ctx context.Context, sels []selected.Selection, typ *ast.List, path *pathSegment, s *resolvable.Schema, resolver reflect.Value, out *bytes.Buffer) {
l := resolver.Len()
entryouts := make([]bytes.Buffer, l)
entryouts := make([]*bytes.Buffer, l)
for i := 0; i < l; i++ {
entryouts[i] = getBuffer()
}
defer func() {
for _, buf := range entryouts {
putBuffer(buf)
}
}()

if selected.HasAsyncSel(sels) {
// Limit the number of concurrent goroutines spawned as it can lead to large
Expand All @@ -346,15 +365,15 @@ func (r *Request) execList(ctx context.Context, sels []selected.Selection, typ *
go func(i int) {
defer func() { <-sem }()
defer r.handlePanic(ctx)
r.execSelectionSet(ctx, sels, typ.OfType, &pathSegment{path, i}, s, resolver.Index(i), &entryouts[i])
r.execSelectionSet(ctx, sels, typ.OfType, &pathSegment{path, i}, s, resolver.Index(i), entryouts[i])
}(i)
}
for i := 0; i < concurrency; i++ {
sem <- struct{}{}
}
} else {
for i := 0; i < l; i++ {
r.execSelectionSet(ctx, sels, typ.OfType, &pathSegment{path, i}, s, resolver.Index(i), &entryouts[i])
r.execSelectionSet(ctx, sels, typ.OfType, &pathSegment{path, i}, s, resolver.Index(i), entryouts[i])
}
}

Expand All @@ -364,7 +383,7 @@ func (r *Request) execList(ctx context.Context, sels []selected.Selection, typ *
for i, entryout := range entryouts {
// If the list wraps a non-null type and one of the list elements
// resolves to null, then the entire list resolves to null.
if listOfNonNull && resolvedToNull(&entryout) {
if listOfNonNull && resolvedToNull(entryout) {
out.Reset()
out.WriteString("null")
return
Expand Down
60 changes: 60 additions & 0 deletions internal/exec/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package exec

import (
"bytes"
"sync"
)

// Pooling limits. Objects that grow beyond these caps are dropped instead of
// being returned to their pool, so a single unusually large query cannot pin
// a big allocation in memory for the lifetime of the process.
const (
	// maxBufferCap is the largest buffer capacity (in bytes) that putBuffer
	// will return to bufferPool; larger buffers are discarded.
	maxBufferCap = 64 * 1024

	// maxFieldMapSize is the largest field-map length that putFieldMap will
	// return to fieldMapPool; larger maps are discarded.
	maxFieldMapSize = 128

	// newFieldMapSize is the initial capacity of maps allocated by
	// fieldMapPool.New.
	newFieldMapSize = 16
)

// bufferPool recycles *bytes.Buffer values across query executions to reduce
// per-request allocations. Buffers are reset on the way out (see getBuffer)
// and oversized buffers are dropped on the way back in (see putBuffer).
var bufferPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}

// getBuffer fetches a buffer from bufferPool, resetting it first so the
// caller always receives an empty buffer regardless of its previous use.
func getBuffer() *bytes.Buffer {
	b := bufferPool.Get().(*bytes.Buffer)
	b.Reset()
	return b
}

// putBuffer returns buf to bufferPool for reuse. Buffers whose capacity has
// grown beyond maxBufferCap are dropped so the pool does not pin large
// allocations indefinitely. A nil buf is ignored: pooling nil would make a
// later getBuffer panic when it calls Reset on the nil *bytes.Buffer, and
// callers on panic-recovery paths may legitimately hold a nil buffer.
func putBuffer(buf *bytes.Buffer) {
	if buf == nil || buf.Cap() > maxBufferCap {
		return
	}
	bufferPool.Put(buf)
}

func copyBuffer(buf *bytes.Buffer) []byte {
if buf.Len() == 0 {
return nil
}
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result
}

// fieldMapPool recycles the per-selection alias→field maps used while
// collecting fields to resolve. Maps are cleared before being returned to
// the pool (see putFieldMap), and oversized maps are dropped.
var fieldMapPool = sync.Pool{
	New: func() interface{} {
		return make(map[string]*fieldToExec, newFieldMapSize)
	},
}

// getFieldMap fetches an alias→field map from fieldMapPool. Maps are cleared
// before being pooled (see putFieldMap), so no reset is required here.
func getFieldMap() map[string]*fieldToExec {
	m := fieldMapPool.Get().(map[string]*fieldToExec)
	return m
}

// putFieldMap clears m and returns it to fieldMapPool for reuse. Maps that
// have grown beyond maxFieldMapSize are dropped (without clearing) so the
// pool does not retain large bucket arrays indefinitely. A nil map is
// ignored: pooling nil would make a later getFieldMap hand out a nil map,
// and the first write to it would panic.
func putFieldMap(m map[string]*fieldToExec) {
	if m == nil || len(m) > maxFieldMapSize {
		return
	}
	for k := range m {
		delete(m, k)
	}
	fieldMapPool.Put(m)
}
93 changes: 93 additions & 0 deletions internal/exec/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package exec

import (
"testing"
)

// TestBufferPool exercises the *bytes.Buffer pool helpers in pool.go.
func TestBufferPool(t *testing.T) {
	// getBuffer must always hand out an empty buffer. NOTE(review):
	// sync.Pool gives no identity guarantee, so buf2 may or may not be the
	// recycled buf — this verifies getBuffer's reset contract, not that
	// the same buffer was reused.
	t.Run("resets buffer before returning", func(t *testing.T) {
		buf := getBuffer()
		buf.WriteString("test data")
		putBuffer(buf)

		buf2 := getBuffer()
		if buf2.Len() != 0 {
			t.Errorf("expected reset buffer, got length %d", buf2.Len())
		}
		putBuffer(buf2)
	})

	// copyBuffer must produce an independent copy and leave the source
	// untouched, so the source can safely be pooled afterwards.
	t.Run("copyBuffer copies data correctly", func(t *testing.T) {
		buf := getBuffer()
		buf.WriteString("test data")

		copied := copyBuffer(buf)
		if string(copied) != "test data" {
			t.Errorf("expected 'test data', got %q", string(copied))
		}

		// Original buffer should be unchanged
		if buf.String() != "test data" {
			t.Errorf("original buffer modified")
		}

		putBuffer(buf)
	})

	// An empty buffer copies to nil, not an empty slice.
	t.Run("copyBuffer returns nil for empty buffer", func(t *testing.T) {
		buf := getBuffer()
		copied := copyBuffer(buf)
		if copied != nil {
			t.Errorf("expected nil for empty buffer, got %v", copied)
		}
		putBuffer(buf)
	})

	// putBuffer must drop buffers that grew past maxBufferCap.
	// NOTE(review): a fresh buffer from the pool's New would also pass the
	// final check, so this subtest can only catch the failure mode where an
	// oversized buffer actually comes back out of the pool.
	t.Run("does not pool oversized buffers", func(t *testing.T) {
		buf := getBuffer()
		large := make([]byte, 65*1024)
		buf.Write(large)

		// bytes.Buffer may over-allocate unpredictably; skip rather than
		// assert if the write somehow didn't exceed the cap.
		if buf.Cap() <= maxBufferCap {
			t.Skip("buffer didn't grow large enough for test")
		}

		putBuffer(buf) // Should not be added to pool

		buf2 := getBuffer()
		if buf2.Cap() > maxBufferCap {
			t.Errorf("got oversized buffer from pool, capacity: %d", buf2.Cap())
		}
		putBuffer(buf2)
	})
}

// TestFieldMapPool exercises the field-map pool helpers in pool.go.
func TestFieldMapPool(t *testing.T) {
	// putFieldMap must clear entries before pooling so a recycled map never
	// leaks fields from a previous query into the next one.
	t.Run("clears map before returning", func(t *testing.T) {
		m := getFieldMap()
		m["test"] = &fieldToExec{}
		m["foo"] = &fieldToExec{}
		putFieldMap(m)

		m2 := getFieldMap()
		if len(m2) != 0 {
			t.Errorf("expected cleared map, got length %d", len(m2))
		}
		putFieldMap(m2)
	})

	// Maps larger than maxFieldMapSize (129 > 128 entries here) must be
	// dropped rather than pooled. NOTE(review): a fresh map from the pool's
	// New is also empty, so the final check cannot distinguish "dropped"
	// from "cleared then pooled" — it mainly guards against a non-empty map
	// escaping back out of the pool.
	t.Run("does not pool oversized maps", func(t *testing.T) {
		m := getFieldMap()
		for i := 0; i < 129; i++ {
			m[string(rune(i))] = &fieldToExec{}
		}

		putFieldMap(m) // Should not be added to pool

		m2 := getFieldMap()
		if len(m2) != 0 {
			t.Errorf("got non-empty map from pool, length: %d", len(m2))
		}
		putFieldMap(m2)
	})
}
18 changes: 11 additions & 7 deletions internal/exec/subscribe.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package exec

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -123,7 +122,7 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *ast.O
Tracer: r.Tracer,
Logger: r.Logger,
}
var out bytes.Buffer
out := getBuffer()
func() {
timeout := r.SubscribeResolverTimeout
if timeout == 0 {
Expand All @@ -137,31 +136,36 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *ast.O
func() {
defer subR.handlePanic(subCtx)

var buf bytes.Buffer
subR.execSelectionSet(subCtx, f.sels, f.field.Type, &pathSegment{nil, f.field.Alias}, s, resp, &buf)
buf := getBuffer()
defer putBuffer(buf)
subR.execSelectionSet(subCtx, f.sels, f.field.Type, &pathSegment{nil, f.field.Alias}, s, resp, buf)

propagateChildError := false
if _, nonNullChild := f.field.Type.(*ast.NonNull); nonNullChild && resolvedToNull(&buf) {
if _, nonNullChild := f.field.Type.(*ast.NonNull); nonNullChild && resolvedToNull(buf) {
propagateChildError = true
}

if !propagateChildError {
out.WriteString(fmt.Sprintf(`{"%s":`, f.field.Alias))
fmt.Fprintf(out, `{"%s":`, f.field.Alias)
out.Write(buf.Bytes())
out.WriteString(`}`)
}
}()

if err := subCtx.Err(); err != nil {
putBuffer(out)
c <- &Response{Errors: []*errors.QueryError{errors.Errorf("%s", err)}}
return
}

data := copyBuffer(out)
putBuffer(out)

// Send response within timeout
// TODO: maybe block until sent?
select {
case <-subCtx.Done():
case c <- &Response{Data: out.Bytes(), Errors: subR.Errs}:
case c <- &Response{Data: data, Errors: subR.Errs}:
}
}()
}
Expand Down
Loading
Loading