aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/concurrent/futures/interpreter.py
blob: cbb60ce80c181355a19e86c85cdda212aa21c962 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Implements InterpreterPoolExecutor."""

from concurrent import interpreters
import sys
import textwrap
from . import thread as _thread
import traceback


def do_call(results, func, args, kwargs):
    try:
        return func(*args, **kwargs)
    except BaseException as exc:
        # Send the captured exception out on the results queue,
        # but still leave it unhandled for the interpreter to handle.
        try:
            results.put(exc)
        except interpreters.NotShareableError:
            # The exception is not shareable.
            print('exception is not shareable:', file=sys.stderr)
            traceback.print_exception(exc)
            results.put(None)
        raise  # re-raise


class WorkerContext(_thread.WorkerContext):

    @classmethod
    def prepare(cls, initializer, initargs):
        def resolve_task(fn, args, kwargs):
            if isinstance(fn, str):
                # XXX Circle back to this later.
                raise TypeError('scripts not supported')
            else:
                task = (fn, args, kwargs)
            return task

        if initializer is not None:
            try:
                initdata = resolve_task(initializer, initargs, {})
            except ValueError:
                if isinstance(initializer, str) and initargs:
                    raise ValueError(f'an initializer script does not take args, got {initargs!r}')
                raise  # re-raise
        else:
            initdata = None
        def create_context():
            return cls(initdata)
        return create_context, resolve_task

    def __init__(self, initdata):
        self.initdata = initdata
        self.interp = None
        self.results = None

    def __del__(self):
        if self.interp is not None:
            self.finalize()

    def initialize(self):
        assert self.interp is None, self.interp
        self.interp = interpreters.create()
        try:
            maxsize = 0
            self.results = interpreters.create_queue(maxsize)

            if self.initdata:
                self.run(self.initdata)
        except BaseException:
            self.finalize()
            raise  # re-raise

    def finalize(self):
        interp = self.interp
        results = self.results
        self.results = None
        self.interp = None
        if results is not None:
            del results
        if interp is not None:
            interp.close()

    def run(self, task):
        try:
            return self.interp.call(do_call, self.results, *task)
        except interpreters.ExecutionFailed as wrapper:
            # Wait for the exception data to show up.
            exc = self.results.get()
            if exc is None:
                # The exception must have been not shareable.
                raise  # re-raise
            raise exc from wrapper


class BrokenInterpreterPool(_thread.BrokenThreadPool):
    """
    Raised when a worker thread in an InterpreterPoolExecutor failed initializing.
    """


class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):

    BROKEN = BrokenInterpreterPool

    @classmethod
    def prepare_context(cls, initializer, initargs):
        return WorkerContext.prepare(initializer, initargs)

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new InterpreterPoolExecutor instance.

        Args:
            max_workers: The maximum number of interpreters that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable or script used to initialize
                each worker interpreter.
            initargs: A tuple of arguments to pass to the initializer.
        """
        super().__init__(max_workers, thread_name_prefix,
                         initializer, initargs)