Skip to content

Commit

Permalink
[FIX] direct get APIs can contain duplicate Nats-* headers, because…
Browse files Browse the repository at this point in the history
… it simply appends JSON bytes to existing headers. If the message was onboarded on a republish, this will contain system headers. Since headers are not ordered and can contain multiple values for the same header, this can break KV clients as well as create ambiguity on the stream that yielded the message.

Fix #4573
  • Loading branch information
aricart committed Oct 3, 2023
1 parent 195227e commit 2916bb0
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 0 deletions.
18 changes: 18 additions & 0 deletions server/client.go
Expand Up @@ -3968,6 +3968,24 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra
return rsi
}

func removeAllNatsHeadersIfPresent(hdr []byte) []byte {
for {
start := bytes.Index(hdr, []byte(JSHeaderPrefix))
// key can't be first and we want to check that it is preceded by a '\n'
if start < 1 || hdr[start-1] != '\n' {
return hdr
}
end := bytes.Index(hdr[start:], []byte(_CRLF_))
if end < 0 {
return hdr
}
hdr = append(hdr[:start], hdr[start+end+len(_CRLF_):]...)
if len(hdr) <= len(emptyHdrLine) {
return nil
}
}
}

// Will remove a header if present.
func removeHeaderIfPresent(hdr []byte, key string) []byte {
start := bytes.Index(hdr, []byte(key))
Expand Down
103 changes: 103 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21931,3 +21931,106 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
},
})
}

func TestJetStreamNoDuplicateHeadersOnDirectGet(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

name := nuid.Next()

req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("%s.>", name)},
RePublish: &RePublish{
Source: ">",
Destination: fmt.Sprintf("X%s.>", name),
},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

targetStream := fmt.Sprintf("%s-2", name)
req = StreamConfig{
Name: targetStream,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("X%s.>", name)},
AllowDirect: true,
}
reqJson, err = json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second)
require_NoError(t, err)

// Now publish a message to the source stream.
sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data")

getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream)

r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second)
require_NoError(t, err)

require_Equal(t, len(r.Header.Values(JSStream)), 1)
require_Equal(t, len(r.Header.Values(JSSubject)), 1)
require_Equal(t, len(r.Header.Values(JSSequence)), 1)
require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1)
}

func TestRemoveAllJetStreamHeadersIfPresent(t *testing.T) {
// copied private fn from nats.go
headerBytes := func(h nats.Header) ([]byte, error) {
var hdr []byte
if len(h) == 0 {
return hdr, nil
}

var b bytes.Buffer
_, err := b.WriteString(hdrLine)
if err != nil {
return nil, err
}

err = http.Header(h).Write(&b)
if err != nil {
return nil, err
}

_, err = b.WriteString("\r\n")
if err != nil {
return nil, err
}

return b.Bytes(), nil
}

h := nats.Header{}
// expect empty to become nil
hb, err := headerBytes(h)
require_NoError(t, err)
hb = removeAllNatsHeadersIfPresent(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect Nats-* to be removed, thus be nil
h.Add("Nats-Test", "test")
h.Add("Nats-Something-Else", "test")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeAllNatsHeadersIfPresent(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect one header to remain
h.Add("Something-Else", "hey")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeAllNatsHeadersIfPresent(hb)
require_Equal(t, "NATS/1.0\r\nSomething-Else: hey\r\n\r\n", string(hb))
}
6 changes: 6 additions & 0 deletions server/stream.go
Expand Up @@ -341,6 +341,7 @@ const (

// Headers for republished messages and direct gets.
const (
JSHeaderPrefix = "Nats-"
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
Expand Down Expand Up @@ -4399,6 +4400,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}

// if we are NOT doing subject remapping, this is a message to on-board it shouldn't store Nats-* headers
if tsubj == _EMPTY_ {
hdr = removeAllNatsHeadersIfPresent(hdr)
}

// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
Expand Down

0 comments on commit 2916bb0

Please sign in to comment.