aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/concurrent/futures
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r--Lib/concurrent/futures/_base.py3
-rw-r--r--Lib/concurrent/futures/process.py11
-rw-r--r--Lib/concurrent/futures/thread.py2
3 files changed, 11 insertions, 5 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index ca3aebd9c76..3d0328049fe 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -198,8 +198,7 @@ def as_completed(fs, timeout=None):
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
- for future in finished:
- yield future
+ yield from finished
while pending:
if timeout is None:
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index adf2ab47594..07b5225d1da 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -40,7 +40,7 @@ Local worker thread:
Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
- _ResultItems in "Request Q"
+ _ResultItems in "Result Q"
"""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
@@ -49,8 +49,9 @@ import atexit
import os
from concurrent.futures import _base
import queue
+from queue import Full
import multiprocessing
-from multiprocessing.queues import SimpleQueue, Full
+from multiprocessing import SimpleQueue
from multiprocessing.connection import wait
import threading
import weakref
@@ -240,6 +241,8 @@ def _queue_management_worker(executor_reference,
"terminated abruptly while the future was "
"running or pending."
))
+ # Delete references to object. See issue16284
+ del work_item
pending_work_items.clear()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
@@ -264,6 +267,8 @@ def _queue_management_worker(executor_reference,
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
+ # Delete references to object. See issue16284
+ del work_item
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
@@ -327,7 +332,7 @@ class ProcessPoolExecutor(_base.Executor):
_check_system_limits()
if max_workers is None:
- self._max_workers = multiprocessing.cpu_count()
+ self._max_workers = os.cpu_count() or 1
else:
self._max_workers = max_workers
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 95bb682565c..f9beb0f7f7e 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -63,6 +63,8 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
+ # Delete references to object. See issue16284
+ del work_item
continue
executor = executor_reference()
# Exit if: