Skip to content

Commit

Permalink
feat: Implement GetObjectRange for all storage providers (#13650)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Jul 24, 2024
1 parent 498f29a commit d9c441e
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 3 deletions.
8 changes: 8 additions & 0 deletions pkg/ingester-rf1/objstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func (m *Multi) GetObject(ctx context.Context, objectKey string) (io.ReadCloser,
return s.GetObject(ctx, objectKey)
}

// GetObjectRange forwards a ranged read for objectKey to the store that
// GetStoreFor selects for the current time. The off and length arguments are
// passed through unchanged.
func (m *Multi) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) {
	store, err := m.GetStoreFor(model.Now())
	if err != nil {
		return nil, err
	}

	return store.GetObjectRange(ctx, objectKey, off, length)
}

func (m *Multi) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/chunk/client/alibaba/oss_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
return resp.Response.Body, int64(size), err
}

// GetObjectRange returns a reader over part of the specified object key from
// the configured OSS bucket. The requested range starts at offset and ends at
// offset+length-1. The caller is responsible for closing the returned reader.
func (s *OssObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	var result *oss.GetObjectResult

	// The range end is passed as the position of the last requested byte.
	rangeOpts := []oss.Option{oss.Range(offset, offset+length-1)}
	request := &oss.GetObjectRequest{ObjectKey: objectKey}

	fetch := func(_ context.Context) error {
		var err error
		result, err = s.defaultBucket.DoGetObject(request, rangeOpts)
		return err
	}

	if err := instrument.CollectedRequest(ctx, "OSS.GetObject", ossRequestDuration, instrument.ErrorCode, fetch); err != nil {
		return nil, err
	}
	return result.Response.Body, nil
}

// PutObject puts the specified bytes into the configured OSS bucket at the provided key
func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return instrument.CollectedRequest(ctx, "OSS.PutObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Expand Down
34 changes: 34 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,40 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
return nil, 0, errors.Wrap(lastErr, "failed to get s3 object")
}

// GetObjectRange fetches part of objectKey from the store. The request asks
// for the bytes from offset through offset+length-1 and is retried according
// to the client's backoff configuration.
//
// On success the caller owns the returned reader and must close it. On failure
// the returned reader is nil and the error is always non-nil.
func (a *S3ObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	var resp *s3.GetObjectOutput

	// Map the key into a bucket
	bucket := a.bucketFromKey(objectKey)

	var lastErr error

	retries := backoff.New(ctx, a.cfg.BackoffConfig)
	for retries.Ongoing() {
		if ctx.Err() != nil {
			return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject")
		}

		lastErr = loki_instrument.TimeRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
			var requestErr error
			resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{
				Bucket: aws.String(bucket),
				Key:    aws.String(objectKey),
				Range:  aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)),
			})
			return requestErr
		})

		if lastErr == nil && resp != nil && resp.Body != nil {
			return resp.Body, nil
		}
		retries.Wait()
	}

	// errors.Wrap returns nil when handed a nil error, which would make this
	// function return (nil, nil). That can happen when the loop never ran
	// (e.g. the context was already done) or when every attempt succeeded
	// without a body, so make sure there is always an error to wrap.
	if lastErr == nil {
		lastErr = ctx.Err()
	}
	if lastErr == nil {
		lastErr = errors.New("no response body received")
	}

	return nil, errors.Wrap(lastErr, "failed to get s3 object")
}

// PutObject into the store
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return loki_instrument.TimeRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Expand Down
35 changes: 32 additions & 3 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC
)
err := loki_instrument.TimeRequest(ctx, "azure.GetObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
var err error
rc, size, err = b.getObject(ctx, objectKey)
rc, size, err = b.getObject(ctx, objectKey, 0, 0)
return err
})
b.metrics.egressBytesTotal.Add(float64(size))
Expand All @@ -262,14 +262,43 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC
return client_util.NewReadCloserWithContextCancelFunc(rc, cancel), size, nil
}

func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, size int64, err error) {
// GetObjectRange returns a reader for the part of the blob stored at objectKey
// described by offset and length. Closing the returned reader also cancels the
// request context created here.
func (b *BlobStorage) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	cancel := context.CancelFunc(func() {})
	if b.cfg.RequestTimeout > 0 {
		// timeout only after azure client's built in retries
		attemptBudget := time.Duration(b.cfg.MaxRetries) * b.cfg.RequestTimeout
		delayBudget := time.Duration(b.cfg.MaxRetries-1) * b.cfg.MaxRetryDelay
		ctx, cancel = context.WithTimeout(ctx, attemptBudget+delayBudget)
	}

	var (
		body      io.ReadCloser
		bytesRead int64
	)
	collector := instrument.NewHistogramCollector(b.metrics.requestDuration)
	err := loki_instrument.TimeRequest(ctx, "azure.GetObject", collector, instrument.ErrorCode, func(ctx context.Context) error {
		var requestErr error
		body, bytesRead, requestErr = b.getObject(ctx, objectKey, offset, length)
		return requestErr
	})

	// The egress counter is updated regardless of the outcome, using the size
	// reported by getObject.
	b.metrics.egressBytesTotal.Add(float64(bytesRead))

	if err != nil {
		// Nothing will read from the response, so release the context now.
		cancel()
		return nil, err
	}

	// Hand the cancel func to the reader so the context lives until Close.
	return client_util.NewReadCloserWithContextCancelFunc(body, cancel), nil
}

func (b *BlobStorage) getObject(ctx context.Context, objectKey string, offset, length int64) (rc io.ReadCloser, size int64, err error) {
if offset == 0 && length == 0 {
length = azblob.CountToEnd // azblob.CountToEnd == 0 but leaving this here for clarity
}
blockBlobURL, err := b.getBlobURL(objectKey, true)
if err != nil {
return nil, 0, err
}

// Request access to the blob
downloadResponse, err := blockBlobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, noClientKey)
downloadResponse, err := blockBlobURL.Download(ctx, offset, length, azblob.BlobAccessConditions{}, false, noClientKey)
if err != nil {
return nil, 0, err
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/chunk/client/baidubce/bos_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.
return res.Body, size, nil
}

// GetObjectRange returns a reader over part of objectKey from the configured
// BOS bucket. The request covers the positions offset through
// offset+length-1. The caller is responsible for closing the returned reader.
func (b *BOSObjectStorage) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	var result *api.GetObjectResult

	// Position of the last requested byte.
	rangeEnd := offset + length - 1

	download := func(_ context.Context) error {
		var err error
		result, err = b.client.GetObject(b.cfg.BucketName, objectKey, nil, offset, rangeEnd)
		return err
	}

	if err := instrument.CollectedRequest(ctx, "BOS.GetObject", bosRequestDuration, instrument.ErrorCode, download); err != nil {
		return nil, errors.Wrapf(err, "failed to get BOS object [ %s ]", objectKey)
	}
	return result.Body, nil
}

func (b *BOSObjectStorage) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/chunk/client/congestion/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.Re
return rc, sz, err
}

// GetObjectRange forwards the ranged read directly to the wrapped client and
// returns its result unchanged.
func (a *AIMDController) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	rc, err := a.inner.GetObjectRange(ctx, objectKey, offset, length)
	return rc, err
}

// List forwards the listing request directly to the wrapped client and returns
// its result unchanged.
func (a *AIMDController) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
	objects, commonPrefixes, err := a.inner.List(ctx, prefix, delimiter)
	return objects, commonPrefixes, err
}
Expand Down Expand Up @@ -213,6 +217,9 @@ func (n *NoopController) PutObject(context.Context, string, io.Reader) error { r
// GetObject does nothing: it ignores its arguments and returns a nil reader,
// a zero size and a nil error.
func (n *NoopController) GetObject(_ context.Context, _ string) (io.ReadCloser, int64, error) {
	return nil, 0, nil
}
// GetObjectRange does nothing: it ignores its arguments and returns a nil
// reader and a nil error.
func (n *NoopController) GetObjectRange(_ context.Context, _ string, _, _ int64) (io.ReadCloser, error) {
	return nil, nil
}

func (n *NoopController) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
return nil, nil, nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/chunk/client/congestion/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ func (m *mockObjectClient) GetObject(context.Context, string) (io.ReadCloser, in

return io.NopCloser(strings.NewReader("bar")), 3, nil
}
// GetObjectRange is not supported by this mock; calling it panics.
func (m *mockObjectClient) GetObjectRange(_ context.Context, _ string, _, _ int64) (io.ReadCloser, error) {
	panic("not implemented")
}

func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) {
panic("not implemented")
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/chunk/client/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,24 @@ func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
return util.NewReadCloserWithContextCancelFunc(rc, cancel), size, nil
}

// GetObjectRange returns a reader for part of the specified object key from
// the configured GCS bucket, passing offset and length to the bucket's range
// reader. Closing the returned reader also cancels the request context created
// here.
func (s *GCSObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	cancel := context.CancelFunc(func() {})
	if timeout := s.cfg.RequestTimeout; timeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, timeout)
	}

	object := s.getsBuckets.Object(objectKey)
	reader, err := object.NewRangeReader(ctx, offset, length)
	if err != nil {
		// Nothing will read from the response, so release the context now.
		cancel()
		return nil, err
	}

	// Hand the cancel func to the reader so the context lives until Close.
	return util.NewReadCloserWithContextCancelFunc(reader, cancel), nil
}

func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, size int64, err error) {
reader, err := s.getsBuckets.Object(objectKey).NewReader(ctx)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/storage/chunk/client/ibmcloud/cos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ibmcloud
import (
"context"
"flag"
"fmt"
"hash/fnv"
"io"
"net"
Expand Down Expand Up @@ -368,6 +369,36 @@ func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
return nil, 0, errors.Wrap(err, "failed to get cos object")
}

// GetObjectRange returns a reader over part of the specified object key from
// the configured bucket. The request asks for the bytes from offset through
// offset+length-1 and is retried according to the client's backoff
// configuration.
//
// On success the caller owns the returned reader and must close it. On failure
// the returned reader is nil and the error is always non-nil.
func (c *COSObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	var resp *cos.GetObjectOutput

	// Map the key into a bucket
	bucket := c.bucketFromKey(objectKey)

	retries := backoff.New(ctx, c.cfg.BackoffConfig)
	err := ctx.Err()
	for retries.Ongoing() {
		if ctx.Err() != nil {
			return nil, errors.Wrap(ctx.Err(), "ctx related error during cos getObject")
		}
		err = instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
			var requestErr error
			resp, requestErr = c.hedgedCOS.GetObjectWithContext(ctx, &cos.GetObjectInput{
				Bucket: ibm.String(bucket),
				Key:    ibm.String(objectKey),
				Range:  ibm.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)),
			})
			return requestErr
		})
		if err == nil && resp != nil && resp.Body != nil {
			return resp.Body, nil
		}
		retries.Wait()
	}

	// errors.Wrap returns nil when handed a nil error, which would make this
	// function return (nil, nil) if every attempt succeeded without a body.
	// Make sure there is always an error to wrap.
	if err == nil {
		err = ctx.Err()
	}
	if err == nil {
		err = errors.New("no response body received")
	}
	return nil, errors.Wrap(err, "failed to get cos object")
}

// PutObject into the store
func (c *COSObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return instrument.CollectedRequest(ctx, "COS.PutObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/chunk/client/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type FSObjectClient struct {
pathSeparator string
}

// Compile-time assertion that *FSObjectClient implements client.ObjectClient.
var _ client.ObjectClient = (*FSObjectClient)(nil)

// NewFSObjectClient makes a chunk.Client which stores chunks as files in the local filesystem.
func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
// filepath.Clean cleans up the path by removing unwanted duplicate slashes, dots etc.
Expand Down Expand Up @@ -88,6 +90,28 @@ func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.Read
return fl, stats.Size(), nil
}

// SectionReadCloser pairs a Reader with a separate close function so that the
// two together satisfy io.ReadCloser.
type SectionReadCloser struct {
	io.Reader
	// closeFn is invoked by Close and supplies its result.
	closeFn func() error
}

// Close calls the configured close function and returns its error.
func (s SectionReadCloser) Close() error {
	closeUnderlying := s.closeFn
	return closeUnderlying()
}

// GetObjectRange opens the file backing objectKey under the configured
// directory and returns a reader limited to length bytes starting at offset.
// Closing the returned reader closes the underlying file.
func (f *FSObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	path := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))

	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}

	// The section reader bounds reads to the requested window; the file's own
	// Close is kept alongside it so the descriptor is released on Close.
	return SectionReadCloser{
		Reader:  io.NewSectionReader(file, offset, length),
		closeFn: file.Close,
	}, nil
}

// PutObject into the store
func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/client/object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ObjectClient interface {
PutObject(ctx context.Context, objectKey string, object io.Reader) error
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error)

// List objects with given prefix.
//
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/chunk/client/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ func (s *SwiftObjectClient) GetObject(_ context.Context, objectKey string) (io.R
return io.NopCloser(&buf), int64(buf.Len()), nil
}

// GetObjectRange returns a reader over part of the specified object key from
// the configured swift container. The request carries a Range header covering
// offset through offset+length-1, and the response is buffered in memory
// before being returned.
func (s *SwiftObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	rangeValue := fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)
	headers := swift.Headers{"Range": rangeValue}

	body := new(bytes.Buffer)
	if _, err := s.hedgingConn.ObjectGet(s.cfg.ContainerName, objectKey, body, false, headers); err != nil {
		return nil, err
	}

	return io.NopCloser(body), nil
}

// PutObject puts the specified bytes into the configured Swift container at the provided key
func (s *SwiftObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
_, err := s.conn.ObjectPut(s.cfg.ContainerName, objectKey, object, false, "", "", nil)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/client/prefixed_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}

// GetObjectRange prepends the configured prefix to objectKey and forwards the
// ranged read to the downstream client.
func (p PrefixedObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	prefixedKey := p.prefix + objectKey
	return p.downstreamClient.GetObjectRange(ctx, prefixedKey, offset, length)
}

func (p PrefixedObjectClient) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) {
objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimiter)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/chunk/client/testutils/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,26 @@ func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (i
return io.NopCloser(bytes.NewReader(buf)), int64(len(buf)), nil
}

// GetObjectRange implements client.ObjectClient.
//
// It returns a reader over the length bytes of the stored object that start at
// offset. It returns errPermissionDenied when the client is in write-only
// mode, errStorageObjectNotFound when objectKey is unknown, and
// io.ErrUnexpectedEOF when the requested range is negative or does not fit
// inside the stored object.
func (m *InMemoryObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	if m.mode == MockStorageModeWriteOnly {
		return nil, errPermissionDenied
	}

	buf, ok := m.objects[objectKey]
	if !ok {
		return nil, errStorageObjectNotFound
	}

	// Validate the range without computing offset+length up front: negative
	// values would otherwise panic in the slice expression below, and the sum
	// could overflow for very large inputs.
	size := int64(len(buf))
	if offset < 0 || length < 0 || offset > size || length > size-offset {
		return nil, io.ErrUnexpectedEOF
	}

	return io.NopCloser(bytes.NewReader(buf[offset : offset+length])), nil
}

// PutObject implements client.ObjectClient.
func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
buf, err := io.ReadAll(object)
Expand Down

0 comments on commit d9c441e

Please sign in to comment.