diff options
Diffstat (limited to 'Lib/sqlite3/__main__.py')
-rw-r--r-- | Lib/sqlite3/__main__.py | 60 |
1 files changed, 41 insertions, 19 deletions
diff --git a/Lib/sqlite3/__main__.py b/Lib/sqlite3/__main__.py index 79a6209468d..c2fa23c46cf 100644 --- a/Lib/sqlite3/__main__.py +++ b/Lib/sqlite3/__main__.py @@ -10,9 +10,10 @@ import sys from argparse import ArgumentParser from code import InteractiveConsole from textwrap import dedent +from _colorize import get_theme, theme_no_color -def execute(c, sql, suppress_errors=True): +def execute(c, sql, suppress_errors=True, theme=theme_no_color): """Helper that wraps execution of SQL code. This is used both by the REPL and by direct execution from the CLI. @@ -25,11 +26,15 @@ def execute(c, sql, suppress_errors=True): for row in c.execute(sql): print(row) except sqlite3.Error as e: + t = theme.traceback tp = type(e).__name__ try: - print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr) + tp += f" ({e.sqlite_errorname})" except AttributeError: - print(f"{tp}: {e}", file=sys.stderr) + pass + print( + f"{t.type}{tp}{t.reset}: {t.message}{e}{t.reset}", file=sys.stderr + ) if not suppress_errors: sys.exit(1) @@ -37,10 +42,11 @@ def execute(c, sql, suppress_errors=True): class SqliteInteractiveConsole(InteractiveConsole): """A simple SQLite REPL.""" - def __init__(self, connection): + def __init__(self, connection, use_color=False): super().__init__() self._con = connection self._cur = connection.cursor() + self._use_color = use_color def runsource(self, source, filename="<input>", symbol="single"): """Override runsource, the core of the InteractiveConsole REPL. @@ -48,23 +54,35 @@ class SqliteInteractiveConsole(InteractiveConsole): Return True if more input is needed; buffering is done automatically. Return False if input is a complete statement ready for execution. """ - match source: - case ".version": - print(f"{sqlite3.sqlite_version}") - case ".help": - print("Enter SQL code and press enter.") - case ".quit": - sys.exit(0) - case _: - if not sqlite3.complete_statement(source): - return True - execute(self._cur, source) + theme = get_theme(force_no_color=not self._use_color) + + if not source or source.isspace(): + return False + if source[0] == ".": + match source[1:].strip(): + case "version": + print(f"{sqlite3.sqlite_version}") + case "help": + print("Enter SQL code and press enter.") + case "quit": + sys.exit(0) + case "": + pass + case _ as unknown: + t = theme.traceback + self.write(f'{t.type}Error{t.reset}:{t.message} unknown' + f'command or invalid arguments: "{unknown}".\n{t.reset}') + else: + if not sqlite3.complete_statement(source): + return True + execute(self._cur, source, theme=theme) return False def main(*args): parser = ArgumentParser( description="Python sqlite3 CLI", + color=True, ) parser.add_argument( "filename", type=str, default=":memory:", nargs="?", @@ -104,17 +122,21 @@ def main(*args): Each command will be run using execute() on the cursor. Type ".help" for more information; type ".quit" or {eofkey} to quit. """).strip() - sys.ps1 = "sqlite> " - sys.ps2 = " ... " + + theme = get_theme() + s = theme.syntax + + sys.ps1 = f"{s.prompt}sqlite> {s.reset}" + sys.ps2 = f"{s.prompt} ... {s.reset}" con = sqlite3.connect(args.filename, isolation_level=None) try: if args.sql: # SQL statement provided on the command-line; execute it directly. - execute(con, args.sql, suppress_errors=False) + execute(con, args.sql, suppress_errors=False, theme=theme) else: # No SQL provided; start the REPL. - console = SqliteInteractiveConsole(con) + console = SqliteInteractiveConsole(con, use_color=True) try: import readline # noqa: F401 except ImportError: |