Skip to content

Commit

Permalink
[FIX] direct get APIs can contain duplicate Nats-* headers, because…
Browse files Browse the repository at this point in the history
… it simply appends JSON bytes to existing headers. If the message was onboarded on a republish, this will contain system headers. Since headers are not ordered and can contain multiple values for the same header, this can break KV clients as well as create ambiguity on the stream that yielded the message.

Fix #4573
  • Loading branch information
aricart committed Oct 3, 2023
1 parent 195227e commit 7c12741
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
49 changes: 49 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21931,3 +21931,52 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
},
})
}

func TestJetStreamNoDuplicateHeadersOnDirectGet(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

name := nuid.Next()

req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("%s.>", name)},
RePublish: &RePublish{
Source: ">",
Destination: fmt.Sprintf("X%s.>", name),
},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

targetStream := fmt.Sprintf("%s-2", name)
req = StreamConfig{
Name: targetStream,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("X%s.>", name)},
AllowDirect: true,
}
reqJson, err = json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second)
require_NoError(t, err)

// Now publish a message to the source stream.
sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data")

getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream)

r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second)
require_NoError(t, err)

require_Equal(t, len(r.Header.Values(JSStream)), 1)
require_Equal(t, len(r.Header.Values(JSSubject)), 1)
require_Equal(t, len(r.Header.Values(JSSequence)), 1)
require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1)
}
4 changes: 4 additions & 0 deletions server/stream.go
Expand Up @@ -4051,9 +4051,13 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano)))
} else {
hdr = copyBytes(hdr)
hdr = removeHeaderIfPresent(hdr, JSStream)
hdr = genHeader(hdr, JSStream, name)
hdr = removeHeaderIfPresent(hdr, JSSubject)
hdr = genHeader(hdr, JSSubject, sm.subj)
hdr = removeHeaderIfPresent(hdr, JSSequence)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))
hdr = removeHeaderIfPresent(hdr, JSTimeStamp)
hdr = genHeader(hdr, JSTimeStamp, ts.Format(time.RFC3339Nano))
}
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, sm.msg, nil, 0))
Expand Down

0 comments on commit 7c12741

Please sign in to comment.