From b3443fc6d698b8ce4f73a796d84af175c13a2a2d Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Thu, 27 Dec 2018 15:38:38 +0000 Subject: [PATCH 1/5] etcdserver/*, wal/*: changes to snapshots and wal logic to fix #10219 --- build | 2 +- etcdserver/api/snap/snapshotter.go | 18 ++++++++ etcdserver/raft.go | 36 ++++++++++------ etcdserver/server.go | 37 ++++++++++++++--- etcdserver/storage.go | 51 +++++++++++++++++++++++ wal/wal.go | 66 ++++++++++++++++++++++++++++++ 6 files changed, 191 insertions(+), 19 deletions(-) diff --git a/build b/build index 2d2ee5cf16c..565d0732a73 100755 --- a/build +++ b/build @@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}" toggle_failpoints() { mode="$1" if command -v gofail >/dev/null 2>&1; then - gofail "$mode" etcdserver/ mvcc/backend/ + gofail "$mode" etcdserver/ mvcc/backend/ wal/ elif [[ "$mode" != "disable" ]]; then echo "FAILPOINTS set but gofail not found" exit 1 diff --git a/etcdserver/api/snap/snapshotter.go b/etcdserver/api/snap/snapshotter.go index feb420b1401..7c13cc141d5 100644 --- a/etcdserver/api/snap/snapshotter.go +++ b/etcdserver/api/snap/snapshotter.go @@ -41,6 +41,7 @@ var ( plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "snap") ErrNoSnapshot = errors.New("snap: no available snapshot") + ErrSnapshotIndex = errors.New("snap: no available snapshot index") ErrEmptySnapshot = errors.New("snap: empty snapshot") ErrCRCMismatch = errors.New("snap: crc mismatch") crcTable = crc32.MakeTable(crc32.Castagnoli) @@ -125,6 +126,23 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { return snap, nil } +func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) { + names, err := s.snapNames() + if err != nil { + return nil, err + } + + if len(names) == 0 { + return nil, ErrNoSnapshot + } + + if i >= uint64(len(names)) { + return nil, ErrSnapshotIndex + } + + return loadSnap(s.lg, s.dir, names[i]) +} + func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, 
error) { fpath := filepath.Join(dir, name) snap, err := Read(lg, fpath) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 0285a12a6d5..396a72b6952 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -238,8 +238,20 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(r.processMessages(rd.Messages)) } - // gofail: var raftBeforeSave struct{} - if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { + if !raft.IsEmptySnap(rd.Snapshot) { + // gofail: var raftBeforeSaveSnap struct{} + if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil { + if r.lg != nil { + r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) + } else { + plog.Fatalf("raft save snapshot error: %v", err) + } + } + // gofail: var raftAfterSaveSnap struct{} + } + + // gofail: var raftBeforeSaveAll struct{} + if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil { if r.lg != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } else { @@ -249,28 +261,26 @@ func (r *raftNode) start(rh *raftReadyHandler) { if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } - // gofail: var raftAfterSave struct{} + // gofail: var raftAfterSaveAll struct{} if !raft.IsEmptySnap(rd.Snapshot) { - // gofail: var raftBeforeSaveSnap struct{} - if err := r.storage.SaveSnap(rd.Snapshot); err != nil { - if r.lg != nil { - r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) - } else { - plog.Fatalf("raft save snapshot error: %v", err) - } - } // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} - // gofail: var raftAfterSaveSnap struct{} + // gofail: var raftBeforeApplySnapshot struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) + if r.lg != nil { r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index)) } else { plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) } - 
// gofail: var raftAfterApplySnap struct{} + // gofail: var raftAfterApplySnapshot struct{} + + if err := r.storage.Release(rd.Snapshot); err != nil { + log.Fatal(err) + } + // gofail: var raftAfterWALRelease struct{} } r.raftStorage.Append(rd.Entries) diff --git a/etcdserver/server.go b/etcdserver/server.go index 5a97b8341ef..5af566b06f0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -65,14 +65,14 @@ import ( ) const ( - DefaultSnapshotCount = 100000 + DefaultSnapshotCount = 10 // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower // to catch-up after compacting the raft storage entries. // We expect the follower has a millisecond level latency with the leader. // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. - DefaultSnapshotCatchUpEntries uint64 = 5000 + DefaultSnapshotCatchUpEntries uint64 = 10 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -418,10 +418,37 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { plog.Warningf("discovery token ignored since a cluster has already been initialized. 
Valid log found at %q", cfg.WALDir()) } } - snapshot, err = ss.Load() - if err != nil && err != snap.ErrNoSnapshot { - return nil, err + + // Find a snapshot to start/restart a raft node + var ( + snapshot *raftpb.Snapshot + err error + ) + + for i := uint64(0); ; i++ { + snapshot, err = ss.LoadIndex(i) + if err != nil && err != snap.ErrNoSnapshot { + return nil, err + } + + if err == snap.ErrNoSnapshot { + break + } + + if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) { + break + } + + if cfg.Logger != nil { + cfg.Logger.Info( + "skip snapshot", + zap.Uint64("index", i), + ) + } else { + plog.Infof("skip snapshot: `%d`", i) + } } + if snapshot != nil { if err = st.Recovery(snapshot.Data); err != nil { if cfg.Logger != nil { diff --git a/etcdserver/storage.go b/etcdserver/storage.go index d57b6f9a58d..401e162a4c6 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -36,6 +36,14 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error + + // SaveSnapshot function saves only snapshot to the underlying stable storage. + SaveSnapshot(snap raftpb.Snapshot) error + // SaveAll function saves ents, snapshot and state to the underlying stable storage. + // SaveAll MUST block until st and ents are on stable storage. + SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error + // Release release release the locked wal files since they will not be used. + Release(snap raftpb.Snapshot) error } type storage struct { @@ -65,6 +73,49 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { return st.WAL.ReleaseLockTo(snap.Metadata.Index) } +// SaveSnapshot saves the snapshot to disk. 
+func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error { + return st.Snapshotter.SaveSnap(snap) +} + +func (st *storage) Release(snap raftpb.Snapshot) error { + return st.WAL.ReleaseLockTo(snap.Metadata.Index) +} + +func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool { + if snapshot == nil { + if lg != nil { + lg.Fatal("checkWALSnap: snapshot is empty") + } else { + plog.Fatal("checkWALSnap: snapshot is empty") + } + } + + walsnap := walpb.Snapshot{ + Index: snapshot.Metadata.Index, + Term: snapshot.Metadata.Term, + } + + w, _, _, st, _ := readWAL(lg, waldir, walsnap) + defer w.Close() + + if lg != nil { + lg.Info( + "checkWALSnap: snapshot and hardstate data", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + zap.Uint64("st-commit", st.Commit), + ) + } else { + plog.Infof("checkWALSnap: snapshot index: `%d`, HardState Commit: `%d`", snapshot.Metadata.Index, st.Commit) + } + + if snapshot.Metadata.Index > st.Commit { + return false + } + + return true +} + func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { var ( err error diff --git a/wal/wal.go b/wal/wal.go index 7200ad088dd..d86a9fea362 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -746,6 +746,72 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { return w.sync() } +func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error { + w.mu.Lock() + defer w.mu.Unlock() + + // short cut, do not call sync + if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) { + return nil + } + + mustSync := raft.MustSync(st, w.state, len(ents)) + + if !raft.IsEmptySnap(snap) { + mustSync = true + } + + // 1. 
Save entries + // TODO(xiangli): no more reference operator + for i := range ents { + if err := w.saveEntry(&ents[i]); err != nil { + return err + } + // gofail: var raftAfterSaveWALFirstEntry struct{} + } + // gofail: var raftAfterSaveWALEntries struct{} + + // 2. Save snapshot + if !raft.IsEmptySnap(snap) { + e := walpb.Snapshot{ + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + } + + b := pbutil.MustMarshal(&e) + + rec := &walpb.Record{Type: snapshotType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + + // update enti only when snapshot is ahead of last index + if w.enti < e.Index { + w.enti = e.Index + } + // gofail: var raftAfterSaveWALSnap struct{} + } + + // 3. Save HardState + if err := w.saveState(&st); err != nil { + return err + } + // gofail: var raftAfterSaveWALState struct{} + + curOff, err := w.tail().Seek(0, io.SeekCurrent) + if err != nil { + return err + } + if curOff < SegmentSizeBytes { + if mustSync { + return w.sync() + } + return nil + } + + return w.cut() +} + func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) } From e67b5ae9a7f7044005c41ed0a22b9a9b6e9ab3e9 Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Fri, 4 Jan 2019 17:45:02 +0000 Subject: [PATCH 2/5] etcdserver/*: changes to snapshots and wal logic to fix #10219 --- build | 2 +- etcdserver/raft.go | 8 +++--- etcdserver/server.go | 9 ++++++ etcdserver/storage.go | 14 --------- wal/wal.go | 66 ------------------------------------------- 5 files changed, 14 insertions(+), 85 deletions(-) diff --git a/build b/build index 565d0732a73..2d2ee5cf16c 100755 --- a/build +++ b/build @@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}" toggle_failpoints() { mode="$1" if command -v gofail >/dev/null 2>&1; then - gofail "$mode" etcdserver/ mvcc/backend/ wal/ + gofail "$mode" etcdserver/ mvcc/backend/ elif [[ "$mode" != "disable" ]]; then echo "FAILPOINTS set 
but gofail not found" exit 1 diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 396a72b6952..9cfacaa6248 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -240,7 +240,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { if !raft.IsEmptySnap(rd.Snapshot) { // gofail: var raftBeforeSaveSnap struct{} - if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil { + if err := r.storage.SaveSnap(rd.Snapshot); err != nil { if r.lg != nil { r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) } else { @@ -250,8 +250,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftAfterSaveSnap struct{} } - // gofail: var raftBeforeSaveAll struct{} - if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil { + // gofail: var raftBeforeSave struct{} + if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { if r.lg != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } else { @@ -261,7 +261,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } - // gofail: var raftAfterSaveAll struct{} + // gofail: var raftAfterSave struct{} if !raft.IsEmptySnap(rd.Snapshot) { // etcdserver now claim the snapshot has been persisted onto the disk diff --git a/etcdserver/server.go b/etcdserver/server.go index 5af566b06f0..d0c9a925630 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -2188,6 +2188,15 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { plog.Fatalf("save snapshot error: %v", err) } } + + if err = s.r.storage.Release(snap); err != nil { + if lg != nil { + lg.Panic("failed to release wal", zap.Error(err)) + } else { + plog.Fatalf("failed to release wal error: %v", err) + } + } + if lg != nil { lg.Info( "saved snapshot", diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 401e162a4c6..29a43691547 100644 --- a/etcdserver/storage.go +++ 
b/etcdserver/storage.go @@ -36,12 +36,6 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error - - // SaveSnapshot function saves only snapshot to the underlying stable storage. - SaveSnapshot(snap raftpb.Snapshot) error - // SaveAll function saves ents, snapshot and state to the underlying stable storage. - // SaveAll MUST block until st and ents are on stable storage. - SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error // Release release release the locked wal files since they will not be used. Release(snap raftpb.Snapshot) error } @@ -66,15 +60,7 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { if err != nil { return err } - err = st.Snapshotter.SaveSnap(snap) - if err != nil { - return err - } - return st.WAL.ReleaseLockTo(snap.Metadata.Index) -} -// SaveSnapshot saves the snapshot to disk. -func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error { return st.Snapshotter.SaveSnap(snap) } diff --git a/wal/wal.go b/wal/wal.go index d86a9fea362..7200ad088dd 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -746,72 +746,6 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { return w.sync() } -func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error { - w.mu.Lock() - defer w.mu.Unlock() - - // short cut, do not call sync - if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) { - return nil - } - - mustSync := raft.MustSync(st, w.state, len(ents)) - - if !raft.IsEmptySnap(snap) { - mustSync = true - } - - // 1. Save entries - // TODO(xiangli): no more reference operator - for i := range ents { - if err := w.saveEntry(&ents[i]); err != nil { - return err - } - // gofail: var raftAfterSaveWALFirstEntry struct{} - } - // gofail: var raftAfterSaveWALEntries struct{} - - // 2. 
Save snapshot - if !raft.IsEmptySnap(snap) { - e := walpb.Snapshot{ - Index: snap.Metadata.Index, - Term: snap.Metadata.Term, - } - - b := pbutil.MustMarshal(&e) - - rec := &walpb.Record{Type: snapshotType, Data: b} - if err := w.encoder.encode(rec); err != nil { - return err - } - - // update enti only when snapshot is ahead of last index - if w.enti < e.Index { - w.enti = e.Index - } - // gofail: var raftAfterSaveWALSnap struct{} - } - - // 3. Save HardState - if err := w.saveState(&st); err != nil { - return err - } - // gofail: var raftAfterSaveWALState struct{} - - curOff, err := w.tail().Seek(0, io.SeekCurrent) - if err != nil { - return err - } - if curOff < SegmentSizeBytes { - if mustSync { - return w.sync() - } - return nil - } - - return w.cut() -} - func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) } From de17035e2b28d92e2e21698d58cbcb4a2a41d580 Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Mon, 7 Jan 2019 10:48:16 +0000 Subject: [PATCH 3/5] etcdserver/*: fix tests --- etcdserver/server_test.go | 36 ++++++++++++++++++------ pkg/mock/mockstorage/storage_recorder.go | 7 +++++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 46a5363f815..665514c6bf3 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -972,12 +972,16 @@ func TestSnapshot(t *testing.T) { gaction, _ := p.Wait(1) defer func() { ch <- struct{}{} }() - if len(gaction) != 1 { - t.Fatalf("len(action) = %d, want 1", len(gaction)) + if len(gaction) != 2 { + t.Fatalf("len(action) = %d, want 2", len(gaction)) } if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) { t.Errorf("action = %s, want SaveSnap", gaction[0]) } + + if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) { + t.Errorf("action = %s, want Release", gaction[1]) + } }() go func() { @@ -1060,20 +1064,28 @@ func TestSnapshotOrdering(t 
*testing.T) { n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot} }() - if ac := <-p.Chan(); ac.Name != "Save" { + ac := <-p.Chan() + if ac.Name != "Save" { t.Fatalf("expected Save, got %+v", ac) } + + if ac := <-p.Chan(); ac.Name != "SaveSnap" { + t.Fatalf("expected SaveSnap, got %+v", ac) + } + if ac := <-p.Chan(); ac.Name != "Save" { t.Fatalf("expected Save, got %+v", ac) } + // confirm snapshot file still present before calling SaveSnap snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1)) if !fileutil.Exist(snapPath) { t.Fatalf("expected file %q, got missing", snapPath) } + // unblock SaveSnapshot, etcdserver now permitted to move snapshot file - if ac := <-p.Chan(); ac.Name != "SaveSnap" { - t.Fatalf("expected SaveSnap, got %+v", ac) + if ac := <-p.Chan(); ac.Name != "Release" { + t.Fatalf("expected Release, got %+v", ac) } } @@ -1112,16 +1124,22 @@ func TestTriggerSnap(t *testing.T) { donec := make(chan struct{}) go func() { - wcnt := 2 + snapc + wcnt := 3 + snapc gaction, _ := p.Wait(wcnt) // each operation is recorded as a Save - // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release if len(gaction) != wcnt { + fmt.Println("gaction", gaction) t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } - if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) + + if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) { + t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2]) + } + + if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) { + t.Errorf("action = %s, want Release", gaction[wcnt-1]) } close(donec) }() diff --git a/pkg/mock/mockstorage/storage_recorder.go b/pkg/mock/mockstorage/storage_recorder.go index d05413e62a7..9585b68e0e8 100644 --- a/pkg/mock/mockstorage/storage_recorder.go +++ 
b/pkg/mock/mockstorage/storage_recorder.go @@ -45,4 +45,11 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { return nil } +func (p *storageRecorder) Release(st raftpb.Snapshot) error { + if !raft.IsEmptySnap(st) { + p.Record(testutil.Action{Name: "Release"}) + } + return nil +} + func (p *storageRecorder) Close() error { return nil } From 6492bcb4478cd5f075679afb7bb6a9d45722b8fd Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Mon, 7 Jan 2019 14:27:19 +0000 Subject: [PATCH 4/5] etcdserver/*: rollback default settings --- etcdserver/server.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index d0c9a925630..3e68710d417 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -65,14 +65,14 @@ import ( ) const ( - DefaultSnapshotCount = 10 + DefaultSnapshotCount = 100000 // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower // to catch-up after compacting the raft storage entries. // We expect the follower has a millisecond level latency with the leader. // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. 
- DefaultSnapshotCatchUpEntries uint64 = 10 + DefaultSnapshotCatchUpEntries uint64 = 5000 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -420,11 +420,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } // Find a snapshot to start/restart a raft node - var ( - snapshot *raftpb.Snapshot - err error - ) - for i := uint64(0); ; i++ { snapshot, err = ss.LoadIndex(i) if err != nil && err != snap.ErrNoSnapshot { From 389c1a004c803753d0bf3a8c78ef65e49af13e91 Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Tue, 8 Jan 2019 17:15:30 +0000 Subject: [PATCH 5/5] etcdserver/*, wal/*: add Sync method --- etcdserver/raft.go | 7 +++++++ etcdserver/server_test.go | 6 +++++- etcdserver/storage.go | 2 ++ pkg/mock/mockstorage/storage_recorder.go | 5 +++++ wal/wal.go | 4 ++++ 5 files changed, 23 insertions(+), 1 deletion(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 9cfacaa6248..ca8020daada 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -264,6 +264,13 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftAfterSave struct{} if !raft.IsEmptySnap(rd.Snapshot) { + // Force WAL to fsync its hard state before Release() releases + // old data from the WAL. Otherwise could get an error like: + // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost? 
+ if err := r.storage.Sync(); err != nil { + log.Fatal(err) + } + // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 665514c6bf3..a2be3b86c53 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -969,7 +969,7 @@ func TestSnapshot(t *testing.T) { ch := make(chan struct{}, 2) go func() { - gaction, _ := p.Wait(1) + gaction, _ := p.Wait(2) defer func() { ch <- struct{}{} }() if len(gaction) != 2 { @@ -1084,6 +1084,10 @@ func TestSnapshotOrdering(t *testing.T) { } // unblock SaveSnapshot, etcdserver now permitted to move snapshot file + if ac := <-p.Chan(); ac.Name != "Sync" { + t.Fatalf("expected Sync, got %+v", ac) + } + if ac := <-p.Chan(); ac.Name != "Release" { t.Fatalf("expected Release, got %+v", ac) } diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 29a43691547..64568d17ee1 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -38,6 +38,8 @@ type Storage interface { Close() error // Release release release the locked wal files since they will not be used. 
Release(snap raftpb.Snapshot) error + // Sync forces the WAL to fsync its pending writes to disk. + Sync() error } type storage struct { diff --git a/pkg/mock/mockstorage/storage_recorder.go b/pkg/mock/mockstorage/storage_recorder.go index 9585b68e0e8..24938b868c4 100644 --- a/pkg/mock/mockstorage/storage_recorder.go +++ b/pkg/mock/mockstorage/storage_recorder.go @@ -52,4 +52,9 @@ func (p *storageRecorder) Release(st raftpb.Snapshot) error { return nil } +func (p *storageRecorder) Sync() error { + p.Record(testutil.Action{Name: "Sync"}) + return nil +} + func (p *storageRecorder) Close() error { return nil } diff --git a/wal/wal.go b/wal/wal.go index 7200ad088dd..71d9e02869e 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -595,6 +595,10 @@ func (w *WAL) sync() error { return err } +func (w *WAL) Sync() error { + return w.sync() +} + // ReleaseLockTo releases the locks, which has smaller index than the given index // except the largest one among them. // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release