aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_asyncio/test_timeouts.py
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2023-10-21 22:18:34 +0300
committerGitHub <noreply@github.com>2023-10-21 22:18:34 +0300
commit6c23635f2b7067ef091a550954e09f8b7c329e3f (patch)
treeb420a114d1b1ff9ca39f85b69ac68311d580ffe5 /Lib/test/test_asyncio/test_timeouts.py
parentfd60549c0ac6c81f05594a5141d24b4433ae39be (diff)
downloadcpython-6c23635f2b7067ef091a550954e09f8b7c329e3f.tar.gz
cpython-6c23635f2b7067ef091a550954e09f8b7c329e3f.zip
gh-111085: Fix invalid state handling in TaskGroup and Timeout (#111111)
asyncio.TaskGroup and asyncio.Timeout classes now raise proper RuntimeError if they are improperly used. * When they are used without entering the context manager. * When they are used after finishing. * When the context manager is entered more than once (simultaneously or sequentially). * If there is no current task when entering the context manager. They now remain in a consistent state after an exception is thrown, so subsequent operations can be performed correctly (if they are allowed). Co-authored-by: James Hilton-Balfe <gobot1234yt@gmail.com>
Diffstat (limited to 'Lib/test/test_asyncio/test_timeouts.py')
-rw-r--r--Lib/test/test_asyncio/test_timeouts.py48
1 files changed, 47 insertions, 1 deletions
diff --git a/Lib/test/test_asyncio/test_timeouts.py b/Lib/test/test_asyncio/test_timeouts.py
index e9b59b95351..f54e79e4d8e 100644
--- a/Lib/test/test_asyncio/test_timeouts.py
+++ b/Lib/test/test_asyncio/test_timeouts.py
@@ -5,11 +5,12 @@ import time
import asyncio
+from test.test_asyncio.utils import await_without_task
+
def tearDownModule():
asyncio.set_event_loop_policy(None)
-
class TimeoutTests(unittest.IsolatedAsyncioTestCase):
async def test_timeout_basic(self):
@@ -257,6 +258,51 @@ class TimeoutTests(unittest.IsolatedAsyncioTestCase):
cause = exc.exception.__cause__
assert isinstance(cause, asyncio.CancelledError)
+ async def test_timeout_already_entered(self):
+ async with asyncio.timeout(0.01) as cm:
+ with self.assertRaisesRegex(RuntimeError, "has already been entered"):
+ async with cm:
+ pass
+
+ async def test_timeout_double_enter(self):
+ async with asyncio.timeout(0.01) as cm:
+ pass
+ with self.assertRaisesRegex(RuntimeError, "has already been entered"):
+ async with cm:
+ pass
+
+ async def test_timeout_finished(self):
+ async with asyncio.timeout(0.01) as cm:
+ pass
+ with self.assertRaisesRegex(RuntimeError, "finished"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_expired(self):
+ with self.assertRaises(TimeoutError):
+ async with asyncio.timeout(0.01) as cm:
+ await asyncio.sleep(1)
+ with self.assertRaisesRegex(RuntimeError, "expired"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_expiring(self):
+ async with asyncio.timeout(0.01) as cm:
+ with self.assertRaises(asyncio.CancelledError):
+ await asyncio.sleep(1)
+ with self.assertRaisesRegex(RuntimeError, "expiring"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_not_entered(self):
+ cm = asyncio.timeout(0.01)
+ with self.assertRaisesRegex(RuntimeError, "has not been entered"):
+ cm.reschedule(0.02)
+
+ async def test_timeout_without_task(self):
+ cm = asyncio.timeout(0.01)
+ with self.assertRaisesRegex(RuntimeError, "task"):
+ await await_without_task(cm.__aenter__())
+ with self.assertRaisesRegex(RuntimeError, "has not been entered"):
+ cm.reschedule(0.02)
+
if __name__ == '__main__':
unittest.main()