JetStream subscriptions not working with k8s #445

Closed
niranjannitesh opened this issue Aug 31, 2021 · 7 comments

@niranjannitesh

JetStream subscriptions are not triggered when the service runs inside k8s.
The same code works when I run it with docker-compose or locally.

I'm not sure whether this issue should be opened against the NATS Server instead.
I checked with nats-box (https://github.com/nats-io/nats-box), and there I am able to subscribe and see messages.

@niranjannitesh
Author

import * as nats from "nats"
import { randomUUID } from "crypto"

const sc = nats.JSONCodec()

function encode<T>(obj: { data: T; traceId: unknown }) {
  return sc.encode(obj)
}

function decode<T>(str: Uint8Array) {
  return sc.decode(str) as { data: T; traceId: unknown }
}

export interface IBus {
  subscribe<T>(
    name: string,
    callback: (arg: {
      obj: { data: T; traceId: unknown }
      m: nats.JsMsg
    }) => void
  ): Promise<void>
  publish<T>(name: string, data: T): Promise<void>
}

export default function createNATSTransport({
  servers,
  durableName,
  logger,
  rTracer,
}: {
  servers: string | string[]
  durableName: string
  logger: any
  rTracer: any
}) {
  let isConnected = false
  let nc: nats.NatsConnection,
    jsm: nats.JetStreamManager,
    js: nats.JetStreamClient

  async function connect() {
    nc = await nats.connect({ servers, name: durableName, timeout: 10 * 1000 })
    jsm = await nc.jetstreamManager()
    js = await nc.jetstream()
    logger.info({
      msg: `connected to nats`,
      url: servers,
    })
    isConnected = true
  }

  async function drain() {
    await nc.drain()
  }

  async function disconnect() {
    await nc.close()
  }

  const bus: IBus = {
    async subscribe<T>(
      name: string,
      callback: (arg: {
        obj: { data: T; traceId: unknown }
        m: nats.JsMsg
      }) => void
    ) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }
      const streams = await jsm.streams.list().next()
      const [stream, ...extra] = name.split(".")
      const streamExists =
        streams.findIndex((x) => x.config.name === stream) > -1
      if (!streamExists) {
        await jsm.streams.add({ name: stream, subjects: [`${stream}.*`] })
      }
      const inbox = nats.createInbox()
      const opts = nats.consumerOpts({
        ack_policy: nats.AckPolicy.Explicit,
        deliver_policy: nats.DeliverPolicy.Last,
        deliver_subject: inbox,
        flow_control: true,
        durable_name: durableName + (extra.length ? "-" + extra.join("-") : ""),
      })
      const sub = await js.subscribe(name, { ...opts, queue: durableName })
      ;(async () => {
        for await (const m of sub) {
          const obj = decode<T>(m.data)
          logger.info({
            msg: "bus event received",
            traceId: obj.traceId,
            event: {
              name,
              data: obj.data,
            },
          })
          callback({
            obj,
            m: m,
          })
        }
      })()
    },
    async publish<T>(name: string, data: T) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }

      const streams = await jsm.streams.list().next()
      const [stream, ...extra] = name.split(".")
      const streamExists =
        streams.findIndex((x) => x.config.name === stream) > -1
      if (!streamExists) {
        await jsm.streams.add({ name: stream, subjects: [`${stream}.*`] })
      }

      const wrappedData = {
        data,
        traceId: rTracer.id() || randomUUID(),
      }
      // Await the publish so a failed or unacknowledged publish rejects here
      await js.publish(name, encode<T>(wrappedData))
      logger.info({
        msg: `bus event published`,
        event: {
          name,
        },
      })
    },
  }

  const transport = {
    reply<RequestData, ResponseData>(
      subject: string,
      callback: (obj: {
        data: RequestData
        traceId: unknown
      }) => ResponseData | Promise<ResponseData>
    ) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }
      const sub = nc.subscribe("x" + subject, { queue: durableName })
      ;(async () => {
        for await (const m of sub) {
          const obj = decode<RequestData>(m.data)
          logger.info({
            msg: "nats transport request received",
            traceId: obj.traceId,
            event: {
              name: subject,
              data: obj.data,
            },
          })
          const result = await callback(obj)
          const encodedData = encode<ResponseData>({
            data: result,
            traceId: obj.traceId,
          })
          if (m.respond(encodedData)) {
            logger.info({
              msg: "nats transport reply",
              traceId: obj.traceId,
              event: {
                name: subject,
              },
            })
          }
        }
      })()
    },
    async request<RequestData, ResponseData>(
      subject: string,
      data: RequestData
    ) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }
      try {
        const timeout = process.env.NODE_ENV === "test" ? 30 * 1000 : 5 * 1000
        const traceId = rTracer.id() || randomUUID()
        const response = await nc.request(
          "x" + subject,
          encode<RequestData>({ data, traceId }),
          { timeout }
        )

        return decode<ResponseData>(response.data)
      } catch (error) {
        logger.error({
          msg: `nats request failed`,
          event: {
            name: subject,
            data: data,
          },
        })
        throw error
      }
    },
  }

  return {
    connect,
    bus,
    transport,
    drain,
    disconnect,
  }
}
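
// Separate module: loader that connects with retries, then registers subscriptions
import { setTimeout as sleep } from "timers/promises"
import natsTransport from "../services/nats"
import registerEventSubsription from "../nats-events"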

export default async function loaders() {
  let retries = 1
  while (retries <= 5) {
    try {
      await sleep(5000)
      await natsTransport.connect().then(() => {
        registerEventSubsription()
      })
      break
    } catch (error) {
      retries++
      // Rethrow only after the fifth attempt has failed
      if (retries > 5) {
        throw error
      }
    }
  }
}

// Separate module: event subscriptions registered by the loader
import natsTransport from "../services/nats"

export default function registerEventSubsription() {
  natsTransport.bus.subscribe(
    "account.user:created",
    ({ obj, m }) => {
      m.ack()
    }
  )
}
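
For readers tracing the code above, here is a minimal sketch of the names `bus.subscribe` derives from a subject, plus a check that the stream's `account.*` filter covers `account.user:created`. The helper names `deriveNames` and `subjectMatches` and the durable name `accounts-service` are hypothetical and used only for illustration. The matcher follows the standard NATS wildcard rules (`*` is one token, `>` is one or more trailing tokens) and does not use the nats package.

```typescript
// Returns true when `subject` matches `pattern` under NATS wildcard rules:
// "*" matches exactly one token, ">" matches one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".")
  const s = subject.split(".")
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") return i === p.length - 1 && s.length > i
    if (i >= s.length) return false
    if (p[i] !== "*" && p[i] !== s[i]) return false
  }
  return p.length === s.length
}

// Mirrors the derivation in bus.subscribe: the first token names the stream,
// the remaining tokens are appended to the durable name.
function deriveNames(subject: string, durableName: string) {
  const [stream = "", ...extra] = subject.split(".")
  return {
    stream,
    streamFilter: `${stream}.*`,
    durable: durableName + (extra.length ? "-" + extra.join("-") : ""),
  }
}

// "accounts-service" is a made-up durable name for this example.
const names = deriveNames("account.user:created", "accounts-service")
// names.stream       is "account"
// names.streamFilter is "account.*"
// names.durable      is "accounts-service-user:created"

console.log(subjectMatches(names.streamFilter, "account.user:created")) // true
console.log(subjectMatches(names.streamFilter, "account.user.created")) // false: "*" covers one token only
```

Under these rules the subject in the report is captured by the stream, so the subject naming is not what prevents delivery.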

@niranjannitesh
Author

nats-io/nats.go#453

This seems similar

@niranjannitesh
Author

If I remove the queue then it works.

@niranjannitesh
Author

I wrote a basic client, https://github.com/nowandme/ionats, to test whether the issue is in NATS or in the JS lib.
Since I couldn't find any docs for the JetStream protocol, I implemented only basic NATS PUB/SUB with a queue group, and it works both locally and in k8s.
Hopefully this can help us find the bug in the current lib.

@aricart
Member

aricart commented Aug 31, 2021

The issue is that you are using a queue JetStream subscription with a new server and an old client.

Please update your client.

See https://github.com/nats-io/nats.js/releases/tag/v2.2.0

nats-io/nats-server#2438 (comment)
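
Since the fix is a client upgrade, a service could guard against the same mismatch at startup. The following is a minimal sketch, assuming the installed client version is available as a string (for example, read from the project's lockfile). The names `isAtLeast`, `assertClientVersion` and `MIN_CLIENT` are hypothetical and not part of the nats package; the threshold 2.2.0 is taken from the release linked above.

```typescript
// Compares dotted numeric versions such as "2.1.0" and "2.2.0".
// A leading "v" and any pre-release suffix (e.g. "-rc1") are ignored in this sketch.
function isAtLeast(version: string, minimum: string): boolean {
  const parse = (v: string): number[] =>
    (v.replace(/^v/, "").split("-")[0] ?? "")
      .split(".")
      .map((part) => Number.parseInt(part, 10) || 0)
  const a = parse(version)
  const b = parse(minimum)
  for (let i = 0; i < Math.max(a.length, b.length); i++) {
    const x = a[i] ?? 0
    const y = b[i] ?? 0
    if (x !== y) return x > y
  }
  return true
}

// Assumption: 2.2.0 is the first client release that works for this case.
const MIN_CLIENT = "2.2.0"

function assertClientVersion(installed: string): void {
  if (!isAtLeast(installed, MIN_CLIENT)) {
    throw new Error(
      `nats client ${installed} is older than ${MIN_CLIENT}; ` +
        "upgrade before using JetStream queue subscriptions"
    )
  }
}

assertClientVersion("2.2.0") // passes silently
```

Components are compared numerically, so `2.10.0` is treated as newer than `2.2.0`.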

@aricart closed this as completed on Aug 31, 2021

@niranjannitesh
Author

oh! 😛

@aricart
Member

aricart commented Aug 31, 2021

#446
