Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Introduce subject package #4886

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
153 changes: 84 additions & 69 deletions server/consumer.go
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats-server/v2/subject"
"github.com/nats-io/nuid"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -386,21 +387,19 @@ type consumer struct {

// A single subject filter.
type subjectFilter struct {
subject string
nextSeq uint64
currentSeq uint64
pmsg *jsPubMsg
err error
hasWildcard bool
tokenizedSubject []string
subject *subject.Subject
nextSeq uint64
currentSeq uint64
pmsg *jsPubMsg
err error
}

type subjectFilters []*subjectFilter

// subjects is a helper function used for updating consumers.
// It is not used and should not be used in hotpath.
func (s subjectFilters) subjects() []string {
subjects := make([]string, 0, len(s))
func (s subjectFilters) subjects() []*subject.Subject {
subjects := make([]*subject.Subject, 0, len(s))
for _, filter := range s {
subjects = append(subjects, filter.subject)
}
Expand Down Expand Up @@ -591,15 +590,15 @@ func checkConsumerCfg(
return NewJSConsumerEmptyFilterError()
}
}
subjectFilters := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
subjectFilters, err := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if err != nil {
return NewJSStreamInvalidConfigError(err)
}

// Check subject filters overlap.
for outer, subject := range subjectFilters {
if !IsValidSubject(subject) {
return NewJSStreamInvalidConfigError(ErrBadSubject)
}
for inner, ssubject := range subjectFilters {
if inner != outer && subjectIsSubsetMatch(subject, ssubject) {
if inner != outer && subject.IsSubsetMatch(ssubject) {
return NewJSConsumerOverlappingSubjectFiltersError()
}
}
Expand Down Expand Up @@ -780,7 +779,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
// Check for overlapping subjects.
if mset.cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
subjects, err := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if err != nil {
return nil, NewJSConsumerCreateError(err, Unless(err))
}
if !mset.partitionUnique(cName, subjects) {
return nil, NewJSConsumerWQConsumerNotUniqueError()
}
Expand Down Expand Up @@ -819,7 +821,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

if len(mset.consumers) > 0 {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
subjects, err := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if err != nil {
mset.mu.Unlock()
return nil, NewJSConsumerCreateError(err, Unless(err))
}
if len(subjects) == 0 {
mset.mu.Unlock()
return nil, NewJSConsumerWQMultipleUnfilteredError()
Expand Down Expand Up @@ -934,12 +940,14 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.store = store
}

subjects := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects)
subjects, err := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects)
if err != nil {
mset.mu.Unlock()
return nil, NewJSConsumerCreateError(err, Unless(err))
}
for _, filter := range subjects {
sub := &subjectFilter{
subject: filter,
hasWildcard: subjectHasWildcard(filter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, filter),
subject: filter,
}
o.subjf = append(o.subjf, sub)
}
Expand Down Expand Up @@ -1855,14 +1863,15 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
}

// Check for Subject Filters update.
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
newSubjects, err := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
if err != nil {
return NewJSConsumerCreateError(err, Unless(err))
}
if !subjectSliceEqual(newSubjects, o.subjf.subjects()) {
newSubjf := make(subjectFilters, 0, len(newSubjects))
for _, newFilter := range newSubjects {
fs := &subjectFilter{
subject: newFilter,
hasWildcard: subjectHasWildcard(newFilter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, newFilter),
subject: newFilter,
}
// If given subject was present, we will retain its fields values
// so `getNextMgs` can take advantage of already buffered `pmsgs`.
Expand Down Expand Up @@ -2781,7 +2790,7 @@ func (o *consumer) isFiltered() bool {
// Here we avoid iteration over slices if there is only one subject in stream
// and one filter for the consumer.
if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 {
return mset.cfg.Subjects[0] != o.subjf[0].subject
return mset.cfg.Subjects[0] != o.subjf[0].subject.String()
}

// if the list is not equal length, we can return early, as this is filtered.
Expand All @@ -2795,7 +2804,7 @@ func (o *consumer) isFiltered() bool {
// so it can't be used here.
cfilters := make(map[string]struct{}, len(o.subjf))
for _, val := range o.subjf {
cfilters[val.subject] = struct{}{}
cfilters[val.subject.String()] = struct{}{}
}
for _, val := range mset.cfg.Subjects {
if _, ok := cfilters[val]; !ok {
Expand Down Expand Up @@ -3358,17 +3367,17 @@ func (o *consumer) isFilteredMatch(subj string) bool {
return true
}
for _, filter := range o.subjf {
if !filter.hasWildcard && subj == filter.subject {
if filter.subject.EqualsLiteral(subj) {
return true
}
}
// It's quicker to first check for non-wildcard filters, then
// iterate again to check for subset match.
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
for _, filter := range o.subjf {
if isSubsetMatchTokenized(tts, filter.tokenizedSubject) {
return true
if s, err := subject.Stack(subj); err == nil {
for _, filter := range o.subjf {
if s.IsSubsetMatch(filter.subject) {
return true
}
}
}
return false
Expand All @@ -3379,16 +3388,17 @@ func (o *consumer) isFilteredMatch(subj string) bool {
// Lock should be held.
func (o *consumer) isEqualOrSubsetMatch(subj string) bool {
for _, filter := range o.subjf {
if !filter.hasWildcard && subj == filter.subject {
if filter.subject.EqualsLiteral(subj) {
return true
}
}
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
for _, filter := range o.subjf {
if isSubsetMatchTokenized(filter.tokenizedSubject, tts) {
return true
if s, err := subject.Stack(subj); err == nil {
for _, filter := range o.subjf {
if filter.subject.IsSubsetMatch(&s) {
return true
}
}

}
return false
}
Expand Down Expand Up @@ -3496,10 +3506,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// if this subject didn't fetch any message before, do it now
if filter.pmsg == nil {
// We will unlock here in case lots of contention, e.g. WQ.
filterSubject, filterWC, nextSeq := filter.subject, filter.hasWildcard, filter.nextSeq
filterSubject, nextSeq := filter.subject, filter.nextSeq
o.mu.Unlock()
pmsg := getJSPubMsgFromPool()
sm, sseq, err := store.LoadNextMsg(filterSubject, filterWC, nextSeq, &pmsg.StoreMsg)
sm, sseq, err := store.LoadNextMsg(filterSubject.String(), filterSubject.HasWildcard(), nextSeq, &pmsg.StoreMsg)
o.mu.Lock()

filter.err = err
Expand Down Expand Up @@ -3968,14 +3978,15 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
} else {
if o.subjf != nil {
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], pmsg.subj)
for i, filter := range o.subjf {
if isSubsetMatchTokenized(tts, filter.tokenizedSubject) {
o.subjf[i].currentSeq--
o.subjf[i].nextSeq--
break
if s, err := subject.Stack(pmsg.subj); err == nil {
for i, filter := range o.subjf {
if s.IsSubsetMatch(filter.subject) {
o.subjf[i].currentSeq--
o.subjf[i].nextSeq--
break
}
}

}
}
// We will redo this one.
Expand Down Expand Up @@ -4180,7 +4191,7 @@ func (o *consumer) streamNumPending() uint64 {
}
// Consumer with filters.
for _, filter := range o.subjf {
npc, npf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
npc, npf := o.mset.store.NumPending(o.sseq, filter.subject.String(), isLastPerSubject)
o.npc += int64(npc)
if npf > o.npf {
o.npf = npf // Always last
Expand All @@ -4202,7 +4213,7 @@ func (o *consumer) streamNumPending() uint64 {
if filter.currentSeq < o.sseq {
filter.currentSeq = o.sseq
}
npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject, isLastPerSubject)
npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject.String(), isLastPerSubject)
o.npc += int64(npc)
if npf > o.npf {
o.npf = npf // Always last
Expand Down Expand Up @@ -4718,7 +4729,7 @@ func (o *consumer) selectStartingSeqNo() {
}
// If we are partitioned here this will be properly set when we become leader.
for _, filter := range o.subjf {
ss := o.mset.store.FilteredState(1, filter.subject)
ss := o.mset.store.FilteredState(1, filter.subject.String())
filter.nextSeq = ss.Last
if ss.Last > o.sseq {
o.sseq = ss.Last
Expand All @@ -4733,23 +4744,15 @@ func (o *consumer) selectStartingSeqNo() {
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
} else {
for _, filter := range o.subjf {
filters = append(filters, filter.subject)
}
}
for _, filter := range filters {
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
for _, filter := range o.subjf {
if st := o.mset.store.SubjectsTotals(filter.subject.String()); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
} else if mss := o.mset.store.SubjectsState(filter.subject.String()); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
Expand Down Expand Up @@ -5232,7 +5235,7 @@ func (o *consumer) signalSubs() []*subscription {
}

for _, filter := range o.subjf {
subs = append(subs, &subscription{subject: []byte(filter.subject), icb: o.processStreamSignal})
subs = append(subs, &subscription{subject: []byte(filter.subject.String()), icb: o.processStreamSignal})
}
o.sigSubs = subs
return subs
Expand Down Expand Up @@ -5263,16 +5266,16 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
}

// Used to compare if two multiple filtered subject lists are equal.
func subjectSliceEqual(slice1 []string, slice2 []string) bool {
func subjectSliceEqual(slice1 []*subject.Subject, slice2 []*subject.Subject) bool {
if len(slice1) != len(slice2) {
return false
}
set2 := make(map[string]struct{}, len(slice2))
for _, val := range slice2 {
set2[val] = struct{}{}
set2[val.String()] = struct{}{}
}
for _, val := range slice1 {
if _, ok := set2[val]; !ok {
if _, ok := set2[val.String()]; !ok {
return false
}
}
Expand All @@ -5282,12 +5285,24 @@ func subjectSliceEqual(slice1 []string, slice2 []string) bool {
// Utility for simpler if conditions in Consumer config checks.
// In future iteration, we can immediately create `o.subjf` and
// use it to validate things.
func gatherSubjectFilters(filter string, filters []string) []string {
func gatherSubjectFilters(filter string, filters []string) ([]*subject.Subject, error) {
subjects := make([]*subject.Subject, 0, 1+len(filters))
if filter != _EMPTY_ {
filters = append(filters, filter)
s, err := subject.New(filter)
if err != nil {
return nil, err
}
subjects = append(subjects, s)
}
for _, filter := range filters {
// list of filters should never contain non-empty filter.
s, err := subject.New(filter)
if err != nil {
return nil, err
}
subjects = append(subjects, s)
}
// list of filters should never contain non-empty filter.
return filters
return subjects, nil
}

// shouldStartMonitor will return true if we should start a monitor
Expand Down
8 changes: 3 additions & 5 deletions server/jetstream_consumer_test.go
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

"github.com/nats-io/nats-server/v2/subject"
"github.com/nats-io/nats.go"
"github.com/nats-io/nuid"
)
Expand Down Expand Up @@ -798,11 +799,8 @@ func TestJetStreamConsumerMultipleFiltersLastPerSubject(t *testing.T) {
func consumerWithFilterSubjects(filterSubjects []string) *consumer {
c := consumer{}
for _, filter := range filterSubjects {
sub := &subjectFilter{
subject: filter,
hasWildcard: subjectHasWildcard(filter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, filter),
}
s, _ := subject.New(filter)
sub := &subjectFilter{subject: s}
c.subjf = append(c.subjf, sub)
}

Expand Down