diff --git a/mars/core/operand/shuffle.py b/mars/core/operand/shuffle.py
index ee3086d55e..a5824a821f 100644
--- a/mars/core/operand/shuffle.py
+++ b/mars/core/operand/shuffle.py
@@ -38,6 +38,12 @@ class MapReduceOperand(Operand):
     reducer_ordinal = Int32Field("reducer_ordinal")
     reducer_phase = StringField("reducer_phase", default=None)
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        if self.stage == OperandStage.reduce:
+            # for reducer, we assign worker at first
+            self.scheduling_hint.reassign_worker = True
+
     def _new_chunks(self, inputs, kws=None, **kw):
         if getattr(self, "reducer_index", None) is None:
             index = None
diff --git a/mars/services/storage/api/oscar.py b/mars/services/storage/api/oscar.py
index 81277abaa0..7b6e3849b1 100644
--- a/mars/services/storage/api/oscar.py
+++ b/mars/services/storage/api/oscar.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import sys
-from typing import Any, List, Type, TypeVar
+from typing import Any, List, Tuple, Type, TypeVar, Union
 
 from .... import oscar as mo
 from ....lib.aio import alru_cache
@@ -163,7 +163,7 @@ async def batch_delete(self, args_list, kwargs_list):
     @mo.extensible
     async def fetch(
         self,
-        data_key: str,
+        data_key: Union[str, Tuple],
         level: StorageLevel = None,
         band_name: str = None,
         remote_address: str = None,
diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py
index f57f838c98..8ee04e80ed 100644
--- a/mars/services/subtask/worker/processor.py
+++ b/mars/services/subtask/worker/processor.py
@@ -17,7 +17,7 @@
 import sys
 import time
 from collections import defaultdict
-from typing import Any, Dict, List, Optional, Set, Type
+from typing import Any, Dict, List, Optional, Set, Type, Tuple
 
 from .... import oscar as mo
 from ....core import ChunkGraph, OperandType, enter_mode, ExecutionError
@@ -27,6 +27,7 @@
     FetchShuffle,
     execute,
 )
+from ....lib.aio import alru_cache
 from ....metrics import Metrics
 from ....optimization.physical import optimize
 from ....typing import BandType, ChunkType
@@ -424,6 +425,58 @@ async def set_chunks_meta():
         # set result data size
         self.result.data_size = result_data_size
 
+    @classmethod
+    @alru_cache(cache_exceptions=False)
+    async def _gen_reducer_index_to_bands(
+        cls, session_id: str, supervisor_address: str, task_id: str, map_reduce_id: int
+    ) -> Dict[Tuple[int], BandType]:
+        task_api = await TaskAPI.create(session_id, supervisor_address)
+        map_reduce_info = await task_api.get_map_reduce_info(task_id, map_reduce_id)
+        assert len(map_reduce_info.reducer_indexes) == len(
+            map_reduce_info.reducer_bands
+        )
+        return {
+            reducer_index: band
+            for reducer_index, band in zip(
+                map_reduce_info.reducer_indexes, map_reduce_info.reducer_bands
+            )
+        }
+
+    async def _push_mapper_data(self):
+        storage_api_to_fetch_tasks = defaultdict(list)
+        skip = True
+        for result_chunk in self._chunk_graph.result_chunks:
+            map_reduce_id = getattr(result_chunk.op, "extra_params", dict()).get(
+                "analyzer_map_reduce_id"
+            )
+            if map_reduce_id is None:
+                continue
+            skip = False
+            reducer_index_to_bands = await self._gen_reducer_index_to_bands(
+                self._session_id,
+                self._supervisor_address,
+                self.subtask.task_id,
+                map_reduce_id,
+            )
+            for reducer_index, band in reducer_index_to_bands.items():
+                # mapper key is a tuple
+                address, band_name = band
+                storage_api = await StorageAPI.create(
+                    self._session_id, address, band_name
+                )
+                fetch_task = storage_api.fetch.delay(
+                    (result_chunk.key, reducer_index),
+                    band_name=self._band[1],
+                    remote_address=self._band[0],
+                )
+                storage_api_to_fetch_tasks[storage_api].append(fetch_task)
+        if skip:
+            return
+        batch_tasks = []
+        for storage_api, tasks in storage_api_to_fetch_tasks.items():
+            batch_tasks.append(storage_api.fetch.batch(*tasks))
+        await asyncio.gather(*batch_tasks)
+
     async def done(self):
         if self.result.status == SubtaskStatus.running:
             self.result.status = SubtaskStatus.succeeded
@@ -516,6 +569,9 @@ async def run(self):
             pass
         return self.result
 
+    async def post_run(self):
+        await self._push_mapper_data()
+
     async def report_progress_periodically(self, interval=0.5, eps=0.001):
         last_progress = self.result.progress
         while not self.result.status.is_done:
@@ -598,7 +654,7 @@ async def _init_context(self, session_id: str):
         await context.init()
         set_context(context)
 
-    async def run(self, subtask: Subtask):
+    async def run(self, subtask: Subtask, wait_post_run: bool = False):
         logger.info(
             "Start to run subtask: %r on %s. chunk graph contains %s",
             subtask,
@@ -624,10 +680,18 @@ async def run(self, subtask: Subtask):
         try:
             result = yield self._running_aio_task
             logger.info("Finished subtask: %s", subtask.subtask_id)
+            # post run with actor tell which will not block
+            if not wait_post_run:
+                await self.ref().post_run.tell(processor)
+            else:
+                await self.post_run(processor)
             raise mo.Return(result)
         finally:
             self._processor = self._running_aio_task = None
 
+    async def post_run(self, processor: SubtaskProcessor):
+        await processor.post_run()
+
     async def wait(self):
         return self._processor.is_done.wait()
 
diff --git a/mars/services/subtask/worker/runner.py b/mars/services/subtask/worker/runner.py
index dbe7ac7236..15cd46fab3 100644
--- a/mars/services/subtask/worker/runner.py
+++ b/mars/services/subtask/worker/runner.py
@@ -89,7 +89,7 @@ async def _get_supervisor_address(self, session_id: str):
         [address] = await self._cluster_api.get_supervisors_by_keys([session_id])
         return address
 
-    async def run_subtask(self, subtask: Subtask):
+    async def run_subtask(self, subtask: Subtask, wait_post_run: bool = False):
         if self._running_processor is not None:  # pragma: no cover
             running_subtask_id = await self._running_processor.get_running_subtask_id()
             # current subtask is still running
@@ -122,7 +122,9 @@ async def run_subtask(self, subtask: Subtask):
         processor = self._session_id_to_processors[session_id]
         try:
             self._running_processor = self._last_processor = processor
-            result = yield self._running_processor.run(subtask)
+            result = yield self._running_processor.run(
+                subtask, wait_post_run=wait_post_run
+            )
         finally:
             self._running_processor = None
         raise mo.Return(result)
diff --git a/mars/services/subtask/worker/tests/test_subtask.py b/mars/services/subtask/worker/tests/test_subtask.py
index 0302ceff1c..067f44dc7d 100644
--- a/mars/services/subtask/worker/tests/test_subtask.py
+++ b/mars/services/subtask/worker/tests/test_subtask.py
@@ -18,14 +18,17 @@
 import time
 
 import numpy as np
+import pandas as pd
 import pytest
 
 from ..... import oscar as mo
+from ..... import dataframe as md
 from ..... import tensor as mt
 from ..... import remote as mr
-from .....core import ExecutionError
+from .....core import ExecutionError, ChunkGraph
 from .....core.context import get_context
 from .....core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
+from .....core.operand import OperandStage
 from .....resource import Resource
 from .....utils import Timer
 from ....cluster import MockClusterAPI
@@ -34,7 +37,7 @@
 from ....scheduling import MockSchedulingAPI
 from ....session import MockSessionAPI
 from ....storage import MockStorageAPI
-from ....task import new_task_id
+from ....task import new_task_id, MapReduceInfo
 from ....task.supervisor.manager import TaskManagerActor, TaskConfigurationActor
 from ....mutable import MockMutableAPI
 from ... import Subtask, SubtaskStatus, SubtaskResult
@@ -46,6 +49,13 @@ class FakeTaskManager(TaskManagerActor):
     def set_subtask_result(self, subtask_result: SubtaskResult):
         return
 
+    def get_map_reduce_info(self, task_id: str, map_reduce_id: int) -> MapReduceInfo:
+        return MapReduceInfo(
+            map_reduce_id=0,
+            reducer_indexes=[(0, 0)],
+            reducer_bands=[(self.address, "numa-0")],
+        )
+
 
 @pytest.fixture
 async def actor_pool():
@@ -142,6 +152,39 @@ async def test_subtask_success(actor_pool):
     assert await subtask_runner.is_runner_free() is True
 
 
+@pytest.mark.asyncio
+async def test_shuffle_subtask(actor_pool):
+    pool, session_id, meta_api, storage_api, manager = actor_pool
+
+    pdf = pd.DataFrame({"f1": ["a", "b", "a"], "f2": [1, 2, 3]})
+    df = md.DataFrame(pdf)
+    result = df.groupby("f1").sum(method="shuffle")
+
+    graph = TileableGraph([result.data])
+    next(TileableGraphBuilder(graph).build())
+    chunk_graph = next(ChunkGraphBuilder(graph, fuse_enabled=False).build())
+    result_chunks = []
+    new_chunk_graph = ChunkGraph(result_chunks)
+    chunk_graph_iter = chunk_graph.topological_iter()
+    curr = None
+    for _ in range(3):
+        prev = curr
+        curr = next(chunk_graph_iter)
+        new_chunk_graph.add_node(curr)
+        if prev is not None:
+            new_chunk_graph.add_edge(prev, curr)
+    assert curr.op.stage == OperandStage.map
+    curr.op.extra_params = {"analyzer_map_reduce_id": 0}
+    result_chunks.append(curr)
+    subtask = Subtask(new_task_id(), session_id, new_task_id(), new_chunk_graph)
+    subtask_runner: SubtaskRunnerRef = await mo.actor_ref(
+        SubtaskRunnerActor.gen_uid("numa-0", 0), address=pool.external_address
+    )
+    await subtask_runner.run_subtask(subtask, wait_post_run=True)
+    result = await subtask_runner.get_subtask_result()
+    assert result.status == SubtaskStatus.succeeded
+
+
 @pytest.mark.asyncio
 async def test_subtask_failure(actor_pool):
     pool, session_id, meta_api, storage_api, manager = actor_pool
diff --git a/mars/services/task/__init__.py b/mars/services/task/__init__.py
index 6851dbfd56..1b5656cf34 100644
--- a/mars/services/task/__init__.py
+++ b/mars/services/task/__init__.py
@@ -14,5 +14,5 @@
 
 from .api import AbstractTaskAPI, TaskAPI, WebTaskAPI
 from .config import task_options
-from .core import Task, TaskStatus, TaskResult, new_task_id
+from .core import Task, TaskStatus, TaskResult, new_task_id, MapReduceInfo
 from .errors import TaskNotExist
diff --git a/mars/services/task/analyzer/analyzer.py b/mars/services/task/analyzer/analyzer.py
index 86e95840ae..4a63b961ba 100644
--- a/mars/services/task/analyzer/analyzer.py
+++ b/mars/services/task/analyzer/analyzer.py
@@ -25,12 +25,14 @@
     LogicKeyGenerator,
     MapReduceOperand,
     OperandStage,
+    ShuffleProxy,
 )
+from ....lib.ordered_set import OrderedSet
 from ....resource import Resource
 from ....typing import BandType, OperandType
 from ....utils import build_fetch, tokenize
 from ...subtask import SubtaskGraph, Subtask
-from ..core import Task, new_task_id
+from ..core import Task, new_task_id, MapReduceInfo
 from .assigner import AbstractGraphAssigner, GraphAssigner
 from .fusion import Coloring
 
@@ -50,6 +52,8 @@ def need_reassign_worker(op: OperandType) -> bool:
 
 
 class GraphAnalyzer:
+    _map_reduce_id = itertools.count()
+
     def __init__(
         self,
         chunk_graph: ChunkGraph,
@@ -59,6 +63,7 @@ def __init__(
         chunk_to_subtasks: Dict[ChunkType, Subtask],
         graph_assigner_cls: Type[AbstractGraphAssigner] = None,
         stage_id: str = None,
+        map_reduce_id_to_infos: Dict[int, MapReduceInfo] = None,
     ):
         self._chunk_graph = chunk_graph
         self._band_resource = band_resource
@@ -68,12 +73,17 @@ def __init__(
         self._fuse_enabled = task.fuse_enabled
         self._extra_config = task.extra_config
         self._chunk_to_subtasks = chunk_to_subtasks
+        self._map_reduce_id_to_infos = map_reduce_id_to_infos
         if graph_assigner_cls is None:
             graph_assigner_cls = GraphAssigner
         self._graph_assigner_cls = graph_assigner_cls
         self._chunk_to_copied = dict()
         self._logic_key_generator = LogicKeyGenerator()
 
+    @classmethod
+    def next_map_reduce_id(cls) -> int:
+        return next(cls._map_reduce_id)
+
     @classmethod
     def _iter_start_ops(cls, chunk_graph: ChunkGraph):
         visited = set()
@@ -300,6 +310,38 @@ def _gen_logic_key(self, chunks: List[ChunkType]):
             *[self._logic_key_generator.get_logic_key(chunk.op) for chunk in chunks]
         )
 
+    def _gen_map_reduce_info(
+        self, chunk: ChunkType, assign_results: Dict[ChunkType, BandType]
+    ):
+        reducer_ops = OrderedSet(
+            [
+                c.op
+                for c in self._chunk_graph.successors(chunk)
+                if c.op.stage == OperandStage.reduce
+            ]
+        )
+        map_chunks = [
+            c
+            for c in self._chunk_graph.predecessors(chunk)
+            if c.op.stage == OperandStage.map
+        ]
+        map_reduce_id = self.next_map_reduce_id()
+        for map_chunk in map_chunks:
+            # record analyzer map reduce id for mapper op
+            # copied chunk exists because map chunk must have
+            # been processed before shuffle proxy
+            copied_map_chunk_op = self._chunk_to_copied[map_chunk].op
+            if not hasattr(copied_map_chunk_op, "extra_params"):
+                copied_map_chunk_op.extra_params = dict()
+            copied_map_chunk_op.extra_params["analyzer_map_reduce_id"] = map_reduce_id
+        reducer_bands = [assign_results[r.outputs[0]] for r in reducer_ops]
+        map_reduce_info = MapReduceInfo(
+            map_reduce_id=map_reduce_id,
+            reducer_indexes=[reducer_op.reducer_index for reducer_op in reducer_ops],
+            reducer_bands=reducer_bands,
+        )
+        self._map_reduce_id_to_infos[map_reduce_id] = map_reduce_info
+
     @enter_mode(build=True)
     def gen_subtask_graph(
         self, op_to_bands: Dict[str, BandType] = None
@@ -420,6 +462,10 @@ def gen_subtask_graph(
 
             for c in same_color_chunks:
                 chunk_to_subtask[c] = subtask
+            if self._map_reduce_id_to_infos is not None and isinstance(
+                chunk.op, ShuffleProxy
+            ):
+                self._gen_map_reduce_info(chunk, chunk_to_bands)
             visited.update(same_color_chunks)
 
         for subtasks in logic_key_to_subtasks.values():
diff --git a/mars/services/task/api/oscar.py b/mars/services/task/api/oscar.py
index 8127730f59..6505d7f8d1 100644
--- a/mars/services/task/api/oscar.py
+++ b/mars/services/task/api/oscar.py
@@ -18,7 +18,7 @@
 from ....core import Tileable
 from ....lib.aio import alru_cache
 from ...subtask import SubtaskResult
-from ..core import TileableGraph, TaskResult
+from ..core import TileableGraph, TaskResult, MapReduceInfo
 from ..supervisor.manager import TaskManagerActor
 from .core import AbstractTaskAPI
 
@@ -104,3 +104,8 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
 
     async def get_last_idle_time(self) -> Union[float, None]:
         return await self._task_manager_ref.get_last_idle_time()
+
+    async def get_map_reduce_info(
+        self, task_id: str, map_reduce_id: int
+    ) -> MapReduceInfo:
+        return await self._task_manager_ref.get_map_reduce_info(task_id, map_reduce_id)
diff --git a/mars/services/task/core.py b/mars/services/task/core.py
index 13353c6aa3..bc4a76a0ca 100644
--- a/mars/services/task/core.py
+++ b/mars/services/task/core.py
@@ -15,17 +15,20 @@
 import random
 from enum import Enum
 from string import ascii_letters, digits
-from typing import Any, Optional, Dict
+from typing import Any, Optional, Dict, List, Tuple
 
 from ...core import TileableGraph
+from ...typing import BandType
 from ...serialization.serializables import (
     Serializable,
+    FieldTypes,
     StringField,
     ReferenceField,
     Int32Field,
     BoolField,
     AnyField,
     DictField,
+    ListField,
     Float64Field,
 )
 
@@ -110,3 +113,17 @@ def __init__(
 
 def new_task_id():
     return "".join(random.choice(ascii_letters + digits) for _ in range(24))
+
+
+class MapReduceInfo(Serializable):
+    # record map reduce info during analyzing
+    # record reducer indexes, and assigned bands
+    map_reduce_id: int = Int32Field("map_reduce_id")
+    reducer_indexes: List[Tuple[int]] = ListField(
+        "reducer_indexes", FieldTypes.tuple(FieldTypes.int64), default_factory=list
+    )
+    reducer_bands: List[BandType] = ListField(
+        "reducer_bands",
+        FieldTypes.tuple(FieldTypes.string, FieldTypes.string),
+        default_factory=list,
+    )
diff --git a/mars/services/task/supervisor/manager.py b/mars/services/task/supervisor/manager.py
index d81ec5854f..416cdc3a68 100644
--- a/mars/services/task/supervisor/manager.py
+++ b/mars/services/task/supervisor/manager.py
@@ -25,7 +25,7 @@
 from ....core.operand import Fetch
 from ...subtask import SubtaskResult, SubtaskGraph
 from ..config import task_options
-from ..core import Task, new_task_id, TaskStatus
+from ..core import Task, new_task_id, TaskStatus, MapReduceInfo
 from ..errors import TaskNotExist
 from .preprocessor import TaskPreprocessor
 from .processor import TaskProcessor
@@ -326,3 +326,13 @@ async def get_last_idle_time(self):
             else:
                 self._last_idle_time = time.time()
         return self._last_idle_time
+
+    async def get_map_reduce_info(
+        self, task_id: str, map_reduce_id: int
+    ) -> MapReduceInfo:
+        try:
+            processor_ref = self._task_id_to_processor_ref[task_id]
+        except KeyError:  # pragma: no cover
+            raise TaskNotExist(f"Task {task_id} does not exist")
+
+        return await processor_ref.get_map_reduce_info(map_reduce_id)
diff --git a/mars/services/task/supervisor/preprocessor.py b/mars/services/task/supervisor/preprocessor.py
index e64b7da6aa..1d7f00ad6b 100644
--- a/mars/services/task/supervisor/preprocessor.py
+++ b/mars/services/task/supervisor/preprocessor.py
@@ -26,7 +26,7 @@
 from ....typing import BandType, TileableType, ChunkType
 from ...subtask import Subtask, SubtaskGraph
 from ..analyzer import GraphAnalyzer
-from ..core import Task
+from ..core import Task, MapReduceInfo
 
 logger = logging.getLogger(__name__)
 
@@ -112,9 +112,11 @@ class TaskPreprocessor:
         "chunk_optimization_records_list",
         "_cancelled",
         "_done",
+        "map_reduce_id_to_infos",
     )
 
     tile_context: TileContext
+    map_reduce_id_to_infos: Dict[int, MapReduceInfo]
 
     def __init__(
         self,
@@ -129,6 +131,7 @@ def __init__(
         self.tile_context = tiled_context
         self.tileable_optimization_records = None
         self.chunk_optimization_records_list = []
+        self.map_reduce_id_to_infos = dict()
 
         self._cancelled = asyncio.Event()
         self._done = asyncio.Event()
@@ -221,6 +224,7 @@ def analyze(
             self._config,
             chunk_to_subtasks,
             stage_id=stage_id,
+            map_reduce_id_to_infos=self.map_reduce_id_to_infos,
         )
         graph = analyzer.gen_subtask_graph(op_to_bands)
         logger.debug(
@@ -248,5 +252,8 @@ def get_tiled(self, tileable: TileableType):
         tileable = tileable.data if hasattr(tileable, "data") else tileable
         return self.tile_context[tileable]
 
+    def get_map_reduce_info(self, map_reduce_id: int) -> MapReduceInfo:
+        return self.map_reduce_id_to_infos[map_reduce_id]
+
     def __await__(self):
         return self._done.wait().__await__()
diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py
index cfe239490d..c4cf78b743 100644
--- a/mars/services/task/supervisor/processor.py
+++ b/mars/services/task/supervisor/processor.py
@@ -30,7 +30,7 @@
 from ....typing import TileableType, ChunkType
 from ....utils import Timer
 from ...subtask import SubtaskResult, Subtask
-from ..core import Task, TaskResult, TaskStatus, new_task_id
+from ..core import Task, TaskResult, TaskStatus, new_task_id, MapReduceInfo
 from ..execution.api import TaskExecutor, ExecutionChunkResult
 from .preprocessor import TaskPreprocessor
 
@@ -407,6 +407,9 @@ def _gen_result(self):
             {"session_id": self._task.session_id, "task_id": self._task.task_id},
         )
 
+    def get_map_reduce_info(self, map_reduce_id: int) -> MapReduceInfo:
+        return self._preprocessor.get_map_reduce_info(map_reduce_id)
+
     def dump_subtask_graph(self):
         from .graph_visualizer import GraphVisualizer
 
diff --git a/mars/services/task/supervisor/task.py b/mars/services/task/supervisor/task.py
index 722f9fe50d..516fecc06b 100644
--- a/mars/services/task/supervisor/task.py
+++ b/mars/services/task/supervisor/task.py
@@ -25,7 +25,7 @@
 from ....typing import TileableType
 from ....utils import build_fetch
 from ...subtask import SubtaskResult, SubtaskStatus, SubtaskGraph
-from ..core import Task, TaskStatus
+from ..core import Task, TaskStatus, MapReduceInfo
 from ..execution.api import TaskExecutor
 from .preprocessor import TaskPreprocessor
 from .processor import TaskProcessor
@@ -417,6 +417,10 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
         if self._cur_processor is not None:
             await self._cur_processor.set_subtask_result(subtask_result)
 
+    async def get_map_reduce_info(self, map_reduce_id: int) -> MapReduceInfo:
+        for processor in self._task_id_to_processor.values():
+            return processor.get_map_reduce_info(map_reduce_id)
+
     def is_done(self) -> bool:
         for processor in self._task_id_to_processor.values():
             if not processor.is_done():
diff --git a/mars/services/task/supervisor/tests/test_task_manager.py b/mars/services/task/supervisor/tests/test_task_manager.py
index 749e381a8d..51735333c1 100644
--- a/mars/services/task/supervisor/tests/test_task_manager.py
+++ b/mars/services/task/supervisor/tests/test_task_manager.py
@@ -499,6 +499,25 @@ async def test_shuffle(actor_pool):
     )
     np.testing.assert_array_equal(result, expect)
 
+    # test generating map reduce info
+    subtask_graphs = (await manager.get_subtask_graphs(task_id))[0]
+    map_reduce_ids = []
+    for subtask in subtask_graphs:
+        for chunk in subtask.chunk_graph.result_chunks:
+            map_reduce_id = getattr(chunk.op, "extra_params", dict()).get(
+                "analyzer_map_reduce_id"
+            )
+            if map_reduce_id is not None:
+                map_reduce_ids.append(map_reduce_id)
+    assert len(map_reduce_ids) > 0
+    map_reduce_info = await manager.get_map_reduce_info(task_id, map_reduce_ids[0])
+    assert (
+        len(set(map_reduce_info.reducer_indexes))
+        == len(map_reduce_info.reducer_indexes)
+        == len(map_reduce_info.reducer_bands)
+        > 0
+    )
+
     # test ref counts
     assert (await lifecycle_api.get_tileable_ref_counts([c.key]))[0] == 1
     assert (
diff --git a/setup.cfg b/setup.cfg
index 00d405dda5..d08b1e0bb0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -61,7 +61,7 @@ dev =
     pytest-cov>=2.5.0
     pytest-timeout>=1.2.0
     pytest-forked>=1.0
-    pytest-asyncio>=0.14.0
+    pytest-asyncio>=0.17.0
     mock>=4.0.0; python_version<"3.8"
     flake8>=3.8.0
     black