-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathnng.py
1610 lines (1265 loc) · 56 KB
/
nng.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Provides a Pythonic interface to cffi nng bindings
"""
import logging
import threading
import atexit
import pynng
from ._nng import ffi, lib
from .exceptions import check_err
from . import options
from . import _aio
logger = logging.getLogger(__name__)
__all__ = """
ffi
Bus0
Pair0
Pair1
Pull0 Push0
Pub0 Sub0
Req0 Rep0
Socket
Surveyor0 Respondent0
""".split()
# Register an atexit handler to call the nng_fini() cleanup function.
# This is necessary to ensure:
# * The Python interpreter doesn't finalize and kill the reap thread
# during a callback to _nng_pipe_cb
# * Cleanup background queue threads used by NNG
def _pynng_atexit():
lib.nng_fini()
atexit.register(_pynng_atexit)
def _ensure_can_send(thing):
"""
It's easy to accidentally pass in a str instead of bytes when send()ing.
This gives a more informative message if a ``str`` was accidentally passed
to a send method.
"""
# at some point it might be nice to check for the specific types we **can**
# send...
if isinstance(thing, str):
raise ValueError(
"Cannot send type str. " 'Maybe you left out a ".encode()" somewhere?'
)
def to_char(charlike, add_null_term=False):
"""Convert str or bytes to char*."""
# fast path for stuff that doesn't need to be changed.
if isinstance(charlike, ffi.CData):
return charlike
if isinstance(charlike, str):
charlike = charlike.encode()
if add_null_term:
charlike = charlike + b"\x00"
charlike = ffi.new("char[]", charlike)
return charlike
class _NNGOption:
"""A descriptor for more easily getting/setting NNG option."""
# this class should not be instantiated directly! Instantiation will work,
# but getting/setting will fail.
# subclasses set _getter and _setter to the module-level getter and setter
# functions
_getter = None
_setter = None
def __init__(self, option_name):
self.option = to_char(option_name)
def __get__(self, instance, owner):
# have to look up the getter on the class
if self._getter is None:
raise TypeError("{} cannot be set".format(self.__class__))
return self.__class__._getter(instance, self.option)
def __set__(self, instance, value):
if self._setter is None:
raise TypeError("{} is readonly".format(self.__class__))
self.__class__._setter(instance, self.option, value)
class IntOption(_NNGOption):
"""Descriptor for getting/setting integer options"""
_getter = options._getopt_int
_setter = options._setopt_int
class MsOption(_NNGOption):
"""Descriptor for getting/setting durations (in milliseconds)"""
_getter = options._getopt_ms
_setter = options._setopt_ms
class SockAddrOption(_NNGOption):
"""Descriptor for getting/setting durations (in milliseconds)"""
_getter = options._getopt_sockaddr
class SizeOption(_NNGOption):
"""Descriptor for getting/setting size_t options"""
_getter = options._getopt_size
_setter = options._setopt_size
class StringOption(_NNGOption):
"""Descriptor for getting/setting string options"""
_getter = options._getopt_string
_setter = options._setopt_string
class BooleanOption(_NNGOption):
"""Descriptor for getting/setting boolean values"""
_getter = options._getopt_bool
_setter = options._setopt_bool
class PointerOption(_NNGOption):
"""Descriptor for setting pointer values"""
_setter = options._setopt_ptr
class NotImplementedOption(_NNGOption):
"""Represents a currently un-implemented option in Python."""
def __init__(self, option_name, errmsg):
super().__init__(option_name)
self.errmsg = errmsg
def __get__(self, instance, owner):
raise NotImplementedError(self.errmsg)
def __set__(self, instance, value):
raise NotImplementedError(self.errmsg)
class Socket:
"""
Open a socket with one of the scalability protocols. This should not be
instantiated directly; instead, one of its subclasses should be used.
There is one subclass per protocol. The available protocols are:
* :class:`Pair0`
* :class:`Pair1`
* :class:`Req0` / :class:`Rep0`
* :class:`Pub0` / :class:`Sub0`
* :class:`Push0` / :class:`Pull0`
* :class:`Surveyor0` / :class:`Respondent0`
* :class:`Bus0`
The socket initializer receives no positional arguments. It accepts the
following keyword arguments, with the same meaning as the :ref:`attributes
<socket-attributes>` described below: ``recv_timeout``, ``send_timeout``,
``recv_buffer_size``, ``send_buffer_size``, ``reconnect_time_min``,
``reconnect_time_max``, and ``name``
To talk to another socket, you have to either :meth:`~Socket.dial`
its address, or :meth:`~Socket.listen` for connections. Then you can
:meth:`~Socket.send` to send data to the remote sockets or
:meth:`~Socket.recv` to receive data from the remote sockets.
Asynchronous versions are available as well, as :meth:`~Socket.asend`
and :meth:`~Socket.arecv`. The supported event loops are :mod:`asyncio`
and `Trio`_. You must ensure that you :meth:`~Socket.close` the socket
when you are finished with it. Sockets can also be used as a context
manager; this is the preferred way to use them when possible.
.. _socket-attributes:
Sockets have the following attributes. Generally, you should set these
attributes before :meth:`~Socket.listen`-ing or
:meth:`~Socket.dial`-ing, or by passing them in as keyword arguments
when creating the :class:`Socket`:
* **recv_timeout** (int): Receive timeout, in ms. If a socket takes longer
than the specified time, raises a ``pynng.exceptions.Timeout``.
Corresponds to library option ``NNG_OPT_RECVTIMEO``
* **send_timeout** (int): Send timeout, in ms. If the message cannot
be queued in the specified time, raises a pynng.exceptions.Timeout.
Corresponds to library option ``NNG_OPT_SENDTIMEO``.
* **recv_max_size** (int): The largest size of a message to receive.
Messages larger than this size will be silently dropped. A size of 0
indicates unlimited size. The default size is 1 MB.
* **recv_buffer_size** (int): The number of messages that the socket
will buffer on receive. Corresponds to ``NNG_OPT_RECVBUF``.
* **send_buffer_size** (int): The number of messages that the socket
will buffer on send. Corresponds to ``NNG_OPT_SENDBUF``.
* **name** (str): The socket name. Corresponds to
``NNG_OPT_SOCKNAME``. This is useful for debugging purposes.
* **raw** (bool): A boolean, indicating whether the socket is raw or cooked.
Returns ``True`` if the socket is raw, else ``False``. This property
is read-only. Corresponds to library option ``NNG_OPT_RAW``. For
more information see `nng's documentation.
<https://nanomsg.github.io/nng/man/v1.0.1/nng.7.html#raw_mode>`_
Note that currently, pynng does not support ``raw`` mode sockets, but
we intend to `in the future
<https://github.com/codypiersall/pynng/issues/35>`_:
* **protocol** (int): Read-only option which returns the 16-bit number
of the socket's protocol.
* **protocol_name** (str): Read-only option which returns the name of the
socket's protocol.
* **peer** (int): Returns the peer protocol id for the socket.
* **local_address**: The :class:`~pynng.sockaddr.SockAddr` representing
the local address. Corresponds to ``NNG_OPT_LOCADDR``.
* **reconnect_time_min** (int): The minimum time to wait before
attempting reconnects, in ms. Corresponds to ``NNG_OPT_RECONNMINT``.
This can also be overridden on the dialers.
* **reconnect_time_max** (int): The maximum time to wait before
attempting reconnects, in ms. Corresponds to ``NNG_OPT_RECONNMAXT``.
If this is non-zero, then the time between successive connection
attempts will start at the value of ``reconnect_time_min``, and grow
exponentially, until it reaches this value. This option can be set
on the socket, or on the dialers associated with the socket.
* **recv_fd** (int): The receive file descriptor associated with the
socket. This is suitable to be passed into poll functions like
:func:`select.poll` or :func:`select.select`. That is the only thing
this file descriptor is good for; do not attempt to read from or
write to it. The file descriptor will be marked as **readable**
whenever it can receive data without blocking. Corresponds to
``NNG_OPT_RECVFD``.
* **send_fd** (int): The sending file descriptor associated with the
socket. This is suitable to be passed into poll functions like
:func:`select.poll` or :func:`select.select`. That is the only thing
this file descriptor is good for; do not attempt to read from or
write to it. The file descriptor will be marked as **readable**
whenever it can send data without blocking. Corresponds to
``NNG_OPT_SENDFD``.
.. Note::
When used in :func:`select.poll` or :func:`select.select`,
``recv_fd`` and ``send_fd`` are both marked as **readable** when
they can receive or send data without blocking. So the upshot is
that for :func:`select.select` they should be passed in as the
*rlist* and for :meth:`select.poll.register` the *eventmask*
should be ``POLLIN``.
* **tls_config** (:class:`~pynng.TLSConfig`): The TLS configuration for
this socket. This option is only valid if the socket is using the
TLS transport. See :class:`~pynng.TLSConfig` for information about
the TLS configuration. Corresponds to ``NNG_OPT_TLS_CONFIG``. This
option is write-only.
.. _Trio: https://trio.readthedocs.io
"""
# TODO: Do we need to document ttl_max? We're not supporting nng_device
# yet, so I guess not?
# the following options correspond to nng options documented at
# https://nanomsg.github.io/nng/man/v1.0.1/nng_options.5.html
name = StringOption("socket-name")
raw = BooleanOption("raw")
protocol = IntOption("protocol")
protocol_name = StringOption("protocol-name")
peer = IntOption("peer")
peer_name = StringOption("peer-name")
recv_buffer_size = IntOption("recv-buffer")
send_buffer_size = IntOption("send-buffer")
recv_timeout = MsOption("recv-timeout")
send_timeout = MsOption("send-timeout")
ttl_max = IntOption("ttl-max")
recv_max_size = SizeOption("recv-size-max")
reconnect_time_min = MsOption("reconnect-time-min")
reconnect_time_max = MsOption("reconnect-time-max")
recv_fd = IntOption("recv-fd")
send_fd = IntOption("send-fd")
tcp_nodelay = BooleanOption("tcp-nodelay")
tcp_keepalive = BooleanOption("tcp-keepalive")
tls_config = PointerOption("tls-config")
def __init__(
self,
*,
dial=None,
listen=None,
recv_timeout=None,
send_timeout=None,
recv_buffer_size=None,
send_buffer_size=None,
recv_max_size=None,
reconnect_time_min=None,
reconnect_time_max=None,
opener=None,
block_on_dial=None,
name=None,
tls_config=None,
async_backend=None,
):
# mapping of id: Python objects
self._dialers = {}
self._listeners = {}
self._pipes = {}
self._on_pre_pipe_add = []
self._on_post_pipe_add = []
self._on_post_pipe_remove = []
self._pipe_notify_lock = threading.Lock()
self._async_backend = async_backend
self._socket = ffi.new(
"nng_socket *",
)
if opener is not None:
self._opener = opener
if opener is None and not hasattr(self, "_opener"):
raise TypeError("Cannot directly instantiate a Socket. Try a subclass.")
check_err(self._opener(self._socket))
if tls_config is not None:
self.tls_config = tls_config
if recv_timeout is not None:
self.recv_timeout = recv_timeout
if send_timeout is not None:
self.send_timeout = send_timeout
if recv_max_size is not None:
self.recv_max_size = recv_max_size
if reconnect_time_min is not None:
self.reconnect_time_min = reconnect_time_min
if reconnect_time_max is not None:
self.reconnect_time_max = reconnect_time_max
if recv_buffer_size is not None:
self.recv_buffer_size = recv_buffer_size
if send_buffer_size is not None:
self.send_buffer_size = send_buffer_size
if name is not None:
self.name = name
# set up pipe callbacks. This **must** be called before listen/dial to
# avoid race conditions.
handle = ffi.new_handle(self)
self._handle = handle
for event in (
lib.NNG_PIPE_EV_ADD_PRE,
lib.NNG_PIPE_EV_ADD_POST,
lib.NNG_PIPE_EV_REM_POST,
):
check_err(lib.nng_pipe_notify(self.socket, event, lib._nng_pipe_cb, handle))
if listen is not None:
self.listen(listen)
if dial is not None:
self.dial(dial, block=block_on_dial)
def dial(self, address, *, block=None):
"""Dial the specified address.
Args:
address: The address to dial.
block: Whether to block or not. There are three possible values
this can take:
1. If ``True``, a blocking dial is attempted. If it fails for
any reason, the dial fails and an exception is raised.
2. If ``False``, a non-blocking dial is started. The dial is
retried periodically in the background until it is
successful.
3. (**Default behavior**): If ``None``, a blocking dial is
first attempted. If it fails an exception is logged (using
the Python logging module), then a non-blocking dial is
done.
"""
if block:
return self._dial(address, flags=0)
elif block is None:
try:
return self.dial(address, block=True)
except pynng.ConnectionRefused:
msg = "Synchronous dial failed; attempting asynchronous now"
logger.exception(msg)
return self.dial(address, block=False)
else:
return self._dial(address, flags=lib.NNG_FLAG_NONBLOCK)
def _dial(self, address, flags=0):
"""Dial specified ``address``
``flags`` usually do not need to be given.
"""
dialer = ffi.new("nng_dialer *")
ret = lib.nng_dial(self.socket, to_char(address), dialer, flags)
check_err(ret)
# we can only get here if check_err doesn't raise
d_id = lib.nng_dialer_id(dialer[0])
py_dialer = Dialer(dialer, self)
self._dialers[d_id] = py_dialer
return py_dialer
def listen(self, address, flags=0):
"""Listen at specified address.
``listener`` and ``flags`` usually do not need to be given.
"""
listener = ffi.new("nng_listener *")
ret = lib.nng_listen(self.socket, to_char(address), listener, flags)
check_err(ret)
# we can only get here if check_err doesn't raise
l_id = lib.nng_listener_id(listener[0])
py_listener = Listener(listener, self)
self._listeners[l_id] = py_listener
return py_listener
def close(self):
"""Close the socket, freeing all system resources."""
# if a TypeError occurs (e.g. a bad keyword to __init__) we don't have
# the attribute _socket yet. This prevents spewing extra exceptions
if hasattr(self, "_socket"):
lib.nng_close(self.socket)
# cleanup the list of listeners/dialers. A program would be likely to
# segfault if a user accessed the listeners or dialers after this
# point.
self._listeners = {}
self._dialers = {}
def __del__(self):
self.close()
@property
def socket(self):
return self._socket[0]
def recv(self, block=True):
"""Receive data on the socket. If the request times out the exception
:class:`pynng.Timeout` is raised. If the socket cannot perform that
operation (e.g., a :class:`Pub0`, which can only
:meth:`~Socket.send`), the exception :class:`pynng.NotSupported`
is raised.
Args:
block: If block is True (the default), the function will not return
until the operation is completed or times out. If block is False,
the function will return data immediately. If no data is ready on
the socket, the function will raise ``pynng.TryAgain``.
"""
# TODO: someday we should support some kind of recv_into() operation
# where the user provides the data buffer.
flags = lib.NNG_FLAG_ALLOC
if not block:
flags |= lib.NNG_FLAG_NONBLOCK
data = ffi.new("char **")
size_t = ffi.new("size_t *")
ret = lib.nng_recv(self.socket, data, size_t, flags)
check_err(ret)
recvd = ffi.unpack(data[0], size_t[0])
lib.nng_free(data[0], size_t[0])
return recvd
def send(self, data, block=True):
"""Sends ``data`` on socket.
Args:
data: either ``bytes`` or ``bytearray``
block: If block is True (the default), the function will
not return until the operation is completed or times out.
If block is False, the function will raise ``pynng.TryAgain``
immediately if no data was sent.
"""
_ensure_can_send(data)
flags = 0
if not block:
flags |= lib.NNG_FLAG_NONBLOCK
err = lib.nng_send(self.socket, data, len(data), flags)
check_err(err)
async def arecv(self):
"""The asynchronous version of :meth:`~Socket.recv`"""
with _aio.AIOHelper(self, self._async_backend) as aio:
return await aio.arecv()
async def asend(self, data):
"""Asynchronous version of :meth:`~Socket.send`."""
_ensure_can_send(data)
with _aio.AIOHelper(self, self._async_backend) as aio:
return await aio.asend(data)
def __enter__(self):
return self
def __exit__(self, *tb_info):
self.close()
@property
def dialers(self):
"""A list of the active dialers"""
return tuple(self._dialers.values())
@property
def listeners(self):
"""A list of the active listeners"""
return tuple(self._listeners.values())
@property
def pipes(self):
"""A list of the active pipes"""
return tuple(self._pipes.values())
def _add_pipe(self, lib_pipe):
# this is only called inside the pipe callback.
pipe_id = lib.nng_pipe_id(lib_pipe)
# If the pipe already exists in the Socket, don't create a new one
if pipe_id not in self._pipes:
pipe = Pipe(lib_pipe, self)
self._pipes[pipe_id] = pipe
return self._pipes[pipe_id]
def _remove_pipe(self, lib_pipe):
pipe_id = lib.nng_pipe_id(lib_pipe)
del self._pipes[pipe_id]
def new_context(self):
"""Return a new :class:`Context` for this socket."""
return Context(self)
def add_pre_pipe_connect_cb(self, callback):
"""
Add a callback which will be called before a Pipe is connected to a
Socket. You can add as many callbacks as you want, and they will be
called in the order they were added.
The callback provided must accept a single argument: a Pipe. The
socket associated with the pipe can be accessed through the pipe's
``socket`` attribute. If the pipe is closed, the callbacks for
post_pipe_connect and post_pipe_remove will not be called.
"""
self._on_pre_pipe_add.append(callback)
def add_post_pipe_connect_cb(self, callback):
"""
Add a callback which will be called after a Pipe is connected to a
Socket. You can add as many callbacks as you want, and they will be
called in the order they were added.
The callback provided must accept a single argument: a :class:`Pipe`.
"""
self._on_post_pipe_add.append(callback)
def add_post_pipe_remove_cb(self, callback):
"""
Add a callback which will be called after a Pipe is removed from a
Socket. You can add as many callbacks as you want, and they will be
called in the order they were added.
The callback provided must accept a single argument: a :class:`Pipe`.
"""
self._on_post_pipe_remove.append(callback)
def remove_pre_pipe_connect_cb(self, callback):
"""Remove ``callback`` from the list of callbacks for pre pipe connect
events
"""
self._on_pre_pipe_add.remove(callback)
def remove_post_pipe_connect_cb(self, callback):
"""Remove ``callback`` from the list of callbacks for post pipe connect
events
"""
self._on_post_pipe_add.remove(callback)
def remove_post_pipe_remove_cb(self, callback):
"""Remove ``callback`` from the list of callbacks for post pipe remove
events
"""
self._on_post_pipe_remove.remove(callback)
def _try_associate_msg_with_pipe(self, msg):
"""Looks up the nng_msg associated with the ``msg`` and attempts to
set it on the Message ``msg``
"""
# Wrap pipe handling inside the notify lock since we can create
# a new Pipe and associate it with the Socket if the callbacks
# haven't been called yet. This will ensure there's no race
# condition with the pipe callbacks.
with self._pipe_notify_lock:
lib_pipe = lib.nng_msg_get_pipe(msg._nng_msg)
pipe_id = lib.nng_pipe_id(lib_pipe)
try:
msg.pipe = self._pipes[pipe_id]
except KeyError:
# A message may have been received before the pipe callback was called.
# Create a new Pipe and associate it with the Socket.
# When the callback is called, it will detect that the pipe was already.
# if pipe_id < 0, that *probably* means we hit a race where the
# associated pipe was closed.
if pipe_id >= 0:
# Add the pipe to the socket
msg.pipe = self._add_pipe(lib_pipe)
def recv_msg(self, block=True):
"""Receive a :class:`Message` on the socket."""
flags = 0
if not block:
flags |= lib.NNG_FLAG_NONBLOCK
msg_p = ffi.new("nng_msg **")
check_err(lib.nng_recvmsg(self.socket, msg_p, flags))
msg = msg_p[0]
msg = Message(msg)
self._try_associate_msg_with_pipe(msg)
return msg
def send_msg(self, msg, block=True):
"""Send the :class:`Message` ``msg`` on the socket.
.. Note::
It's may be more convenient to call :meth:`Pipe.send` than this
method.
"""
flags = 0
if not block:
flags |= lib.NNG_FLAG_NONBLOCK
with msg._mem_freed_lock:
msg._ensure_can_send()
check_err(lib.nng_sendmsg(self.socket, msg._nng_msg, flags))
msg._mem_freed = True
async def asend_msg(self, msg):
"""
Asynchronously send the :class:`Message` ``msg`` on the socket.
"""
with msg._mem_freed_lock:
msg._ensure_can_send()
with _aio.AIOHelper(self, self._async_backend) as aio:
# Note: the aio helper sets the _mem_freed flag on the msg
return await aio.asend_msg(msg)
async def arecv_msg(self):
"""
Asynchronously receive the :class:`Message` ``msg`` on the socket.
"""
with _aio.AIOHelper(self, self._async_backend) as aio:
msg = await aio.arecv_msg()
self._try_associate_msg_with_pipe(msg)
return msg
class Bus0(Socket):
"""A bus0 socket. The Python version of `nng_bus
<https://nanomsg.github.io/nng/man/tip/nng_bus.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also has the
same :ref:`attributes <socket-attributes>`.
A :class:`Bus0` socket sends a message to all directly connected peers.
This enables creating mesh networks. Note that messages are only sent to
*directly* connected peers. You must explicitly connect all nodes with the
:meth:`~Socket.listen` and corresponding :meth:`~Socket.listen` calls.
Here is a demonstration of using the bus protocol:
.. literalinclude:: snippets/bus0_sync.py
:language: python3
"""
_opener = lib.nng_bus0_open
class Pair0(Socket):
"""A socket for bidrectional, one-to-one communication, with a single
partner. The Python version of `nng_pair0
<https://nanomsg.github.io/nng/man/tip/nng_pair.7>`_.
This is the most basic type of socket.
It accepts the same keyword arguments as :class:`Socket` and also has the
same :ref:`attributes <socket-attributes>`.
This demonstrates the synchronous API:
.. literalinclude:: snippets/pair0_sync.py
:language: python3
This demonstrates the asynchronous API using `Trio`_. Remember that
:mod:`asyncio` is also supported.
.. literalinclude:: snippets/pair0_async.py
:language: python3
"""
_opener = lib.nng_pair0_open
class Pair1(Socket):
"""A socket for bidrectional communication with potentially many peers.
The Python version of `nng_pair1
<https://nanomsg.github.io/nng/man/tip/nng_pair.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also has the
same :ref:`attributes <socket-attributes>`. It also has one extra
keyword-only argument, ``polyamorous``, which must be set to ``True`` to
connect with more than one peer.
.. Warning::
If you want to connect to multiple peers you **must** pass
``polyamorous=True`` when you create your socket. ``polyamorous`` is a
read-only attribute of the socket and cannot be changed after creation.
.. Warning::
Pair1 was an experimental feature in nng, and is currently deprecated.
It will likely be removed in the future; see `nng's docs
<https://nng.nanomsg.org/man/v1.3.2/nng_pair_open.3.html>`_ for
details.
To get the benefits of polyamory, you need to use the methods that work
with :class:`Message` objects: :meth:`Socket.recv_msg` and
:meth:`Socket.arecv_msg` for receiving, and :meth:`Pipe.send`
and :meth:`Pipe.asend` for sending.
Here is an example of the synchronous API, where a single listener connects
to multiple peers. This is more complex than the :class:`Pair0` case,
because it requires to use the :class:`Pipe` and :class:`Message`
interfaces.
.. literalinclude:: snippets/pair1_sync.py
And here is an example using the async API, using `Trio`_.
.. literalinclude:: snippets/pair1_async.py
"""
def __init__(self, *, polyamorous=False, **kwargs):
# make sure we don't listen/dial before setting polyamorous, so we pop
# them out of kwargs, then do the dial/listen below.
# It's not beautiful, but it will work.
dial_addr = kwargs.pop("dial", None)
listen_addr = kwargs.pop("dial", None)
super().__init__(**kwargs)
if polyamorous:
self._opener = lib.nng_pair1_open_poly
else:
self._opener = lib.nng_pair1_open
# now we can do the listen/dial
if dial_addr is not None:
self.dial(dial_addr, block=kwargs.get("block_on_dial"))
if listen_addr is not None:
self.listen(listen_addr)
_opener = lib.nng_pair1_open_poly
polyamorous = BooleanOption("pair1:polyamorous")
class Push0(Socket):
"""A push0 socket.
The Python version of `nng_push
<https://nanomsg.github.io/nng/man/tip/nng_push.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also
has the same :ref:`attributes <socket-attributes>`.
A :class:`Push0` socket is the pushing end of a data pipeline. Data sent
from a push socket will be sent to a *single* connected :class:`Pull0`
socket. This can be useful for distributing work to multiple nodes, for
example. Attempting to call :meth:`~Socket.recv()` on a Push0 socket
will raise a :class:`pynng.NotSupported` exception.
Here is an example of two :class:`Pull0` sockets connected to a
:class:`Push0` socket.
.. literalinclude:: snippets/pushpull_sync.py
"""
_opener = lib.nng_push0_open
class Pull0(Socket):
"""A pull0 socket.
The Python version of `nng_pull
<https://nanomsg.github.io/nng/man/tip/nng_pull.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also
has the same :ref:`attributes <socket-attributes>`.
A :class:`Pull0` is the receiving end of a data pipeline. It needs to be
paired with a :class:`Push0` socket.
Attempting to :meth:`~Socket.send()`
with a Pull0 socket will raise a :class:`pynng.NotSupported` exception.
See :class:`Push0` for an example of push/pull in action.
"""
_opener = lib.nng_pull0_open
class Pub0(Socket):
"""A pub0 socket.
The Python version of `nng_pub
<https://nanomsg.github.io/nng/man/tip/nng_pub.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also has the
same :ref:`attributes <socket-attributes>`. A :class:`Pub0` socket calls
:meth:`~Socket.send`, the data is published to all connected
:class:`subscribers <Sub0>`.
Attempting to :meth:`~Socket.recv` with a Pub0 socket will raise a
:class:`pynng.NotSupported` exception.
See docs for :class:`Sub0` for an example.
"""
_opener = lib.nng_pub0_open
class Sub0(Socket):
"""A sub0 socket.
The Python version of `nng_sub
<https://nanomsg.github.io/nng/man/tip/nng_sub.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also
has the same :ref:`attributes <socket-attributes>`. It also has one
additional keyword argument: ``topics``. If ``topics`` is given, it must
be either a :class:`str`, :class:`bytes`, or an iterable of str and bytes.
A subscriber must :meth:`~Sub0.subscribe` to specific topics, and only
messages that match the topic will be received. A subscriber can subscribe
to as many topics as you want it to.
A match is determined if the message starts with one of the subscribed
topics. So if the subscribing socket is subscribed to the topic
``b'hel'``, then the messages ``b'hel'``, ``b'help him`` and ``b'hello'``
would match, but the message ``b'hexagon'`` would not. Subscribing to an
empty string (``b''``) means that all messages will match. If a sub socket
is not subscribed to any topics, no messages will be receieved.
.. Note ::
pub/sub is a "best effort" transport; if you have a very high volume of
messages be prepared for some messages to be silently dropped.
Attempting to :meth:`~Socket.send` with a Sub0 socket will raise a
:class:`pynng.NotSupported` exception.
The following example demonstrates a basic usage of pub/sub:
.. literalinclude:: snippets/pubsub_sync.py
"""
_opener = lib.nng_sub0_open
def __init__(self, *, topics=None, **kwargs):
super().__init__(**kwargs)
if topics is None:
return
# special-case str/bytes
if isinstance(topics, (str, bytes)):
topics = [topics]
for topic in topics:
self.subscribe(topic)
def subscribe(self, topic):
"""Subscribe to the specified topic.
Topics are matched by looking at the first bytes of any received
message.
.. Note::
If you pass a :class:`str` as the ``topic``, it will be
automatically encoded with :meth:`str.encode`. If this is not the
desired behavior, just pass :class:`bytes` in as the topic.
"""
options._setopt_string_nonnull(self, b"sub:subscribe", topic)
def unsubscribe(self, topic):
"""Unsubscribe to the specified topic.
.. Note::
If you pass a :class:`str` as the ``topic``, it will be
automatically encoded with :meth:`str.encode`. If this is not the
desired behavior, just pass :class:`bytes` in as the topic.
"""
options._setopt_string_nonnull(self, b"sub:unsubscribe", topic)
class Req0(Socket):
"""A req0 socket.
The Python version of `nng_req
<https://nanomsg.github.io/nng/man/tip/nng_req.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also
has the same :ref:`attributes <socket-attributes>`. It also has one extra
keyword-argument: ``resend_time``. ``resend_time`` corresponds to
``NNG_OPT_REQ_RESENDTIME``
A :class:`Req0` socket is paired with a :class:`Rep0` socket and together
they implement normal request/response behavior. the req socket
:meth:`send()s <Socket.send>` a request, the rep socket :meth:`recv()s
<Socket.recv>` it, the rep socket :meth:`send()s <Socket.Send>` a response,
and the req socket :meth:`recv()s <Socket.recv>` it.
If a req socket attempts to do a :meth:`~Socket.recv` without first doing a
:meth:`~Socket.send`, a :class:`pynng.BadState` exception is raised.
A :class:`Req0` socket supports opening multiple :class:`Contexts
<Context>` by calling :meth:`~Socket.new_context`. In this way a req
socket can have multiple outstanding requests to a single rep socket.
Without opening a :class:`Context`, the socket can only have a single
outstanding request at a time.
Here is an example demonstrating the request/response pattern.
.. literalinclude:: snippets/reqrep_sync.py
"""
resend_time = MsOption("req:resend-time")
_opener = lib.nng_req0_open
def __init__(self, *, resend_time=None, **kwargs):
super().__init__(**kwargs)
if resend_time is not None:
self.resend_time = resend_time
class Rep0(Socket):
"""A rep0 socket.
The Python version of `nng_rep
<https://nanomsg.github.io/nng/man/tip/nng_rep.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also
has the same :ref:`attributes <socket-attributes>`.
A :class:`Rep0` socket along with a :class:`Req0` socket implement the
request/response pattern:
the req socket :meth:`send()s <Socket.send>` a
request, the rep socket :meth:`recv()s <Socket.recv>` it, the rep socket
:meth:`send()s <Socket.Send>` a response, and the req socket :meth:`recv()s
<Socket.recv>` it.
A :class:`Rep0` socket supports opening multiple :class:`Contexts
<Context>` by calling :meth:`~Socket.new_context`. In this way a rep
socket can service multiple requests at the same time. Without opening a
:class:`Context`, the rep socket can only service a single request at a
time.
See the documentation for :class:`Req0` for an example.
"""
_opener = lib.nng_rep0_open
class Surveyor0(Socket):
"""A surveyor0 socket.
The Python version of `nng_surveyor
<https://nanomsg.github.io/nng/man/tip/nng_surveyor.7>`_.
It accepts the same keyword arguments as :class:`Socket` and also