Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp/saver: Only return errors.KindAlreadyExists if all three exist #1957

Merged
merged 6 commits into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/proxy/actions/app_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func addProxyRoutes(

lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs)
checker := storage.WithChecker(s)
withSingleFlight, err := getSingleFlight(l, c, checker)
withSingleFlight, err := getSingleFlight(l, c, s, checker)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a
l.logger.WithContext(ctx).Printf(format, v...)
}

func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType {
case "", "memory":
return stash.WithSingleflight, nil
Expand Down Expand Up @@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (
if c.StorageType != "gcp" {
return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
}
return stash.WithGCSLock, nil
return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s)
case "azureblob":
if c.StorageType != "azureblob" {
return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType)
Expand Down
4 changes: 4 additions & 0 deletions config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ ShutdownTimeout = 60
# Max retries while acquiring the lock. Defaults to 10.
# Env override: ATHENS_REDIS_LOCK_MAX_RETRIES
MaxRetries = 10
[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
[Storage]
# Only storage backends that are specified in Proxy.StorageType are required here
[Storage.CDN]
Expand Down
11 changes: 11 additions & 0 deletions docs/content/configuration/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red
SentinelPassword = "sekret"

Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis.


### Using GCP as a singleflight mechanism

The GCP singleflight mechanism does not required configuration, and works out of the box. It has a
single option with which it can be customized:

[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func defaultConfig() *Config {
SentinelPassword: "sekret",
LockConfig: DefaultRedisLockConfig(),
},
GCP: DefaultGCPConfig(),
},
Index: &Index{
MySQL: &MySQL{
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) {
LockConfig: DefaultRedisLockConfig(),
},
Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"},
GCP: DefaultGCPConfig(),
}

expConf := &Config{
Expand Down Expand Up @@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string {
} else if singleFlight.Etcd != nil {
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd"
envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints
} else if singleFlight.GCP != nil {
envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold)
}
}
return envVars
Expand Down
13 changes: 13 additions & 0 deletions pkg/config/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type SingleFlight struct {
Etcd *Etcd
Redis *Redis
RedisSentinel *RedisSentinel
GCP *GCP
}

// Etcd holds client side configuration
Expand Down Expand Up @@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig {
MaxRetries: 10,
}
}

// GCP is the configuration for GCP locking.
type GCP struct {
StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"`
}

// DefaultGCPConfig returns the default GCP locking configuration.
func DefaultGCPConfig() *GCP {
return &GCP{
StaleThreshold: 120,
}
}
21 changes: 19 additions & 2 deletions pkg/stash/with_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,32 @@ package stash

import (
"context"
"fmt"
"time"

"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/gcp"
)

// WithGCSLock returns a distributed singleflight
// using a GCS backend. See the config.toml documentation for details.
func WithGCSLock(s Stasher) Stasher {
return &gcsLock{s}
func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) {
if staleThreshold <= 0 {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold"))
}
// Since we *must* be using a GCP stoagfe backend, we can abuse this
// fact to mutate it, so that we can get our threshold into Save().
// Your instincts are correct, this is kind of gross.
gs, ok := s.(*gcp.Storage)
if !ok {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage"))
}
gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second)
return func(s Stasher) Stasher {
return &gcsLock{s}
}, nil
}

type gcsLock struct {
Expand Down
79 changes: 78 additions & 1 deletion pkg/stash/with_gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stash
import (
"bytes"
"context"
"fmt"
"io"
"os"
"strings"
Expand All @@ -17,6 +18,12 @@ import (
"golang.org/x/sync/errgroup"
)

type failReader int

func (f *failReader) Read([]byte) (int, error) {
return 0, fmt.Errorf("failure")
}

// TestWithGCS requires a real GCP backend implementation
// and it will ensure that saving to modules at the same time
// is done synchronously so that only the first module gets saved.
Expand All @@ -41,7 +48,11 @@ func TestWithGCS(t *testing.T) {
for i := 0; i < 5; i++ {
content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
s := WithGCSLock(ms)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
Expand Down Expand Up @@ -79,6 +90,72 @@ func TestWithGCS(t *testing.T) {
}
}

// TestWithGCSPartialFailure equires a real GCP backend implementation
// and ensures that if one of the non-singleflight-lock files fails to
// upload, that the cache does not remain poisoned.
func TestWithGCSPartialFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
const (
mod = "stashmod"
ver = "v1.0.0"
)
strg := getStorage(t)
strg.Delete(ctx, mod, ver)
defer strg.Delete(ctx, mod, ver)

// sanity check
_, err := strg.GoMod(ctx, mod, ver)
if !errors.Is(err, errors.KindNotFound) {
t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err)
}

content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
fr := new(failReader)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
// We simulate a failure by manually passing an io.Reader that will fail.
err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content))
if err == nil {
// We *want* to fail.
t.Fatal(err)
}

// Now try a Stash. This should upload the missing files.
_, err = s.Stash(ctx, "stashmod", "v1.0.0")
if err != nil {
t.Fatal(err)
}

info, err := strg.Info(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
modContent, err := strg.GoMod(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
zip, err := strg.Zip(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
defer zip.Close()
zipContent, err := io.ReadAll(zip)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(info, modContent) {
t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent))
}
if !bytes.Equal(info, zipContent) {
t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent))
}
}

// mockGCPStasher is like mockStasher
// but leverages in memory storage
// so that redis can determine
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (

// Storage implements the (./pkg/storage).Backend interface.
type Storage struct {
bucket *storage.BucketHandle
timeout time.Duration
bucket *storage.BucketHandle
timeout time.Duration
staleThreshold time.Duration
}

// New returns a new Storage instance backed by a Google Cloud Storage bucket.
Expand Down
Loading
Loading