diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_asyncio/test_taskgroups.py | 45 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_timeouts.py | 48 | ||||
-rw-r--r-- | Lib/test/test_asyncio/utils.py | 15 |
3 files changed, 107 insertions, 1 deletions
diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 6a0231f2859..7a18362b54e 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -8,6 +8,8 @@ import contextlib from asyncio import taskgroups import unittest +from test.test_asyncio.utils import await_without_task + # To prevent a warning "test altered the execution environment" def tearDownModule(): @@ -779,6 +781,49 @@ class TestTaskGroup(unittest.IsolatedAsyncioTestCase): await asyncio.create_task(main()) + async def test_taskgroup_already_entered(self): + tg = taskgroups.TaskGroup() + async with tg: + with self.assertRaisesRegex(RuntimeError, "has already been entered"): + async with tg: + pass + + async def test_taskgroup_double_enter(self): + tg = taskgroups.TaskGroup() + async with tg: + pass + with self.assertRaisesRegex(RuntimeError, "has already been entered"): + async with tg: + pass + + async def test_taskgroup_finished(self): + tg = taskgroups.TaskGroup() + async with tg: + pass + coro = asyncio.sleep(0) + with self.assertRaisesRegex(RuntimeError, "is finished"): + tg.create_task(coro) + # We still have to await coro to avoid a warning + await coro + + async def test_taskgroup_not_entered(self): + tg = taskgroups.TaskGroup() + coro = asyncio.sleep(0) + with self.assertRaisesRegex(RuntimeError, "has not been entered"): + tg.create_task(coro) + # We still have to await coro to avoid a warning + await coro + + async def test_taskgroup_without_parent_task(self): + tg = taskgroups.TaskGroup() + with self.assertRaisesRegex(RuntimeError, "parent task"): + await await_without_task(tg.__aenter__()) + coro = asyncio.sleep(0) + with self.assertRaisesRegex(RuntimeError, "has not been entered"): + tg.create_task(coro) + # We still have to await coro to avoid a warning + await coro + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_asyncio/test_timeouts.py b/Lib/test/test_asyncio/test_timeouts.py index e9b59b95351..f54e79e4d8e 100644 --- a/Lib/test/test_asyncio/test_timeouts.py +++ b/Lib/test/test_asyncio/test_timeouts.py @@ -5,11 +5,12 @@ import time import asyncio +from test.test_asyncio.utils import await_without_task + def tearDownModule(): asyncio.set_event_loop_policy(None) - class TimeoutTests(unittest.IsolatedAsyncioTestCase): async def test_timeout_basic(self): @@ -257,6 +258,51 @@ class TimeoutTests(unittest.IsolatedAsyncioTestCase): cause = exc.exception.__cause__ assert isinstance(cause, asyncio.CancelledError) + async def test_timeout_already_entered(self): + async with asyncio.timeout(0.01) as cm: + with self.assertRaisesRegex(RuntimeError, "has already been entered"): + async with cm: + pass + + async def test_timeout_double_enter(self): + async with asyncio.timeout(0.01) as cm: + pass + with self.assertRaisesRegex(RuntimeError, "has already been entered"): + async with cm: + pass + + async def test_timeout_finished(self): + async with asyncio.timeout(0.01) as cm: + pass + with self.assertRaisesRegex(RuntimeError, "finished"): + cm.reschedule(0.02) + + async def test_timeout_expired(self): + with self.assertRaises(TimeoutError): + async with asyncio.timeout(0.01) as cm: + await asyncio.sleep(1) + with self.assertRaisesRegex(RuntimeError, "expired"): + cm.reschedule(0.02) + + async def test_timeout_expiring(self): + async with asyncio.timeout(0.01) as cm: + with self.assertRaises(asyncio.CancelledError): + await asyncio.sleep(1) + with self.assertRaisesRegex(RuntimeError, "expiring"): + cm.reschedule(0.02) + + async def test_timeout_not_entered(self): + cm = asyncio.timeout(0.01) + with self.assertRaisesRegex(RuntimeError, "has not been entered"): + cm.reschedule(0.02) + + async def test_timeout_without_task(self): + cm = asyncio.timeout(0.01) + with self.assertRaisesRegex(RuntimeError, "task"): + await await_without_task(cm.__aenter__()) + with self.assertRaisesRegex(RuntimeError, "has not been entered"): + cm.reschedule(0.02) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index 83fac4a26af..18869b3290a 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -612,3 +612,18 @@ def mock_nonblocking_socket(proto=socket.IPPROTO_TCP, type=socket.SOCK_STREAM, sock.family = family sock.gettimeout.return_value = 0.0 return sock + + +async def await_without_task(coro): + exc = None + def func(): + try: + for _ in coro.__await__(): + pass + except BaseException as err: + nonlocal exc + exc = err + asyncio.get_running_loop().call_soon(func) + await asyncio.sleep(0) + if exc is not None: + raise exc |