aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/_pyrepl
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/_pyrepl')
-rw-r--r--Lib/_pyrepl/_module_completer.py33
-rw-r--r--Lib/_pyrepl/base_eventqueue.py18
-rw-r--r--Lib/_pyrepl/commands.py37
-rw-r--r--Lib/_pyrepl/main.py11
-rw-r--r--Lib/_pyrepl/mypy.ini4
-rw-r--r--Lib/_pyrepl/reader.py46
-rw-r--r--Lib/_pyrepl/readline.py10
-rw-r--r--Lib/_pyrepl/simple_interact.py17
-rw-r--r--Lib/_pyrepl/unix_console.py39
-rw-r--r--Lib/_pyrepl/utils.py307
-rw-r--r--Lib/_pyrepl/windows_console.py87
11 files changed, 484 insertions, 125 deletions
diff --git a/Lib/_pyrepl/_module_completer.py b/Lib/_pyrepl/_module_completer.py
index 1fb043e0b70..1e9462a4215 100644
--- a/Lib/_pyrepl/_module_completer.py
+++ b/Lib/_pyrepl/_module_completer.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import pkgutil
import sys
+import token
import tokenize
from io import StringIO
from contextlib import contextmanager
@@ -16,8 +17,8 @@ if TYPE_CHECKING:
def make_default_module_completer() -> ModuleCompleter:
- # Inside pyrepl, __package__ is set to '_pyrepl'
- return ModuleCompleter(namespace={'__package__': '_pyrepl'})
+ # Inside pyrepl, __package__ is set to None by default
+ return ModuleCompleter(namespace={'__package__': None})
class ModuleCompleter:
@@ -41,11 +42,11 @@ class ModuleCompleter:
self._global_cache: list[pkgutil.ModuleInfo] = []
self._curr_sys_path: list[str] = sys.path[:]
- def get_completions(self, line: str) -> list[str]:
+ def get_completions(self, line: str) -> list[str] | None:
"""Return the next possible import completions for 'line'."""
result = ImportParser(line).parse()
if not result:
- return []
+ return None
try:
return self.complete(*result)
except Exception:
@@ -80,8 +81,11 @@ class ModuleCompleter:
def _find_modules(self, path: str, prefix: str) -> list[str]:
if not path:
# Top-level import (e.g. `import foo<tab>`` or `from foo<tab>`)`
- return [name for _, name, _ in self.global_cache
- if name.startswith(prefix)]
+ builtin_modules = [name for name in sys.builtin_module_names
+ if self.is_suggestion_match(name, prefix)]
+ third_party_modules = [module.name for module in self.global_cache
+ if self.is_suggestion_match(module.name, prefix)]
+ return sorted(builtin_modules + third_party_modules)
if path.startswith('.'):
# Convert relative path to absolute path
@@ -96,7 +100,14 @@ class ModuleCompleter:
if mod_info.ispkg and mod_info.name == segment]
modules = self.iter_submodules(modules)
return [module.name for module in modules
- if module.name.startswith(prefix)]
+ if self.is_suggestion_match(module.name, prefix)]
+
+ def is_suggestion_match(self, module_name: str, prefix: str) -> bool:
+ if prefix:
+ return module_name.startswith(prefix)
+ # For consistency with attribute completion, which
+ # does not suggest private attributes unless requested.
+ return not module_name.startswith("_")
def iter_submodules(self, parent_modules: list[pkgutil.ModuleInfo]) -> Iterator[pkgutil.ModuleInfo]:
"""Iterate over all submodules of the given parent modules."""
@@ -180,8 +191,8 @@ class ImportParser:
when parsing multiple statements.
"""
_ignored_tokens = {
- tokenize.INDENT, tokenize.DEDENT, tokenize.COMMENT,
- tokenize.NL, tokenize.NEWLINE, tokenize.ENDMARKER
+ token.INDENT, token.DEDENT, token.COMMENT,
+ token.NL, token.NEWLINE, token.ENDMARKER
}
_keywords = {'import', 'from', 'as'}
@@ -350,11 +361,11 @@ class TokenQueue:
def peek_name(self) -> bool:
if not (tok := self.peek()):
return False
- return tok.type == tokenize.NAME
+ return tok.type == token.NAME
def pop_name(self) -> str:
tok = self.pop()
- if tok.type != tokenize.NAME:
+ if tok.type != token.NAME:
raise ParseError('pop_name')
return tok.string
diff --git a/Lib/_pyrepl/base_eventqueue.py b/Lib/_pyrepl/base_eventqueue.py
index e018c4fc183..0589a0f437e 100644
--- a/Lib/_pyrepl/base_eventqueue.py
+++ b/Lib/_pyrepl/base_eventqueue.py
@@ -69,18 +69,14 @@ class BaseEventQueue:
trace('added event {event}', event=event)
self.events.append(event)
- def push(self, char: int | bytes | str) -> None:
+ def push(self, char: int | bytes) -> None:
"""
Processes a character by updating the buffer and handling special key mappings.
"""
+ assert isinstance(char, (int, bytes))
ord_char = char if isinstance(char, int) else ord(char)
- if ord_char > 255:
- assert isinstance(char, str)
- char = bytes(char.encode(self.encoding, "replace"))
- self.buf.extend(char)
- else:
- char = bytes(bytearray((ord_char,)))
- self.buf.append(ord_char)
+ char = ord_char.to_bytes()
+ self.buf.append(ord_char)
if char in self.keymap:
if self.keymap is self.compiled_keymap:
@@ -91,7 +87,7 @@ class BaseEventQueue:
if isinstance(k, dict):
self.keymap = k
else:
- self.insert(Event('key', k, self.flush_buf()))
+ self.insert(Event('key', k, bytes(self.flush_buf())))
self.keymap = self.compiled_keymap
elif self.buf and self.buf[0] == 27: # escape
@@ -100,7 +96,7 @@ class BaseEventQueue:
# the docstring in keymap.py
trace('unrecognized escape sequence, propagating...')
self.keymap = self.compiled_keymap
- self.insert(Event('key', '\033', bytearray(b'\033')))
+ self.insert(Event('key', '\033', b'\033'))
for _c in self.flush_buf()[1:]:
self.push(_c)
@@ -110,5 +106,5 @@ class BaseEventQueue:
except UnicodeError:
return
else:
- self.insert(Event('key', decoded, self.flush_buf()))
+ self.insert(Event('key', decoded, bytes(self.flush_buf())))
self.keymap = self.compiled_keymap
diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py
index cbb6d85f683..50c824995d8 100644
--- a/Lib/_pyrepl/commands.py
+++ b/Lib/_pyrepl/commands.py
@@ -21,6 +21,7 @@
from __future__ import annotations
import os
+import time
# Categories of actions:
# killing
@@ -31,6 +32,7 @@ import os
# finishing
# [completion]
+from .trace import trace
# types
if False:
@@ -368,6 +370,13 @@ class self_insert(EditCommand):
r = self.reader
text = self.event * r.get_arg()
r.insert(text)
+ if r.paste_mode:
+ data = ""
+ ev = r.console.getpending()
+ data += ev.data
+ if data:
+ r.insert(data)
+ r.last_refresh_cache.invalidated = True
class insert_nl(EditCommand):
@@ -437,7 +446,7 @@ class help(Command):
import _sitebuiltins
with self.reader.suspend():
- self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment, call-arg]
+ self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment]
class invalid_key(Command):
@@ -471,19 +480,23 @@ class show_history(Command):
class paste_mode(Command):
-
def do(self) -> None:
self.reader.paste_mode = not self.reader.paste_mode
self.reader.dirty = True
-class enable_bracketed_paste(Command):
- def do(self) -> None:
- self.reader.paste_mode = True
- self.reader.in_bracketed_paste = True
-
-class disable_bracketed_paste(Command):
- def do(self) -> None:
- self.reader.paste_mode = False
- self.reader.in_bracketed_paste = False
- self.reader.dirty = True
+class perform_bracketed_paste(Command):
+ def do(self) -> None:
+ done = "\x1b[201~"
+ data = ""
+ start = time.time()
+ while done not in data:
+ ev = self.reader.console.getpending()
+ data += ev.data
+ trace(
+ "bracketed pasting of {l} chars done in {s:.2f}s",
+ l=len(data),
+ s=time.time() - start,
+ )
+ self.reader.insert(data.replace(done, ""))
+ self.reader.last_refresh_cache.invalidated = True
diff --git a/Lib/_pyrepl/main.py b/Lib/_pyrepl/main.py
index a6f824dcc4a..447eb1e551e 100644
--- a/Lib/_pyrepl/main.py
+++ b/Lib/_pyrepl/main.py
@@ -1,6 +1,7 @@
import errno
import os
import sys
+import types
CAN_USE_PYREPL: bool
@@ -29,12 +30,10 @@ def interactive_console(mainmodule=None, quiet=False, pythonstartup=False):
print(FAIL_REASON, file=sys.stderr)
return sys._baserepl()
- if mainmodule:
- namespace = mainmodule.__dict__
- else:
- import __main__
- namespace = __main__.__dict__
- namespace.pop("__pyrepl_interactive_console", None)
+ if not mainmodule:
+ mainmodule = types.ModuleType("__main__")
+
+ namespace = mainmodule.__dict__
# sys._baserepl() above does this internally, we do it here
startup_path = os.getenv("PYTHONSTARTUP")
diff --git a/Lib/_pyrepl/mypy.ini b/Lib/_pyrepl/mypy.ini
index eabd0e9b440..9375a55b53c 100644
--- a/Lib/_pyrepl/mypy.ini
+++ b/Lib/_pyrepl/mypy.ini
@@ -23,7 +23,3 @@ check_untyped_defs = False
# Various internal modules that typeshed deliberately doesn't have stubs for:
[mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*]
ignore_missing_imports = True
-
-# Other untyped parts of the stdlib
-[mypy-idlelib.*]
-ignore_missing_imports = True
diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py
index 7fc2422dac9..0ebd9162eca 100644
--- a/Lib/_pyrepl/reader.py
+++ b/Lib/_pyrepl/reader.py
@@ -22,14 +22,13 @@
from __future__ import annotations
import sys
+import _colorize
from contextlib import contextmanager
from dataclasses import dataclass, field, fields
-from _colorize import can_colorize, ANSIColors
-
from . import commands, console, input
-from .utils import wlen, unbracket, disp_str
+from .utils import wlen, unbracket, disp_str, gen_colors, THEME
from .trace import trace
@@ -38,8 +37,7 @@ Command = commands.Command
from .types import Callback, SimpleContextManager, KeySpec, CommandName
-# syntax classes:
-
+# syntax classes
SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3)
@@ -105,8 +103,7 @@ default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
(r"\M-9", "digit-arg"),
(r"\M-\n", "accept"),
("\\\\", "self-insert"),
- (r"\x1b[200~", "enable_bracketed_paste"),
- (r"\x1b[201~", "disable_bracketed_paste"),
+ (r"\x1b[200~", "perform-bracketed-paste"),
(r"\x03", "ctrl-c"),
]
+ [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"]
@@ -144,16 +141,17 @@ class Reader:
Instance variables of note include:
* buffer:
- A *list* (*not* a string at the moment :-) containing all the
- characters that have been entered.
+ A per-character list containing all the characters that have been
+ entered. Does not include color information.
* console:
Hopefully encapsulates the OS dependent stuff.
* pos:
A 0-based index into 'buffer' for where the insertion point
is.
* screeninfo:
- Ahem. This list contains some info needed to move the
- insertion point around reasonably efficiently.
+ A list of screen position tuples. Each list element is a tuple
+ representing information on visible line length for a given line.
+ Allows for efficient skipping of color escape sequences.
* cxy, lxy:
the position of the insertion point in screen ...
* syntax_table:
@@ -203,7 +201,6 @@ class Reader:
dirty: bool = False
finished: bool = False
paste_mode: bool = False
- in_bracketed_paste: bool = False
commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
last_command: type[Command] | None = None
syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
@@ -221,7 +218,6 @@ class Reader:
## cached metadata to speed up screen refreshes
@dataclass
class RefreshCache:
- in_bracketed_paste: bool = False
screen: list[str] = field(default_factory=list)
screeninfo: list[tuple[int, list[int]]] = field(init=False)
line_end_offsets: list[int] = field(default_factory=list)
@@ -235,7 +231,6 @@ class Reader:
screen: list[str],
screeninfo: list[tuple[int, list[int]]],
) -> None:
- self.in_bracketed_paste = reader.in_bracketed_paste
self.screen = screen.copy()
self.screeninfo = screeninfo.copy()
self.pos = reader.pos
@@ -248,8 +243,7 @@ class Reader:
return False
dimensions = reader.console.width, reader.console.height
dimensions_changed = dimensions != self.dimensions
- paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste
- return not (dimensions_changed or paste_changed)
+ return not dimensions_changed
def get_cached_location(self, reader: Reader) -> tuple[int, int]:
if self.invalidated:
@@ -279,7 +273,7 @@ class Reader:
self.screeninfo = [(0, [])]
self.cxy = self.pos2xy()
self.lxy = (self.pos, 0)
- self.can_colorize = can_colorize()
+ self.can_colorize = _colorize.can_colorize()
self.last_refresh_cache.screeninfo = self.screeninfo
self.last_refresh_cache.pos = self.pos
@@ -316,6 +310,12 @@ class Reader:
pos -= offset
prompt_from_cache = (offset and self.buffer[offset - 1] != "\n")
+
+ if self.can_colorize:
+ colors = list(gen_colors(self.get_unicode()))
+ else:
+ colors = None
+ trace("colors = {colors}", colors=colors)
lines = "".join(self.buffer[offset:]).split("\n")
cursor_found = False
lines_beyond_cursor = 0
@@ -343,9 +343,8 @@ class Reader:
screeninfo.append((0, []))
pos -= line_len + 1
prompt, prompt_len = self.process_prompt(prompt)
- chars, char_widths = disp_str(line)
+ chars, char_widths = disp_str(line, colors, offset)
wrapcount = (sum(char_widths) + prompt_len) // self.console.width
- trace("wrapcount = {wrapcount}", wrapcount=wrapcount)
if wrapcount == 0 or not char_widths:
offset += line_len + 1 # Takes all of the line plus the newline
last_refresh_line_end_offsets.append(offset)
@@ -479,7 +478,7 @@ class Reader:
'lineno'."""
if self.arg is not None and cursor_on_line:
prompt = f"(arg: {self.arg}) "
- elif self.paste_mode and not self.in_bracketed_paste:
+ elif self.paste_mode:
prompt = "(paste) "
elif "\n" in self.buffer:
if lineno == 0:
@@ -492,7 +491,8 @@ class Reader:
prompt = self.ps1
if self.can_colorize:
- prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
+ t = THEME()
+ prompt = f"{t.prompt}{prompt}{t.reset}"
return prompt
def push_input_trans(self, itrans: input.KeymapTranslator) -> None:
@@ -567,6 +567,7 @@ class Reader:
def update_cursor(self) -> None:
"""Move the cursor to reflect changes in self.pos"""
self.cxy = self.pos2xy()
+ trace("update_cursor({pos}) = {cxy}", pos=self.pos, cxy=self.cxy)
self.console.move_cursor(*self.cxy)
def after_command(self, cmd: Command) -> None:
@@ -633,9 +634,6 @@ class Reader:
def refresh(self) -> None:
"""Recalculate and refresh the screen."""
- if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n":
- return
-
# this call sets up self.cxy, so call it first.
self.screen = self.calc_screen()
self.console.refresh(self.screen, self.cxy)
diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py
index 9d58829faf1..9560ae779ab 100644
--- a/Lib/_pyrepl/readline.py
+++ b/Lib/_pyrepl/readline.py
@@ -134,7 +134,8 @@ class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader):
return "".join(b[p + 1 : self.pos])
def get_completions(self, stem: str) -> list[str]:
- if module_completions := self.get_module_completions():
+ module_completions = self.get_module_completions()
+ if module_completions is not None:
return module_completions
if len(stem) == 0 and self.more_lines is not None:
b = self.buffer
@@ -165,7 +166,7 @@ class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader):
result.sort()
return result
- def get_module_completions(self) -> list[str]:
+ def get_module_completions(self) -> list[str] | None:
line = self.get_line()
return self.config.module_completer.get_completions(line)
@@ -276,10 +277,6 @@ class maybe_accept(commands.Command):
r = self.reader # type: ignore[assignment]
r.dirty = True # this is needed to hide the completion menu, if visible
- if self.reader.in_bracketed_paste:
- r.insert("\n")
- return
-
# if there are already several lines and the cursor
# is not on the last one, always insert a new \n.
text = r.get_unicode()
@@ -610,6 +607,7 @@ def _setup(namespace: Mapping[str, Any]) -> None:
# set up namespace in rlcompleter, which requires it to be a bona fide dict
if not isinstance(namespace, dict):
namespace = dict(namespace)
+ _wrapper.config.module_completer = ModuleCompleter(namespace)
_wrapper.config.readline_completer = RLCompleter(namespace).complete
# this is not really what readline.c does. Better than nothing I guess
diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py
index 4c74466118b..965b853c34b 100644
--- a/Lib/_pyrepl/simple_interact.py
+++ b/Lib/_pyrepl/simple_interact.py
@@ -31,6 +31,7 @@ import os
import sys
import code
import warnings
+import errno
from .readline import _get_reader, multiline_input, append_history_file
@@ -110,6 +111,10 @@ def run_multiline_interactive_console(
more_lines = functools.partial(_more_lines, console)
input_n = 0
+ _is_x_showrefcount_set = sys._xoptions.get("showrefcount")
+ _is_pydebug_build = hasattr(sys, "gettotalrefcount")
+ show_ref_count = _is_x_showrefcount_set and _is_pydebug_build
+
def maybe_run_command(statement: str) -> bool:
statement = statement.strip()
if statement in console.locals or statement not in REPL_COMMANDS:
@@ -149,6 +154,7 @@ def run_multiline_interactive_console(
append_history_file()
except (FileNotFoundError, PermissionError, OSError) as e:
warnings.warn(f"failed to open the history file for writing: {e}")
+
input_n += 1
except KeyboardInterrupt:
r = _get_reader()
@@ -157,9 +163,18 @@ def run_multiline_interactive_console(
r.pos = len(r.get_unicode())
r.dirty = True
r.refresh()
- r.in_bracketed_paste = False
console.write("\nKeyboardInterrupt\n")
console.resetbuffer()
except MemoryError:
console.write("\nMemoryError\n")
console.resetbuffer()
+ except SystemExit:
+ raise
+ except:
+ console.showtraceback()
+ console.resetbuffer()
+ if show_ref_count:
+ console.write(
+ f"[{sys.gettotalrefcount()} refs,"
+ f" {sys.getallocatedblocks()} blocks]\n"
+ )
diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py
index 96379bc20f3..d21cdd9b076 100644
--- a/Lib/_pyrepl/unix_console.py
+++ b/Lib/_pyrepl/unix_console.py
@@ -29,6 +29,7 @@ import signal
import struct
import termios
import time
+import types
import platform
from fcntl import ioctl
@@ -39,6 +40,12 @@ from .trace import trace
from .unix_eventqueue import EventQueue
from .utils import wlen
+# declare posix optional to allow None assignment on other platforms
+posix: types.ModuleType | None
+try:
+ import posix
+except ImportError:
+ posix = None
TYPE_CHECKING = False
@@ -150,8 +157,6 @@ class UnixConsole(Console):
self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
- self.input_buffer = b""
- self.input_buffer_pos = 0
curses.setupterm(term or None, self.output_fd)
self.term = term
@@ -199,22 +204,14 @@ class UnixConsole(Console):
self.event_queue = EventQueue(self.input_fd, self.encoding)
self.cursor_visible = 1
- def more_in_buffer(self) -> bool:
- return bool(
- self.input_buffer
- and self.input_buffer_pos < len(self.input_buffer)
- )
+ signal.signal(signal.SIGCONT, self._sigcont_handler)
- def __read(self, n: int) -> bytes:
- if not self.more_in_buffer():
- self.input_buffer = os.read(self.input_fd, 10000)
+ def _sigcont_handler(self, signum, frame):
+ self.restore()
+ self.prepare()
- ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n]
- self.input_buffer_pos += len(ret)
- if self.input_buffer_pos >= len(self.input_buffer):
- self.input_buffer = b""
- self.input_buffer_pos = 0
- return ret
+ def __read(self, n: int) -> bytes:
+ return os.read(self.input_fd, n)
def change_encoding(self, encoding: str) -> None:
@@ -422,7 +419,6 @@ class UnixConsole(Console):
"""
return (
not self.event_queue.empty()
- or self.more_in_buffer()
or bool(self.pollob.poll(timeout))
)
@@ -525,6 +521,7 @@ class UnixConsole(Console):
e.raw += e.raw
amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
+ trace("getpending({a})", a=amount)
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
@@ -566,11 +563,9 @@ class UnixConsole(Console):
@property
def input_hook(self):
- try:
- import posix
- except ImportError:
- return None
- if posix._is_inputhook_installed():
+ # avoid inline imports here so the repl doesn't get flooded
+ # with import logging from -X importtime=2
+ if posix is not None and posix._is_inputhook_installed():
return posix._inputhook
def __enable_bracketed_paste(self) -> None:
diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py
index 7437fbe1ab9..e04fbdc6c8a 100644
--- a/Lib/_pyrepl/utils.py
+++ b/Lib/_pyrepl/utils.py
@@ -1,6 +1,17 @@
+from __future__ import annotations
+import builtins
+import functools
+import keyword
import re
+import token as T
+import tokenize
import unicodedata
-import functools
+import _colorize
+
+from collections import deque
+from io import StringIO
+from tokenize import TokenInfo as TI
+from typing import Iterable, Iterator, Match, NamedTuple, Self
from .types import CharBuffer, CharWidths
from .trace import trace
@@ -8,6 +19,43 @@ from .trace import trace
ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]")
ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02")
ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""})
+IDENTIFIERS_AFTER = {"def", "class"}
+BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')}
+
+
+def THEME(**kwargs):
+ # Not cached: the user can modify the theme inside the interactive session.
+ return _colorize.get_theme(**kwargs).syntax
+
+
+class Span(NamedTuple):
+ """Span indexing that's inclusive on both ends."""
+
+ start: int
+ end: int
+
+ @classmethod
+ def from_re(cls, m: Match[str], group: int | str) -> Self:
+ re_span = m.span(group)
+ return cls(re_span[0], re_span[1] - 1)
+
+ @classmethod
+ def from_token(cls, token: TI, line_len: list[int]) -> Self:
+ end_offset = -1
+ if (token.type in {T.FSTRING_MIDDLE, T.TSTRING_MIDDLE}
+ and token.string.endswith(("{", "}"))):
+ # gh-134158: a visible trailing brace comes from a double brace in input
+ end_offset += 1
+
+ return cls(
+ line_len[token.start[0] - 1] + token.start[1],
+ line_len[token.end[0] - 1] + token.end[1] + end_offset,
+ )
+
+
+class ColorSpan(NamedTuple):
+ span: Span
+ tag: str
@functools.cache
@@ -41,17 +89,212 @@ def unbracket(s: str, including_content: bool = False) -> str:
return s.translate(ZERO_WIDTH_TRANS)
-def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
- r"""Decompose the input buffer into a printable variant.
+def gen_colors(buffer: str) -> Iterator[ColorSpan]:
+ """Returns a list of index spans to color using the given color tag.
+
+ The input `buffer` should be a valid start of a Python code block, i.e.
+ it cannot be a block starting in the middle of a multiline string.
+ """
+ sio = StringIO(buffer)
+ line_lengths = [0] + [len(line) for line in sio.readlines()]
+ # make line_lengths cumulative
+ for i in range(1, len(line_lengths)):
+ line_lengths[i] += line_lengths[i-1]
+
+ sio.seek(0)
+ gen = tokenize.generate_tokens(sio.readline)
+ last_emitted: ColorSpan | None = None
+ try:
+ for color in gen_colors_from_token_stream(gen, line_lengths):
+ yield color
+ last_emitted = color
+ except SyntaxError:
+ return
+ except tokenize.TokenError as te:
+ yield from recover_unterminated_string(
+ te, line_lengths, last_emitted, buffer
+ )
+
+
+def recover_unterminated_string(
+ exc: tokenize.TokenError,
+ line_lengths: list[int],
+ last_emitted: ColorSpan | None,
+ buffer: str,
+) -> Iterator[ColorSpan]:
+ msg, loc = exc.args
+ if loc is None:
+ return
+
+ line_no, column = loc
+
+ if msg.startswith(
+ (
+ "unterminated string literal",
+ "unterminated f-string literal",
+ "unterminated t-string literal",
+ "EOF in multi-line string",
+ "unterminated triple-quoted f-string literal",
+ "unterminated triple-quoted t-string literal",
+ )
+ ):
+ start = line_lengths[line_no - 1] + column - 1
+ end = line_lengths[-1] - 1
+
+ # in case FSTRING_START was already emitted
+ if last_emitted and start <= last_emitted.span.start:
+ trace("before last emitted = {s}", s=start)
+ start = last_emitted.span.end + 1
+
+ span = Span(start, end)
+ trace("yielding span {a} -> {b}", a=span.start, b=span.end)
+ yield ColorSpan(span, "string")
+ else:
+ trace(
+ "unhandled token error({buffer}) = {te}",
+ buffer=repr(buffer),
+ te=str(exc),
+ )
+
+
+def gen_colors_from_token_stream(
+ token_generator: Iterator[TI],
+ line_lengths: list[int],
+) -> Iterator[ColorSpan]:
+ token_window = prev_next_window(token_generator)
+
+ is_def_name = False
+ bracket_level = 0
+ for prev_token, token, next_token in token_window:
+ assert token is not None
+ if token.start == token.end:
+ continue
+
+ match token.type:
+ case (
+ T.STRING
+ | T.FSTRING_START | T.FSTRING_MIDDLE | T.FSTRING_END
+ | T.TSTRING_START | T.TSTRING_MIDDLE | T.TSTRING_END
+ ):
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "string")
+ case T.COMMENT:
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "comment")
+ case T.NUMBER:
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "number")
+ case T.OP:
+ if token.string in "([{":
+ bracket_level += 1
+ elif token.string in ")]}":
+ bracket_level -= 1
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "op")
+ case T.NAME:
+ if is_def_name:
+ is_def_name = False
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "definition")
+ elif keyword.iskeyword(token.string):
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "keyword")
+ if token.string in IDENTIFIERS_AFTER:
+ is_def_name = True
+ elif (
+ keyword.issoftkeyword(token.string)
+ and bracket_level == 0
+ and is_soft_keyword_used(prev_token, token, next_token)
+ ):
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "soft_keyword")
+ elif token.string in BUILTINS:
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "builtin")
+
+
+keyword_first_sets_match = {"False", "None", "True", "await", "lambda", "not"}
+keyword_first_sets_case = {"False", "None", "True"}
+
+
+def is_soft_keyword_used(*tokens: TI | None) -> bool:
+ """Returns True if the current token is a keyword in this context.
+
+ For the `*tokens` to match anything, they have to be a three-tuple of
+ (previous, current, next).
+ """
+ trace("is_soft_keyword_used{t}", t=tokens)
+ match tokens:
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="match"),
+ TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
+ | TI(T.OP, string="(" | "*" | "[" | "{" | "~" | "...")
+ ):
+ return True
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="match"),
+ TI(T.NAME, string=s)
+ ):
+ if keyword.iskeyword(s):
+ return s in keyword_first_sets_match
+ return True
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="case"),
+ TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
+ | TI(T.OP, string="(" | "*" | "-" | "[" | "{")
+ ):
+ return True
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="case"),
+ TI(T.NAME, string=s)
+ ):
+ if keyword.iskeyword(s):
+ return s in keyword_first_sets_case
+ return True
+ case (TI(string="case"), TI(string="_"), TI(string=":")):
+ return True
+ case _:
+ return False
+
+
+def disp_str(
+ buffer: str,
+ colors: list[ColorSpan] | None = None,
+ start_index: int = 0,
+ force_color: bool = False,
+) -> tuple[CharBuffer, CharWidths]:
+ r"""Decompose the input buffer into a printable variant with applied colors.
Returns a tuple of two lists:
- - the first list is the input buffer, character by character;
+ - the first list is the input buffer, character by character, with color
+ escape codes added (while those codes contain multiple ASCII characters,
+ each code is considered atomic *and is attached for the corresponding
+ visible character*);
- the second list is the visible width of each character in the input
buffer.
+ Note on colors:
+ - The `colors` list, if provided, is partially consumed within. We're using
+ a list and not a generator since we need to hold onto the current
+ unfinished span between calls to disp_str in case of multiline strings.
+ - The `colors` list is computed from the start of the input block. `buffer`
+ is only a subset of that input block, a single line within. This is why
+ we need `start_index` to inform us which position is the start of `buffer`
+ actually within user input. This allows us to match color spans correctly.
+
Examples:
>>> utils.disp_str("a = 9")
(['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1])
+
+ >>> line = "while 1:"
+ >>> colors = list(utils.gen_colors(line))
+ >>> utils.disp_str(line, colors=colors)
+ (['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1])
+
"""
chars: CharBuffer = []
char_widths: CharWidths = []
@@ -59,7 +302,21 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
if not buffer:
return chars, char_widths
- for c in buffer:
+ while colors and colors[0].span.end < start_index:
+ # move past irrelevant spans
+ colors.pop(0)
+
+ theme = THEME(force_color=force_color)
+ pre_color = ""
+ post_color = ""
+ if colors and colors[0].span.start < start_index:
+ # looks like we're continuing a previous color (e.g. a multiline str)
+ pre_color = theme[colors[0].tag]
+
+ for i, c in enumerate(buffer, start_index):
+ if colors and colors[0].span.start == i: # new color starts now
+ pre_color = theme[colors[0].tag]
+
if c == "\x1a": # CTRL-Z on Windows
chars.append(c)
char_widths.append(2)
@@ -73,5 +330,43 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
else:
chars.append(c)
char_widths.append(str_width(c))
- trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths)
+
+ if colors and colors[0].span.end == i: # current color ends now
+ post_color = theme.reset
+ colors.pop(0)
+
+ chars[-1] = pre_color + chars[-1] + post_color
+ pre_color = ""
+ post_color = ""
+
+ if colors and colors[0].span.start < i and colors[0].span.end > i:
+ # even though the current color should be continued, reset it for now.
+ # the next call to `disp_str()` will revive it.
+ chars[-1] += theme.reset
+
return chars, char_widths
+
+
+def prev_next_window[T](
+ iterable: Iterable[T]
+) -> Iterator[tuple[T | None, ...]]:
+ """Generates three-tuples of (previous, current, next) items.
+
+ On the first iteration previous is None. On the last iteration next
+ is None. In case of exception next is None and the exception is re-raised
+ on a subsequent next() call.
+
+ Inspired by `sliding_window` from `itertools` recipes.
+ """
+
+ iterator = iter(iterable)
+ window = deque((None, next(iterator)), maxlen=3)
+ try:
+ for x in iterator:
+ window.append(x)
+ yield tuple(window)
+ except Exception:
+ raise
+ finally:
+ window.append(None)
+ yield tuple(window)
diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py
index 17942c8df07..c56dcd6d7dd 100644
--- a/Lib/_pyrepl/windows_console.py
+++ b/Lib/_pyrepl/windows_console.py
@@ -24,6 +24,7 @@ import os
import sys
import ctypes
+import types
from ctypes.wintypes import (
_COORD,
WORD,
@@ -58,6 +59,12 @@ except:
self.err = err
self.descr = descr
+# declare nt optional to allow None assignment on other platforms
+nt: types.ModuleType | None
+try:
+ import nt
+except ImportError:
+ nt = None
TYPE_CHECKING = False
@@ -121,9 +128,8 @@ class _error(Exception):
def _supports_vt():
try:
- import nt
return nt._supports_virtual_terminal()
- except (ImportError, AttributeError):
+ except AttributeError:
return False
class WindowsConsole(Console):
@@ -235,11 +241,9 @@ class WindowsConsole(Console):
@property
def input_hook(self):
- try:
- import nt
- except ImportError:
- return None
- if nt._is_inputhook_installed():
+ # avoid inline imports here so the repl doesn't get flooded
+ # with import logging from -X importtime=2
+ if nt is not None and nt._is_inputhook_installed():
return nt._inputhook
def __write_changed_line(
@@ -415,10 +419,7 @@ class WindowsConsole(Console):
return info.srWindow.Bottom # type: ignore[no-any-return]
- def _read_input(self, block: bool = True) -> INPUT_RECORD | None:
- if not block and not self.wait(timeout=0):
- return None
-
+ def _read_input(self) -> INPUT_RECORD | None:
rec = INPUT_RECORD()
read = DWORD()
if not ReadConsoleInput(InHandle, rec, 1, read):
@@ -426,13 +427,26 @@ class WindowsConsole(Console):
return rec
+ def _read_input_bulk(
+ self, n: int
+ ) -> tuple[ctypes.Array[INPUT_RECORD], int]:
+ rec = (n * INPUT_RECORD)()
+ read = DWORD()
+ if not ReadConsoleInput(InHandle, rec, n, read):
+ raise WinError(GetLastError())
+
+ return rec, read.value
+
def get_event(self, block: bool = True) -> Event | None:
"""Return an Event instance. Returns None if |block| is false
and there is no event pending, otherwise waits for the
completion of an event."""
+ if not block and not self.wait(timeout=0):
+ return None
+
while self.event_queue.empty():
- rec = self._read_input(block)
+ rec = self._read_input()
if rec is None:
return None
@@ -450,7 +464,7 @@ class WindowsConsole(Console):
if key == "\r":
# Make enter unix-like
- return Event(evt="key", data="\n", raw=b"\n")
+ return Event(evt="key", data="\n")
elif key_event.wVirtualKeyCode == 8:
# Turn backspace directly into the command
key = "backspace"
@@ -462,24 +476,29 @@ class WindowsConsole(Console):
key = f"ctrl {key}"
elif key_event.dwControlKeyState & ALT_ACTIVE:
# queue the key, return the meta command
- self.event_queue.insert(Event(evt="key", data=key, raw=key))
+ self.event_queue.insert(Event(evt="key", data=key))
return Event(evt="key", data="\033") # keymap.py uses this for meta
- return Event(evt="key", data=key, raw=key)
+ return Event(evt="key", data=key)
if block:
continue
return None
elif self.__vt_support:
# If virtual terminal is enabled, scanning VT sequences
- self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar)
+ for char in raw_key.encode(self.event_queue.encoding, "replace"):
+ self.event_queue.push(char)
continue
if key_event.dwControlKeyState & ALT_ACTIVE:
- # queue the key, return the meta command
- self.event_queue.insert(Event(evt="key", data=key, raw=raw_key))
- return Event(evt="key", data="\033") # keymap.py uses this for meta
-
- return Event(evt="key", data=key, raw=raw_key)
+ # Do not swallow characters that have been entered via AltGr:
+ # Windows internally converts AltGr to CTRL+ALT, see
+ # https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-vkkeyscanw
+ if not key_event.dwControlKeyState & CTRL_ACTIVE:
+ # queue the key, return the meta command
+ self.event_queue.insert(Event(evt="key", data=key))
+ return Event(evt="key", data="\033") # keymap.py uses this for meta
+
+ return Event(evt="key", data=key)
return self.event_queue.get()
def push_char(self, char: int | bytes) -> None:
@@ -521,7 +540,31 @@ class WindowsConsole(Console):
def getpending(self) -> Event:
"""Return the characters that have been typed but not yet
processed."""
- return Event("key", "", b"")
+ e = Event("key", "", b"")
+
+ while not self.event_queue.empty():
+ e2 = self.event_queue.get()
+ if e2:
+ e.data += e2.data
+
+ recs, rec_count = self._read_input_bulk(1024)
+ for i in range(rec_count):
+ rec = recs[i]
+ # In case of a legacy console, we do not only receive a keydown
+ # event, but also a keyup event - and for uppercase letters
+ # an additional SHIFT_PRESSED event.
+ if rec and rec.EventType == KEY_EVENT:
+ key_event = rec.Event.KeyEvent
+ if not key_event.bKeyDown:
+ continue
+ ch = key_event.uChar.UnicodeChar
+ if ch == "\x00":
+ # ignore SHIFT_PRESSED and special keys
+ continue
+ if ch == "\r":
+ ch += "\n"
+ e.data += ch
+ return e
def wait(self, timeout: float | None) -> bool:
"""Wait for an event."""