Skip to content

Commit ec78e22

Browse files
committed
Don't spin up new threads if there are existing idle ones
Fixes #88.
1 parent 1f9c23f commit ec78e22

File tree

4 files changed

+48
-3
lines changed

4 files changed

+48
-3
lines changed

CHANGES.rst

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
3.3.0
2+
=====
3+
4+
- Backported bpo-24882: Let ThreadPoolExecutor reuse idle threads before creating new thread
5+
6+
17
3.2.0
28
=====
39

concurrent/futures/thread.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ def _worker(executor_reference, work_queue):
7575
work_item.run()
7676
# Delete references to object. See issue16284
7777
del work_item
78+
79+
# attempt to increment idle count
80+
executor = executor_reference()
81+
if executor is not None:
82+
executor._idle_semaphore.release()
83+
del executor
7884
continue
7985
executor = executor_reference()
8086
# Exit if:
@@ -112,6 +118,7 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
112118

113119
self._max_workers = max_workers
114120
self._work_queue = queue.Queue()
121+
self._idle_semaphore = threading.Semaphore(0)
115122
self._threads = set()
116123
self._shutdown = False
117124
self._shutdown_lock = threading.Lock()
@@ -132,12 +139,15 @@ def submit(self, fn, *args, **kwargs):
132139
submit.__doc__ = _base.Executor.submit.__doc__
133140

134141
def _adjust_thread_count(self):
142+
# if idle threads are available, don't spin new threads
143+
if self._idle_semaphore.acquire(False):
144+
return
145+
135146
# When the executor gets lost, the weakref callback will wake up
136147
# the worker threads.
137148
def weakref_cb(_, q=self._work_queue):
138149
q.put(None)
139-
# TODO(bquinlan): Should avoid creating new threads if there are more
140-
# idle threads than items in the work queue.
150+
141151
num_threads = len(self._threads)
142152
if num_threads < self._max_workers:
143153
thread_name = '%s_%d' % (self._thread_name_prefix or self,

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
readme = f.read()
2828

2929
setup(name='futures',
30-
version='3.2.0',
30+
version='3.3.0',
3131
description='Backport of the concurrent.futures package from Python 3',
3232
long_description=readme,
3333
author='Brian Quinlan',

test_futures.py

+29
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,15 @@ def test_threads_terminate(self):
206206
self.executor.submit(mul, 21, 2)
207207
self.executor.submit(mul, 6, 7)
208208
self.executor.submit(mul, 3, 14)
209+
def acquire_lock(lock):
210+
lock.acquire()
211+
212+
sem = threading.Semaphore(0)
213+
for i in range(3):
214+
self.executor.submit(acquire_lock, sem)
209215
self.assertEqual(len(self.executor._threads), 3)
216+
for i in range(3):
217+
sem.release()
210218
self.executor.shutdown()
211219
for t in self.executor._threads:
212220
t.join()
@@ -529,6 +537,27 @@ def test_default_workers(self):
529537
self.assertEqual(executor._max_workers,
530538
(cpu_count() or 1) * 5)
531539

540+
def test_saturation(self):
541+
executor = self.executor_type(4)
542+
def acquire_lock(lock):
543+
lock.acquire()
544+
545+
sem = threading.Semaphore(0)
546+
for i in range(15 * executor._max_workers):
547+
executor.submit(acquire_lock, sem)
548+
self.assertEqual(len(executor._threads), executor._max_workers)
549+
for i in range(15 * executor._max_workers):
550+
sem.release()
551+
executor.shutdown(wait=True)
552+
553+
def test_idle_thread_reuse(self):
554+
executor = self.executor_type()
555+
executor.submit(mul, 21, 2).result()
556+
executor.submit(mul, 6, 7).result()
557+
executor.submit(mul, 3, 14).result()
558+
self.assertEqual(len(executor._threads), 1)
559+
executor.shutdown(wait=True)
560+
532561

533562
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
534563
pass

0 commit comments

Comments
 (0)