diff options
Diffstat (limited to 'Lib/_pyrepl')
-rw-r--r-- | Lib/_pyrepl/_module_completer.py | 33 | ||||
-rw-r--r-- | Lib/_pyrepl/base_eventqueue.py | 18 | ||||
-rw-r--r-- | Lib/_pyrepl/commands.py | 37 | ||||
-rw-r--r-- | Lib/_pyrepl/main.py | 11 | ||||
-rw-r--r-- | Lib/_pyrepl/mypy.ini | 4 | ||||
-rw-r--r-- | Lib/_pyrepl/reader.py | 46 | ||||
-rw-r--r-- | Lib/_pyrepl/readline.py | 10 | ||||
-rw-r--r-- | Lib/_pyrepl/simple_interact.py | 17 | ||||
-rw-r--r-- | Lib/_pyrepl/unix_console.py | 39 | ||||
-rw-r--r-- | Lib/_pyrepl/utils.py | 307 | ||||
-rw-r--r-- | Lib/_pyrepl/windows_console.py | 87 |
11 files changed, 484 insertions, 125 deletions
diff --git a/Lib/_pyrepl/_module_completer.py b/Lib/_pyrepl/_module_completer.py index 1fb043e0b70..1e9462a4215 100644 --- a/Lib/_pyrepl/_module_completer.py +++ b/Lib/_pyrepl/_module_completer.py @@ -2,6 +2,7 @@ from __future__ import annotations import pkgutil import sys +import token import tokenize from io import StringIO from contextlib import contextmanager @@ -16,8 +17,8 @@ if TYPE_CHECKING: def make_default_module_completer() -> ModuleCompleter: - # Inside pyrepl, __package__ is set to '_pyrepl' - return ModuleCompleter(namespace={'__package__': '_pyrepl'}) + # Inside pyrepl, __package__ is set to None by default + return ModuleCompleter(namespace={'__package__': None}) class ModuleCompleter: @@ -41,11 +42,11 @@ class ModuleCompleter: self._global_cache: list[pkgutil.ModuleInfo] = [] self._curr_sys_path: list[str] = sys.path[:] - def get_completions(self, line: str) -> list[str]: + def get_completions(self, line: str) -> list[str] | None: """Return the next possible import completions for 'line'.""" result = ImportParser(line).parse() if not result: - return [] + return None try: return self.complete(*result) except Exception: @@ -80,8 +81,11 @@ class ModuleCompleter: def _find_modules(self, path: str, prefix: str) -> list[str]: if not path: # Top-level import (e.g. `import foo<tab>`` or `from foo<tab>`)` - return [name for _, name, _ in self.global_cache - if name.startswith(prefix)] + builtin_modules = [name for name in sys.builtin_module_names + if self.is_suggestion_match(name, prefix)] + third_party_modules = [module.name for module in self.global_cache + if self.is_suggestion_match(module.name, prefix)] + return sorted(builtin_modules + third_party_modules) if path.startswith('.'): # Convert relative path to absolute path @@ -96,7 +100,14 @@ class ModuleCompleter: if mod_info.ispkg and mod_info.name == segment] modules = self.iter_submodules(modules) return [module.name for module in modules - if module.name.startswith(prefix)] + if self.is_suggestion_match(module.name, prefix)] + + def is_suggestion_match(self, module_name: str, prefix: str) -> bool: + if prefix: + return module_name.startswith(prefix) + # For consistency with attribute completion, which + # does not suggest private attributes unless requested. + return not module_name.startswith("_") def iter_submodules(self, parent_modules: list[pkgutil.ModuleInfo]) -> Iterator[pkgutil.ModuleInfo]: """Iterate over all submodules of the given parent modules.""" @@ -180,8 +191,8 @@ class ImportParser: when parsing multiple statements. """ _ignored_tokens = { - tokenize.INDENT, tokenize.DEDENT, tokenize.COMMENT, - tokenize.NL, tokenize.NEWLINE, tokenize.ENDMARKER + token.INDENT, token.DEDENT, token.COMMENT, + token.NL, token.NEWLINE, token.ENDMARKER } _keywords = {'import', 'from', 'as'} @@ -350,11 +361,11 @@ class TokenQueue: def peek_name(self) -> bool: if not (tok := self.peek()): return False - return tok.type == tokenize.NAME + return tok.type == token.NAME def pop_name(self) -> str: tok = self.pop() - if tok.type != tokenize.NAME: + if tok.type != token.NAME: raise ParseError('pop_name') return tok.string diff --git a/Lib/_pyrepl/base_eventqueue.py b/Lib/_pyrepl/base_eventqueue.py index e018c4fc183..0589a0f437e 100644 --- a/Lib/_pyrepl/base_eventqueue.py +++ b/Lib/_pyrepl/base_eventqueue.py @@ -69,18 +69,14 @@ class BaseEventQueue: trace('added event {event}', event=event) self.events.append(event) - def push(self, char: int | bytes | str) -> None: + def push(self, char: int | bytes) -> None: """ Processes a character by updating the buffer and handling special key mappings. """ + assert isinstance(char, (int, bytes)) ord_char = char if isinstance(char, int) else ord(char) - if ord_char > 255: - assert isinstance(char, str) - char = bytes(char.encode(self.encoding, "replace")) - self.buf.extend(char) - else: - char = bytes(bytearray((ord_char,))) - self.buf.append(ord_char) + char = ord_char.to_bytes() + self.buf.append(ord_char) if char in self.keymap: if self.keymap is self.compiled_keymap: @@ -91,7 +87,7 @@ class BaseEventQueue: if isinstance(k, dict): self.keymap = k else: - self.insert(Event('key', k, self.flush_buf())) + self.insert(Event('key', k, bytes(self.flush_buf()))) self.keymap = self.compiled_keymap elif self.buf and self.buf[0] == 27: # escape @@ -100,7 +96,7 @@ class BaseEventQueue: # the docstring in keymap.py trace('unrecognized escape sequence, propagating...') self.keymap = self.compiled_keymap - self.insert(Event('key', '\033', bytearray(b'\033'))) + self.insert(Event('key', '\033', b'\033')) for _c in self.flush_buf()[1:]: self.push(_c) @@ -110,5 +106,5 @@ class BaseEventQueue: except UnicodeError: return else: - self.insert(Event('key', decoded, self.flush_buf())) + self.insert(Event('key', decoded, bytes(self.flush_buf()))) self.keymap = self.compiled_keymap diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py index cbb6d85f683..50c824995d8 100644 --- a/Lib/_pyrepl/commands.py +++ b/Lib/_pyrepl/commands.py @@ -21,6 +21,7 @@ from __future__ import annotations import os +import time # Categories of actions: # killing @@ -31,6 +32,7 @@ import os # finishing # [completion] +from .trace import trace # types if False: @@ -368,6 +370,13 @@ class self_insert(EditCommand): r = self.reader text = self.event * r.get_arg() r.insert(text) + if r.paste_mode: + data = "" + ev = r.console.getpending() + data += ev.data + if data: + r.insert(data) + r.last_refresh_cache.invalidated = True class insert_nl(EditCommand): @@ -437,7 +446,7 @@ class help(Command): import _sitebuiltins with self.reader.suspend(): - self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment, call-arg] + self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment] class invalid_key(Command): @@ -471,19 +480,23 @@ class show_history(Command): class paste_mode(Command): - def do(self) -> None: self.reader.paste_mode = not self.reader.paste_mode self.reader.dirty = True -class enable_bracketed_paste(Command): - def do(self) -> None: - self.reader.paste_mode = True - self.reader.in_bracketed_paste = True - -class disable_bracketed_paste(Command): - def do(self) -> None: - self.reader.paste_mode = False - self.reader.in_bracketed_paste = False - self.reader.dirty = True +class perform_bracketed_paste(Command): + def do(self) -> None: + done = "\x1b[201~" + data = "" + start = time.time() + while done not in data: + ev = self.reader.console.getpending() + data += ev.data + trace( + "bracketed pasting of {l} chars done in {s:.2f}s", + l=len(data), + s=time.time() - start, + ) + self.reader.insert(data.replace(done, "")) + self.reader.last_refresh_cache.invalidated = True diff --git a/Lib/_pyrepl/main.py b/Lib/_pyrepl/main.py index a6f824dcc4a..447eb1e551e 100644 --- a/Lib/_pyrepl/main.py +++ b/Lib/_pyrepl/main.py @@ -1,6 +1,7 @@ import errno import os import sys +import types CAN_USE_PYREPL: bool @@ -29,12 +30,10 @@ def interactive_console(mainmodule=None, quiet=False, pythonstartup=False): print(FAIL_REASON, file=sys.stderr) return sys._baserepl() - if mainmodule: - namespace = mainmodule.__dict__ - else: - import __main__ - namespace = __main__.__dict__ - namespace.pop("__pyrepl_interactive_console", None) + if not mainmodule: + mainmodule = types.ModuleType("__main__") + + namespace = mainmodule.__dict__ # sys._baserepl() above does this internally, we do it here startup_path = os.getenv("PYTHONSTARTUP") diff --git a/Lib/_pyrepl/mypy.ini b/Lib/_pyrepl/mypy.ini index eabd0e9b440..9375a55b53c 100644 --- a/Lib/_pyrepl/mypy.ini +++ b/Lib/_pyrepl/mypy.ini @@ -23,7 +23,3 @@ check_untyped_defs = False # Various internal modules that typeshed deliberately doesn't have stubs for: [mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*] ignore_missing_imports = True - -# Other untyped parts of the stdlib -[mypy-idlelib.*] -ignore_missing_imports = True diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py index 7fc2422dac9..0ebd9162eca 100644 --- a/Lib/_pyrepl/reader.py +++ b/Lib/_pyrepl/reader.py @@ -22,14 +22,13 @@ from __future__ import annotations import sys +import _colorize from contextlib import contextmanager from dataclasses import dataclass, field, fields -from _colorize import can_colorize, ANSIColors - from . import commands, console, input -from .utils import wlen, unbracket, disp_str +from .utils import wlen, unbracket, disp_str, gen_colors, THEME from .trace import trace @@ -38,8 +37,7 @@ Command = commands.Command from .types import Callback, SimpleContextManager, KeySpec, CommandName -# syntax classes: - +# syntax classes SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3) @@ -105,8 +103,7 @@ default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( (r"\M-9", "digit-arg"), (r"\M-\n", "accept"), ("\\\\", "self-insert"), - (r"\x1b[200~", "enable_bracketed_paste"), - (r"\x1b[201~", "disable_bracketed_paste"), + (r"\x1b[200~", "perform-bracketed-paste"), (r"\x03", "ctrl-c"), ] + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"] @@ -144,16 +141,17 @@ class Reader: Instance variables of note include: * buffer: - A *list* (*not* a string at the moment :-) containing all the - characters that have been entered. + A per-character list containing all the characters that have been + entered. Does not include color information. * console: Hopefully encapsulates the OS dependent stuff. * pos: A 0-based index into 'buffer' for where the insertion point is. * screeninfo: - Ahem. This list contains some info needed to move the - insertion point around reasonably efficiently. + A list of screen position tuples. Each list element is a tuple + representing information on visible line length for a given line. + Allows for efficient skipping of color escape sequences. * cxy, lxy: the position of the insertion point in screen ... * syntax_table: @@ -203,7 +201,6 @@ class Reader: dirty: bool = False finished: bool = False paste_mode: bool = False - in_bracketed_paste: bool = False commands: dict[str, type[Command]] = field(default_factory=make_default_commands) last_command: type[Command] | None = None syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table) @@ -221,7 +218,6 @@ class Reader: ## cached metadata to speed up screen refreshes @dataclass class RefreshCache: - in_bracketed_paste: bool = False screen: list[str] = field(default_factory=list) screeninfo: list[tuple[int, list[int]]] = field(init=False) line_end_offsets: list[int] = field(default_factory=list) @@ -235,7 +231,6 @@ class Reader: screen: list[str], screeninfo: list[tuple[int, list[int]]], ) -> None: - self.in_bracketed_paste = reader.in_bracketed_paste self.screen = screen.copy() self.screeninfo = screeninfo.copy() self.pos = reader.pos @@ -248,8 +243,7 @@ class Reader: return False dimensions = reader.console.width, reader.console.height dimensions_changed = dimensions != self.dimensions - paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste - return not (dimensions_changed or paste_changed) + return not dimensions_changed def get_cached_location(self, reader: Reader) -> tuple[int, int]: if self.invalidated: @@ -279,7 +273,7 @@ class Reader: self.screeninfo = [(0, [])] self.cxy = self.pos2xy() self.lxy = (self.pos, 0) - self.can_colorize = can_colorize() + self.can_colorize = _colorize.can_colorize() self.last_refresh_cache.screeninfo = self.screeninfo self.last_refresh_cache.pos = self.pos @@ -316,6 +310,12 @@ class Reader: pos -= offset prompt_from_cache = (offset and self.buffer[offset - 1] != "\n") + + if self.can_colorize: + colors = list(gen_colors(self.get_unicode())) + else: + colors = None + trace("colors = {colors}", colors=colors) lines = "".join(self.buffer[offset:]).split("\n") cursor_found = False lines_beyond_cursor = 0 @@ -343,9 +343,8 @@ class Reader: screeninfo.append((0, [])) pos -= line_len + 1 prompt, prompt_len = self.process_prompt(prompt) - chars, char_widths = disp_str(line) + chars, char_widths = disp_str(line, colors, offset) wrapcount = (sum(char_widths) + prompt_len) // self.console.width - trace("wrapcount = {wrapcount}", wrapcount=wrapcount) if wrapcount == 0 or not char_widths: offset += line_len + 1 # Takes all of the line plus the newline last_refresh_line_end_offsets.append(offset) @@ -479,7 +478,7 @@ class Reader: 'lineno'.""" if self.arg is not None and cursor_on_line: prompt = f"(arg: {self.arg}) " - elif self.paste_mode and not self.in_bracketed_paste: + elif self.paste_mode: prompt = "(paste) " elif "\n" in self.buffer: if lineno == 0: @@ -492,7 +491,8 @@ class Reader: prompt = self.ps1 if self.can_colorize: - prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}" + t = THEME() + prompt = f"{t.prompt}{prompt}{t.reset}" return prompt def push_input_trans(self, itrans: input.KeymapTranslator) -> None: @@ -567,6 +567,7 @@ class Reader: def update_cursor(self) -> None: """Move the cursor to reflect changes in self.pos""" self.cxy = self.pos2xy() + trace("update_cursor({pos}) = {cxy}", pos=self.pos, cxy=self.cxy) self.console.move_cursor(*self.cxy) def after_command(self, cmd: Command) -> None: @@ -633,9 +634,6 @@ class Reader: def refresh(self) -> None: """Recalculate and refresh the screen.""" - if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n": - return - # this call sets up self.cxy, so call it first. self.screen = self.calc_screen() self.console.refresh(self.screen, self.cxy) diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py index 9d58829faf1..9560ae779ab 100644 --- a/Lib/_pyrepl/readline.py +++ b/Lib/_pyrepl/readline.py @@ -134,7 +134,8 @@ class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader): return "".join(b[p + 1 : self.pos]) def get_completions(self, stem: str) -> list[str]: - if module_completions := self.get_module_completions(): + module_completions = self.get_module_completions() + if module_completions is not None: return module_completions if len(stem) == 0 and self.more_lines is not None: b = self.buffer @@ -165,7 +166,7 @@ class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader): result.sort() return result - def get_module_completions(self) -> list[str]: + def get_module_completions(self) -> list[str] | None: line = self.get_line() return self.config.module_completer.get_completions(line) @@ -276,10 +277,6 @@ class maybe_accept(commands.Command): r = self.reader # type: ignore[assignment] r.dirty = True # this is needed to hide the completion menu, if visible - if self.reader.in_bracketed_paste: - r.insert("\n") - return - # if there are already several lines and the cursor # is not on the last one, always insert a new \n. text = r.get_unicode() @@ -610,6 +607,7 @@ def _setup(namespace: Mapping[str, Any]) -> None: # set up namespace in rlcompleter, which requires it to be a bona fide dict if not isinstance(namespace, dict): namespace = dict(namespace) + _wrapper.config.module_completer = ModuleCompleter(namespace) _wrapper.config.readline_completer = RLCompleter(namespace).complete # this is not really what readline.c does. Better than nothing I guess diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py index 4c74466118b..965b853c34b 100644 --- a/Lib/_pyrepl/simple_interact.py +++ b/Lib/_pyrepl/simple_interact.py @@ -31,6 +31,7 @@ import os import sys import code import warnings +import errno from .readline import _get_reader, multiline_input, append_history_file @@ -110,6 +111,10 @@ def run_multiline_interactive_console( more_lines = functools.partial(_more_lines, console) input_n = 0 + _is_x_showrefcount_set = sys._xoptions.get("showrefcount") + _is_pydebug_build = hasattr(sys, "gettotalrefcount") + show_ref_count = _is_x_showrefcount_set and _is_pydebug_build + def maybe_run_command(statement: str) -> bool: statement = statement.strip() if statement in console.locals or statement not in REPL_COMMANDS: @@ -149,6 +154,7 @@ def run_multiline_interactive_console( append_history_file() except (FileNotFoundError, PermissionError, OSError) as e: warnings.warn(f"failed to open the history file for writing: {e}") + input_n += 1 except KeyboardInterrupt: r = _get_reader() @@ -157,9 +163,18 @@ def run_multiline_interactive_console( r.pos = len(r.get_unicode()) r.dirty = True r.refresh() - r.in_bracketed_paste = False console.write("\nKeyboardInterrupt\n") console.resetbuffer() except MemoryError: console.write("\nMemoryError\n") console.resetbuffer() + except SystemExit: + raise + except: + console.showtraceback() + console.resetbuffer() + if show_ref_count: + console.write( + f"[{sys.gettotalrefcount()} refs," + f" {sys.getallocatedblocks()} blocks]\n" + ) diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py index 96379bc20f3..d21cdd9b076 100644 --- a/Lib/_pyrepl/unix_console.py +++ b/Lib/_pyrepl/unix_console.py @@ -29,6 +29,7 @@ import signal import struct import termios import time +import types import platform from fcntl import ioctl @@ -39,6 +40,12 @@ from .trace import trace from .unix_eventqueue import EventQueue from .utils import wlen +# declare posix optional to allow None assignment on other platforms +posix: types.ModuleType | None +try: + import posix +except ImportError: + posix = None TYPE_CHECKING = False @@ -150,8 +157,6 @@ class UnixConsole(Console): self.pollob = poll() self.pollob.register(self.input_fd, select.POLLIN) - self.input_buffer = b"" - self.input_buffer_pos = 0 curses.setupterm(term or None, self.output_fd) self.term = term @@ -199,22 +204,14 @@ class UnixConsole(Console): self.event_queue = EventQueue(self.input_fd, self.encoding) self.cursor_visible = 1 - def more_in_buffer(self) -> bool: - return bool( - self.input_buffer - and self.input_buffer_pos < len(self.input_buffer) - ) + signal.signal(signal.SIGCONT, self._sigcont_handler) - def __read(self, n: int) -> bytes: - if not self.more_in_buffer(): - self.input_buffer = os.read(self.input_fd, 10000) + def _sigcont_handler(self, signum, frame): + self.restore() + self.prepare() - ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n] - self.input_buffer_pos += len(ret) - if self.input_buffer_pos >= len(self.input_buffer): - self.input_buffer = b"" - self.input_buffer_pos = 0 - return ret + def __read(self, n: int) -> bytes: + return os.read(self.input_fd, n) def change_encoding(self, encoding: str) -> None: @@ -422,7 +419,6 @@ class UnixConsole(Console): """ return ( not self.event_queue.empty() - or self.more_in_buffer() or bool(self.pollob.poll(timeout)) ) @@ -525,6 +521,7 @@ class UnixConsole(Console): e.raw += e.raw amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0] + trace("getpending({a})", a=amount) raw = self.__read(amount) data = str(raw, self.encoding, "replace") e.data += data @@ -566,11 +563,9 @@ class UnixConsole(Console): @property def input_hook(self): - try: - import posix - except ImportError: - return None - if posix._is_inputhook_installed(): + # avoid inline imports here so the repl doesn't get flooded + # with import logging from -X importtime=2 + if posix is not None and posix._is_inputhook_installed(): return posix._inputhook def __enable_bracketed_paste(self) -> None: diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py index 7437fbe1ab9..e04fbdc6c8a 100644 --- a/Lib/_pyrepl/utils.py +++ b/Lib/_pyrepl/utils.py @@ -1,6 +1,17 @@ +from __future__ import annotations +import builtins +import functools +import keyword import re +import token as T +import tokenize import unicodedata -import functools +import _colorize + +from collections import deque +from io import StringIO +from tokenize import TokenInfo as TI +from typing import Iterable, Iterator, Match, NamedTuple, Self from .types import CharBuffer, CharWidths from .trace import trace @@ -8,6 +19,43 @@ from .trace import trace ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]") ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02") ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""}) +IDENTIFIERS_AFTER = {"def", "class"} +BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')} + + +def THEME(**kwargs): + # Not cached: the user can modify the theme inside the interactive session. + return _colorize.get_theme(**kwargs).syntax + + +class Span(NamedTuple): + """Span indexing that's inclusive on both ends.""" + + start: int + end: int + + @classmethod + def from_re(cls, m: Match[str], group: int | str) -> Self: + re_span = m.span(group) + return cls(re_span[0], re_span[1] - 1) + + @classmethod + def from_token(cls, token: TI, line_len: list[int]) -> Self: + end_offset = -1 + if (token.type in {T.FSTRING_MIDDLE, T.TSTRING_MIDDLE} + and token.string.endswith(("{", "}"))): + # gh-134158: a visible trailing brace comes from a double brace in input + end_offset += 1 + + return cls( + line_len[token.start[0] - 1] + token.start[1], + line_len[token.end[0] - 1] + token.end[1] + end_offset, + ) + + +class ColorSpan(NamedTuple): + span: Span + tag: str @functools.cache @@ -41,17 +89,212 @@ def unbracket(s: str, including_content: bool = False) -> str: return s.translate(ZERO_WIDTH_TRANS) -def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]: - r"""Decompose the input buffer into a printable variant. +def gen_colors(buffer: str) -> Iterator[ColorSpan]: + """Returns a list of index spans to color using the given color tag. + + The input `buffer` should be a valid start of a Python code block, i.e. + it cannot be a block starting in the middle of a multiline string. + """ + sio = StringIO(buffer) + line_lengths = [0] + [len(line) for line in sio.readlines()] + # make line_lengths cumulative + for i in range(1, len(line_lengths)): + line_lengths[i] += line_lengths[i-1] + + sio.seek(0) + gen = tokenize.generate_tokens(sio.readline) + last_emitted: ColorSpan | None = None + try: + for color in gen_colors_from_token_stream(gen, line_lengths): + yield color + last_emitted = color + except SyntaxError: + return + except tokenize.TokenError as te: + yield from recover_unterminated_string( + te, line_lengths, last_emitted, buffer + ) + + +def recover_unterminated_string( + exc: tokenize.TokenError, + line_lengths: list[int], + last_emitted: ColorSpan | None, + buffer: str, +) -> Iterator[ColorSpan]: + msg, loc = exc.args + if loc is None: + return + + line_no, column = loc + + if msg.startswith( + ( + "unterminated string literal", + "unterminated f-string literal", + "unterminated t-string literal", + "EOF in multi-line string", + "unterminated triple-quoted f-string literal", + "unterminated triple-quoted t-string literal", + ) + ): + start = line_lengths[line_no - 1] + column - 1 + end = line_lengths[-1] - 1 + + # in case FSTRING_START was already emitted + if last_emitted and start <= last_emitted.span.start: + trace("before last emitted = {s}", s=start) + start = last_emitted.span.end + 1 + + span = Span(start, end) + trace("yielding span {a} -> {b}", a=span.start, b=span.end) + yield ColorSpan(span, "string") + else: + trace( + "unhandled token error({buffer}) = {te}", + buffer=repr(buffer), + te=str(exc), + ) + + +def gen_colors_from_token_stream( + token_generator: Iterator[TI], + line_lengths: list[int], +) -> Iterator[ColorSpan]: + token_window = prev_next_window(token_generator) + + is_def_name = False + bracket_level = 0 + for prev_token, token, next_token in token_window: + assert token is not None + if token.start == token.end: + continue + + match token.type: + case ( + T.STRING + | T.FSTRING_START | T.FSTRING_MIDDLE | T.FSTRING_END + | T.TSTRING_START | T.TSTRING_MIDDLE | T.TSTRING_END + ): + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "string") + case T.COMMENT: + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "comment") + case T.NUMBER: + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "number") + case T.OP: + if token.string in "([{": + bracket_level += 1 + elif token.string in ")]}": + bracket_level -= 1 + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "op") + case T.NAME: + if is_def_name: + is_def_name = False + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "definition") + elif keyword.iskeyword(token.string): + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "keyword") + if token.string in IDENTIFIERS_AFTER: + is_def_name = True + elif ( + keyword.issoftkeyword(token.string) + and bracket_level == 0 + and is_soft_keyword_used(prev_token, token, next_token) + ): + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "soft_keyword") + elif token.string in BUILTINS: + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "builtin") + + +keyword_first_sets_match = {"False", "None", "True", "await", "lambda", "not"} +keyword_first_sets_case = {"False", "None", "True"} + + +def is_soft_keyword_used(*tokens: TI | None) -> bool: + """Returns True if the current token is a keyword in this context. + + For the `*tokens` to match anything, they have to be a three-tuple of + (previous, current, next). + """ + trace("is_soft_keyword_used{t}", t=tokens) + match tokens: + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="match"), + TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START) + | TI(T.OP, string="(" | "*" | "[" | "{" | "~" | "...") + ): + return True + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="match"), + TI(T.NAME, string=s) + ): + if keyword.iskeyword(s): + return s in keyword_first_sets_match + return True + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="case"), + TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START) + | TI(T.OP, string="(" | "*" | "-" | "[" | "{") + ): + return True + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="case"), + TI(T.NAME, string=s) + ): + if keyword.iskeyword(s): + return s in keyword_first_sets_case + return True + case (TI(string="case"), TI(string="_"), TI(string=":")): + return True + case _: + return False + + +def disp_str( + buffer: str, + colors: list[ColorSpan] | None = None, + start_index: int = 0, + force_color: bool = False, +) -> tuple[CharBuffer, CharWidths]: + r"""Decompose the input buffer into a printable variant with applied colors. Returns a tuple of two lists: - - the first list is the input buffer, character by character; + - the first list is the input buffer, character by character, with color + escape codes added (while those codes contain multiple ASCII characters, + each code is considered atomic *and is attached for the corresponding + visible character*); - the second list is the visible width of each character in the input buffer. + Note on colors: + - The `colors` list, if provided, is partially consumed within. We're using + a list and not a generator since we need to hold onto the current + unfinished span between calls to disp_str in case of multiline strings. + - The `colors` list is computed from the start of the input block. `buffer` + is only a subset of that input block, a single line within. This is why + we need `start_index` to inform us which position is the start of `buffer` + actually within user input. This allows us to match color spans correctly. + Examples: >>> utils.disp_str("a = 9") (['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1]) + + >>> line = "while 1:" + >>> colors = list(utils.gen_colors(line)) + >>> utils.disp_str(line, colors=colors) + (['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1]) + """ chars: CharBuffer = [] char_widths: CharWidths = [] @@ -59,7 +302,21 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]: if not buffer: return chars, char_widths - for c in buffer: + while colors and colors[0].span.end < start_index: + # move past irrelevant spans + colors.pop(0) + + theme = THEME(force_color=force_color) + pre_color = "" + post_color = "" + if colors and colors[0].span.start < start_index: + # looks like we're continuing a previous color (e.g. a multiline str) + pre_color = theme[colors[0].tag] + + for i, c in enumerate(buffer, start_index): + if colors and colors[0].span.start == i: # new color starts now + pre_color = theme[colors[0].tag] + if c == "\x1a": # CTRL-Z on Windows chars.append(c) char_widths.append(2) @@ -73,5 +330,43 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]: else: chars.append(c) char_widths.append(str_width(c)) - trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths) + + if colors and colors[0].span.end == i: # current color ends now + post_color = theme.reset + colors.pop(0) + + chars[-1] = pre_color + chars[-1] + post_color + pre_color = "" + post_color = "" + + if colors and colors[0].span.start < i and colors[0].span.end > i: + # even though the current color should be continued, reset it for now. + # the next call to `disp_str()` will revive it. + chars[-1] += theme.reset + return chars, char_widths + + +def prev_next_window[T]( + iterable: Iterable[T] +) -> Iterator[tuple[T | None, ...]]: + """Generates three-tuples of (previous, current, next) items. + + On the first iteration previous is None. On the last iteration next + is None. In case of exception next is None and the exception is re-raised + on a subsequent next() call. + + Inspired by `sliding_window` from `itertools` recipes. + """ + + iterator = iter(iterable) + window = deque((None, next(iterator)), maxlen=3) + try: + for x in iterator: + window.append(x) + yield tuple(window) + except Exception: + raise + finally: + window.append(None) + yield tuple(window) diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index 17942c8df07..c56dcd6d7dd 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -24,6 +24,7 @@ import os import sys import ctypes +import types from ctypes.wintypes import ( _COORD, WORD, @@ -58,6 +59,12 @@ except: self.err = err self.descr = descr +# declare nt optional to allow None assignment on other platforms +nt: types.ModuleType | None +try: + import nt +except ImportError: + nt = None TYPE_CHECKING = False @@ -121,9 +128,8 @@ class _error(Exception): def _supports_vt(): try: - import nt return nt._supports_virtual_terminal() - except (ImportError, AttributeError): + except AttributeError: return False class WindowsConsole(Console): @@ -235,11 +241,9 @@ class WindowsConsole(Console): @property def input_hook(self): - try: - import nt - except ImportError: - return None - if nt._is_inputhook_installed(): + # avoid inline imports here so the repl doesn't get flooded + # with import logging from -X importtime=2 + if nt is not None and nt._is_inputhook_installed(): return nt._inputhook def __write_changed_line( @@ -415,10 +419,7 @@ class WindowsConsole(Console): return info.srWindow.Bottom # type: ignore[no-any-return] - def _read_input(self, block: bool = True) -> INPUT_RECORD | None: - if not block and not self.wait(timeout=0): - return None - + def _read_input(self) -> INPUT_RECORD | None: rec = INPUT_RECORD() read = DWORD() if not ReadConsoleInput(InHandle, rec, 1, read): @@ -426,13 +427,26 @@ class WindowsConsole(Console): return rec + def _read_input_bulk( + self, n: int + ) -> tuple[ctypes.Array[INPUT_RECORD], int]: + rec = (n * INPUT_RECORD)() + read = DWORD() + if not ReadConsoleInput(InHandle, rec, n, read): + raise WinError(GetLastError()) + + return rec, read.value + def get_event(self, block: bool = True) -> Event | None: """Return an Event instance. Returns None if |block| is false and there is no event pending, otherwise waits for the completion of an event.""" + if not block and not self.wait(timeout=0): + return None + while self.event_queue.empty(): - rec = self._read_input(block) + rec = self._read_input() if rec is None: return None @@ -450,7 +464,7 @@ class WindowsConsole(Console): if key == "\r": # Make enter unix-like - return Event(evt="key", data="\n", raw=b"\n") + return Event(evt="key", data="\n") elif key_event.wVirtualKeyCode == 8: # Turn backspace directly into the command key = "backspace" @@ -462,24 +476,29 @@ class WindowsConsole(Console): key = f"ctrl {key}" elif key_event.dwControlKeyState & ALT_ACTIVE: # queue the key, return the meta command - self.event_queue.insert(Event(evt="key", data=key, raw=key)) + self.event_queue.insert(Event(evt="key", data=key)) return Event(evt="key", data="\033") # keymap.py uses this for meta - return Event(evt="key", data=key, raw=key) + return Event(evt="key", data=key) if block: continue return None elif self.__vt_support: # If virtual terminal is enabled, scanning VT sequences - self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar) + for char in raw_key.encode(self.event_queue.encoding, "replace"): + self.event_queue.push(char) continue if key_event.dwControlKeyState & ALT_ACTIVE: - # queue the key, return the meta command - self.event_queue.insert(Event(evt="key", data=key, raw=raw_key)) - return Event(evt="key", data="\033") # keymap.py uses this for meta - - return Event(evt="key", data=key, raw=raw_key) + # Do not swallow characters that have been entered via AltGr: + # Windows internally converts AltGr to CTRL+ALT, see + # https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-vkkeyscanw + if not key_event.dwControlKeyState & CTRL_ACTIVE: + # queue the key, return the meta command + self.event_queue.insert(Event(evt="key", data=key)) + return Event(evt="key", data="\033") # keymap.py uses this for meta + + return Event(evt="key", data=key) return self.event_queue.get() def push_char(self, char: int | bytes) -> None: @@ -521,7 +540,31 @@ class WindowsConsole(Console): def getpending(self) -> Event: """Return the characters that have been typed but not yet processed.""" - return Event("key", "", b"") + e = Event("key", "", b"") + + while not self.event_queue.empty(): + e2 = self.event_queue.get() + if e2: + e.data += e2.data + + recs, rec_count = self._read_input_bulk(1024) + for i in range(rec_count): + rec = recs[i] + # In case of a legacy console, we do not only receive a keydown + # event, but also a keyup event - and for uppercase letters + # an additional SHIFT_PRESSED event. + if rec and rec.EventType == KEY_EVENT: + key_event = rec.Event.KeyEvent + if not key_event.bKeyDown: + continue + ch = key_event.uChar.UnicodeChar + if ch == "\x00": + # ignore SHIFT_PRESSED and special keys + continue + if ch == "\r": + ch += "\n" + e.data += ch + return e def wait(self, timeout: float | None) -> bool: """Wait for an event.""" |