Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp/saver: Only return errors.KindAlreadyExists if all three exist #1957

Merged
merged 6 commits into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions pkg/stash/with_gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stash
import (
"bytes"
"context"
"fmt"
"io"
"os"
"strings"
Expand All @@ -17,6 +18,12 @@ import (
"golang.org/x/sync/errgroup"
)

type failReader int

func (f *failReader) Read([]byte) (int, error) {
return 0, fmt.Errorf("failure")
}

// TestWithGCS requires a real GCP backend implementation
// and it will ensure that saving to modules at the same time
// is done synchronously so that only the first module gets saved.
Expand Down Expand Up @@ -79,6 +86,68 @@ func TestWithGCS(t *testing.T) {
}
}

// TestWithGCSPartialFailure equires a real GCP backend implementation
// and ensures that if one of the non-singleflight-lock files fails to
// upload, that the cache does not remain poisoned.
func TestWithGCSPartialFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
const (
mod = "stashmod"
ver = "v1.0.0"
)
strg := getStorage(t)
strg.Delete(ctx, mod, ver)
defer strg.Delete(ctx, mod, ver)

// sanity check
_, err := strg.GoMod(ctx, mod, ver)
if !errors.Is(err, errors.KindNotFound) {
t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err)
}

content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
fr := new(failReader)
// We simulate a failure by manually passing an io.Reader that will fail.
err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content))
if err == nil {
// We *want* to fail.
t.Fatal(err)
}

// Now try a Stash. This should upload the missing files.
s := WithGCSLock(ms)
_, err = s.Stash(ctx, "stashmod", "v1.0.0")
if err != nil {
t.Fatal(err)
}

info, err := strg.Info(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
modContent, err := strg.GoMod(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
zip, err := strg.Zip(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
defer zip.Close()
zipContent, err := io.ReadAll(zip)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(info, modContent) {
t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent))
}
if !bytes.Equal(info, zipContent) {
t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent))
}
}

// mockGCPStasher is like mockStasher
// but leverages in memory storage
// so that redis can determine
Expand Down
116 changes: 107 additions & 9 deletions pkg/storage/gcp/saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io"
"time"

"cloud.google.com/go/storage"
"github.com/gomods/athens/pkg/config"
Expand All @@ -12,6 +13,10 @@ import (
googleapi "google.golang.org/api/googleapi"
)

// After how long we consider an "in_progress" metadata key stale,
// due to failure to remove it.
const inProgressStaleThreshold = 2 * time.Minute
matt0x6F marked this conversation as resolved.
Show resolved Hide resolved

// Save uploads the module's .mod, .zip and .info files for a given version
// It expects a context, which can be provided using context.Background
// from the standard library until context has been threaded down the stack.
Expand All @@ -20,40 +25,133 @@ import (
// Uploaded files are publicly accessible in the storage bucket as per
// an ACL rule.
func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
const op errors.Op = "gcp.Save"
const op errors.Op = "gcp.save"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
gomodPath := config.PackageVersionedName(module, version, "mod")
err := s.upload(ctx, gomodPath, bytes.NewReader(mod))
if err != nil {
innerErr := s.save(ctx, module, version, mod, zip, info)
if errors.Is(innerErr, errors.KindAlreadyExists) {
// Cache hit.
return errors.E(op, innerErr)
}
// No cache hit. Remove the metadata lock if it is there.
inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath)
if outerErr != nil {
return errors.E(op, outerErr)
}
if inProgress {
outerErr = s.removeInProgressMetadata(ctx, gomodPath)
if outerErr != nil {
return errors.E(op, outerErr)
}
}
return innerErr
}

func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
const op errors.Op = "gcp.save"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
gomodPath := config.PackageVersionedName(module, version, "mod")
seenAlreadyExists := 0
err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true)
// If it already exists, check the object metadata to see if the
// other two are still uploading in progress somewhere else. If they
// are, return a cache hit. If not, continue on to the other two,
// and only return a cache hit if all three exist.
if errors.Is(err, errors.KindAlreadyExists) {
inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath)
if progressErr != nil {
return errors.E(op, progressErr)
}
if inProgress {
// err is known to be errors.KindAlreadyExists at this point, so
// this is a cache hit return.
return errors.E(op, err)
}
seenAlreadyExists++
} else if err != nil {
// Other errors
return errors.E(op, err)
}
zipPath := config.PackageVersionedName(module, version, "zip")
err = s.upload(ctx, zipPath, zip)
if err != nil {
err = s.upload(ctx, zipPath, zip, false)
if errors.Is(err, errors.KindAlreadyExists) {
seenAlreadyExists++
} else if err != nil {
return errors.E(op, err)
}
infoPath := config.PackageVersionedName(module, version, "info")
err = s.upload(ctx, infoPath, bytes.NewReader(info))
err = s.upload(ctx, infoPath, bytes.NewReader(info), false)
// Have all three returned errors.KindAlreadyExists?
if errors.Is(err, errors.KindAlreadyExists) {
if seenAlreadyExists == 2 {
return errors.E(op, err)
}
} else if err != nil {
return errors.E(op, err)
}
return nil
}

func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error {
const op errors.Op = "gcp.removeInProgressMetadata"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
_, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{
Metadata: map[string]string{},
})
if err != nil {
return errors.E(op, err)
}
return nil
}

func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error {
func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) {
const op errors.Op = "gcp.checkUploadInProgress"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
attrs, err := s.bucket.Object(gomodPath).Attrs(ctx)
if err != nil {
return false, errors.E(op, err)
}
if attrs.Metadata != nil {
_, ok := attrs.Metadata["in_progress"]
if ok {
// In case the final call to remove the metadata fails for some reason,
// we have a threshold after which we consider this to be stale.
if time.Since(attrs.Created) > inProgressStaleThreshold {
return false, nil
}
return true, nil
}
}
return false, nil
}

func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error {
const op errors.Op = "gcp.upload"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

wc := s.bucket.Object(path).If(storage.Conditions{
DoesNotExist: true,
}).NewWriter(ctx)
}).NewWriter(cancelCtx)

// We set this metadata only for the first of the three files uploaded,
// for use as a singleflight lock.
if first {
wc.ObjectAttrs.Metadata = make(map[string]string)
wc.ObjectAttrs.Metadata["in_progress"] = "true"
}

// NOTE: content type is auto detected on GCP side and ACL defaults to public
// Once we support private storage buckets this may need refactoring
// unless there is a way to set the default perms in the project.
if _, err := io.Copy(wc, stream); err != nil {
_ = wc.Close()
// Purposely do not close it to avoid creating a partial file.
return err
}

Expand Down
Loading