Skip to content

Commit

Permalink
[FIX] direct get APIs can contain duplicate Nats-* headers, because…
Browse files Browse the repository at this point in the history
… it simply appends JSON bytes to existing headers. If the message was onboarded on a republish, this will contain system headers. Since headers are not ordered and can contain multiple values for the same header, this can break KV clients as well as create ambiguity on the stream that yielded the message.

Fix #4573
  • Loading branch information
aricart committed Oct 3, 2023
1 parent 195227e commit 36b4e2d
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
18 changes: 18 additions & 0 deletions server/client.go
Expand Up @@ -3968,6 +3968,24 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra
return rsi
}

func removeAllNatsHeadersIfPresent(hdr []byte) []byte {
for {
start := bytes.Index(hdr, []byte(JSHeaderPrefix))
// key can't be first and we want to check that it is preceded by a '\n'
if start < 1 || hdr[start-1] != '\n' {
return hdr
}
end := bytes.Index(hdr[start:], []byte(_CRLF_))
if end < 0 {
return hdr
}
hdr = append(hdr[:start], hdr[start+end+len(_CRLF_):]...)
if len(hdr) <= len(emptyHdrLine) {
return nil
}
}
}

// Will remove a header if present.
func removeHeaderIfPresent(hdr []byte, key string) []byte {
start := bytes.Index(hdr, []byte(key))
Expand Down
109 changes: 109 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21931,3 +21931,112 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
},
})
}

func TestJetStreamNoDuplicateHeadersOnDirectGet(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

name := nuid.Next()

req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("%s.>", name)},
RePublish: &RePublish{
Source: ">",
Destination: fmt.Sprintf("X%s.>", name),
},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

targetStream := fmt.Sprintf("%s-2", name)
req = StreamConfig{
Name: targetStream,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("X%s.>", name)},
AllowDirect: true,
}
reqJson, err = json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second)
require_NoError(t, err)

// Now publish a message to the source stream.
sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data")

getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream)

r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second)
require_NoError(t, err)

require_Equal(t, len(r.Header.Values(JSStream)), 1)
require_Equal(t, len(r.Header.Values(JSSubject)), 1)
require_Equal(t, len(r.Header.Values(JSSequence)), 1)
require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1)
}

func TestRemoveAllJetStreamHeadersIfPresent(t *testing.T) {
// copied private fn from nats.go
headerBytes := func(h nats.Header) ([]byte, error) {
var hdr []byte
if len(h) == 0 {
return hdr, nil
}

var b bytes.Buffer
_, err := b.WriteString(hdrLine)
if err != nil {
return nil, err
}

err = http.Header(h).Write(&b)
if err != nil {
return nil, err
}

_, err = b.WriteString("\r\n")
if err != nil {
return nil, err
}

return b.Bytes(), nil
}

// test nil input
v := removeAllNatsHeadersIfPresent(nil)
if v != nil {
t.Fatalf("expected headers to be nil")
}

h := nats.Header{}
// expect empty to become nil
hb, err := headerBytes(h)
require_NoError(t, err)
hb = removeAllNatsHeadersIfPresent(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect Nats-* to be removed, thus be nil
h.Add("Nats-Test", "test")
h.Add("Nats-Something-Else", "test")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeAllNatsHeadersIfPresent(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect one header to remain
h.Add("Something-Else", "hey")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeAllNatsHeadersIfPresent(hb)
require_Equal(t, "NATS/1.0\r\nSomething-Else: hey\r\n\r\n", string(hb))
}
6 changes: 6 additions & 0 deletions server/stream.go
Expand Up @@ -341,6 +341,7 @@ const (

// Headers for republished messages and direct gets.
const (
JSHeaderPrefix = "Nats-"
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
Expand Down Expand Up @@ -4399,6 +4400,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}

// if we are NOT doing subject remapping, this is a message to on-board it shouldn't store Nats-* headers
if tsubj == _EMPTY_ {
hdr = removeAllNatsHeadersIfPresent(hdr)
}

// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
Expand Down

0 comments on commit 36b4e2d

Please sign in to comment.