Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] remove extra system headers on direct gets #4618

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions server/client.go
Expand Up @@ -3968,6 +3968,19 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra
return rsi
}

func removeStreamIdentityHeaders(hdr []byte) []byte {
if hdr == nil {
return hdr
}
if idx := bytes.Index(hdr, []byte(JSHeaderPrefix)); idx == -1 {
return hdr
}
for _, h := range []string{JSStream, JSSequence, JSTimeStamp, JSSubject} {
hdr = removeHeaderIfPresent(hdr, h)
}
return hdr
}

// Will remove a header if present.
func removeHeaderIfPresent(hdr []byte, key string) []byte {
start := bytes.Index(hdr, []byte(key))
Expand Down
111 changes: 111 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21931,3 +21931,114 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
},
})
}

func TestJetStreamNoDuplicateHeadersOnDirectGet(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

name := nuid.Next()

req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("%s.>", name)},
RePublish: &RePublish{
Source: ">",
Destination: fmt.Sprintf("X%s.>", name),
},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

targetStream := fmt.Sprintf("%s-2", name)
req = StreamConfig{
Name: targetStream,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("X%s.>", name)},
AllowDirect: true,
}
reqJson, err = json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second)
require_NoError(t, err)

// Now publish a message to the source stream.
sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data")

getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream)

r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second)
require_NoError(t, err)

require_Equal(t, len(r.Header.Values(JSStream)), 1)
require_Equal(t, len(r.Header.Values(JSSubject)), 1)
require_Equal(t, len(r.Header.Values(JSSequence)), 1)
require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1)
}

func TestRemoveAllJetStreamHeadersIfPresent(t *testing.T) {
// copied private fn from nats.go
headerBytes := func(h nats.Header) ([]byte, error) {
var hdr []byte
if len(h) == 0 {
return hdr, nil
}

var b bytes.Buffer
_, err := b.WriteString(hdrLine)
if err != nil {
return nil, err
}

err = http.Header(h).Write(&b)
if err != nil {
return nil, err
}

_, err = b.WriteString("\r\n")
if err != nil {
return nil, err
}

return b.Bytes(), nil
}

// test nil input
v := removeStreamIdentityHeaders(nil)
if v != nil {
t.Fatalf("expected headers to be nil")
}

h := nats.Header{}
// expect empty to become nil
hb, err := headerBytes(h)
require_NoError(t, err)
hb = removeStreamIdentityHeaders(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect Nats-Stream|Sequence|Subject|Time-Stamp to be removed
h.Add(JSStream, "test")
h.Add(JSSequence, "1")
h.Add(JSSubject, "test.bar")
h.Add(JSTimeStamp, "something")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeStreamIdentityHeaders(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect one header to remain
h.Add("Something-Else", "hey")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeStreamIdentityHeaders(hb)
require_Equal(t, "NATS/1.0\r\nSomething-Else: hey\r\n\r\n", string(hb))
}
8 changes: 8 additions & 0 deletions server/stream.go
Expand Up @@ -341,6 +341,7 @@ const (

// Headers for republished messages and direct gets.
const (
JSHeaderPrefix = "Nats-"
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
Expand Down Expand Up @@ -4051,6 +4052,8 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano)))
} else {
hdr = copyBytes(hdr)
// streams that onboarded via republish could contain the original stream headers
hdr = removeStreamIdentityHeaders(hdr)
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, sm.subj)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))
Expand Down Expand Up @@ -4399,6 +4402,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}

// if we are NOT doing subject remapping, this is a message to on-board it shouldn't store Nats-* headers
if tsubj == _EMPTY_ {
hdr = removeStreamIdentityHeaders(hdr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct, but big change since we have always stored the message and headers as we received them. Be good to discuss with @wallyqs and @bruth

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More importantly - this doesn't fix the issue for direct get - it simply solves it for the on-boarding. We would still need to have the initial fix on the outbound to ensure that clients won't get duplicate headers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes since they are stored with headers today..

}

// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
Expand Down