Skip to content

Commit

Permalink
Merge pull request #545 from hashicorp/post-restore-reset
Browse files Browse the repository at this point in the history
Post restore reset
  • Loading branch information
jmurret committed Mar 17, 2023
2 parents 14d2c5e + fdcb70b commit d3128ae
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 21 deletions.
15 changes: 14 additions & 1 deletion log.go
Expand Up @@ -122,13 +122,26 @@ type LogStore interface {
// StoreLog stores a log entry.
StoreLog(log *Log) error

// StoreLogs stores multiple log entries.
// StoreLogs stores multiple log entries. By default the logs stored may not be contiguous with previous logs (i.e. may have a gap in Index since the last log written). If an implementation can't tolerate this it may optionally implement `MonotonicLogStore` to indicate that this is not allowed. This changes Raft's behaviour after restoring a user snapshot to remove all previous logs instead of relying on a "gap" to signal the discontinuity between logs before the snapshot and logs after.
StoreLogs(logs []*Log) error

// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}

// MonotonicLogStore is an optional interface for LogStore implementations that
// cannot tolerate gaps in between the Index values of consecutive log entries. For example,
// this may allow more efficient indexing because the Index values are densely populated. If true is
// returned, Raft will avoid relying on gaps to trigger re-synching logs on followers after a
// snapshot is restored. The LogStore must have an efficient implementation of
// DeleteLogs for the case where all logs are removed, as this must be called after snapshot restore when gaps are not allowed.
// We avoid deleting all records for LogStores that do not implement MonotonicLogStore
// because although it's always correct to do so, it has a major negative performance impact on the BoltDB store that is currently
// the most widely used.
type MonotonicLogStore interface {
IsMonotonic() bool
}

func oldestLog(s LogStore) (Log, error) {
var l Log

Expand Down
10 changes: 10 additions & 0 deletions log_cache.go
Expand Up @@ -33,6 +33,16 @@ func NewLogCache(capacity int, store LogStore) (*LogCache, error) {
return c, nil
}

// IsMonotonic implements the MonotonicLogStore interface. This is a shim to
// expose the underyling store as monotonically indexed or not.
func (c *LogCache) IsMonotonic() bool {
if store, ok := c.store.(MonotonicLogStore); ok {
return store.IsMonotonic()
}

return false
}

func (c *LogCache) GetLog(idx uint64, log *Log) error {
// Check the buffer for an entry
c.l.RLock()
Expand Down
19 changes: 15 additions & 4 deletions raft.go
Expand Up @@ -1122,7 +1122,14 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term)

r.logger.Info("restored user snapshot", "index", latestIndex)
// Remove old logs if r.logs is a MonotonicLogStore. Log any errors and continue.
if logs, ok := r.logs.(MonotonicLogStore); ok && logs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to remove old logs", "error", err)
}
}

r.logger.Info("restored user snapshot", "index", lastIndex)
return nil
}

Expand Down Expand Up @@ -1790,15 +1797,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)

// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
// Clear old logs if r.logs is a MonotonicLogStore. Otherwise compact the
// logs. In both cases, log any errors and continue.
if mlogs, ok := r.logs.(MonotonicLogStore); ok && mlogs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to reset logs", "error", err)
}
} else if err := r.compactLogs(req.LastLogIndex); err != nil {
r.logger.Error("failed to compact logs", "error", err)
}

r.logger.Info("Installed remote snapshot")
resp.Success = true
r.setLastContact()
return
}

// setLastContact is used to set the last contact time to now
Expand Down
190 changes: 182 additions & 8 deletions raft_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1019,6 +1020,88 @@ func TestRaft_SnapshotRestore(t *testing.T) {
}
}

func TestRaft_RestoreSnapshotOnStartup_Monotonic(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
conf.TrailingLogs = 10
opts := &MakeClusterOpts{
Peers: 1,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
c := MakeClusterCustom(t, opts)
defer c.Close()

leader := c.Leader()

// Commit a lot of things
var future Future
for i := 0; i < 100; i++ {
future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
}

// Wait for the last future to apply
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Take a snapshot
snapFuture := leader.Snapshot()
if err := snapFuture.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Check for snapshot
snaps, _ := leader.snapshots.List()
if len(snaps) != 1 {
t.Fatalf("should have a snapshot")
}
snap := snaps[0]

// Logs should be trimmed
firstIdx, err := leader.logs.FirstIndex()
if err != nil {
t.Fatalf("err: %v", err)
}
lastIdx, err := leader.logs.LastIndex()
if err != nil {
t.Fatalf("err: %v", err)
}

if firstIdx != snap.Index-conf.TrailingLogs+1 {
t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx)
}

// Shutdown
shutdown := leader.Shutdown()
if err := shutdown.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Restart the Raft
r := leader
// Can't just reuse the old transport as it will be closed
_, trans2 := NewInmemTransport(r.trans.LocalAddr())
cfg := r.config()
r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
if err != nil {
t.Fatalf("err: %v", err)
}
c.rafts[0] = r

// We should have restored from the snapshot!
if last := r.getLastApplied(); last != snap.Index {
t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
}

// Verify that logs have not been reset
first, _ := r.logs.FirstIndex()
last, _ := r.logs.LastIndex()
assert.Equal(t, firstIdx, first)
assert.Equal(t, lastIdx, last)
}

func TestRaft_SnapshotRestore_Progress(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
Expand Down Expand Up @@ -1342,7 +1425,9 @@ func TestRaft_UserSnapshot(t *testing.T) {

// snapshotAndRestore does a snapshot and restore sequence and applies the given
// offset to the snapshot index, so we can try out different situations.
func snapshotAndRestore(t *testing.T, offset uint64) {
func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, restoreNewCluster bool) {
t.Helper()

// Make the cluster.
conf := inmemConfig(t)

Expand All @@ -1352,7 +1437,19 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
conf.ElectionTimeout = 500 * time.Millisecond
conf.LeaderLeaseTimeout = 500 * time.Millisecond

c := MakeCluster(3, t, conf)
var c *cluster
numPeers := 3
optsMonotonic := &MakeClusterOpts{
Peers: numPeers,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
if monotonicLogStore {
c = MakeClusterCustom(t, optsMonotonic)
} else {
c = MakeCluster(numPeers, t, conf)
}
defer c.Close()

// Wait for things to get stable and commit some things.
Expand Down Expand Up @@ -1382,6 +1479,17 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
// Get the last index before the restore.
preIndex := leader.getLastIndex()

if restoreNewCluster {
var c2 *cluster
if monotonicLogStore {
c2 = MakeClusterCustom(t, optsMonotonic)
} else {
c2 = MakeCluster(numPeers, t, conf)
}
c = c2
leader = c.Leader()
}

// Restore the snapshot, twiddling the index with the offset.
meta, reader, err := snap.Open()
meta.Index += offset
Expand All @@ -1397,17 +1505,40 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
// an index to create a hole, and then we apply a no-op after the
// restore.
var expected uint64
if meta.Index < preIndex {
if !restoreNewCluster && meta.Index < preIndex {
expected = preIndex + 2
} else {
// restoring onto a new cluster should always have a last index based
// off of the snaphsot meta index
expected = meta.Index + 2
}

lastIndex := leader.getLastIndex()
if lastIndex != expected {
t.Fatalf("Index was not updated correctly: %d vs. %d", lastIndex, expected)
}

// Ensure all the logs are the same and that we have everything that was
// Ensure raft logs are removed for monotonic log stores but remain
// untouched for non-monotic (BoltDB) logstores.
// When first index = 1, then logs have remained untouched.
// When first indext is set to the next commit index / last index, then
// it means logs have been removed.
raftNodes := make([]*Raft, 0, numPeers+1)
raftNodes = append(raftNodes, leader)
raftNodes = append(raftNodes, c.Followers()...)
for _, raftNode := range raftNodes {
firstLogIndex, err := raftNode.logs.FirstIndex()
require.NoError(t, err)
lastLogIndex, err := raftNode.logs.LastIndex()
require.NoError(t, err)
if monotonicLogStore {
require.Equal(t, expected, firstLogIndex)
} else {
require.Equal(t, uint64(1), firstLogIndex)
}
require.Equal(t, expected, lastLogIndex)
}
// Ensure all the fsm logs are the same and that we have everything that was
// part of the original snapshot, and that the contents after were
// reverted.
c.EnsureSame(t)
Expand All @@ -1418,7 +1549,7 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
}
for i, entry := range fsm.logs {
expected := []byte(fmt.Sprintf("test %d", i))
if bytes.Compare(entry, expected) != 0 {
if !bytes.Equal(entry, expected) {
t.Fatalf("Log entry bad: %v", entry)
}
}
Expand Down Expand Up @@ -1446,10 +1577,17 @@ func TestRaft_UserRestore(t *testing.T) {
10000,
}

restoreToNewClusterCases := []bool{false, true}

for _, c := range cases {
t.Run(fmt.Sprintf("case %v", c), func(t *testing.T) {
snapshotAndRestore(t, c)
})
for _, restoreNewCluster := range restoreToNewClusterCases {
t.Run(fmt.Sprintf("case %v | restored to new cluster: %t", c, restoreNewCluster), func(t *testing.T) {
snapshotAndRestore(t, c, false, restoreNewCluster)
})
t.Run(fmt.Sprintf("monotonic case %v | restored to new cluster: %t", c, restoreNewCluster), func(t *testing.T) {
snapshotAndRestore(t, c, true, restoreNewCluster)
})
}
}
}

Expand Down Expand Up @@ -2380,6 +2518,7 @@ func TestRaft_LeadershipTransferStopRightAway(t *testing.T) {
t.Errorf("leadership shouldn't have started, but instead it error with: %v", err)
}
}

func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()
Expand Down Expand Up @@ -2417,6 +2556,41 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
}
}

func TestRaft_LogStoreIsMonotonic(t *testing.T) {
c := MakeCluster(1, t, nil)
defer c.Close()

// Should be one leader
leader := c.Leader()
c.EnsureLeader(t, leader.localAddr)

// Test the monotonic type assertion on the InmemStore.
_, ok := leader.logs.(MonotonicLogStore)
assert.False(t, ok)

var log LogStore

// Wrapping the non-monotonic store as a LogCache should make it pass the
// type assertion, but the underlying store is still non-monotonic.
log, _ = NewLogCache(100, leader.logs)
mcast, ok := log.(MonotonicLogStore)
require.True(t, ok)
assert.False(t, mcast.IsMonotonic())

// Now create a new MockMonotonicLogStore using the leader logs and expect
// it to work.
log = &MockMonotonicLogStore{s: leader.logs}
mcast, ok = log.(MonotonicLogStore)
require.True(t, ok)
assert.True(t, mcast.IsMonotonic())

// Wrap the mock logstore in a LogCache and check again.
log, _ = NewLogCache(100, log)
mcast, ok = log.(MonotonicLogStore)
require.True(t, ok)
assert.True(t, mcast.IsMonotonic())
}

func TestRaft_CacheLogWithStoreError(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()
Expand Down

0 comments on commit d3128ae

Please sign in to comment.