diff --git a/changelog/884.bugfix b/changelog/884.bugfix new file mode 100644 index 00000000..8e53551b --- /dev/null +++ b/changelog/884.bugfix @@ -0,0 +1 @@ +Fixed hang in ``worksteal`` scheduler. diff --git a/src/xdist/remote.py b/src/xdist/remote.py index 2e83a8dc..e035f772 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -58,6 +58,7 @@ def worker_title(title): class WorkerInteractor: SHUTDOWN_MARK = object() + QUEUE_REPLACED_MARK = object() def __init__(self, config, channel): self.config = config @@ -72,6 +73,15 @@ def __init__(self, config, channel): def _make_queue(self): return self.channel.gateway.execmodel.queue.Queue() + def _get_next_item_index(self): + """Gets the next item from test queue. Handles the case when the queue + is replaced concurrently in another thread. + """ + result = self.torun.get() + while result is self.QUEUE_REPLACED_MARK: + result = self.torun.get() + return result + def sendevent(self, name, **kwargs): self.log("sending", name, kwargs) self.channel.send((name, kwargs)) @@ -136,19 +146,22 @@ def old_queue_get_nowait_noraise(): self.torun.put(i) self.sendevent("unscheduled", indices=stolen) + old_queue.put(self.QUEUE_REPLACED_MARK) @pytest.hookimpl def pytest_runtestloop(self, session): self.log("entering main loop") self.channel.setcallback(self.handle_command, endmarker=self.SHUTDOWN_MARK) - self.nextitem_index = self.torun.get() + self.nextitem_index = self._get_next_item_index() while self.nextitem_index is not self.SHUTDOWN_MARK: self.run_one_test() return True def run_one_test(self): + self.item_index = self.nextitem_index + self.nextitem_index = self._get_next_item_index() + items = self.session.items - self.item_index, self.nextitem_index = self.nextitem_index, self.torun.get() item = items[self.item_index] if self.nextitem_index is self.SHUTDOWN_MARK: nextitem = None diff --git a/testing/test_remote.py b/testing/test_remote.py index cb8f6b7f..2d250c5b 100644 --- a/testing/test_remote.py +++ b/testing/test_remote.py @@ -271,6 +271,40 @@ def test_func4(): pass ev = worker.popevent("workerfinished") assert "workeroutput" in ev.kwargs + def test_steal_empty_queue(self, worker: WorkerSetup, unserialize_report) -> None: + worker.pytester.makepyfile( + """ + def test_func(): pass + def test_func2(): pass + """ + ) + worker.setup() + ev = worker.popevent("collectionfinish") + ids = ev.kwargs["ids"] + assert len(ids) == 2 + worker.sendcommand("runtests_all") + + for when in ["setup", "call", "teardown"]: + ev = worker.popevent("testreport") + rep = unserialize_report(ev.kwargs["data"]) + assert rep.nodeid.endswith("::test_func") + assert rep.when == when + + worker.sendcommand("steal", indices=[0, 1]) + ev = worker.popevent("unscheduled") + assert ev.kwargs["indices"] == [] + + worker.sendcommand("shutdown") + + for when in ["setup", "call", "teardown"]: + ev = worker.popevent("testreport") + rep = unserialize_report(ev.kwargs["data"]) + assert rep.nodeid.endswith("::test_func2") + assert rep.when == when + + ev = worker.popevent("workerfinished") + assert "workeroutput" in ev.kwargs + def test_remote_env_vars(pytester: pytest.Pytester) -> None: pytester.makepyfile(