1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
"""sqlite3 CLI tests."""
import sqlite3
import unittest
from sqlite3.__main__ import main as cli
from test.support.os_helper import TESTFN, unlink
from test.support import captured_stdout, captured_stderr, captured_stdin
class CommandLineInterface(unittest.TestCase):
def _do_test(self, *args, expect_success=True):
with (
captured_stdout() as out,
captured_stderr() as err,
self.assertRaises(SystemExit) as cm
):
cli(args)
return out.getvalue(), err.getvalue(), cm.exception.code
def expect_success(self, *args):
out, err, code = self._do_test(*args)
self.assertEqual(code, 0,
"\n".join([f"Unexpected failure: {args=}", out, err]))
self.assertEqual(err, "")
return out
def expect_failure(self, *args):
out, err, code = self._do_test(*args, expect_success=False)
self.assertNotEqual(code, 0,
"\n".join([f"Unexpected failure: {args=}", out, err]))
self.assertEqual(out, "")
return err
def test_cli_help(self):
out = self.expect_success("-h")
self.assertIn("usage: python -m sqlite3", out)
def test_cli_version(self):
out = self.expect_success("-v")
self.assertIn(sqlite3.sqlite_version, out)
def test_cli_execute_sql(self):
out = self.expect_success(":memory:", "select 1")
self.assertIn("(1,)", out)
def test_cli_execute_too_much_sql(self):
stderr = self.expect_failure(":memory:", "select 1; select 2")
err = "ProgrammingError: You can only execute one statement at a time"
self.assertIn(err, stderr)
def test_cli_execute_incomplete_sql(self):
stderr = self.expect_failure(":memory:", "sel")
self.assertIn("OperationalError (SQLITE_ERROR)", stderr)
def test_cli_on_disk_db(self):
self.addCleanup(unlink, TESTFN)
out = self.expect_success(TESTFN, "create table t(t)")
self.assertEqual(out, "")
out = self.expect_success(TESTFN, "select count(t) from t")
self.assertIn("(0,)", out)
class InteractiveSession(unittest.TestCase):
MEMORY_DB_MSG = "Connected to a transient in-memory database"
PS1 = "sqlite> "
PS2 = "... "
def run_cli(self, *args, commands=()):
with (
captured_stdin() as stdin,
captured_stdout() as stdout,
captured_stderr() as stderr,
self.assertRaises(SystemExit) as cm
):
for cmd in commands:
stdin.write(cmd + "\n")
stdin.seek(0)
cli(args)
out = stdout.getvalue()
err = stderr.getvalue()
self.assertEqual(cm.exception.code, 0,
f"Unexpected failure: {args=}\n{out}\n{err}")
return out, err
def test_interact(self):
out, err = self.run_cli()
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(self.PS1, out)
def test_interact_quit(self):
out, err = self.run_cli(commands=(".quit",))
self.assertIn(self.PS1, out)
def test_interact_version(self):
out, err = self.run_cli(commands=(".version",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(sqlite3.sqlite_version, out)
def test_interact_valid_sql(self):
out, err = self.run_cli(commands=("SELECT 1;",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn("(1,)", out)
def test_interact_valid_multiline_sql(self):
out, err = self.run_cli(commands=("SELECT 1\n;",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(self.PS2, out)
self.assertIn("(1,)", out)
def test_interact_invalid_sql(self):
out, err = self.run_cli(commands=("sel;",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn("OperationalError (SQLITE_ERROR)", err)
def test_interact_on_disk_file(self):
self.addCleanup(unlink, TESTFN)
out, err = self.run_cli(TESTFN, commands=("CREATE TABLE t(t);",))
self.assertIn(TESTFN, err)
self.assertIn(self.PS1, out)
out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
self.assertIn("(0,)", out)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|