Skip to content

Commit

Permalink
[ADDED] Publish header "Nats-Expected-Last-Subject-Sequence-Subject"
Browse files Browse the repository at this point in the history
This change adds a new header
"Nats-Expected-Last-Subject-Sequence-Subject" when when paired with
"Nats-Expected-Last-Subject-Sequence" allows publishers to customize the
subject used when the server enforces
"Nats-Expected-Last-Subject-Sequence". Publishers can specify a
alternative subject to be used that includes wildcards.

Resolves #5280

Signed-off-by: Caleb Champlin <caleb.champlin@gmail.com>
  • Loading branch information
cchamplin committed Apr 7, 2024
1 parent fcff483 commit f5d181d
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 14 deletions.
8 changes: 7 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -7657,9 +7657,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Expected last sequence per subject.
// We can check for last sequence per subject but only if the expected seq <= lseq.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq {
// Allow override of the subject used for the check.
seqSubj := subject
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
seqSubj = optSubj
}

var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
sm, err := store.LoadLastMsg(seqSubj, &smv)
if sm != nil {
fseq = sm.seq
}
Expand Down
94 changes: 94 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -11269,6 +11269,100 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) {
}
}

func TestJetStreamLastSequenceBySubjectWithSubject(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

cfg := StreamConfig{
Name: "KV",
Subjects: []string{"kv.>"},
Storage: st,
Replicas: 3,
MaxMsgsPer: 1,
}

req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do manually for now.
m, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
require_NoError(t, err)
si, err := js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data))
}
if si == nil || si.Config.Name != "KV" {
t.Fatalf("StreamInfo is not correct %+v", si)
}

js.PublishAsync("kv.1.foo", []byte("1:1")) // Last is 1 for kv.1.foo; 1 for kv.1.*;
js.PublishAsync("kv.1.bar", []byte("1:2")) // Last is 2 for kv.1.bar; 2 for kv.1.*;
js.PublishAsync("kv.2.foo", []byte("2:1")) // Last is 3 for kv.2.foo; 3 for kv.2.*;
js.PublishAsync("kv.3.bar", []byte("3:1")) // Last is 4 for kv.3.bar; 4 for kv.3.*;
js.PublishAsync("kv.1.baz", []byte("1:3")) // Last is 5 for kv.1.baz; 5 for kv.1.*;
js.PublishAsync("kv.1.bar", []byte("1:4")) // Last is 6 for kv.1.baz; 6 for kv.1.*;
js.PublishAsync("kv.2.baz", []byte("2:2")) // Last is 7 for kv.2.baz; 7 for kv.2.*;

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Now make sure we get an error if the last sequence is not correct per subject.
pubAndCheck := func(subj, seq string, ok bool) {
t.Helper()
m := nats.NewMsg(subj)
m.Data = []byte("HELLO")

// Expect last to be seq.
m.Header.Set(JSExpectedLastSubjSeq, seq)

// Constrain the sequence restriction to a specific subject
// e.g. "kv.1.*" for kv.1.foo, kv.1.bar, kv.1.baz; kv.2.* for kv.2.foo, kv.2.baz; kv.3.* for kv.3.bar
filterSubject := fmt.Sprintf("%s.*", subj[:strings.LastIndex(subj, ".")])
m.Header.Set(JSExpectedLastSubjSeqSubj, filterSubject)
_, err := js.PublishMsg(m)
if ok && err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !ok && err == nil {
t.Fatalf("Expected to get an error and got none")
}
}

pubAndCheck("kv.1.foo", "0", false)
pubAndCheck("kv.1.bar", "0", false)
pubAndCheck("kv.1.xxx", "0", false)
pubAndCheck("kv.1.foo", "1", false)
pubAndCheck("kv.1.bar", "1", false)
pubAndCheck("kv.1.xxx", "1", false)
pubAndCheck("kv.2.foo", "1", false)
pubAndCheck("kv.2.bar", "1", false)
pubAndCheck("kv.2.xxx", "1", false)
pubAndCheck("kv.1.bar", "2", false)
pubAndCheck("kv.1.bar", "3", false)
pubAndCheck("kv.1.bar", "4", false)
pubAndCheck("kv.1.bar", "5", false)
pubAndCheck("kv.1.bar", "6", true) // Last is 8 for kv.1.bar; 8 for kv.1.*;
pubAndCheck("kv.1.baz", "2", false)
pubAndCheck("kv.1.bar", "7", false)
pubAndCheck("kv.1.xxx", "8", true) // Last is 9 for kv.1.xxx; 9 for kv.1.*;
pubAndCheck("kv.2.foo", "2", false)
pubAndCheck("kv.2.foo", "7", true) // Last is 10 for kv.2.foo; 10 for kv.2.*;
pubAndCheck("kv.xxx", "0", true) // Last is 11 for kv.xxx; 11 for kv.*;
pubAndCheck("kv.3.xxx", "4", true) // Last is 12 for kv.3.xxx; 12 for kv.3.*;
pubAndCheck("kv.3.xyz", "12", true) // Last is 13 for kv.3.xyz; 13 for kv.3.*;
})
}
}

func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down
38 changes: 25 additions & 13 deletions server/stream.go
Expand Up @@ -334,18 +334,19 @@ const (

// Headers for published messages.
const (
JSMsgId = "Nats-Msg-Id"
JSExpectedStream = "Nats-Expected-Stream"
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence"
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
JSStreamSource = "Nats-Stream-Source"
JSLastConsumerSeq = "Nats-Last-Consumer"
JSLastStreamSeq = "Nats-Last-Stream"
JSConsumerStalled = "Nats-Consumer-Stalled"
JSMsgRollup = "Nats-Rollup"
JSMsgSize = "Nats-Msg-Size"
JSResponseType = "Nats-Response-Type"
JSMsgId = "Nats-Msg-Id"
JSExpectedStream = "Nats-Expected-Stream"
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence"
JSExpectedLastSubjSeqSubj = "Nats-Expected-Last-Subject-Sequence-Subject"
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
JSStreamSource = "Nats-Stream-Source"
JSLastConsumerSeq = "Nats-Last-Consumer"
JSLastStreamSeq = "Nats-Last-Stream"
JSConsumerStalled = "Nats-Consumer-Stalled"
JSMsgRollup = "Nats-Rollup"
JSMsgSize = "Nats-Msg-Size"
JSResponseType = "Nats-Response-Type"
)

// Headers for republished messages and direct gets.
Expand Down Expand Up @@ -3978,6 +3979,11 @@ func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) {
return uint64(parseInt64(bseq)), true
}

// Fast lookup of expected subject for the expected stream sequence per subject.
func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
return string(getHeader(JSExpectedLastSubjSeqSubj, hdr))
}

// Signal if we are clustered. Will acquire rlock.
func (mset *stream) IsClustered() bool {
mset.mu.RLock()
Expand Down Expand Up @@ -4538,10 +4544,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected last sequence per subject.
// If we are clustered we have prechecked seq > 0.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
// Allow override of the subject used for the check.
seqSubj := subject
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
seqSubj = optSubj
}

// TODO(dlc) - We could make a new store func that does this all in one.
var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
sm, err := store.LoadLastMsg(seqSubj, &smv)
if sm != nil {
fseq = sm.seq
}
Expand Down

0 comments on commit f5d181d

Please sign in to comment.