aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_asyncio/test_taskgroups.py45
-rw-r--r--Lib/test/test_asyncio/test_timeouts.py48
-rw-r--r--Lib/test/test_asyncio/utils.py15
3 files changed, 107 insertions, 1 deletions
diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py
index 6a0231f2859..7a18362b54e 100644
--- a/Lib/test/test_asyncio/test_taskgroups.py
+++ b/Lib/test/test_asyncio/test_taskgroups.py
@@ -8,6 +8,8 @@ import contextlib
from asyncio import taskgroups
import unittest
+from test.test_asyncio.utils import await_without_task
+
# To prevent a warning "test altered the execution environment"
def tearDownModule():
@@ -779,6 +781,49 @@ class TestTaskGroup(unittest.IsolatedAsyncioTestCase):
await asyncio.create_task(main())
+ async def test_taskgroup_already_entered(self):
+ tg = taskgroups.TaskGroup()
+ async with tg:
+ with self.assertRaisesRegex(RuntimeError, "has already been entered"):
+ async with tg:
+ pass
+
+ async def test_taskgroup_double_enter(self):
+ tg = taskgroups.TaskGroup()
+ async with tg:
+ pass
+ with self.assertRaisesRegex(RuntimeError, "has already been entered"):
+ async with tg:
+ pass
+
+ async def test_taskgroup_finished(self):
+ tg = taskgroups.TaskGroup()
+ async with tg:
+ pass
+ coro = asyncio.sleep(0)
+ with self.assertRaisesRegex(RuntimeError, "is finished"):
+ tg.create_task(coro)
+ # We still have to await coro to avoid a warning
+ await coro
+
+ async def test_taskgroup_not_entered(self):
+ tg = taskgroups.TaskGroup()
+ coro = asyncio.sleep(0)
+ with self.assertRaisesRegex(RuntimeError, "has not been entered"):
+ tg.create_task(coro)
+ # We still have to await coro to avoid a warning
+ await coro
+
+ async def test_taskgroup_without_parent_task(self):
+ tg = taskgroups.TaskGroup()
+ with self.assertRaisesRegex(RuntimeError, "parent task"):
+ await await_without_task(tg.__aenter__())
+ coro = asyncio.sleep(0)
+ with self.assertRaisesRegex(RuntimeError, "has not been entered"):
+ tg.create_task(coro)
+ # We still have to await coro to avoid a warning
+ await coro
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_asyncio/test_timeouts.py b/Lib/test/test_asyncio/test_timeouts.py
index e9b59b95351..f54e79e4d8e 100644
--- a/Lib/test/test_asyncio/test_timeouts.py
+++ b/Lib/test/test_asyncio/test_timeouts.py
@@ -5,11 +5,12 @@ import time
import asyncio
+from test.test_asyncio.utils import await_without_task
+
def tearDownModule():
asyncio.set_event_loop_policy(None)
-
class TimeoutTests(unittest.IsolatedAsyncioTestCase):
async def test_timeout_basic(self):
@@ -257,6 +258,51 @@ class TimeoutTests(unittest.IsolatedAsyncioTestCase):
cause = exc.exception.__cause__
assert isinstance(cause, asyncio.CancelledError)
+ async def test_timeout_already_entered(self):
+ async with asyncio.timeout(0.01) as cm:
+ with self.assertRaisesRegex(RuntimeError, "has already been entered"):
+ async with cm:
+ pass
+
+ async def test_timeout_double_enter(self):
+ async with asyncio.timeout(0.01) as cm:
+ pass
+ with self.assertRaisesRegex(RuntimeError, "has already been entered"):
+ async with cm:
+ pass
+
+ async def test_timeout_finished(self):
+ async with asyncio.timeout(0.01) as cm:
+ pass
+ with self.assertRaisesRegex(RuntimeError, "finished"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_expired(self):
+ with self.assertRaises(TimeoutError):
+ async with asyncio.timeout(0.01) as cm:
+ await asyncio.sleep(1)
+ with self.assertRaisesRegex(RuntimeError, "expired"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_expiring(self):
+ async with asyncio.timeout(0.01) as cm:
+ with self.assertRaises(asyncio.CancelledError):
+ await asyncio.sleep(1)
+ with self.assertRaisesRegex(RuntimeError, "expiring"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_not_entered(self):
+ cm = asyncio.timeout(0.01)
+ with self.assertRaisesRegex(RuntimeError, "has not been entered"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_without_task(self):
+ cm = asyncio.timeout(0.01)
+ with self.assertRaisesRegex(RuntimeError, "task"):
+ await await_without_task(cm.__aenter__())
+ with self.assertRaisesRegex(RuntimeError, "has not been entered"):
+ cm.reschedule(0.02)
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py
index 83fac4a26af..18869b3290a 100644
--- a/Lib/test/test_asyncio/utils.py
+++ b/Lib/test/test_asyncio/utils.py
@@ -612,3 +612,18 @@ def mock_nonblocking_socket(proto=socket.IPPROTO_TCP, type=socket.SOCK_STREAM,
sock.family = family
sock.gettimeout.return_value = 0.0
return sock
+
+
+async def await_without_task(coro):
+ exc = None
+ def func():
+ try:
+ for _ in coro.__await__():
+ pass
+ except BaseException as err:
+ nonlocal exc
+ exc = err
+ asyncio.get_running_loop().call_soon(func)
+ await asyncio.sleep(0)
+ if exc is not None:
+ raise exc