-
Notifications
You must be signed in to change notification settings - Fork 160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JetStream subscriptions not working with k8s #445
Comments
import * as nats from "nats"
import { randomUUID } from "crypto"
// Module-wide JSON codec instance; encode() and decode() both go through it
// so every message on the wire uses the same serialisation.
const sc = nats.JSONCodec()
/**
 * Serialises a message envelope with the shared codec.
 *
 * @param obj - Envelope holding the payload (`data`) and its `traceId`.
 * @returns Whatever `sc.encode` produces for the envelope.
 */
function encode<T>(obj: { data: T; traceId: unknown }) {
  const bytes = sc.encode(obj)
  return bytes
}
/**
 * Parses raw message bytes with the shared codec.
 *
 * The envelope shape is asserted with a cast, not validated at runtime, so a
 * malformed message yields an object that merely claims to have this type.
 *
 * @param str - Raw message bytes.
 * @returns The decoded value, typed as a `{ data, traceId }` envelope.
 */
function decode<T>(str: Uint8Array) {
  const envelope = sc.decode(str) as { data: T; traceId: unknown }
  return envelope
}
/**
 * Publish/subscribe contract for the event bus.
 */
export interface IBus {
  /**
   * Registers `callback` for events named `name`.
   *
   * The callback receives the decoded envelope (`obj`) together with the
   * underlying JetStream message (`m`).
   */
  subscribe<T>(
    name: string,
    callback: (delivery: {
      obj: { data: T; traceId: unknown }
      m: nats.JsMsg
    }) => void
  ): Promise<void>

  /** Publishes `data` as an event named `name`. */
  publish<T>(name: string, data: T): Promise<void>
}
/**
 * Creates a NATS client wrapper exposing a JetStream-backed event bus (`bus`)
 * and a core-NATS request/reply channel (`transport`).
 *
 * Nothing talks to the server until `connect()` has resolved; every bus and
 * transport method throws "NATS is not connected" before that, and again after
 * `drain()` or `disconnect()`.
 *
 * @param servers - Server URL(s) passed to `nats.connect`.
 * @param durableName - Used as the connection name, as the queue group, and as
 *   the prefix of durable consumer names.
 * @param logger - Object with `info` and `error` methods taking a structured record.
 * @param rTracer - Object whose `id()` returns the current trace id (or a falsy value).
 * @returns `{ connect, bus, transport, drain, disconnect }`.
 */
export default function createNATSTransport({
  servers,
  durableName,
  logger,
  rTracer,
}: {
  servers: string | string[]
  durableName: string
  logger: any
  rTracer: any
}) {
  let isConnected = false
  let nc: nats.NatsConnection,
    jsm: nats.JetStreamManager,
    js: nats.JetStreamClient

  /** Opens the connection and obtains the JetStream manager and client. */
  async function connect() {
    nc = await nats.connect({ servers, name: durableName, timeout: 10 * 1000 })
    jsm = await nc.jetstreamManager()
    js = await nc.jetstream()
    logger.info({
      msg: `connected to nats`,
      url: servers,
    })
    isConnected = true
  }

  /** Drains the connection. A no-op when `connect()` never succeeded. */
  async function drain() {
    if (!nc) {
      return
    }
    // Flip the flag first so new bus/transport calls fail fast with a clear
    // error instead of hitting a draining connection.
    isConnected = false
    await nc.drain()
  }

  /** Closes the connection. A no-op when `connect()` never succeeded. */
  async function disconnect() {
    if (!nc) {
      return
    }
    isConnected = false
    await nc.close()
  }

  /**
   * Makes sure a stream called `stream` exists, creating it with the subject
   * filter `<stream>.*` when it does not.
   *
   * The stream listing is paged, so every page is inspected; looking only at
   * the first page would miss streams on later pages.
   */
  async function ensureStream(stream: string) {
    const lister = jsm.streams.list()
    for (
      let page = await lister.next();
      page.length > 0;
      page = await lister.next()
    ) {
      if (page.some((x) => x.config.name === stream)) {
        return
      }
    }
    await jsm.streams.add({ name: stream, subjects: [`${stream}.*`] })
  }

  const bus: IBus = {
    /**
     * Subscribes to the event `name` through a durable, queue-grouped
     * JetStream consumer and invokes `callback` for each message.
     *
     * The first dot-separated segment of `name` is the stream name; the
     * remaining segments are appended to the durable consumer name.
     *
     * @throws Error when the transport is not connected.
     */
    async subscribe<T>(
      name: string,
      callback: (arg: {
        obj: { data: T; traceId: unknown }
        m: nats.JsMsg
      }) => void
    ) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }
      // split() always yields at least one element; the default only keeps
      // the type a plain string.
      const [stream = name, ...extra] = name.split(".")
      await ensureStream(stream)
      const inbox = nats.createInbox()
      const opts = nats.consumerOpts({
        ack_policy: nats.AckPolicy.Explicit,
        deliver_policy: nats.DeliverPolicy.Last,
        deliver_subject: inbox,
        flow_control: true,
        durable_name: durableName + (extra.length ? "-" + extra.join("-") : ""),
      })
      // NOTE(review): spreading the options builder copies only its own
      // fields and drops its methods; the builder's queue setter may be the
      // intended API here. Confirm against the installed nats version.
      const sub = await js.subscribe(name, { ...opts, queue: durableName })
      void (async () => {
        for await (const m of sub) {
          try {
            const obj = decode<T>(m.data)
            logger.info({
              msg: "bus event recieved",
              traceId: obj.traceId,
              event: {
                name,
                data: obj.data,
              },
            })
            callback({
              obj,
              m: m,
            })
          } catch (error) {
            // One bad message or throwing handler must not end the
            // subscription for every later message.
            logger.error({
              msg: "bus event handler failed",
              event: { name },
              error,
            })
          }
        }
      })().catch((error) => {
        logger.error({
          msg: "bus subscription stopped",
          event: { name },
          error,
        })
      })
    },

    /**
     * Publishes `data` to the event `name`, wrapped with a trace id, and
     * resolves once the publish has been acknowledged.
     *
     * @throws Error when the transport is not connected; rejects when the
     *   publish itself fails.
     */
    async publish<T>(name: string, data: T) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }
      const [stream = name] = name.split(".")
      await ensureStream(stream)
      const wrappedData = {
        data,
        traceId: rTracer.id() || randomUUID(),
      }
      // Awaited so that a failed publish rejects this call instead of
      // becoming an unhandled rejection after "published" was logged.
      await js.publish(name, encode<T>(wrappedData))
      logger.info({
        msg: `bus event published`,
        event: {
          name,
        },
      })
    },
  }

  const transport = {
    /**
     * Serves requests sent to `subject` (prefixed with "x" on the wire) as a
     * member of the `durableName` queue group, replying with the callback's
     * result wrapped with the request's trace id.
     *
     * @throws Error when the transport is not connected.
     */
    reply<RequestData, ResponseData>(
      subject: string,
      callback: (obj: {
        data: RequestData
        traceId: unknown
      }) => ResponseData | Promise<ResponseData>
    ) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }
      const sub = nc.subscribe("x" + subject, { queue: durableName })
      void (async () => {
        for await (const m of sub) {
          let traceId: unknown
          try {
            const obj = decode<RequestData>(m.data)
            traceId = obj.traceId
            logger.info({
              msg: "nats transport request recieved",
              traceId: obj.traceId,
              event: {
                name: subject,
                data: obj.data,
              },
            })
            const result = await callback(obj)
            const encodedData = encode<ResponseData>({
              data: result,
              traceId: obj.traceId,
            })
            if (m.respond(encodedData)) {
              logger.info({
                msg: "nats transport reply",
                traceId: obj.traceId,
                event: {
                  name: subject,
                },
              })
            }
          } catch (error) {
            // Keep serving later requests; this requester gets no reply.
            logger.error({
              msg: "nats transport request handler failed",
              traceId,
              event: { name: subject },
              error,
            })
          }
        }
      })().catch((error) => {
        logger.error({
          msg: "nats transport reply subscription stopped",
          event: { name: subject },
          error,
        })
      })
    },

    /**
     * Sends `data` to `subject` (prefixed with "x" on the wire) and resolves
     * with the decoded reply envelope.
     *
     * The timeout is 30 s when NODE_ENV is "test" and 5 s otherwise.
     *
     * @throws Error when the transport is not connected; rethrows any request
     *   failure after logging it.
     */
    async request<RequestData, ResponseData>(
      subject: string,
      data: RequestData
    ) {
      if (!isConnected) {
        throw new Error("NATS is not connected")
      }
      try {
        const timeout = process.env.NODE_ENV === "test" ? 30 * 1000 : 5 * 1000
        const traceId = rTracer.id() || randomUUID()
        const response = await nc.request(
          "x" + subject,
          encode<RequestData>({ data, traceId }),
          { timeout }
        )
        return decode<ResponseData>(response.data)
      } catch (error) {
        logger.error({
          msg: `nats request failed`,
          event: {
            name: subject,
            data: data,
          },
          error,
        })
        throw error
      }
    },
  }

  return {
    connect,
    bus,
    transport,
    drain,
    disconnect,
  }
}
import natsTransport from "../services/nats"
import registerEventSubsription from "../nats-events"
/**
 * Connects to NATS, retrying on failure, and then registers the event
 * subscriptions.
 *
 * Each attempt is preceded by a 5 second pause (including the first one). Up
 * to 5 connection attempts are made; the error from the final attempt is
 * rethrown.
 */
export default async function loaders() {
  const maxAttempts = 5
  const delayMs = 5000
  for (let attempt = 1; ; attempt++) {
    try {
      await sleep(delayMs)
      await natsTransport.connect()
      break
    } catch (error) {
      // Give up only after the last permitted attempt has failed.
      if (attempt >= maxAttempts) {
        throw error
      }
    }
  }
  // Registered outside the retry loop so a registration failure does not
  // trigger a second connect() on top of an already open connection.
  // Awaited so that an asynchronous registration surfaces its failure here.
  await registerEventSubsription()
}
import natsTransport from "../services/nats"
/**
 * Subscribes to the "account.user:created" bus event and acknowledges every
 * message it receives.
 *
 * @returns The promise from `bus.subscribe`, so callers can await the
 *   registration and handle its failure instead of it being silently dropped.
 */
export default function registerEventSubsription() {
  return natsTransport.bus.subscribe("account.user:created", ({ m }) => {
    m.ack()
  })
}
This seems similar |
If I remove the queue then it works. |
I wrote this basic client, https://github.com/nowandme/ionats, to test whether the issue is with NATS itself or with the JS library. |
The issue is that you are using a queue JetStream subscription with a new server and an old client. Please update your client. |
oh! 😛 |
JetStream subscriptions are not triggered when used inside k8s
It works when I run it with docker-compose or locally, but not when used inside k8s.
Not sure if this issue should be opened on the NATS Server itself.
I checked with
https://github.com/nats-io/nats-box
I am able to subscribe and see messages.
The text was updated successfully, but these errors were encountered: