Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver/*, wal/*: changes to snapshots and WAL logic to fix #10219 #10356

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions etcdserver/api/snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "snap")

ErrNoSnapshot = errors.New("snap: no available snapshot")
ErrSnapshotIndex = errors.New("snap: no available snapshot index")
ErrEmptySnapshot = errors.New("snap: empty snapshot")
ErrCRCMismatch = errors.New("snap: crc mismatch")
crcTable = crc32.MakeTable(crc32.Castagnoli)
Expand Down Expand Up @@ -125,6 +126,23 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
return snap, nil
}

func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's document function.

names, err := s.snapNames()
if err != nil {
return nil, err
}

if len(names) == 0 {
return nil, ErrNoSnapshot
}

if i >= uint64(len(names)) {
return nil, ErrSnapshotIndex
}

return loadSnap(s.lg, s.dir, names[i])
}

func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
fpath := filepath.Join(dir, name)
snap, err := Read(lg, fpath)
Expand Down
35 changes: 26 additions & 9 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(r.processMessages(rd.Messages))
}

if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("raft save snapshot error: %v", err)
}
}
// gofail: var raftAfterSaveSnap struct{}
}

// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
if r.lg != nil {
Expand All @@ -252,25 +264,30 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("raft save snapshot error: %v", err)
}
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
if err := r.storage.Sync(); err != nil {
log.Fatal(err)
}

// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}

// gofail: var raftAfterSaveSnap struct{}
// gofail: var raftBeforeApplySnapshot struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)

if r.lg != nil {
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
} else {
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
// gofail: var raftAfterApplySnap struct{}
// gofail: var raftAfterApplySnapshot struct{}

if err := r.storage.Release(rd.Snapshot); err != nil {
log.Fatal(err)
}
// gofail: var raftAfterWALRelease struct{}
}

r.raftStorage.Append(rd.Entries)
Expand Down
37 changes: 34 additions & 3 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,32 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
}
snapshot, err = ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
return nil, err

// Find a snapshot to start/restart a raft node
for i := uint64(0); ; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be extracted out into a separate function?

snapshot, err = ss.LoadIndex(i)
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}

if err == snap.ErrNoSnapshot {
break
}

if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) {
break
}

if cfg.Logger != nil {
cfg.Logger.Info(
"skip snapshot",
zap.Uint64("index", i),
)
} else {
plog.Infof("skip snapshot: `%d`", i)
}
}

if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
if cfg.Logger != nil {
Expand Down Expand Up @@ -2161,6 +2183,15 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
plog.Fatalf("save snapshot error: %v", err)
}
}

if err = s.r.storage.Release(snap); err != nil {
if lg != nil {
lg.Panic("failed to release wal", zap.Error(err))
} else {
plog.Fatalf("failed to release wal error: %v", err)
}
}

if lg != nil {
lg.Info(
"saved snapshot",
Expand Down
42 changes: 32 additions & 10 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,15 +969,19 @@ func TestSnapshot(t *testing.T) {
ch := make(chan struct{}, 2)

go func() {
gaction, _ := p.Wait(1)
gaction, _ := p.Wait(2)
defer func() { ch <- struct{}{} }()

if len(gaction) != 1 {
t.Fatalf("len(action) = %d, want 1", len(gaction))
if len(gaction) != 2 {
t.Fatalf("len(action) = %d, want 2", len(gaction))
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[0])
}

if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
t.Errorf("action = %s, want Release", gaction[1])
}
}()

go func() {
Expand Down Expand Up @@ -1060,20 +1064,32 @@ func TestSnapshotOrdering(t *testing.T) {
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
}()

if ac := <-p.Chan(); ac.Name != "Save" {
ac := <-p.Chan()
if ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}

if ac := <-p.Chan(); ac.Name != "SaveSnap" {
t.Fatalf("expected SaveSnap, got %+v", ac)
}

if ac := <-p.Chan(); ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}

// confirm snapshot file still present before calling SaveSnap
snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
if !fileutil.Exist(snapPath) {
t.Fatalf("expected file %q, got missing", snapPath)
}

// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
t.Fatalf("expected SaveSnap, got %+v", ac)
if ac := <-p.Chan(); ac.Name != "Sync" {
t.Fatalf("expected Sync, got %+v", ac)
}

if ac := <-p.Chan(); ac.Name != "Release" {
t.Fatalf("expected Release, got %+v", ac)
}
}

Expand Down Expand Up @@ -1112,16 +1128,22 @@ func TestTriggerSnap(t *testing.T) {

donec := make(chan struct{})
go func() {
wcnt := 2 + snapc
wcnt := 3 + snapc
gaction, _ := p.Wait(wcnt)

// each operation is recorded as a Save
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release
if len(gaction) != wcnt {
fmt.Println("gaction", gaction)
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])

if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2])
}

if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) {
t.Errorf("action = %s, want Release", gaction[wcnt-1])
}
close(donec)
}()
Expand Down
47 changes: 43 additions & 4 deletions etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type Storage interface {
SaveSnap(snap raftpb.Snapshot) error
// Close closes the Storage and performs finalization.
Close() error
// Release release release the locked wal files since they will not be used.
Release(snap raftpb.Snapshot) error
// Sync WAL
Sync() error
}

type storage struct {
Expand All @@ -58,13 +62,48 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
if err != nil {
return err
}
err = st.Snapshotter.SaveSnap(snap)
if err != nil {
return err
}

return st.Snapshotter.SaveSnap(snap)
}

func (st *storage) Release(snap raftpb.Snapshot) error {
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
}

func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool {
if snapshot == nil {
if lg != nil {
lg.Fatal("checkWALSnap: snapshot is empty")
} else {
plog.Fatal("heckWALSnap: snapshot is empty")
}
}

walsnap := walpb.Snapshot{
Index: snapshot.Metadata.Index,
Term: snapshot.Metadata.Term,
}

w, _, _, st, _ := readWAL(lg, waldir, walsnap)
defer w.Close()

if lg != nil {
lg.Info(
"checkWALSnap: snapshot and hardstate data",
zap.Uint64("snapshot-index", snapshot.Metadata.Index),
zap.Uint64("st-commit", st.Commit),
)
} else {
plog.Infof("checkWALSnap: snapshot index: `%d`, HardState Commit: `%d`", snapshot.Metadata.Index, st.Commit)
}

if snapshot.Metadata.Index > st.Commit {
return false
}

return true
}

func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
var (
err error
Expand Down
12 changes: 12 additions & 0 deletions pkg/mock/mockstorage/storage_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,16 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
return nil
}

func (p *storageRecorder) Release(st raftpb.Snapshot) error {
if !raft.IsEmptySnap(st) {
p.Record(testutil.Action{Name: "Release"})
}
return nil
}

func (p *storageRecorder) Sync() error {
p.Record(testutil.Action{Name: "Sync"})
return nil
}

func (p *storageRecorder) Close() error { return nil }
4 changes: 4 additions & 0 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,10 @@ func (w *WAL) sync() error {
return err
}

func (w *WAL) Sync() error {
return w.sync()
}

// ReleaseLockTo releases the locks, which has smaller index than the given index
// except the largest one among them.
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
Expand Down