
Cherry-pick PRs for 2.10.15-RC.5 (#5376)
Includes:

- #5366
- #5359
- #5369
- #5371
- #5370
- #5377
- #5372
wallyqs committed May 1, 2024
2 parents d561f49 + 56ea6c6 commit 9d29e8d
Showing 7 changed files with 451 additions and 135 deletions.
52 changes: 30 additions & 22 deletions server/filestore.go
@@ -52,6 +52,8 @@ type FileStoreConfig struct {
 	BlockSize uint64
 	// CacheExpire is how long with no activity until we expire the cache.
 	CacheExpire time.Duration
+	// SubjectStateExpire is how long with no activity until we expire a msg block's subject state.
+	SubjectStateExpire time.Duration
 	// SyncInterval is how often we sync to disk in the background.
 	SyncInterval time.Duration
 	// SyncAlways is when the stream should sync all data writes.
@@ -222,6 +224,7 @@ type msgBlock struct {
 	cache *cache
 	cloads uint64
 	cexp time.Duration
+	fexp time.Duration
 	ctmr *time.Timer
 	werr error
 	dmap avl.SequenceSet
@@ -296,6 +299,8 @@ const (
 	defaultSyncInterval = 2 * time.Minute
 	// default idle timeout to close FDs.
 	closeFDsIdle = 30 * time.Second
+	// default expiration time for mb.fss when idle.
+	defaultFssExpiration = 10 * time.Second
 	// coalesceMinimum
 	coalesceMinimum = 16 * 1024
 	// maxFlushWait is maximum we will wait to gather messages to flush.
@@ -359,6 +364,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
 	if fcfg.CacheExpire == 0 {
 		fcfg.CacheExpire = defaultCacheBufferExpiration
 	}
+	if fcfg.SubjectStateExpire == 0 {
+		fcfg.SubjectStateExpire = defaultFssExpiration
+	}
 	if fcfg.SyncInterval == 0 {
 		fcfg.SyncInterval = defaultSyncInterval
 	}
@@ -877,7 +885,14 @@ func (fs *fileStore) noTrackSubjects() bool {

 // Will init the basics for a message block.
 func (fs *fileStore) initMsgBlock(index uint32) *msgBlock {
-	mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}
+	mb := &msgBlock{
+		fs:         fs,
+		index:      index,
+		cexp:       fs.fcfg.CacheExpire,
+		fexp:       fs.fcfg.SubjectStateExpire,
+		noTrack:    fs.noTrackSubjects(),
+		syncAlways: fs.fcfg.SyncAlways,
+	}

 	mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
 	mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, index))
@@ -4430,9 +4445,16 @@ func (mb *msgBlock) clearCacheAndOffset() {

 // Lock should be held.
 func (mb *msgBlock) clearCache() {
-	if mb.ctmr != nil && mb.fss == nil {
-		mb.ctmr.Stop()
-		mb.ctmr = nil
+	if mb.ctmr != nil {
+		tsla := mb.sinceLastActivity()
+		if mb.fss == nil || tsla > mb.fexp {
+			// Force
+			mb.fss = nil
+			mb.ctmr.Stop()
+			mb.ctmr = nil
+		} else {
+			mb.resetCacheExpireTimer(mb.fexp - tsla)
+		}
 	}

 	if mb.cache == nil {
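The new clearCache branch above is a timer re-arm pattern: if the block saw activity more recently than fexp allows, keep the subject state (fss) and re-schedule expiry for the remaining window instead of dropping it on the first tick. A generic, self-contained sketch of that pattern — the type and field names here are illustrative, not the server's:

```go
package sketch

import (
	"sync"
	"time"
)

// expirer keeps per-block state alive while there is activity, dropping it
// only after a full idle window (cf. mb.fss and mb.fexp in the diff above).
type expirer struct {
	mu    sync.Mutex
	last  time.Time      // last recorded activity
	ttl   time.Duration  // idle window before state is dropped
	tmr   *time.Timer
	state map[string]int // stand-in for per-subject state
}

// touch records activity and arms the expiration timer if needed.
func (e *expirer) touch() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.last = time.Now()
	if e.tmr == nil {
		e.tmr = time.AfterFunc(e.ttl, e.expire)
	}
}

// expire drops state only if a full idle window has elapsed; otherwise it
// re-arms the timer for the remainder, exactly like the fexp - tsla reset.
func (e *expirer) expire() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if idle := time.Since(e.last); idle >= e.ttl {
		e.state, e.tmr = nil, nil
	} else {
		e.tmr = time.AfterFunc(e.ttl-idle, e.expire)
	}
}
```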
@@ -4497,7 +4519,7 @@ func (mb *msgBlock) tryExpireWriteCache() []byte {

 // Lock should be held.
 func (mb *msgBlock) expireCacheLocked() {
-	if mb.cache == nil {
+	if mb.cache == nil && mb.fss == nil {
 		if mb.ctmr != nil {
 			mb.ctmr.Stop()
 			mb.ctmr = nil
@@ -5105,7 +5127,6 @@ func (fs *fileStore) syncBlocks() {
 	}
 	blks := append([]*msgBlock(nil), fs.blks...)
 	lmb := fs.lmb
-	syncInterval := fs.fcfg.SyncInterval
 	fs.mu.RUnlock()

 	var markDirty bool
@@ -5120,11 +5141,6 @@
 		if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
 			mb.dirtyCloseWithRemove(false)
 		}
-		// Check our fss subject metadata.
-		// If we have no activity within sync interval remove.
-		if mb.fssLoaded() && mb.sinceLastActivity() > syncInterval {
-			mb.fss = nil
-		}

 		// Check if we should compact here as well.
 		// Do not compact last mb.
@@ -5591,12 +5607,6 @@ func (mb *msgBlock) fssNotLoaded() bool {
 	return mb.fss == nil && !mb.noTrack
 }

-// Report if we have our fss loaded.
-// Lock should be held.
-func (mb *msgBlock) fssLoaded() bool {
-	return mb.fss != nil
-}
-
 // Wrap openBlock for the gated semaphore processing.
 // Lock should be held
 func (mb *msgBlock) openBlock() (*os.File, error) {
@@ -7154,8 +7164,6 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
 		mb.ctmr.Stop()
 		mb.ctmr = nil
 	}
-	// Clear any tracking by subject.
-	mb.fss = nil
 	// Close cache
 	mb.clearCacheAndOffset()
 	// Quit our loops.
@@ -7168,6 +7176,8 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
 		mb.mfd = nil
 	}
 	if remove {
+		// Clear any tracking by subject if we are removing.
+		mb.fss = nil
 		if mb.mfn != _EMPTY_ {
 			os.Remove(mb.mfn)
 			mb.mfn = _EMPTY_
@@ -7490,9 +7500,7 @@ const (
 	fullStateVersion = uint8(1)
 )

-// This go routine runs and receives kicks to write out our full stream state index.
-// This will get kicked when we create a new block or when we delete a block in general.
-// This is also called during Stop().
+// This go routine periodically writes out our full stream state index.
 func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
 	// Signal we are done on exit.
 	defer close(done)
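Taken together, the filestore changes replace the old SyncInterval-piggybacked fss cleanup in syncBlocks with a dedicated per-block expiration controlled by the new FileStoreConfig field (defaulting to defaultFssExpiration, 10s, when left zero). A minimal sketch of how the new knob is wired, modeled on the tests in server/filestore_test.go below — the test name and the 5s value are illustrative assumptions:

```go
// Sketch only: exercises the new SubjectStateExpire option alongside
// CacheExpire, following the shape of the real tests in this commit.
func TestFileStoreSubjectStateExpireWiring(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{
			StoreDir:           t.TempDir(),
			BlockSize:          8192,
			CacheExpire:        time.Second,     // message cache idle expiry
			SubjectStateExpire: 5 * time.Second, // per-block subject state (fss) idle expiry
		},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()
}
```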
58 changes: 47 additions & 11 deletions server/filestore_test.go
@@ -3793,8 +3793,8 @@ func (fs *fileStore) reportMeta() (hasPSIM, hasAnyFSS bool) {
 func TestFileStoreExpireSubjectMeta(t *testing.T) {
 	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
 		fcfg.BlockSize = 1024
-		fcfg.CacheExpire = time.Second
-		fcfg.SyncInterval = time.Second
+		fcfg.CacheExpire = 500 * time.Millisecond
+		fcfg.SubjectStateExpire = time.Second
 		cfg := StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 1}
 		fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil)
 		require_NoError(t, err)
@@ -3821,7 +3821,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
 		}

 		// Make sure we clear mb fss meta
-		checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
+		checkFor(t, fcfg.SubjectStateExpire*2, 500*time.Millisecond, func() error {
 			if _, hasAnyFSS := fs.reportMeta(); hasAnyFSS {
 				return fmt.Errorf("Still have mb fss state")
 			}
@@ -3832,7 +3832,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
 		_, err = fs.LoadLastMsg("kv.22", nil)
 		require_NoError(t, err)
 		// Make sure we clear mb fss meta
-		checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
+		checkFor(t, fcfg.SubjectStateExpire*2, 500*time.Millisecond, func() error {
 			if _, hasAnyFSS := fs.reportMeta(); hasAnyFSS {
 				return fmt.Errorf("Still have mb fss state")
 			}
@@ -3923,7 +3923,7 @@ func TestFileStoreSubjectStateCacheExpiration(t *testing.T) {
 	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
 		fcfg.BlockSize = 32
 		fcfg.CacheExpire = time.Second
-		fcfg.SyncInterval = time.Second
+		fcfg.SubjectStateExpire = time.Second
 		cfg := StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 2}
 		fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil)
 		require_NoError(t, err)
@@ -6336,7 +6336,7 @@ func TestFileStorePurgeExBufPool(t *testing.T) {
 func TestFileStoreFSSMeta(t *testing.T) {
 	sd := t.TempDir()
 	fs, err := newFileStore(
-		FileStoreConfig{StoreDir: sd, BlockSize: 100, CacheExpire: 200 * time.Millisecond, SyncInterval: time.Second},
+		FileStoreConfig{StoreDir: sd, BlockSize: 100, CacheExpire: 200 * time.Millisecond, SubjectStateExpire: time.Second},
 		StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
 	require_NoError(t, err)
 	defer fs.Stop()
@@ -6358,23 +6358,22 @@ func TestFileStoreFSSMeta(t *testing.T) {
 	require_NoError(t, err)
 	require_Equal(t, p, 2)

-	// Make sure cache is not loaded but fss state still is.
-	var stillHasCache, noFSS bool
+	// Make sure cache is not loaded.
+	var stillHasCache bool
 	fs.mu.RLock()
 	for _, mb := range fs.blks {
 		mb.mu.RLock()
 		stillHasCache = stillHasCache || mb.cacheAlreadyLoaded()
-		noFSS = noFSS || mb.fssNotLoaded()
 		mb.mu.RUnlock()
 	}
 	fs.mu.RUnlock()

 	require_False(t, stillHasCache)
-	require_False(t, noFSS)

-	// Let fss expire via syncInterval.
+	// Let fss expire via SubjectStateExpire.
 	time.Sleep(time.Second)

+	var noFSS bool
 	fs.mu.RLock()
 	for _, mb := range fs.blks {
 		mb.mu.RLock()
@@ -6688,6 +6687,43 @@ func TestFileStoreWriteFullStateAfterPurgeEx(t *testing.T) {
 	require_Equal(t, ss.LastSeq, 10)
 }

+func TestFileStoreMB_FSS_Expire(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 8192, CacheExpire: 1 * time.Second, SyncInterval: 2 * time.Second},
+		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, MaxMsgsPer: 1, Storage: FileStorage})
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	msg := []byte("abc")
+	for i := 1; i <= 1000; i++ {
+		fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
+	}
+	// Flush fss by hand, cache should be flushed as well.
+	fs.mu.RLock()
+	for _, mb := range fs.blks {
+		mb.mu.Lock()
+		mb.fss = nil
+		mb.mu.Unlock()
+	}
+	fs.mu.RUnlock()
+
+	fs.StoreMsg("foo.11", nil, msg)
+	time.Sleep(900 * time.Millisecond)
+	// This should keep fss alive in the first block..
+	// As well as cache itself due to remove activity.
+	fs.StoreMsg("foo.22", nil, msg)
+	time.Sleep(300 * time.Millisecond)
+	// Check that fss and the cache are still loaded.
+	fs.mu.RLock()
+	mb := fs.blks[0]
+	fs.mu.RUnlock()
+	mb.mu.RLock()
+	cache, fss := mb.cache, mb.fss
+	mb.mu.RUnlock()
+	require_True(t, fss != nil)
+	require_True(t, cache != nil)
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Benchmarks
 ///////////////////////////////////////////////////////////////////////////
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
@@ -7497,7 +7497,7 @@ func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int

 // Threshold for compression.
 // TODO(dlc) - Eventually make configurable.
-const compressThreshold = 256
+const compressThreshold = 8192 // 8k

 // If allowed and contents over the threshold we will compress.
 func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte {
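Raising compressThreshold from 256 bytes to 8 KiB means replicated stream entries are only candidates for compression once subject, reply, headers, and payload together exceed 8 KiB, skipping compression overhead on small messages. A hedged sketch of how such a threshold typically gates an encoder — illustrative only, since the body of encodeStreamMsgAllowCompress is outside this hunk, and the s2 usage here is an assumption:

```go
package sketch

import "github.com/klauspost/compress/s2"

const compressThreshold = 8192 // 8k, matching the new constant

// maybeCompress is illustrative, not the server's code path: buffers at or
// below the threshold are returned as-is; larger ones are S2-compressed.
func maybeCompress(buf []byte, compressOK bool) []byte {
	if !compressOK || len(buf) <= compressThreshold {
		return buf
	}
	return s2.Encode(nil, buf)
}
```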
2 changes: 1 addition & 1 deletion server/jetstream_cluster_2_test.go
@@ -1501,7 +1501,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
 	sendBatch(100)
 	// Need to check both in parallel.
 	scheck, mcheck := uint64(0), uint64(0)
-	checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
+	checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
 		if scheck != 100 {
 			if si, _ := js.StreamInfo("S"); si != nil {
 				scheck = si.State.Msgs
6 changes: 5 additions & 1 deletion server/jetstream_cluster_4_test.go
@@ -23,6 +23,7 @@ import (
 	"math/rand"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"testing"
@@ -240,7 +241,7 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) {

 	sendBatch := func(subject string, n int) {
 		for i := 0; i < n; i++ {
-			_, err = js.Publish(subject, []byte("OK"))
+			_, err = js.Publish(subject, []byte(strconv.Itoa(i)))
 			require_NoError(t, err)
 		}
 	}
@@ -266,6 +267,9 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) {
 	for i := 0; i < 300; i++ {
 		m, err := ss.Fetch(1, nats.MaxWait(3*time.Second))
 		require_NoError(t, err)
+		p, err := strconv.Atoi(string(m[0].Data))
+		require_NoError(t, err)
+		require_Equal(t, p, i)
 		time.Sleep(11 * time.Millisecond)
 		err = m[0].Ack()
 		require_NoError(t, err)
