diff options
Diffstat (limited to 'Lib/test/test_remote_pdb.py')
-rw-r--r-- | Lib/test/test_remote_pdb.py | 457 |
1 files changed, 319 insertions, 138 deletions
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py index e4c44c78d4a..a1c50af15f3 100644 --- a/Lib/test/test_remote_pdb.py +++ b/Lib/test/test_remote_pdb.py @@ -1,22 +1,19 @@ import io -import time import itertools import json import os +import re import signal import socket import subprocess import sys -import tempfile import textwrap -import threading import unittest import unittest.mock -from contextlib import contextmanager, redirect_stdout, ExitStack -from pathlib import Path -from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT -from test.support.os_helper import temp_dir, TESTFN, unlink -from typing import Dict, List, Optional, Tuple, Union, Any +from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack +from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT +from test.support.os_helper import TESTFN, unlink +from typing import List import pdb from pdb import _PdbServer, _PdbClient @@ -79,44 +76,6 @@ class MockSocketFile: return results -class MockDebuggerSocket: - """Mock file-like simulating a connection to a _RemotePdb instance""" - - def __init__(self, incoming): - self.incoming = iter(incoming) - self.outgoing = [] - self.buffered = bytearray() - - def write(self, data: bytes) -> None: - """Simulate write to socket.""" - self.buffered += data - - def flush(self) -> None: - """Ensure each line is valid JSON.""" - lines = self.buffered.splitlines(keepends=True) - self.buffered.clear() - for line in lines: - assert line.endswith(b"\n") - self.outgoing.append(json.loads(line)) - - def readline(self) -> bytes: - """Read a line from the prepared input queue.""" - # Anything written must be flushed before trying to read, - # since the read will be dependent upon the last write. - assert not self.buffered - try: - item = next(self.incoming) - if not isinstance(item, bytes): - item = json.dumps(item).encode() - return item + b"\n" - except StopIteration: - return b"" - - def close(self) -> None: - """No-op close implementation.""" - pass - - class PdbClientTestCase(unittest.TestCase): """Tests for the _PdbClient class.""" @@ -124,8 +83,11 @@ class PdbClientTestCase(unittest.TestCase): self, *, incoming, - simulate_failure=None, + simulate_send_failure=False, + simulate_sigint_during_stdout_write=False, + use_interrupt_socket=False, expected_outgoing=None, + expected_outgoing_signals=None, expected_completions=None, expected_exception=None, expected_stdout="", @@ -134,6 +96,8 @@ class PdbClientTestCase(unittest.TestCase): ): if expected_outgoing is None: expected_outgoing = [] + if expected_outgoing_signals is None: + expected_outgoing_signals = [] if expected_completions is None: expected_completions = [] if expected_state is None: @@ -142,16 +106,6 @@ class PdbClientTestCase(unittest.TestCase): expected_state.setdefault("write_failed", False) messages = [m for source, m in incoming if source == "server"] prompts = [m["prompt"] for source, m in incoming if source == "user"] - sockfile = MockDebuggerSocket(messages) - stdout = io.StringIO() - - if simulate_failure: - sockfile.write = unittest.mock.Mock() - sockfile.flush = unittest.mock.Mock() - if simulate_failure == "write": - sockfile.write.side_effect = OSError("write failed") - elif simulate_failure == "flush": - sockfile.flush.side_effect = OSError("flush failed") input_iter = (m for source, m in incoming if source == "user") completions = [] @@ -178,18 +132,60 @@ class PdbClientTestCase(unittest.TestCase): reply = message["input"] if isinstance(reply, BaseException): raise reply - return reply + if isinstance(reply, str): + return reply + return reply() with ExitStack() as stack: + client_sock, server_sock = socket.socketpair() + stack.enter_context(closing(client_sock)) + stack.enter_context(closing(server_sock)) + + server_sock = unittest.mock.Mock(wraps=server_sock) + + client_sock.sendall( + b"".join( + (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n" + for m in messages + ) + ) + client_sock.shutdown(socket.SHUT_WR) + + if simulate_send_failure: + server_sock.sendall = unittest.mock.Mock( + side_effect=OSError("sendall failed") + ) + client_sock.shutdown(socket.SHUT_RD) + + stdout = io.StringIO() + + if simulate_sigint_during_stdout_write: + orig_stdout_write = stdout.write + + def sigint_stdout_write(s): + signal.raise_signal(signal.SIGINT) + return orig_stdout_write(s) + + stdout.write = sigint_stdout_write + input_mock = stack.enter_context( unittest.mock.patch("pdb.input", side_effect=mock_input) ) stack.enter_context(redirect_stdout(stdout)) + if use_interrupt_socket: + interrupt_sock = unittest.mock.Mock(spec=socket.socket) + mock_kill = None + else: + interrupt_sock = None + mock_kill = stack.enter_context( + unittest.mock.patch("os.kill", spec=os.kill) + ) + client = _PdbClient( - pid=0, - sockfile=sockfile, - interrupt_script="/a/b.py", + pid=12345, + server_socket=server_sock, + interrupt_sock=interrupt_sock, ) if expected_exception is not None: @@ -199,13 +195,12 @@ class PdbClientTestCase(unittest.TestCase): client.cmdloop() - actual_outgoing = sockfile.outgoing - if simulate_failure: - actual_outgoing += [ - json.loads(msg.args[0]) for msg in sockfile.write.mock_calls - ] + sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls] + for msg in sent_msgs: + assert msg.endswith(b"\n") + actual_outgoing = [json.loads(msg) for msg in sent_msgs] - self.assertEqual(sockfile.outgoing, expected_outgoing) + self.assertEqual(actual_outgoing, expected_outgoing) self.assertEqual(completions, expected_completions) if expected_stdout_substring and not expected_stdout: self.assertIn(expected_stdout_substring, stdout.getvalue()) @@ -215,6 +210,20 @@ class PdbClientTestCase(unittest.TestCase): actual_state = {k: getattr(client, k) for k in expected_state} self.assertEqual(actual_state, expected_state) + if use_interrupt_socket: + outgoing_signals = [ + signal.Signals(int.from_bytes(call.args[0])) + for call in interrupt_sock.sendall.call_args_list + ] + else: + assert mock_kill is not None + outgoing_signals = [] + for call in mock_kill.call_args_list: + pid, signum = call.args + self.assertEqual(pid, 12345) + outgoing_signals.append(signal.Signals(signum)) + self.assertEqual(outgoing_signals, expected_outgoing_signals) + def test_remote_immediately_closing_the_connection(self): """Test the behavior when the remote closes the connection immediately.""" incoming = [] @@ -409,11 +418,38 @@ class PdbClientTestCase(unittest.TestCase): expected_state={"state": "dumb"}, ) - def test_keyboard_interrupt_at_prompt(self): - """Test signaling when a prompt gets a KeyboardInterrupt.""" + def test_sigint_at_prompt(self): + """Test signaling when a prompt gets interrupted.""" incoming = [ ("server", {"prompt": "(Pdb) ", "state": "pdb"}), - ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}), + ( + "user", + { + "prompt": "(Pdb) ", + "input": lambda: signal.raise_signal(signal.SIGINT), + }, + ), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"signal": "INT"}, + ], + expected_state={"state": "pdb"}, + ) + + def test_sigint_at_continuation_prompt(self): + """Test signaling when a continuation prompt gets interrupted.""" + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "if True:"}), + ( + "user", + { + "prompt": "... ", + "input": lambda: signal.raise_signal(signal.SIGINT), + }, + ), ] self.do_test( incoming=incoming, @@ -423,6 +459,22 @@ class PdbClientTestCase(unittest.TestCase): expected_state={"state": "pdb"}, ) + def test_sigint_when_writing(self): + """Test siginaling when sys.stdout.write() gets interrupted.""" + incoming = [ + ("server", {"message": "Some message or other\n", "type": "info"}), + ] + for use_interrupt_socket in [False, True]: + with self.subTest(use_interrupt_socket=use_interrupt_socket): + self.do_test( + incoming=incoming, + simulate_sigint_during_stdout_write=True, + use_interrupt_socket=use_interrupt_socket, + expected_outgoing=[], + expected_outgoing_signals=[signal.SIGINT], + expected_stdout="Some message or other\n", + ) + def test_eof_at_prompt(self): """Test signaling when a prompt gets an EOFError.""" incoming = [ @@ -478,20 +530,7 @@ class PdbClientTestCase(unittest.TestCase): self.do_test( incoming=incoming, expected_outgoing=[{"signal": "INT"}], - simulate_failure="write", - expected_state={"write_failed": True}, - ) - - def test_flush_failing(self): - """Test terminating if flush fails due to a half closed socket.""" - incoming = [ - ("server", {"prompt": "(Pdb) ", "state": "pdb"}), - ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}), - ] - self.do_test( - incoming=incoming, - expected_outgoing=[{"signal": "INT"}], - simulate_failure="flush", + simulate_send_failure=True, expected_state={"write_failed": True}, ) @@ -531,6 +570,44 @@ class PdbClientTestCase(unittest.TestCase): expected_state={"state": "pdb"}, ) + def test_multiline_completion_in_pdb_state(self): + """Test requesting tab completions at a (Pdb) continuation prompt.""" + # GIVEN + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "if True:"}), + ( + "user", + { + "prompt": "... ", + "completion_request": { + "line": " b", + "begidx": 4, + "endidx": 5, + }, + "input": " bool()", + }, + ), + ("server", {"completions": ["bin", "bool", "bytes"]}), + ("user", {"prompt": "... ", "input": ""}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "b", + "line": "! b", + "begidx": 2, + "endidx": 3, + } + }, + {"reply": "if True:\n bool()\n"}, + ], + expected_completions=["bin", "bool", "bytes"], + expected_state={"state": "pdb"}, + ) + def test_completion_in_interact_state(self): """Test requesting tab completions at a >>> prompt.""" incoming = [ @@ -622,42 +699,7 @@ class PdbClientTestCase(unittest.TestCase): }, {"reply": "xyz"}, ], - simulate_failure="write", - expected_completions=[], - expected_state={"state": "interact", "write_failed": True}, - ) - - def test_flush_failure_during_completion(self): - """Test failing to flush to the socket to request tab completions.""" - incoming = [ - ("server", {"prompt": ">>> ", "state": "interact"}), - ( - "user", - { - "prompt": ">>> ", - "completion_request": { - "line": "xy", - "begidx": 0, - "endidx": 2, - }, - "input": "xyz", - }, - ), - ] - self.do_test( - incoming=incoming, - expected_outgoing=[ - { - "complete": { - "text": "xy", - "line": "xy", - "begidx": 0, - "endidx": 2, - } - }, - {"reply": "xyz"}, - ], - simulate_failure="flush", + simulate_send_failure=True, expected_completions=[], expected_state={"state": "interact", "write_failed": True}, ) @@ -994,6 +1036,8 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=pdb._PdbServer.protocol_version(), + signal_raising_thread=False, + colorize=False, ) return x # This line won't be reached in debugging @@ -1051,23 +1095,6 @@ class PdbConnectTestCase(unittest.TestCase): client_file.write(json.dumps({"reply": command}).encode() + b"\n") client_file.flush() - def _send_interrupt(self, pid): - """Helper to send an interrupt signal to the debugger.""" - # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script: - interrupt_script = TESTFN + "_interrupt_script.py" - with open(interrupt_script, 'w') as f: - f.write( - 'import pdb, sys\n' - 'print("Hello, world!")\n' - 'if inst := pdb.Pdb._last_pdb_instance:\n' - ' inst.set_trace(sys._getframe(1))\n' - ) - self.addCleanup(unlink, interrupt_script) - try: - sys.remote_exec(pid, interrupt_script) - except PermissionError: - self.skipTest("Insufficient permissions to execute code in remote process") - def test_connect_and_basic_commands(self): """Test connecting to a remote debugger and sending basic commands.""" self._create_script() @@ -1180,6 +1207,8 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=pdb._PdbServer.protocol_version(), + signal_raising_thread=True, + colorize=False, ) print("Connected to debugger") iterations = 50 @@ -1195,6 +1224,10 @@ class PdbConnectTestCase(unittest.TestCase): self._create_script(script=script) process, client_file = self._connect_and_get_client_file() + # Accept a 2nd connection from the subprocess to tell it about signals + signal_sock, _ = self.server_sock.accept() + self.addCleanup(signal_sock.close) + with kill_on_error(process): # Skip initial messages until we get to the prompt self._read_until_prompt(client_file) @@ -1210,7 +1243,7 @@ class PdbConnectTestCase(unittest.TestCase): break # Inject a script to interrupt the running process - self._send_interrupt(process.pid) + signal_sock.sendall(signal.SIGINT.to_bytes()) messages = self._read_until_prompt(client_file) # Verify we got the keyboard interrupt message. @@ -1266,6 +1299,8 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=fake_version, + signal_raising_thread=False, + colorize=False, ) # This should print if the debugger detaches correctly @@ -1393,5 +1428,151 @@ class PdbConnectTestCase(unittest.TestCase): self.assertIn("Function returned: 42", stdout) self.assertEqual(process.returncode, 0) + +def _supports_remote_attaching(): + PROCESS_VM_READV_SUPPORTED = False + + try: + from _remote_debugging import PROCESS_VM_READV_SUPPORTED + except ImportError: + pass + + return PROCESS_VM_READV_SUPPORTED + + +@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled") +@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32", + "Test only runs on Linux, Windows and MacOS") +@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(), + "Testing on Linux requires process_vm_readv support") +@cpython_only +@requires_subprocess() +class PdbAttachTestCase(unittest.TestCase): + def setUp(self): + # Create a server socket that will wait for the debugger to connect + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind(('127.0.0.1', 0)) # Let OS assign port + self.sock.listen(1) + self.port = self.sock.getsockname()[1] + self._create_script() + + def _create_script(self, script=None): + # Create a file for subprocess script + script = textwrap.dedent( + f""" + import socket + import time + + def foo(): + return bar() + + def bar(): + return baz() + + def baz(): + x = 1 + # Trigger attach + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('127.0.0.1', {self.port})) + sock.close() + count = 0 + while x == 1 and count < 100: + count += 1 + time.sleep(0.1) + return x + + result = foo() + print(f"Function returned: {{result}}") + """ + ) + + self.script_path = TESTFN + "_connect_test.py" + with open(self.script_path, 'w') as f: + f.write(script) + + def tearDown(self): + self.sock.close() + try: + unlink(self.script_path) + except OSError: + pass + + def do_integration_test(self, client_stdin): + process = subprocess.Popen( + [sys.executable, self.script_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + self.addCleanup(process.stdout.close) + self.addCleanup(process.stderr.close) + + # Wait for the process to reach our attachment point + self.sock.settimeout(10) + conn, _ = self.sock.accept() + conn.close() + + client_stdin = io.StringIO(client_stdin) + client_stdout = io.StringIO() + client_stderr = io.StringIO() + + self.addCleanup(client_stdin.close) + self.addCleanup(client_stdout.close) + self.addCleanup(client_stderr.close) + self.addCleanup(process.wait) + + with ( + unittest.mock.patch("sys.stdin", client_stdin), + redirect_stdout(client_stdout), + redirect_stderr(client_stderr), + unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]), + ): + try: + pdb.main() + except PermissionError: + self.skipTest("Insufficient permissions for remote execution") + + process.wait() + server_stdout = process.stdout.read() + server_stderr = process.stderr.read() + + if process.returncode != 0: + print("server failed") + print(f"server stdout:\n{server_stdout}") + print(f"server stderr:\n{server_stderr}") + + self.assertEqual(process.returncode, 0) + return { + "client": { + "stdout": client_stdout.getvalue(), + "stderr": client_stderr.getvalue(), + }, + "server": { + "stdout": server_stdout, + "stderr": server_stderr, + }, + } + + def test_attach_to_process_without_colors(self): + with force_color(False): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("while x == 1", output["client"]["stdout"]) + self.assertNotIn("\x1b", output["client"]["stdout"]) + + def test_attach_to_process_with_colors(self): + with force_color(True): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("\x1b", output["client"]["stdout"]) + self.assertNotIn("while x == 1", output["client"]["stdout"]) + self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"])) + if __name__ == "__main__": unittest.main() |