diff --git a/requirements-dev.txt b/requirements-dev.txt index dda274f..a81c19e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,4 +2,5 @@ flake8==3.2.1 ipdb==0.10.1 pytest==3.0.5 +pytest-asyncio==0.5.0 pytest-cov==2.4.0 diff --git a/tests/conftest.py b/tests/conftest.py index b713526..ae0bd8d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,3 @@ -import asyncio -import gc -import socket - import pytest @@ -9,62 +5,3 @@ ids=['debug:true', 'debug:false']) def debug(request): return request.param - - -@pytest.yield_fixture -def loop(request, debug): - old_loop = asyncio.get_event_loop() - loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) - loop.set_debug(debug) - yield loop - - if not loop._closed: - loop.call_soon(loop.stop) - loop.run_forever() - loop.close() - gc.collect() - asyncio.set_event_loop(old_loop) - - -@pytest.mark.tryfirst -def pytest_pycollect_makeitem(collector, name, obj): - if collector.funcnamefilter(name): - item = pytest.Function(name, parent=collector) - if 'run_loop' in item.keywords: - return list(collector._genfunctions(name, obj)) - - -@pytest.mark.tryfirst -def pytest_pyfunc_call(pyfuncitem): - """ - Run asyncio marked test functions in an event loop instead of a normal - function call. - """ - if 'run_loop' in pyfuncitem.keywords: - funcargs = pyfuncitem.funcargs - loop = funcargs['loop'] - testargs = {arg: funcargs[arg] - for arg in pyfuncitem._fixtureinfo.argnames} - - if not asyncio.iscoroutinefunction(pyfuncitem.obj): - func = asyncio.coroutine(pyfuncitem.obj) - else: - func = pyfuncitem.obj - loop.run_until_complete(func(**testargs)) - return True - - -def pytest_runtest_setup(item): - if 'run_loop' in item.keywords and 'loop' not in item.fixturenames: - # inject an event loop fixture for all async tests - item.fixturenames.append('loop') - - -@pytest.fixture(scope='session') -def unused_port(): - def f(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('127.0.0.1', 0)) - return s.getsockname()[1] - return f diff --git a/tests/test_sse.py b/tests/test_sse.py index 60f5bfa..0657adb 100644 --- a/tests/test_sse.py +++ b/tests/test_sse.py @@ -6,8 +6,8 @@ from aiohttp_sse import EventSourceResponse -@pytest.mark.run_loop -def test_func(loop, unused_port): +@pytest.mark.asyncio(forbid_global_loop=True) +def test_func(event_loop, unused_tcp_port): @asyncio.coroutine def func(request): @@ -17,18 +17,20 @@ def func(request): resp.send('foo', event='bar') resp.send('foo', event='bar', id='xyz') resp.send('foo', event='bar', id='xyz', retry=1) + resp.stop_streaming() + yield from resp.wait() return resp - app = web.Application(loop=loop) + app = web.Application(loop=event_loop) app.router.add_route('GET', '/', func) app.router.add_route('POST', '/', func) - port = unused_port() - srv = yield from loop.create_server( - app.make_handler(), '127.0.0.1', port) - url = "http://127.0.0.1:{}/".format(port) + handler = app.make_handler() + srv = yield from event_loop.create_server( + handler, '127.0.0.1', unused_tcp_port) + url = "http://127.0.0.1:{}/".format(unused_tcp_port) - resp = yield from aiohttp.request('GET', url, loop=loop) + resp = yield from aiohttp.request('GET', url, loop=event_loop) assert 200 == resp.status # make sure that EventSourceResponse supports passing @@ -45,13 +47,16 @@ def func(request): # check that EventSourceResponse object works only # with GET method - resp = yield from aiohttp.request('POST', url, loop=loop) + resp = yield from aiohttp.request('POST', url, loop=event_loop) assert 405 == resp.status srv.close() + yield from srv.wait_closed() + yield from handler.shutdown(0) -@pytest.mark.run_loop -def test_wait_stop_streaming(loop, unused_port): +@pytest.mark.asyncio(forbid_global_loop=True) +def test_wait_stop_streaming(event_loop, unused_tcp_port): + loop = event_loop @asyncio.coroutine def func(request): @@ -63,20 +68,20 @@ def func(request): yield from resp.wait() return resp - app = web.Application(loop=loop) + app = web.Application(loop=event_loop) app['socket'] = [] app.router.add_route('GET', '/', func) - port = unused_port() + handler = app.make_handler() srv = yield from loop.create_server( - app.make_handler(), '127.0.0.1', port) - url = "http://127.0.0.1:{}/".format(port) + handler, '127.0.0.1', unused_tcp_port) + url = "http://127.0.0.1:{}/".format(unused_tcp_port) resp_task = asyncio.async( - aiohttp.request('GET', url, loop=loop), - loop=loop) + aiohttp.request('GET', url, loop=event_loop), + loop=event_loop) - yield from asyncio.sleep(0.1, loop=loop) + yield from asyncio.sleep(0.1, loop=event_loop) esourse = app['socket'][0] esourse.stop_streaming() resp = yield from resp_task @@ -88,10 +93,12 @@ def func(request): assert streamed_data == expected srv.close() + yield from srv.wait_closed() + yield from handler.shutdown(0) -@pytest.mark.run_loop -def test_retry(loop, unused_port): +@pytest.mark.asyncio(forbid_global_loop=True) +def test_retry(event_loop, unused_tcp_port): @asyncio.coroutine def func(request): @@ -100,17 +107,19 @@ def func(request): with pytest.raises(TypeError): resp.send('foo', retry='one') resp.send('foo', retry=1) + resp.stop_streaming() + yield from resp.wait() return resp - app = web.Application(loop=loop) + app = web.Application(loop=event_loop) app.router.add_route('GET', '/', func) - port = unused_port() - srv = yield from loop.create_server( - app.make_handler(), '127.0.0.1', port) - url = "http://127.0.0.1:{}/".format(port) + handler = app.make_handler() + srv = yield from event_loop.create_server( + handler, '127.0.0.1', unused_tcp_port) + url = "http://127.0.0.1:{}/".format(unused_tcp_port) - resp = yield from aiohttp.request('GET', url, loop=loop) + resp = yield from aiohttp.request('GET', url, loop=event_loop) assert 200 == resp.status # check streamed data @@ -119,9 +128,11 @@ def func(request): assert streamed_data == expected srv.close() + yield from srv.wait_closed() + yield from handler.shutdown(0) -def test_wait_stop_streaming_errors(loop): +def test_wait_stop_streaming_errors(): response = EventSourceResponse() with pytest.raises(RuntimeError) as ctx: response.wait() @@ -138,7 +149,7 @@ def test_compression_not_implemented(): response.enable_compression() -def test_ping_property(loop): +def test_ping_property(event_loop): response = EventSourceResponse() default = response.DEFAULT_PING_INTERVAL assert response.ping_interval == default @@ -153,8 +164,8 @@ def test_ping_property(loop): response.ping_interval = -42 -@pytest.mark.run_loop -def test_ping(loop, unused_port): +@pytest.mark.asyncio(forbid_global_loop=True) +def test_ping(event_loop, unused_tcp_port): @asyncio.coroutine def func(request): @@ -167,20 +178,20 @@ def func(request): yield from resp.wait() return resp - app = web.Application(loop=loop) + app = web.Application(loop=event_loop) app['socket'] = [] app.router.add_route('GET', '/', func) - port = unused_port() - srv = yield from loop.create_server( - app.make_handler(), '127.0.0.1', port) - url = "http://127.0.0.1:{}/".format(port) + handler = app.make_handler() + srv = yield from event_loop.create_server( + handler, '127.0.0.1', unused_tcp_port) + url = "http://127.0.0.1:{}/".format(unused_tcp_port) resp_task = asyncio.async( - aiohttp.request('GET', url, loop=loop), - loop=loop) + aiohttp.request('GET', url, loop=event_loop), + loop=event_loop) - yield from asyncio.sleep(1.15, loop=loop) + yield from asyncio.sleep(1.15, loop=event_loop) esourse = app['socket'][0] esourse.stop_streaming() resp = yield from resp_task @@ -191,3 +202,5 @@ def func(request): expected = 'data: foo\r\n\r\n' + ': ping\r\n\r\n' assert streamed_data == expected srv.close() + yield from srv.wait_closed() + yield from handler.shutdown(0)