Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Publish header "Nats-Expected-Last-Subject-Sequence-Subject" #5281

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -7740,9 +7740,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Expected last sequence per subject.
// We can check for last sequence per subject but only if the expected seq <= lseq.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq {
// Allow override of the subject used for the check.
seqSubj := subject
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
seqSubj = optSubj
}

var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
sm, err := store.LoadLastMsg(seqSubj, &smv)
if sm != nil {
fseq = sm.seq
}
Expand Down
94 changes: 94 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -11269,6 +11269,100 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) {
}
}

func TestJetStreamLastSequenceBySubjectWithSubject(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

cfg := StreamConfig{
Name: "KV",
Subjects: []string{"kv.>"},
Storage: st,
Replicas: 3,
MaxMsgsPer: 1,
}

req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do manually for now.
m, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
require_NoError(t, err)
si, err := js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data))
}
if si == nil || si.Config.Name != "KV" {
t.Fatalf("StreamInfo is not correct %+v", si)
}

js.PublishAsync("kv.1.foo", []byte("1:1")) // Last is 1 for kv.1.foo; 1 for kv.1.*;
js.PublishAsync("kv.1.bar", []byte("1:2")) // Last is 2 for kv.1.bar; 2 for kv.1.*;
js.PublishAsync("kv.2.foo", []byte("2:1")) // Last is 3 for kv.2.foo; 3 for kv.2.*;
js.PublishAsync("kv.3.bar", []byte("3:1")) // Last is 4 for kv.3.bar; 4 for kv.3.*;
js.PublishAsync("kv.1.baz", []byte("1:3")) // Last is 5 for kv.1.baz; 5 for kv.1.*;
js.PublishAsync("kv.1.bar", []byte("1:4")) // Last is 6 for kv.1.baz; 6 for kv.1.*;
js.PublishAsync("kv.2.baz", []byte("2:2")) // Last is 7 for kv.2.baz; 7 for kv.2.*;

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Now make sure we get an error if the last sequence is not correct per subject.
pubAndCheck := func(subj, filterSubject, seq string, ok bool) {
t.Helper()
m := nats.NewMsg(subj)
m.Data = []byte("HELLO")

// Expect last to be seq.
m.Header.Set(JSExpectedLastSubjSeq, seq)

// Constrain the sequence restriction to a specific subject
// e.g. "kv.1.*" for kv.1.foo, kv.1.bar, kv.1.baz; kv.2.* for kv.2.foo, kv.2.baz; kv.3.* for kv.3.bar
m.Header.Set(JSExpectedLastSubjSeqSubj, filterSubject)
_, err := js.PublishMsg(m)
if ok && err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !ok && err == nil {
t.Fatalf("Expected to get an error and got none")
}
}

pubAndCheck("kv.1.foo", "kv.1.*", "0", false)
pubAndCheck("kv.1.bar", "kv.1.*", "0", false)
pubAndCheck("kv.1.xxx", "kv.1.*", "0", false)
pubAndCheck("kv.1.foo", "kv.1.*", "1", false)
pubAndCheck("kv.1.bar", "kv.1.*", "1", false)
pubAndCheck("kv.1.xxx", "kv.1.*", "1", false)
pubAndCheck("kv.2.foo", "kv.2.*", "1", false)
pubAndCheck("kv.2.bar", "kv.2.*", "1", false)
pubAndCheck("kv.2.xxx", "kv.2.*", "1", false)
pubAndCheck("kv.1.bar", "kv.1.*", "2", false)
pubAndCheck("kv.1.bar", "kv.1.*", "3", false)
pubAndCheck("kv.1.bar", "kv.1.*", "4", false)
pubAndCheck("kv.1.bar", "kv.1.*", "5", false)
pubAndCheck("kv.1.bar", "kv.1.*", "6", true) // Last is 8 for kv.1.bar; 8 for kv.1.*;
pubAndCheck("kv.1.baz", "kv.1.*", "2", false)
pubAndCheck("kv.1.bar", "kv.1.*", "7", false)
pubAndCheck("kv.1.xxx", "kv.1.*", "8", true) // Last is 9 for kv.1.xxx; 9 for kv.1.*;
pubAndCheck("kv.2.foo", "kv.2.*", "2", false)
pubAndCheck("kv.2.foo", "kv.2.*", "7", true) // Last is 10 for kv.2.foo; 10 for kv.2.*;
pubAndCheck("kv.xxx", "kv.*", "0", true) // Last is 0 for kv.xxx; 0 for kv.*;
pubAndCheck("kv.xxx", "kv.*.*", "0", false) // Last is 11 for kv.xxx; 11 for kv.*.*;
pubAndCheck("kv.3.xxx", "kv.3.*", "4", true) // Last is 12 for kv.3.xxx; 12 for kv.3.*;
pubAndCheck("kv.3.xyz", "kv.3.*", "12", true) // Last is 13 for kv.3.xyz; 13 for kv.3.*;
})
}
}

func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down
38 changes: 25 additions & 13 deletions server/stream.go
Expand Up @@ -334,18 +334,19 @@ const (

// Headers for published messages.
const (
JSMsgId = "Nats-Msg-Id"
JSExpectedStream = "Nats-Expected-Stream"
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence"
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
JSStreamSource = "Nats-Stream-Source"
JSLastConsumerSeq = "Nats-Last-Consumer"
JSLastStreamSeq = "Nats-Last-Stream"
JSConsumerStalled = "Nats-Consumer-Stalled"
JSMsgRollup = "Nats-Rollup"
JSMsgSize = "Nats-Msg-Size"
JSResponseType = "Nats-Response-Type"
JSMsgId = "Nats-Msg-Id"
JSExpectedStream = "Nats-Expected-Stream"
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence"
JSExpectedLastSubjSeqSubj = "Nats-Expected-Last-Subject-Sequence-Subject"
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
JSStreamSource = "Nats-Stream-Source"
JSLastConsumerSeq = "Nats-Last-Consumer"
JSLastStreamSeq = "Nats-Last-Stream"
JSConsumerStalled = "Nats-Consumer-Stalled"
JSMsgRollup = "Nats-Rollup"
JSMsgSize = "Nats-Msg-Size"
JSResponseType = "Nats-Response-Type"
)

// Headers for republished messages and direct gets.
Expand Down Expand Up @@ -3977,6 +3978,11 @@ func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) {
return uint64(parseInt64(bseq)), true
}

// Fast lookup of expected subject for the expected stream sequence per subject.
func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
return string(getHeader(JSExpectedLastSubjSeqSubj, hdr))
}

// Signal if we are clustered. Will acquire rlock.
func (mset *stream) IsClustered() bool {
mset.mu.RLock()
Expand Down Expand Up @@ -4537,10 +4543,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected last sequence per subject.
// If we are clustered we have prechecked seq > 0.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
// Allow override of the subject used for the check.
seqSubj := subject
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
seqSubj = optSubj
}

// TODO(dlc) - We could make a new store func that does this all in one.
var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
sm, err := store.LoadLastMsg(seqSubj, &smv)
if sm != nil {
fseq = sm.seq
}
Expand Down