diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 5e05e7946..6f3a06761 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -23,8 +23,7 @@ var ( // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c type ringbufHeader struct { - Len uint32 - PgOff uint32 + Len uint32 } func (rh *ringbufHeader) isBusy() bool { @@ -47,23 +46,16 @@ type Record struct { } // Read a record from an event ring. -// -// buf must be at least BPF_RINGBUF_HDR_SZ bytes long. -func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error { +func readRecord(rd *ringbufEventRing, rec *Record) error { rd.loadConsumer() - buf = buf[:unix.BPF_RINGBUF_HDR_SZ] - if _, err := io.ReadFull(rd, buf); err == io.EOF { + header, err := rd.readHeader() + if err == io.EOF { return errEOR } else if err != nil { return fmt.Errorf("read event header: %w", err) } - header := ringbufHeader{ - internal.NativeEndian.Uint32(buf[0:4]), - internal.NativeEndian.Uint32(buf[4:8]), - } - if header.isBusy() { // the next sample in the ring is not committed yet so we // exit without storing the reader/consumer position @@ -111,7 +103,6 @@ type Reader struct { mu sync.Mutex ring *ringbufEventRing epollEvents []unix.EpollEvent - header []byte haveData bool deadline time.Time bufferSize int @@ -148,7 +139,6 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { poller: poller, ring: ring, epollEvents: make([]unix.EpollEvent, 1), - header: make([]byte, unix.BPF_RINGBUF_HDR_SZ), bufferSize: ring.size(), }, nil } @@ -220,7 +210,7 @@ func (r *Reader) ReadInto(rec *Record) error { } for { - err := readRecord(r.ring, rec, r.header) + err := readRecord(r.ring, rec) // Not using errors.Is which is quite a bit slower // For a tight loop it might make a difference if err == errBusy || err == errDiscard { diff --git a/ringbuf/ring.go b/ringbuf/ring.go index 6dd04a93e..3dc693ae4 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -109,6 +109,19 @@ func (rr *ringReader) remaining() int { return int((prod - cons) & rr.mask) } +func (rr *ringReader) readHeader() (ringbufHeader, error) { + prod := atomic.LoadUint64(rr.prod_pos) + + // ensure we do not go over the prod position + if prod-rr.cons < unix.BPF_RINGBUF_HDR_SZ { + return ringbufHeader{}, io.EOF + } + + len := atomic.LoadUint32((*uint32)((unsafe.Pointer)(&rr.ring[rr.cons&rr.mask]))) + rr.cons += unix.BPF_RINGBUF_HDR_SZ + return ringbufHeader{len}, nil +} + func (rr *ringReader) Read(p []byte) (int, error) { prod := atomic.LoadUint64(rr.prod_pos)