Race condition on multi-threaded connections #345

Closed
conordavis opened this issue Aug 14, 2019 · 2 comments
Assignees: comrumino
Labels: Done (the issue discussion is exhausted and is closed w/ comment)

Comments


conordavis commented Aug 14, 2019

When multiple threads are used on a single connection, there is a race condition where a thread waiting on a reply does not wake up promptly when another thread handles that reply. In other words, a thread blocked in AsyncResult.wait -> Connection.serve -> Channel.poll will not wake up immediately if a different thread receives the reply and invokes AsyncResult.__call__.

Suppose the server is running with 2 threads (conn.serve_threaded(2)): thread A and thread B. The server's service exposes a method taking a client-side callback as an argument. For simplicity, the client will have only one thread.

class SyncCallbackService(rpyc.Service):
    def exposed_func(self, callback):
        callback()
        return None
  1. Client calls conn.root.func(lambda: None). Thread A handles this request.
  2. Thread A executes the callback synchronously, so it sends a request back to the client and is suspended until step 6. Notably, it is suspended before trying to acquire the recv lock.
  3. Thread B acquires the recv lock.
  4. Client sends the callback reply.
  5. Thread B receives the reply and releases the recv lock.
  6. Thread A acquires the recv lock and blocks in Channel.poll.
  7. Thread B dispatches the reply, calling AsyncResult.__call__ which sets self._is_ready = True.
  8. Thread A eventually times out in Channel.poll. Thread A's AsyncResult.wait call then notices that self._is_ready is truthy, so it doesn't raise AsyncResultTimeout. Thread A sends exposed_func's reply to the client.
  9. The initial client call from step 1 times out waiting for the reply from step 8.

In this scenario, the client's timeout error could be avoided by setting the client's timeout to a higher value than the server's timeout value, but there are still unnecessary delays in the various AsyncResult.wait calls.
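
To illustrate the mechanism, here is a small self-contained toy (an illustration of the pattern only, not rpyc code; all names are made up): the waiting thread only re-checks the ready flag after its blocking poll times out, even though the reply was handled almost immediately.

import threading
import time


class ToyResult(object):
    """Toy stand-in for AsyncResult; illustrative only, not rpyc code."""

    def __init__(self, poll_timeout=2.0):
        self._is_ready = False
        self._poll_timeout = poll_timeout

    def deliver(self):
        # Plays the role of thread B dispatching the reply (AsyncResult.__call__).
        self._is_ready = True

    def wait(self):
        # Plays the role of thread A: AsyncResult.wait -> Connection.serve -> Channel.poll.
        while not self._is_ready:
            time.sleep(self._poll_timeout)  # "blocked in Channel.poll"; nothing wakes it early
        # Returns without raising: the flag was set, but only noticed after the timeout.


result = ToyResult()
threading.Timer(0.1, result.deliver).start()  # the "reply" is handled 0.1s into the wait
start = time.time()
result.wait()
print("waited %.1fs for a reply handled after 0.1s" % (time.time() - start))

Running this prints a wait of roughly 2 seconds, mirroring how thread A's reply to the client is delayed by a full poll timeout.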

Environment
  • rpyc 4.1.0
  • Python 2.7.10
  • macOS 10.14
Minimal example
import socket
import unittest
from contextlib import closing
from thread import interrupt_main  # Python 2.7 per the environment above; use _thread on Python 3

import rpyc


class SyncCallbackService(rpyc.Service):
    def exposed_func(self, callback):
        callback()
        return None


class Test_ServeThreaded(unittest.TestCase):
    def setUp(self):
        # This is a copy of connect_thread, except that it uses serve_threaded
        # instead of serve_all. Use a relatively short timeout because we expect
        # it to time out. To trigger a failure, we must ensure that the server
        # config's timeout is at least as long as the client config's timeout.
        listener = socket.socket()
        listener.bind(("localhost", 0))
        listener.listen(1)

        def server(listener=listener):
            with closing(listener):
                client = listener.accept()[0]
            conn = rpyc.connect_stream(rpyc.SocketStream(client),
                                       service=SyncCallbackService,
                                       config={"sync_request_timeout": 2})
            try:
                # Need at least 2 threads to reproduce problem.
                conn.serve_threaded(2)
            except KeyboardInterrupt:
                interrupt_main()

        rpyc.spawn(server)
        host, port = listener.getsockname()
        self.conn = rpyc.connect(host, port, config={"sync_request_timeout": 2})

    def tearDown(self):
        self.conn.close()

    def test_sync_callback_with_serve_threaded(self):
        """Test for race conditions between server threads.

        If this test raises TimeoutError, it probably indicates server thread A
        was waiting on an async result that was handled by server thread B.
        Thread A times out in its ``self._channel.poll`` call, but does not fail
        due to :meth:`AsyncResult.wait` checking ``self._is_ready`` even after a
        timeout. However, a client waiting on thread A to complete will timeout
        and fail as long as the client timeout is less than or equal to the
        server timeout.
        """
        def callback():
            return None

        func = self.conn.root.func
        for _ in range(100):
            func(callback)

If the test case passes, try increasing its loop count.

Possible solutions
  1. Support a way for thread B to interrupt thread A's Channel.poll call. I can sort of see using pipes to interrupt select calls on Unix (not sure about Windows); I think Paramiko uses this technique. A sketch of this idea appears after the list.

  2. Use a dedicated thread to handle all recvs. The recv thread only does recvs; it never calls AsyncResult.wait. Make AsyncResult.wait wait on a threading.Event object (self._event.wait()) and make AsyncResult.__call__ set that event (self._event.set()). This option is also sketched after the list.

    Since unboxing replies involves synchronous requests, the dedicated recv thread should not perform unboxing. I guess the first thread that wakes up from AsyncResult.wait should be responsible for unboxing replies.

    I worry about the performance impact of this option in the common case where only one thread is spawned (i.e. serve_all instead of serve_threaded).
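
For option 1, a minimal sketch of the self-pipe idea (Unix only; the class and method names are made up for illustration and are not rpyc API):

import os
import select


class WakeablePoller(object):
    """Illustrative self-pipe poller; not part of rpyc."""

    def __init__(self, sock):
        self._sock = sock
        self._rpipe, self._wpipe = os.pipe()

    def poll(self, timeout):
        # Wait for data on the socket, or until another thread calls wake().
        rlist, _, _ = select.select([self._sock, self._rpipe], [], [], timeout)
        if self._rpipe in rlist:
            os.read(self._rpipe, 4096)  # drain the wake-up bytes
            return False                # woken deliberately; no socket data yet
        return bool(rlist)

    def wake(self):
        # Called by the thread that just handled the reply (e.g. from AsyncResult.__call__).
        os.write(self._wpipe, b"\0")

For option 2, a self-contained sketch of an event-backed result (again, the names are made up and this is not the actual AsyncResult API): the dedicated recv thread would call deliver(), and waiters block on the Event instead of serving the connection.

import threading


class EventBackedResult(object):
    """Illustrative event-backed result; not part of rpyc."""

    def __init__(self, timeout=30.0):
        self._event = threading.Event()
        self._timeout = timeout
        self._value = None

    def deliver(self, value):
        # Called by the dedicated recv thread once the reply is received.
        self._value = value
        self._event.set()  # wakes wait() immediately; no polling timeout involved

    def wait(self):
        if not self._event.wait(self._timeout):
            raise RuntimeError("result expired")  # stands in for AsyncResultTimeout
        return self._value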

comrumino added the "To Start" label (description reviewed and a maintainer needs "to start" triage) Aug 15, 2019
comrumino self-assigned this Aug 15, 2019
conordavis (Author) commented

I made quick-and-dirty subclasses of Connection/AsyncResult that seem to solve the race condition using the dedicated thread approach. This code spawns threads all over the place, but maybe it's helpful?

comrumino (Collaborator) commented

This issue looks like the result of a netref being constructed while serving a request, although the unit test provided does not reproduce the issue for me. serve_threaded is likely only safe for immutable types: trying to use a netref server-side while serving a connection with multiple threads is likely to introduce more overhead and complexity than other solutions.

If a mutable type must be passed as a parameter, another approach should be considered. A more conventional approach, where each client thread opens a new connection, would let ThreadedServer naturally avoid these multiplexing issues (a rough sketch follows).
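
As a rough illustration of that per-thread-connection pattern (the host, port, and exposed method are placeholders, assuming a service like SyncCallbackService above served by a ThreadedServer):

import threading

import rpyc


def worker(host, port):
    conn = rpyc.connect(host, port)   # one connection per client thread
    try:
        conn.root.func(lambda: None)  # no two threads multiplex the same connection
    finally:
        conn.close()


threads = [threading.Thread(target=worker, args=("localhost", 18861)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()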

comrumino added a commit that referenced this issue Oct 2, 2019
comrumino added a commit that referenced this issue Oct 2, 2019
comrumino added the "Done" label (the issue discussion is exhausted and is closed w/ comment) and removed the "To Start" label Oct 2, 2019
YuvalEvron pushed a commit to weka/rpyc that referenced this issue Oct 27, 2019
* Added warning to _remote_tb when the major version of local and remote mismatch (tomerfiliba-org#332)

* Added `include_local_version` to DEFAULT_CONFIG to allow for configurable security controls (e.g. `include_local_traceback`)

* Update readme.txt

* Added break to client process loop when everything is dead

* Increased chunk size to improve multi-client response time and throughput of large data tomerfiliba-org#329

* Improved test for response of client 1 while transferring a large amount of data to client 2

* Cleaned up coding style of test_service_pickle.py

* Updated issue template

* added vs code testing cfgs; updated gitignore venv

* Changed settings.json to use env USERNAME

* Name pack casted in _unbox to fix IronPython bug. Fixed tomerfiliba-org#337

* Fixed netref.class_factory id_pack usage per tomerfiliba-org#339 and added test cases

* Added .readthedocs.yml and requirements to build

* Make OneShotServer terminate after client connection ends

* Added unit test for OneShotServer. Fixed tomerfiliba-org#343

* Fixed 2.6 backwards incompatibility for format syntax

* Updated change log and bumped version --- 4.1.1

* Added support for chained connections which result in netref being passed to get_id_pack. Fixed tomerfiliba-org#346

* Added tests for get_id_pack

* Added a test for issue tomerfiliba-org#346

* Corrected the connection used to inspect a netref

* Refactored __cmp__ getattr

* Extended rpyc over rpyc unit testing and removed port parameter from TestRestricted

* Added comment explaining the inspect for intermediate proxy. Fixed tomerfiliba-org#346

* Improved docstring for serve_threaded to address when and when not to use the method. Done tomerfiliba-org#345

* Release 4.1.2

* Fixed versions referred to in security.rst

* link docs instead of mitre

* set up logging with a better formatter

* fix bug when proxy context-manager is being exited with an exception (#1)

* logging: add a rotating file log handler
