
Commit

Merge pull request #15104 from lavacat/main-tests-snapshots
tests linearizability: trigger snapshot related failpoints
serathius authored Feb 6, 2023
2 parents 65aa0fa + 91b0569 commit dc51cf1
Showing 2 changed files with 101 additions and 10 deletions.
tests/linearizability/failpoints.go (88 additions, 10 deletions)
@@ -29,7 +29,7 @@ import (
 )
 
 const (
-    triggerTimeout = 2 * time.Second
+    triggerTimeout = 5 * time.Second
 )
 
 var (
@@ -68,13 +68,16 @@
         BlackholePeerNetwork,
         DelayPeerNetwork,
     }}
+    RaftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", triggerBlackholeUntilSnapshot, Follower}
+    RaftAfterApplySnapPanic Failpoint = goPanicFailpoint{"raftAfterApplySnap", triggerBlackholeUntilSnapshot, Follower}
+    RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackholeUntilSnapshot, Follower}
+    RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackholeUntilSnapshot, Follower}
+    RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower}
+    RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{
+        RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
+    }}
     // TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
-    raftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", nil, AnyMember}
-    raftAfterApplySnapPanic Failpoint = goPanicFailpoint{"raftAfterApplySnap", nil, AnyMember}
-    raftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", nil, AnyMember}
     raftBeforeFollowerSendPanic Failpoint = goPanicFailpoint{"raftBeforeFollowerSend", nil, AnyMember}
-    raftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", nil, AnyMember}
-    raftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", nil, AnyMember}
 )
 
 type Failpoint interface {
@@ -119,7 +122,7 @@ func (f killFailpoint) Available(e2e.EtcdProcess) bool {
 
 type goPanicFailpoint struct {
     failpoint string
-    trigger func(ctx context.Context, member e2e.EtcdProcess) error
+    trigger func(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error
     target failpointTarget
 }
 
@@ -128,6 +131,7 @@ type failpointTarget string
 const (
     AnyMember failpointTarget = "AnyMember"
     Leader failpointTarget = "Leader"
+    Follower failpointTarget = "Follower"
 )
 
 func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
@@ -148,7 +152,7 @@ func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Log
     }
     if f.trigger != nil {
         lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
-        err = f.trigger(triggerCtx, member)
+        err = f.trigger(t, triggerCtx, member, clus)
         if err != nil {
             lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
         }
@@ -175,6 +179,8 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster)
         return clus.Procs[rand.Int()%len(clus.Procs)]
     case Leader:
         return clus.Procs[clus.WaitLeader(t)]
+    case Follower:
+        return clus.Procs[(clus.WaitLeader(t)+1)%len(clus.Procs)]
     default:
         panic("unknown target")
     }
@@ -194,7 +200,7 @@ func (f goPanicFailpoint) Name() string {
     return f.failpoint
 }
 
-func triggerDefrag(ctx context.Context, member e2e.EtcdProcess) error {
+func triggerDefrag(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
     cc, err := clientv3.New(clientv3.Config{
         Endpoints: member.EndpointsV3(),
         Logger: zap.NewNop(),
@@ -212,7 +218,7 @@ func triggerDefrag(ctx context.Context, member e2e.EtcdProcess) error {
     return nil
 }
 
-func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
+func triggerCompact(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
     cc, err := clientv3.New(clientv3.Config{
         Endpoints: member.EndpointsV3(),
         Logger: zap.NewNop(),
@@ -234,6 +240,78 @@ func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
     return nil
 }
 
+// latestRevisionForEndpoint gets latest revision of the first endpoint in Client.Endpoints list
+func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64, error) {
+    cntx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+    defer cancel()
+    resp, err := c.Status(cntx, c.Endpoints()[0])
+    if err != nil {
+        return 0, err
+    }
+    return resp.Header.Revision, err
+}
+
+func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
+    leader := clus.Procs[clus.WaitLeader(t)]
+    lc, err := clientv3.New(clientv3.Config{
+        Endpoints: []string{leader.Config().ClientURL},
+        Logger: zap.NewNop(),
+        DialKeepAliveTime: 1 * time.Millisecond,
+        DialKeepAliveTimeout: 5 * time.Millisecond,
+    })
+    if err != nil {
+        return err
+    }
+    defer lc.Close()
+
+    mc, err := clientv3.New(clientv3.Config{
+        Endpoints: []string{member.Config().ClientURL},
+        Logger: zap.NewNop(),
+        DialKeepAliveTime: 1 * time.Millisecond,
+        DialKeepAliveTimeout: 5 * time.Millisecond,
+    })
+    if err != nil {
+        return err
+    }
+    defer mc.Close()
+
+    proxy := member.PeerProxy()
+    // Blackholing will cause peers to not be able to use streamWriters registered with member
+    // but peer traffic is still possible because member has 'pipeline' with peers
+    // TODO: find a way to stop all traffic
+    proxy.BlackholeTx()
+    proxy.BlackholeRx()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        default:
+        }
+        // Have to refresh revBlackholedMem. It can still increase as member processes changes that are received but not yet applied.
+        revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc)
+        if err != nil {
+            return err
+        }
+        revLeader, err := latestRevisionForEndpoint(ctx, lc)
+        if err != nil {
+            return err
+        }
+        t.Logf("Leader: [%s], Member: [%s], revLeader: %d, revBlackholedMem: %d", leader.Config().Name, member.Config().Name, revLeader, revBlackholedMem)
+        // Blackholed member has to be sufficiently behind to trigger snapshot transfer.
+        // Need to make sure leader compacted latest revBlackholedMem inside EtcdServer.snapshot.
+        // That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset)
+        if revLeader-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
+            break
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
+
+    proxy.UnblackholeTx()
+    proxy.UnblackholeRx()
+    return nil
+}
+
 type randomFailpoint struct {
     failpoints []Failpoint
 }
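As an aside on the exit condition in triggerBlackholeUntilSnapshot above: the blackholed member must fall more than SnapshotCount + SnapshotCatchUpEntries revisions behind the leader before the proxy is unblackholed. Below is a minimal, self-contained sketch of that arithmetic, using the snapshot settings assumed from the (currently disabled) Snapshot scenario in the second file and made-up revision numbers; it is an illustration, not part of the committed test code.

package main

import "fmt"

func main() {
    // Settings assumed from the disabled "Snapshot" scenario below:
    // e2e.WithSnapshotCount(100) and e2e.WithSnapshotCatchUpEntries(100).
    snapshotCount := int64(100)
    snapshotCatchUpEntries := int64(100)

    // Hypothetical revisions observed while the member is blackholed.
    revLeader := int64(1250)
    revBlackholedMem := int64(1030)

    // Same shape as the loop condition in triggerBlackholeUntilSnapshot: only once
    // the member lags by more than SnapshotCount+SnapshotCatchUpEntries can the
    // leader be expected to have compacted the member's last revision, forcing
    // recovery to go through a snapshot rather than replaying raft entries.
    if revLeader-revBlackholedMem > snapshotCount+snapshotCatchUpEntries {
        fmt.Println("lag is over 200 revisions: unblackhole and expect a snapshot transfer")
    } else {
        fmt.Println("lag is 200 or less: keep waiting, the leader can still replay entries")
    }
}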
tests/linearizability/linearizability_test.go (13 additions, 0 deletions)
@@ -142,6 +142,19 @@ func TestLinearizability(t *testing.T) {
                 e2e.WithSnapshotCount(100),
             ),
         },
+        // TODO: investigate periodic `Model is not linearizable` failures
+        // see https://github.com/etcd-io/etcd/pull/15104#issuecomment-1416371288
+        /*{
+            name: "Snapshot",
+            failpoint: RandomSnapshotFailpoint,
+            traffic: &HighTraffic,
+            config: *e2e.NewConfig(
+                e2e.WithGoFailEnabled(true),
+                e2e.WithSnapshotCount(100),
+                e2e.WithSnapshotCatchUpEntries(100),
+                e2e.WithPeerProxy(true),
+            ),
+        },*/
     }...)
     for _, scenario := range scenarios {
         if scenario.traffic == nil {
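randomFailpoint's Trigger method predates this change and is not part of the diff, but RandomSnapshotFailpoint relies on it to exercise one of the wrapped snapshot failpoints per run. Below is a rough sketch of that selection pattern with a simplified stand-in type rather than the real Failpoint interface; the helper names are illustrative only and do not appear in the etcd test code.

package main

import (
    "fmt"
    "math/rand"
)

// failpoint is a simplified stand-in for the tests' Failpoint interface.
type failpoint struct{ name string }

// pickRandom mirrors the presumed idea behind randomFailpoint: each invocation
// picks one wrapped failpoint uniformly at random to trigger.
func pickRandom(fps []failpoint) failpoint {
    return fps[rand.Intn(len(fps))]
}

func main() {
    snapshotFailpoints := []failpoint{
        {"raftBeforeApplySnap"},
        {"raftAfterApplySnap"},
        {"raftAfterWALRelease"},
        {"raftBeforeSaveSnap"},
        {"raftAfterSaveSnap"},
    }
    // In the real tests the chosen failpoint's Trigger would run against the cluster;
    // here we only print which one would fire.
    fmt.Println("would trigger:", pickRandom(snapshotFailpoints).name)
}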
