Snapshot restore progress (#490)
When restoring a snapshot (on startup, when installed from the leader, or during recovery), the logs are extremely terse. There are typically bookend messages indicating that a restore is about to happen and that it is complete, but there is a big dead space in the middle.

For small snapshots this is probably fine, but for larger multi-GB snapshots that gap can stretch out, and it is unnerving for an operator not to know whether the restore is stuck or still making progress.

This PR adjusts the logging to emit a simple progress message every 10s indicating overall completion in bytes consumed.
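
For illustration only (the values here are made up and the exact formatting depends on the configured hclog logger), a progress line emitted by the new monitor looks roughly like:

[INFO]  raft: snapshot restore progress: id=2-12345-1644600000000 last-index=12345 last-term=2 size-in-bytes=5368709120 read-bytes=1610612736 percent-complete=30.00%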
rboyer committed Feb 11, 2022
1 parent d68b78b commit 3cb47c5
Showing 6 changed files with 320 additions and 35 deletions.
79 changes: 48 additions & 31 deletions api.go
@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
"os"
"strconv"
"sync"
"sync/atomic"
@@ -315,6 +314,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
if err != nil {
return fmt.Errorf("failed to list snapshots: %v", err)
}

logger := conf.getOrCreateLogger()

for _, snapshot := range snapshots {
var source io.ReadCloser
_, source, err = snaps.Open(snapshot.ID)
@@ -330,9 +332,18 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// server instance. If the same process will eventually become a Raft peer
// then it will call NewRaft and restore again from disk then which will
// report metrics.
err = fsm.Restore(source)
snapLogger := logger.With(
"id", snapshot.ID,
"last-index", snapshot.Index,
"last-term", snapshot.Term,
"size-in-bytes", snapshot.Size,
)
crc := newCountingReadCloser(source)
monitor := startSnapshotRestoreMonitor(snapLogger, crc, snapshot.Size, false)
err = fsm.Restore(crc)
// Close the source after the restore has completed
source.Close()
monitor.StopAndWait()
if err != nil {
// Same here, skip and try the next one.
continue
@@ -463,20 +474,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
}

// Ensure we have a LogOutput.
var logger hclog.Logger
if conf.Logger != nil {
logger = conf.Logger
} else {
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}

logger = hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(conf.LogLevel),
Output: conf.LogOutput,
})
}
logger := conf.getOrCreateLogger()

// Try to restore the current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
@@ -600,21 +598,8 @@ func (r *Raft) restoreSnapshot() error {

// Try to load in order of newest to oldest
for _, snapshot := range snapshots {
if !r.config().NoSnapshotRestoreOnStart {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
continue
}

if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
source.Close()
r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
continue
}
source.Close()

r.logger.Info("restored from snapshot", "id", snapshot.ID)
if success := r.tryRestoreSingleSnapshot(snapshot); !success {
continue
}

// Update the lastApplied so we don't replay old logs
@@ -650,6 +635,38 @@ func (r *Raft) restoreSnapshot() error {
return nil
}

func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
if r.config().NoSnapshotRestoreOnStart {
return true
}

snapLogger := r.logger.With(
"id", snapshot.ID,
"last-index", snapshot.Index,
"last-term", snapshot.Term,
"size-in-bytes", snapshot.Size,
)

snapLogger.Info("starting restore from snapshot")

_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
snapLogger.Error("failed to open snapshot", "error", err)
return false
}

if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, snapshot.Size); err != nil {
source.Close()
snapLogger.Error("failed to restore snapshot", "error", err)
return false
}
source.Close()

snapLogger.Info("restored from snapshot")

return true
}

func (r *Raft) config() Config {
return r.conf.Load().(Config)
}
16 changes: 16 additions & 0 deletions config.go
@@ -3,6 +3,7 @@ package raft
import (
"fmt"
"io"
"os"
"time"

"github.com/hashicorp/go-hclog"
@@ -222,6 +223,21 @@ type Config struct {
skipStartup bool
}

func (conf *Config) getOrCreateLogger() hclog.Logger {
if conf.Logger != nil {
return conf.Logger
}
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}

return hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(conf.LogLevel),
Output: conf.LogOutput,
})
}

// ReloadableConfig is the subset of Config that may be reconfigured during
// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
// or accepting a Config but only using specific fields to keep the API clear.
21 changes: 18 additions & 3 deletions fsm.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/armon/go-metrics"
hclog "github.com/hashicorp/go-hclog"
)

// FSM is implemented by clients to make use of the replicated log.
@@ -184,8 +185,15 @@ func (r *Raft) runFSM() {
}
defer source.Close()

snapLogger := r.logger.With(
"id", req.ID,
"last-index", meta.Index,
"last-term", meta.Term,
"size-in-bytes", meta.Size,
)

// Attempt to restore
if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, meta.Size); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
return
}
@@ -241,13 +249,20 @@ func (r *Raft) runFSM() {
// fsmRestoreAndMeasure wraps the Restore call on an FSM to consistently measure
// and report timing metrics. The caller is still responsible for calling Close
// on the source in all cases.
func fsmRestoreAndMeasure(fsm FSM, source io.ReadCloser) error {
func fsmRestoreAndMeasure(logger hclog.Logger, fsm FSM, source io.ReadCloser, snapshotSize int64) error {
start := time.Now()
if err := fsm.Restore(source); err != nil {

crc := newCountingReadCloser(source)

monitor := startSnapshotRestoreMonitor(logger, crc, snapshotSize, false)
defer monitor.StopAndWait()

if err := fsm.Restore(crc); err != nil {
return err
}
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
metrics.SetGauge([]string{"raft", "fsm", "lastRestoreDuration"},
float32(time.Since(start).Milliseconds()))

return nil
}
130 changes: 130 additions & 0 deletions progress.go
@@ -0,0 +1,130 @@
package raft

import (
"context"
"io"
"sync"
"time"

hclog "github.com/hashicorp/go-hclog"
)

const (
snapshotRestoreMonitorInterval = 10 * time.Second
)

type snapshotRestoreMonitor struct {
logger hclog.Logger
cr CountingReader
size int64
networkTransfer bool

once sync.Once
cancel func()
doneCh chan struct{}
}

func startSnapshotRestoreMonitor(
logger hclog.Logger,
cr CountingReader,
size int64,
networkTransfer bool,
) *snapshotRestoreMonitor {
ctx, cancel := context.WithCancel(context.Background())

m := &snapshotRestoreMonitor{
logger: logger,
cr: cr,
size: size,
networkTransfer: networkTransfer,
cancel: cancel,
doneCh: make(chan struct{}),
}
go m.run(ctx)
return m
}

func (m *snapshotRestoreMonitor) run(ctx context.Context) {
defer close(m.doneCh)

ticker := time.NewTicker(snapshotRestoreMonitorInterval)
defer ticker.Stop()

ranOnce := false
for {
select {
case <-ctx.Done():
if !ranOnce {
m.runOnce()
}
return
case <-ticker.C:
m.runOnce()
ranOnce = true
}
}
}

func (m *snapshotRestoreMonitor) runOnce() {
readBytes := m.cr.Count()
pct := float64(100*readBytes) / float64(m.size)

message := "snapshot restore progress"
if m.networkTransfer {
message = "snapshot network transfer progress"
}

m.logger.Info(message,
"read-bytes", readBytes,
"percent-complete", hclog.Fmt("%0.2f%%", pct),
)
}

func (m *snapshotRestoreMonitor) StopAndWait() {
m.once.Do(func() {
m.cancel()
<-m.doneCh
})
}

type CountingReader interface {
io.Reader
Count() int64
}

type countingReader struct {
reader io.Reader

mu sync.Mutex
bytes int64
}

func (r *countingReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
r.mu.Lock()
r.bytes += int64(n)
r.mu.Unlock()
return n, err
}

func (r *countingReader) Count() int64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.bytes
}

func newCountingReader(r io.Reader) *countingReader {
return &countingReader{reader: r}
}

type countingReadCloser struct {
*countingReader
io.Closer
}

func newCountingReadCloser(rc io.ReadCloser) *countingReadCloser {
return &countingReadCloser{
countingReader: newCountingReader(rc),
Closer: rc,
}
}
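
As a minimal sketch of how these helpers compose (not part of the diff; restoreWithProgress is a hypothetical wrapper assumed to live alongside the types above in the raft package):

// restoreWithProgress shows the intended pattern: wrap the source so reads
// are counted, hand the counter to the monitor so it can log progress every
// 10s, and feed the same counting reader to the FSM restore.
func restoreWithProgress(logger hclog.Logger, fsm FSM, source io.ReadCloser, size int64) error {
	crc := newCountingReadCloser(source)
	monitor := startSnapshotRestoreMonitor(logger, crc, size, false)
	defer monitor.StopAndWait() // ensures at least one progress line is logged
	defer source.Close()
	return fsm.Restore(crc)
}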
8 changes: 7 additions & 1 deletion raft.go
@@ -1608,8 +1608,14 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
return
}

// Separately track the progress of streaming a snapshot over the network
// because this too can take a long time.
countingRPCReader := newCountingReader(rpc.Reader)

// Spill the remote snapshot to disk
n, err := io.Copy(sink, rpc.Reader)
transferMonitor := startSnapshotRestoreMonitor(r.logger, countingRPCReader, req.Size, true)
n, err := io.Copy(sink, countingRPCReader)
transferMonitor.StopAndWait()
if err != nil {
sink.Cancel()
r.logger.Error("failed to copy snapshot", "error", err)