Pub/Sub not working in compat mode when using "inproc" transport #570

Open
arieb opened this issue Jun 7, 2023 · 0 comments
arieb commented Jun 7, 2023

Describe the bug
When running a very basic pub/sub server/client flow across worker_threads using the "inproc" transport, the client subscriber never receives the messages.

Reproducing

const { Worker, isMainThread, parentPort } = require('node:worker_threads');
const process = require('process')
const zmq = require('zeromq/v5-compat')

function resolveOnSignals(...signals) {
  return new Promise((resolve) => {
    for (const sig of signals) {
      process.on(sig, () => resolve(sig))
    }
  })
}

// The same code works with other transport types such as ipc and tcp (see the commented-out URLs below)

const url = 'inproc://foo'
//const url = 'ipc://foo'
//const url = 'tcp://127.0.0.1:7301'

async function publisher(worker){
  console.log('Server Started!')
  const publisher = zmq.socket('pub')
  publisher.on('error', (err) => {
    console.error('error', err.message)
  })
  await new Promise((resolve, reject) => {
    publisher.bind(url, (err) => {
      if (err) {
        return reject(err)
      }
      resolve()
    })
  })

  let i = 0
  const interval = setInterval(() => {
    const msg = ['weasel', i]
    console.log('Sending msg', msg)
    publisher.send(msg)
    i++
  }, 1000)

  console.info('Waiting for os signal to stop us')
  const stopReason = await Promise.race([resolveOnSignals('SIGINT')])
  console.info('Asked to stop by %s', stopReason)

  clearInterval(interval)
  publisher.close()

  worker.terminate()

  return 0
}

async function subscriber(){
  console.log('Client Started!')

  const sub = zmq.socket('sub')
  sub.on('error', (err) => {
    console.error('error', err.message)
  })
  sub.on('message', (topic, msg) => {
    console.log('GOT MSG topic', topic, 'MSG=', msg)
  })

  sub.connect(url)
  sub.subscribe('')

  console.info('Waiting for os signal to stop us')
  const stopReason = await Promise.race([resolveOnSignals('SIGINT')])
  console.info('Asked to stop by %s', stopReason)

  sub.close()

  return 0
}

if (isMainThread) {
  const worker = new Worker(__filename)
  worker.unref()
  publisher(worker)
    .then((exitCode) => {
    })
    .catch((err) => {
      console.error(err.message)
      process.exit(1)
    })
} else {
  subscriber()
    .then((exitCode) => {
    })
    .catch((err) => {
      console.error(err.message)
      process.exit(1)
    })

}

Expected behavior
I expect the messages to be printed by the client,
and the client not to crash when the process receives SIGINT.
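
On the SIGINT part only, here is a rough sketch of one possible workaround. The Node.js documentation states that signal events are not available on Worker threads, so the `process.on('SIGINT', ...)` handler inside the subscriber may never fire. One option is to have the main thread tell the worker to stop over the message port. The names `startStoppableWorker`, `stopWorker` and the `'stop'` message are hypothetical and chosen for illustration, and the ZeroMQ socket is left out so the snippet runs on its own. It does not address the missing messages over "inproc".

```javascript
const { Worker } = require('node:worker_threads');

// Hypothetical helper: start a worker that idles until the main thread posts 'stop'.
// The worker body is inlined with `eval: true` so this sketch needs no second file.
function startStoppableWorker() {
  const workerSource = `
    const { parentPort } = require('node:worker_threads');
    parentPort.on('message', (msg) => {
      if (msg === 'stop') {
        // A real subscriber would call sub.close() here before shutting down.
        // Closing the port removes the last thing keeping the worker alive.
        parentPort.close();
      }
    });
  `;
  return new Worker(workerSource, { eval: true });
}

// Hypothetical helper: ask the worker to stop and resolve with its exit code.
function stopWorker(worker) {
  return new Promise((resolve, reject) => {
    worker.once('error', reject);
    worker.once('exit', (code) => resolve(code));
    worker.postMessage('stop');
  });
}

// Possible usage in the main thread (left commented out so nothing runs on load):
//
//   const worker = startStoppableWorker();
//   process.once('SIGINT', async () => {
//     const code = await stopWorker(worker);
//     console.info('worker exited with code', code);
//   });
```

With this shape, only the main thread listens for SIGINT, and the worker gets a chance to close its socket before exiting instead of being cut off by `worker.terminate()`.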
Tested on

  • OS: macOS 12.6 (Monterey)
  • ZeroMQ.js version: 6.0.0-beta.16

arieb added the bug label Jun 7, 2023