
rpyc consume cpu on "big" data #329

Closed
p5-vbnekit opened this issue May 29, 2019 · 4 comments

@p5-vbnekit (Contributor) commented May 29, 2019

Environment
  • rpyc version: 4.1.0
  • Python 3.7.3rc1
  • Debian GNU/Linux 10
Minimal example

Server:

import rpyc, numpy
from rpyc.utils.server import OneShotServer
rpyc.lib.setup_logger()

big_data = numpy.arange(4 * 64 * 1024 * 1024).tobytes()

class ListService(rpyc.Service):
  def exposed_print(self, data): print(len(data))
  def exposed_get_data(self): return big_data

server = OneShotServer(ListService, port=12345)
server.start()

Client:

import numpy, rpyc
d1 = numpy.arange(4 * 64 * 1024 * 1024).tobytes()
c = rpyc.connect("localhost", 12345)
c.root.print(d1) # CPU stuck
d2 = c.root.get_data() # same behaviour, CPU stuck

However, serialization by itself works fine:

x = rpyc.core.brine.dump(numpy.arange(4 * 64 * 1024 * 1024).tobytes())

and deserialization passes too:

y = rpyc.core.brine.load(x)
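
A quick timing sketch (illustrative only, using the same payload as above) makes the contrast concrete: serialization alone finishes quickly, so the stall has to come from the transport layer, not from brine:

import timeit
import numpy, rpyc

payload = numpy.arange(4 * 64 * 1024 * 1024).tobytes()
# brine dump + load of the full payload, no sockets involved
t = timeit.timeit(lambda: rpyc.core.brine.load(rpyc.core.brine.dump(payload)), number=1)
print("brine round trip: %.2fs" % t)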
@comrumino (Collaborator) commented May 29, 2019

Notes:

  • The size of d1 is approximately 2 GB (2147483681 bytes)
  • Confirmed this issue for 4.0.2 as well.

[Wireshark comparison: before (top), modified MAX_IO_CHUNK (middle), and max socket throughput (bottom)]

The default behavior should probably handle large amounts of data better; at the very least, settings such as MAX_IO_CHUNK should be better documented. Increasing throughput will require a thorough analysis to find the optimal changes to make.

Modified server.py to disable compression and increase MAX_IO_CHUNK at import time:

import rpyc
rpyc.core.channel.Channel.COMPRESSION_LEVEL = 0
rpyc.core.stream.SocketStream.MAX_IO_CHUNK = 65355*10

Increased the sync_request_timeout for client.py:

import rpyc
c = rpyc.connect("localhost", 12345, config={"sync_request_timeout": 180})
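
Note that MAX_IO_CHUNK is a class attribute on SocketStream, so it presumably needs the same override in the client process for the client's own writes to benefit. A minimal sketch mirroring the server-side change:

import rpyc
# Assumed mirror of the server-side override; as a class attribute this
# affects every SocketStream created in this process.
rpyc.core.stream.SocketStream.MAX_IO_CHUNK = 65355 * 10
c = rpyc.connect("localhost", 12345, config={"sync_request_timeout": 180})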

Maximum possible throughput for the raw socket library:

#!/usr/bin/env python
import sys
import socket


PORT = 8621
HOST = "127.0.0.1"
BUFFSZ = 1024
COUNT = 1000000
SENDBUFF = bytes('x' * (BUFFSZ - 1) + '\n', 'utf-8')


def main():
    if len(sys.argv) > 1 and sys.argv[1] == '-s':
        server()
    elif len(sys.argv) > 1 and sys.argv[1] == '-c':
        client()
    else:
        sys.exit(1)


def server():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('', PORT))
    s.listen(1)
    print('Server started.')
    while 1:
        conn, (host, remoteport) = s.accept()
        while 1:
            data = conn.recv(BUFFSZ)
            if not data:
                break
            del data
        conn.send(bytes('OK\n', 'utf-8'))
        conn.close()
        print('Done with', host, 'port', remoteport)


def client():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    for i in range(COUNT):
        s.send(SENDBUFF)
    s.shutdown(socket.SHUT_WR)  # half-close to signal EOF to the server
    s.recv(BUFFSZ)


if __name__ == "__main__":
    main()
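
To exercise it, run the script with -s in one terminal and -c in another, e.g. python sockbench.py -s then python sockbench.py -c (sockbench.py is a placeholder name for the script above).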

TODOs:

  • Review default value of rpyc.core.stream.SocketStream.MAX_IO_CHUNK
  • Review relation between multiple clients vs response time w/ respect to large data and echo
  • Define metric to test throughput changes with large deviation in data size
  • If feasible, allow configurable compression level and chunk size (see the sketch below)
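
One possible shape for that configurability, sketched as a subclass rather than anything rpyc provides today (BigChunkStream is a hypothetical name):

from rpyc.core.stream import SocketStream

class BigChunkStream(SocketStream):
    # Larger chunks mean fewer read/write calls per payload, without
    # mutating the class attribute shared by every other stream.
    MAX_IO_CHUNK = 65355 * 10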

@comrumino (Collaborator) commented Jun 10, 2019

Some initial data

        sample,tpickle,twrite,bytes,level,max_chunk
        10,0.06048287900193827,21.315996053999697,40000104,0,8000
        10,0.05449429500004044,2.357120478001889,40000104,0,65355
        10,0.05889695700170705,1.2712431600011769,40000104,0,130710
        10,0.058939051003108034,0.8827123650007707,40000104,0,196065
        10,0.05880788899958134,0.7268018610011495,40000104,0,261420

generated by test_service_pickle.py, showing that twrite (the time to transfer and obtain the pickled object) drops from 21 s to 0.7 s as MAX_IO_CHUNK increases, while tpickle stays flat at about 0.06 s. This confirms an inverse relation between MAX_IO_CHUNK and the time to obtain an object. Further analysis should check for adverse effects on other connections.
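
The trend is consistent with per-chunk overhead dominating. A rough back-of-the-envelope estimate, assuming one write per chunk:

payload = 40000104  # bytes, from the table above
for max_chunk in (8000, 65355, 130710, 196065, 261420):
    chunks = -(-payload // max_chunk)  # ceiling division
    print("max_chunk=%d -> %d chunks" % (max_chunk, chunks))
# 8000 -> 5001 chunks vs 261420 -> 154: roughly 32x fewer writes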

@comrumino (Collaborator) commented:
Confirmed that increasing MAX_IO_CHUNK decreases the transfer time of a pickled object, but increases the response time of a simple string return.

sample,tpickle,twrite,bytes,level,max_chunk
3,0.05989043199951993,0.185679252994305,40000104,1,653550
3,0.060112245999334846,0.17400220799754607,40000104,1,8000

Test to show this:

from __future__ import print_function
import sys
import pickle  # noqa
import timeit
import rpyc
import rpyc.utils.server
import unittest
from nose import SkipTest
import cfg_tests
try:
    import pandas as pd
    import numpy as np
except Exception:
    raise SkipTest("Requires pandas, numpy, and tables")


DF_ROWS = 2000
DF_COLS = 2500


class MyService(rpyc.Service):
    on_connect_called = False
    on_disconnect_called = False

    def on_connect(self, conn):
        self.on_connect_called = True

    def on_disconnect(self, conn):
        self.on_disconnect_called = True

    def exposed_write_data(self, dataframe):
        rpyc.classic.obtain(dataframe)

    def exposed_ping(self):
        return "pong"


class TestServicePickle(unittest.TestCase):
    """Issues #323 and #329 showed for large objects there is an excessive number of round trips.

    This test case should check the interrelations of
        + MAX_IO_CHUNK
        + min twrite
        + occurrence rate of socket timeout for other clients
    """
    config = {}

    def setUp(self):
        self.cfg = {'allow_pickle': True}
        self.server = rpyc.utils.server.ThreadedServer(MyService, port=0, protocol_config=self.cfg.copy())
        self.server.logger.quiet = False
        self.thd = self.server._start_in_thread()
        self.conn = rpyc.connect("localhost", self.server.port, config=self.cfg)
        self.conn2 = rpyc.connect("localhost", self.server.port, config=self.cfg)
        # globals are made available to timeit, prepare them
        cfg_tests.timeit['conn'] = self.conn
        cfg_tests.timeit['conn2'] = self.conn2
        cfg_tests.timeit['df'] = pd.DataFrame(np.random.rand(DF_ROWS, DF_COLS))

    def tearDown(self):
        self.conn.close()
        self.conn2.close()
        self.server.close()
        self.thd.join()
        cfg_tests.timeit.clear()

    def test_dataframe_pickling(self):
        # The proxy syncs with the pickle handle and default protocol, providing the result
        # as the argument to pickle.load. By timing how long pickle.dumps and pickle.loads
        # take without any round trips, the overhead of the RPyC protocol can be found.

        rpyc.core.channel.Channel.COMPRESSION_LEVEL = 1
        #rpyc.core.stream.SocketStream.MAX_IO_CHUNK = 65355 * 10
        level = rpyc.core.channel.Channel.COMPRESSION_LEVEL
        max_chunk = rpyc.core.stream.SocketStream.MAX_IO_CHUNK
        repeat = 3
        number = 1
        pickle_stmt = 'pickle.loads(pickle.dumps(cfg_tests.timeit["df"]))'
        write_stmt = 'rpyc.lib.spawn(cfg_tests.timeit["conn"].root.write_data, cfg_tests.timeit["df"]); [cfg_tests.timeit["conn2"].root.ping() for i in range(30)]'
        t = timeit.Timer(pickle_stmt, globals=globals())
        tpickle = min(t.repeat(repeat, number))
        t = timeit.Timer(write_stmt, globals=globals())
        twrite = min(t.repeat(repeat, number))

        headers = ['sample', 'tpickle', 'twrite', 'bytes', 'level', 'max_chunk']  # noqa
        data = [repeat, tpickle, twrite, sys.getsizeof(cfg_tests.timeit['df']), level, max_chunk]
        data = [str(d) for d in data]
        with open('/tmp/time.csv', 'a') as f:
            print(','.join(headers), file=f)
            print(','.join(data), file=f)


if __name__ == "__main__":
    unittest.main()
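
Each run appends a header and a data row to /tmp/time.csv; sweeping MAX_IO_CHUNK (and COMPRESSION_LEVEL) across runs produces tables like the ones above.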

@comrumino (Collaborator) commented:
For now, the improvements made should be sufficient to close this issue. Other optimizations aren't specific to this issue.

YuvalEvron pushed a commit to weka/rpyc that referenced this issue Oct 27, 2019
* Added warning to _remote_tb when the major version of local and remote mismatch (tomerfiliba-org#332)

* Added `include_local_version` to DEFAULT_CONFIG to allow for configurable security controls (e.g. `include_local_traceback`)

* Update readme.txt

* Added break to client process loop when everything is dead

* Increased chunk size to improve multi-client response time and throughput of large data tomerfiliba-org#329

* Improved test for response of client 1 while transferring a large amount of data to client 2

* Cleaned up coding style of test_service_pickle.py

* Updated issue template

* added vs code testing cfgs; updated gitignore venv

* Changed settings.json to use env USERNAME

* Name pack casted in _unbox to fix IronPython bug. Fixed tomerfiliba-org#337

* Fixed netref.class_factory id_pack usage per tomerfiliba-org#339 and added test cases

* Added .readthedocs.yml and requirements to build

* Make OneShotServer terminates after client connection ends

* Added unit test for OneShotServer. Fixed tomerfiliba-org#343

* Fixed 2.6 backwards incompatibility for format syntax

* Updated change log and bumped version --- 4.1.1

* Added support for chained connections which result in netref being passed to get_id_pack. Fixed tomerfiliba-org#346

* Added tests for get_id_pack

* Added a test for issue tomerfiliba-org#346

* Corrected the connection used to inspect a netref

* Refactored __cmp__ getattr

* Extended rpyc over rpyc unit testing and removed port parameter from TestRestricted

* Added comment explaining the inspect for intermediate proxy. Fixed tomerfiliba-org#346

* Improved docstring for serve_threaded to address when and when not to use the method. Done tomerfiliba-org#345

* Release 4.1.2

* Fixed versions referred to in security.rst

* link docs instead of mitre

* set up logging with a better formatter

* fix bug when proxy context-manager is being exited with an exception (#1)

* logging: add a rotating file log handler
