aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_remote_pdb.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_remote_pdb.py')
-rw-r--r--Lib/test/test_remote_pdb.py457
1 files changed, 319 insertions, 138 deletions
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py
index e4c44c78d4a..a1c50af15f3 100644
--- a/Lib/test/test_remote_pdb.py
+++ b/Lib/test/test_remote_pdb.py
@@ -1,22 +1,19 @@
import io
-import time
import itertools
import json
import os
+import re
import signal
import socket
import subprocess
import sys
-import tempfile
import textwrap
-import threading
import unittest
import unittest.mock
-from contextlib import contextmanager, redirect_stdout, ExitStack
-from pathlib import Path
-from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT
-from test.support.os_helper import temp_dir, TESTFN, unlink
-from typing import Dict, List, Optional, Tuple, Union, Any
+from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack
+from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT
+from test.support.os_helper import TESTFN, unlink
+from typing import List
import pdb
from pdb import _PdbServer, _PdbClient
@@ -79,44 +76,6 @@ class MockSocketFile:
return results
-class MockDebuggerSocket:
- """Mock file-like simulating a connection to a _RemotePdb instance"""
-
- def __init__(self, incoming):
- self.incoming = iter(incoming)
- self.outgoing = []
- self.buffered = bytearray()
-
- def write(self, data: bytes) -> None:
- """Simulate write to socket."""
- self.buffered += data
-
- def flush(self) -> None:
- """Ensure each line is valid JSON."""
- lines = self.buffered.splitlines(keepends=True)
- self.buffered.clear()
- for line in lines:
- assert line.endswith(b"\n")
- self.outgoing.append(json.loads(line))
-
- def readline(self) -> bytes:
- """Read a line from the prepared input queue."""
- # Anything written must be flushed before trying to read,
- # since the read will be dependent upon the last write.
- assert not self.buffered
- try:
- item = next(self.incoming)
- if not isinstance(item, bytes):
- item = json.dumps(item).encode()
- return item + b"\n"
- except StopIteration:
- return b""
-
- def close(self) -> None:
- """No-op close implementation."""
- pass
-
-
class PdbClientTestCase(unittest.TestCase):
"""Tests for the _PdbClient class."""
@@ -124,8 +83,11 @@ class PdbClientTestCase(unittest.TestCase):
self,
*,
incoming,
- simulate_failure=None,
+ simulate_send_failure=False,
+ simulate_sigint_during_stdout_write=False,
+ use_interrupt_socket=False,
expected_outgoing=None,
+ expected_outgoing_signals=None,
expected_completions=None,
expected_exception=None,
expected_stdout="",
@@ -134,6 +96,8 @@ class PdbClientTestCase(unittest.TestCase):
):
if expected_outgoing is None:
expected_outgoing = []
+ if expected_outgoing_signals is None:
+ expected_outgoing_signals = []
if expected_completions is None:
expected_completions = []
if expected_state is None:
@@ -142,16 +106,6 @@ class PdbClientTestCase(unittest.TestCase):
expected_state.setdefault("write_failed", False)
messages = [m for source, m in incoming if source == "server"]
prompts = [m["prompt"] for source, m in incoming if source == "user"]
- sockfile = MockDebuggerSocket(messages)
- stdout = io.StringIO()
-
- if simulate_failure:
- sockfile.write = unittest.mock.Mock()
- sockfile.flush = unittest.mock.Mock()
- if simulate_failure == "write":
- sockfile.write.side_effect = OSError("write failed")
- elif simulate_failure == "flush":
- sockfile.flush.side_effect = OSError("flush failed")
input_iter = (m for source, m in incoming if source == "user")
completions = []
@@ -178,18 +132,60 @@ class PdbClientTestCase(unittest.TestCase):
reply = message["input"]
if isinstance(reply, BaseException):
raise reply
- return reply
+ if isinstance(reply, str):
+ return reply
+ return reply()
with ExitStack() as stack:
+ client_sock, server_sock = socket.socketpair()
+ stack.enter_context(closing(client_sock))
+ stack.enter_context(closing(server_sock))
+
+ server_sock = unittest.mock.Mock(wraps=server_sock)
+
+ client_sock.sendall(
+ b"".join(
+ (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n"
+ for m in messages
+ )
+ )
+ client_sock.shutdown(socket.SHUT_WR)
+
+ if simulate_send_failure:
+ server_sock.sendall = unittest.mock.Mock(
+ side_effect=OSError("sendall failed")
+ )
+ client_sock.shutdown(socket.SHUT_RD)
+
+ stdout = io.StringIO()
+
+ if simulate_sigint_during_stdout_write:
+ orig_stdout_write = stdout.write
+
+ def sigint_stdout_write(s):
+ signal.raise_signal(signal.SIGINT)
+ return orig_stdout_write(s)
+
+ stdout.write = sigint_stdout_write
+
input_mock = stack.enter_context(
unittest.mock.patch("pdb.input", side_effect=mock_input)
)
stack.enter_context(redirect_stdout(stdout))
+ if use_interrupt_socket:
+ interrupt_sock = unittest.mock.Mock(spec=socket.socket)
+ mock_kill = None
+ else:
+ interrupt_sock = None
+ mock_kill = stack.enter_context(
+ unittest.mock.patch("os.kill", spec=os.kill)
+ )
+
client = _PdbClient(
- pid=0,
- sockfile=sockfile,
- interrupt_script="/a/b.py",
+ pid=12345,
+ server_socket=server_sock,
+ interrupt_sock=interrupt_sock,
)
if expected_exception is not None:
@@ -199,13 +195,12 @@ class PdbClientTestCase(unittest.TestCase):
client.cmdloop()
- actual_outgoing = sockfile.outgoing
- if simulate_failure:
- actual_outgoing += [
- json.loads(msg.args[0]) for msg in sockfile.write.mock_calls
- ]
+ sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls]
+ for msg in sent_msgs:
+ assert msg.endswith(b"\n")
+ actual_outgoing = [json.loads(msg) for msg in sent_msgs]
- self.assertEqual(sockfile.outgoing, expected_outgoing)
+ self.assertEqual(actual_outgoing, expected_outgoing)
self.assertEqual(completions, expected_completions)
if expected_stdout_substring and not expected_stdout:
self.assertIn(expected_stdout_substring, stdout.getvalue())
@@ -215,6 +210,20 @@ class PdbClientTestCase(unittest.TestCase):
actual_state = {k: getattr(client, k) for k in expected_state}
self.assertEqual(actual_state, expected_state)
+ if use_interrupt_socket:
+ outgoing_signals = [
+ signal.Signals(int.from_bytes(call.args[0]))
+ for call in interrupt_sock.sendall.call_args_list
+ ]
+ else:
+ assert mock_kill is not None
+ outgoing_signals = []
+ for call in mock_kill.call_args_list:
+ pid, signum = call.args
+ self.assertEqual(pid, 12345)
+ outgoing_signals.append(signal.Signals(signum))
+ self.assertEqual(outgoing_signals, expected_outgoing_signals)
+
def test_remote_immediately_closing_the_connection(self):
"""Test the behavior when the remote closes the connection immediately."""
incoming = []
@@ -409,11 +418,38 @@ class PdbClientTestCase(unittest.TestCase):
expected_state={"state": "dumb"},
)
- def test_keyboard_interrupt_at_prompt(self):
- """Test signaling when a prompt gets a KeyboardInterrupt."""
+ def test_sigint_at_prompt(self):
+ """Test signaling when a prompt gets interrupted."""
incoming = [
("server", {"prompt": "(Pdb) ", "state": "pdb"}),
- ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}),
+ (
+ "user",
+ {
+ "prompt": "(Pdb) ",
+ "input": lambda: signal.raise_signal(signal.SIGINT),
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"signal": "INT"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_sigint_at_continuation_prompt(self):
+ """Test signaling when a continuation prompt gets interrupted."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "if True:"}),
+ (
+ "user",
+ {
+ "prompt": "... ",
+ "input": lambda: signal.raise_signal(signal.SIGINT),
+ },
+ ),
]
self.do_test(
incoming=incoming,
@@ -423,6 +459,22 @@ class PdbClientTestCase(unittest.TestCase):
expected_state={"state": "pdb"},
)
+ def test_sigint_when_writing(self):
+ """Test siginaling when sys.stdout.write() gets interrupted."""
+ incoming = [
+ ("server", {"message": "Some message or other\n", "type": "info"}),
+ ]
+ for use_interrupt_socket in [False, True]:
+ with self.subTest(use_interrupt_socket=use_interrupt_socket):
+ self.do_test(
+ incoming=incoming,
+ simulate_sigint_during_stdout_write=True,
+ use_interrupt_socket=use_interrupt_socket,
+ expected_outgoing=[],
+ expected_outgoing_signals=[signal.SIGINT],
+ expected_stdout="Some message or other\n",
+ )
+
def test_eof_at_prompt(self):
"""Test signaling when a prompt gets an EOFError."""
incoming = [
@@ -478,20 +530,7 @@ class PdbClientTestCase(unittest.TestCase):
self.do_test(
incoming=incoming,
expected_outgoing=[{"signal": "INT"}],
- simulate_failure="write",
- expected_state={"write_failed": True},
- )
-
- def test_flush_failing(self):
- """Test terminating if flush fails due to a half closed socket."""
- incoming = [
- ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
- ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}),
- ]
- self.do_test(
- incoming=incoming,
- expected_outgoing=[{"signal": "INT"}],
- simulate_failure="flush",
+ simulate_send_failure=True,
expected_state={"write_failed": True},
)
@@ -531,6 +570,44 @@ class PdbClientTestCase(unittest.TestCase):
expected_state={"state": "pdb"},
)
+ def test_multiline_completion_in_pdb_state(self):
+ """Test requesting tab completions at a (Pdb) continuation prompt."""
+ # GIVEN
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "if True:"}),
+ (
+ "user",
+ {
+ "prompt": "... ",
+ "completion_request": {
+ "line": " b",
+ "begidx": 4,
+ "endidx": 5,
+ },
+ "input": " bool()",
+ },
+ ),
+ ("server", {"completions": ["bin", "bool", "bytes"]}),
+ ("user", {"prompt": "... ", "input": ""}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "b",
+ "line": "! b",
+ "begidx": 2,
+ "endidx": 3,
+ }
+ },
+ {"reply": "if True:\n bool()\n"},
+ ],
+ expected_completions=["bin", "bool", "bytes"],
+ expected_state={"state": "pdb"},
+ )
+
def test_completion_in_interact_state(self):
"""Test requesting tab completions at a >>> prompt."""
incoming = [
@@ -622,42 +699,7 @@ class PdbClientTestCase(unittest.TestCase):
},
{"reply": "xyz"},
],
- simulate_failure="write",
- expected_completions=[],
- expected_state={"state": "interact", "write_failed": True},
- )
-
- def test_flush_failure_during_completion(self):
- """Test failing to flush to the socket to request tab completions."""
- incoming = [
- ("server", {"prompt": ">>> ", "state": "interact"}),
- (
- "user",
- {
- "prompt": ">>> ",
- "completion_request": {
- "line": "xy",
- "begidx": 0,
- "endidx": 2,
- },
- "input": "xyz",
- },
- ),
- ]
- self.do_test(
- incoming=incoming,
- expected_outgoing=[
- {
- "complete": {
- "text": "xy",
- "line": "xy",
- "begidx": 0,
- "endidx": 2,
- }
- },
- {"reply": "xyz"},
- ],
- simulate_failure="flush",
+ simulate_send_failure=True,
expected_completions=[],
expected_state={"state": "interact", "write_failed": True},
)
@@ -994,6 +1036,8 @@ class PdbConnectTestCase(unittest.TestCase):
frame=frame,
commands="",
version=pdb._PdbServer.protocol_version(),
+ signal_raising_thread=False,
+ colorize=False,
)
return x # This line won't be reached in debugging
@@ -1051,23 +1095,6 @@ class PdbConnectTestCase(unittest.TestCase):
client_file.write(json.dumps({"reply": command}).encode() + b"\n")
client_file.flush()
- def _send_interrupt(self, pid):
- """Helper to send an interrupt signal to the debugger."""
- # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
- interrupt_script = TESTFN + "_interrupt_script.py"
- with open(interrupt_script, 'w') as f:
- f.write(
- 'import pdb, sys\n'
- 'print("Hello, world!")\n'
- 'if inst := pdb.Pdb._last_pdb_instance:\n'
- ' inst.set_trace(sys._getframe(1))\n'
- )
- self.addCleanup(unlink, interrupt_script)
- try:
- sys.remote_exec(pid, interrupt_script)
- except PermissionError:
- self.skipTest("Insufficient permissions to execute code in remote process")
-
def test_connect_and_basic_commands(self):
"""Test connecting to a remote debugger and sending basic commands."""
self._create_script()
@@ -1180,6 +1207,8 @@ class PdbConnectTestCase(unittest.TestCase):
frame=frame,
commands="",
version=pdb._PdbServer.protocol_version(),
+ signal_raising_thread=True,
+ colorize=False,
)
print("Connected to debugger")
iterations = 50
@@ -1195,6 +1224,10 @@ class PdbConnectTestCase(unittest.TestCase):
self._create_script(script=script)
process, client_file = self._connect_and_get_client_file()
+ # Accept a 2nd connection from the subprocess to tell it about signals
+ signal_sock, _ = self.server_sock.accept()
+ self.addCleanup(signal_sock.close)
+
with kill_on_error(process):
# Skip initial messages until we get to the prompt
self._read_until_prompt(client_file)
@@ -1210,7 +1243,7 @@ class PdbConnectTestCase(unittest.TestCase):
break
# Inject a script to interrupt the running process
- self._send_interrupt(process.pid)
+ signal_sock.sendall(signal.SIGINT.to_bytes())
messages = self._read_until_prompt(client_file)
# Verify we got the keyboard interrupt message.
@@ -1266,6 +1299,8 @@ class PdbConnectTestCase(unittest.TestCase):
frame=frame,
commands="",
version=fake_version,
+ signal_raising_thread=False,
+ colorize=False,
)
# This should print if the debugger detaches correctly
@@ -1393,5 +1428,151 @@ class PdbConnectTestCase(unittest.TestCase):
self.assertIn("Function returned: 42", stdout)
self.assertEqual(process.returncode, 0)
+
+def _supports_remote_attaching():
+ PROCESS_VM_READV_SUPPORTED = False
+
+ try:
+ from _remote_debugging import PROCESS_VM_READV_SUPPORTED
+ except ImportError:
+ pass
+
+ return PROCESS_VM_READV_SUPPORTED
+
+
+@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled")
+@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32",
+ "Test only runs on Linux, Windows and MacOS")
+@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
+ "Testing on Linux requires process_vm_readv support")
+@cpython_only
+@requires_subprocess()
+class PdbAttachTestCase(unittest.TestCase):
+ def setUp(self):
+ # Create a server socket that will wait for the debugger to connect
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.bind(('127.0.0.1', 0)) # Let OS assign port
+ self.sock.listen(1)
+ self.port = self.sock.getsockname()[1]
+ self._create_script()
+
+ def _create_script(self, script=None):
+ # Create a file for subprocess script
+ script = textwrap.dedent(
+ f"""
+ import socket
+ import time
+
+ def foo():
+ return bar()
+
+ def bar():
+ return baz()
+
+ def baz():
+ x = 1
+ # Trigger attach
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('127.0.0.1', {self.port}))
+ sock.close()
+ count = 0
+ while x == 1 and count < 100:
+ count += 1
+ time.sleep(0.1)
+ return x
+
+ result = foo()
+ print(f"Function returned: {{result}}")
+ """
+ )
+
+ self.script_path = TESTFN + "_connect_test.py"
+ with open(self.script_path, 'w') as f:
+ f.write(script)
+
+ def tearDown(self):
+ self.sock.close()
+ try:
+ unlink(self.script_path)
+ except OSError:
+ pass
+
+ def do_integration_test(self, client_stdin):
+ process = subprocess.Popen(
+ [sys.executable, self.script_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True
+ )
+ self.addCleanup(process.stdout.close)
+ self.addCleanup(process.stderr.close)
+
+ # Wait for the process to reach our attachment point
+ self.sock.settimeout(10)
+ conn, _ = self.sock.accept()
+ conn.close()
+
+ client_stdin = io.StringIO(client_stdin)
+ client_stdout = io.StringIO()
+ client_stderr = io.StringIO()
+
+ self.addCleanup(client_stdin.close)
+ self.addCleanup(client_stdout.close)
+ self.addCleanup(client_stderr.close)
+ self.addCleanup(process.wait)
+
+ with (
+ unittest.mock.patch("sys.stdin", client_stdin),
+ redirect_stdout(client_stdout),
+ redirect_stderr(client_stderr),
+ unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]),
+ ):
+ try:
+ pdb.main()
+ except PermissionError:
+ self.skipTest("Insufficient permissions for remote execution")
+
+ process.wait()
+ server_stdout = process.stdout.read()
+ server_stderr = process.stderr.read()
+
+ if process.returncode != 0:
+ print("server failed")
+ print(f"server stdout:\n{server_stdout}")
+ print(f"server stderr:\n{server_stderr}")
+
+ self.assertEqual(process.returncode, 0)
+ return {
+ "client": {
+ "stdout": client_stdout.getvalue(),
+ "stderr": client_stderr.getvalue(),
+ },
+ "server": {
+ "stdout": server_stdout,
+ "stderr": server_stderr,
+ },
+ }
+
+ def test_attach_to_process_without_colors(self):
+ with force_color(False):
+ output = self.do_integration_test("ll\nx=42\n")
+ self.assertEqual(output["client"]["stderr"], "")
+ self.assertEqual(output["server"]["stderr"], "")
+
+ self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+ self.assertIn("while x == 1", output["client"]["stdout"])
+ self.assertNotIn("\x1b", output["client"]["stdout"])
+
+ def test_attach_to_process_with_colors(self):
+ with force_color(True):
+ output = self.do_integration_test("ll\nx=42\n")
+ self.assertEqual(output["client"]["stderr"], "")
+ self.assertEqual(output["server"]["stderr"], "")
+
+ self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+ self.assertIn("\x1b", output["client"]["stdout"])
+ self.assertNotIn("while x == 1", output["client"]["stdout"])
+ self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"]))
+
if __name__ == "__main__":
unittest.main()