Skip to content

Commit 4a347c4

Browse files
committed
Fix inconsistent state between WAL and saved Snapshot
Signed-off-by: zghh <[email protected]>
1 parent bcc7758 commit 4a347c4

File tree

2 files changed

+66
-41
lines changed

2 files changed

+66
-41
lines changed

orderer/consensus/etcdraft/storage.go

+61-36
Original file line numberDiff line numberDiff line change
@@ -75,22 +75,9 @@ func CreateStorage(
7575
return nil, err
7676
}
7777

78-
snapshot, err := sn.Load()
78+
snapshot, w, st, ents, err := loadNewestAvailableSnapshot(lg, walDir, snapDir)
7979
if err != nil {
80-
if err == snap.ErrNoSnapshot {
81-
lg.Debugf("No snapshot found at %s", snapDir)
82-
} else {
83-
return nil, errors.Errorf("failed to load snapshot: %s", err)
84-
}
85-
} else {
86-
// snapshot found
87-
lg.Debugf("Loaded snapshot at Term %d and Index %d, Nodes: %+v",
88-
snapshot.Metadata.Term, snapshot.Metadata.Index, snapshot.Metadata.ConfState.Nodes)
89-
}
90-
91-
w, st, ents, err := createOrReadWAL(lg, walDir, snapshot)
92-
if err != nil {
93-
return nil, errors.Errorf("failed to create or read WAL: %s", err)
80+
return nil, errors.Errorf("Failed to load snapshot and WAL: %s", err)
9481
}
9582

9683
if snapshot != nil {
@@ -120,26 +107,11 @@ func CreateStorage(
120107
// ListSnapshots returns a list of RaftIndex of snapshots stored on disk.
121108
// If a file is corrupted, rename the file.
122109
func ListSnapshots(logger *flogging.FabricLogger, snapDir string) []uint64 {
123-
dir, err := os.Open(snapDir)
110+
snapfiles, err := listSnapshotFiles(logger, snapDir)
124111
if err != nil {
125-
logger.Errorf("Failed to open snapshot directory %s: %s", snapDir, err)
112+
logger.Errorf("Failed to list snapshot files from %s: %s", snapDir, err)
126113
return nil
127114
}
128-
defer dir.Close()
129-
130-
filenames, err := dir.Readdirnames(-1)
131-
if err != nil {
132-
logger.Errorf("Failed to read snapshot files: %s", err)
133-
return nil
134-
}
135-
136-
snapfiles := []string{}
137-
for i := range filenames {
138-
if strings.HasSuffix(filenames[i], ".snap") {
139-
snapfiles = append(snapfiles, filenames[i])
140-
}
141-
}
142-
sort.Strings(snapfiles)
143115

144116
var snapshots []uint64
145117
for _, snapfile := range snapfiles {
@@ -242,15 +214,17 @@ func (rs *RaftStorage) Snapshot() raftpb.Snapshot {
242214

243215
// Store persists etcd/raft data
244216
func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
245-
if err := rs.wal.Save(hardstate, entries); err != nil {
246-
return err
247-
}
248-
249217
if !raft.IsEmptySnap(snapshot) {
250218
if err := rs.saveSnap(snapshot); err != nil {
251219
return err
252220
}
221+
}
222+
223+
if err := rs.wal.Save(hardstate, entries); err != nil {
224+
return err
225+
}
253226

227+
if !raft.IsEmptySnap(snapshot) {
254228
if err := rs.ram.ApplySnapshot(snapshot); err != nil {
255229
if err == raft.ErrSnapOutOfDate {
256230
rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
@@ -447,3 +421,54 @@ func (rs *RaftStorage) Close() error {
447421

448422
return nil
449423
}
424+
425+
func loadNewestAvailableSnapshot(lg *flogging.FabricLogger, walDir, snapDir string) (*raftpb.Snapshot, *wal.WAL, raftpb.HardState, []raftpb.Entry, error) {
426+
snapfiles, err := listSnapshotFiles(lg, snapDir)
427+
if err != nil {
428+
lg.Errorf("Failed to list snapshot files from %s: %s", snapDir, err)
429+
}
430+
for i := len(snapfiles) - 1; i >= 0; i-- {
431+
snapshot, err := snap.Read(lg.Zap(), filepath.Join(snapDir, snapfiles[i]))
432+
if err != nil {
433+
lg.Warnf("Can not read snapshot from %s: %s", snapfiles[i], err)
434+
continue
435+
}
436+
w, st, ents, err := createOrReadWAL(lg, walDir, snapshot)
437+
if err != nil {
438+
lg.Warnf("Create or read wal error: %s", err)
439+
continue
440+
}
441+
if snapshot.Metadata.Index <= st.Commit {
442+
return snapshot, w, st, ents, nil
443+
}
444+
if err := w.Close(); err != nil {
445+
return nil, nil, raftpb.HardState{}, nil, err
446+
}
447+
}
448+
lg.Warnf("Not available snapshot found in %s", snapDir)
449+
w, st, ents, err := createOrReadWAL(lg, walDir, nil)
450+
return nil, w, st, ents, err
451+
}
452+
453+
func listSnapshotFiles(logging *flogging.FabricLogger, snapDir string) ([]string, error) {
454+
dir, err := os.Open(snapDir)
455+
if err != nil {
456+
return nil, err
457+
}
458+
defer dir.Close()
459+
460+
filenames, err := dir.Readdirnames(-1)
461+
if err != nil {
462+
return nil, err
463+
}
464+
465+
snapfiles := []string{}
466+
for i := range filenames {
467+
if strings.HasSuffix(filenames[i], ".snap") {
468+
snapfiles = append(snapfiles, filenames[i])
469+
}
470+
}
471+
sort.Strings(snapfiles)
472+
473+
return snapfiles, nil
474+
}

orderer/consensus/etcdraft/storage_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestOpenWAL(t *testing.T) {
8080
for i := 0; i < 10; i++ {
8181
store.Store(
8282
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 10)}},
83-
raftpb.HardState{},
83+
raftpb.HardState{Commit: uint64(i)},
8484
raftpb.Snapshot{},
8585
)
8686
}
@@ -155,7 +155,7 @@ func TestTakeSnapshot(t *testing.T) {
155155
for i := 0; i < 10; i++ {
156156
store.Store(
157157
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
158-
raftpb.HardState{},
158+
raftpb.HardState{Commit: uint64(i)},
159159
raftpb.Snapshot{},
160160
)
161161
}
@@ -216,7 +216,7 @@ func TestTakeSnapshot(t *testing.T) {
216216
for i := 0; i < 10; i++ {
217217
store.Store(
218218
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
219-
raftpb.HardState{},
219+
raftpb.HardState{Commit: uint64(i)},
220220
raftpb.Snapshot{},
221221
)
222222
}
@@ -282,7 +282,7 @@ func TestTakeSnapshot(t *testing.T) {
282282
for i := 0; i < 10; i++ {
283283
store.Store(
284284
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
285-
raftpb.HardState{},
285+
raftpb.HardState{Commit: uint64(i)},
286286
raftpb.Snapshot{},
287287
)
288288
}
@@ -369,7 +369,7 @@ func TestApplyOutOfDateSnapshot(t *testing.T) {
369369
for i := 0; i < 10; i++ {
370370
store.Store(
371371
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
372-
raftpb.HardState{},
372+
raftpb.HardState{Commit: uint64(i)},
373373
raftpb.Snapshot{},
374374
)
375375
}

0 commit comments

Comments
 (0)