aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_asyncio/test_asyncio_waitfor.py
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2022-03-16 21:49:18 +0200
committerGitHub <noreply@github.com>2022-03-16 21:49:18 +0200
commitdd0082c627713634c7fd88ad33d18b5cc9f4a7b8 (patch)
treee9f434eaa1982990b396cd360617b595f285dec9 /Lib/test/test_asyncio/test_asyncio_waitfor.py
parentdbbe4d2d0075fa0e95b069fb4780d79aae3514c7 (diff)
downloadcpython-dd0082c627713634c7fd88ad33d18b5cc9f4a7b8.tar.gz
cpython-dd0082c627713634c7fd88ad33d18b5cc9f4a7b8.zip
bpo-47038: Rewrite asyncio.wait_for test to use IsolatedAsyncioTestCase (GH-31942)
Diffstat (limited to 'Lib/test/test_asyncio/test_asyncio_waitfor.py')
-rw-r--r--Lib/test/test_asyncio/test_asyncio_waitfor.py61
1 files changed, 0 insertions, 61 deletions
diff --git a/Lib/test/test_asyncio/test_asyncio_waitfor.py b/Lib/test/test_asyncio/test_asyncio_waitfor.py
deleted file mode 100644
index 2ca64abbeb5..00000000000
--- a/Lib/test/test_asyncio/test_asyncio_waitfor.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import asyncio
-import unittest
-import time
-
-def tearDownModule():
- asyncio.set_event_loop_policy(None)
-
-
-class SlowTask:
- """ Task will run for this defined time, ignoring cancel requests """
- TASK_TIMEOUT = 0.2
-
- def __init__(self):
- self.exited = False
-
- async def run(self):
- exitat = time.monotonic() + self.TASK_TIMEOUT
-
- while True:
- tosleep = exitat - time.monotonic()
- if tosleep <= 0:
- break
-
- try:
- await asyncio.sleep(tosleep)
- except asyncio.CancelledError:
- pass
-
- self.exited = True
-
-class AsyncioWaitForTest(unittest.TestCase):
-
- async def atest_asyncio_wait_for_cancelled(self):
- t = SlowTask()
-
- waitfortask = asyncio.create_task(asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
- await asyncio.sleep(0)
- waitfortask.cancel()
- await asyncio.wait({waitfortask})
-
- self.assertTrue(t.exited)
-
- def test_asyncio_wait_for_cancelled(self):
- asyncio.run(self.atest_asyncio_wait_for_cancelled())
-
- async def atest_asyncio_wait_for_timeout(self):
- t = SlowTask()
-
- try:
- await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
- except asyncio.TimeoutError:
- pass
-
- self.assertTrue(t.exited)
-
- def test_asyncio_wait_for_timeout(self):
- asyncio.run(self.atest_asyncio_wait_for_timeout())
-
-
-if __name__ == '__main__':
- unittest.main()