Skip to content

Commit

Permalink
ringbuf: read the header len atomically
Browse files Browse the repository at this point in the history
Reading the len field of the event header needs to be atomic to match
the release memory barrier from
https://github.com/torvalds/linux/blob/eb26cbb1a754ccde5d4d74527dad5ba051808fad/kernel/bpf/ringbuf.c#L487

We also don't care about the PgOff field which is only used by rinbuf
internals.

Signed-off-by: Paul Cacheux <paul.cacheux@datadoghq.com>
  • Loading branch information
paulcacheux committed Mar 13, 2024
1 parent e078736 commit f8ed272
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
20 changes: 5 additions & 15 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ var (

// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
Len uint32
PgOff uint32
Len uint32
}

func (rh *ringbufHeader) isBusy() bool {
Expand All @@ -47,23 +46,16 @@ type Record struct {
}

// Read a record from an event ring.
//
// buf must be at least BPF_RINGBUF_HDR_SZ bytes long.
func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error {
func readRecord(rd *ringbufEventRing, rec *Record) error {
rd.loadConsumer()

buf = buf[:unix.BPF_RINGBUF_HDR_SZ]
if _, err := io.ReadFull(rd, buf); err == io.EOF {
header, err := rd.readHeader()
if err == io.EOF {
return errEOR
} else if err != nil {
return fmt.Errorf("read event header: %w", err)
}

header := ringbufHeader{
internal.NativeEndian.Uint32(buf[0:4]),
internal.NativeEndian.Uint32(buf[4:8]),
}

if header.isBusy() {
// the next sample in the ring is not committed yet so we
// exit without storing the reader/consumer position
Expand Down Expand Up @@ -111,7 +103,6 @@ type Reader struct {
mu sync.Mutex
ring *ringbufEventRing
epollEvents []unix.EpollEvent
header []byte
haveData bool
deadline time.Time
bufferSize int
Expand Down Expand Up @@ -148,7 +139,6 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
poller: poller,
ring: ring,
epollEvents: make([]unix.EpollEvent, 1),
header: make([]byte, unix.BPF_RINGBUF_HDR_SZ),
bufferSize: ring.size(),
}, nil
}
Expand Down Expand Up @@ -220,7 +210,7 @@ func (r *Reader) ReadInto(rec *Record) error {
}

for {
err := readRecord(r.ring, rec, r.header)
err := readRecord(r.ring, rec)
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy || err == errDiscard {
Expand Down
13 changes: 13 additions & 0 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func (rr *ringReader) remaining() int {
return int((prod - cons) & rr.mask)
}

func (rr *ringReader) readHeader() (ringbufHeader, error) {
prod := atomic.LoadUint64(rr.prod_pos)

// ensure we do not go over the prod position
if prod-rr.cons < unix.BPF_RINGBUF_HDR_SZ {
return ringbufHeader{}, io.EOF
}

len := atomic.LoadUint32((*uint32)((unsafe.Pointer)(&rr.ring[rr.cons&rr.mask])))
rr.cons += unix.BPF_RINGBUF_HDR_SZ
return ringbufHeader{len}, nil
}

func (rr *ringReader) Read(p []byte) (int, error) {
prod := atomic.LoadUint64(rr.prod_pos)

Expand Down

0 comments on commit f8ed272

Please sign in to comment.