Skip to content

Commit

Permalink
etcdserver,wal: fix inconsistencies in WAL and snapshot
Browse files Browse the repository at this point in the history
etcdserver/*, wal/*: changes to snapshots and wal logic
etcdserver/*: changes to snapshots and wal logic to fix #10219
etcdserver/*, wal/*: add Sync method
etcdserver/*, wal/*: find valid snapshots by cross checking snap files and wal snap entries
etcdserver/*, wal/*:Add comments, clean up error messages and tests
etcdserver/*, wal/*: Remove orphaned .snap.db files during Release

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
Viacheslav Biriukov authored and gyuho committed May 15, 2020
1 parent e048e16 commit 3bf4aed
Show file tree
Hide file tree
Showing 9 changed files with 371 additions and 44 deletions.
61 changes: 53 additions & 8 deletions etcdserver/api/snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/coreos/pkg/capnslog"
"go.etcd.io/etcd/etcdserver/api/snap/snappb"
pioutil "go.etcd.io/etcd/pkg/ioutil"
"go.etcd.io/etcd/pkg/pbutil"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"

"github.com/coreos/pkg/capnslog"
"go.etcd.io/etcd/wal/walpb"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -108,21 +109,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
return nil
}

// Load returns the newest snapshot.
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
}

// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
m := snapshot.Metadata
for i := len(walSnaps) - 1; i >= 0; i-- {
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
return true
}
}
return false
})
}

// loadMatching returns the newest snapshot where matchFn returns true.
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
names, err := s.snapNames()
if err != nil {
return nil, err
}
var snap *raftpb.Snapshot
for _, name := range names {
if snap, err = loadSnap(s.lg, s.dir, name); err == nil {
break
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
return snap, nil
}
}
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
return nil, ErrNoSnapshot
}

func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
Expand Down Expand Up @@ -274,3 +291,31 @@ func (s *Snapshotter) cleanupSnapdir(filenames []string) error {
}
return nil
}

func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
dir, err := os.Open(s.dir)
if err != nil {
return err
}
defer dir.Close()
filenames, err := dir.Readdirnames(-1)
if err != nil {
return err
}
for _, filename := range filenames {
if strings.HasSuffix(filename, ".snap.db") {
hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db")
index, err := strconv.ParseUint(hexIndex, 16, 64)
if err != nil {
return fmt.Errorf("failed to parse index from .snap.db filename '%s': %w", filename, err)
}
if index < snap.Metadata.Index {
s.lg.Info("found orphaned .snap.db file; deleting", zap.String("path", filename))
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
}
}
}
}
return nil
}
87 changes: 82 additions & 5 deletions etcdserver/api/snap/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"reflect"
"testing"

"go.etcd.io/etcd/pkg/fileutil"

"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal/walpb"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -166,12 +169,47 @@ func TestLoadNewestSnap(t *testing.T) {
t.Fatal(err)
}

g, err := ss.Load()
if err != nil {
t.Errorf("err = %v, want nil", err)
cases := []struct {
name string
availableWalSnaps []walpb.Snapshot
expected *raftpb.Snapshot
}{
{
name: "load-newest",
expected: &newSnap,
},
{
name: "loadnewestavailable-newest",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
expected: &newSnap,
},
{
name: "loadnewestavailable-newest-unsorted",
availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
expected: &newSnap,
},
{
name: "loadnewestavailable-previous",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
expected: testSnap,
},
}
if !reflect.DeepEqual(g, &newSnap) {
t.Errorf("snap = %#v, want %#v", g, &newSnap)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var err error
var g *raftpb.Snapshot
if tc.availableWalSnaps != nil {
g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
} else {
g, err = ss.Load()
}
if err != nil {
t.Errorf("err = %v, want nil", err)
}
if !reflect.DeepEqual(g, tc.expected) {
t.Errorf("snap = %#v, want %#v", g, tc.expected)
}
})
}
}

Expand Down Expand Up @@ -229,3 +267,42 @@ func TestAllSnapshotBroken(t *testing.T) {
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
}
}

func TestReleaseSnapDBs(t *testing.T) {
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)

snapIndices := []uint64{100, 200, 300, 400}
for _, index := range snapIndices {
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
if err := ioutil.WriteFile(filename, []byte("snap file\n"), 0644); err != nil {
t.Fatal(err)
}
}

ss := New(zap.NewExample(), dir)

if err := ss.ReleaseSnapDBs(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 300}}); err != nil {
t.Fatal(err)
}

deleted := []uint64{100, 200}
for _, index := range deleted {
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
if fileutil.Exist(filename) {
t.Errorf("expected %s (index: %d) to be deleted, but it still exists", filename, index)
}
}

retained := []uint64{300, 400}
for _, index := range retained {
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
if !fileutil.Exist(filename) {
t.Errorf("expected %s (index: %d) to be retained, but it no longer exists", filename, index)
}
}
}
39 changes: 33 additions & 6 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,26 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(r.processMessages(rd.Messages))
}

// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("failed to save Raft snapshot %v", err)
}
}
// gofail: var raftAfterSaveSnap struct{}
}

// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
} else {
plog.Fatalf("raft save state and entries error: %v", err)
plog.Fatalf("failed to save state and entries error: %v", err)
}
}
if !raft.IsEmptyHardState(rd.HardState) {
Expand All @@ -245,25 +259,38 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
if err := r.storage.Sync(); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("raft save snapshot error: %v", err)
plog.Fatalf("failed to sync Raft snapshot %v", err)
}
}

// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}

// gofail: var raftAfterSaveSnap struct{}
// gofail: var raftBeforeApplySnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
if r.lg != nil {
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
} else {
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
// gofail: var raftAfterApplySnap struct{}

if err := r.storage.Release(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
} else {
plog.Fatalf("failed to release Raft wal %v", err)
}
}
// gofail: var raftAfterWALRelease struct{}
}

r.raftStorage.Append(rd.Entries)
Expand Down
30 changes: 22 additions & 8 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/auth"
"go.etcd.io/etcd/etcdserver/api"
"go.etcd.io/etcd/etcdserver/api/membership"
Expand Down Expand Up @@ -57,11 +61,6 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/version"
"go.etcd.io/etcd/wal"

"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -426,10 +425,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
}
snapshot, err = ss.Load()

// Find a snapshot to start/restart a raft node
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if err != nil {
return nil, err
}
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// wal log entries
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}

if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
if cfg.Logger != nil {
Expand Down Expand Up @@ -2370,8 +2378,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
plog.Panicf("unexpected create snapshot error %v", err)
}
}
// SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index.
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
if lg != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
Expand All @@ -2387,6 +2394,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
} else {
plog.Infof("saved snapshot at index %d", snap.Metadata.Index)
}
if err = s.r.storage.Release(snap); err != nil {
if lg != nil {
lg.Panic("failed to release wal", zap.Error(err))
} else {
plog.Panicf("failed to release wal %v", err)
}
}

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
Expand Down
Loading

0 comments on commit 3bf4aed

Please sign in to comment.