aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/sqlite3/__main__.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/sqlite3/__main__.py')
-rw-r--r--Lib/sqlite3/__main__.py60
1 files changed, 41 insertions, 19 deletions
diff --git a/Lib/sqlite3/__main__.py b/Lib/sqlite3/__main__.py
index 79a6209468d..c2fa23c46cf 100644
--- a/Lib/sqlite3/__main__.py
+++ b/Lib/sqlite3/__main__.py
@@ -10,9 +10,10 @@ import sys
from argparse import ArgumentParser
from code import InteractiveConsole
from textwrap import dedent
+from _colorize import get_theme, theme_no_color
-def execute(c, sql, suppress_errors=True):
+def execute(c, sql, suppress_errors=True, theme=theme_no_color):
"""Helper that wraps execution of SQL code.
This is used both by the REPL and by direct execution from the CLI.
@@ -25,11 +26,15 @@ def execute(c, sql, suppress_errors=True):
for row in c.execute(sql):
print(row)
except sqlite3.Error as e:
+ t = theme.traceback
tp = type(e).__name__
try:
- print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr)
+ tp += f" ({e.sqlite_errorname})"
except AttributeError:
- print(f"{tp}: {e}", file=sys.stderr)
+ pass
+ print(
+ f"{t.type}{tp}{t.reset}: {t.message}{e}{t.reset}", file=sys.stderr
+ )
if not suppress_errors:
sys.exit(1)
@@ -37,10 +42,11 @@ def execute(c, sql, suppress_errors=True):
class SqliteInteractiveConsole(InteractiveConsole):
"""A simple SQLite REPL."""
- def __init__(self, connection):
+ def __init__(self, connection, use_color=False):
super().__init__()
self._con = connection
self._cur = connection.cursor()
+ self._use_color = use_color
def runsource(self, source, filename="<input>", symbol="single"):
"""Override runsource, the core of the InteractiveConsole REPL.
@@ -48,23 +54,35 @@ class SqliteInteractiveConsole(InteractiveConsole):
Return True if more input is needed; buffering is done automatically.
Return False if input is a complete statement ready for execution.
"""
- match source:
- case ".version":
- print(f"{sqlite3.sqlite_version}")
- case ".help":
- print("Enter SQL code and press enter.")
- case ".quit":
- sys.exit(0)
- case _:
- if not sqlite3.complete_statement(source):
- return True
- execute(self._cur, source)
+ theme = get_theme(force_no_color=not self._use_color)
+
+ if not source or source.isspace():
+ return False
+ if source[0] == ".":
+ match source[1:].strip():
+ case "version":
+ print(f"{sqlite3.sqlite_version}")
+ case "help":
+ print("Enter SQL code and press enter.")
+ case "quit":
+ sys.exit(0)
+ case "":
+ pass
+ case _ as unknown:
+ t = theme.traceback
+ self.write(f'{t.type}Error{t.reset}:{t.message} unknown'
+ f'command or invalid arguments: "{unknown}".\n{t.reset}')
+ else:
+ if not sqlite3.complete_statement(source):
+ return True
+ execute(self._cur, source, theme=theme)
return False
def main(*args):
parser = ArgumentParser(
description="Python sqlite3 CLI",
+ color=True,
)
parser.add_argument(
"filename", type=str, default=":memory:", nargs="?",
@@ -104,17 +122,21 @@ def main(*args):
Each command will be run using execute() on the cursor.
Type ".help" for more information; type ".quit" or {eofkey} to quit.
""").strip()
- sys.ps1 = "sqlite> "
- sys.ps2 = " ... "
+
+ theme = get_theme()
+ s = theme.syntax
+
+ sys.ps1 = f"{s.prompt}sqlite> {s.reset}"
+ sys.ps2 = f"{s.prompt} ... {s.reset}"
con = sqlite3.connect(args.filename, isolation_level=None)
try:
if args.sql:
# SQL statement provided on the command-line; execute it directly.
- execute(con, args.sql, suppress_errors=False)
+ execute(con, args.sql, suppress_errors=False, theme=theme)
else:
# No SQL provided; start the REPL.
- console = SqliteInteractiveConsole(con)
+ console = SqliteInteractiveConsole(con, use_color=True)
try:
import readline # noqa: F401
except ImportError: