Pub/Sub not working in compat mode when using "inproc" transport #570

Open
arieb opened this issue Jun 7, 2023 · 0 comments

arieb commented Jun 7, 2023

Describe the bug
When I write a very basic worker_threads pub/sub server/client flow using the "inproc" transport, the client subscriber never receives the messages.

Reproducing

const { Worker, isMainThread, parentPort } = require('node:worker_threads');
const process = require('process')
const zmq = require('zeromq/v5-compat')

function resolveOnSignals(...signals) {
  return new Promise((resolve) => {
    for (const sig of signals) {
      process.on(sig, () => resolve(sig))
    }
  })
}

// Other transports, such as ipc and tcp, work as expected.

const url = 'inproc://foo'
//const url = 'ipc://foo'
//const url = 'tcp://127.0.0.1:7301'

async function publisher(worker){
  console.log('Server Started!')
  const publisher = zmq.socket('pub')
  publisher.on('error', (err) => {
    console.error('error', err.message)
  })
  await new Promise((resolve, reject) => {
    publisher.bind(url, (err) => {
      if (err) {
        return reject(err)
      }
      resolve()
    })
  })

  let i = 0
  const interval = setInterval(() => {
    const msg = ['weasel', i]
    console.log('Sending msg', msg)
    publisher.send(msg)
    i++
  }, 1000)

  console.info('Waiting for os signal to stop us')
  const stopReason = await Promise.race([resolveOnSignals('SIGINT')])
  console.info('Asked to stop by %s', stopReason)

  clearInterval(interval)
  publisher.close()

  worker.terminate()

  return 0
}

async function subscriber(){
  console.log('Client Started!')

  const sub = zmq.socket('sub')
  sub.on('error', (err) => {
    console.error('error', err.message)
  })
  sub.on('message', (topic, msg) => {
    console.log('GOT MSG topic', topic, 'MSG=', msg)
  })

  sub.connect(url)
  sub.subscribe('')

  console.info('Waiting for os signal to stop us')
  const stopReason = await Promise.race([resolveOnSignals('SIGINT')])
  console.info('Asked to stop by %s', stopReason)

  sub.close()

  return 0
}

if (isMainThread) {
  const worker = new Worker(__filename)
  worker.unref()
  publisher(worker)
    .then((exitCode) => {
    })
    .catch((err) => {
      console.error(err.message)
      process.exit(1)
    })
} else {
  subscriber()
    .then((exitCode) => {
    })
    .catch((err) => {
      console.error(err.message)
      process.exit(1)
    })

}

Expected behavior
I expect each published message to be printed by the client,
and the client not to crash when the process receives SIGINT.
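
A note on the SIGINT part of this expectation: Node.js does not deliver signals through process.on(...) inside worker threads, so the subscriber's resolveOnSignals('SIGINT') is not expected to resolve in the worker. The following is a minimal sketch of one way around that, assuming the main thread forwards a stop request over the worker's message port. The helper name resolveOnMessage and the 'stop' string are hypothetical and not part of zeromq.js or the reproduction above. It does not address the missing "inproc" messages.

```javascript
// Hypothetical helper (not part of zeromq.js): resolves once `port` emits a
// 'message' event whose payload equals `expected`. `port` can be any object
// with on/off methods, such as parentPort inside a worker thread.
function resolveOnMessage(port, expected) {
  return new Promise((resolve) => {
    const onMessage = (msg) => {
      if (msg !== expected) {
        return // ignore unrelated messages and keep waiting
      }
      port.off('message', onMessage) // stop listening once the request arrives
      resolve(msg)
    }
    port.on('message', onMessage)
  })
}

// Sketch of how it could plug into the reproduction (assumed wiring):
//
//   worker (subscriber):
//     const stopReason = await resolveOnMessage(parentPort, 'stop')
//     sub.close()
//
//   main thread (publisher), before worker.terminate():
//     worker.postMessage('stop')
```

With this wiring only the main thread listens for SIGINT, and the worker gets a chance to close its socket before worker.terminate() is called.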
Tested on

  • OS: MacOS 12.6 (Monterey)
  • ZeroMQ.js version: 6.0.0-beta.16
arieb added the bug label Jun 7, 2023