Skip to content

Commit

Permalink
tests linearizability: trigger snapshot related failpoints
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Kanivets <bkanivets@apple.com>
  • Loading branch information
Bogdan Kanivets committed Jan 13, 2023
1 parent 6315f1c commit f877cd3
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 10 deletions.
17 changes: 17 additions & 0 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,26 @@ func (f *BinaryFailpoints) Setup(ctx context.Context, failpoint, payload string)
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}

// if member restarts, failpoint that was set using http call will be unset
// set failpoint using env var for this case
evars := map[string]string{"GOFAIL_FAILPOINTS": fmt.Sprintf("%s=%s", failpoint, payload)}
for k, v := range f.member.Config().EnvVars {
evars[k] = v
}
f.member.Config().EnvVars = evars

return nil
}

func (f *BinaryFailpoints) Unset() {
//TODO: add unset using http call for the case of failpoints that don't use `panic`
// and member is still running after failpoint trigger

// reset failpoints that were set using env vars
delete(f.member.Config().EnvVars, "GOFAIL_FAILPOINTS")
}

var httpClient = http.Client{
Timeout: 10 * time.Millisecond,
}
Expand Down
91 changes: 81 additions & 10 deletions tests/linearizability/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
triggerTimeout = time.Second
triggerTimeout = 2 * time.Second
)

var (
Expand Down Expand Up @@ -68,13 +68,16 @@ var (
BlackholePeerNetwork,
DelayPeerNetwork,
}}
RaftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", triggerRestartWithSnapshot, Follower}
RaftAfterApplySnapPanic Failpoint = goPanicFailpoint{"raftAfterApplySnap", triggerRestartWithSnapshot, Follower}
RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerRestartWithSnapshot, Follower}
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerRestartWithSnapshot, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerRestartWithSnapshot, Follower}
RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{
RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
}}
// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
raftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", nil, AnyMember}
raftAfterApplySnapPanic Failpoint = goPanicFailpoint{"raftAfterApplySnap", nil, AnyMember}
raftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", nil, AnyMember}
raftBeforeFollowerSendPanic Failpoint = goPanicFailpoint{"raftBeforeFollowerSend", nil, AnyMember}
raftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", nil, AnyMember}
raftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", nil, AnyMember}
)

type Failpoint interface {
Expand Down Expand Up @@ -118,7 +121,7 @@ func (f killFailpoint) Available(e2e.EtcdProcess) bool {

type goPanicFailpoint struct {
failpoint string
trigger func(ctx context.Context, member e2e.EtcdProcess) error
trigger func(ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error
target failpointTarget
}

Expand All @@ -127,6 +130,7 @@ type failpointTarget string
const (
AnyMember failpointTarget = "AnyMember"
Leader failpointTarget = "Leader"
Follower failpointTarget = "Follower"
)

func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
Expand All @@ -141,7 +145,7 @@ func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.E
t.Logf("gofailpoint setup failed: %v", err)
}
if f.trigger != nil {
err = f.trigger(triggerCtx, member)
err = f.trigger(triggerCtx, member, clus)
if err != nil {
t.Logf("triggering gofailpoint failed: %v", err)
}
Expand All @@ -152,6 +156,7 @@ func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.E
}
}

member.Failpoints().Unset()
err := member.Start(ctx)
if err != nil {
return err
Expand All @@ -165,6 +170,8 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster)
return clus.Procs[rand.Int()%len(clus.Procs)]
case Leader:
return clus.Procs[clus.WaitLeader(t)]
case Follower:
return clus.Procs[(clus.WaitLeader(t)+1)%len(clus.Procs)]
default:
panic("unknown target")
}
Expand All @@ -184,7 +191,7 @@ func (f goPanicFailpoint) Name() string {
return f.failpoint
}

func triggerDefrag(ctx context.Context, member e2e.EtcdProcess) error {
func triggerDefrag(ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
cc, err := clientv3.New(clientv3.Config{
Endpoints: member.EndpointsV3(),
Logger: zap.NewNop(),
Expand All @@ -202,7 +209,7 @@ func triggerDefrag(ctx context.Context, member e2e.EtcdProcess) error {
return nil
}

func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
func triggerCompact(ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
cc, err := clientv3.New(clientv3.Config{
Endpoints: member.EndpointsV3(),
Logger: zap.NewNop(),
Expand All @@ -224,6 +231,70 @@ func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
return nil
}

func latestRevision(clus *e2e.EtcdProcessCluster) (int64, error) {
latestRev := int64(-1)
for _, ep := range clus.EndpointsV3() {
cc, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return -1, fmt.Errorf("failed creating client: %w", err)
}
// some members might be stopped/paused, use short timeout to fail fast
cntx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
status, errS := cc.Status(cntx, ep)
if errS != nil {
// ignore error, some members might be stopped
cancel()
cc.Close()
continue
}
if latestRev < status.Header.Revision {
latestRev = status.Header.Revision
}
cancel()
cc.Close()
}
return latestRev, nil
}

func triggerRestartWithSnapshot(ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
revBefore, errBefore := latestRevision(clus)
if errBefore != nil {
return errBefore
}
err := member.Stop()
if err != nil {
return fmt.Errorf("failed to stop: %w", err)
}

// wait for SnapshotCount revisions, this will trigger snapshot transfer when stopped member is started
deadline := time.Now().Add(100 * time.Millisecond)
for {
revAfter, errAfter := latestRevision(clus)
if errAfter != nil {
return errAfter
}
if revAfter-revBefore > int64(clus.Cfg.SnapshotCount) {
break
}
if time.Now().After(deadline) {
return fmt.Errorf("waiting too long for SnapshotCount %d", clus.Cfg.SnapshotCount)
}
}

// start stopped member. It will get snapshot msg and trigger snapshot related failpoints
err = member.Start(ctx)
if err == nil || !strings.Contains(err.Error(), "match not found") {
return fmt.Errorf("expected member.Start to fail with \"match not found\". Got err %w", err)
}

return nil
}

type randomFailpoint struct {
failpoints []Failpoint
}
Expand Down
9 changes: 9 additions & 0 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ func TestLinearizability(t *testing.T) {
e2e.WithGoFailEnabled(true),
),
},
{
name: "Snapshot",
failpoint: RandomSnapshotFailpoint,
config: *e2e.NewConfig(
e2e.WithGoFailEnabled(true),
e2e.WithSnapshotCount(5),
e2e.WithSnapshotCatchUpEntries(5),
),
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand Down

0 comments on commit f877cd3

Please sign in to comment.