aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/_pyrepl/unix_eventqueue.py
diff options
context:
space:
mode:
authorPablo Galindo Salgado <Pablogsal@gmail.com>2024-05-05 21:32:23 +0200
committerGitHub <noreply@github.com>2024-05-05 21:32:23 +0200
commitf27f8c790af1233d499b795af1c0d1b36aaecaf5 (patch)
tree22c502c6382512fafbb63e3020c8462e5400d4df /Lib/_pyrepl/unix_eventqueue.py
parent40cc809902304f60c6e1c933191dd4d64e570e28 (diff)
downloadcpython-f27f8c790af1233d499b795af1c0d1b36aaecaf5.tar.gz
cpython-f27f8c790af1233d499b795af1c0d1b36aaecaf5.zip
gh-111201: A new Python REPL (GH-111567)
Co-authored-by: Łukasz Langa <lukasz@langa.pl> Co-authored-by: Marta Gómez Macías <mgmacias@google.com> Co-authored-by: Lysandros Nikolaou <lisandrosnik@gmail.com> Co-authored-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
Diffstat (limited to 'Lib/_pyrepl/unix_eventqueue.py')
-rw-r--r--Lib/_pyrepl/unix_eventqueue.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/Lib/_pyrepl/unix_eventqueue.py b/Lib/_pyrepl/unix_eventqueue.py
new file mode 100644
index 00000000000..70cfade26e2
--- /dev/null
+++ b/Lib/_pyrepl/unix_eventqueue.py
@@ -0,0 +1,152 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <micahel@gmail.com>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from collections import deque
+
+from . import keymap
+from .console import Event
+from . import curses
+from .trace import trace
+from termios import tcgetattr, VERASE
+import os
+
+
+# Mapping of human-readable key names to their terminal-specific codes
+TERMINAL_KEYNAMES = {
+ "delete": "kdch1",
+ "down": "kcud1",
+ "end": "kend",
+ "enter": "kent",
+ "home": "khome",
+ "insert": "kich1",
+ "left": "kcub1",
+ "page down": "knp",
+ "page up": "kpp",
+ "right": "kcuf1",
+ "up": "kcuu1",
+}
+
+
+# Function keys F1-F20 mapping
+TERMINAL_KEYNAMES.update(("f%d" % i, "kf%d" % i) for i in range(1, 21))
+
+# Known CTRL-arrow keycodes
+CTRL_ARROW_KEYCODES= {
+ # for xterm, gnome-terminal, xfce terminal, etc.
+ b'\033[1;5D': 'ctrl left',
+ b'\033[1;5C': 'ctrl right',
+ # for rxvt
+ b'\033Od': 'ctrl left',
+ b'\033Oc': 'ctrl right',
+}
+
+def get_terminal_keycodes() -> dict[bytes, str]:
+ """
+ Generates a dictionary mapping terminal keycodes to human-readable names.
+ """
+ keycodes = {}
+ for key, terminal_code in TERMINAL_KEYNAMES.items():
+ keycode = curses.tigetstr(terminal_code)
+ trace('key {key} tiname {terminal_code} keycode {keycode!r}', **locals())
+ if keycode:
+ keycodes[keycode] = key
+ keycodes.update(CTRL_ARROW_KEYCODES)
+ return keycodes
+
+class EventQueue:
+ def __init__(self, fd: int, encoding: str) -> None:
+ self.keycodes = get_terminal_keycodes()
+ if os.isatty(fd):
+ backspace = tcgetattr(fd)[6][VERASE]
+ self.keycodes[backspace] = "backspace"
+ self.compiled_keymap = keymap.compile_keymap(self.keycodes)
+ self.keymap = self.compiled_keymap
+ trace("keymap {k!r}", k=self.keymap)
+ self.encoding = encoding
+ self.events: deque[Event] = deque()
+ self.buf = bytearray()
+
+ def get(self) -> Event | None:
+ """
+ Retrieves the next event from the queue.
+ """
+ if self.events:
+ return self.events.popleft()
+ else:
+ return None
+
+ def empty(self) -> bool:
+ """
+ Checks if the queue is empty.
+ """
+ return not self.events
+
+ def flush_buf(self) -> bytearray:
+ """
+ Flushes the buffer and returns its contents.
+ """
+ old = self.buf
+ self.buf = bytearray()
+ return old
+
+ def insert(self, event: Event) -> None:
+ """
+ Inserts an event into the queue.
+ """
+ trace('added event {event}', event=event)
+ self.events.append(event)
+
+ def push(self, char: int | bytes) -> None:
+ """
+ Processes a character by updating the buffer and handling special key mappings.
+ """
+ ord_char = char if isinstance(char, int) else ord(char)
+ char = bytes(bytearray((ord_char,)))
+ self.buf.append(ord_char)
+ if char in self.keymap:
+ if self.keymap is self.compiled_keymap:
+ #sanity check, buffer is empty when a special key comes
+ assert len(self.buf) == 1
+ k = self.keymap[char]
+ trace('found map {k!r}', k=k)
+ if isinstance(k, dict):
+ self.keymap = k
+ else:
+ self.insert(Event('key', k, self.flush_buf()))
+ self.keymap = self.compiled_keymap
+
+ elif self.buf and self.buf[0] == 27: # escape
+ # escape sequence not recognized by our keymap: propagate it
+ # outside so that i can be recognized as an M-... key (see also
+ # the docstring in keymap.py
+ trace('unrecognized escape sequence, propagating...')
+ self.keymap = self.compiled_keymap
+ self.insert(Event('key', '\033', bytearray(b'\033')))
+ for _c in self.flush_buf()[1:]:
+ self.push(_c)
+
+ else:
+ try:
+ decoded = bytes(self.buf).decode(self.encoding)
+ except UnicodeError:
+ return
+ else:
+ self.insert(Event('key', decoded, self.flush_buf()))
+ self.keymap = self.compiled_keymap