lockfree fixed size sublist cache #4451

Open · wants to merge 2 commits into base: main
Changes from 1 commit
71 changes: 21 additions & 50 deletions server/sublist.go
@@ -47,8 +47,6 @@ var (
const (
// cacheMax is used to bound limit the frontend cache
slCacheMax = 1024
- // If we run a sweeper we will drain to this count.
- slCacheSweep = 256
// plistMin is our lower bounds to create a fast plist for Match.
plistMin = 256
)
@@ -68,8 +66,7 @@ type Sublist struct {
inserts uint64
removes uint64
root *level
- cache map[string]*SublistResult
- ccSweep int32
+ cache *Cache
notify *notifyMaps
count uint32
}
@@ -114,7 +111,7 @@ func newLevel() *level {
// NewSublist will create a default sublist with caching enabled per the flag.
func NewSublist(enableCache bool) *Sublist {
if enableCache {
- return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
+ return &Sublist{root: newLevel(), cache: NewCache(MemHashString)}
}
return &Sublist{root: newLevel()}
}
@@ -489,16 +486,17 @@ func (s *Sublist) addToCache(subject string, sub *subscription) {
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
- if r := s.cache[subject]; r != nil {
- s.cache[subject] = r.addSubToResult(sub)
+ if r, _ := s.cache.Get(subject); r != nil {
+ s.cache.Set(subject, r.addSubToResult(sub))
}
return
}
- for key, r := range s.cache {
+ s.cache.Iterate(func(key string, value *SublistResult) {
if matchLiteral(key, subject) {
- s.cache[key] = r.addSubToResult(sub)
+ s.cache.Set(key, value.addSubToResult(sub))
}
- }
+ })
}

// removeFromCache will remove the sub from any active cache entries.
@@ -509,15 +507,15 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) {
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
- delete(s.cache, subject)
+ s.cache.Delete(subject)
return
}
// Wildcard here.
- for key := range s.cache {
+ s.cache.Iterate(func(key string, value *SublistResult) {
if matchLiteral(key, subject) {
- delete(s.cache, key)
+ s.cache.Delete(key)
}
- }
+ })
}

// a place holder for an empty result.
@@ -537,13 +535,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
atomic.AddUint64(&s.matches, 1)

// Check cache first.
- if doLock {
- s.RLock()
- }
- r, ok := s.cache[subject]
- if doLock {
- s.RUnlock()
- }
+ r, ok := s.cache.Get(subject)

if ok {
atomic.AddUint64(&s.cacheHits, 1)
return r
@@ -571,8 +565,6 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {

// Get result from the main structure and place into the shared cache.
// Hold the read lock to avoid race between match and store.
- var n int
if doLock {
s.Lock()
}
@@ -583,36 +575,15 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
result = emptyResult
}
if s.cache != nil {
- s.cache[subject] = result
- n = len(s.cache)
+ s.cache.Set(subject, result)
}
if doLock {
s.Unlock()
}

- // Reduce the cache count if we have exceeded our set maximum.
- if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) {
- go s.reduceCacheCount()
- }

return result
}

- // Remove entries in the cache until we are under the maximum.
- // TODO(dlc) this could be smarter now that its not inline.
- func (s *Sublist) reduceCacheCount() {
- defer atomic.StoreInt32(&s.ccSweep, 0)
- // If we are over the cache limit randomly drop until under the limit.
- s.Lock()
- for key := range s.cache {
- delete(s.cache, key)
- if len(s.cache) <= slCacheSweep {
- break
- }
- }
- s.Unlock()
- }

// Helper function for auto-expanding remote qsubs.
func isRemoteQSub(sub *subscription) bool {
return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF)
@@ -832,7 +803,7 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error {
// Turn caching back on here.
atomic.AddUint64(&s.genid, 1)
if wasEnabled {
- s.cache = make(map[string]*SublistResult)
+ s.cache = NewCache(MemHashString)
}
return err
}
@@ -914,7 +885,7 @@ func (s *Sublist) Count() uint32 {
// CacheCount returns the number of result sets in the cache.
func (s *Sublist) CacheCount() int {
s.RLock()
- cc := len(s.cache)
+ cc := s.cache.Len()
s.RUnlock()
return cc
}
@@ -963,7 +934,7 @@ func (s *Sublist) Stats() *SublistStats {

s.RLock()
cache := s.cache
- cc := len(s.cache)
+ cc := s.CacheCount()
st.NumSubs = s.count
st.NumInserts = s.inserts
st.NumRemoves = s.removes
Expand All @@ -981,14 +952,14 @@ func (s *Sublist) Stats() *SublistStats {
if cache != nil {
tot, max, clen := 0, 0, 0
s.RLock()
- for _, r := range s.cache {
+ cache.Iterate(func(key string, r *SublistResult) {
clen++
l := len(r.psubs) + len(r.qsubs)
tot += l
if l > max {
max = l
}
- }
+ })
s.RUnlock()
st.totFanout = tot
st.cacheCnt = clen
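The interesting part of the sublist.go change is the Match hot path: the old code took the sublist read lock around every cache lookup and, once the map grew past slCacheMax, swept it under the write lock in reduceCacheCount, while the new code serves a cache hit with a single atomic load and never sweeps. A quick way to see the effect is a parallel benchmark over the cached-match path. This is only a sketch, not a benchmark from the PR; newSub is assumed to be the subscription helper from the sublist tests.

package server

import "testing"

// Hypothetical benchmark sketch (not part of this PR). It hammers one cached
// literal subject from many goroutines, which is the case the lock-free cache
// is meant to speed up.
func BenchmarkSublistCachedMatchParallel(b *testing.B) {
	s := NewSublist(true)
	s.Insert(newSub("foo.bar.baz")) // newSub: helper assumed from sublist_test.go
	s.Match("foo.bar.baz")          // warm the cache so the loop stays on the hit path
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			s.Match("foo.bar.baz")
		}
	})
}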
101 changes: 101 additions & 0 deletions server/sublist_cache.go
@@ -0,0 +1,101 @@
// Copyright 2016-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"sync/atomic"
"unsafe"
)

// Cache is a fixed-size, lock-free cache of subject match results. Each key
// is hashed to one of slCacheMax slots; a colliding key simply overwrites
// whatever entry currently occupies the slot.
type Cache struct {
data [slCacheMax]atomic.Pointer[cacheEntry]
hashFunc func(string) int
}

type cacheEntry struct {
key string
value *SublistResult
}

// stringStruct mirrors the runtime's string header so that a string's data
// pointer and length can be handed to runtime.memhash.
type stringStruct struct {
str unsafe.Pointer
len int
}

//go:noescape
//go:linkname memhash runtime.memhash
Member:

While the idea of the PR is interesting and the results promising, relying on go:linkname in this way is dangerous, as it ties us to the behaviour of a specific Go compiler implementation. This probably means that NATS Server would no longer compile using gccgo, for example.

Author:

I changed it to use the maphash package (which is already used in the server). The performance seems to be even a little bit better.

Sidenote: the maphash package does the same thing here: https://cs.opensource.google/go/go/+/refs/tags/go1.21.0:src/hash/maphash/maphash_runtime.go

Member:

Thanks for switching to the maphash package. Indeed it does call out via go:linkname in the happy case, but more importantly, it has a purego fallback.
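For context, the diff shown on this page is still the first commit, which uses the go:linkname approach discussed above. A rough sketch of what a hash/maphash based hash function could look like follows; the function and seed names are assumptions, not the code from the follow-up commit (maphash.String requires Go 1.19 or later).

package server

import "hash/maphash"

// cacheSeed is a process-wide random seed for hashing cache keys
// (hypothetical name, not from the PR).
var cacheSeed = maphash.MakeSeed()

// MapHashString hashes a subject into one of the slCacheMax cache slots using
// the standard library's maphash, which has a pure Go fallback.
func MapHashString(str string) int {
	return int(maphash.String(cacheSeed, str) % slCacheMax)
}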

func memhash(p unsafe.Pointer, h, s uintptr) uintptr

// MemHashString is the hash function used by the Go map implementation; it
// takes advantage of available hardware instructions and maps the result into
// the [0, slCacheMax) slot range.
func MemHashString(str string) int {
ss := (*stringStruct)(unsafe.Pointer(&str))
return int(uint64(memhash(ss.str, 0, uintptr(ss.len))) % slCacheMax)
}

func NewCache(hashFunc func(string) int) *Cache {
return &Cache{
hashFunc: hashFunc,
}
}

// Set stores value under key, replacing any entry already in the target slot.
func (c *Cache) Set(key string, value *SublistResult) {
if c == nil {
return
}

index := c.hashFunc(key)
entry := &cacheEntry{key: key, value: value}
c.data[index].Store(entry)
}

func (c *Cache) Get(key string) (*SublistResult, bool) {
if c == nil {
return nil, false
}

index := c.hashFunc(key)
entry := c.data[index].Load()
if entry != nil {
if entry.key == key {
return entry.value, true
}
}
return nil, false
}

// Delete clears the slot that key hashes to.
func (c *Cache) Delete(key string) {
index := c.hashFunc(key)
c.data[index].Store(nil)
}

func (c *Cache) Len() int {
var cc int
c.Iterate(func(key string, value *SublistResult) {
cc++
})
return cc
}

func (c *Cache) Iterate(cf func(key string, value *SublistResult)) {
if c == nil {
return
}

for i := range c.data {
p := c.data[i].Load()
if p != nil {
cf(p.key, p.value)
}
}
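One consequence of the fixed-size, one-entry-per-slot design is worth spelling out: two subjects that hash to the same slot simply evict one another, and Delete clears whatever currently occupies the slot for the given key. That is acceptable for a cache, since a miss just falls back to the normal sublist walk, and it is what bounds memory without the old sweeper. A small illustrative test, using a deliberately colliding hash function to make the behaviour visible (not part of the PR):

package server

import "testing"

// TestSublistCacheCollisionOverwrite is an illustrative sketch (not from the
// PR). A constant hash function forces every key into slot 0, so the second
// Set evicts the first entry and Delete on either key clears the shared slot.
func TestSublistCacheCollisionOverwrite(t *testing.T) {
	c := NewCache(func(string) int { return 0 })

	c.Set("foo.bar", &SublistResult{})
	c.Set("foo.baz", &SublistResult{}) // same slot: evicts "foo.bar"

	if _, ok := c.Get("foo.bar"); ok {
		t.Fatalf("expected foo.bar to have been evicted by the colliding key")
	}
	if _, ok := c.Get("foo.baz"); !ok {
		t.Fatalf("expected foo.baz to be cached")
	}

	c.Delete("foo.bar") // hashes to slot 0, so it clears foo.baz as well
	if c.Len() != 0 {
		t.Fatalf("expected cache to be empty after Delete, got %d", c.Len())
	}
}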
}