diff options
Diffstat (limited to 'Lib/concurrent/futures/thread.py')
-rw-r--r-- | Lib/concurrent/futures/thread.py | 60 |
1 files changed, 29 insertions, 31 deletions
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 15736daa525..93b495f55a3 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -25,28 +25,17 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads finish. -_thread_references = set() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() atexit.register(_python_exit) @@ -72,18 +61,23 @@ def _worker(executor_reference, work_queue): try: while True: try: - work_item = work_queue.get(block=True, timeout=0.1) + work_item = work_queue.get(block=True) except queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor + pass else: - work_item.run() + if work_item is not None: + work_item.run() + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + work_queue.put(None) + return + del executor except BaseException as e: _base.LOGGER.critical('Exception in worker', exc_info=True) @@ -95,8 +89,6 @@ class ThreadPoolExecutor(_base.Executor): max_workers: The maximum number of threads that can be used to execute the given calls. """ - _remove_dead_thread_references() - self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() @@ -117,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) + args=(weakref.ref(self, weakref_cb), + self._work_queue)) t.daemon = True t.start() self._threads.add(t) - _thread_references.add(weakref.ref(t)) + _threads_queues[t] = self._work_queue def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True + self._work_queue.put(None) if wait: for t in self._threads: t.join() |