diff options
Diffstat (limited to 'Lib/sqlite3')
-rw-r--r-- | Lib/sqlite3/__main__.py | 73 | ||||
-rw-r--r-- | Lib/sqlite3/_completer.py | 42 |
2 files changed, 91 insertions, 24 deletions
diff --git a/Lib/sqlite3/__main__.py b/Lib/sqlite3/__main__.py index 79a6209468d..35344ecceff 100644 --- a/Lib/sqlite3/__main__.py +++ b/Lib/sqlite3/__main__.py @@ -10,9 +10,12 @@ import sys from argparse import ArgumentParser from code import InteractiveConsole from textwrap import dedent +from _colorize import get_theme, theme_no_color +from ._completer import completer -def execute(c, sql, suppress_errors=True): + +def execute(c, sql, suppress_errors=True, theme=theme_no_color): """Helper that wraps execution of SQL code. This is used both by the REPL and by direct execution from the CLI. @@ -25,11 +28,15 @@ def execute(c, sql, suppress_errors=True): for row in c.execute(sql): print(row) except sqlite3.Error as e: + t = theme.traceback tp = type(e).__name__ try: - print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr) + tp += f" ({e.sqlite_errorname})" except AttributeError: - print(f"{tp}: {e}", file=sys.stderr) + pass + print( + f"{t.type}{tp}{t.reset}: {t.message}{e}{t.reset}", file=sys.stderr + ) if not suppress_errors: sys.exit(1) @@ -37,10 +44,11 @@ def execute(c, sql, suppress_errors=True): class SqliteInteractiveConsole(InteractiveConsole): """A simple SQLite REPL.""" - def __init__(self, connection): + def __init__(self, connection, use_color=False): super().__init__() self._con = connection self._cur = connection.cursor() + self._use_color = use_color def runsource(self, source, filename="<input>", symbol="single"): """Override runsource, the core of the InteractiveConsole REPL. @@ -48,23 +56,39 @@ class SqliteInteractiveConsole(InteractiveConsole): Return True if more input is needed; buffering is done automatically. Return False if input is a complete statement ready for execution. """ - match source: - case ".version": - print(f"{sqlite3.sqlite_version}") - case ".help": - print("Enter SQL code and press enter.") - case ".quit": - sys.exit(0) - case _: - if not sqlite3.complete_statement(source): - return True - execute(self._cur, source) + theme = get_theme(force_no_color=not self._use_color) + + if not source or source.isspace(): + return False + if source[0] == ".": + match source[1:].strip(): + case "version": + print(sqlite3.sqlite_version) + case "help": + t = theme.syntax + print(f"Enter SQL code or one of the below commands, and press enter.\n\n" + f"{t.builtin}.version{t.reset} Print underlying SQLite library version\n" + f"{t.builtin}.help{t.reset} Print this help message\n" + f"{t.builtin}.quit{t.reset} Exit the CLI, equivalent to CTRL-D\n") + case "quit": + sys.exit(0) + case "": + pass + case _ as unknown: + t = theme.traceback + self.write(f'{t.type}Error{t.reset}: {t.message}unknown ' + f'command: "{unknown}"{t.reset}\n') + else: + if not sqlite3.complete_statement(source): + return True + execute(self._cur, source, theme=theme) return False def main(*args): parser = ArgumentParser( description="Python sqlite3 CLI", + color=True, ) parser.add_argument( "filename", type=str, default=":memory:", nargs="?", @@ -104,22 +128,23 @@ def main(*args): Each command will be run using execute() on the cursor. Type ".help" for more information; type ".quit" or {eofkey} to quit. """).strip() - sys.ps1 = "sqlite> " - sys.ps2 = " ... " + + theme = get_theme() + s = theme.syntax + + sys.ps1 = f"{s.prompt}sqlite> {s.reset}" + sys.ps2 = f"{s.prompt} ... {s.reset}" con = sqlite3.connect(args.filename, isolation_level=None) try: if args.sql: # SQL statement provided on the command-line; execute it directly. - execute(con, args.sql, suppress_errors=False) + execute(con, args.sql, suppress_errors=False, theme=theme) else: # No SQL provided; start the REPL. - console = SqliteInteractiveConsole(con) - try: - import readline # noqa: F401 - except ImportError: - pass - console.interact(banner, exitmsg="") + with completer(): + console = SqliteInteractiveConsole(con, use_color=True) + console.interact(banner, exitmsg="") finally: con.close() diff --git a/Lib/sqlite3/_completer.py b/Lib/sqlite3/_completer.py new file mode 100644 index 00000000000..f21ef69cad6 --- /dev/null +++ b/Lib/sqlite3/_completer.py @@ -0,0 +1,42 @@ +from contextlib import contextmanager + +try: + from _sqlite3 import SQLITE_KEYWORDS +except ImportError: + SQLITE_KEYWORDS = () + +_completion_matches = [] + + +def _complete(text, state): + global _completion_matches + + if state == 0: + text_upper = text.upper() + _completion_matches = [c for c in SQLITE_KEYWORDS if c.startswith(text_upper)] + try: + return _completion_matches[state] + " " + except IndexError: + return None + + +@contextmanager +def completer(): + try: + import readline + except ImportError: + yield + return + + old_completer = readline.get_completer() + try: + readline.set_completer(_complete) + if readline.backend == "editline": + # libedit uses "^I" instead of "tab" + command_string = "bind ^I rl_complete" + else: + command_string = "tab: complete" + readline.parse_and_bind(command_string) + yield + finally: + readline.set_completer(old_completer) |