Skip to content

Commit

Permalink
feat: Introduce a new Object Storage WAL format. (#13253)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Jun 19, 2024
1 parent 467eb1b commit 1d6f8d5
Show file tree
Hide file tree
Showing 23 changed files with 35,338 additions and 19 deletions.
9 changes: 1 addition & 8 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (m *Metadata) EnsureBounds(from, through int64) {
if m.Through == 0 || through > m.Through {
m.Through = through
}

}

// NewTOCFromByteSlice return parsed TOC from given index byte slice.
Expand Down Expand Up @@ -1646,7 +1645,6 @@ func readFingerprintOffsetsTable(bs ByteSlice, off uint64) (FingerprintOffsets,
}

return res, d.Err()

}

// Close the reader and its underlying resources.
Expand Down Expand Up @@ -2074,7 +2072,7 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
if len(l) != 4*n {
return 0, nil, fmt.Errorf("unexpected postings length, should be %d bytes for %d postings, got %d bytes", 4*n, n, len(l))
}
return n, newBigEndianPostings(l), nil
return n, NewBigEndianPostings(l), nil
}

// LabelNamesOffsetsFor decodes the offsets of the name symbols for a given series.
Expand Down Expand Up @@ -2335,7 +2333,6 @@ func (dec *Decoder) readChunkStatsV3(d *encoding.Decbuf, from, through int64) (r
}

return res, d.Err()

}

func (dec *Decoder) accumulateChunkStats(d *encoding.Decbuf, nChunks int, from, through int64) (res ChunkStats, err error) {
Expand Down Expand Up @@ -2372,16 +2369,13 @@ func (dec *Decoder) readChunkStatsPriorV3(d *encoding.Decbuf, seriesRef storage.
} else if chk.MinTime >= through {
break
}

}

return res, nil

}

// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *Decoder) Series(version int, b []byte, seriesRef storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {

d, fprint, err := dec.prepSeries(b, lbls, chks)
if err != nil {
return 0, err
Expand All @@ -2392,7 +2386,6 @@ func (dec *Decoder) Series(version int, b []byte, seriesRef storage.SeriesRef, f
return 0, errors.Wrapf(err, "series %s", lbls.String())
}
return fprint, nil

}

func (dec *Decoder) readChunks(version int, d *encoding.Decbuf, seriesRef storage.SeriesRef, from int64, through int64, chks *[]ChunkMeta) error {
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,22 +777,22 @@ func (it *ListPostings) Err() error {
return nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
// BigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
type BigEndianPostings struct {
list []byte
cur uint32
}

func newBigEndianPostings(list []byte) *bigEndianPostings {
return &bigEndianPostings{list: list}
func NewBigEndianPostings(list []byte) *BigEndianPostings {
return &BigEndianPostings{list: list}
}

func (it *bigEndianPostings) At() storage.SeriesRef {
func (it *BigEndianPostings) At() storage.SeriesRef {
return storage.SeriesRef(it.cur)
}

func (it *bigEndianPostings) Next() bool {
func (it *BigEndianPostings) Next() bool {
if len(it.list) >= 4 {
it.cur = binary.BigEndian.Uint32(it.list)
it.list = it.list[4:]
Expand All @@ -801,7 +801,7 @@ func (it *bigEndianPostings) Next() bool {
return false
}

func (it *bigEndianPostings) Seek(x storage.SeriesRef) bool {
func (it *BigEndianPostings) Seek(x storage.SeriesRef) bool {
if storage.SeriesRef(it.cur) >= x {
return true
}
Expand All @@ -821,7 +821,7 @@ func (it *bigEndianPostings) Seek(x storage.SeriesRef) bool {
return false
}

func (it *bigEndianPostings) Err() error {
func (it *BigEndianPostings) Err() error {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestBigEndian(t *testing.T) {
}

t.Run("Iteration", func(t *testing.T) {
bep := newBigEndianPostings(beLst)
bep := NewBigEndianPostings(beLst)
for i := 0; i < num; i++ {
require.True(t, bep.Next())
require.Equal(t, storage.SeriesRef(ls[i]), bep.At())
Expand Down Expand Up @@ -764,7 +764,7 @@ func TestBigEndian(t *testing.T) {
},
}

bep := newBigEndianPostings(beLst)
bep := NewBigEndianPostings(beLst)

for _, v := range table {
require.Equal(t, v.found, bep.Seek(storage.SeriesRef(v.seek)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ func (c *IndexClient) Volume(ctx context.Context, userID string, from, through m
}

func (c *IndexClient) GetShards(ctx context.Context, userID string, from, through model.Time, targetBytesPerShard uint64, predicate chunk.Predicate) (*logproto.ShardsResponse, error) {

// TODO(owen-d): perf, this is expensive :(
var mtx sync.Mutex

Expand Down
183 changes: 183 additions & 0 deletions pkg/storage/wal/README.md

Large diffs are not rendered by default.

Loading

0 comments on commit 1d6f8d5

Please sign in to comment.