diff options
Diffstat (limited to 'Lib/test/test_remote_pdb.py')
-rw-r--r-- | Lib/test/test_remote_pdb.py | 162 |
1 files changed, 154 insertions, 8 deletions
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py index 9c794991dd5..a1c50af15f3 100644 --- a/Lib/test/test_remote_pdb.py +++ b/Lib/test/test_remote_pdb.py @@ -1,22 +1,19 @@ import io -import time import itertools import json import os +import re import signal import socket import subprocess import sys -import tempfile import textwrap -import threading import unittest import unittest.mock -from contextlib import closing, contextmanager, redirect_stdout, ExitStack -from pathlib import Path -from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT -from test.support.os_helper import temp_dir, TESTFN, unlink -from typing import Dict, List, Optional, Tuple, Union, Any +from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack +from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT +from test.support.os_helper import TESTFN, unlink +from typing import List import pdb from pdb import _PdbServer, _PdbClient @@ -1040,6 +1037,7 @@ class PdbConnectTestCase(unittest.TestCase): commands="", version=pdb._PdbServer.protocol_version(), signal_raising_thread=False, + colorize=False, ) return x # This line won't be reached in debugging @@ -1210,6 +1208,7 @@ class PdbConnectTestCase(unittest.TestCase): commands="", version=pdb._PdbServer.protocol_version(), signal_raising_thread=True, + colorize=False, ) print("Connected to debugger") iterations = 50 @@ -1301,6 +1300,7 @@ class PdbConnectTestCase(unittest.TestCase): commands="", version=fake_version, signal_raising_thread=False, + colorize=False, ) # This should print if the debugger detaches correctly @@ -1428,5 +1428,151 @@ class PdbConnectTestCase(unittest.TestCase): self.assertIn("Function returned: 42", stdout) self.assertEqual(process.returncode, 0) + +def _supports_remote_attaching(): + PROCESS_VM_READV_SUPPORTED = False + + try: + from _remote_debugging import PROCESS_VM_READV_SUPPORTED + except ImportError: + pass + + return PROCESS_VM_READV_SUPPORTED + + +@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled") +@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32", + "Test only runs on Linux, Windows and MacOS") +@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(), + "Testing on Linux requires process_vm_readv support") +@cpython_only +@requires_subprocess() +class PdbAttachTestCase(unittest.TestCase): + def setUp(self): + # Create a server socket that will wait for the debugger to connect + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind(('127.0.0.1', 0)) # Let OS assign port + self.sock.listen(1) + self.port = self.sock.getsockname()[1] + self._create_script() + + def _create_script(self, script=None): + # Create a file for subprocess script + script = textwrap.dedent( + f""" + import socket + import time + + def foo(): + return bar() + + def bar(): + return baz() + + def baz(): + x = 1 + # Trigger attach + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('127.0.0.1', {self.port})) + sock.close() + count = 0 + while x == 1 and count < 100: + count += 1 + time.sleep(0.1) + return x + + result = foo() + print(f"Function returned: {{result}}") + """ + ) + + self.script_path = TESTFN + "_connect_test.py" + with open(self.script_path, 'w') as f: + f.write(script) + + def tearDown(self): + self.sock.close() + try: + unlink(self.script_path) + except OSError: + pass + + def do_integration_test(self, client_stdin): + process = subprocess.Popen( + [sys.executable, self.script_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + self.addCleanup(process.stdout.close) + self.addCleanup(process.stderr.close) + + # Wait for the process to reach our attachment point + self.sock.settimeout(10) + conn, _ = self.sock.accept() + conn.close() + + client_stdin = io.StringIO(client_stdin) + client_stdout = io.StringIO() + client_stderr = io.StringIO() + + self.addCleanup(client_stdin.close) + self.addCleanup(client_stdout.close) + self.addCleanup(client_stderr.close) + self.addCleanup(process.wait) + + with ( + unittest.mock.patch("sys.stdin", client_stdin), + redirect_stdout(client_stdout), + redirect_stderr(client_stderr), + unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]), + ): + try: + pdb.main() + except PermissionError: + self.skipTest("Insufficient permissions for remote execution") + + process.wait() + server_stdout = process.stdout.read() + server_stderr = process.stderr.read() + + if process.returncode != 0: + print("server failed") + print(f"server stdout:\n{server_stdout}") + print(f"server stderr:\n{server_stderr}") + + self.assertEqual(process.returncode, 0) + return { + "client": { + "stdout": client_stdout.getvalue(), + "stderr": client_stderr.getvalue(), + }, + "server": { + "stdout": server_stdout, + "stderr": server_stderr, + }, + } + + def test_attach_to_process_without_colors(self): + with force_color(False): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("while x == 1", output["client"]["stdout"]) + self.assertNotIn("\x1b", output["client"]["stdout"]) + + def test_attach_to_process_with_colors(self): + with force_color(True): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("\x1b", output["client"]["stdout"]) + self.assertNotIn("while x == 1", output["client"]["stdout"]) + self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"])) + if __name__ == "__main__": unittest.main() |