From f064e86bf602ea566a4f6ece9dc8ad4e5dbd050b Mon Sep 17 00:00:00 2001 From: Matt Thompson Date: Fri, 17 Jan 2020 18:30:36 -0800 Subject: [PATCH 1/2] Added atexit handler to call nng_fini() to stop python from killing reaper thread. Use ffi.new_handle and ffi.from_handle for pipe callbacks, eliminating _live_sockets --- build_pynng.py | 2 +- nng_api.h | 212 +++++++++++++++++++++++++++++++++++++++++++++++-- pynng/nng.py | 37 +++++---- 3 files changed, 228 insertions(+), 23 deletions(-) diff --git a/build_pynng.py b/build_pynng.py index 972dc1b..cc89e20 100644 --- a/build_pynng.py +++ b/build_pynng.py @@ -66,7 +66,7 @@ // nng_pipe_notify callback: // https://nanomsg.github.io/nng/man/tip/nng_pipe_notify.3 - extern "Python" void _nng_pipe_cb(nng_pipe, int, void *); + extern "Python" void _nng_pipe_cb(nng_pipe, nng_pipe_ev, void *); """ ffibuilder.cdef(api + callbacks) diff --git a/nng_api.h b/nng_api.h index 6e54ebe..876ae5b 100644 --- a/nng_api.h +++ b/nng_api.h @@ -1,4 +1,4 @@ -// THIS FILE WAS AUTOMATICALLY GENERATED BY ./generate_api.sh +// THIS FILE WAS AUTOMATICALLY GENERATED BY generate_api.sh typedef struct nng_ctx_s { uint32_t id; } nng_ctx; @@ -91,15 +91,34 @@ extern int nng_getopt_ms(nng_socket, const char *, nng_duration *); extern int nng_getopt_size(nng_socket, const char *, size_t *); extern int nng_getopt_uint64(nng_socket, const char *, uint64_t *); extern int nng_getopt_ptr(nng_socket, const char *, void **); +extern int nng_getopt_string(nng_socket, const char *, char **); +extern int nng_socket_set(nng_socket, const char *, const void *, size_t); +extern int nng_socket_set_bool(nng_socket, const char *, bool); +extern int nng_socket_set_int(nng_socket, const char *, int); +extern int nng_socket_set_size(nng_socket, const char *, size_t); +extern int nng_socket_set_uint64(nng_socket, const char *, uint64_t); +extern int nng_socket_set_string(nng_socket, const char *, const char *); +extern int nng_socket_set_ptr(nng_socket, const char *, void *); +extern int nng_socket_set_ms(nng_socket, const char *, nng_duration); +extern int nng_socket_set_addr( + nng_socket, const char *, const nng_sockaddr *); +extern int nng_socket_get(nng_socket, const char *, void *, size_t *); +extern int nng_socket_get_bool(nng_socket, const char *, bool *); +extern int nng_socket_get_int(nng_socket, const char *, int *); +extern int nng_socket_get_size(nng_socket, const char *, size_t *); +extern int nng_socket_get_uint64(nng_socket, const char *, uint64_t *); +extern int nng_socket_get_string(nng_socket, const char *, char **); +extern int nng_socket_get_ptr(nng_socket, const char *, void **); +extern int nng_socket_get_ms(nng_socket, const char *, nng_duration *); +extern int nng_socket_get_addr(nng_socket, const char *, nng_sockaddr *); typedef enum { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST, NNG_PIPE_EV_NUM, } nng_pipe_ev; -typedef void (*nng_pipe_cb)(nng_pipe, int, void *); -extern int nng_pipe_notify(nng_socket, int, nng_pipe_cb, void *); -extern int nng_getopt_string(nng_socket, const char *, char **); +typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_ev, void *); +extern int nng_pipe_notify(nng_socket, nng_pipe_ev, nng_pipe_cb, void *); extern int nng_listen(nng_socket, const char *, nng_listener *, int); extern int nng_dial(nng_socket, const char *, nng_dialer *, int); extern int nng_dialer_create(nng_dialer *, nng_socket, const char *); @@ -128,6 +147,25 @@ extern int nng_dialer_getopt_sockaddr( extern int nng_dialer_getopt_uint64(nng_dialer, const char *, uint64_t *); extern int nng_dialer_getopt_ptr(nng_dialer, const char *, void **); extern int nng_dialer_getopt_string(nng_dialer, const char *, char **); +extern int nng_dialer_set(nng_dialer, const char *, const void *, size_t); +extern int nng_dialer_set_bool(nng_dialer, const char *, bool); +extern int nng_dialer_set_int(nng_dialer, const char *, int); +extern int nng_dialer_set_size(nng_dialer, const char *, size_t); +extern int nng_dialer_set_uint64(nng_dialer, const char *, uint64_t); +extern int nng_dialer_set_string(nng_dialer, const char *, const char *); +extern int nng_dialer_set_ptr(nng_dialer, const char *, void *); +extern int nng_dialer_set_ms(nng_dialer, const char *, nng_duration); +extern int nng_dialer_set_addr( + nng_dialer, const char *, const nng_sockaddr *); +extern int nng_dialer_get(nng_dialer, const char *, void *, size_t *); +extern int nng_dialer_get_bool(nng_dialer, const char *, bool *); +extern int nng_dialer_get_int(nng_dialer, const char *, int *); +extern int nng_dialer_get_size(nng_dialer, const char *, size_t *); +extern int nng_dialer_get_uint64(nng_dialer, const char *, uint64_t *); +extern int nng_dialer_get_string(nng_dialer, const char *, char **); +extern int nng_dialer_get_ptr(nng_dialer, const char *, void **); +extern int nng_dialer_get_ms(nng_dialer, const char *, nng_duration *); +extern int nng_dialer_get_addr(nng_dialer, const char *, nng_sockaddr *); extern int nng_listener_setopt( nng_listener, const char *, const void *, size_t); extern int nng_listener_setopt_bool(nng_listener, const char *, bool); @@ -150,6 +188,26 @@ extern int nng_listener_getopt_uint64( nng_listener, const char *, uint64_t *); extern int nng_listener_getopt_ptr(nng_listener, const char *, void **); extern int nng_listener_getopt_string(nng_listener, const char *, char **); +extern int nng_listener_set( + nng_listener, const char *, const void *, size_t); +extern int nng_listener_set_bool(nng_listener, const char *, bool); +extern int nng_listener_set_int(nng_listener, const char *, int); +extern int nng_listener_set_size(nng_listener, const char *, size_t); +extern int nng_listener_set_uint64(nng_listener, const char *, uint64_t); +extern int nng_listener_set_string(nng_listener, const char *, const char *); +extern int nng_listener_set_ptr(nng_listener, const char *, void *); +extern int nng_listener_set_ms(nng_listener, const char *, nng_duration); +extern int nng_listener_set_addr( + nng_listener, const char *, const nng_sockaddr *); +extern int nng_listener_get(nng_listener, const char *, void *, size_t *); +extern int nng_listener_get_bool(nng_listener, const char *, bool *); +extern int nng_listener_get_int(nng_listener, const char *, int *); +extern int nng_listener_get_size(nng_listener, const char *, size_t *); +extern int nng_listener_get_uint64(nng_listener, const char *, uint64_t *); +extern int nng_listener_get_string(nng_listener, const char *, char **); +extern int nng_listener_get_ptr(nng_listener, const char *, void **); +extern int nng_listener_get_ms(nng_listener, const char *, nng_duration *); +extern int nng_listener_get_addr(nng_listener, const char *, nng_sockaddr *); extern const char *nng_strerror(int); extern int nng_send(nng_socket, void *, size_t, int); extern int nng_recv(nng_socket, void *, size_t *, int); @@ -172,6 +230,24 @@ extern int nng_ctx_setopt_bool(nng_ctx, const char *, bool); extern int nng_ctx_setopt_int(nng_ctx, const char *, int); extern int nng_ctx_setopt_ms(nng_ctx, const char *, nng_duration); extern int nng_ctx_setopt_size(nng_ctx, const char *, size_t); +extern int nng_ctx_get(nng_ctx, const char *, void *, size_t *); +extern int nng_ctx_get_bool(nng_ctx, const char *, bool *); +extern int nng_ctx_get_int(nng_ctx, const char *, int *); +extern int nng_ctx_get_size(nng_ctx, const char *, size_t *); +extern int nng_ctx_get_uint64(nng_ctx, const char *, uint64_t *); +extern int nng_ctx_get_string(nng_ctx, const char *, char **); +extern int nng_ctx_get_ptr(nng_ctx, const char *, void **); +extern int nng_ctx_get_ms(nng_ctx, const char *, nng_duration *); +extern int nng_ctx_get_addr(nng_ctx, const char *, nng_sockaddr *); +extern int nng_ctx_set(nng_ctx, const char *, const void *, size_t); +extern int nng_ctx_set_bool(nng_ctx, const char *, bool); +extern int nng_ctx_set_int(nng_ctx, const char *, int); +extern int nng_ctx_set_size(nng_ctx, const char *, size_t); +extern int nng_ctx_set_uint64(nng_ctx, const char *, uint64_t); +extern int nng_ctx_set_string(nng_ctx, const char *, const char *); +extern int nng_ctx_set_ptr(nng_ctx, const char *, void *); +extern int nng_ctx_set_ms(nng_ctx, const char *, nng_duration); +extern int nng_ctx_set_addr(nng_ctx, const char *, const nng_sockaddr *); extern void *nng_alloc(size_t); extern void nng_free(void *, size_t); extern char *nng_strdup(const char *); @@ -251,6 +327,15 @@ extern int nng_pipe_getopt_sockaddr(nng_pipe, const char *, nng_sockaddr *); extern int nng_pipe_getopt_uint64(nng_pipe, const char *, uint64_t *); extern int nng_pipe_getopt_ptr(nng_pipe, const char *, void **); extern int nng_pipe_getopt_string(nng_pipe, const char *, char **); +extern int nng_pipe_get(nng_pipe, const char *, void *, size_t *); +extern int nng_pipe_get_bool(nng_pipe, const char *, bool *); +extern int nng_pipe_get_int(nng_pipe, const char *, int *); +extern int nng_pipe_get_ms(nng_pipe, const char *, nng_duration *); +extern int nng_pipe_get_size(nng_pipe, const char *, size_t *); +extern int nng_pipe_get_uint64(nng_pipe, const char *, uint64_t *); +extern int nng_pipe_get_string(nng_pipe, const char *, char **); +extern int nng_pipe_get_ptr(nng_pipe, const char *, void **); +extern int nng_pipe_get_addr(nng_pipe, const char *, nng_sockaddr *); extern int nng_pipe_close(nng_pipe); extern int nng_pipe_id(nng_pipe); extern nng_socket nng_pipe_socket(nng_pipe); @@ -267,6 +352,10 @@ extern nng_stat *nng_stat_next(nng_stat *); extern nng_stat *nng_stat_child(nng_stat *); extern const char *nng_stat_name(nng_stat *); extern int nng_stat_type(nng_stat *); +extern nng_stat *nng_stat_find(nng_stat *, const char *); +extern nng_stat *nng_stat_find_socket(nng_stat *, nng_socket); +extern nng_stat *nng_stat_find_dialer(nng_stat *, nng_dialer); +extern nng_stat *nng_stat_find_listener(nng_stat *, nng_listener); enum nng_stat_type_enum { NNG_STAT_SCOPE = 0, NNG_STAT_LEVEL = 1, @@ -319,6 +408,7 @@ enum nng_errno_enum { NNG_ENOARG = 28, NNG_EAMBIGUOUS = 29, NNG_EBADTYPE = 30, + NNG_ECONNSHUT = 31, NNG_EINTERNAL = 1000, NNG_ESYSERR = 0x10000000, NNG_ETRANERR = 0x20000000 @@ -339,6 +429,116 @@ extern int nng_url_parse(nng_url **, const char *); extern void nng_url_free(nng_url *); extern int nng_url_clone(nng_url **, const nng_url *); extern const char *nng_version(void); +typedef struct nng_stream nng_stream; +typedef struct nng_stream_dialer nng_stream_dialer; +typedef struct nng_stream_listener nng_stream_listener; +extern void nng_stream_free(nng_stream *); +extern void nng_stream_close(nng_stream *); +extern void nng_stream_send(nng_stream *, nng_aio *); +extern void nng_stream_recv(nng_stream *, nng_aio *); +extern int nng_stream_get(nng_stream *, const char *, void *, size_t *); +extern int nng_stream_get_bool(nng_stream *, const char *, bool *); +extern int nng_stream_get_int(nng_stream *, const char *, int *); +extern int nng_stream_get_ms(nng_stream *, const char *, nng_duration *); +extern int nng_stream_get_size(nng_stream *, const char *, size_t *); +extern int nng_stream_get_uint64(nng_stream *, const char *, uint64_t *); +extern int nng_stream_get_string(nng_stream *, const char *, char **); +extern int nng_stream_get_ptr(nng_stream *, const char *, void **); +extern int nng_stream_get_addr(nng_stream *, const char *, nng_sockaddr *); +extern int nng_stream_set(nng_stream *, const char *, const void *, size_t); +extern int nng_stream_set_bool(nng_stream *, const char *, bool); +extern int nng_stream_set_int(nng_stream *, const char *, int); +extern int nng_stream_set_ms(nng_stream *, const char *, nng_duration); +extern int nng_stream_set_size(nng_stream *, const char *, size_t); +extern int nng_stream_set_uint64(nng_stream *, const char *, uint64_t); +extern int nng_stream_set_string(nng_stream *, const char *, const char *); +extern int nng_stream_set_ptr(nng_stream *, const char *, void *); +extern int nng_stream_set_addr( + nng_stream *, const char *, const nng_sockaddr *); +extern int nng_stream_dialer_alloc(nng_stream_dialer **, const char *); +extern int nng_stream_dialer_alloc_url( + nng_stream_dialer **, const nng_url *); +extern void nng_stream_dialer_free(nng_stream_dialer *); +extern void nng_stream_dialer_close(nng_stream_dialer *); +extern void nng_stream_dialer_dial(nng_stream_dialer *, nng_aio *); +extern int nng_stream_dialer_set( + nng_stream_dialer *, const char *, const void *, size_t); +extern int nng_stream_dialer_get( + nng_stream_dialer *, const char *, void *, size_t *); +extern int nng_stream_dialer_get_bool( + nng_stream_dialer *, const char *, bool *); +extern int nng_stream_dialer_get_int( + nng_stream_dialer *, const char *, int *); +extern int nng_stream_dialer_get_ms( + nng_stream_dialer *, const char *, nng_duration *); +extern int nng_stream_dialer_get_size( + nng_stream_dialer *, const char *, size_t *); +extern int nng_stream_dialer_get_uint64( + nng_stream_dialer *, const char *, uint64_t *); +extern int nng_stream_dialer_get_string( + nng_stream_dialer *, const char *, char **); +extern int nng_stream_dialer_get_ptr( + nng_stream_dialer *, const char *, void **); +extern int nng_stream_dialer_get_addr( + nng_stream_dialer *, const char *, nng_sockaddr *); +extern int nng_stream_dialer_set_bool( + nng_stream_dialer *, const char *, bool); +extern int nng_stream_dialer_set_int(nng_stream_dialer *, const char *, int); +extern int nng_stream_dialer_set_ms( + nng_stream_dialer *, const char *, nng_duration); +extern int nng_stream_dialer_set_size( + nng_stream_dialer *, const char *, size_t); +extern int nng_stream_dialer_set_uint64( + nng_stream_dialer *, const char *, uint64_t); +extern int nng_stream_dialer_set_string( + nng_stream_dialer *, const char *, const char *); +extern int nng_stream_dialer_set_ptr( + nng_stream_dialer *, const char *, void *); +extern int nng_stream_dialer_set_addr( + nng_stream_dialer *, const char *, const nng_sockaddr *); +extern int nng_stream_listener_alloc(nng_stream_listener **, const char *); +extern int nng_stream_listener_alloc_url( + nng_stream_listener **, const nng_url *); +extern void nng_stream_listener_free(nng_stream_listener *); +extern void nng_stream_listener_close(nng_stream_listener *); +extern int nng_stream_listener_listen(nng_stream_listener *); +extern void nng_stream_listener_accept(nng_stream_listener *, nng_aio *); +extern int nng_stream_listener_set( + nng_stream_listener *, const char *, const void *, size_t); +extern int nng_stream_listener_get( + nng_stream_listener *, const char *, void *, size_t *); +extern int nng_stream_listener_get_bool( + nng_stream_listener *, const char *, bool *); +extern int nng_stream_listener_get_int( + nng_stream_listener *, const char *, int *); +extern int nng_stream_listener_get_ms( + nng_stream_listener *, const char *, nng_duration *); +extern int nng_stream_listener_get_size( + nng_stream_listener *, const char *, size_t *); +extern int nng_stream_listener_get_uint64( + nng_stream_listener *, const char *, uint64_t *); +extern int nng_stream_listener_get_string( + nng_stream_listener *, const char *, char **); +extern int nng_stream_listener_get_ptr( + nng_stream_listener *, const char *, void **); +extern int nng_stream_listener_get_addr( + nng_stream_listener *, const char *, nng_sockaddr *); +extern int nng_stream_listener_set_bool( + nng_stream_listener *, const char *, bool); +extern int nng_stream_listener_set_int( + nng_stream_listener *, const char *, int); +extern int nng_stream_listener_set_ms( + nng_stream_listener *, const char *, nng_duration); +extern int nng_stream_listener_set_size( + nng_stream_listener *, const char *, size_t); +extern int nng_stream_listener_set_uint64( + nng_stream_listener *, const char *, uint64_t); +extern int nng_stream_listener_set_string( + nng_stream_listener *, const char *, const char *); +extern int nng_stream_listener_set_ptr( + nng_stream_listener *, const char *, void *); +extern int nng_stream_listener_set_addr( + nng_stream_listener *, const char *, const nng_sockaddr *); int nng_bus0_open(nng_socket *); int nng_bus0_open_raw(nng_socket *); int nng_pair0_open(nng_socket *); @@ -385,5 +585,5 @@ int nng_tls_config_cert_key_file( nng_tls_config *, const char *, const char *); int nng_tls_register(void); #define NNG_MAJOR_VERSION 1 -#define NNG_MINOR_VERSION 1 -#define NNG_PATCH_VERSION 1 +#define NNG_MINOR_VERSION 3 +#define NNG_PATCH_VERSION 0 diff --git a/pynng/nng.py b/pynng/nng.py index e2ecf63..73e8577 100644 --- a/pynng/nng.py +++ b/pynng/nng.py @@ -6,6 +6,7 @@ import logging import weakref import threading +import atexit import pynng from ._nng import ffi, lib @@ -28,13 +29,16 @@ Surveyor0 Respondent0 '''.split() -# a mapping of id(sock): sock for use in callbacks. When a socket is -# initialized, it adds itself to this dict. When a socket is closed, it -# removes itself from this dict. In order to allow sockets to be garbage -# collected, a weak reference to the socket is stored here instead of the -# actual socket. +# Register an atexit handler to call the nng_fini() cleanup function. +# This is necessary to ensure: +# * The Python interpreter doesn't finalize and kill the reap thread +# during a callback to _nng_pipe_cb +# * Cleanup background queue threads used by NNG -_live_sockets = weakref.WeakValueDictionary() +def _pynng_atexit(): + lib.nng_fini() + +atexit.register(_pynng_atexit) def _ensure_can_send(thing): @@ -329,16 +333,14 @@ def __init__(self, *, # set up pipe callbacks. This **must** be called before listen/dial to # avoid race conditions. - as_void = ffi.cast('void *', id(self)) + + handle = ffi.new_handle(self) + self._handle = handle + for event in (lib.NNG_PIPE_EV_ADD_PRE, lib.NNG_PIPE_EV_ADD_POST, lib.NNG_PIPE_EV_REM_POST): check_err(lib.nng_pipe_notify( - self.socket, event, lib._nng_pipe_cb, as_void)) - - # The socket *must* be added to the _live_sockets map before calling - # listen/dial so that no callbacks are called before the socket is - # added to the map (because then the callback would fail!). - _live_sockets[id(self)] = self + self.socket, event, lib._nng_pipe_cb, handle)) if listen is not None: self.listen(listen) @@ -1263,11 +1265,14 @@ def _do_callbacks(pipe, callbacks): msg = 'Exception raised in pre pipe connect callback {!r}' logger.exception(msg.format(cb)) - @ffi.def_extern() def _nng_pipe_cb(lib_pipe, event, arg): - sock_id = int(ffi.cast('size_t', arg)) - sock = _live_sockets[sock_id] + + logging.debug("Pipe callback event {}".format(event)) + + # Get the Socket from the handle passed through the callback arguments + sock = ffi.from_handle(arg) + # exceptions don't propagate out of this function, so if any exception is # raised in any of the callbacks, we just log it (using logger.exception). with sock._pipe_notify_lock: From bb32a942878d155e56bdf69eba9e403ff51b3cf7 Mon Sep 17 00:00:00 2001 From: Matt Thompson Date: Mon, 20 Jan 2020 11:05:09 -0800 Subject: [PATCH 2/2] Create and add a new Pipe to the Socket when trying to associate a received message with a pipe. Create a new Pipe if the post callback is called before pre --- pynng/nng.py | 47 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/pynng/nng.py b/pynng/nng.py index 73e8577..da0b4a4 100644 --- a/pynng/nng.py +++ b/pynng/nng.py @@ -496,9 +496,13 @@ def pipes(self): def _add_pipe(self, lib_pipe): # this is only called inside the pipe callback. pipe_id = lib.nng_pipe_id(lib_pipe) - pipe = Pipe(lib_pipe, self) - self._pipes[pipe_id] = pipe - return pipe + + # If the pipe already exists in the Socket, don't create a new one + if pipe_id not in self._pipes: + pipe = Pipe(lib_pipe, self) + self._pipes[pipe_id] = pipe + + return self._pipes[pipe_id] def _remove_pipe(self, lib_pipe): pipe_id = lib.nng_pipe_id(lib_pipe) @@ -570,16 +574,27 @@ def _try_associate_msg_with_pipe(self, msg): set it on the Message ``msg`` """ - lib_pipe = lib.nng_msg_get_pipe(msg._nng_msg) - pipe_id = lib.nng_pipe_id(lib_pipe) - try: - msg.pipe = self._pipes[pipe_id] - except KeyError: - # if pipe_id < 0, that *probably* means we hit a race where the - # associated pipe was closed. So only warn when pipe_id is valid - if pipe_id >= 0: - logger.warning("Could not associate msg with pipe (pipe_id == %d)", - pipe_id) + + # Wrap pipe handling inside the notify lock since we can create + # a new Pipe and associate it with the Socket if the callbacks + # haven't been called yet. This will ensure there's no race + # condition with the pipe callbacks. + with self._pipe_notify_lock: + lib_pipe = lib.nng_msg_get_pipe(msg._nng_msg) + pipe_id = lib.nng_pipe_id(lib_pipe) + try: + msg.pipe = self._pipes[pipe_id] + except KeyError: + # A message may have been received before the pipe callback was called. + # Create a new Pipe and associate it with the Socket. + # When the callback is called, it will detect that the pipe was already. + + # if pipe_id < 0, that *probably* means we hit a race where the + # associated pipe was closed. + if pipe_id >= 0: + # Add the pipe to the socket + msg.pipe = self._add_pipe(lib_pipe) + def recv_msg(self, block=True): """Receive a :class:`Message` on the socket.""" @@ -1288,7 +1303,11 @@ def _nng_pipe_cb(lib_pipe, event, arg): # will result in a KeyError below. sock._remove_pipe(lib_pipe) elif event == lib.NNG_PIPE_EV_ADD_POST: - pipe = sock._pipes[pipe_id] + # The ADD_POST event can arrive before ADD_PRE, in which case the Socket + # won't have the pipe_id in the _pipes dictionary + + # _add_pipe will return an existing pipe or create a new one if it doesn't exist + pipe = sock._add_pipe(lib_pipe) _do_callbacks(pipe, sock._on_post_pipe_add) elif event == lib.NNG_PIPE_EV_REM_POST: try: