Skip to content

Commit

Permalink
[FIX] direct get APIs can contain duplicate Nats-* headers, because…
Browse files Browse the repository at this point in the history
… it simply appends JSON bytes to existing headers. If the message was onboarded on a republish, this will contain system headers. Since headers are not ordered and can contain multiple values for the same header, this can break KV clients as well as create ambiguity on the stream that yielded the message.

Fix #4573
  • Loading branch information
aricart committed Oct 3, 2023
1 parent 195227e commit 34c9222
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
49 changes: 49 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21931,3 +21931,52 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
},
})
}

func Test_NoDuplicateHeadersOnDirectGet(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

name := nuid.Next()

req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("%s.>", name)},
RePublish: &RePublish{
Source: ">",
Destination: fmt.Sprintf("X%s.>", name),
},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

targetStream := fmt.Sprintf("%s-2", name)
req = StreamConfig{
Name: targetStream,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("X%s.>", name)},
AllowDirect: true,
}
reqJson, err = json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second)
require_NoError(t, err)

// Now publish a message to the source stream.
sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data")

getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream)

r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second)
require_NoError(t, err)

require_Equal(t, len(r.Header.Values(JSStream)), 1)
require_Equal(t, len(r.Header.Values(JSSubject)), 1)
require_Equal(t, len(r.Header.Values(JSSequence)), 1)
require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1)
}
4 changes: 4 additions & 0 deletions server/stream.go
Expand Up @@ -4051,9 +4051,13 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano)))
} else {
hdr = copyBytes(hdr)
hdr = removeHeaderIfPresent(hdr, JSStream)
hdr = genHeader(hdr, JSStream, name)
hdr = removeHeaderIfPresent(hdr, JSSubject)
hdr = genHeader(hdr, JSSubject, sm.subj)
hdr = removeHeaderIfPresent(hdr, JSSequence)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))
hdr = removeHeaderIfPresent(hdr, JSTimeStamp)
hdr = genHeader(hdr, JSTimeStamp, ts.Format(time.RFC3339Nano))
}
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, sm.msg, nil, 0))
Expand Down

0 comments on commit 34c9222

Please sign in to comment.