Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix issue where Python was killing reaper thread in pipe remove callbacks #55

Merged
merged 2 commits into from
Feb 1, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build_pynng.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

// nng_pipe_notify callback:
// https://nanomsg.github.io/nng/man/tip/nng_pipe_notify.3
extern "Python" void _nng_pipe_cb(nng_pipe, int, void *);
extern "Python" void _nng_pipe_cb(nng_pipe, nng_pipe_ev, void *);
"""
ffibuilder.cdef(api + callbacks)

Expand Down
212 changes: 206 additions & 6 deletions nng_api.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// THIS FILE WAS AUTOMATICALLY GENERATED BY ./generate_api.sh
// THIS FILE WAS AUTOMATICALLY GENERATED BY generate_api.sh
typedef struct nng_ctx_s {
uint32_t id;
} nng_ctx;
Expand Down Expand Up @@ -91,15 +91,34 @@ extern int nng_getopt_ms(nng_socket, const char *, nng_duration *);
extern int nng_getopt_size(nng_socket, const char *, size_t *);
extern int nng_getopt_uint64(nng_socket, const char *, uint64_t *);
extern int nng_getopt_ptr(nng_socket, const char *, void **);
extern int nng_getopt_string(nng_socket, const char *, char **);
extern int nng_socket_set(nng_socket, const char *, const void *, size_t);
extern int nng_socket_set_bool(nng_socket, const char *, bool);
extern int nng_socket_set_int(nng_socket, const char *, int);
extern int nng_socket_set_size(nng_socket, const char *, size_t);
extern int nng_socket_set_uint64(nng_socket, const char *, uint64_t);
extern int nng_socket_set_string(nng_socket, const char *, const char *);
extern int nng_socket_set_ptr(nng_socket, const char *, void *);
extern int nng_socket_set_ms(nng_socket, const char *, nng_duration);
extern int nng_socket_set_addr(
nng_socket, const char *, const nng_sockaddr *);
extern int nng_socket_get(nng_socket, const char *, void *, size_t *);
extern int nng_socket_get_bool(nng_socket, const char *, bool *);
extern int nng_socket_get_int(nng_socket, const char *, int *);
extern int nng_socket_get_size(nng_socket, const char *, size_t *);
extern int nng_socket_get_uint64(nng_socket, const char *, uint64_t *);
extern int nng_socket_get_string(nng_socket, const char *, char **);
extern int nng_socket_get_ptr(nng_socket, const char *, void **);
extern int nng_socket_get_ms(nng_socket, const char *, nng_duration *);
extern int nng_socket_get_addr(nng_socket, const char *, nng_sockaddr *);
typedef enum {
NNG_PIPE_EV_ADD_PRE,
NNG_PIPE_EV_ADD_POST,
NNG_PIPE_EV_REM_POST,
NNG_PIPE_EV_NUM,
} nng_pipe_ev;
typedef void (*nng_pipe_cb)(nng_pipe, int, void *);
extern int nng_pipe_notify(nng_socket, int, nng_pipe_cb, void *);
extern int nng_getopt_string(nng_socket, const char *, char **);
typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_ev, void *);
extern int nng_pipe_notify(nng_socket, nng_pipe_ev, nng_pipe_cb, void *);
extern int nng_listen(nng_socket, const char *, nng_listener *, int);
extern int nng_dial(nng_socket, const char *, nng_dialer *, int);
extern int nng_dialer_create(nng_dialer *, nng_socket, const char *);
Expand Down Expand Up @@ -128,6 +147,25 @@ extern int nng_dialer_getopt_sockaddr(
extern int nng_dialer_getopt_uint64(nng_dialer, const char *, uint64_t *);
extern int nng_dialer_getopt_ptr(nng_dialer, const char *, void **);
extern int nng_dialer_getopt_string(nng_dialer, const char *, char **);
extern int nng_dialer_set(nng_dialer, const char *, const void *, size_t);
extern int nng_dialer_set_bool(nng_dialer, const char *, bool);
extern int nng_dialer_set_int(nng_dialer, const char *, int);
extern int nng_dialer_set_size(nng_dialer, const char *, size_t);
extern int nng_dialer_set_uint64(nng_dialer, const char *, uint64_t);
extern int nng_dialer_set_string(nng_dialer, const char *, const char *);
extern int nng_dialer_set_ptr(nng_dialer, const char *, void *);
extern int nng_dialer_set_ms(nng_dialer, const char *, nng_duration);
extern int nng_dialer_set_addr(
nng_dialer, const char *, const nng_sockaddr *);
extern int nng_dialer_get(nng_dialer, const char *, void *, size_t *);
extern int nng_dialer_get_bool(nng_dialer, const char *, bool *);
extern int nng_dialer_get_int(nng_dialer, const char *, int *);
extern int nng_dialer_get_size(nng_dialer, const char *, size_t *);
extern int nng_dialer_get_uint64(nng_dialer, const char *, uint64_t *);
extern int nng_dialer_get_string(nng_dialer, const char *, char **);
extern int nng_dialer_get_ptr(nng_dialer, const char *, void **);
extern int nng_dialer_get_ms(nng_dialer, const char *, nng_duration *);
extern int nng_dialer_get_addr(nng_dialer, const char *, nng_sockaddr *);
extern int nng_listener_setopt(
nng_listener, const char *, const void *, size_t);
extern int nng_listener_setopt_bool(nng_listener, const char *, bool);
Expand All @@ -150,6 +188,26 @@ extern int nng_listener_getopt_uint64(
nng_listener, const char *, uint64_t *);
extern int nng_listener_getopt_ptr(nng_listener, const char *, void **);
extern int nng_listener_getopt_string(nng_listener, const char *, char **);
extern int nng_listener_set(
nng_listener, const char *, const void *, size_t);
extern int nng_listener_set_bool(nng_listener, const char *, bool);
extern int nng_listener_set_int(nng_listener, const char *, int);
extern int nng_listener_set_size(nng_listener, const char *, size_t);
extern int nng_listener_set_uint64(nng_listener, const char *, uint64_t);
extern int nng_listener_set_string(nng_listener, const char *, const char *);
extern int nng_listener_set_ptr(nng_listener, const char *, void *);
extern int nng_listener_set_ms(nng_listener, const char *, nng_duration);
extern int nng_listener_set_addr(
nng_listener, const char *, const nng_sockaddr *);
extern int nng_listener_get(nng_listener, const char *, void *, size_t *);
extern int nng_listener_get_bool(nng_listener, const char *, bool *);
extern int nng_listener_get_int(nng_listener, const char *, int *);
extern int nng_listener_get_size(nng_listener, const char *, size_t *);
extern int nng_listener_get_uint64(nng_listener, const char *, uint64_t *);
extern int nng_listener_get_string(nng_listener, const char *, char **);
extern int nng_listener_get_ptr(nng_listener, const char *, void **);
extern int nng_listener_get_ms(nng_listener, const char *, nng_duration *);
extern int nng_listener_get_addr(nng_listener, const char *, nng_sockaddr *);
extern const char *nng_strerror(int);
extern int nng_send(nng_socket, void *, size_t, int);
extern int nng_recv(nng_socket, void *, size_t *, int);
Expand All @@ -172,6 +230,24 @@ extern int nng_ctx_setopt_bool(nng_ctx, const char *, bool);
extern int nng_ctx_setopt_int(nng_ctx, const char *, int);
extern int nng_ctx_setopt_ms(nng_ctx, const char *, nng_duration);
extern int nng_ctx_setopt_size(nng_ctx, const char *, size_t);
extern int nng_ctx_get(nng_ctx, const char *, void *, size_t *);
extern int nng_ctx_get_bool(nng_ctx, const char *, bool *);
extern int nng_ctx_get_int(nng_ctx, const char *, int *);
extern int nng_ctx_get_size(nng_ctx, const char *, size_t *);
extern int nng_ctx_get_uint64(nng_ctx, const char *, uint64_t *);
extern int nng_ctx_get_string(nng_ctx, const char *, char **);
extern int nng_ctx_get_ptr(nng_ctx, const char *, void **);
extern int nng_ctx_get_ms(nng_ctx, const char *, nng_duration *);
extern int nng_ctx_get_addr(nng_ctx, const char *, nng_sockaddr *);
extern int nng_ctx_set(nng_ctx, const char *, const void *, size_t);
extern int nng_ctx_set_bool(nng_ctx, const char *, bool);
extern int nng_ctx_set_int(nng_ctx, const char *, int);
extern int nng_ctx_set_size(nng_ctx, const char *, size_t);
extern int nng_ctx_set_uint64(nng_ctx, const char *, uint64_t);
extern int nng_ctx_set_string(nng_ctx, const char *, const char *);
extern int nng_ctx_set_ptr(nng_ctx, const char *, void *);
extern int nng_ctx_set_ms(nng_ctx, const char *, nng_duration);
extern int nng_ctx_set_addr(nng_ctx, const char *, const nng_sockaddr *);
extern void *nng_alloc(size_t);
extern void nng_free(void *, size_t);
extern char *nng_strdup(const char *);
Expand Down Expand Up @@ -251,6 +327,15 @@ extern int nng_pipe_getopt_sockaddr(nng_pipe, const char *, nng_sockaddr *);
extern int nng_pipe_getopt_uint64(nng_pipe, const char *, uint64_t *);
extern int nng_pipe_getopt_ptr(nng_pipe, const char *, void **);
extern int nng_pipe_getopt_string(nng_pipe, const char *, char **);
extern int nng_pipe_get(nng_pipe, const char *, void *, size_t *);
extern int nng_pipe_get_bool(nng_pipe, const char *, bool *);
extern int nng_pipe_get_int(nng_pipe, const char *, int *);
extern int nng_pipe_get_ms(nng_pipe, const char *, nng_duration *);
extern int nng_pipe_get_size(nng_pipe, const char *, size_t *);
extern int nng_pipe_get_uint64(nng_pipe, const char *, uint64_t *);
extern int nng_pipe_get_string(nng_pipe, const char *, char **);
extern int nng_pipe_get_ptr(nng_pipe, const char *, void **);
extern int nng_pipe_get_addr(nng_pipe, const char *, nng_sockaddr *);
extern int nng_pipe_close(nng_pipe);
extern int nng_pipe_id(nng_pipe);
extern nng_socket nng_pipe_socket(nng_pipe);
Expand All @@ -267,6 +352,10 @@ extern nng_stat *nng_stat_next(nng_stat *);
extern nng_stat *nng_stat_child(nng_stat *);
extern const char *nng_stat_name(nng_stat *);
extern int nng_stat_type(nng_stat *);
extern nng_stat *nng_stat_find(nng_stat *, const char *);
extern nng_stat *nng_stat_find_socket(nng_stat *, nng_socket);
extern nng_stat *nng_stat_find_dialer(nng_stat *, nng_dialer);
extern nng_stat *nng_stat_find_listener(nng_stat *, nng_listener);
enum nng_stat_type_enum {
NNG_STAT_SCOPE = 0,
NNG_STAT_LEVEL = 1,
Expand Down Expand Up @@ -319,6 +408,7 @@ enum nng_errno_enum {
NNG_ENOARG = 28,
NNG_EAMBIGUOUS = 29,
NNG_EBADTYPE = 30,
NNG_ECONNSHUT = 31,
NNG_EINTERNAL = 1000,
NNG_ESYSERR = 0x10000000,
NNG_ETRANERR = 0x20000000
Expand All @@ -339,6 +429,116 @@ extern int nng_url_parse(nng_url **, const char *);
extern void nng_url_free(nng_url *);
extern int nng_url_clone(nng_url **, const nng_url *);
extern const char *nng_version(void);
typedef struct nng_stream nng_stream;
typedef struct nng_stream_dialer nng_stream_dialer;
typedef struct nng_stream_listener nng_stream_listener;
extern void nng_stream_free(nng_stream *);
extern void nng_stream_close(nng_stream *);
extern void nng_stream_send(nng_stream *, nng_aio *);
extern void nng_stream_recv(nng_stream *, nng_aio *);
extern int nng_stream_get(nng_stream *, const char *, void *, size_t *);
extern int nng_stream_get_bool(nng_stream *, const char *, bool *);
extern int nng_stream_get_int(nng_stream *, const char *, int *);
extern int nng_stream_get_ms(nng_stream *, const char *, nng_duration *);
extern int nng_stream_get_size(nng_stream *, const char *, size_t *);
extern int nng_stream_get_uint64(nng_stream *, const char *, uint64_t *);
extern int nng_stream_get_string(nng_stream *, const char *, char **);
extern int nng_stream_get_ptr(nng_stream *, const char *, void **);
extern int nng_stream_get_addr(nng_stream *, const char *, nng_sockaddr *);
extern int nng_stream_set(nng_stream *, const char *, const void *, size_t);
extern int nng_stream_set_bool(nng_stream *, const char *, bool);
extern int nng_stream_set_int(nng_stream *, const char *, int);
extern int nng_stream_set_ms(nng_stream *, const char *, nng_duration);
extern int nng_stream_set_size(nng_stream *, const char *, size_t);
extern int nng_stream_set_uint64(nng_stream *, const char *, uint64_t);
extern int nng_stream_set_string(nng_stream *, const char *, const char *);
extern int nng_stream_set_ptr(nng_stream *, const char *, void *);
extern int nng_stream_set_addr(
nng_stream *, const char *, const nng_sockaddr *);
extern int nng_stream_dialer_alloc(nng_stream_dialer **, const char *);
extern int nng_stream_dialer_alloc_url(
nng_stream_dialer **, const nng_url *);
extern void nng_stream_dialer_free(nng_stream_dialer *);
extern void nng_stream_dialer_close(nng_stream_dialer *);
extern void nng_stream_dialer_dial(nng_stream_dialer *, nng_aio *);
extern int nng_stream_dialer_set(
nng_stream_dialer *, const char *, const void *, size_t);
extern int nng_stream_dialer_get(
nng_stream_dialer *, const char *, void *, size_t *);
extern int nng_stream_dialer_get_bool(
nng_stream_dialer *, const char *, bool *);
extern int nng_stream_dialer_get_int(
nng_stream_dialer *, const char *, int *);
extern int nng_stream_dialer_get_ms(
nng_stream_dialer *, const char *, nng_duration *);
extern int nng_stream_dialer_get_size(
nng_stream_dialer *, const char *, size_t *);
extern int nng_stream_dialer_get_uint64(
nng_stream_dialer *, const char *, uint64_t *);
extern int nng_stream_dialer_get_string(
nng_stream_dialer *, const char *, char **);
extern int nng_stream_dialer_get_ptr(
nng_stream_dialer *, const char *, void **);
extern int nng_stream_dialer_get_addr(
nng_stream_dialer *, const char *, nng_sockaddr *);
extern int nng_stream_dialer_set_bool(
nng_stream_dialer *, const char *, bool);
extern int nng_stream_dialer_set_int(nng_stream_dialer *, const char *, int);
extern int nng_stream_dialer_set_ms(
nng_stream_dialer *, const char *, nng_duration);
extern int nng_stream_dialer_set_size(
nng_stream_dialer *, const char *, size_t);
extern int nng_stream_dialer_set_uint64(
nng_stream_dialer *, const char *, uint64_t);
extern int nng_stream_dialer_set_string(
nng_stream_dialer *, const char *, const char *);
extern int nng_stream_dialer_set_ptr(
nng_stream_dialer *, const char *, void *);
extern int nng_stream_dialer_set_addr(
nng_stream_dialer *, const char *, const nng_sockaddr *);
extern int nng_stream_listener_alloc(nng_stream_listener **, const char *);
extern int nng_stream_listener_alloc_url(
nng_stream_listener **, const nng_url *);
extern void nng_stream_listener_free(nng_stream_listener *);
extern void nng_stream_listener_close(nng_stream_listener *);
extern int nng_stream_listener_listen(nng_stream_listener *);
extern void nng_stream_listener_accept(nng_stream_listener *, nng_aio *);
extern int nng_stream_listener_set(
nng_stream_listener *, const char *, const void *, size_t);
extern int nng_stream_listener_get(
nng_stream_listener *, const char *, void *, size_t *);
extern int nng_stream_listener_get_bool(
nng_stream_listener *, const char *, bool *);
extern int nng_stream_listener_get_int(
nng_stream_listener *, const char *, int *);
extern int nng_stream_listener_get_ms(
nng_stream_listener *, const char *, nng_duration *);
extern int nng_stream_listener_get_size(
nng_stream_listener *, const char *, size_t *);
extern int nng_stream_listener_get_uint64(
nng_stream_listener *, const char *, uint64_t *);
extern int nng_stream_listener_get_string(
nng_stream_listener *, const char *, char **);
extern int nng_stream_listener_get_ptr(
nng_stream_listener *, const char *, void **);
extern int nng_stream_listener_get_addr(
nng_stream_listener *, const char *, nng_sockaddr *);
extern int nng_stream_listener_set_bool(
nng_stream_listener *, const char *, bool);
extern int nng_stream_listener_set_int(
nng_stream_listener *, const char *, int);
extern int nng_stream_listener_set_ms(
nng_stream_listener *, const char *, nng_duration);
extern int nng_stream_listener_set_size(
nng_stream_listener *, const char *, size_t);
extern int nng_stream_listener_set_uint64(
nng_stream_listener *, const char *, uint64_t);
extern int nng_stream_listener_set_string(
nng_stream_listener *, const char *, const char *);
extern int nng_stream_listener_set_ptr(
nng_stream_listener *, const char *, void *);
extern int nng_stream_listener_set_addr(
nng_stream_listener *, const char *, const nng_sockaddr *);
int nng_bus0_open(nng_socket *);
int nng_bus0_open_raw(nng_socket *);
int nng_pair0_open(nng_socket *);
Expand Down Expand Up @@ -385,5 +585,5 @@ int nng_tls_config_cert_key_file(
nng_tls_config *, const char *, const char *);
int nng_tls_register(void);
#define NNG_MAJOR_VERSION 1
#define NNG_MINOR_VERSION 1
#define NNG_PATCH_VERSION 1
#define NNG_MINOR_VERSION 3
#define NNG_PATCH_VERSION 0
37 changes: 21 additions & 16 deletions pynng/nng.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import weakref
import threading
import atexit

import pynng
from ._nng import ffi, lib
Expand All @@ -28,13 +29,16 @@
Surveyor0 Respondent0
'''.split()

# a mapping of id(sock): sock for use in callbacks. When a socket is
# initialized, it adds itself to this dict. When a socket is closed, it
# removes itself from this dict. In order to allow sockets to be garbage
# collected, a weak reference to the socket is stored here instead of the
# actual socket.
# Register an atexit handler to call the nng_fini() cleanup function.
# This is necessary to ensure:
# * The Python interpreter doesn't finalize and kill the reap thread
# during a callback to _nng_pipe_cb
# * Cleanup background queue threads used by NNG

_live_sockets = weakref.WeakValueDictionary()
def _pynng_atexit():
lib.nng_fini()

atexit.register(_pynng_atexit)


def _ensure_can_send(thing):
Expand Down Expand Up @@ -329,16 +333,14 @@ def __init__(self, *,

# set up pipe callbacks. This **must** be called before listen/dial to
# avoid race conditions.
as_void = ffi.cast('void *', id(self))

handle = ffi.new_handle(self)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a hard time reasoning about this code wrt use after free possibilities. I want to make sure it's not possible that a pipe callback gets called after Python has garbage collected the socket. My thinking here is that there is nothing stopping that from happening here; the socket keeps a reference to its handle, but since that is the only reference, it could happen that the GC collects both the socket and the handle at the same time.

Wait a second, now I'm pretty sure I'm wrong about that possibility. Since __del__ calls close on the socket, and IIRC nng calls the POST_PIPE_REM callback synchronously in close, the handle should always be valid. So actually, this looks great, and way better than keeping track of everything in the global dict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that NNG_PIPE_EV_REM_POST is only ever called from a reap_worker thread, so there may in fact be a possible use after free.

I'll see if I can come up with a test for it.

Copy link
Contributor Author

@wtfuzz wtfuzz Jan 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running a simple stress test:

from pynng import Req0, Rep0
import time

listener = Rep0(listen='tcp://localhost:9999')

while True:
    client = Req0(dial='tcp://localhost:9999')
    client.send(b'a')
    client.close()
    listener.recv_msg()

I get key errors such as this:

From cffi callback <function _nng_pipe_cb at 0x7f36a3ebae18>:
Traceback (most recent call last):
  File "/home/fuzz/pynng/pynng/nng.py", line 1291, in _nng_pipe_cb
    pipe = sock._pipes[pipe_id]
KeyError: 636305841

These are a result of the connect callbacks arriving out of order, but I haven't found any issues with use after free on the REM_POST events.

self._handle = handle

for event in (lib.NNG_PIPE_EV_ADD_PRE, lib.NNG_PIPE_EV_ADD_POST,
lib.NNG_PIPE_EV_REM_POST):
check_err(lib.nng_pipe_notify(
self.socket, event, lib._nng_pipe_cb, as_void))

# The socket *must* be added to the _live_sockets map before calling
# listen/dial so that no callbacks are called before the socket is
# added to the map (because then the callback would fail!).
_live_sockets[id(self)] = self
self.socket, event, lib._nng_pipe_cb, handle))

if listen is not None:
self.listen(listen)
Expand Down Expand Up @@ -1263,11 +1265,14 @@ def _do_callbacks(pipe, callbacks):
msg = 'Exception raised in pre pipe connect callback {!r}'
logger.exception(msg.format(cb))


@ffi.def_extern()
def _nng_pipe_cb(lib_pipe, event, arg):
sock_id = int(ffi.cast('size_t', arg))
sock = _live_sockets[sock_id]

logging.debug("Pipe callback event {}".format(event))

# Get the Socket from the handle passed through the callback arguments
sock = ffi.from_handle(arg)

# exceptions don't propagate out of this function, so if any exception is
# raised in any of the callbacks, we just log it (using logger.exception).
with sock._pipe_notify_lock:
Expand Down