Skip to content

Commit

Permalink
Add max concurrency support - in order to prevent OutOfMemory state o…
Browse files Browse the repository at this point in the history
…n low end machines like esp8266.

Move 'backlog' keyword argument from run() into __init__()
Fixes #15
  • Loading branch information
belyalov committed May 10, 2018
1 parent 76ad234 commit d1164a6
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions tinyweb/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,19 +372,35 @@ async def restful_resource_handler(req, resp, param=None):

class webserver:

def __init__(self, request_timeout=3):
"""Simple Web Server class.
def __init__(self, request_timeout=3, max_concurrency=None, backlog=16):
"""Tiny Web Server class.
Keyword arguments:
request_timeout - time for client to send complete request
request_timeout - Time for client to send complete request
after that connection will be closed.
Due to tiny implementation maximum timeout could
be request_timeout * 2 - we'll be waiting first time
for request line to be received and second time
for headers
max_concurrency - How many connections can be processed concurrently.
It is very important to limit this number because of
memory constrain.
Default value depends on platform
backlog - Parameter to socket.listen() function. Defines size of
pending to be accepted connections queue.
Must be greater than max_concurrency
"""
self.request_timeout = request_timeout
if not max_concurrency:
if sys.platform == 'esp8266':
self.max_concurrency = 3
elif sys.platform == 'esp32':
self.max_concurrency = 6
else:
self.max_concurrency = 10
self.backlog = backlog
self.explicit_url_map = {}
self.parameterized_url_map = {}
# Currently opened connections
self.conns = {}

def _find_url_handler(self, req):
Expand Down Expand Up @@ -476,7 +492,11 @@ async def _handler(self, reader, writer):
pass
finally:
await writer.aclose()
# Using conn socket as key
# Max concurrency support -
# if queue is full schedule resume of TCP server task
if len(self.conns) == self.max_concurrency:
self.loop.call_soon(self._server_coro)
# Delete connection, using socket as a key
del self.conns[id(writer.s)]

def add_route(self, url, f, **kwargs):
Expand Down Expand Up @@ -593,26 +613,34 @@ async def _tcp_server(self, host, port, backlog):
asyncio.StreamWriter(csock, {}))
self.conns[hid] = handler
self.loop.create_task(handler)
# In case of max concurrency reached - temporary pause server:
# 1. backlog must be greater than max_concurrency, otherwise
# client will got "Connection Reset"
# 2. Server task will be resumed whenever one active connection finished
if len(self.conns) == self.max_concurrency:
# Pause
yield False
except asyncio.CancelledError:
return
finally:
sock.close()

def run(self, host="127.0.0.1", port=8081, loop=None, loop_forever=True, backlog=16):
def run(self, host="127.0.0.1", port=8081, loop=None, loop_forever=True, backlog=None):
"""Run Web Server. By default it runs forever.
Keyword arguments:
host - host to listen on. By default - localhost (127.0.0.1)
port - port to listen on. By default - 8081
loop_forever - run async.loop_forever(). Defaults to True
backlog - size of pending connections queue. Defaults to 10
loop_forever - run loo.loop_forever(), otherwise caller must run it by itself.
"""
if backlog:
print("WARNING: 'backlog' has been moved to __init__() and will be removed in next release")
if loop:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
print("* Starting Web Server at {}:{}".format(host, port))
self._server_coro = self._tcp_server(host, port, backlog)
self._server_coro = self._tcp_server(host, port, self.backlog)
self.loop.create_task(self._server_coro)
if loop_forever:
self.loop.run_forever()
Expand Down

0 comments on commit d1164a6

Please # to comment.