aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_remote_pdb.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_remote_pdb.py')
-rw-r--r--Lib/test/test_remote_pdb.py929
1 files changed, 903 insertions, 26 deletions
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py
index 2c4a17abd82..a1c50af15f3 100644
--- a/Lib/test/test_remote_pdb.py
+++ b/Lib/test/test_remote_pdb.py
@@ -1,21 +1,19 @@
import io
-import time
+import itertools
import json
import os
+import re
import signal
import socket
import subprocess
import sys
-import tempfile
import textwrap
-import threading
import unittest
import unittest.mock
-from contextlib import contextmanager
-from pathlib import Path
-from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT
-from test.support.os_helper import temp_dir, TESTFN, unlink
-from typing import Dict, List, Optional, Tuple, Union, Any
+from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack
+from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT
+from test.support.os_helper import TESTFN, unlink
+from typing import List
import pdb
from pdb import _PdbServer, _PdbClient
@@ -78,6 +76,746 @@ class MockSocketFile:
return results
+class PdbClientTestCase(unittest.TestCase):
+ """Tests for the _PdbClient class."""
+
+ def do_test(
+ self,
+ *,
+ incoming,
+ simulate_send_failure=False,
+ simulate_sigint_during_stdout_write=False,
+ use_interrupt_socket=False,
+ expected_outgoing=None,
+ expected_outgoing_signals=None,
+ expected_completions=None,
+ expected_exception=None,
+ expected_stdout="",
+ expected_stdout_substring="",
+ expected_state=None,
+ ):
+ if expected_outgoing is None:
+ expected_outgoing = []
+ if expected_outgoing_signals is None:
+ expected_outgoing_signals = []
+ if expected_completions is None:
+ expected_completions = []
+ if expected_state is None:
+ expected_state = {}
+
+ expected_state.setdefault("write_failed", False)
+ messages = [m for source, m in incoming if source == "server"]
+ prompts = [m["prompt"] for source, m in incoming if source == "user"]
+
+ input_iter = (m for source, m in incoming if source == "user")
+ completions = []
+
+ def mock_input(prompt):
+ message = next(input_iter, None)
+ if message is None:
+ raise EOFError
+
+ if req := message.get("completion_request"):
+ readline_mock = unittest.mock.Mock()
+ readline_mock.get_line_buffer.return_value = req["line"]
+ readline_mock.get_begidx.return_value = req["begidx"]
+ readline_mock.get_endidx.return_value = req["endidx"]
+ unittest.mock.seal(readline_mock)
+ with unittest.mock.patch.dict(sys.modules, {"readline": readline_mock}):
+ for param in itertools.count():
+ prefix = req["line"][req["begidx"] : req["endidx"]]
+ completion = client.complete(prefix, param)
+ if completion is None:
+ break
+ completions.append(completion)
+
+ reply = message["input"]
+ if isinstance(reply, BaseException):
+ raise reply
+ if isinstance(reply, str):
+ return reply
+ return reply()
+
+ with ExitStack() as stack:
+ client_sock, server_sock = socket.socketpair()
+ stack.enter_context(closing(client_sock))
+ stack.enter_context(closing(server_sock))
+
+ server_sock = unittest.mock.Mock(wraps=server_sock)
+
+ client_sock.sendall(
+ b"".join(
+ (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n"
+ for m in messages
+ )
+ )
+ client_sock.shutdown(socket.SHUT_WR)
+
+ if simulate_send_failure:
+ server_sock.sendall = unittest.mock.Mock(
+ side_effect=OSError("sendall failed")
+ )
+ client_sock.shutdown(socket.SHUT_RD)
+
+ stdout = io.StringIO()
+
+ if simulate_sigint_during_stdout_write:
+ orig_stdout_write = stdout.write
+
+ def sigint_stdout_write(s):
+ signal.raise_signal(signal.SIGINT)
+ return orig_stdout_write(s)
+
+ stdout.write = sigint_stdout_write
+
+ input_mock = stack.enter_context(
+ unittest.mock.patch("pdb.input", side_effect=mock_input)
+ )
+ stack.enter_context(redirect_stdout(stdout))
+
+ if use_interrupt_socket:
+ interrupt_sock = unittest.mock.Mock(spec=socket.socket)
+ mock_kill = None
+ else:
+ interrupt_sock = None
+ mock_kill = stack.enter_context(
+ unittest.mock.patch("os.kill", spec=os.kill)
+ )
+
+ client = _PdbClient(
+ pid=12345,
+ server_socket=server_sock,
+ interrupt_sock=interrupt_sock,
+ )
+
+ if expected_exception is not None:
+ exception = expected_exception["exception"]
+ msg = expected_exception["msg"]
+ stack.enter_context(self.assertRaises(exception, msg=msg))
+
+ client.cmdloop()
+
+ sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls]
+ for msg in sent_msgs:
+ assert msg.endswith(b"\n")
+ actual_outgoing = [json.loads(msg) for msg in sent_msgs]
+
+ self.assertEqual(actual_outgoing, expected_outgoing)
+ self.assertEqual(completions, expected_completions)
+ if expected_stdout_substring and not expected_stdout:
+ self.assertIn(expected_stdout_substring, stdout.getvalue())
+ else:
+ self.assertEqual(stdout.getvalue(), expected_stdout)
+ input_mock.assert_has_calls([unittest.mock.call(p) for p in prompts])
+ actual_state = {k: getattr(client, k) for k in expected_state}
+ self.assertEqual(actual_state, expected_state)
+
+ if use_interrupt_socket:
+ outgoing_signals = [
+ signal.Signals(int.from_bytes(call.args[0]))
+ for call in interrupt_sock.sendall.call_args_list
+ ]
+ else:
+ assert mock_kill is not None
+ outgoing_signals = []
+ for call in mock_kill.call_args_list:
+ pid, signum = call.args
+ self.assertEqual(pid, 12345)
+ outgoing_signals.append(signal.Signals(signum))
+ self.assertEqual(outgoing_signals, expected_outgoing_signals)
+
+ def test_remote_immediately_closing_the_connection(self):
+ """Test the behavior when the remote closes the connection immediately."""
+ incoming = []
+ expected_outgoing = []
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=expected_outgoing,
+ )
+
+ def test_handling_command_list(self):
+ """Test handling the command_list message."""
+ incoming = [
+ ("server", {"command_list": ["help", "list", "continue"]}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_state={
+ "pdb_commands": {"help", "list", "continue"},
+ },
+ )
+
+ def test_handling_info_message(self):
+ """Test handling a message payload with type='info'."""
+ incoming = [
+ ("server", {"message": "Some message or other\n", "type": "info"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="Some message or other\n",
+ )
+
+ def test_handling_error_message(self):
+ """Test handling a message payload with type='error'."""
+ incoming = [
+ ("server", {"message": "Some message or other.", "type": "error"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="*** Some message or other.\n",
+ )
+
+ def test_handling_other_message(self):
+ """Test handling a message payload with an unrecognized type."""
+ incoming = [
+ ("server", {"message": "Some message.\n", "type": "unknown"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="Some message.\n",
+ )
+
+ def test_handling_help_for_command(self):
+ """Test handling a request to display help for a command."""
+ incoming = [
+ ("server", {"help": "ll"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout_substring="Usage: ll | longlist",
+ )
+
+ def test_handling_help_without_a_specific_topic(self):
+ """Test handling a request to display a help overview."""
+ incoming = [
+ ("server", {"help": ""}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout_substring="type help <topic>",
+ )
+
+ def test_handling_help_pdb(self):
+ """Test handling a request to display the full PDB manual."""
+ incoming = [
+ ("server", {"help": "pdb"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout_substring=">>> import pdb",
+ )
+
+ def test_handling_pdb_prompts(self):
+ """Test responding to pdb's normal prompts."""
+ incoming = [
+ ("server", {"command_list": ["b"]}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": "0 ]"}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": ""}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "b ["}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "! b ["}),
+ ("user", {"prompt": "... ", "input": "b ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n0 ]"},
+ {"reply": ""},
+ {"reply": "b ["},
+ {"reply": "!b [\nb ]"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_handling_interact_prompts(self):
+ """Test responding to pdb's interact mode prompts."""
+ incoming = [
+ ("server", {"command_list": ["b"]}),
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": "0 ]"}),
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": ""}),
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": "b ["}),
+ ("user", {"prompt": "... ", "input": "b ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n0 ]"},
+ {"reply": ""},
+ {"reply": "b [\nb ]"},
+ ],
+ expected_state={"state": "interact"},
+ )
+
+ def test_retry_pdb_prompt_on_syntax_error(self):
+ """Test re-prompting after a SyntaxError in a Python expression."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": " lst ["}),
+ ("user", {"prompt": "(Pdb) ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": " 0 ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n 0 ]"},
+ ],
+ expected_stdout_substring="*** IndentationError",
+ expected_state={"state": "pdb"},
+ )
+
+ def test_retry_interact_prompt_on_syntax_error(self):
+ """Test re-prompting after a SyntaxError in a Python expression."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": "!lst ["}),
+ ("user", {"prompt": ">>> ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": " 0 ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n 0 ]"},
+ ],
+ expected_stdout_substring="*** SyntaxError",
+ expected_state={"state": "interact"},
+ )
+
+ def test_handling_unrecognized_prompt_type(self):
+ """Test fallback to "dumb" single-line mode for unknown states."""
+ incoming = [
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": "! ["}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": "echo hello"}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": ""}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": "echo goodbye"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "! ["},
+ {"reply": "echo hello"},
+ {"reply": ""},
+ {"reply": "echo goodbye"},
+ ],
+ expected_state={"state": "dumb"},
+ )
+
+ def test_sigint_at_prompt(self):
+ """Test signaling when a prompt gets interrupted."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ (
+ "user",
+ {
+ "prompt": "(Pdb) ",
+ "input": lambda: signal.raise_signal(signal.SIGINT),
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"signal": "INT"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_sigint_at_continuation_prompt(self):
+ """Test signaling when a continuation prompt gets interrupted."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "if True:"}),
+ (
+ "user",
+ {
+ "prompt": "... ",
+ "input": lambda: signal.raise_signal(signal.SIGINT),
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"signal": "INT"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_sigint_when_writing(self):
+ """Test siginaling when sys.stdout.write() gets interrupted."""
+ incoming = [
+ ("server", {"message": "Some message or other\n", "type": "info"}),
+ ]
+ for use_interrupt_socket in [False, True]:
+ with self.subTest(use_interrupt_socket=use_interrupt_socket):
+ self.do_test(
+ incoming=incoming,
+ simulate_sigint_during_stdout_write=True,
+ use_interrupt_socket=use_interrupt_socket,
+ expected_outgoing=[],
+ expected_outgoing_signals=[signal.SIGINT],
+ expected_stdout="Some message or other\n",
+ )
+
+ def test_eof_at_prompt(self):
+ """Test signaling when a prompt gets an EOFError."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": EOFError()}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"signal": "EOF"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_unrecognized_json_message(self):
+ """Test failing after getting an unrecognized payload."""
+ incoming = [
+ ("server", {"monty": "python"}),
+ ("server", {"message": "Some message or other\n", "type": "info"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_exception={
+ "exception": RuntimeError,
+ "msg": 'Unrecognized payload b\'{"monty": "python"}\'',
+ },
+ )
+
+ def test_continuing_after_getting_a_non_json_payload(self):
+ """Test continuing after getting a non JSON payload."""
+ incoming = [
+ ("server", b"spam"),
+ ("server", {"message": "Something", "type": "info"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="\n".join(
+ [
+ "*** Invalid JSON from remote: b'spam\\n'",
+ "Something",
+ ]
+ ),
+ )
+
+ def test_write_failing(self):
+ """Test terminating if write fails due to a half closed socket."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[{"signal": "INT"}],
+ simulate_send_failure=True,
+ expected_state={"write_failed": True},
+ )
+
+ def test_completion_in_pdb_state(self):
+ """Test requesting tab completions at a (Pdb) prompt."""
+ # GIVEN
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ (
+ "user",
+ {
+ "prompt": "(Pdb) ",
+ "completion_request": {
+ "line": " mod._",
+ "begidx": 8,
+ "endidx": 9,
+ },
+ "input": "print(\n mod.__name__)",
+ },
+ ),
+ ("server", {"completions": ["__name__", "__file__"]}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "_",
+ "line": "mod._",
+ "begidx": 4,
+ "endidx": 5,
+ }
+ },
+ {"reply": "print(\n mod.__name__)"},
+ ],
+ expected_completions=["__name__", "__file__"],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_multiline_completion_in_pdb_state(self):
+ """Test requesting tab completions at a (Pdb) continuation prompt."""
+ # GIVEN
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "if True:"}),
+ (
+ "user",
+ {
+ "prompt": "... ",
+ "completion_request": {
+ "line": " b",
+ "begidx": 4,
+ "endidx": 5,
+ },
+ "input": " bool()",
+ },
+ ),
+ ("server", {"completions": ["bin", "bool", "bytes"]}),
+ ("user", {"prompt": "... ", "input": ""}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "b",
+ "line": "! b",
+ "begidx": 2,
+ "endidx": 3,
+ }
+ },
+ {"reply": "if True:\n bool()\n"},
+ ],
+ expected_completions=["bin", "bool", "bytes"],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_completion_in_interact_state(self):
+ """Test requesting tab completions at a >>> prompt."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": " mod.__",
+ "begidx": 8,
+ "endidx": 10,
+ },
+ "input": "print(\n mod.__name__)",
+ },
+ ),
+ ("server", {"completions": ["__name__", "__file__"]}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "__",
+ "line": "mod.__",
+ "begidx": 4,
+ "endidx": 6,
+ }
+ },
+ {"reply": "print(\n mod.__name__)"},
+ ],
+ expected_completions=["__name__", "__file__"],
+ expected_state={"state": "interact"},
+ )
+
+ def test_completion_in_unknown_state(self):
+ """Test requesting tab completions at an unrecognized prompt."""
+ incoming = [
+ ("server", {"command_list": ["p"]}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ (
+ "user",
+ {
+ "prompt": "Do it? ",
+ "completion_request": {
+ "line": "_",
+ "begidx": 0,
+ "endidx": 1,
+ },
+ "input": "__name__",
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "__name__"},
+ ],
+ expected_state={"state": "dumb"},
+ )
+
+ def test_write_failure_during_completion(self):
+ """Test failing to write to the socket to request tab completions."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ simulate_send_failure=True,
+ expected_completions=[],
+ expected_state={"state": "interact", "write_failed": True},
+ )
+
+ def test_read_failure_during_completion(self):
+ """Test failing to read tab completions from the socket."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ expected_completions=[],
+ expected_state={"state": "interact"},
+ )
+
+ def test_reading_invalid_json_during_completion(self):
+ """Test receiving invalid JSON when getting tab completions."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ("server", b'{"completions": '),
+ ("user", {"prompt": ">>> ", "input": "xyz"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ expected_stdout_substring="*** json.decoder.JSONDecodeError",
+ expected_completions=[],
+ expected_state={"state": "interact"},
+ )
+
+ def test_reading_empty_json_during_completion(self):
+ """Test receiving an empty JSON object when getting tab completions."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ("server", {}),
+ ("user", {"prompt": ">>> ", "input": "xyz"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ expected_stdout=(
+ "*** RuntimeError: Failed to get valid completions."
+ " Got: {}\n"
+ ),
+ expected_completions=[],
+ expected_state={"state": "interact"},
+ )
+
+
class RemotePdbTestCase(unittest.TestCase):
"""Tests for the _PdbServer class."""
@@ -298,6 +1036,8 @@ class PdbConnectTestCase(unittest.TestCase):
frame=frame,
commands="",
version=pdb._PdbServer.protocol_version(),
+ signal_raising_thread=False,
+ colorize=False,
)
return x # This line won't be reached in debugging
@@ -355,23 +1095,6 @@ class PdbConnectTestCase(unittest.TestCase):
client_file.write(json.dumps({"reply": command}).encode() + b"\n")
client_file.flush()
- def _send_interrupt(self, pid):
- """Helper to send an interrupt signal to the debugger."""
- # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
- interrupt_script = TESTFN + "_interrupt_script.py"
- with open(interrupt_script, 'w') as f:
- f.write(
- 'import pdb, sys\n'
- 'print("Hello, world!")\n'
- 'if inst := pdb.Pdb._last_pdb_instance:\n'
- ' inst.set_trace(sys._getframe(1))\n'
- )
- self.addCleanup(unlink, interrupt_script)
- try:
- sys.remote_exec(pid, interrupt_script)
- except PermissionError:
- self.skipTest("Insufficient permissions to execute code in remote process")
-
def test_connect_and_basic_commands(self):
"""Test connecting to a remote debugger and sending basic commands."""
self._create_script()
@@ -484,6 +1207,8 @@ class PdbConnectTestCase(unittest.TestCase):
frame=frame,
commands="",
version=pdb._PdbServer.protocol_version(),
+ signal_raising_thread=True,
+ colorize=False,
)
print("Connected to debugger")
iterations = 50
@@ -499,6 +1224,10 @@ class PdbConnectTestCase(unittest.TestCase):
self._create_script(script=script)
process, client_file = self._connect_and_get_client_file()
+ # Accept a 2nd connection from the subprocess to tell it about signals
+ signal_sock, _ = self.server_sock.accept()
+ self.addCleanup(signal_sock.close)
+
with kill_on_error(process):
# Skip initial messages until we get to the prompt
self._read_until_prompt(client_file)
@@ -514,7 +1243,7 @@ class PdbConnectTestCase(unittest.TestCase):
break
# Inject a script to interrupt the running process
- self._send_interrupt(process.pid)
+ signal_sock.sendall(signal.SIGINT.to_bytes())
messages = self._read_until_prompt(client_file)
# Verify we got the keyboard interrupt message.
@@ -570,6 +1299,8 @@ class PdbConnectTestCase(unittest.TestCase):
frame=frame,
commands="",
version=fake_version,
+ signal_raising_thread=False,
+ colorize=False,
)
# This should print if the debugger detaches correctly
@@ -697,5 +1428,151 @@ class PdbConnectTestCase(unittest.TestCase):
self.assertIn("Function returned: 42", stdout)
self.assertEqual(process.returncode, 0)
+
+def _supports_remote_attaching():
+ PROCESS_VM_READV_SUPPORTED = False
+
+ try:
+ from _remote_debugging import PROCESS_VM_READV_SUPPORTED
+ except ImportError:
+ pass
+
+ return PROCESS_VM_READV_SUPPORTED
+
+
+@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled")
+@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32",
+ "Test only runs on Linux, Windows and MacOS")
+@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
+ "Testing on Linux requires process_vm_readv support")
+@cpython_only
+@requires_subprocess()
+class PdbAttachTestCase(unittest.TestCase):
+ def setUp(self):
+ # Create a server socket that will wait for the debugger to connect
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.bind(('127.0.0.1', 0)) # Let OS assign port
+ self.sock.listen(1)
+ self.port = self.sock.getsockname()[1]
+ self._create_script()
+
+ def _create_script(self, script=None):
+ # Create a file for subprocess script
+ script = textwrap.dedent(
+ f"""
+ import socket
+ import time
+
+ def foo():
+ return bar()
+
+ def bar():
+ return baz()
+
+ def baz():
+ x = 1
+ # Trigger attach
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('127.0.0.1', {self.port}))
+ sock.close()
+ count = 0
+ while x == 1 and count < 100:
+ count += 1
+ time.sleep(0.1)
+ return x
+
+ result = foo()
+ print(f"Function returned: {{result}}")
+ """
+ )
+
+ self.script_path = TESTFN + "_connect_test.py"
+ with open(self.script_path, 'w') as f:
+ f.write(script)
+
+ def tearDown(self):
+ self.sock.close()
+ try:
+ unlink(self.script_path)
+ except OSError:
+ pass
+
+ def do_integration_test(self, client_stdin):
+ process = subprocess.Popen(
+ [sys.executable, self.script_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True
+ )
+ self.addCleanup(process.stdout.close)
+ self.addCleanup(process.stderr.close)
+
+ # Wait for the process to reach our attachment point
+ self.sock.settimeout(10)
+ conn, _ = self.sock.accept()
+ conn.close()
+
+ client_stdin = io.StringIO(client_stdin)
+ client_stdout = io.StringIO()
+ client_stderr = io.StringIO()
+
+ self.addCleanup(client_stdin.close)
+ self.addCleanup(client_stdout.close)
+ self.addCleanup(client_stderr.close)
+ self.addCleanup(process.wait)
+
+ with (
+ unittest.mock.patch("sys.stdin", client_stdin),
+ redirect_stdout(client_stdout),
+ redirect_stderr(client_stderr),
+ unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]),
+ ):
+ try:
+ pdb.main()
+ except PermissionError:
+ self.skipTest("Insufficient permissions for remote execution")
+
+ process.wait()
+ server_stdout = process.stdout.read()
+ server_stderr = process.stderr.read()
+
+ if process.returncode != 0:
+ print("server failed")
+ print(f"server stdout:\n{server_stdout}")
+ print(f"server stderr:\n{server_stderr}")
+
+ self.assertEqual(process.returncode, 0)
+ return {
+ "client": {
+ "stdout": client_stdout.getvalue(),
+ "stderr": client_stderr.getvalue(),
+ },
+ "server": {
+ "stdout": server_stdout,
+ "stderr": server_stderr,
+ },
+ }
+
+ def test_attach_to_process_without_colors(self):
+ with force_color(False):
+ output = self.do_integration_test("ll\nx=42\n")
+ self.assertEqual(output["client"]["stderr"], "")
+ self.assertEqual(output["server"]["stderr"], "")
+
+ self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+ self.assertIn("while x == 1", output["client"]["stdout"])
+ self.assertNotIn("\x1b", output["client"]["stdout"])
+
+ def test_attach_to_process_with_colors(self):
+ with force_color(True):
+ output = self.do_integration_test("ll\nx=42\n")
+ self.assertEqual(output["client"]["stderr"], "")
+ self.assertEqual(output["server"]["stderr"], "")
+
+ self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+ self.assertIn("\x1b", output["client"]["stdout"])
+ self.assertNotIn("while x == 1", output["client"]["stdout"])
+ self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"]))
+
if __name__ == "__main__":
unittest.main()