Skip to content

Commit

Permalink
tests linearizability: reproduce and prevent 14571
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Jan 19, 2023
1 parent 7568751 commit 635d4b3
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 71 deletions.
2 changes: 1 addition & 1 deletion bill-of-materials.json
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@
]
},
{
"project": "github.com/stretchr/testify/assert",
"project": "github.com/stretchr/testify",
"licenses": [
{
"type": "MIT License",
Expand Down
10 changes: 5 additions & 5 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,16 +875,16 @@ func findMemberIDByEndpoint(members []*etcdserverpb.Member, endpoint string) (ui

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int {
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB, opts ...config.ClientOption) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return epc.WaitMembersForLeader(ctx, t, epc.Procs)
return epc.WaitMembersForLeader(ctx, t, epc.Procs, opts...)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int {
cc := epc.Client()
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess, opts ...config.ClientOption) int {
cc := epc.Client(opts...)

// ensure leader is up via linearizable get
for {
Expand All @@ -908,7 +908,7 @@ func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testi
default:
}
for i := range membs {
resp, err := membs[i].Client().Status(ctx)
resp, err := membs[i].Client(opts...).Status(ctx)
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
Expand Down
12 changes: 6 additions & 6 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,18 +405,18 @@ func (c *Cluster) WaitMembersMatch(t testutil.TB, membs []*pb.Member) {

// WaitLeader returns index of the member in c.Members that is leader
// or fails the test (if not established in 30s).
func (c *Cluster) WaitLeader(t testing.TB) int {
return c.WaitMembersForLeader(t, c.Members)
func (c *Cluster) WaitLeader(t testing.TB, opts ...framecfg.ClientOption) int {
return c.WaitMembersForLeader(t, c.Members, opts...)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *Cluster) WaitMembersForLeader(t testing.TB, membs []*Member) int {
func (c *Cluster) WaitMembersForLeader(t testing.TB, membs []*Member, opts ...framecfg.ClientOption) int {
t.Logf("WaitMembersForLeader")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
l := 0
for l = c.waitMembersForLeader(ctx, t, membs); l < 0; {
for l = c.waitMembersForLeader(ctx, t, membs, opts...); l < 0; {
if ctx.Err() != nil {
t.Fatalf("WaitLeader FAILED: %v", ctx.Err())
}
Expand All @@ -437,13 +437,13 @@ func (c *Cluster) WaitMembersForLeader(t testing.TB, membs []*Member) int {

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *Cluster) waitMembersForLeader(ctx context.Context, t testing.TB, membs []*Member) int {
func (c *Cluster) waitMembersForLeader(ctx context.Context, t testing.TB, membs []*Member, opts ...framecfg.ClientOption) int {
possibleLead := make(map[uint64]bool)
var lead uint64
for _, m := range membs {
possibleLead[uint64(m.Server.MemberId())] = true
}
cc, err := c.ClusterClient(t)
cc, err := c.ClusterClient(t, opts...)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/interfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type TestRunner interface {
type Cluster interface {
Members() []Member
Client(opts ...config.ClientOption) (Client, error)
WaitLeader(t testing.TB) int
WaitLeader(t testing.TB, opts ...config.ClientOption) int
Close() error
Endpoints() []string
}
Expand Down
19 changes: 16 additions & 3 deletions tests/linearizability/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
"go.etcd.io/etcd/tests/v3/linearizability/model"
)
Expand All @@ -31,13 +33,17 @@ type recordingClient struct {
history *model.AppendableHistory
}

func NewClient(endpoints []string, ids identity.Provider) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
func NewClient(endpoints []string, ids identity.Provider, opts ...config.ClientOption) (*recordingClient, error) {
cfg := &clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
}
for _, opt := range opts {
opt(cfg)
}
cc, err := clientv3.New(*cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,3 +131,10 @@ func (c *recordingClient) PutWithLease(ctx context.Context, key string, value st
c.history.AppendPutWithLease(key, value, int64(leaseId), callTime, returnTime, resp, err)
return err
}

// clientOption returns the client option used when dialing the cluster.
// When auth is enabled it authenticates as the root user; otherwise it
// yields a no-op option.
func clientOption(authEnabled bool) config.ClientOption {
	if authEnabled {
		return integration.WithAuth(rootUserName, rootUserPassword)
	}
	return func(any) {}
}
93 changes: 75 additions & 18 deletions tests/linearizability/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/tests/v3/framework/config"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
Expand All @@ -33,7 +36,8 @@ const (
)

var (
KillFailpoint Failpoint = killFailpoint{}
KillFailpoint Failpoint = killFailpoint{target: AnyMember}
EnableAuthKillFailpoint Failpoint = killFailpoint{enableAuth: true, target: Follower}
DefragBeforeCopyPanic Failpoint = goPanicFailpoint{"defragBeforeCopy", triggerDefrag, AnyMember}
DefragBeforeRenamePanic Failpoint = goPanicFailpoint{"defragBeforeRename", triggerDefrag, AnyMember}
BeforeCommitPanic Failpoint = goPanicFailpoint{"beforeCommit", nil, AnyMember}
Expand Down Expand Up @@ -78,15 +82,22 @@ var (
)

type Failpoint interface {
Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error
Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error
Name() string
Available(e2e.EtcdProcess) bool
}

type killFailpoint struct{}
// killFailpoint kills one etcd member process and then restarts it.
type killFailpoint struct {
enableAuth bool // when true, trigger uses root-authenticated clients and provisions test auth — see Trigger
target failpointTarget // which member to kill: AnyMember, Leader, or Follower
}

func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
opt := func(any) {}
if f.enableAuth {
opt = e2e.WithAuth(rootUserName, rootUserPassword)
}
member := pickMember(f.target, t, clus, opt)

killCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
Expand All @@ -101,11 +112,19 @@ func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Etcd
}
}

err := member.Start(ctx)
if err != nil {
return err
// get endpoints excluding the killed member client URLs
endpoints := make([]string, 0, len(clus.EndpointsV3()))
for _, ed := range clus.EndpointsV3() {
if ed != member.EndpointsV3()[0] {
endpoints = append(endpoints, ed)
}
}
return nil

if f.enableAuth {
require.NoError(t, addTestUserAuth(ctx, endpoints))
}

return member.Start(ctx)
}

func (f killFailpoint) Name() string {
Expand All @@ -116,6 +135,38 @@ func (f killFailpoint) Available(e2e.EtcdProcess) bool {
return true
}

// addTestUserAuth provisions the auth entities used by the linearizability
// traffic: it dials the given endpoints as the root user, creates
// testUserName, creates testRoleName, grants the role to the user, and
// grants the role read/write permission over [startKey, endKey).
//
// It returns the first error encountered; any step failing aborts the rest.
func addTestUserAuth(ctx context.Context, endpoints []string) error {
	cc, err := clientv3.New(clientv3.Config{
		Endpoints:            endpoints,
		Logger:               zap.NewNop(),
		DialKeepAliveTime:    1 * time.Millisecond,
		DialKeepAliveTimeout: 5 * time.Millisecond,
		Username:             rootUserName,
		Password:             rootUserPassword,
	})
	// Check the dial error before deferring Close: clientv3.New returns a
	// nil client on failure, so the previous nil-guarded defer is unneeded.
	if err != nil {
		return err
	}
	// Close errors are best-effort; the client is used only for this setup.
	defer cc.Close()

	if _, err := cc.UserAdd(ctx, testUserName, testUserPassword); err != nil {
		return err
	}
	if _, err := cc.RoleAdd(ctx, testRoleName); err != nil {
		return err
	}
	if _, err := cc.UserGrantRole(ctx, testUserName, testRoleName); err != nil {
		return err
	}
	if _, err := cc.RoleGrantPermission(ctx, testRoleName, startKey, endKey, clientv3.PermissionType(clientv3.PermReadWrite)); err != nil {
		return err
	}
	return nil
}

type goPanicFailpoint struct {
failpoint string
trigger func(ctx context.Context, member e2e.EtcdProcess) error
Expand All @@ -127,10 +178,11 @@ type failpointTarget string
// Targets selecting which cluster member a failpoint is injected into.
const (
AnyMember failpointTarget = "AnyMember" // a randomly chosen member
Leader failpointTarget = "Leader" // the current leader
Follower failpointTarget = "Follower" // a non-leader member (requires a multi-node cluster)
)

func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := f.pickMember(t, clus)
func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
member := pickMember(f.target, t, clus)

triggerCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
Expand Down Expand Up @@ -159,12 +211,17 @@ func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.E
return nil
}

func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster) e2e.EtcdProcess {
switch f.target {
func pickMember(target failpointTarget, t *testing.T, clus *e2e.EtcdProcessCluster, opts ...config.ClientOption) e2e.EtcdProcess {
switch target {
case AnyMember:
return clus.Procs[rand.Int()%len(clus.Procs)]
case Leader:
return clus.Procs[clus.WaitLeader(t)]
return clus.Procs[clus.WaitLeader(t, opts...)]
case Follower:
if len(clus.Procs) == 1 {
panic("single node cluster does not have follower")
}
return clus.Procs[(clus.WaitLeader(t, opts...)+1)%len(clus.Procs)]
default:
panic("unknown target")
}
Expand Down Expand Up @@ -228,7 +285,7 @@ type randomFailpoint struct {
failpoints []Failpoint
}

func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
availableFailpoints := make([]Failpoint, 0, len(f.failpoints))
for _, failpoint := range f.failpoints {
count := 0
Expand All @@ -243,7 +300,7 @@ func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Et
}
failpoint := availableFailpoints[rand.Int()%len(availableFailpoints)]
t.Logf("Triggering %v failpoint\n", failpoint.Name())
return failpoint.Trigger(t, ctx, clus)
return failpoint.Trigger(t, ctx, clus, lg)
}

func (f randomFailpoint) Name() string {
Expand All @@ -258,7 +315,7 @@ type blackholePeerNetworkFailpoint struct {
duration time.Duration
}

func (f blackholePeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
func (f blackholePeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
proxy := member.PeerProxy()

Expand Down Expand Up @@ -286,7 +343,7 @@ type delayPeerNetworkFailpoint struct {
randomizedLatency time.Duration
}

func (f delayPeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
func (f delayPeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster, lg *zap.Logger) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
proxy := member.PeerProxy()

Expand Down
Loading

0 comments on commit 635d4b3

Please sign in to comment.