Skip to content

Commit

Permalink
TEMP
Browse files Browse the repository at this point in the history
  • Loading branch information
mprimi committed Jan 3, 2024
1 parent 8dcb801 commit e92592e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
4 changes: 3 additions & 1 deletion server/jetstream_helpers_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"net"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -785,8 +786,9 @@ func createJetStreamClusterEx(t testing.TB, tmpl, cName, snPre string, numServer
}

for cp := portStart; cp < portStart+numServers; cp++ {
storeDir := t.TempDir()
sn := fmt.Sprintf("%sS-%d", snPre, cp-portStart+1)
storeDir := filepath.Join(t.TempDir(), sn)
fmt.Printf("Store dir: %s => %s\n", sn, storeDir)
conf := fmt.Sprintf(tmpl, sn, storeDir, cName, cp, routeConfig)
if modify != nil {
conf = modify(sn, cName, storeDir, conf)
Expand Down
5 changes: 5 additions & 0 deletions server/raft.go
Expand Up @@ -1123,6 +1123,7 @@ func (n *raft) setupLastSnapshot() {
sfile := filepath.Join(snapDir, sf.Name())
var term, index uint64
term, index, err := termAndIndexFromSnapFile(sf.Name())
fmt.Printf("Found snapshot [term: %d, index: %d] (%s)\n", term, index, sf.Name())
if err == nil {
if term > lterm {
lterm, lindex = term, index
Expand Down Expand Up @@ -1179,11 +1180,14 @@ func (n *raft) setupLastSnapshot() {
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
fmt.Printf("Loaded last snapshot successfully\n")
}

// loadLastSnapshot will load and return our last snapshot.
// Lock should be held.
func (n *raft) loadLastSnapshot() (*snapshot, error) {
fmt.Printf("Loading last snapshot: %s\n", n.snapfile)

if n.snapfile == _EMPTY_ {
return nil, errNoSnapAvailable
}
Expand Down Expand Up @@ -1231,6 +1235,7 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) {
return nil, errSnapshotCorrupt
}

fmt.Printf("Loaded last snapshot: %s\n", n.snapfile)
return snap, nil
}

Expand Down
14 changes: 11 additions & 3 deletions server/raft_helpers_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"hash"
"hash/crc32"
"math/rand"
"path/filepath"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -112,12 +113,15 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
}

for _, s := range servers {
baseDir := filepath.Join(c.t.TempDir(), fmt.Sprintf("%s-%s", name, s.Name()))
logDir := filepath.Join(baseDir, "log")
storeDir := filepath.Join(baseDir, "store")
fs, err := newFileStore(
FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
FileStoreConfig{StoreDir: logDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
StreamConfig{Name: name, Storage: FileStorage},
)
require_NoError(c.t, err)
cfg := &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
cfg := &RaftConfig{Name: name, Store: storeDir, Log: fs}
s.bootstrapRaftNode(cfg, peers, true)
n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(c.t, err)
Expand Down Expand Up @@ -489,7 +493,11 @@ func (sm *raftChainStateMachine) snapshot() {
return
}

sm.logDebug("Snapshot (with %d blocks applied)", sm.blocksApplied)
sm.logDebug(
"Snapshot (with %d blocks applied, %d since last snapshot)",
sm.blocksApplied,
sm.blocksAppliedSinceSnapshot,
)

// Serialize the internal state of the hash block
serializedHash, err := sm.hash.(encoding.BinaryMarshaler).MarshalBinary()
Expand Down

0 comments on commit e92592e

Please sign in to comment.