Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Feature/implement websockets #356

Merged
merged 11 commits into from
Sep 25, 2022
Merged

Feature/implement websockets #356

merged 11 commits into from
Sep 25, 2022

Conversation

bvanelli
Copy link
Contributor

@bvanelli bvanelli commented Sep 20, 2022

Open points:

  • Implement and test websockets transport for nats.

    • Test old behaviour
    • Test ws
    • Test wss
  • Need to review the changes made to the core of nats

    • Created a ssl_context implementation as property of client
      def ssl_context(self) -> ssl.SSLContext:
    • Added an increase in _pending_data_size. To me this seems like a bug, where the flush would be skipped if the transport used previously (stream) didn't write bytes immediately
      self._pending_data_size += len(PING_PROTO)
    • Added transport instance inside client.
      if not self._transport:
  • Need to figure out what to do when transport needs replacement (i.e. when mixing both ws and nats connections on the server list)

  • Make tests run

  • Optional: include aiohttp as a optional dependency, similar to nkeys

  • Optional: write more tests to test the transport

Here is the test client to test this functionality:

import asyncio
import nats


async def main():
    nc = await nats.connect("wss://demo.nats.io:8443")

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    await asyncio.sleep(5)  # wait for messages to arrive

    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()
    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())

Closes #353
Closes #270

@wallyqs
Copy link
Member

wallyqs commented Sep 20, 2022

Just gave it a try, awesome work!

python3 examples/basic.py -s wss://demo.nats.io:8443
Received a message on 'foo ': Hello
Received a message on 'foo ': World
Received a message on 'bar ': First
Received a message on 'bar ': Second
Received a message on 'help _INBOX.GQhnv0uOPDvMfLg1OziGsD.GQhnv0uOPDvMfLg1OziGxXe814': help me

@wallyqs
Copy link
Member

wallyqs commented Sep 20, 2022

it looks like needs to add this to .travis.yml for the tests to run in CI:

install:
  - pip install -e .[nkeys]
  - pip install aiohttp

@bvanelli
Copy link
Contributor Author

One of the jobs failed https://app.travis-ci.com/github/nats-io/nats.py/jobs/583496905 because of google/yapf#936. I included a patch for the CI as a workaround to have tests passing. Just need to figure out when transport needs replacement and it looks in a good state to me.

@bvanelli
Copy link
Contributor Author

bvanelli commented Sep 21, 2022

I wanted to do something like:

                # figure out required transport for uri
                if s.uri.scheme in ("ws", "wss"):
                    transport = WebSocketTransport()
                else:
                    # use TcpTransport as a fallback
                    transport = TcpTransport()

                if not (type(transport) != type(self._transport)):  # noqa: type comparison is safe here
                    # replace the current transport as it changed
                    self._transport = transport

in

if not self._transport:
but it seems dirty. Any better alternative?

@wallyqs
Copy link
Member

wallyqs commented Sep 21, 2022

Looking at the Go client, when passing the options on connect, it should be checked whether it is being attempted to mix websockets and nats schemes and thrown an error:

nats: error: mixing of websocket and non websocket URLs is not allowed

https://github.com/nats-io/nats.go/blob/715a5917c806d2858f6fc80663fbf0910ad58ee3/nats.go#L1589-L1597

@bvanelli
Copy link
Contributor Author

Looking at the Go client, when passing the options on connect, it should be checked whether it is being attempted to mix websockets and nats schemes and thrown an error:

nats: error: mixing of websocket and non websocket URLs is not allowed

https://github.com/nats-io/nats.go/blob/715a5917c806d2858f6fc80663fbf0910ad58ee3/nats.go#L1589-L1597

Sounds fair, I added a check for that on connect: 7ac8e8e

Copy link
Member

@wallyqs wallyqs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks for this excellent contribution! 🎉

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

how to connect with websockets? [question] Will nats.py support websockets
2 participants