Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KV get fails to get republished value (message) with multiple headers #586

Closed
a-mytra opened this issue Sep 6, 2023 · 10 comments · Fixed by #619
Closed

KV get fails to get republished value (message) with multiple headers #586

a-mytra opened this issue Sep 6, 2023 · 10 comments · Fixed by #619
Labels
defect Suspected defect such as a bug or regression

Comments

@a-mytra
Copy link

a-mytra commented Sep 6, 2023

What version were you using?

nats-server: v2.9.21
nats.ws 1.16.1
client running on Chrome browser

What environment was the server running in?

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian

Is this defect reproducible?

Steps to reproduce

I create a KV named B

nats kv add B

I create a stream named DC_B to subscribe to all message on the subject A.> and republish to $KV.B.>

nats stream add DC_B 
      --subjects='A.>'
      --republish-source='>'
      --republish-destination='$KV.B.>'
      --storage=memory 
      --replicas=1 
      --retention=limits 
      --discard=old 
      --max-msgs=1 
      --max-msgs-per-subject=-1 
      --max-bytes=-1 
      --max-age=-1 
      --max-msg-size=-1 
      --dupe-window="2m0s" 
      --allow-rollup 
      --no-deny-delete 
      --no-deny-purge

I publish a message on A

nats pub "A.tomato" "hello"

As a sanity check I also explicitly put a value directly into B to compare

nats kv put B 'A.potato' "yo"

It does get capture by the stream DC_B and does get republished to the KV bucket B correctly

-> A.tomato "hello"
    -> KV B
        -> A.tomato: "hello"

Watching with nats sub '>' while this is happening I can see the message that goes to B contains multiple headers

[#48] Received on "_INBOX.Olz80ftB4Yj0cBmlJhobQO.XOE2JA36"
Nats-Stream: DC_B
Nats-Stream: KV_B
Nats-Subject: A.tomato                 <------ this
Nats-Subject: $KV.B.A.tomato           <------ and this
Nats-Sequence: 2
Nats-Sequence: 2
Nats-Last-Sequence: 1
Nats-Time-Stamp: 2023-09-02T14:48:06.891798Z

hello

[#43] Received on "_INBOX.qppVDoINpVj2TabJ60qInw.LhKzBUHR"
Nats-Stream: KV_B
Nats-Subject: $KV.B.A.potato
Nats-Sequence: 3
Nats-Time-Stamp: 2023-09-02T14:52:48.235748Z

yo

Using the CLI I can get the value using the key A.tomato from B as expected 👍

nats kv get B "A.tomato"

hello

This also works using the Python client correctly. 👍

Here is the bug 🐛 in the JS implementation 👇

const B = await js.views.kv("B")
let tomato = await B.get("A.tomato")
let potato = await B.get("A.potato")

I get tomato is null 😞 and and potato is correctly "yo"

The only difference is that A.tomato was republished into the KV and has multiple headers.

I tracked it down to the Bucket class's get function here

if (ke.key !== ek) {

This equality fails

      if (ke.key !== ek) {

And it fails due to the incorrect header retrieval in thelength getter in the KvStoredEntryImpl class here
https://github.com/nats-io/nats.deno/blob/db885246875d3c1d6401e1f686a9a0570ac20e9b/jetstream/kv.ts#L984C5

get length() {
    const slen = this.sm.header.get(JsHeaders.MessageSizeHdr) || ""
    if (slen !== "") {
        return parseInt(slen, 10)
    }
    return this.sm.data.length
}

The call to this.sm.header.get randomly (?) returns one of the headers for Nats-Subject so it doesn't match the subject equality check.

The value is correctly being retrieved but it is rejected and thrown out.

Given the capability you are leveraging, describe your expectation?

I expect

const B = await js.views.kv("B")
let tomato = await B.get("A.tomato")

to return the value stored.

Given the expectation, what is the defect you are observing?

Multiple headers (specifically Nats-Subject) in a KV key messages cause the value to be irretrievable by the JS implementation while Python and CLI (assuming Go as well) work.

@a-mytra a-mytra added the defect Suspected defect such as a bug or regression label Sep 6, 2023
@a-mytra
Copy link
Author

a-mytra commented Sep 6, 2023

For a quick fix on my end, I inject a new get function into a KV that the erroneous subject check

// --- INJECT FIX for broken multi-header bug
const AsyncFunction = async function () { }.constructor
const ngc = AsyncFunction("k", "opts", `
  const ek = this.encodeKey(k);
  this.validateKey(ek);
  let arg = {
      last_by_subj: this.subjectForKey(ek)
  };
  if (opts && opts.revision > 0) {
      arg = {
          seq: opts.revision
      };
  }
  let sm;
  try {
      if (this.direct) {
          const direct = this.jsm.direct;
          sm = await direct.getMessage(this.bucketName(), arg);
      } else {
          sm = await this.jsm.streams.getMessage(this.bucketName(), arg);
      }
      const ke = this.smToEntry(sm);
      return ke;
  } catch (err) {
      if (err.code === ErrorCode.JetStream404NoMessages) {
          return null;
      }
      throw err;
  }`)

// @ts-ignore
kv.get = ngc

I think a better implementation is to either use the correct headers for checking or drop the check altogether.

@aricart
Copy link
Member

aricart commented Sep 21, 2023

@a-mytra not sure if there's some interaction with the nats cli or the other tooling you are using. But if I run this test:

  const jsm = await nc.jetstreamManager()

  await jsm.streams.add({
    name: "A",
    subjects: ["A.>"],
    storage: StorageType.Memory,
    republish: {
      src: ">",
      dest: "$KV.B.>"
    }
  });

  nc.subscribe("$KV.B.>", {callback: (_, msg) => {
    console.log(`${msg.subject} ${msg.headers ? msg.headers : "no headers"} ${msg.string()}`);
    }})

  const js = nc.jetstream();
  const B = await js.views.kv("B");

  nc.publish("A.orange", "hey");
  await js.publish("A.tomato", "hello");

  await B.put("A.potato", "yo");

  await nc.flush();

  const iter = await B.keys();
  for await(const v of iter) {
    console.log(">", v)
  }
$KV.B.A.orange NATS/1.0
Nats-Stream: A
Nats-Subject: A.orange
Nats-Sequence: 1
Nats-Last-Sequence: 0

 hey
$KV.B.A.tomato NATS/1.0
Nats-Stream: A
Nats-Subject: A.tomato
Nats-Sequence: 2
Nats-Last-Sequence: 0

 hello
$KV.B.A.potato no headers yo
> A.orange
> A.tomato
> A.potato

I do not get any duplicate headers, and your usecase works correctly. I tried this with your server version 2.9.21 and with the just released 2.10.1

@aricart
Copy link
Member

aricart commented Sep 21, 2023

Also what it looks like is that the KV is doing republishing. Can you print the kv configuration

@aricart
Copy link
Member

aricart commented Sep 21, 2023

OK take it back - I can reproduce the issue

@a-mytra
Copy link
Author

a-mytra commented Sep 21, 2023

Ok I'm glad, I was about to carve out some time to sit down and asciinema on Mac and Linux to demonstrate. I already pulled too much hair out and a couple of weekends figuring out what was going on.

The way I found out what was going on was by manually inspecting the binary frame on the websocket message reply after a .get and I realized we are indeed getting the value back from NATS but somehow the JS implementation was discarding it.

Thank you for looking into this 🙇

@jnmoyne
Copy link

jnmoyne commented Sep 21, 2023

Leaving the specific issue aside and stepping back a little bit I would suggest you upgrade to nats-server V 2.10.1 (and nats V 0.1.1) and simply use KV bucket sourcing rather than trying to feed the bucket from republished message.

e.g.

nats kv add A
nats kv add B --source A
nats kv put A a hello
nats kv get B a

Besides this being cleaner and simpler, it is also much better in terms of reliability and quality of service: with feeding the KV bucket from (Core NATS) republished message if for some reason or another the republished message gets dropped (maybe one of the nats-servers in the cluster getting restarted right at the same time for example) then that message would never make it to the destination KV bucket. Using sourcing between the buckets is safe from any Core NATS message getting dropped or any other issue with the server infrastructure and sourcing/mirroring between streams or KV is 'store and forward reliable'.

@aricart
Copy link
Member

aricart commented Sep 21, 2023

@a-mytra the issue is not a client issue - I believe the reason why it works for you with a different client has to do with the order the values come out. So that is will be specific to the language/map etc that is used on read. I suspect this is possibly a server issue, because I wouldn't expect Nats- headers to be persisted. On the direct gets for KV, the headers are added, and that is why we get messages with 2 values when we should only be getting one for those. Keep tuned

@aricart
Copy link
Member

aricart commented Sep 21, 2023

Thank you for looking into this 🙇

Actually thank you for filling the detailed issue.

@aricart
Copy link
Member

aricart commented Sep 21, 2023

I have a workaround for you, if you set the option on the javascript kv allow_direct: false you'll work around the issue

@aricart
Copy link
Member

aricart commented Sep 22, 2023

nats-io/nats-server#4573

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants