Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lockfree fixed size sublist cache #4451

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 21 additions & 50 deletions server/sublist.go
Expand Up @@ -47,8 +47,6 @@ var (
const (
// cacheMax is used to bound limit the frontend cache
slCacheMax = 1024
// If we run a sweeper we will drain to this count.
slCacheSweep = 256
// plistMin is our lower bounds to create a fast plist for Match.
plistMin = 256
)
Expand All @@ -68,8 +66,7 @@ type Sublist struct {
inserts uint64
removes uint64
root *level
cache map[string]*SublistResult
ccSweep int32
cache *Cache
notify *notifyMaps
count uint32
}
Expand Down Expand Up @@ -114,7 +111,7 @@ func newLevel() *level {
// NewSublist will create a default sublist with caching enabled per the flag.
func NewSublist(enableCache bool) *Sublist {
if enableCache {
return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
return &Sublist{root: newLevel(), cache: NewCache()}
}
return &Sublist{root: newLevel()}
}
Expand Down Expand Up @@ -489,16 +486,17 @@ func (s *Sublist) addToCache(subject string, sub *subscription) {
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
if r := s.cache[subject]; r != nil {
s.cache[subject] = r.addSubToResult(sub)
if r, _ := s.cache.Get(subject); r != nil {
s.cache.Set(subject, r.addSubToResult(sub))
}
return
}
for key, r := range s.cache {

s.cache.Iterate(func(key string, value *SublistResult) {
if matchLiteral(key, subject) {
s.cache[key] = r.addSubToResult(sub)
s.cache.Set(key, value.addSubToResult(sub))
}
}
})
}

// removeFromCache will remove the sub from any active cache entries.
Expand All @@ -509,15 +507,15 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) {
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
delete(s.cache, subject)
s.cache.Delete(subject)
return
}
// Wildcard here.
for key := range s.cache {
s.cache.Iterate(func(key string, value *SublistResult) {
if matchLiteral(key, subject) {
delete(s.cache, key)
s.cache.Delete(key)
}
}
})
}

// a place holder for an empty result.
Expand All @@ -537,13 +535,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
atomic.AddUint64(&s.matches, 1)

// Check cache first.
if doLock {
s.RLock()
}
r, ok := s.cache[subject]
if doLock {
s.RUnlock()
}

r, ok := s.cache.Get(subject)

if ok {
atomic.AddUint64(&s.cacheHits, 1)
return r
Expand Down Expand Up @@ -571,8 +565,6 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {

// Get result from the main structure and place into the shared cache.
// Hold the read lock to avoid race between match and store.
var n int

if doLock {
s.Lock()
}
Expand All @@ -583,36 +575,15 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
result = emptyResult
}
if s.cache != nil {
s.cache[subject] = result
n = len(s.cache)
s.cache.Set(subject, result)
}
if doLock {
s.Unlock()
}

// Reduce the cache count if we have exceeded our set maximum.
if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) {
go s.reduceCacheCount()
}

return result
}

// Remove entries in the cache until we are under the maximum.
// TODO(dlc) this could be smarter now that its not inline.
func (s *Sublist) reduceCacheCount() {
defer atomic.StoreInt32(&s.ccSweep, 0)
// If we are over the cache limit randomly drop until under the limit.
s.Lock()
for key := range s.cache {
delete(s.cache, key)
if len(s.cache) <= slCacheSweep {
break
}
}
s.Unlock()
}

// Helper function for auto-expanding remote qsubs.
func isRemoteQSub(sub *subscription) bool {
return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF)
Expand Down Expand Up @@ -832,7 +803,7 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error {
// Turn caching back on here.
atomic.AddUint64(&s.genid, 1)
if wasEnabled {
s.cache = make(map[string]*SublistResult)
s.cache = NewCache()
}
return err
}
Expand Down Expand Up @@ -914,7 +885,7 @@ func (s *Sublist) Count() uint32 {
// CacheCount returns the number of result sets in the cache.
func (s *Sublist) CacheCount() int {
s.RLock()
cc := len(s.cache)
cc := s.cache.Len()
s.RUnlock()
return cc
}
Expand Down Expand Up @@ -963,7 +934,7 @@ func (s *Sublist) Stats() *SublistStats {

s.RLock()
cache := s.cache
cc := len(s.cache)
cc := s.CacheCount()
st.NumSubs = s.count
st.NumInserts = s.inserts
st.NumRemoves = s.removes
Expand All @@ -981,14 +952,14 @@ func (s *Sublist) Stats() *SublistStats {
if cache != nil {
tot, max, clen := 0, 0, 0
s.RLock()
for _, r := range s.cache {
cache.Iterate(func(key string, r *SublistResult) {
clen++
l := len(r.psubs) + len(r.qsubs)
tot += l
if l > max {
max = l
}
}
})
s.RUnlock()
st.totFanout = tot
st.cacheCnt = clen
Expand Down
91 changes: 91 additions & 0 deletions server/sublist_cache.go
@@ -0,0 +1,91 @@
// Copyright 2016-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"hash/maphash"
"sync/atomic"
)

type Cache struct {
data [slCacheMax]atomic.Pointer[cacheEntry]
hashFunc func(string) int
}

type cacheEntry struct {
key string
value *SublistResult
}

func NewCache() *Cache {
seed := maphash.MakeSeed()
hfunc := func(str string) int {
return int(maphash.String(seed, str) % slCacheMax)
}

return &Cache{
hashFunc: hfunc,
}
}

func (c *Cache) Set(key string, value *SublistResult) {
if c == nil {
return
}

index := c.hashFunc(key)
entry := &cacheEntry{key: key, value: value}
c.data[index].Store(entry)
}

func (c *Cache) Get(key string) (*SublistResult, bool) {
if c == nil {
return nil, false
}

index := c.hashFunc(key)
entry := c.data[index].Load()
if entry != nil {
if entry.key == key {
return entry.value, true
}
}
return nil, false
}

func (c *Cache) Delete(key string) {
index := c.hashFunc(key)
c.data[index].Store(nil)
}

func (c *Cache) Len() int {
var cc int
c.Iterate(func(key string, value *SublistResult) {
cc++
})
return cc
}

func (c *Cache) Iterate(cf func(key string, value *SublistResult)) {
if c == nil {
return
}

for i := range c.data {
p := c.data[i].Load()
if p != nil {
cf(p.key, p.value)
}
}
}