1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
import asyncio
import contextvars
import inspect
import warnings
from .case import TestCase
class IsolatedAsyncioTestCase(TestCase):
# Names intentionally have a long prefix
# to reduce a chance of clashing with user-defined attributes
# from inherited test case
#
# The class doesn't call loop.run_until_complete(self.setUp()) and family
# but uses a different approach:
# 1. create a long-running task that reads self.setUp()
# awaitable from queue along with a future
# 2. await the awaitable object passing in and set the result
# into the future object
# 3. Outer code puts the awaitable and the future object into a queue
# with waiting for the future
# The trick is necessary because every run_until_complete() call
# creates a new task with embedded ContextVar context.
# To share contextvars between setUp(), test and tearDown() we need to execute
# them inside the same task.
# Note: the test case modifies event loop policy if the policy was not instantiated
# yet.
# asyncio.get_event_loop_policy() creates a default policy on demand but never
# returns None
# I believe this is not an issue in user level tests but python itself for testing
# should reset a policy in every test module
# by calling asyncio.set_event_loop_policy(None) in tearDownModule()
def __init__(self, methodName='runTest'):
super().__init__(methodName)
self._asyncioRunner = None
self._asyncioTestContext = contextvars.copy_context()
async def asyncSetUp(self):
pass
async def asyncTearDown(self):
pass
def addAsyncCleanup(self, func, /, *args, **kwargs):
# A trivial trampoline to addCleanup()
# the function exists because it has a different semantics
# and signature:
# addCleanup() accepts regular functions
# but addAsyncCleanup() accepts coroutines
#
# We intentionally don't add inspect.iscoroutinefunction() check
# for func argument because there is no way
# to check for async function reliably:
# 1. It can be "async def func()" itself
# 2. Class can implement "async def __call__()" method
# 3. Regular "def func()" that returns awaitable object
self.addCleanup(*(func, *args), **kwargs)
def _callSetUp(self):
self._asyncioTestContext.run(self.setUp)
self._callAsync(self.asyncSetUp)
def _callTestMethod(self, method):
if self._callMaybeAsync(method) is not None:
warnings.warn(f'It is deprecated to return a value!=None from a '
f'test case ({method})', DeprecationWarning, stacklevel=4)
def _callTearDown(self):
self._callAsync(self.asyncTearDown)
self._asyncioTestContext.run(self.tearDown)
def _callCleanup(self, function, *args, **kwargs):
self._callMaybeAsync(function, *args, **kwargs)
def _callAsync(self, func, /, *args, **kwargs):
assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function'
return self._asyncioRunner.run(
func(*args, **kwargs),
context=self._asyncioTestContext
)
def _callMaybeAsync(self, func, /, *args, **kwargs):
assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
if inspect.iscoroutinefunction(func):
return self._asyncioRunner.run(
func(*args, **kwargs),
context=self._asyncioTestContext,
)
else:
return self._asyncioTestContext.run(func, *args, **kwargs)
def _setupAsyncioRunner(self):
assert self._asyncioRunner is None, 'asyncio runner is already initialized'
runner = asyncio.Runner(debug=True)
self._asyncioRunner = runner
def _tearDownAsyncioRunner(self):
runner = self._asyncioRunner
runner.close()
def run(self, result=None):
self._setupAsyncioRunner()
try:
return super().run(result)
finally:
self._tearDownAsyncioRunner()
def debug(self):
self._setupAsyncioRunner()
super().debug()
self._tearDownAsyncioRunner()
def __del__(self):
if self._asyncioRunner is not None:
self._tearDownAsyncioRunner()
|