diff options
author | Łukasz Langa <lukasz@langa.pl> | 2025-05-02 20:22:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-02 20:22:31 +0200 |
commit | fac41f56d4b6b858cb52b40529855cce85cdbdcc (patch) | |
tree | 70490d6d77240385c4ca99281c7e5333261e89dd | |
parent | bfcbb28223b733b9cb88f152a059a9e1416f3467 (diff) | |
download | cpython-fac41f56d4b6b858cb52b40529855cce85cdbdcc.tar.gz cpython-fac41f56d4b6b858cb52b40529855cce85cdbdcc.zip |
gh-131507: Add support for syntax highlighting in PyREPL (GH-133247)
Co-authored-by: Victorien <65306057+Viicos@users.noreply.github.com>
Co-authored-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com>
-rw-r--r-- | Doc/whatsnew/3.14.rst | 17 | ||||
-rw-r--r-- | Lib/_colorize.py | 43 | ||||
-rw-r--r-- | Lib/_pyrepl/_module_completer.py | 9 | ||||
-rw-r--r-- | Lib/_pyrepl/commands.py | 29 | ||||
-rw-r--r-- | Lib/_pyrepl/mypy.ini | 4 | ||||
-rw-r--r-- | Lib/_pyrepl/reader.py | 49 | ||||
-rw-r--r-- | Lib/_pyrepl/readline.py | 4 | ||||
-rw-r--r-- | Lib/_pyrepl/simple_interact.py | 1 | ||||
-rw-r--r-- | Lib/_pyrepl/unix_console.py | 20 | ||||
-rw-r--r-- | Lib/_pyrepl/utils.py | 290 | ||||
-rw-r--r-- | Lib/_pyrepl/windows_console.py | 32 | ||||
-rw-r--r-- | Lib/test/test_pyrepl/test_pyrepl.py | 33 | ||||
-rw-r--r-- | Lib/test/test_pyrepl/test_reader.py | 149 | ||||
-rw-r--r-- | Lib/test/test_pyrepl/test_unix_console.py | 4 | ||||
-rw-r--r-- | Lib/test/test_pyrepl/test_utils.py | 37 | ||||
-rw-r--r-- | Lib/test/test_pyrepl/test_windows_console.py | 17 | ||||
-rw-r--r-- | Lib/token.py | 6 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst | 1 | ||||
l--------- | Misc/mypy/token.py | 1 | ||||
-rw-r--r-- | Misc/mypy/typed-stdlib.txt | 1 | ||||
-rwxr-xr-x | Tools/build/generate_token.py | 6 |
21 files changed, 654 insertions, 99 deletions
diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 8e8578f6a99..2f8b652d47e 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -560,6 +560,23 @@ For further information on how to build Python, see (Contributed by Ken Jin in :gh:`128563`, with ideas on how to implement this in CPython by Mark Shannon, Garrett Gu, Haoran Xu, and Josh Haberman.) +Syntax highlighting in PyREPL +----------------------------- + +The default :term:`interactive` shell now highlights Python syntax as you +type. The feature is enabled by default unless the +:envvar:`PYTHON_BASIC_REPL` environment variable is set or any color-disabling +environment variables are used. See :ref:`using-on-controlling-color` for +details. + +The default color theme for syntax highlighting strives for good contrast +and uses exclusively the 4-bit VGA standard ANSI color codes for maximum +compatibility. The theme can be customized using an experimental API +``_colorize.set_theme()``. This can be called interactively, as well as +in the :envvar:`PYTHONSTARTUP` script. + +(Contributed by Łukasz Langa in :gh:`131507`.) 
+ Other language changes ====================== diff --git a/Lib/_colorize.py b/Lib/_colorize.py index a39ff2ce5c1..54895488e74 100644 --- a/Lib/_colorize.py +++ b/Lib/_colorize.py @@ -7,7 +7,22 @@ COLORIZE = True # types if False: - from typing import IO + from typing import IO, Literal + + type ColorTag = Literal[ + "PROMPT", + "KEYWORD", + "BUILTIN", + "COMMENT", + "STRING", + "NUMBER", + "OP", + "DEFINITION", + "SOFT_KEYWORD", + "RESET", + ] + + theme: dict[ColorTag, str] class ANSIColors: @@ -23,6 +38,7 @@ class ANSIColors: WHITE = "\x1b[37m" # more like LIGHT GRAY YELLOW = "\x1b[33m" + BOLD = "\x1b[1m" BOLD_BLACK = "\x1b[1;30m" # DARK GRAY BOLD_BLUE = "\x1b[1;34m" BOLD_CYAN = "\x1b[1;36m" @@ -120,3 +136,28 @@ def can_colorize(*, file: IO[str] | IO[bytes] | None = None) -> bool: return os.isatty(file.fileno()) except io.UnsupportedOperation: return hasattr(file, "isatty") and file.isatty() + + +def set_theme(t: dict[ColorTag, str] | None = None) -> None: + global theme + + if t: + theme = t + return + + colors = get_colors() + theme = { + "PROMPT": colors.BOLD_MAGENTA, + "KEYWORD": colors.BOLD_BLUE, + "BUILTIN": colors.CYAN, + "COMMENT": colors.RED, + "STRING": colors.GREEN, + "NUMBER": colors.YELLOW, + "OP": colors.RESET, + "DEFINITION": colors.BOLD, + "SOFT_KEYWORD": colors.BOLD_BLUE, + "RESET": colors.RESET, + } + + +set_theme() diff --git a/Lib/_pyrepl/_module_completer.py b/Lib/_pyrepl/_module_completer.py index 1fb043e0b70..347f05607c7 100644 --- a/Lib/_pyrepl/_module_completer.py +++ b/Lib/_pyrepl/_module_completer.py @@ -2,6 +2,7 @@ from __future__ import annotations import pkgutil import sys +import token import tokenize from io import StringIO from contextlib import contextmanager @@ -180,8 +181,8 @@ class ImportParser: when parsing multiple statements. 
""" _ignored_tokens = { - tokenize.INDENT, tokenize.DEDENT, tokenize.COMMENT, - tokenize.NL, tokenize.NEWLINE, tokenize.ENDMARKER + token.INDENT, token.DEDENT, token.COMMENT, + token.NL, token.NEWLINE, token.ENDMARKER } _keywords = {'import', 'from', 'as'} @@ -350,11 +351,11 @@ class TokenQueue: def peek_name(self) -> bool: if not (tok := self.peek()): return False - return tok.type == tokenize.NAME + return tok.type == token.NAME def pop_name(self) -> str: tok = self.pop() - if tok.type != tokenize.NAME: + if tok.type != token.NAME: raise ParseError('pop_name') return tok.string diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py index cbb6d85f683..2054a8e400f 100644 --- a/Lib/_pyrepl/commands.py +++ b/Lib/_pyrepl/commands.py @@ -21,6 +21,7 @@ from __future__ import annotations import os +import time # Categories of actions: # killing @@ -31,6 +32,7 @@ import os # finishing # [completion] +from .trace import trace # types if False: @@ -471,19 +473,24 @@ class show_history(Command): class paste_mode(Command): - def do(self) -> None: self.reader.paste_mode = not self.reader.paste_mode self.reader.dirty = True -class enable_bracketed_paste(Command): - def do(self) -> None: - self.reader.paste_mode = True - self.reader.in_bracketed_paste = True - -class disable_bracketed_paste(Command): - def do(self) -> None: - self.reader.paste_mode = False - self.reader.in_bracketed_paste = False - self.reader.dirty = True +class perform_bracketed_paste(Command): + def do(self) -> None: + done = "\x1b[201~" + data = "" + start = time.time() + while done not in data: + self.reader.console.wait(100) + ev = self.reader.console.getpending() + data += ev.data + trace( + "bracketed pasting of {l} chars done in {s:.2f}s", + l=len(data), + s=time.time() - start, + ) + self.reader.insert(data.replace(done, "")) + self.reader.last_refresh_cache.invalidated = True diff --git a/Lib/_pyrepl/mypy.ini b/Lib/_pyrepl/mypy.ini index eabd0e9b440..9375a55b53c 100644 --- 
a/Lib/_pyrepl/mypy.ini +++ b/Lib/_pyrepl/mypy.ini @@ -23,7 +23,3 @@ check_untyped_defs = False # Various internal modules that typeshed deliberately doesn't have stubs for: [mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*] ignore_missing_imports = True - -# Other untyped parts of the stdlib -[mypy-idlelib.*] -ignore_missing_imports = True diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py index 7fc2422dac9..65c2230dfd6 100644 --- a/Lib/_pyrepl/reader.py +++ b/Lib/_pyrepl/reader.py @@ -22,14 +22,13 @@ from __future__ import annotations import sys +import _colorize from contextlib import contextmanager from dataclasses import dataclass, field, fields -from _colorize import can_colorize, ANSIColors - from . import commands, console, input -from .utils import wlen, unbracket, disp_str +from .utils import wlen, unbracket, disp_str, gen_colors from .trace import trace @@ -38,8 +37,7 @@ Command = commands.Command from .types import Callback, SimpleContextManager, KeySpec, CommandName -# syntax classes: - +# syntax classes SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3) @@ -105,8 +103,7 @@ default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( (r"\M-9", "digit-arg"), (r"\M-\n", "accept"), ("\\\\", "self-insert"), - (r"\x1b[200~", "enable_bracketed_paste"), - (r"\x1b[201~", "disable_bracketed_paste"), + (r"\x1b[200~", "perform-bracketed-paste"), (r"\x03", "ctrl-c"), ] + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"] @@ -144,16 +141,17 @@ class Reader: Instance variables of note include: * buffer: - A *list* (*not* a string at the moment :-) containing all the - characters that have been entered. + A per-character list containing all the characters that have been + entered. Does not include color information. * console: Hopefully encapsulates the OS dependent stuff. * pos: A 0-based index into 'buffer' for where the insertion point is. * screeninfo: - Ahem. 
This list contains some info needed to move the - insertion point around reasonably efficiently. + A list of screen position tuples. Each list element is a tuple + representing information on visible line length for a given line. + Allows for efficient skipping of color escape sequences. * cxy, lxy: the position of the insertion point in screen ... * syntax_table: @@ -203,7 +201,6 @@ class Reader: dirty: bool = False finished: bool = False paste_mode: bool = False - in_bracketed_paste: bool = False commands: dict[str, type[Command]] = field(default_factory=make_default_commands) last_command: type[Command] | None = None syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table) @@ -221,7 +218,6 @@ class Reader: ## cached metadata to speed up screen refreshes @dataclass class RefreshCache: - in_bracketed_paste: bool = False screen: list[str] = field(default_factory=list) screeninfo: list[tuple[int, list[int]]] = field(init=False) line_end_offsets: list[int] = field(default_factory=list) @@ -235,7 +231,6 @@ class Reader: screen: list[str], screeninfo: list[tuple[int, list[int]]], ) -> None: - self.in_bracketed_paste = reader.in_bracketed_paste self.screen = screen.copy() self.screeninfo = screeninfo.copy() self.pos = reader.pos @@ -248,8 +243,7 @@ class Reader: return False dimensions = reader.console.width, reader.console.height dimensions_changed = dimensions != self.dimensions - paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste - return not (dimensions_changed or paste_changed) + return not dimensions_changed def get_cached_location(self, reader: Reader) -> tuple[int, int]: if self.invalidated: @@ -279,7 +273,7 @@ class Reader: self.screeninfo = [(0, [])] self.cxy = self.pos2xy() self.lxy = (self.pos, 0) - self.can_colorize = can_colorize() + self.can_colorize = _colorize.can_colorize() self.last_refresh_cache.screeninfo = self.screeninfo self.last_refresh_cache.pos = self.pos @@ -316,6 +310,12 @@ class Reader: pos -= offset 
prompt_from_cache = (offset and self.buffer[offset - 1] != "\n") + + if self.can_colorize: + colors = list(gen_colors(self.get_unicode())) + else: + colors = None + trace("colors = {colors}", colors=colors) lines = "".join(self.buffer[offset:]).split("\n") cursor_found = False lines_beyond_cursor = 0 @@ -343,9 +343,8 @@ class Reader: screeninfo.append((0, [])) pos -= line_len + 1 prompt, prompt_len = self.process_prompt(prompt) - chars, char_widths = disp_str(line) + chars, char_widths = disp_str(line, colors, offset) wrapcount = (sum(char_widths) + prompt_len) // self.console.width - trace("wrapcount = {wrapcount}", wrapcount=wrapcount) if wrapcount == 0 or not char_widths: offset += line_len + 1 # Takes all of the line plus the newline last_refresh_line_end_offsets.append(offset) @@ -479,7 +478,7 @@ class Reader: 'lineno'.""" if self.arg is not None and cursor_on_line: prompt = f"(arg: {self.arg}) " - elif self.paste_mode and not self.in_bracketed_paste: + elif self.paste_mode: prompt = "(paste) " elif "\n" in self.buffer: if lineno == 0: @@ -492,7 +491,11 @@ class Reader: prompt = self.ps1 if self.can_colorize: - prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}" + prompt = ( + f"{_colorize.theme["PROMPT"]}" + f"{prompt}" + f"{_colorize.theme["RESET"]}" + ) return prompt def push_input_trans(self, itrans: input.KeymapTranslator) -> None: @@ -567,6 +570,7 @@ class Reader: def update_cursor(self) -> None: """Move the cursor to reflect changes in self.pos""" self.cxy = self.pos2xy() + trace("update_cursor({pos}) = {cxy}", pos=self.pos, cxy=self.cxy) self.console.move_cursor(*self.cxy) def after_command(self, cmd: Command) -> None: @@ -633,9 +637,6 @@ class Reader: def refresh(self) -> None: """Recalculate and refresh the screen.""" - if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n": - return - # this call sets up self.cxy, so call it first. 
self.screen = self.calc_screen() self.console.refresh(self.screen, self.cxy) diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py index 9d58829faf1..560a9db1921 100644 --- a/Lib/_pyrepl/readline.py +++ b/Lib/_pyrepl/readline.py @@ -276,10 +276,6 @@ class maybe_accept(commands.Command): r = self.reader # type: ignore[assignment] r.dirty = True # this is needed to hide the completion menu, if visible - if self.reader.in_bracketed_paste: - r.insert("\n") - return - # if there are already several lines and the cursor # is not on the last one, always insert a new \n. text = r.get_unicode() diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py index 4c74466118b..e2274629b65 100644 --- a/Lib/_pyrepl/simple_interact.py +++ b/Lib/_pyrepl/simple_interact.py @@ -157,7 +157,6 @@ def run_multiline_interactive_console( r.pos = len(r.get_unicode()) r.dirty = True r.refresh() - r.in_bracketed_paste = False console.write("\nKeyboardInterrupt\n") console.resetbuffer() except MemoryError: diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py index 96379bc20f3..07b160d2324 100644 --- a/Lib/_pyrepl/unix_console.py +++ b/Lib/_pyrepl/unix_console.py @@ -150,8 +150,6 @@ class UnixConsole(Console): self.pollob = poll() self.pollob.register(self.input_fd, select.POLLIN) - self.input_buffer = b"" - self.input_buffer_pos = 0 curses.setupterm(term or None, self.output_fd) self.term = term @@ -199,22 +197,8 @@ class UnixConsole(Console): self.event_queue = EventQueue(self.input_fd, self.encoding) self.cursor_visible = 1 - def more_in_buffer(self) -> bool: - return bool( - self.input_buffer - and self.input_buffer_pos < len(self.input_buffer) - ) - def __read(self, n: int) -> bytes: - if not self.more_in_buffer(): - self.input_buffer = os.read(self.input_fd, 10000) - - ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n] - self.input_buffer_pos += len(ret) - if self.input_buffer_pos >= len(self.input_buffer): - 
self.input_buffer = b"" - self.input_buffer_pos = 0 - return ret + return os.read(self.input_fd, n) def change_encoding(self, encoding: str) -> None: @@ -422,7 +406,6 @@ class UnixConsole(Console): """ return ( not self.event_queue.empty() - or self.more_in_buffer() or bool(self.pollob.poll(timeout)) ) @@ -525,6 +508,7 @@ class UnixConsole(Console): e.raw += e.raw amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0] + trace("getpending({a})", a=amount) raw = self.__read(amount) data = str(raw, self.encoding, "replace") e.data += data diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py index 7437fbe1ab9..fe154aa59a0 100644 --- a/Lib/_pyrepl/utils.py +++ b/Lib/_pyrepl/utils.py @@ -1,6 +1,17 @@ +from __future__ import annotations +import builtins +import functools +import keyword import re +import token as T +import tokenize import unicodedata -import functools +import _colorize + +from collections import deque +from io import StringIO +from tokenize import TokenInfo as TI +from typing import Iterable, Iterator, Match, NamedTuple, Self from .types import CharBuffer, CharWidths from .trace import trace @@ -8,6 +19,32 @@ from .trace import trace ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]") ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02") ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""}) +IDENTIFIERS_AFTER = {"def", "class"} +BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')} + + +class Span(NamedTuple): + """Span indexing that's inclusive on both ends.""" + + start: int + end: int + + @classmethod + def from_re(cls, m: Match[str], group: int | str) -> Self: + re_span = m.span(group) + return cls(re_span[0], re_span[1] - 1) + + @classmethod + def from_token(cls, token: TI, line_len: list[int]) -> Self: + return cls( + line_len[token.start[0] - 1] + token.start[1], + line_len[token.end[0] - 1] + token.end[1] - 1, + ) + + +class ColorSpan(NamedTuple): + span: Span + tag: _colorize.ColorTag 
@functools.cache @@ -41,17 +78,207 @@ def unbracket(s: str, including_content: bool = False) -> str: return s.translate(ZERO_WIDTH_TRANS) -def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]: - r"""Decompose the input buffer into a printable variant. +def gen_colors(buffer: str) -> Iterator[ColorSpan]: + """Returns a list of index spans to color using the given color tag. + + The input `buffer` should be a valid start of a Python code block, i.e. + it cannot be a block starting in the middle of a multiline string. + """ + sio = StringIO(buffer) + line_lengths = [0] + [len(line) for line in sio.readlines()] + # make line_lengths cumulative + for i in range(1, len(line_lengths)): + line_lengths[i] += line_lengths[i-1] + + sio.seek(0) + gen = tokenize.generate_tokens(sio.readline) + last_emitted: ColorSpan | None = None + try: + for color in gen_colors_from_token_stream(gen, line_lengths): + yield color + last_emitted = color + except tokenize.TokenError as te: + yield from recover_unterminated_string( + te, line_lengths, last_emitted, buffer + ) + + +def recover_unterminated_string( + exc: tokenize.TokenError, + line_lengths: list[int], + last_emitted: ColorSpan | None, + buffer: str, +) -> Iterator[ColorSpan]: + msg, loc = exc.args + if loc is None: + return + + line_no, column = loc + + if msg.startswith( + ( + "unterminated string literal", + "unterminated f-string literal", + "unterminated t-string literal", + "EOF in multi-line string", + "unterminated triple-quoted f-string literal", + "unterminated triple-quoted t-string literal", + ) + ): + start = line_lengths[line_no - 1] + column - 1 + end = line_lengths[-1] - 1 + + # in case FSTRING_START was already emitted + if last_emitted and start <= last_emitted.span.start: + trace("before last emitted = {s}", s=start) + start = last_emitted.span.end + 1 + + span = Span(start, end) + trace("yielding span {a} -> {b}", a=span.start, b=span.end) + yield ColorSpan(span, "STRING") + else: + trace( + "unhandled 
token error({buffer}) = {te}", + buffer=repr(buffer), + te=str(exc), + ) + + +def gen_colors_from_token_stream( + token_generator: Iterator[TI], + line_lengths: list[int], +) -> Iterator[ColorSpan]: + token_window = prev_next_window(token_generator) + + is_def_name = False + bracket_level = 0 + for prev_token, token, next_token in token_window: + assert token is not None + if token.start == token.end: + continue + + match token.type: + case ( + T.STRING + | T.FSTRING_START | T.FSTRING_MIDDLE | T.FSTRING_END + | T.TSTRING_START | T.TSTRING_MIDDLE | T.TSTRING_END + ): + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "STRING") + case T.COMMENT: + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "COMMENT") + case T.NUMBER: + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "NUMBER") + case T.OP: + if token.string in "([{": + bracket_level += 1 + elif token.string in ")]}": + bracket_level -= 1 + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "OP") + case T.NAME: + if is_def_name: + is_def_name = False + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "DEFINITION") + elif keyword.iskeyword(token.string): + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "KEYWORD") + if token.string in IDENTIFIERS_AFTER: + is_def_name = True + elif ( + keyword.issoftkeyword(token.string) + and bracket_level == 0 + and is_soft_keyword_used(prev_token, token, next_token) + ): + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "SOFT_KEYWORD") + elif token.string in BUILTINS: + span = Span.from_token(token, line_lengths) + yield ColorSpan(span, "BUILTIN") + + +keyword_first_sets_match = {"False", "None", "True", "await", "lambda", "not"} +keyword_first_sets_case = {"False", "None", "True"} + + +def is_soft_keyword_used(*tokens: TI | None) -> bool: + """Returns True if the current token is a keyword in this context. 
+ + For the `*tokens` to match anything, they have to be a three-tuple of + (previous, current, next). + """ + trace("is_soft_keyword_used{t}", t=tokens) + match tokens: + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="match"), + TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START) + | TI(T.OP, string="(" | "*" | "[" | "{" | "~" | "...") + ): + return True + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="match"), + TI(T.NAME, string=s) + ): + if keyword.iskeyword(s): + return s in keyword_first_sets_match + return True + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="case"), + TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START) + | TI(T.OP, string="(" | "*" | "-" | "[" | "{") + ): + return True + case ( + None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"), + TI(string="case"), + TI(T.NAME, string=s) + ): + if keyword.iskeyword(s): + return s in keyword_first_sets_case + return True + case (TI(string="case"), TI(string="_"), TI(string=":")): + return True + case _: + return False + + +def disp_str( + buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0 +) -> tuple[CharBuffer, CharWidths]: + r"""Decompose the input buffer into a printable variant with applied colors. Returns a tuple of two lists: - - the first list is the input buffer, character by character; + - the first list is the input buffer, character by character, with color + escape codes added (while those codes contain multiple ASCII characters, + each code is considered atomic *and is attached for the corresponding + visible character*); - the second list is the visible width of each character in the input buffer. + Note on colors: + - The `colors` list, if provided, is partially consumed within. We're using + a list and not a generator since we need to hold onto the current + unfinished span between calls to disp_str in case of multiline strings. 
+ - The `colors` list is computed from the start of the input block. `buffer` + is only a subset of that input block, a single line within. This is why + we need `start_index` to inform us which position is the start of `buffer` + actually within user input. This allows us to match color spans correctly. + Examples: >>> utils.disp_str("a = 9") (['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1]) + + >>> line = "while 1:" + >>> colors = list(utils.gen_colors(line)) + >>> utils.disp_str(line, colors=colors) + (['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1]) + """ chars: CharBuffer = [] char_widths: CharWidths = [] @@ -59,7 +286,20 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]: if not buffer: return chars, char_widths - for c in buffer: + while colors and colors[0].span.end < start_index: + # move past irrelevant spans + colors.pop(0) + + pre_color = "" + post_color = "" + if colors and colors[0].span.start < start_index: + # looks like we're continuing a previous color (e.g. a multiline str) + pre_color = _colorize.theme[colors[0].tag] + + for i, c in enumerate(buffer, start_index): + if colors and colors[0].span.start == i: # new color starts now + pre_color = _colorize.theme[colors[0].tag] + if c == "\x1a": # CTRL-Z on Windows chars.append(c) char_widths.append(2) @@ -73,5 +313,43 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]: else: chars.append(c) char_widths.append(str_width(c)) - trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths) + + if colors and colors[0].span.end == i: # current color ends now + post_color = _colorize.theme["RESET"] + colors.pop(0) + + chars[-1] = pre_color + chars[-1] + post_color + pre_color = "" + post_color = "" + + if colors and colors[0].span.start < i and colors[0].span.end > i: + # even though the current color should be continued, reset it for now. + # the next call to `disp_str()` will revive it. 
+ chars[-1] += _colorize.theme["RESET"] + return chars, char_widths + + +def prev_next_window[T]( + iterable: Iterable[T] +) -> Iterator[tuple[T | None, ...]]: + """Generates three-tuples of (previous, current, next) items. + + On the first iteration previous is None. On the last iteration next + is None. In case of exception next is None and the exception is re-raised + on a subsequent next() call. + + Inspired by `sliding_window` from `itertools` recipes. + """ + + iterator = iter(iterable) + window = deque((None, next(iterator)), maxlen=3) + try: + for x in iterator: + window.append(x) + yield tuple(window) + except Exception: + raise + finally: + window.append(None) + yield tuple(window) diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index 17942c8df07..77985e59a93 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -426,6 +426,20 @@ class WindowsConsole(Console): return rec + def _read_input_bulk( + self, block: bool, n: int + ) -> tuple[ctypes.Array[INPUT_RECORD], int]: + rec = (n * INPUT_RECORD)() + read = DWORD() + + if not block and not self.wait(timeout=0): + return rec, 0 + + if not ReadConsoleInput(InHandle, rec, n, read): + raise WinError(GetLastError()) + + return rec, read.value + def get_event(self, block: bool = True) -> Event | None: """Return an Event instance. 
Returns None if |block| is false and there is no event pending, otherwise waits for the @@ -521,7 +535,23 @@ class WindowsConsole(Console): def getpending(self) -> Event: """Return the characters that have been typed but not yet processed.""" - return Event("key", "", b"") + e = Event("key", "", b"") + + while not self.event_queue.empty(): + e2 = self.event_queue.get() + if e2: + e.data += e2.data + + recs, rec_count = self._read_input_bulk(False, 1024) + for i in range(rec_count): + rec = recs[i] + if rec and rec.EventType == KEY_EVENT: + key_event = rec.Event.KeyEvent + ch = key_event.uChar.UnicodeChar + if ch == "\r": + ch += "\n" + e.data += ch + return e def wait(self, timeout: float | None) -> bool: """Wait for an event.""" diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py index 75a5afad562..93029ab6e08 100644 --- a/Lib/test/test_pyrepl/test_pyrepl.py +++ b/Lib/test/test_pyrepl/test_pyrepl.py @@ -45,6 +45,7 @@ class ReplTestCase(TestCase): cmdline_args: list[str] | None = None, cwd: str | None = None, skip: bool = False, + timeout: float = SHORT_TIMEOUT, ) -> tuple[str, int]: temp_dir = None if cwd is None: @@ -52,7 +53,12 @@ class ReplTestCase(TestCase): cwd = temp_dir.name try: return self._run_repl( - repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd, skip=skip, + repl_input, + env=env, + cmdline_args=cmdline_args, + cwd=cwd, + skip=skip, + timeout=timeout, ) finally: if temp_dir is not None: @@ -66,6 +72,7 @@ class ReplTestCase(TestCase): cmdline_args: list[str] | None, cwd: str, skip: bool, + timeout: float, ) -> tuple[str, int]: assert pty master_fd, slave_fd = pty.openpty() @@ -103,7 +110,7 @@ class ReplTestCase(TestCase): os.write(master_fd, repl_input.encode("utf-8")) output = [] - while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]: + while select.select([master_fd], [], [], timeout)[0]: try: data = os.read(master_fd, 1024).decode("utf-8") if not data: @@ -114,12 +121,12 @@ class 
ReplTestCase(TestCase): else: os.close(master_fd) process.kill() - process.wait(timeout=SHORT_TIMEOUT) + process.wait(timeout=timeout) self.fail(f"Timeout while waiting for output, got: {''.join(output)}") os.close(master_fd) try: - exit_code = process.wait(timeout=SHORT_TIMEOUT) + exit_code = process.wait(timeout=timeout) except subprocess.TimeoutExpired: process.kill() exit_code = process.wait() @@ -1561,25 +1568,29 @@ class TestMain(ReplTestCase): def test_history_survive_crash(self): env = os.environ.copy() - commands = "1\nexit()\n" - output, exit_code = self.run_repl(commands, env=env, skip=True) with tempfile.NamedTemporaryFile() as hfile: env["PYTHON_HISTORY"] = hfile.name - commands = "spam\nimport time\ntime.sleep(1000)\npreved\n" + + commands = "1\n2\n3\nexit()\n" + output, exit_code = self.run_repl(commands, env=env, skip=True) + + commands = "spam\nimport time\ntime.sleep(1000)\nquit\n" try: - self.run_repl(commands, env=env) + self.run_repl(commands, env=env, timeout=3) except AssertionError: pass history = pathlib.Path(hfile.name).read_text() + self.assertIn("2", history) + self.assertIn("exit()", history) self.assertIn("spam", history) - self.assertIn("time", history) + self.assertIn("import time", history) self.assertNotIn("sleep", history) - self.assertNotIn("preved", history) + self.assertNotIn("quit", history) def test_keyboard_interrupt_after_isearch(self): - output, exit_code = self.run_repl(["\x12", "\x03", "exit"]) + output, exit_code = self.run_repl("\x12\x03exit\n") self.assertEqual(exit_code, 0) def test_prompt_after_help(self): diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py index 109cb603ae8..8d7fcf538d2 100644 --- a/Lib/test/test_pyrepl/test_reader.py +++ b/Lib/test/test_pyrepl/test_reader.py @@ -1,14 +1,21 @@ import itertools import functools import rlcompleter +from textwrap import dedent from unittest import TestCase from unittest.mock import MagicMock from .support import handle_all_events, 
handle_events_narrow_console from .support import ScreenEqualMixin, code_to_events -from .support import prepare_reader, prepare_console +from .support import prepare_console, reader_force_colors +from .support import reader_no_colors as prepare_reader from _pyrepl.console import Event from _pyrepl.reader import Reader +from _colorize import theme + + +overrides = {"RESET": "z", "SOFT_KEYWORD": "K"} +colors = {overrides.get(k, k[0].lower()): v for k, v in theme.items()} class TestReader(ScreenEqualMixin, TestCase): @@ -123,8 +130,9 @@ class TestReader(ScreenEqualMixin, TestCase): def test_control_characters(self): code = 'flag = "🏳️🌈"' events = code_to_events(code) - reader, _ = handle_all_events(events) + reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) self.assert_screen_equal(reader, 'flag = "🏳️\\u200d🌈"', clean=True) + self.assert_screen_equal(reader, 'flag {o}={z} {s}"🏳️\\u200d🌈"{z}'.format(**colors)) def test_setpos_from_xy_multiple_lines(self): # fmt: off @@ -355,3 +363,140 @@ class TestReader(ScreenEqualMixin, TestCase): reader, _ = handle_all_events(events) reader.setpos_from_xy(8, 0) self.assertEqual(reader.pos, 7) + + def test_syntax_highlighting_basic(self): + code = dedent( + """\ + import re, sys + def funct(case: str = sys.platform) -> None: + match = re.search( + "(me)", + ''' + Come on + Come on now + You know that it's time to emerge + ''', + ) + match case: + case "emscripten": print("on the web") + case "ios" | "android": print("on the phone") + case _: print('arms around', match.group(1)) + """ + ) + expected = dedent( + """\ + {k}import{z} re{o},{z} sys + {a}{k}def{z} {d}funct{z}{o}({z}case{o}:{z} {b}str{z} {o}={z} sys{o}.{z}platform{o}){z} {o}->{z} {k}None{z}{o}:{z} + match {o}={z} re{o}.{z}search{o}({z} + {s}"(me)"{z}{o},{z} + {s}'''{z} + {s} Come on{z} + {s} Come on now{z} + {s} You know that it's time to emerge{z} + {s} '''{z}{o},{z} + {o}){z} + {K}match{z} case{o}:{z} + {K}case{z} {s}"emscripten"{z}{o}:{z} 
{b}print{z}{o}({z}{s}"on the web"{z}{o}){z} + {K}case{z} {s}"ios"{z} {o}|{z} {s}"android"{z}{o}:{z} {b}print{z}{o}({z}{s}"on the phone"{z}{o}){z} + {K}case{z} {K}_{z}{o}:{z} {b}print{z}{o}({z}{s}'arms around'{z}{o},{z} match{o}.{z}group{o}({z}{n}1{z}{o}){z}{o}){z} + """ + ) + expected_sync = expected.format(a="", **colors) + events = code_to_events(code) + reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + self.assert_screen_equal(reader, code, clean=True) + self.assert_screen_equal(reader, expected_sync) + self.assertEqual(reader.pos, 2**7 + 2**8) + self.assertEqual(reader.cxy, (0, 14)) + + async_msg = "{k}async{z} ".format(**colors) + expected_async = expected.format(a=async_msg, **colors) + more_events = itertools.chain( + code_to_events(code), + [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))] * 13, + code_to_events("async "), + ) + reader, _ = handle_all_events(more_events, prepare_reader=reader_force_colors) + self.assert_screen_equal(reader, expected_async) + self.assertEqual(reader.pos, 21) + self.assertEqual(reader.cxy, (6, 1)) + + def test_syntax_highlighting_incomplete_string_first_line(self): + code = dedent( + """\ + def unfinished_function(arg: str = "still typing + """ + ) + expected = dedent( + """\ + {k}def{z} {d}unfinished_function{z}{o}({z}arg{o}:{z} {b}str{z} {o}={z} {s}"still typing{z} + """ + ).format(**colors) + events = code_to_events(code) + reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + self.assert_screen_equal(reader, code, clean=True) + self.assert_screen_equal(reader, expected) + + def test_syntax_highlighting_incomplete_string_another_line(self): + code = dedent( + """\ + def unfinished_function( + arg: str = "still typing + """ + ) + expected = dedent( + """\ + {k}def{z} {d}unfinished_function{z}{o}({z} + arg{o}:{z} {b}str{z} {o}={z} {s}"still typing{z} + """ + ).format(**colors) + events = code_to_events(code) + reader, _ = handle_all_events(events, 
prepare_reader=reader_force_colors) + self.assert_screen_equal(reader, code, clean=True) + self.assert_screen_equal(reader, expected) + + def test_syntax_highlighting_incomplete_multiline_string(self): + code = dedent( + """\ + def unfinished_function(): + '''Still writing + the docstring + """ + ) + expected = dedent( + """\ + {k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z} + {s}'''Still writing{z} + {s} the docstring{z} + """ + ).format(**colors) + events = code_to_events(code) + reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + self.assert_screen_equal(reader, code, clean=True) + self.assert_screen_equal(reader, expected) + + def test_syntax_highlighting_incomplete_fstring(self): + code = dedent( + """\ + def unfinished_function(): + var = f"Single-quote but { + 1 + + + 1 + } multi-line! + """ + ) + expected = dedent( + """\ + {k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z} + var {o}={z} {s}f"{z}{s}Single-quote but {z}{o}{OB}{z} + {n}1{z} + {o}+{z} + {n}1{z} + {o}{CB}{z}{s} multi-line!{z} + """ + ).format(OB="{", CB="}", **colors) + events = code_to_events(code) + reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + self.assert_screen_equal(reader, code, clean=True) + self.assert_screen_equal(reader, expected) diff --git a/Lib/test/test_pyrepl/test_unix_console.py b/Lib/test/test_pyrepl/test_unix_console.py index 2f5c150402b..7acb84a94f7 100644 --- a/Lib/test/test_pyrepl/test_unix_console.py +++ b/Lib/test/test_pyrepl/test_unix_console.py @@ -33,10 +33,12 @@ def unix_console(events, **kwargs): handle_events_unix_console = partial( handle_all_events, - prepare_console=partial(unix_console), + prepare_reader=reader_no_colors, + prepare_console=unix_console, ) handle_events_narrow_unix_console = partial( handle_all_events, + prepare_reader=reader_no_colors, prepare_console=partial(unix_console, width=5), ) handle_events_short_unix_console = partial( diff --git a/Lib/test/test_pyrepl/test_utils.py 
b/Lib/test/test_pyrepl/test_utils.py index 0d59968206a..8ce1e537138 100644 --- a/Lib/test/test_pyrepl/test_utils.py +++ b/Lib/test/test_pyrepl/test_utils.py @@ -1,6 +1,6 @@ from unittest import TestCase -from _pyrepl.utils import str_width, wlen +from _pyrepl.utils import str_width, wlen, prev_next_window class TestUtils(TestCase): @@ -25,3 +25,38 @@ class TestUtils(TestCase): self.assertEqual(wlen('hello'), 5) self.assertEqual(wlen('hello' + '\x1a'), 7) + + def test_prev_next_window(self): + def gen_normal(): + yield 1 + yield 2 + yield 3 + yield 4 + + pnw = prev_next_window(gen_normal()) + self.assertEqual(next(pnw), (None, 1, 2)) + self.assertEqual(next(pnw), (1, 2, 3)) + self.assertEqual(next(pnw), (2, 3, 4)) + self.assertEqual(next(pnw), (3, 4, None)) + with self.assertRaises(StopIteration): + next(pnw) + + def gen_short(): + yield 1 + + pnw = prev_next_window(gen_short()) + self.assertEqual(next(pnw), (None, 1, None)) + with self.assertRaises(StopIteration): + next(pnw) + + def gen_raise(): + yield from gen_normal() + 1/0 + + pnw = prev_next_window(gen_raise()) + self.assertEqual(next(pnw), (None, 1, 2)) + self.assertEqual(next(pnw), (1, 2, 3)) + self.assertEqual(next(pnw), (2, 3, 4)) + self.assertEqual(next(pnw), (3, 4, None)) + with self.assertRaises(ZeroDivisionError): + next(pnw) diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py index 69f2d5af2a4..e95fec46a85 100644 --- a/Lib/test/test_pyrepl/test_windows_console.py +++ b/Lib/test/test_pyrepl/test_windows_console.py @@ -12,6 +12,7 @@ from unittest import TestCase from unittest.mock import MagicMock, call from .support import handle_all_events, code_to_events +from .support import reader_no_colors as default_prepare_reader try: from _pyrepl.console import Event, Console @@ -47,14 +48,22 @@ class WindowsConsoleTests(TestCase): setattr(console, key, val) return console - def handle_events(self, events: Iterable[Event], **kwargs): - return 
handle_all_events(events, partial(self.console, **kwargs)) + def handle_events( + self, + events: Iterable[Event], + prepare_console=None, + prepare_reader=None, + **kwargs, + ): + prepare_console = prepare_console or partial(self.console, **kwargs) + prepare_reader = prepare_reader or default_prepare_reader + return handle_all_events(events, prepare_console, prepare_reader) def handle_events_narrow(self, events): return self.handle_events(events, width=5) - def handle_events_short(self, events): - return self.handle_events(events, height=1) + def handle_events_short(self, events, **kwargs): + return self.handle_events(events, height=1, **kwargs) def handle_events_height_3(self, events): return self.handle_events(events, height=3) diff --git a/Lib/token.py b/Lib/token.py index a1fde61cd8a..f61723cc09d 100644 --- a/Lib/token.py +++ b/Lib/token.py @@ -134,11 +134,11 @@ EXACT_TOKEN_TYPES = { '~': TILDE, } -def ISTERMINAL(x): +def ISTERMINAL(x: int) -> bool: return x < NT_OFFSET -def ISNONTERMINAL(x): +def ISNONTERMINAL(x: int) -> bool: return x >= NT_OFFSET -def ISEOF(x): +def ISEOF(x: int) -> bool: return x == ENDMARKER diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst new file mode 100644 index 00000000000..354a116c533 --- /dev/null +++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst @@ -0,0 +1 @@ +PyREPL now supports syntax highlighting. Contributed by Łukasz Langa. diff --git a/Misc/mypy/token.py b/Misc/mypy/token.py new file mode 120000 index 00000000000..0a39f726dda --- /dev/null +++ b/Misc/mypy/token.py @@ -0,0 +1 @@ +../../Lib/token.py
\ No newline at end of file diff --git a/Misc/mypy/typed-stdlib.txt b/Misc/mypy/typed-stdlib.txt index 9b27ee0d2de..07b88ba7445 100644 --- a/Misc/mypy/typed-stdlib.txt +++ b/Misc/mypy/typed-stdlib.txt @@ -2,4 +2,5 @@ _colorize.py _pyrepl +token.py tomllib
\ No newline at end of file diff --git a/Tools/build/generate_token.py b/Tools/build/generate_token.py index a64806763f3..9ee5ec86e75 100755 --- a/Tools/build/generate_token.py +++ b/Tools/build/generate_token.py @@ -278,13 +278,13 @@ EXACT_TOKEN_TYPES = { %s } -def ISTERMINAL(x): +def ISTERMINAL(x: int) -> bool: return x < NT_OFFSET -def ISNONTERMINAL(x): +def ISNONTERMINAL(x: int) -> bool: return x >= NT_OFFSET -def ISEOF(x): +def ISEOF(x: int) -> bool: return x == ENDMARKER ''' |