Binary file modified btf/testdata/btf_testmod.btf
Binary file modified btf/testdata/vmlinux.btf.gz
65 changes: 62 additions & 3 deletions ringbuf/reader.go
@@ -5,9 +5,12 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"unsafe"

"iter"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/platform"
"github.com/cilium/ebpf/internal/sys"
@@ -40,6 +43,11 @@ func (rh *ringbufHeader) dataLen() int {
}

type Record struct {
// RawSample contains the raw bytes of a ringbuf record.
//
// When obtained via [Reader.Records], RawSample is a zero-copy view into the
// underlying mmap. It is only valid until the iterator yields the next
// record or terminates. Callers must copy the data if they need to retain it.
RawSample []byte

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
@@ -144,6 +152,59 @@ func (r *Reader) Read() (Record, error) {

// ReadInto is like Read except that it allows reusing Record and associated buffers.
func (r *Reader) ReadInto(rec *Record) error {
return r.readLocked(func() error {
return r.ring.readRecord(rec)
})
}

// Records iterates over records in the reader until [Reader.Close] is called.
//
// Record.RawSample is only valid until the next call to the iterator. Callers
// must copy the data if they need it to outlive the current iteration.

Collaborator: Even more explicit: both Record and Record.RawSample are only valid until the next call.
//
// This convenience wrapper allocates a single Record once. To fully avoid
// allocations, use [Reader.RecordsInto] and pass in a reusable Record.
func (r *Reader) Records() iter.Seq2[*Record, error] {
rec := Record{}
return r.RecordsInto(&rec)
}
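
For illustration, a minimal usage sketch (hedged: events is assumed to be an *ebpf.Map of type BPF_MAP_TYPE_RINGBUF, and process is a placeholder):

rd, err := ringbuf.NewReader(events)
if err != nil {
	return err
}
defer rd.Close()

for rec, err := range rd.Records() {
	if err != nil {
		return err
	}
	// RawSample is a zero-copy view into the ring: copy it before the
	// next iteration if the data must be retained.
	process(append([]byte(nil), rec.RawSample...))
}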

// RecordsInto is like Records but allows reusing the Record and associated buffers.
func (r *Reader) RecordsInto(rec *Record) iter.Seq2[*Record, error] {
Collaborator: If Records() can get away with allocating Record once when creating the iterator there isn't much point in this function IMO.

return func(yield func(*Record, error) bool) {
var (
sample []byte
remaining int
nextCons uintptr
)

for {
err := r.readLocked(func() error {
var err error
sample, remaining, nextCons, err = r.ring.readSample()
return err
})
if err != nil {
yield(nil, err)
return
}

// Limit cap to len so append can't write past the record and corrupt the ring.
rec.RawSample = sample[:len(sample):len(sample)]
rec.Remaining = remaining

if !yield(rec, nil) {
atomic.StoreUintptr(r.ring.cons_pos, nextCons)
Collaborator: I don't think this is safe to do. Consider two concurrent callers of Records():

• First caller reads record A, offset 10
• Second caller reads record B, offset 20
• Second caller finishes the loop quicker than the first, offset 20 is written.
• Data of the first caller is corrupted.

I think you need to hold the lock across the yield. Maybe it's as simple as moving it all into readLocked?
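
For illustration, the lock-across-yield variant might look roughly like this (a sketch only: the epoll/deadline handling inside readLocked is elided, and as the author notes below, holding r.mu across yield deadlocks SetDeadline and Close when called from the loop body):

return func(yield func(*Record, error) bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	for {
		sample, remaining, nextCons, err := r.ring.readSample()
		if err == errBusy {
			continue
		}
		if err != nil {
			// errEOR would need to block on epoll here; elided.
			yield(nil, err)
			return
		}

		rec.RawSample = sample[:len(sample):len(sample)]
		rec.Remaining = remaining

		// r.mu is held across yield, so no concurrent caller can
		// advance cons_pos and invalidate this sample.
		if !yield(rec, nil) {
			atomic.StoreUintptr(r.ring.cons_pos, nextCons)
			return
		}
		atomic.StoreUintptr(r.ring.cons_pos, nextCons)
	}
}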

Member Author: I am aware of this concurrent read issue; my first version held the lock for the duration of the iterator. However, that leads to a deadlock when calling reader.SetDeadline from the loop body:

for rec, err := range reader.Records() {
	reader.SetDeadline(time.Now()) // deadlock: Records still holds reader.mu
}

reader.Close() also tries to acquire reader.mu, so it becomes impossible to call Close() during iteration 😬

Member Author: I drafted a PR on my fork for your preview: jschwinger233#5

return
}

atomic.StoreUintptr(r.ring.cons_pos, nextCons)
}
}
}

// readLocked drives the polling / data-availability loop shared by Record reads.
func (r *Reader) readLocked(read func() error) error {
Collaborator: Having read be a closure over Reader state is a bit black magic. How about:

Suggested change:
- func (r *Reader) readLocked(read func() error) error {
+ func (r *Reader) readLocked(handle func(sample []byte, remaining int) error) error {

readLocked would be responsible for calling readSample and advancing the consumer position. readRecord would just be func(*Record, sample []byte, remaining int) error.

Probably also easier for the compiler to turn into good code since handle doesn't have to be a closure.
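
Under that suggestion the pair might look roughly like this (a sketch, not the PR's code; polling and deadline handling are elided):

func (r *Reader) readLocked(handle func(sample []byte, remaining int) error) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	// ... epoll wait / deadline handling as today ...

	for {
		sample, remaining, nextCons, err := r.ring.readSample()
		if err == errBusy {
			continue
		}
		if err != nil {
			return err
		}

		if err := handle(sample, remaining); err != nil {
			return err
		}
		// Release the data only after handle is done with it.
		atomic.StoreUintptr(r.ring.cons_pos, nextCons)
		return nil
	}
}

func readRecord(rec *Record, sample []byte, remaining int) error {
	if n := len(sample); cap(rec.RawSample) < n {
		rec.RawSample = make([]byte, n)
	} else {
		rec.RawSample = rec.RawSample[:n]
	}
	copy(rec.RawSample, sample)
	rec.Remaining = remaining
	return nil
}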

r.mu.Lock()
defer r.mu.Unlock()

@@ -171,9 +232,7 @@ func (r *Reader) ReadInto(rec *Record) error {
}

for {
err := r.ring.readRecord(rec)
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference

Collaborator: Nit: please keep the comment.

err := read()
if err == errBusy {
continue
}
31 changes: 31 additions & 0 deletions ringbuf/reader_test.go
@@ -384,3 +384,34 @@ func BenchmarkReadInto(b *testing.B) {
}
}
}

func TestRecordsIterator(t *testing.T) {
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

prog, events := mustOutputSamplesProg(t,
sampleMessage{size: 5, flags: 0},
sampleMessage{size: 7, flags: 0},
)
mustRun(t, prog)

rd, err := NewReader(events)
if err != nil {
t.Fatal(err)
}
defer rd.Close()

var seen [][]byte
for rec, err := range rd.Records() {
Collaborator: This needs a test that Reader.Close() stops the iterator without yielding os.ErrClosed. Using iter.Pull might be the easiest way to achieve that.

if err != nil {
t.Fatalf("iteration error: %v", err)
}
seen = append(seen, append([]byte(nil), rec.RawSample...))
if len(seen) == 2 {
break
}
}

qt.Assert(t, qt.Equals(len(seen), 2))
qt.Assert(t, qt.DeepEquals(seen[0], []byte{1, 2, 3, 4, 4}))
qt.Assert(t, qt.DeepEquals(seen[1], []byte{1, 2, 3, 4, 4, 3, 2}))
}
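
A sketch of the Close test asked for above (hedged: it reuses the helpers from this file and encodes the expectation that a closed reader stops the iterator rather than yielding os.ErrClosed):

func TestRecordsIteratorClose(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5, flags: 0})
	mustRun(t, prog)

	rd, err := NewReader(events)
	if err != nil {
		t.Fatal(err)
	}

	next, stop := iter.Pull2(rd.Records())
	defer stop()

	if _, _, ok := next(); !ok {
		t.Fatal("expected a record before Close")
	}

	if err := rd.Close(); err != nil {
		t.Fatal(err)
	}

	// The iterator should terminate instead of yielding os.ErrClosed.
	if rec, err, ok := next(); ok {
		t.Fatalf("expected iterator to stop after Close, got rec=%v err=%v", rec, err)
	}
}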
42 changes: 27 additions & 15 deletions ringbuf/ring.go
@@ -41,16 +41,17 @@ func (rr *ringReader) AvailableBytes() uint64 {
return uint64(prod - cons)
}

// Read a record from an event ring.
func (rr *ringReader) readRecord(rec *Record) error {
// readSample returns a zero-copy view into the next sample, together with the
// consumer position that should be stored to release the data.
func (rr *ringReader) readSample() (sample []byte, remaining int, nextCons uintptr, err error) {
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)

for {
if remaining := prod - cons; remaining == 0 {
return errEOR
return nil, 0, 0, errEOR
} else if remaining < sys.BPF_RINGBUF_HDR_SZ {
return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
return nil, 0, 0, fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
}

// read the len field of the header atomically to ensure a happens before
@@ -65,15 +66,15 @@ func (rr *ringReader) readRecord(rec *Record) error {
// the next sample in the ring is not committed yet so we
// exit without storing the reader/consumer position
// and start again from the same position.
return errBusy
return nil, 0, 0, errBusy
}

cons += sys.BPF_RINGBUF_HDR_SZ

// Data is always padded to 8 byte alignment.
dataLenAligned := uintptr(internal.Align(header.dataLen(), 8))
if remaining := prod - cons; remaining < dataLenAligned {
return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
return nil, 0, 0, fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
}

start = cons & rr.mask
@@ -87,15 +88,26 @@
continue
}

if n := header.dataLen(); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}
end := int(start) + header.dataLen()
return rr.ring[start:end], int(prod - cons), cons, nil
Collaborator: The reslicing to reduce cap could already happen here.
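
That is, roughly (sketch), using a full slice expression to cap the view at its length:

return rr.ring[start:end:end], int(prod - cons), cons, nil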

}
}

copy(rec.RawSample, rr.ring[start:])
rec.Remaining = int(prod - cons)
atomic.StoreUintptr(rr.cons_pos, cons)
return nil
// Read a record from an event ring, copying the sample into the provided Record.
func (rr *ringReader) readRecord(rec *Record) error {
sample, remaining, nextCons, err := rr.readSample()
if err != nil {
return err
}

if n := len(sample); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

copy(rec.RawSample, sample)
rec.Remaining = remaining
atomic.StoreUintptr(rr.cons_pos, nextCons)
return nil
}