aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
author Łukasz Langa <lukasz@langa.pl> 2025-05-02 20:22:31 +0200
committer GitHub <noreply@github.com> 2025-05-02 20:22:31 +0200
commit fac41f56d4b6b858cb52b40529855cce85cdbdcc (patch)
tree 70490d6d77240385c4ca99281c7e5333261e89dd
parent bfcbb28223b733b9cb88f152a059a9e1416f3467 (diff)
download cpython-fac41f56d4b6b858cb52b40529855cce85cdbdcc.tar.gz
cpython-fac41f56d4b6b858cb52b40529855cce85cdbdcc.zip
gh-131507: Add support for syntax highlighting in PyREPL (GH-133247)
Co-authored-by: Victorien <65306057+Viicos@users.noreply.github.com> Co-authored-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com>
-rw-r--r-- Doc/whatsnew/3.14.rst 17
-rw-r--r-- Lib/_colorize.py 43
-rw-r--r-- Lib/_pyrepl/_module_completer.py 9
-rw-r--r-- Lib/_pyrepl/commands.py 29
-rw-r--r-- Lib/_pyrepl/mypy.ini 4
-rw-r--r-- Lib/_pyrepl/reader.py 49
-rw-r--r-- Lib/_pyrepl/readline.py 4
-rw-r--r-- Lib/_pyrepl/simple_interact.py 1
-rw-r--r-- Lib/_pyrepl/unix_console.py 20
-rw-r--r-- Lib/_pyrepl/utils.py 290
-rw-r--r-- Lib/_pyrepl/windows_console.py 32
-rw-r--r-- Lib/test/test_pyrepl/test_pyrepl.py 33
-rw-r--r-- Lib/test/test_pyrepl/test_reader.py 149
-rw-r--r-- Lib/test/test_pyrepl/test_unix_console.py 4
-rw-r--r-- Lib/test/test_pyrepl/test_utils.py 37
-rw-r--r-- Lib/test/test_pyrepl/test_windows_console.py 17
-rw-r--r-- Lib/token.py 6
-rw-r--r-- Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst 1
l--------- Misc/mypy/token.py 1
-rw-r--r-- Misc/mypy/typed-stdlib.txt 1
-rwxr-xr-x Tools/build/generate_token.py 6
21 files changed, 654 insertions, 99 deletions
diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst
index 8e8578f6a99..2f8b652d47e 100644
--- a/Doc/whatsnew/3.14.rst
+++ b/Doc/whatsnew/3.14.rst
@@ -560,6 +560,23 @@ For further information on how to build Python, see
(Contributed by Ken Jin in :gh:`128563`, with ideas on how to implement this
in CPython by Mark Shannon, Garrett Gu, Haoran Xu, and Josh Haberman.)
+Syntax highlighting in PyREPL
+-----------------------------
+
+The default :term:`interactive` shell now highlights Python syntax as you
+type. The feature is enabled by default unless the
+:envvar:`PYTHON_BASIC_REPL` environment variable is set or any color-disabling
+environment variables are used. See :ref:`using-on-controlling-color` for
+details.
+
+The default color theme for syntax highlighting strives for good contrast
+and uses exclusively the 4-bit VGA standard ANSI color codes for maximum
+compatibility. The theme can be customized using an experimental API
+``_colorize.set_theme()``. This can be called interactively, as well as
+in the :envvar:`PYTHONSTARTUP` script.
+
+(Contributed by Łukasz Langa in :gh:`131507`.)
+
Other language changes
======================
diff --git a/Lib/_colorize.py b/Lib/_colorize.py
index a39ff2ce5c1..54895488e74 100644
--- a/Lib/_colorize.py
+++ b/Lib/_colorize.py
@@ -7,7 +7,22 @@ COLORIZE = True
# types
if False:
- from typing import IO
+ from typing import IO, Literal
+
+ type ColorTag = Literal[
+ "PROMPT",
+ "KEYWORD",
+ "BUILTIN",
+ "COMMENT",
+ "STRING",
+ "NUMBER",
+ "OP",
+ "DEFINITION",
+ "SOFT_KEYWORD",
+ "RESET",
+ ]
+
+ theme: dict[ColorTag, str]
class ANSIColors:
@@ -23,6 +38,7 @@ class ANSIColors:
WHITE = "\x1b[37m" # more like LIGHT GRAY
YELLOW = "\x1b[33m"
+ BOLD = "\x1b[1m"
BOLD_BLACK = "\x1b[1;30m" # DARK GRAY
BOLD_BLUE = "\x1b[1;34m"
BOLD_CYAN = "\x1b[1;36m"
@@ -120,3 +136,28 @@ def can_colorize(*, file: IO[str] | IO[bytes] | None = None) -> bool:
return os.isatty(file.fileno())
except io.UnsupportedOperation:
return hasattr(file, "isatty") and file.isatty()
+
+
+def set_theme(t: dict[ColorTag, str] | None = None) -> None:
+ global theme
+
+ if t:
+ theme = t
+ return
+
+ colors = get_colors()
+ theme = {
+ "PROMPT": colors.BOLD_MAGENTA,
+ "KEYWORD": colors.BOLD_BLUE,
+ "BUILTIN": colors.CYAN,
+ "COMMENT": colors.RED,
+ "STRING": colors.GREEN,
+ "NUMBER": colors.YELLOW,
+ "OP": colors.RESET,
+ "DEFINITION": colors.BOLD,
+ "SOFT_KEYWORD": colors.BOLD_BLUE,
+ "RESET": colors.RESET,
+ }
+
+
+set_theme()
diff --git a/Lib/_pyrepl/_module_completer.py b/Lib/_pyrepl/_module_completer.py
index 1fb043e0b70..347f05607c7 100644
--- a/Lib/_pyrepl/_module_completer.py
+++ b/Lib/_pyrepl/_module_completer.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import pkgutil
import sys
+import token
import tokenize
from io import StringIO
from contextlib import contextmanager
@@ -180,8 +181,8 @@ class ImportParser:
when parsing multiple statements.
"""
_ignored_tokens = {
- tokenize.INDENT, tokenize.DEDENT, tokenize.COMMENT,
- tokenize.NL, tokenize.NEWLINE, tokenize.ENDMARKER
+ token.INDENT, token.DEDENT, token.COMMENT,
+ token.NL, token.NEWLINE, token.ENDMARKER
}
_keywords = {'import', 'from', 'as'}
@@ -350,11 +351,11 @@ class TokenQueue:
def peek_name(self) -> bool:
if not (tok := self.peek()):
return False
- return tok.type == tokenize.NAME
+ return tok.type == token.NAME
def pop_name(self) -> str:
tok = self.pop()
- if tok.type != tokenize.NAME:
+ if tok.type != token.NAME:
raise ParseError('pop_name')
return tok.string
diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py
index cbb6d85f683..2054a8e400f 100644
--- a/Lib/_pyrepl/commands.py
+++ b/Lib/_pyrepl/commands.py
@@ -21,6 +21,7 @@
from __future__ import annotations
import os
+import time
# Categories of actions:
# killing
@@ -31,6 +32,7 @@ import os
# finishing
# [completion]
+from .trace import trace
# types
if False:
@@ -471,19 +473,24 @@ class show_history(Command):
class paste_mode(Command):
-
def do(self) -> None:
self.reader.paste_mode = not self.reader.paste_mode
self.reader.dirty = True
-class enable_bracketed_paste(Command):
- def do(self) -> None:
- self.reader.paste_mode = True
- self.reader.in_bracketed_paste = True
-
-class disable_bracketed_paste(Command):
- def do(self) -> None:
- self.reader.paste_mode = False
- self.reader.in_bracketed_paste = False
- self.reader.dirty = True
+class perform_bracketed_paste(Command):
+ def do(self) -> None:
+ done = "\x1b[201~"
+ data = ""
+ start = time.time()
+ while done not in data:
+ self.reader.console.wait(100)
+ ev = self.reader.console.getpending()
+ data += ev.data
+ trace(
+ "bracketed pasting of {l} chars done in {s:.2f}s",
+ l=len(data),
+ s=time.time() - start,
+ )
+ self.reader.insert(data.replace(done, ""))
+ self.reader.last_refresh_cache.invalidated = True
diff --git a/Lib/_pyrepl/mypy.ini b/Lib/_pyrepl/mypy.ini
index eabd0e9b440..9375a55b53c 100644
--- a/Lib/_pyrepl/mypy.ini
+++ b/Lib/_pyrepl/mypy.ini
@@ -23,7 +23,3 @@ check_untyped_defs = False
# Various internal modules that typeshed deliberately doesn't have stubs for:
[mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*]
ignore_missing_imports = True
-
-# Other untyped parts of the stdlib
-[mypy-idlelib.*]
-ignore_missing_imports = True
diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py
index 7fc2422dac9..65c2230dfd6 100644
--- a/Lib/_pyrepl/reader.py
+++ b/Lib/_pyrepl/reader.py
@@ -22,14 +22,13 @@
from __future__ import annotations
import sys
+import _colorize
from contextlib import contextmanager
from dataclasses import dataclass, field, fields
-from _colorize import can_colorize, ANSIColors
-
from . import commands, console, input
-from .utils import wlen, unbracket, disp_str
+from .utils import wlen, unbracket, disp_str, gen_colors
from .trace import trace
@@ -38,8 +37,7 @@ Command = commands.Command
from .types import Callback, SimpleContextManager, KeySpec, CommandName
-# syntax classes:
-
+# syntax classes
SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3)
@@ -105,8 +103,7 @@ default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
(r"\M-9", "digit-arg"),
(r"\M-\n", "accept"),
("\\\\", "self-insert"),
- (r"\x1b[200~", "enable_bracketed_paste"),
- (r"\x1b[201~", "disable_bracketed_paste"),
+ (r"\x1b[200~", "perform-bracketed-paste"),
(r"\x03", "ctrl-c"),
]
+ [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"]
@@ -144,16 +141,17 @@ class Reader:
Instance variables of note include:
* buffer:
- A *list* (*not* a string at the moment :-) containing all the
- characters that have been entered.
+ A per-character list containing all the characters that have been
+ entered. Does not include color information.
* console:
Hopefully encapsulates the OS dependent stuff.
* pos:
A 0-based index into 'buffer' for where the insertion point
is.
* screeninfo:
- Ahem. This list contains some info needed to move the
- insertion point around reasonably efficiently.
+ A list of screen position tuples. Each list element is a tuple
+ representing information on visible line length for a given line.
+ Allows for efficient skipping of color escape sequences.
* cxy, lxy:
the position of the insertion point in screen ...
* syntax_table:
@@ -203,7 +201,6 @@ class Reader:
dirty: bool = False
finished: bool = False
paste_mode: bool = False
- in_bracketed_paste: bool = False
commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
last_command: type[Command] | None = None
syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
@@ -221,7 +218,6 @@ class Reader:
## cached metadata to speed up screen refreshes
@dataclass
class RefreshCache:
- in_bracketed_paste: bool = False
screen: list[str] = field(default_factory=list)
screeninfo: list[tuple[int, list[int]]] = field(init=False)
line_end_offsets: list[int] = field(default_factory=list)
@@ -235,7 +231,6 @@ class Reader:
screen: list[str],
screeninfo: list[tuple[int, list[int]]],
) -> None:
- self.in_bracketed_paste = reader.in_bracketed_paste
self.screen = screen.copy()
self.screeninfo = screeninfo.copy()
self.pos = reader.pos
@@ -248,8 +243,7 @@ class Reader:
return False
dimensions = reader.console.width, reader.console.height
dimensions_changed = dimensions != self.dimensions
- paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste
- return not (dimensions_changed or paste_changed)
+ return not dimensions_changed
def get_cached_location(self, reader: Reader) -> tuple[int, int]:
if self.invalidated:
@@ -279,7 +273,7 @@ class Reader:
self.screeninfo = [(0, [])]
self.cxy = self.pos2xy()
self.lxy = (self.pos, 0)
- self.can_colorize = can_colorize()
+ self.can_colorize = _colorize.can_colorize()
self.last_refresh_cache.screeninfo = self.screeninfo
self.last_refresh_cache.pos = self.pos
@@ -316,6 +310,12 @@ class Reader:
pos -= offset
prompt_from_cache = (offset and self.buffer[offset - 1] != "\n")
+
+ if self.can_colorize:
+ colors = list(gen_colors(self.get_unicode()))
+ else:
+ colors = None
+ trace("colors = {colors}", colors=colors)
lines = "".join(self.buffer[offset:]).split("\n")
cursor_found = False
lines_beyond_cursor = 0
@@ -343,9 +343,8 @@ class Reader:
screeninfo.append((0, []))
pos -= line_len + 1
prompt, prompt_len = self.process_prompt(prompt)
- chars, char_widths = disp_str(line)
+ chars, char_widths = disp_str(line, colors, offset)
wrapcount = (sum(char_widths) + prompt_len) // self.console.width
- trace("wrapcount = {wrapcount}", wrapcount=wrapcount)
if wrapcount == 0 or not char_widths:
offset += line_len + 1 # Takes all of the line plus the newline
last_refresh_line_end_offsets.append(offset)
@@ -479,7 +478,7 @@ class Reader:
'lineno'."""
if self.arg is not None and cursor_on_line:
prompt = f"(arg: {self.arg}) "
- elif self.paste_mode and not self.in_bracketed_paste:
+ elif self.paste_mode:
prompt = "(paste) "
elif "\n" in self.buffer:
if lineno == 0:
@@ -492,7 +491,11 @@ class Reader:
prompt = self.ps1
if self.can_colorize:
- prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
+ prompt = (
+ f"{_colorize.theme["PROMPT"]}"
+ f"{prompt}"
+ f"{_colorize.theme["RESET"]}"
+ )
return prompt
def push_input_trans(self, itrans: input.KeymapTranslator) -> None:
@@ -567,6 +570,7 @@ class Reader:
def update_cursor(self) -> None:
"""Move the cursor to reflect changes in self.pos"""
self.cxy = self.pos2xy()
+ trace("update_cursor({pos}) = {cxy}", pos=self.pos, cxy=self.cxy)
self.console.move_cursor(*self.cxy)
def after_command(self, cmd: Command) -> None:
@@ -633,9 +637,6 @@ class Reader:
def refresh(self) -> None:
"""Recalculate and refresh the screen."""
- if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n":
- return
-
# this call sets up self.cxy, so call it first.
self.screen = self.calc_screen()
self.console.refresh(self.screen, self.cxy)
diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py
index 9d58829faf1..560a9db1921 100644
--- a/Lib/_pyrepl/readline.py
+++ b/Lib/_pyrepl/readline.py
@@ -276,10 +276,6 @@ class maybe_accept(commands.Command):
r = self.reader # type: ignore[assignment]
r.dirty = True # this is needed to hide the completion menu, if visible
- if self.reader.in_bracketed_paste:
- r.insert("\n")
- return
-
# if there are already several lines and the cursor
# is not on the last one, always insert a new \n.
text = r.get_unicode()
diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py
index 4c74466118b..e2274629b65 100644
--- a/Lib/_pyrepl/simple_interact.py
+++ b/Lib/_pyrepl/simple_interact.py
@@ -157,7 +157,6 @@ def run_multiline_interactive_console(
r.pos = len(r.get_unicode())
r.dirty = True
r.refresh()
- r.in_bracketed_paste = False
console.write("\nKeyboardInterrupt\n")
console.resetbuffer()
except MemoryError:
diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py
index 96379bc20f3..07b160d2324 100644
--- a/Lib/_pyrepl/unix_console.py
+++ b/Lib/_pyrepl/unix_console.py
@@ -150,8 +150,6 @@ class UnixConsole(Console):
self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
- self.input_buffer = b""
- self.input_buffer_pos = 0
curses.setupterm(term or None, self.output_fd)
self.term = term
@@ -199,22 +197,8 @@ class UnixConsole(Console):
self.event_queue = EventQueue(self.input_fd, self.encoding)
self.cursor_visible = 1
- def more_in_buffer(self) -> bool:
- return bool(
- self.input_buffer
- and self.input_buffer_pos < len(self.input_buffer)
- )
-
def __read(self, n: int) -> bytes:
- if not self.more_in_buffer():
- self.input_buffer = os.read(self.input_fd, 10000)
-
- ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n]
- self.input_buffer_pos += len(ret)
- if self.input_buffer_pos >= len(self.input_buffer):
- self.input_buffer = b""
- self.input_buffer_pos = 0
- return ret
+ return os.read(self.input_fd, n)
def change_encoding(self, encoding: str) -> None:
@@ -422,7 +406,6 @@ class UnixConsole(Console):
"""
return (
not self.event_queue.empty()
- or self.more_in_buffer()
or bool(self.pollob.poll(timeout))
)
@@ -525,6 +508,7 @@ class UnixConsole(Console):
e.raw += e.raw
amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
+ trace("getpending({a})", a=amount)
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py
index 7437fbe1ab9..fe154aa59a0 100644
--- a/Lib/_pyrepl/utils.py
+++ b/Lib/_pyrepl/utils.py
@@ -1,6 +1,17 @@
+from __future__ import annotations
+import builtins
+import functools
+import keyword
import re
+import token as T
+import tokenize
import unicodedata
-import functools
+import _colorize
+
+from collections import deque
+from io import StringIO
+from tokenize import TokenInfo as TI
+from typing import Iterable, Iterator, Match, NamedTuple, Self
from .types import CharBuffer, CharWidths
from .trace import trace
@@ -8,6 +19,32 @@ from .trace import trace
ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]")
ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02")
ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""})
+IDENTIFIERS_AFTER = {"def", "class"}
+BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')}
+
+
+class Span(NamedTuple):
+ """Span indexing that's inclusive on both ends."""
+
+ start: int
+ end: int
+
+ @classmethod
+ def from_re(cls, m: Match[str], group: int | str) -> Self:
+ re_span = m.span(group)
+ return cls(re_span[0], re_span[1] - 1)
+
+ @classmethod
+ def from_token(cls, token: TI, line_len: list[int]) -> Self:
+ return cls(
+ line_len[token.start[0] - 1] + token.start[1],
+ line_len[token.end[0] - 1] + token.end[1] - 1,
+ )
+
+
+class ColorSpan(NamedTuple):
+ span: Span
+ tag: _colorize.ColorTag
@functools.cache
@@ -41,17 +78,207 @@ def unbracket(s: str, including_content: bool = False) -> str:
return s.translate(ZERO_WIDTH_TRANS)
-def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
- r"""Decompose the input buffer into a printable variant.
+def gen_colors(buffer: str) -> Iterator[ColorSpan]:
+ """Yields ColorSpan items: index spans paired with the color tag to use.
+
+ The input `buffer` should be a valid start of a Python code block, i.e.
+ it cannot be a block starting in the middle of a multiline string.
+ """
+ sio = StringIO(buffer)
+ line_lengths = [0] + [len(line) for line in sio.readlines()]
+ # make line_lengths cumulative
+ for i in range(1, len(line_lengths)):
+ line_lengths[i] += line_lengths[i-1]
+
+ sio.seek(0)
+ gen = tokenize.generate_tokens(sio.readline)
+ last_emitted: ColorSpan | None = None
+ try:
+ for color in gen_colors_from_token_stream(gen, line_lengths):
+ yield color
+ last_emitted = color
+ except tokenize.TokenError as te:
+ yield from recover_unterminated_string(
+ te, line_lengths, last_emitted, buffer
+ )
+
+
+def recover_unterminated_string(
+ exc: tokenize.TokenError,
+ line_lengths: list[int],
+ last_emitted: ColorSpan | None,
+ buffer: str,
+) -> Iterator[ColorSpan]:
+ msg, loc = exc.args
+ if loc is None:
+ return
+
+ line_no, column = loc
+
+ if msg.startswith(
+ (
+ "unterminated string literal",
+ "unterminated f-string literal",
+ "unterminated t-string literal",
+ "EOF in multi-line string",
+ "unterminated triple-quoted f-string literal",
+ "unterminated triple-quoted t-string literal",
+ )
+ ):
+ start = line_lengths[line_no - 1] + column - 1
+ end = line_lengths[-1] - 1
+
+ # in case FSTRING_START was already emitted
+ if last_emitted and start <= last_emitted.span.start:
+ trace("before last emitted = {s}", s=start)
+ start = last_emitted.span.end + 1
+
+ span = Span(start, end)
+ trace("yielding span {a} -> {b}", a=span.start, b=span.end)
+ yield ColorSpan(span, "STRING")
+ else:
+ trace(
+ "unhandled token error({buffer}) = {te}",
+ buffer=repr(buffer),
+ te=str(exc),
+ )
+
+
+def gen_colors_from_token_stream(
+ token_generator: Iterator[TI],
+ line_lengths: list[int],
+) -> Iterator[ColorSpan]:
+ token_window = prev_next_window(token_generator)
+
+ is_def_name = False
+ bracket_level = 0
+ for prev_token, token, next_token in token_window:
+ assert token is not None
+ if token.start == token.end:
+ continue
+
+ match token.type:
+ case (
+ T.STRING
+ | T.FSTRING_START | T.FSTRING_MIDDLE | T.FSTRING_END
+ | T.TSTRING_START | T.TSTRING_MIDDLE | T.TSTRING_END
+ ):
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "STRING")
+ case T.COMMENT:
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "COMMENT")
+ case T.NUMBER:
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "NUMBER")
+ case T.OP:
+ if token.string in "([{":
+ bracket_level += 1
+ elif token.string in ")]}":
+ bracket_level -= 1
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "OP")
+ case T.NAME:
+ if is_def_name:
+ is_def_name = False
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "DEFINITION")
+ elif keyword.iskeyword(token.string):
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "KEYWORD")
+ if token.string in IDENTIFIERS_AFTER:
+ is_def_name = True
+ elif (
+ keyword.issoftkeyword(token.string)
+ and bracket_level == 0
+ and is_soft_keyword_used(prev_token, token, next_token)
+ ):
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "SOFT_KEYWORD")
+ elif token.string in BUILTINS:
+ span = Span.from_token(token, line_lengths)
+ yield ColorSpan(span, "BUILTIN")
+
+
+keyword_first_sets_match = {"False", "None", "True", "await", "lambda", "not"}
+keyword_first_sets_case = {"False", "None", "True"}
+
+
+def is_soft_keyword_used(*tokens: TI | None) -> bool:
+ """Returns True if the current token is a keyword in this context.
+
+ For the `*tokens` to match anything, they have to be a three-tuple of
+ (previous, current, next).
+ """
+ trace("is_soft_keyword_used{t}", t=tokens)
+ match tokens:
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="match"),
+ TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
+ | TI(T.OP, string="(" | "*" | "[" | "{" | "~" | "...")
+ ):
+ return True
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="match"),
+ TI(T.NAME, string=s)
+ ):
+ if keyword.iskeyword(s):
+ return s in keyword_first_sets_match
+ return True
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="case"),
+ TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
+ | TI(T.OP, string="(" | "*" | "-" | "[" | "{")
+ ):
+ return True
+ case (
+ None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
+ TI(string="case"),
+ TI(T.NAME, string=s)
+ ):
+ if keyword.iskeyword(s):
+ return s in keyword_first_sets_case
+ return True
+ case (TI(string="case"), TI(string="_"), TI(string=":")):
+ return True
+ case _:
+ return False
+
+
+def disp_str(
+ buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0
+) -> tuple[CharBuffer, CharWidths]:
+ r"""Decompose the input buffer into a printable variant with applied colors.
Returns a tuple of two lists:
- - the first list is the input buffer, character by character;
+ - the first list is the input buffer, character by character, with color
+ escape codes added (while those codes contain multiple ASCII characters,
+ each code is considered atomic *and is attached to the corresponding
+ visible character*);
- the second list is the visible width of each character in the input
buffer.
+ Note on colors:
+ - The `colors` list, if provided, is partially consumed within. We're using
+ a list and not a generator since we need to hold onto the current
+ unfinished span between calls to disp_str in case of multiline strings.
+ - The `colors` list is computed from the start of the input block. `buffer`
+ is only a subset of that input block, a single line within. This is why
+ we need `start_index` to inform us which position is the start of `buffer`
+ actually within user input. This allows us to match color spans correctly.
+
Examples:
>>> utils.disp_str("a = 9")
(['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1])
+
+ >>> line = "while 1:"
+ >>> colors = list(utils.gen_colors(line))
+ >>> utils.disp_str(line, colors=colors)
+ (['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1])
+
"""
chars: CharBuffer = []
char_widths: CharWidths = []
@@ -59,7 +286,20 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
if not buffer:
return chars, char_widths
- for c in buffer:
+ while colors and colors[0].span.end < start_index:
+ # move past irrelevant spans
+ colors.pop(0)
+
+ pre_color = ""
+ post_color = ""
+ if colors and colors[0].span.start < start_index:
+ # looks like we're continuing a previous color (e.g. a multiline str)
+ pre_color = _colorize.theme[colors[0].tag]
+
+ for i, c in enumerate(buffer, start_index):
+ if colors and colors[0].span.start == i: # new color starts now
+ pre_color = _colorize.theme[colors[0].tag]
+
if c == "\x1a": # CTRL-Z on Windows
chars.append(c)
char_widths.append(2)
@@ -73,5 +313,43 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
else:
chars.append(c)
char_widths.append(str_width(c))
- trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths)
+
+ if colors and colors[0].span.end == i: # current color ends now
+ post_color = _colorize.theme["RESET"]
+ colors.pop(0)
+
+ chars[-1] = pre_color + chars[-1] + post_color
+ pre_color = ""
+ post_color = ""
+
+ if colors and colors[0].span.start < i and colors[0].span.end > i:
+ # even though the current color should be continued, reset it for now.
+ # the next call to `disp_str()` will revive it.
+ chars[-1] += _colorize.theme["RESET"]
+
return chars, char_widths
+
+
+def prev_next_window[T](
+ iterable: Iterable[T]
+) -> Iterator[tuple[T | None, ...]]:
+ """Generates three-tuples of (previous, current, next) items.
+
+ On the first iteration previous is None. On the last iteration next
+ is None. In case of exception next is None and the exception is re-raised
+ on a subsequent next() call.
+
+ Inspired by `sliding_window` from `itertools` recipes.
+ """
+
+ iterator = iter(iterable)
+ window = deque((None, next(iterator)), maxlen=3)
+ try:
+ for x in iterator:
+ window.append(x)
+ yield tuple(window)
+ except Exception:
+ raise
+ finally:
+ window.append(None)
+ yield tuple(window)
diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py
index 17942c8df07..77985e59a93 100644
--- a/Lib/_pyrepl/windows_console.py
+++ b/Lib/_pyrepl/windows_console.py
@@ -426,6 +426,20 @@ class WindowsConsole(Console):
return rec
+ def _read_input_bulk(
+ self, block: bool, n: int
+ ) -> tuple[ctypes.Array[INPUT_RECORD], int]:
+ rec = (n * INPUT_RECORD)()
+ read = DWORD()
+
+ if not block and not self.wait(timeout=0):
+ return rec, 0
+
+ if not ReadConsoleInput(InHandle, rec, n, read):
+ raise WinError(GetLastError())
+
+ return rec, read.value
+
def get_event(self, block: bool = True) -> Event | None:
"""Return an Event instance. Returns None if |block| is false
and there is no event pending, otherwise waits for the
@@ -521,7 +535,23 @@ class WindowsConsole(Console):
def getpending(self) -> Event:
"""Return the characters that have been typed but not yet
processed."""
- return Event("key", "", b"")
+ e = Event("key", "", b"")
+
+ while not self.event_queue.empty():
+ e2 = self.event_queue.get()
+ if e2:
+ e.data += e2.data
+
+ recs, rec_count = self._read_input_bulk(False, 1024)
+ for i in range(rec_count):
+ rec = recs[i]
+ if rec and rec.EventType == KEY_EVENT:
+ key_event = rec.Event.KeyEvent
+ ch = key_event.uChar.UnicodeChar
+ if ch == "\r":
+ ch += "\n"
+ e.data += ch
+ return e
def wait(self, timeout: float | None) -> bool:
"""Wait for an event."""
diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py
index 75a5afad562..93029ab6e08 100644
--- a/Lib/test/test_pyrepl/test_pyrepl.py
+++ b/Lib/test/test_pyrepl/test_pyrepl.py
@@ -45,6 +45,7 @@ class ReplTestCase(TestCase):
cmdline_args: list[str] | None = None,
cwd: str | None = None,
skip: bool = False,
+ timeout: float = SHORT_TIMEOUT,
) -> tuple[str, int]:
temp_dir = None
if cwd is None:
@@ -52,7 +53,12 @@ class ReplTestCase(TestCase):
cwd = temp_dir.name
try:
return self._run_repl(
- repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd, skip=skip,
+ repl_input,
+ env=env,
+ cmdline_args=cmdline_args,
+ cwd=cwd,
+ skip=skip,
+ timeout=timeout,
)
finally:
if temp_dir is not None:
@@ -66,6 +72,7 @@ class ReplTestCase(TestCase):
cmdline_args: list[str] | None,
cwd: str,
skip: bool,
+ timeout: float,
) -> tuple[str, int]:
assert pty
master_fd, slave_fd = pty.openpty()
@@ -103,7 +110,7 @@ class ReplTestCase(TestCase):
os.write(master_fd, repl_input.encode("utf-8"))
output = []
- while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
+ while select.select([master_fd], [], [], timeout)[0]:
try:
data = os.read(master_fd, 1024).decode("utf-8")
if not data:
@@ -114,12 +121,12 @@ class ReplTestCase(TestCase):
else:
os.close(master_fd)
process.kill()
- process.wait(timeout=SHORT_TIMEOUT)
+ process.wait(timeout=timeout)
self.fail(f"Timeout while waiting for output, got: {''.join(output)}")
os.close(master_fd)
try:
- exit_code = process.wait(timeout=SHORT_TIMEOUT)
+ exit_code = process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
process.kill()
exit_code = process.wait()
@@ -1561,25 +1568,29 @@ class TestMain(ReplTestCase):
def test_history_survive_crash(self):
env = os.environ.copy()
- commands = "1\nexit()\n"
- output, exit_code = self.run_repl(commands, env=env, skip=True)
with tempfile.NamedTemporaryFile() as hfile:
env["PYTHON_HISTORY"] = hfile.name
- commands = "spam\nimport time\ntime.sleep(1000)\npreved\n"
+
+ commands = "1\n2\n3\nexit()\n"
+ output, exit_code = self.run_repl(commands, env=env, skip=True)
+
+ commands = "spam\nimport time\ntime.sleep(1000)\nquit\n"
try:
- self.run_repl(commands, env=env)
+ self.run_repl(commands, env=env, timeout=3)
except AssertionError:
pass
history = pathlib.Path(hfile.name).read_text()
+ self.assertIn("2", history)
+ self.assertIn("exit()", history)
self.assertIn("spam", history)
- self.assertIn("time", history)
+ self.assertIn("import time", history)
self.assertNotIn("sleep", history)
- self.assertNotIn("preved", history)
+ self.assertNotIn("quit", history)
def test_keyboard_interrupt_after_isearch(self):
- output, exit_code = self.run_repl(["\x12", "\x03", "exit"])
+ output, exit_code = self.run_repl("\x12\x03exit\n")
self.assertEqual(exit_code, 0)
def test_prompt_after_help(self):
diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py
index 109cb603ae8..8d7fcf538d2 100644
--- a/Lib/test/test_pyrepl/test_reader.py
+++ b/Lib/test/test_pyrepl/test_reader.py
@@ -1,14 +1,21 @@
import itertools
import functools
import rlcompleter
+from textwrap import dedent
from unittest import TestCase
from unittest.mock import MagicMock
from .support import handle_all_events, handle_events_narrow_console
from .support import ScreenEqualMixin, code_to_events
-from .support import prepare_reader, prepare_console
+from .support import prepare_console, reader_force_colors
+from .support import reader_no_colors as prepare_reader
from _pyrepl.console import Event
from _pyrepl.reader import Reader
+from _colorize import theme
+
+
+overrides = {"RESET": "z", "SOFT_KEYWORD": "K"}
+colors = {overrides.get(k, k[0].lower()): v for k, v in theme.items()}
class TestReader(ScreenEqualMixin, TestCase):
@@ -123,8 +130,9 @@ class TestReader(ScreenEqualMixin, TestCase):
def test_control_characters(self):
code = 'flag = "🏳️‍🌈"'
events = code_to_events(code)
- reader, _ = handle_all_events(events)
+ reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, 'flag = "🏳️\\u200d🌈"', clean=True)
+ self.assert_screen_equal(reader, 'flag {o}={z} {s}"🏳️\\u200d🌈"{z}'.format(**colors))
def test_setpos_from_xy_multiple_lines(self):
# fmt: off
@@ -355,3 +363,140 @@ class TestReader(ScreenEqualMixin, TestCase):
reader, _ = handle_all_events(events)
reader.setpos_from_xy(8, 0)
self.assertEqual(reader.pos, 7)
+
+ def test_syntax_highlighting_basic(self):
+ code = dedent(
+ """\
+ import re, sys
+ def funct(case: str = sys.platform) -> None:
+ match = re.search(
+ "(me)",
+ '''
+ Come on
+ Come on now
+ You know that it's time to emerge
+ ''',
+ )
+ match case:
+ case "emscripten": print("on the web")
+ case "ios" | "android": print("on the phone")
+ case _: print('arms around', match.group(1))
+ """
+ )
+ expected = dedent(
+ """\
+ {k}import{z} re{o},{z} sys
+ {a}{k}def{z} {d}funct{z}{o}({z}case{o}:{z} {b}str{z} {o}={z} sys{o}.{z}platform{o}){z} {o}->{z} {k}None{z}{o}:{z}
+ match {o}={z} re{o}.{z}search{o}({z}
+ {s}"(me)"{z}{o},{z}
+ {s}'''{z}
+ {s} Come on{z}
+ {s} Come on now{z}
+ {s} You know that it's time to emerge{z}
+ {s} '''{z}{o},{z}
+ {o}){z}
+ {K}match{z} case{o}:{z}
+ {K}case{z} {s}"emscripten"{z}{o}:{z} {b}print{z}{o}({z}{s}"on the web"{z}{o}){z}
+ {K}case{z} {s}"ios"{z} {o}|{z} {s}"android"{z}{o}:{z} {b}print{z}{o}({z}{s}"on the phone"{z}{o}){z}
+ {K}case{z} {K}_{z}{o}:{z} {b}print{z}{o}({z}{s}'arms around'{z}{o},{z} match{o}.{z}group{o}({z}{n}1{z}{o}){z}{o}){z}
+ """
+ )
+ expected_sync = expected.format(a="", **colors)
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
+ self.assert_screen_equal(reader, code, clean=True)
+ self.assert_screen_equal(reader, expected_sync)
+ self.assertEqual(reader.pos, 2**7 + 2**8)
+ self.assertEqual(reader.cxy, (0, 14))
+
+ async_msg = "{k}async{z} ".format(**colors)
+ expected_async = expected.format(a=async_msg, **colors)
+ more_events = itertools.chain(
+ code_to_events(code),
+ [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))] * 13,
+ code_to_events("async "),
+ )
+ reader, _ = handle_all_events(more_events, prepare_reader=reader_force_colors)
+ self.assert_screen_equal(reader, expected_async)
+ self.assertEqual(reader.pos, 21)
+ self.assertEqual(reader.cxy, (6, 1))
+
+ def test_syntax_highlighting_incomplete_string_first_line(self):
+ code = dedent(
+ """\
+ def unfinished_function(arg: str = "still typing
+ """
+ )
+ expected = dedent(
+ """\
+ {k}def{z} {d}unfinished_function{z}{o}({z}arg{o}:{z} {b}str{z} {o}={z} {s}"still typing{z}
+ """
+ ).format(**colors)
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
+ self.assert_screen_equal(reader, code, clean=True)
+ self.assert_screen_equal(reader, expected)
+
+ def test_syntax_highlighting_incomplete_string_another_line(self):
+ code = dedent(
+ """\
+ def unfinished_function(
+ arg: str = "still typing
+ """
+ )
+ expected = dedent(
+ """\
+ {k}def{z} {d}unfinished_function{z}{o}({z}
+ arg{o}:{z} {b}str{z} {o}={z} {s}"still typing{z}
+ """
+ ).format(**colors)
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
+ self.assert_screen_equal(reader, code, clean=True)
+ self.assert_screen_equal(reader, expected)
+
+ def test_syntax_highlighting_incomplete_multiline_string(self):
+ code = dedent(
+ """\
+ def unfinished_function():
+ '''Still writing
+ the docstring
+ """
+ )
+ expected = dedent(
+ """\
+ {k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z}
+ {s}'''Still writing{z}
+ {s} the docstring{z}
+ """
+ ).format(**colors)
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
+ self.assert_screen_equal(reader, code, clean=True)
+ self.assert_screen_equal(reader, expected)
+
+ def test_syntax_highlighting_incomplete_fstring(self):
+ code = dedent(
+ """\
+ def unfinished_function():
+ var = f"Single-quote but {
+ 1
+ +
+ 1
+ } multi-line!
+ """
+ )
+ expected = dedent(
+ """\
+ {k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z}
+ var {o}={z} {s}f"{z}{s}Single-quote but {z}{o}{OB}{z}
+ {n}1{z}
+ {o}+{z}
+ {n}1{z}
+ {o}{CB}{z}{s} multi-line!{z}
+ """
+ ).format(OB="{", CB="}", **colors)
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
+ self.assert_screen_equal(reader, code, clean=True)
+ self.assert_screen_equal(reader, expected)
diff --git a/Lib/test/test_pyrepl/test_unix_console.py b/Lib/test/test_pyrepl/test_unix_console.py
index 2f5c150402b..7acb84a94f7 100644
--- a/Lib/test/test_pyrepl/test_unix_console.py
+++ b/Lib/test/test_pyrepl/test_unix_console.py
@@ -33,10 +33,12 @@ def unix_console(events, **kwargs):
handle_events_unix_console = partial(
handle_all_events,
- prepare_console=partial(unix_console),
+ prepare_reader=reader_no_colors,
+ prepare_console=unix_console,
)
handle_events_narrow_unix_console = partial(
handle_all_events,
+ prepare_reader=reader_no_colors,
prepare_console=partial(unix_console, width=5),
)
handle_events_short_unix_console = partial(
diff --git a/Lib/test/test_pyrepl/test_utils.py b/Lib/test/test_pyrepl/test_utils.py
index 0d59968206a..8ce1e537138 100644
--- a/Lib/test/test_pyrepl/test_utils.py
+++ b/Lib/test/test_pyrepl/test_utils.py
@@ -1,6 +1,6 @@
from unittest import TestCase
-from _pyrepl.utils import str_width, wlen
+from _pyrepl.utils import str_width, wlen, prev_next_window
class TestUtils(TestCase):
@@ -25,3 +25,38 @@ class TestUtils(TestCase):
self.assertEqual(wlen('hello'), 5)
self.assertEqual(wlen('hello' + '\x1a'), 7)
+
+ def test_prev_next_window(self):
+ def gen_normal():
+ yield 1
+ yield 2
+ yield 3
+ yield 4
+
+ pnw = prev_next_window(gen_normal())
+ self.assertEqual(next(pnw), (None, 1, 2))
+ self.assertEqual(next(pnw), (1, 2, 3))
+ self.assertEqual(next(pnw), (2, 3, 4))
+ self.assertEqual(next(pnw), (3, 4, None))
+ with self.assertRaises(StopIteration):
+ next(pnw)
+
+ def gen_short():
+ yield 1
+
+ pnw = prev_next_window(gen_short())
+ self.assertEqual(next(pnw), (None, 1, None))
+ with self.assertRaises(StopIteration):
+ next(pnw)
+
+ def gen_raise():
+ yield from gen_normal()
+ 1/0
+
+ pnw = prev_next_window(gen_raise())
+ self.assertEqual(next(pnw), (None, 1, 2))
+ self.assertEqual(next(pnw), (1, 2, 3))
+ self.assertEqual(next(pnw), (2, 3, 4))
+ self.assertEqual(next(pnw), (3, 4, None))
+ with self.assertRaises(ZeroDivisionError):
+ next(pnw)
diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py
index 69f2d5af2a4..e95fec46a85 100644
--- a/Lib/test/test_pyrepl/test_windows_console.py
+++ b/Lib/test/test_pyrepl/test_windows_console.py
@@ -12,6 +12,7 @@ from unittest import TestCase
from unittest.mock import MagicMock, call
from .support import handle_all_events, code_to_events
+from .support import reader_no_colors as default_prepare_reader
try:
from _pyrepl.console import Event, Console
@@ -47,14 +48,22 @@ class WindowsConsoleTests(TestCase):
setattr(console, key, val)
return console
- def handle_events(self, events: Iterable[Event], **kwargs):
- return handle_all_events(events, partial(self.console, **kwargs))
+ def handle_events(
+ self,
+ events: Iterable[Event],
+ prepare_console=None,
+ prepare_reader=None,
+ **kwargs,
+ ):
+ prepare_console = prepare_console or partial(self.console, **kwargs)
+ prepare_reader = prepare_reader or default_prepare_reader
+ return handle_all_events(events, prepare_console, prepare_reader)
def handle_events_narrow(self, events):
return self.handle_events(events, width=5)
- def handle_events_short(self, events):
- return self.handle_events(events, height=1)
+ def handle_events_short(self, events, **kwargs):
+ return self.handle_events(events, height=1, **kwargs)
def handle_events_height_3(self, events):
return self.handle_events(events, height=3)
diff --git a/Lib/token.py b/Lib/token.py
index a1fde61cd8a..f61723cc09d 100644
--- a/Lib/token.py
+++ b/Lib/token.py
@@ -134,11 +134,11 @@ EXACT_TOKEN_TYPES = {
'~': TILDE,
}
-def ISTERMINAL(x):
+def ISTERMINAL(x: int) -> bool:
return x < NT_OFFSET
-def ISNONTERMINAL(x):
+def ISNONTERMINAL(x: int) -> bool:
return x >= NT_OFFSET
-def ISEOF(x):
+def ISEOF(x: int) -> bool:
return x == ENDMARKER
diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst
new file mode 100644
index 00000000000..354a116c533
--- /dev/null
+++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-03-21-19-03-42.gh-issue-131507.q9fvyM.rst
@@ -0,0 +1 @@
+PyREPL now supports syntax highlighting. Contributed by Łukasz Langa.
diff --git a/Misc/mypy/token.py b/Misc/mypy/token.py
new file mode 120000
index 00000000000..0a39f726dda
--- /dev/null
+++ b/Misc/mypy/token.py
@@ -0,0 +1 @@
+../../Lib/token.py \ No newline at end of file
diff --git a/Misc/mypy/typed-stdlib.txt b/Misc/mypy/typed-stdlib.txt
index 9b27ee0d2de..07b88ba7445 100644
--- a/Misc/mypy/typed-stdlib.txt
+++ b/Misc/mypy/typed-stdlib.txt
@@ -2,4 +2,5 @@
_colorize.py
_pyrepl
+token.py
tomllib \ No newline at end of file
diff --git a/Tools/build/generate_token.py b/Tools/build/generate_token.py
index a64806763f3..9ee5ec86e75 100755
--- a/Tools/build/generate_token.py
+++ b/Tools/build/generate_token.py
@@ -278,13 +278,13 @@ EXACT_TOKEN_TYPES = {
%s
}
-def ISTERMINAL(x):
+def ISTERMINAL(x: int) -> bool:
return x < NT_OFFSET
-def ISNONTERMINAL(x):
+def ISNONTERMINAL(x: int) -> bool:
return x >= NT_OFFSET
-def ISEOF(x):
+def ISEOF(x: int) -> bool:
return x == ENDMARKER
'''