diff options
Diffstat (limited to 'Lib/test/test_remote_pdb.py')
-rw-r--r-- | Lib/test/test_remote_pdb.py | 929 |
1 files changed, 903 insertions, 26 deletions
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py index 2c4a17abd82..a1c50af15f3 100644 --- a/Lib/test/test_remote_pdb.py +++ b/Lib/test/test_remote_pdb.py @@ -1,21 +1,19 @@ import io -import time +import itertools import json import os +import re import signal import socket import subprocess import sys -import tempfile import textwrap -import threading import unittest import unittest.mock -from contextlib import contextmanager -from pathlib import Path -from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT -from test.support.os_helper import temp_dir, TESTFN, unlink -from typing import Dict, List, Optional, Tuple, Union, Any +from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack +from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT +from test.support.os_helper import TESTFN, unlink +from typing import List import pdb from pdb import _PdbServer, _PdbClient @@ -78,6 +76,746 @@ class MockSocketFile: return results +class PdbClientTestCase(unittest.TestCase): + """Tests for the _PdbClient class.""" + + def do_test( + self, + *, + incoming, + simulate_send_failure=False, + simulate_sigint_during_stdout_write=False, + use_interrupt_socket=False, + expected_outgoing=None, + expected_outgoing_signals=None, + expected_completions=None, + expected_exception=None, + expected_stdout="", + expected_stdout_substring="", + expected_state=None, + ): + if expected_outgoing is None: + expected_outgoing = [] + if expected_outgoing_signals is None: + expected_outgoing_signals = [] + if expected_completions is None: + expected_completions = [] + if expected_state is None: + expected_state = {} + + expected_state.setdefault("write_failed", False) + messages = [m for source, m in incoming if source == "server"] + prompts = [m["prompt"] for source, m in incoming if source == "user"] + + input_iter = (m for source, m in incoming if source == "user") + completions = [] + + def mock_input(prompt): + message = next(input_iter, None) + if message is None: + raise EOFError + + if req := message.get("completion_request"): + readline_mock = unittest.mock.Mock() + readline_mock.get_line_buffer.return_value = req["line"] + readline_mock.get_begidx.return_value = req["begidx"] + readline_mock.get_endidx.return_value = req["endidx"] + unittest.mock.seal(readline_mock) + with unittest.mock.patch.dict(sys.modules, {"readline": readline_mock}): + for param in itertools.count(): + prefix = req["line"][req["begidx"] : req["endidx"]] + completion = client.complete(prefix, param) + if completion is None: + break + completions.append(completion) + + reply = message["input"] + if isinstance(reply, BaseException): + raise reply + if isinstance(reply, str): + return reply + return reply() + + with ExitStack() as stack: + client_sock, server_sock = socket.socketpair() + stack.enter_context(closing(client_sock)) + stack.enter_context(closing(server_sock)) + + server_sock = unittest.mock.Mock(wraps=server_sock) + + client_sock.sendall( + b"".join( + (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n" + for m in messages + ) + ) + client_sock.shutdown(socket.SHUT_WR) + + if simulate_send_failure: + server_sock.sendall = unittest.mock.Mock( + side_effect=OSError("sendall failed") + ) + client_sock.shutdown(socket.SHUT_RD) + + stdout = io.StringIO() + + if simulate_sigint_during_stdout_write: + orig_stdout_write = stdout.write + + def sigint_stdout_write(s): + signal.raise_signal(signal.SIGINT) + return orig_stdout_write(s) + + stdout.write = sigint_stdout_write + + input_mock = stack.enter_context( + unittest.mock.patch("pdb.input", side_effect=mock_input) + ) + stack.enter_context(redirect_stdout(stdout)) + + if use_interrupt_socket: + interrupt_sock = unittest.mock.Mock(spec=socket.socket) + mock_kill = None + else: + interrupt_sock = None + mock_kill = stack.enter_context( + unittest.mock.patch("os.kill", spec=os.kill) + ) + + client = _PdbClient( + pid=12345, + server_socket=server_sock, + interrupt_sock=interrupt_sock, + ) + + if expected_exception is not None: + exception = expected_exception["exception"] + msg = expected_exception["msg"] + stack.enter_context(self.assertRaises(exception, msg=msg)) + + client.cmdloop() + + sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls] + for msg in sent_msgs: + assert msg.endswith(b"\n") + actual_outgoing = [json.loads(msg) for msg in sent_msgs] + + self.assertEqual(actual_outgoing, expected_outgoing) + self.assertEqual(completions, expected_completions) + if expected_stdout_substring and not expected_stdout: + self.assertIn(expected_stdout_substring, stdout.getvalue()) + else: + self.assertEqual(stdout.getvalue(), expected_stdout) + input_mock.assert_has_calls([unittest.mock.call(p) for p in prompts]) + actual_state = {k: getattr(client, k) for k in expected_state} + self.assertEqual(actual_state, expected_state) + + if use_interrupt_socket: + outgoing_signals = [ + signal.Signals(int.from_bytes(call.args[0])) + for call in interrupt_sock.sendall.call_args_list + ] + else: + assert mock_kill is not None + outgoing_signals = [] + for call in mock_kill.call_args_list: + pid, signum = call.args + self.assertEqual(pid, 12345) + outgoing_signals.append(signal.Signals(signum)) + self.assertEqual(outgoing_signals, expected_outgoing_signals) + + def test_remote_immediately_closing_the_connection(self): + """Test the behavior when the remote closes the connection immediately.""" + incoming = [] + expected_outgoing = [] + self.do_test( + incoming=incoming, + expected_outgoing=expected_outgoing, + ) + + def test_handling_command_list(self): + """Test handling the command_list message.""" + incoming = [ + ("server", {"command_list": ["help", "list", "continue"]}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_state={ + "pdb_commands": {"help", "list", "continue"}, + }, + ) + + def test_handling_info_message(self): + """Test handling a message payload with type='info'.""" + incoming = [ + ("server", {"message": "Some message or other\n", "type": "info"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_stdout="Some message or other\n", + ) + + def test_handling_error_message(self): + """Test handling a message payload with type='error'.""" + incoming = [ + ("server", {"message": "Some message or other.", "type": "error"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_stdout="*** Some message or other.\n", + ) + + def test_handling_other_message(self): + """Test handling a message payload with an unrecognized type.""" + incoming = [ + ("server", {"message": "Some message.\n", "type": "unknown"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_stdout="Some message.\n", + ) + + def test_handling_help_for_command(self): + """Test handling a request to display help for a command.""" + incoming = [ + ("server", {"help": "ll"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_stdout_substring="Usage: ll | longlist", + ) + + def test_handling_help_without_a_specific_topic(self): + """Test handling a request to display a help overview.""" + incoming = [ + ("server", {"help": ""}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_stdout_substring="type help <topic>", + ) + + def test_handling_help_pdb(self): + """Test handling a request to display the full PDB manual.""" + incoming = [ + ("server", {"help": "pdb"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_stdout_substring=">>> import pdb", + ) + + def test_handling_pdb_prompts(self): + """Test responding to pdb's normal prompts.""" + incoming = [ + ("server", {"command_list": ["b"]}), + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "lst ["}), + ("user", {"prompt": "... ", "input": "0 ]"}), + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": ""}), + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "b ["}), + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "! b ["}), + ("user", {"prompt": "... ", "input": "b ]"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"reply": "lst [\n0 ]"}, + {"reply": ""}, + {"reply": "b ["}, + {"reply": "!b [\nb ]"}, + ], + expected_state={"state": "pdb"}, + ) + + def test_handling_interact_prompts(self): + """Test responding to pdb's interact mode prompts.""" + incoming = [ + ("server", {"command_list": ["b"]}), + ("server", {"prompt": ">>> ", "state": "interact"}), + ("user", {"prompt": ">>> ", "input": "lst ["}), + ("user", {"prompt": "... ", "input": "0 ]"}), + ("server", {"prompt": ">>> ", "state": "interact"}), + ("user", {"prompt": ">>> ", "input": ""}), + ("server", {"prompt": ">>> ", "state": "interact"}), + ("user", {"prompt": ">>> ", "input": "b ["}), + ("user", {"prompt": "... ", "input": "b ]"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"reply": "lst [\n0 ]"}, + {"reply": ""}, + {"reply": "b [\nb ]"}, + ], + expected_state={"state": "interact"}, + ) + + def test_retry_pdb_prompt_on_syntax_error(self): + """Test re-prompting after a SyntaxError in a Python expression.""" + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": " lst ["}), + ("user", {"prompt": "(Pdb) ", "input": "lst ["}), + ("user", {"prompt": "... ", "input": " 0 ]"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"reply": "lst [\n 0 ]"}, + ], + expected_stdout_substring="*** IndentationError", + expected_state={"state": "pdb"}, + ) + + def test_retry_interact_prompt_on_syntax_error(self): + """Test re-prompting after a SyntaxError in a Python expression.""" + incoming = [ + ("server", {"prompt": ">>> ", "state": "interact"}), + ("user", {"prompt": ">>> ", "input": "!lst ["}), + ("user", {"prompt": ">>> ", "input": "lst ["}), + ("user", {"prompt": "... ", "input": " 0 ]"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"reply": "lst [\n 0 ]"}, + ], + expected_stdout_substring="*** SyntaxError", + expected_state={"state": "interact"}, + ) + + def test_handling_unrecognized_prompt_type(self): + """Test fallback to "dumb" single-line mode for unknown states.""" + incoming = [ + ("server", {"prompt": "Do it? ", "state": "confirm"}), + ("user", {"prompt": "Do it? ", "input": "! ["}), + ("server", {"prompt": "Do it? ", "state": "confirm"}), + ("user", {"prompt": "Do it? ", "input": "echo hello"}), + ("server", {"prompt": "Do it? ", "state": "confirm"}), + ("user", {"prompt": "Do it? ", "input": ""}), + ("server", {"prompt": "Do it? ", "state": "confirm"}), + ("user", {"prompt": "Do it? ", "input": "echo goodbye"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"reply": "! ["}, + {"reply": "echo hello"}, + {"reply": ""}, + {"reply": "echo goodbye"}, + ], + expected_state={"state": "dumb"}, + ) + + def test_sigint_at_prompt(self): + """Test signaling when a prompt gets interrupted.""" + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ( + "user", + { + "prompt": "(Pdb) ", + "input": lambda: signal.raise_signal(signal.SIGINT), + }, + ), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"signal": "INT"}, + ], + expected_state={"state": "pdb"}, + ) + + def test_sigint_at_continuation_prompt(self): + """Test signaling when a continuation prompt gets interrupted.""" + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "if True:"}), + ( + "user", + { + "prompt": "... ", + "input": lambda: signal.raise_signal(signal.SIGINT), + }, + ), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"signal": "INT"}, + ], + expected_state={"state": "pdb"}, + ) + + def test_sigint_when_writing(self): + """Test siginaling when sys.stdout.write() gets interrupted.""" + incoming = [ + ("server", {"message": "Some message or other\n", "type": "info"}), + ] + for use_interrupt_socket in [False, True]: + with self.subTest(use_interrupt_socket=use_interrupt_socket): + self.do_test( + incoming=incoming, + simulate_sigint_during_stdout_write=True, + use_interrupt_socket=use_interrupt_socket, + expected_outgoing=[], + expected_outgoing_signals=[signal.SIGINT], + expected_stdout="Some message or other\n", + ) + + def test_eof_at_prompt(self): + """Test signaling when a prompt gets an EOFError.""" + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": EOFError()}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"signal": "EOF"}, + ], + expected_state={"state": "pdb"}, + ) + + def test_unrecognized_json_message(self): + """Test failing after getting an unrecognized payload.""" + incoming = [ + ("server", {"monty": "python"}), + ("server", {"message": "Some message or other\n", "type": "info"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_exception={ + "exception": RuntimeError, + "msg": 'Unrecognized payload b\'{"monty": "python"}\'', + }, + ) + + def test_continuing_after_getting_a_non_json_payload(self): + """Test continuing after getting a non JSON payload.""" + incoming = [ + ("server", b"spam"), + ("server", {"message": "Something", "type": "info"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[], + expected_stdout="\n".join( + [ + "*** Invalid JSON from remote: b'spam\\n'", + "Something", + ] + ), + ) + + def test_write_failing(self): + """Test terminating if write fails due to a half closed socket.""" + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[{"signal": "INT"}], + simulate_send_failure=True, + expected_state={"write_failed": True}, + ) + + def test_completion_in_pdb_state(self): + """Test requesting tab completions at a (Pdb) prompt.""" + # GIVEN + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ( + "user", + { + "prompt": "(Pdb) ", + "completion_request": { + "line": " mod._", + "begidx": 8, + "endidx": 9, + }, + "input": "print(\n mod.__name__)", + }, + ), + ("server", {"completions": ["__name__", "__file__"]}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "_", + "line": "mod._", + "begidx": 4, + "endidx": 5, + } + }, + {"reply": "print(\n mod.__name__)"}, + ], + expected_completions=["__name__", "__file__"], + expected_state={"state": "pdb"}, + ) + + def test_multiline_completion_in_pdb_state(self): + """Test requesting tab completions at a (Pdb) continuation prompt.""" + # GIVEN + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "if True:"}), + ( + "user", + { + "prompt": "... ", + "completion_request": { + "line": " b", + "begidx": 4, + "endidx": 5, + }, + "input": " bool()", + }, + ), + ("server", {"completions": ["bin", "bool", "bytes"]}), + ("user", {"prompt": "... ", "input": ""}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "b", + "line": "! b", + "begidx": 2, + "endidx": 3, + } + }, + {"reply": "if True:\n bool()\n"}, + ], + expected_completions=["bin", "bool", "bytes"], + expected_state={"state": "pdb"}, + ) + + def test_completion_in_interact_state(self): + """Test requesting tab completions at a >>> prompt.""" + incoming = [ + ("server", {"prompt": ">>> ", "state": "interact"}), + ( + "user", + { + "prompt": ">>> ", + "completion_request": { + "line": " mod.__", + "begidx": 8, + "endidx": 10, + }, + "input": "print(\n mod.__name__)", + }, + ), + ("server", {"completions": ["__name__", "__file__"]}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "__", + "line": "mod.__", + "begidx": 4, + "endidx": 6, + } + }, + {"reply": "print(\n mod.__name__)"}, + ], + expected_completions=["__name__", "__file__"], + expected_state={"state": "interact"}, + ) + + def test_completion_in_unknown_state(self): + """Test requesting tab completions at an unrecognized prompt.""" + incoming = [ + ("server", {"command_list": ["p"]}), + ("server", {"prompt": "Do it? ", "state": "confirm"}), + ( + "user", + { + "prompt": "Do it? ", + "completion_request": { + "line": "_", + "begidx": 0, + "endidx": 1, + }, + "input": "__name__", + }, + ), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"reply": "__name__"}, + ], + expected_state={"state": "dumb"}, + ) + + def test_write_failure_during_completion(self): + """Test failing to write to the socket to request tab completions.""" + incoming = [ + ("server", {"prompt": ">>> ", "state": "interact"}), + ( + "user", + { + "prompt": ">>> ", + "completion_request": { + "line": "xy", + "begidx": 0, + "endidx": 2, + }, + "input": "xyz", + }, + ), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "xy", + "line": "xy", + "begidx": 0, + "endidx": 2, + } + }, + {"reply": "xyz"}, + ], + simulate_send_failure=True, + expected_completions=[], + expected_state={"state": "interact", "write_failed": True}, + ) + + def test_read_failure_during_completion(self): + """Test failing to read tab completions from the socket.""" + incoming = [ + ("server", {"prompt": ">>> ", "state": "interact"}), + ( + "user", + { + "prompt": ">>> ", + "completion_request": { + "line": "xy", + "begidx": 0, + "endidx": 2, + }, + "input": "xyz", + }, + ), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "xy", + "line": "xy", + "begidx": 0, + "endidx": 2, + } + }, + {"reply": "xyz"}, + ], + expected_completions=[], + expected_state={"state": "interact"}, + ) + + def test_reading_invalid_json_during_completion(self): + """Test receiving invalid JSON when getting tab completions.""" + incoming = [ + ("server", {"prompt": ">>> ", "state": "interact"}), + ( + "user", + { + "prompt": ">>> ", + "completion_request": { + "line": "xy", + "begidx": 0, + "endidx": 2, + }, + "input": "xyz", + }, + ), + ("server", b'{"completions": '), + ("user", {"prompt": ">>> ", "input": "xyz"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "xy", + "line": "xy", + "begidx": 0, + "endidx": 2, + } + }, + {"reply": "xyz"}, + ], + expected_stdout_substring="*** json.decoder.JSONDecodeError", + expected_completions=[], + expected_state={"state": "interact"}, + ) + + def test_reading_empty_json_during_completion(self): + """Test receiving an empty JSON object when getting tab completions.""" + incoming = [ + ("server", {"prompt": ">>> ", "state": "interact"}), + ( + "user", + { + "prompt": ">>> ", + "completion_request": { + "line": "xy", + "begidx": 0, + "endidx": 2, + }, + "input": "xyz", + }, + ), + ("server", {}), + ("user", {"prompt": ">>> ", "input": "xyz"}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "xy", + "line": "xy", + "begidx": 0, + "endidx": 2, + } + }, + {"reply": "xyz"}, + ], + expected_stdout=( + "*** RuntimeError: Failed to get valid completions." + " Got: {}\n" + ), + expected_completions=[], + expected_state={"state": "interact"}, + ) + + class RemotePdbTestCase(unittest.TestCase): """Tests for the _PdbServer class.""" @@ -298,6 +1036,8 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=pdb._PdbServer.protocol_version(), + signal_raising_thread=False, + colorize=False, ) return x # This line won't be reached in debugging @@ -355,23 +1095,6 @@ class PdbConnectTestCase(unittest.TestCase): client_file.write(json.dumps({"reply": command}).encode() + b"\n") client_file.flush() - def _send_interrupt(self, pid): - """Helper to send an interrupt signal to the debugger.""" - # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script: - interrupt_script = TESTFN + "_interrupt_script.py" - with open(interrupt_script, 'w') as f: - f.write( - 'import pdb, sys\n' - 'print("Hello, world!")\n' - 'if inst := pdb.Pdb._last_pdb_instance:\n' - ' inst.set_trace(sys._getframe(1))\n' - ) - self.addCleanup(unlink, interrupt_script) - try: - sys.remote_exec(pid, interrupt_script) - except PermissionError: - self.skipTest("Insufficient permissions to execute code in remote process") - def test_connect_and_basic_commands(self): """Test connecting to a remote debugger and sending basic commands.""" self._create_script() @@ -484,6 +1207,8 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=pdb._PdbServer.protocol_version(), + signal_raising_thread=True, + colorize=False, ) print("Connected to debugger") iterations = 50 @@ -499,6 +1224,10 @@ class PdbConnectTestCase(unittest.TestCase): self._create_script(script=script) process, client_file = self._connect_and_get_client_file() + # Accept a 2nd connection from the subprocess to tell it about signals + signal_sock, _ = self.server_sock.accept() + self.addCleanup(signal_sock.close) + with kill_on_error(process): # Skip initial messages until we get to the prompt self._read_until_prompt(client_file) @@ -514,7 +1243,7 @@ class PdbConnectTestCase(unittest.TestCase): break # Inject a script to interrupt the running process - self._send_interrupt(process.pid) + signal_sock.sendall(signal.SIGINT.to_bytes()) messages = self._read_until_prompt(client_file) # Verify we got the keyboard interrupt message. @@ -570,6 +1299,8 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=fake_version, + signal_raising_thread=False, + colorize=False, ) # This should print if the debugger detaches correctly @@ -697,5 +1428,151 @@ class PdbConnectTestCase(unittest.TestCase): self.assertIn("Function returned: 42", stdout) self.assertEqual(process.returncode, 0) + +def _supports_remote_attaching(): + PROCESS_VM_READV_SUPPORTED = False + + try: + from _remote_debugging import PROCESS_VM_READV_SUPPORTED + except ImportError: + pass + + return PROCESS_VM_READV_SUPPORTED + + +@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled") +@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32", + "Test only runs on Linux, Windows and MacOS") +@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(), + "Testing on Linux requires process_vm_readv support") +@cpython_only +@requires_subprocess() +class PdbAttachTestCase(unittest.TestCase): + def setUp(self): + # Create a server socket that will wait for the debugger to connect + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind(('127.0.0.1', 0)) # Let OS assign port + self.sock.listen(1) + self.port = self.sock.getsockname()[1] + self._create_script() + + def _create_script(self, script=None): + # Create a file for subprocess script + script = textwrap.dedent( + f""" + import socket + import time + + def foo(): + return bar() + + def bar(): + return baz() + + def baz(): + x = 1 + # Trigger attach + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('127.0.0.1', {self.port})) + sock.close() + count = 0 + while x == 1 and count < 100: + count += 1 + time.sleep(0.1) + return x + + result = foo() + print(f"Function returned: {{result}}") + """ + ) + + self.script_path = TESTFN + "_connect_test.py" + with open(self.script_path, 'w') as f: + f.write(script) + + def tearDown(self): + self.sock.close() + try: + unlink(self.script_path) + except OSError: + pass + + def do_integration_test(self, client_stdin): + process = subprocess.Popen( + [sys.executable, self.script_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + self.addCleanup(process.stdout.close) + self.addCleanup(process.stderr.close) + + # Wait for the process to reach our attachment point + self.sock.settimeout(10) + conn, _ = self.sock.accept() + conn.close() + + client_stdin = io.StringIO(client_stdin) + client_stdout = io.StringIO() + client_stderr = io.StringIO() + + self.addCleanup(client_stdin.close) + self.addCleanup(client_stdout.close) + self.addCleanup(client_stderr.close) + self.addCleanup(process.wait) + + with ( + unittest.mock.patch("sys.stdin", client_stdin), + redirect_stdout(client_stdout), + redirect_stderr(client_stderr), + unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]), + ): + try: + pdb.main() + except PermissionError: + self.skipTest("Insufficient permissions for remote execution") + + process.wait() + server_stdout = process.stdout.read() + server_stderr = process.stderr.read() + + if process.returncode != 0: + print("server failed") + print(f"server stdout:\n{server_stdout}") + print(f"server stderr:\n{server_stderr}") + + self.assertEqual(process.returncode, 0) + return { + "client": { + "stdout": client_stdout.getvalue(), + "stderr": client_stderr.getvalue(), + }, + "server": { + "stdout": server_stdout, + "stderr": server_stderr, + }, + } + + def test_attach_to_process_without_colors(self): + with force_color(False): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("while x == 1", output["client"]["stdout"]) + self.assertNotIn("\x1b", output["client"]["stdout"]) + + def test_attach_to_process_with_colors(self): + with force_color(True): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("\x1b", output["client"]["stdout"]) + self.assertNotIn("while x == 1", output["client"]["stdout"]) + self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"])) + if __name__ == "__main__": unittest.main() |