diff options
Diffstat (limited to 'Lib/sqlite3')
-rw-r--r-- | Lib/sqlite3/__init__.py | 5 | ||||
-rw-r--r-- | Lib/sqlite3/dbapi2.py | 15 | ||||
-rw-r--r-- | Lib/sqlite3/test/dbapi.py | 58 | ||||
-rw-r--r-- | Lib/sqlite3/test/dump.py | 2 | ||||
-rw-r--r-- | Lib/sqlite3/test/factory.py | 60 | ||||
-rw-r--r-- | Lib/sqlite3/test/hooks.py | 27 | ||||
-rw-r--r-- | Lib/sqlite3/test/py25tests.py | 80 | ||||
-rw-r--r-- | Lib/sqlite3/test/regression.py | 51 | ||||
-rw-r--r-- | Lib/sqlite3/test/transactions.py | 3 | ||||
-rw-r--r-- | Lib/sqlite3/test/types.py | 71 | ||||
-rw-r--r-- | Lib/sqlite3/test/userfunctions.py | 55 |
11 files changed, 174 insertions, 253 deletions
diff --git a/Lib/sqlite3/__init__.py b/Lib/sqlite3/__init__.py index 41ef2b76dfc..6c91df27cca 100644 --- a/Lib/sqlite3/__init__.py +++ b/Lib/sqlite3/__init__.py @@ -1,7 +1,6 @@ -#-*- coding: ISO-8859-1 -*- # pysqlite2/__init__.py: the pysqlite2 package. # -# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -21,4 +20,4 @@ # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. -from dbapi2 import * +from sqlite3.dbapi2 import * diff --git a/Lib/sqlite3/dbapi2.py b/Lib/sqlite3/dbapi2.py index 7eb28e87b1b..6c121a5c06d 100644 --- a/Lib/sqlite3/dbapi2.py +++ b/Lib/sqlite3/dbapi2.py @@ -1,7 +1,6 @@ -#-*- coding: ISO-8859-1 -*- # pysqlite2/dbapi2.py: the DB-API 2.0 interface # -# Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -50,7 +49,7 @@ def TimestampFromTicks(ticks): version_info = tuple([int(x) for x in version.split(".")]) sqlite_version_info = tuple([int(x) for x in sqlite_version.split(".")]) -Binary = buffer +Binary = memoryview def register_adapters_and_converters(): def adapt_date(val): @@ -60,13 +59,13 @@ def register_adapters_and_converters(): return val.isoformat(" ") def convert_date(val): - return datetime.date(*map(int, val.split("-"))) + return datetime.date(*map(int, val.split(b"-"))) def convert_timestamp(val): - datepart, timepart = val.split(" ") - year, month, day = map(int, datepart.split("-")) - timepart_full = timepart.split(".") - hours, minutes, seconds = map(int, timepart_full[0].split(":")) + datepart, timepart = val.split(b" ") + year, month, day = map(int, datepart.split(b"-")) + timepart_full = timepart.split(b".") + hours, minutes, seconds = map(int, timepart_full[0].split(b":")) if len(timepart_full) == 2: microseconds = int(timepart_full[1]) else: diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py index c356d4706c4..202bd388760 100644 --- a/Lib/sqlite3/test/dbapi.py +++ b/Lib/sqlite3/test/dbapi.py @@ -22,7 +22,6 @@ # 3. This notice may not be removed or altered from any source distribution. import unittest -import sys import sqlite3 as sqlite try: import threading @@ -44,12 +43,12 @@ class ModuleTests(unittest.TestCase): sqlite.paramstyle) def CheckWarning(self): - self.assertTrue(issubclass(sqlite.Warning, StandardError), - "Warning is not a subclass of StandardError") + self.assertTrue(issubclass(sqlite.Warning, Exception), + "Warning is not a subclass of Exception") def CheckError(self): - self.assertTrue(issubclass(sqlite.Error, StandardError), - "Error is not a subclass of StandardError") + self.assertTrue(issubclass(sqlite.Error, Exception), + "Error is not a subclass of Exception") def CheckInterfaceError(self): self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), @@ -85,6 +84,7 @@ class ModuleTests(unittest.TestCase): "NotSupportedError is not a subclass of DatabaseError") class ConnectionTests(unittest.TestCase): + def setUp(self): self.cx = sqlite.connect(":memory:") cu = self.cx.cursor() @@ -141,6 +141,28 @@ class ConnectionTests(unittest.TestCase): self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) + def CheckInTransaction(self): + # Can't use db from setUp because we want to test initial state. + cx = sqlite.connect(":memory:") + cu = cx.cursor() + self.assertEqual(cx.in_transaction, False) + cu.execute("create table transactiontest(id integer primary key, name text)") + self.assertEqual(cx.in_transaction, False) + cu.execute("insert into transactiontest(name) values (?)", ("foo",)) + self.assertEqual(cx.in_transaction, True) + cu.execute("select name from transactiontest where name=?", ["foo"]) + row = cu.fetchone() + self.assertEqual(cx.in_transaction, True) + cx.commit() + self.assertEqual(cx.in_transaction, False) + cu.execute("select name from transactiontest where name=?", ["foo"]) + row = cu.fetchone() + self.assertEqual(cx.in_transaction, False) + + def CheckInTransactionRO(self): + with self.assertRaises(AttributeError): + self.cx.in_transaction = True + class CursorTests(unittest.TestCase): def setUp(self): self.cx = sqlite.connect(":memory:") @@ -260,10 +282,6 @@ class CursorTests(unittest.TestCase): self.assertEqual(row[0], "foo") def CheckExecuteDictMapping_Mapping(self): - # Test only works with Python 2.5 or later - if sys.version_info < (2, 5, 0): - return - class D(dict): def __missing__(self, key): return "foo" @@ -339,7 +357,7 @@ class CursorTests(unittest.TestCase): def __init__(self): self.value = 5 - def next(self): + def __next__(self): if self.value == 10: raise StopIteration else: @@ -379,8 +397,8 @@ class CursorTests(unittest.TestCase): self.fail("should have raised a TypeError") except TypeError: return - except Exception, e: - print "raised", e.__class__ + except Exception as e: + print("raised", e.__class__) self.fail("raised wrong exception.") def CheckFetchIter(self): @@ -653,7 +671,7 @@ class ConstructorTests(unittest.TestCase): ts = sqlite.TimestampFromTicks(42) def CheckBinary(self): - b = sqlite.Binary(chr(0) + "'") + b = sqlite.Binary(b"\0'") class ExtensionTests(unittest.TestCase): def CheckScriptStringSql(self): @@ -669,20 +687,6 @@ class ExtensionTests(unittest.TestCase): res = cur.fetchone()[0] self.assertEqual(res, 5) - def CheckScriptStringUnicode(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - cur.executescript(u""" - create table a(i); - insert into a(i) values (5); - select i from a; - delete from a; - insert into a(i) values (6); - """) - cur.execute("select i from a") - res = cur.fetchone()[0] - self.assertEqual(res, 6) - def CheckScriptSyntaxError(self): con = sqlite.connect(":memory:") cur = con.cursor() diff --git a/Lib/sqlite3/test/dump.py b/Lib/sqlite3/test/dump.py index 2e9b436f30a..b200333afac 100644 --- a/Lib/sqlite3/test/dump.py +++ b/Lib/sqlite3/test/dump.py @@ -47,7 +47,7 @@ class DumpTests(unittest.TestCase): expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \ ['COMMIT;'] [self.assertEqual(expected_sqls[i], actual_sqls[i]) - for i in xrange(len(expected_sqls))] + for i in range(len(expected_sqls))] def suite(): return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check")) diff --git a/Lib/sqlite3/test/factory.py b/Lib/sqlite3/test/factory.py index 52854be2b99..7f6f3473f3e 100644 --- a/Lib/sqlite3/test/factory.py +++ b/Lib/sqlite3/test/factory.py @@ -159,31 +159,31 @@ class TextFactoryTests(unittest.TestCase): self.con = sqlite.connect(":memory:") def CheckUnicode(self): - austria = unicode("Österreich", "latin1") + austria = "Österreich" row = self.con.execute("select ?", (austria,)).fetchone() - self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") + self.assertTrue(type(row[0]) == str, "type of row[0] must be unicode") def CheckString(self): - self.con.text_factory = str - austria = unicode("Österreich", "latin1") + self.con.text_factory = bytes + austria = "Österreich" row = self.con.execute("select ?", (austria,)).fetchone() - self.assertTrue(type(row[0]) == str, "type of row[0] must be str") + self.assertTrue(type(row[0]) == bytes, "type of row[0] must be bytes") self.assertTrue(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8") def CheckCustom(self): - self.con.text_factory = lambda x: unicode(x, "utf-8", "ignore") - austria = unicode("Österreich", "latin1") - row = self.con.execute("select ?", (austria.encode("latin1"),)).fetchone() - self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") - self.assertTrue(row[0].endswith(u"reich"), "column must contain original data") + self.con.text_factory = lambda x: str(x, "utf-8", "ignore") + austria = "Österreich" + row = self.con.execute("select ?", (austria,)).fetchone() + self.assertTrue(type(row[0]) == str, "type of row[0] must be unicode") + self.assertTrue(row[0].endswith("reich"), "column must contain original data") def CheckOptimizedUnicode(self): self.con.text_factory = sqlite.OptimizedUnicode - austria = unicode("Österreich", "latin1") - germany = unicode("Deutchland") + austria = "Österreich" + germany = "Deutchland" a_row = self.con.execute("select ?", (austria,)).fetchone() d_row = self.con.execute("select ?", (germany,)).fetchone() - self.assertTrue(type(a_row[0]) == unicode, "type of non-ASCII row must be unicode") + self.assertTrue(type(a_row[0]) == str, "type of non-ASCII row must be str") self.assertTrue(type(d_row[0]) == str, "type of ASCII-only row must be str") def tearDown(self): @@ -196,33 +196,29 @@ class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase): self.con.execute("insert into test (value) values (?)", ("a\x00b",)) def CheckString(self): - # text_factory defaults to unicode + # text_factory defaults to str row = self.con.execute("select value from test").fetchone() - self.assertIs(type(row[0]), unicode) + self.assertIs(type(row[0]), str) self.assertEqual(row[0], "a\x00b") - def CheckCustom(self): - # A custom factory should receive an str argument - self.con.text_factory = lambda x: x + def CheckBytes(self): + self.con.text_factory = bytes row = self.con.execute("select value from test").fetchone() - self.assertIs(type(row[0]), str) - self.assertEqual(row[0], "a\x00b") + self.assertIs(type(row[0]), bytes) + self.assertEqual(row[0], b"a\x00b") - def CheckOptimizedUnicodeAsString(self): - # ASCII -> str argument - self.con.text_factory = sqlite.OptimizedUnicode + def CheckBytearray(self): + self.con.text_factory = bytearray row = self.con.execute("select value from test").fetchone() - self.assertIs(type(row[0]), str) - self.assertEqual(row[0], "a\x00b") + self.assertIs(type(row[0]), bytearray) + self.assertEqual(row[0], b"a\x00b") - def CheckOptimizedUnicodeAsUnicode(self): - # Non-ASCII -> unicode argument - self.con.text_factory = sqlite.OptimizedUnicode - self.con.execute("delete from test") - self.con.execute("insert into test (value) values (?)", (u'ä\0ö',)) + def CheckCustom(self): + # A custom factory should receive a bytes argument + self.con.text_factory = lambda x: x row = self.con.execute("select value from test").fetchone() - self.assertIs(type(row[0]), unicode) - self.assertEqual(row[0], u"ä\x00ö") + self.assertIs(type(row[0]), bytes) + self.assertEqual(row[0], b"a\x00b") def tearDown(self): self.con.close() diff --git a/Lib/sqlite3/test/hooks.py b/Lib/sqlite3/test/hooks.py index b798e749add..a92e8387861 100644 --- a/Lib/sqlite3/test/hooks.py +++ b/Lib/sqlite3/test/hooks.py @@ -21,7 +21,7 @@ # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. -import os, unittest +import unittest import sqlite3 as sqlite class CollationTests(unittest.TestCase): @@ -36,15 +36,15 @@ class CollationTests(unittest.TestCase): try: con.create_collation("X", 42) self.fail("should have raised a TypeError") - except TypeError, e: + except TypeError as e: self.assertEqual(e.args[0], "parameter must be callable") def CheckCreateCollationNotAscii(self): con = sqlite.connect(":memory:") try: - con.create_collation("collä", cmp) + con.create_collation("collä", lambda x, y: (x > y) - (x < y)) self.fail("should have raised a ProgrammingError") - except sqlite.ProgrammingError, e: + except sqlite.ProgrammingError as e: pass def CheckCollationIsUsed(self): @@ -52,7 +52,7 @@ class CollationTests(unittest.TestCase): return def mycoll(x, y): # reverse order - return -cmp(x, y) + return -((x > y) - (x < y)) con = sqlite.connect(":memory:") con.create_collation("mycoll", mycoll) @@ -73,7 +73,7 @@ class CollationTests(unittest.TestCase): try: result = con.execute(sql).fetchall() self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: + except sqlite.OperationalError as e: self.assertEqual(e.args[0].lower(), "no such collation sequence: mycoll") def CheckCollationRegisterTwice(self): @@ -82,8 +82,8 @@ class CollationTests(unittest.TestCase): Verify that the last one is actually used. """ con = sqlite.connect(":memory:") - con.create_collation("mycoll", cmp) - con.create_collation("mycoll", lambda x, y: -cmp(x, y)) + con.create_collation("mycoll", lambda x, y: (x > y) - (x < y)) + con.create_collation("mycoll", lambda x, y: -((x > y) - (x < y))) result = con.execute(""" select x from (select 'a' as x union select 'b' as x) order by x collate mycoll """).fetchall() @@ -96,12 +96,12 @@ class CollationTests(unittest.TestCase): to use it. """ con = sqlite.connect(":memory:") - con.create_collation("mycoll", cmp) + con.create_collation("mycoll", lambda x, y: (x > y) - (x < y)) con.create_collation("mycoll", None) try: con.execute("select 'a' as x union select 'b' as x order by x collate mycoll") self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: + except sqlite.OperationalError as e: if not e.args[0].startswith("no such collation sequence"): self.fail("wrong OperationalError raised") @@ -166,14 +166,15 @@ class ProgressTests(unittest.TestCase): Test that setting the progress handler to None clears the previously set handler. """ con = sqlite.connect(":memory:") - action = [] + action = 0 def progress(): - action.append(1) + nonlocal action + action = 1 return 0 con.set_progress_handler(progress, 1) con.set_progress_handler(None, 1) con.execute("select 1 union select 2 union select 3").fetchall() - self.assertEqual(len(action), 0, "progress handler was not cleared") + self.assertEqual(action, 0, "progress handler was not cleared") def suite(): collation_suite = unittest.makeSuite(CollationTests, "Check") diff --git a/Lib/sqlite3/test/py25tests.py b/Lib/sqlite3/test/py25tests.py deleted file mode 100644 index 7fd3c0e0cf8..00000000000 --- a/Lib/sqlite3/test/py25tests.py +++ /dev/null @@ -1,80 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/regression.py: pysqlite regression tests -# -# Copyright (C) 2007 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -from __future__ import with_statement -import unittest -import sqlite3 as sqlite - -did_rollback = False - -class MyConnection(sqlite.Connection): - def rollback(self): - global did_rollback - did_rollback = True - sqlite.Connection.rollback(self) - -class ContextTests(unittest.TestCase): - def setUp(self): - global did_rollback - self.con = sqlite.connect(":memory:", factory=MyConnection) - self.con.execute("create table test(c unique)") - did_rollback = False - - def tearDown(self): - self.con.close() - - def CheckContextManager(self): - """Can the connection be used as a context manager at all?""" - with self.con: - pass - - def CheckContextManagerCommit(self): - """Is a commit called in the context manager?""" - with self.con: - self.con.execute("insert into test(c) values ('foo')") - self.con.rollback() - count = self.con.execute("select count(*) from test").fetchone()[0] - self.assertEqual(count, 1) - - def CheckContextManagerRollback(self): - """Is a rollback called in the context manager?""" - global did_rollback - self.assertEqual(did_rollback, False) - try: - with self.con: - self.con.execute("insert into test(c) values (4)") - self.con.execute("insert into test(c) values (4)") - except sqlite.IntegrityError: - pass - self.assertEqual(did_rollback, True) - -def suite(): - ctx_suite = unittest.makeSuite(ContextTests, "Check") - return unittest.TestSuite((ctx_suite,)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py index eec2fcde951..c7551e35a1b 100644 --- a/Lib/sqlite3/test/regression.py +++ b/Lib/sqlite3/test/regression.py @@ -1,7 +1,7 @@ #-*- coding: ISO-8859-1 -*- # pysqlite2/test/regression.py: pysqlite regression tests # -# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -52,10 +52,10 @@ class RegressionTests(unittest.TestCase): # reset before a rollback, but only those that are still in the # statement cache. The others are not accessible from the connection object. con = sqlite.connect(":memory:", cached_statements=5) - cursors = [con.cursor() for x in xrange(5)] + cursors = [con.cursor() for x in range(5)] cursors[0].execute("create table test(x)") for i in range(10): - cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)]) + cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)]) for i in range(5): cursors[i].execute(" " * i + "select x from test") @@ -116,18 +116,6 @@ class RegressionTests(unittest.TestCase): """ self.con.execute("") - def CheckUnicodeConnect(self): - """ - With pysqlite 2.4.0 you needed to use a string or a APSW connection - object for opening database connections. - - Formerly, both bytestrings and unicode strings used to work. - - Let's make sure unicode strings work in the future. - """ - con = sqlite.connect(u":memory:") - con.close() - def CheckTypeMapUsage(self): """ pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling @@ -143,6 +131,21 @@ class RegressionTests(unittest.TestCase): con.execute("insert into foo(bar) values (5)") con.execute(SELECT) + def CheckErrorMsgDecodeError(self): + # When porting the module to Python 3.0, the error message about + # decoding errors disappeared. This verifies they're back again. + failure = None + try: + self.con.execute("select 'xxx' || ? || 'yyy' colname", + (bytes(bytearray([250])),)).fetchone() + failure = "should have raised an OperationalError with detailed description" + except sqlite.OperationalError as e: + msg = e.args[0] + if not msg.startswith("Could not decode to UTF-8 column 'colname' with text 'xxx"): + failure = "OperationalError did not have expected description text" + if failure: + self.fail(failure) + def CheckRegisterAdapter(self): """ See issue 3312. @@ -154,8 +157,7 @@ class RegressionTests(unittest.TestCase): See issue 3312. """ con = sqlite.connect(":memory:") - self.assertRaises(UnicodeEncodeError, setattr, con, - "isolation_level", u"\xe9") + setattr(con, "isolation_level", "\xe9") def CheckCursorConstructorCallCheck(self): """ @@ -175,6 +177,14 @@ class RegressionTests(unittest.TestCase): except: self.fail("should have raised ProgrammingError") + + def CheckStrSubclass(self): + """ + The Python 3.0 port of the module didn't cope with values of subclasses of str. + """ + class MyStr(str): pass + self.con.execute("select ?", (MyStr("abc"),)) + def CheckConnectionConstructorCallCheck(self): """ Verifies that connection methods check wether base class __init__ was called. @@ -264,6 +274,13 @@ class RegressionTests(unittest.TestCase): """ self.assertRaises(sqlite.Warning, self.con, 1) + def CheckCollation(self): + def collation_cb(a, b): + return 1 + self.assertRaises(sqlite.ProgrammingError, self.con.create_collation, + # Lone surrogate cannot be encoded to the default encoding (utf8) + "\uDC80", collation_cb) + def CheckRecursiveCursorUse(self): """ http://bugs.python.org/issue10811 diff --git a/Lib/sqlite3/test/transactions.py b/Lib/sqlite3/test/transactions.py index a732251b2d4..70e96a12ed3 100644 --- a/Lib/sqlite3/test/transactions.py +++ b/Lib/sqlite3/test/transactions.py @@ -21,7 +21,6 @@ # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. -import sys import os, unittest import sqlite3 as sqlite @@ -163,7 +162,7 @@ class TransactionTests(unittest.TestCase): try: cur.fetchall() self.fail("InterfaceError should have been raised") - except sqlite.InterfaceError, e: + except sqlite.InterfaceError as e: pass except: self.fail("InterfaceError should have been raised") diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py index c5ab39bb771..29413e14ec3 100644 --- a/Lib/sqlite3/test/types.py +++ b/Lib/sqlite3/test/types.py @@ -1,7 +1,7 @@ #-*- coding: ISO-8859-1 -*- # pysqlite2/test/types.py: tests for type conversion and detection # -# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -41,10 +41,10 @@ class SqliteTypeTests(unittest.TestCase): self.con.close() def CheckString(self): - self.cur.execute("insert into test(s) values (?)", (u"Österreich",)) + self.cur.execute("insert into test(s) values (?)", ("Österreich",)) self.cur.execute("select s from test") row = self.cur.fetchone() - self.assertEqual(row[0], u"Österreich") + self.assertEqual(row[0], "Österreich") def CheckSmallInt(self): self.cur.execute("insert into test(i) values (?)", (42,)) @@ -67,47 +67,25 @@ class SqliteTypeTests(unittest.TestCase): self.assertEqual(row[0], val) def CheckBlob(self): - val = buffer("Guglhupf") + sample = b"Guglhupf" + val = memoryview(sample) self.cur.execute("insert into test(b) values (?)", (val,)) self.cur.execute("select b from test") row = self.cur.fetchone() - self.assertEqual(row[0], val) + self.assertEqual(row[0], sample) def CheckUnicodeExecute(self): - self.cur.execute(u"select 'Österreich'") + self.cur.execute("select 'Österreich'") row = self.cur.fetchone() - self.assertEqual(row[0], u"Österreich") - - def CheckNonUtf8_Default(self): - try: - self.cur.execute("select ?", (chr(150),)) - self.fail("should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - - def CheckNonUtf8_TextFactoryString(self): - orig_text_factory = self.con.text_factory - try: - self.con.text_factory = str - self.cur.execute("select ?", (chr(150),)) - finally: - self.con.text_factory = orig_text_factory - - def CheckNonUtf8_TextFactoryOptimizedUnicode(self): - orig_text_factory = self.con.text_factory - try: - try: - self.con.text_factory = sqlite.OptimizedUnicode - self.cur.execute("select ?", (chr(150),)) - self.fail("should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - finally: - self.con.text_factory = orig_text_factory + self.assertEqual(row[0], "Österreich") class DeclTypesTests(unittest.TestCase): class Foo: def __init__(self, _val): + if isinstance(_val, bytes): + # sqlite3 always calls __init__ with a bytes created from a + # UTF-8 string when __conform__ was used to store the object. + _val = _val.decode('utf8') self.val = _val def __cmp__(self, other): @@ -118,6 +96,12 @@ class DeclTypesTests(unittest.TestCase): else: return 1 + def __eq__(self, other): + c = self.__cmp__(other) + if c is NotImplemented: + return c + return c == 0 + def __conform__(self, protocol): if protocol is sqlite.PrepareProtocol: return self.val @@ -194,7 +178,7 @@ class DeclTypesTests(unittest.TestCase): def CheckUnicode(self): # default - val = u"\xd6sterreich" + val = "\xd6sterreich" self.cur.execute("insert into test(u) values (?)", (val,)) self.cur.execute("select u from test") row = self.cur.fetchone() @@ -231,11 +215,12 @@ class DeclTypesTests(unittest.TestCase): def CheckBlob(self): # default - val = buffer("Guglhupf") + sample = b"Guglhupf" + val = memoryview(sample) self.cur.execute("insert into test(bin) values (?)", (val,)) self.cur.execute("select bin from test") row = self.cur.fetchone() - self.assertEqual(row[0], val) + self.assertEqual(row[0], sample) def CheckNumber1(self): self.cur.execute("insert into test(n1) values (5)") @@ -256,9 +241,9 @@ class ColNamesTests(unittest.TestCase): self.cur = self.con.cursor() self.cur.execute("create table test(x foo)") - sqlite.converters["FOO"] = lambda x: "[%s]" % x - sqlite.converters["BAR"] = lambda x: "<%s>" % x - sqlite.converters["EXC"] = lambda x: 5 // 0 + sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii") + sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii") + sqlite.converters["EXC"] = lambda x: 5/0 sqlite.converters["B1B1"] = lambda x: "MARKER" def tearDown(self): @@ -296,7 +281,7 @@ class ColNamesTests(unittest.TestCase): self.assertEqual(self.cur.description[0][0], "x") def CheckCaseInConverterName(self): - self.cur.execute("""select 'other' as "x [b1b1]\"""") + self.cur.execute("select 'other' as \"x [b1b1]\"") val = self.cur.fetchone()[0] self.assertEqual(val, "MARKER") @@ -346,8 +331,8 @@ class BinaryConverterTests(unittest.TestCase): self.con.close() def CheckBinaryInputForConverter(self): - testdata = "abcdefg" * 10 - result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0] + testdata = b"abcdefg" * 10 + result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0] self.assertEqual(testdata, result) class DateTimeTests(unittest.TestCase): diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py index 2db3a610b34..e01341e9159 100644 --- a/Lib/sqlite3/test/userfunctions.py +++ b/Lib/sqlite3/test/userfunctions.py @@ -28,7 +28,7 @@ import sqlite3 as sqlite def func_returntext(): return "foo" def func_returnunicode(): - return u"bar" + return "bar" def func_returnint(): return 42 def func_returnfloat(): @@ -36,14 +36,14 @@ def func_returnfloat(): def func_returnnull(): return None def func_returnblob(): - return buffer("blob") + return b"blob" def func_returnlonglong(): return 1<<31 def func_raiseexception(): - 5 // 0 + 5/0 def func_isstring(v): - return type(v) is unicode + return type(v) is str def func_isint(v): return type(v) is int def func_isfloat(v): @@ -51,9 +51,9 @@ def func_isfloat(v): def func_isnone(v): return type(v) is type(None) def func_isblob(v): - return type(v) is buffer + return isinstance(v, (bytes, memoryview)) def func_islonglong(v): - return isinstance(v, (int, long)) and v >= 1<<31 + return isinstance(v, int) and v >= 1<<31 class AggrNoStep: def __init__(self): @@ -71,7 +71,7 @@ class AggrNoFinalize: class AggrExceptionInInit: def __init__(self): - 5 // 0 + 5/0 def step(self, x): pass @@ -84,7 +84,7 @@ class AggrExceptionInStep: pass def step(self, x): - 5 // 0 + 5/0 def finalize(self): return 42 @@ -97,14 +97,15 @@ class AggrExceptionInFinalize: pass def finalize(self): - 5 // 0 + 5/0 class AggrCheckType: def __init__(self): self.val = None def step(self, whichType, val): - theType = {"str": unicode, "int": int, "float": float, "None": type(None), "blob": buffer} + theType = {"str": str, "int": int, "float": float, "None": type(None), + "blob": bytes} self.val = int(theType[whichType] is type(val)) def finalize(self): @@ -166,15 +167,15 @@ class FunctionTests(unittest.TestCase): cur = self.con.cursor() cur.execute("select returntext()") val = cur.fetchone()[0] - self.assertEqual(type(val), unicode) + self.assertEqual(type(val), str) self.assertEqual(val, "foo") def CheckFuncReturnUnicode(self): cur = self.con.cursor() cur.execute("select returnunicode()") val = cur.fetchone()[0] - self.assertEqual(type(val), unicode) - self.assertEqual(val, u"bar") + self.assertEqual(type(val), str) + self.assertEqual(val, "bar") def CheckFuncReturnInt(self): cur = self.con.cursor() @@ -202,8 +203,8 @@ class FunctionTests(unittest.TestCase): cur = self.con.cursor() cur.execute("select returnblob()") val = cur.fetchone()[0] - self.assertEqual(type(val), buffer) - self.assertEqual(val, buffer("blob")) + self.assertEqual(type(val), bytes) + self.assertEqual(val, b"blob") def CheckFuncReturnLongLong(self): cur = self.con.cursor() @@ -217,7 +218,7 @@ class FunctionTests(unittest.TestCase): cur.execute("select raiseexception()") cur.fetchone() self.fail("should have raised OperationalError") - except sqlite.OperationalError, e: + except sqlite.OperationalError as e: self.assertEqual(e.args[0], 'user-defined function raised exception') def CheckParamString(self): @@ -246,7 +247,7 @@ class FunctionTests(unittest.TestCase): def CheckParamBlob(self): cur = self.con.cursor() - cur.execute("select isblob(?)", (buffer("blob"),)) + cur.execute("select isblob(?)", (memoryview(b"blob"),)) val = cur.fetchone()[0] self.assertEqual(val, 1) @@ -270,7 +271,7 @@ class AggregateTests(unittest.TestCase): ) """) cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)", - ("foo", 5, 3.14, None, buffer("blob"),)) + ("foo", 5, 3.14, None, memoryview(b"blob"),)) self.con.create_aggregate("nostep", 1, AggrNoStep) self.con.create_aggregate("nofinalize", 1, AggrNoFinalize) @@ -297,8 +298,8 @@ class AggregateTests(unittest.TestCase): try: cur.execute("select nostep(t) from test") self.fail("should have raised an AttributeError") - except AttributeError, e: - self.assertEqual(e.args[0], "AggrNoStep instance has no attribute 'step'") + except AttributeError as e: + self.assertEqual(e.args[0], "'AggrNoStep' object has no attribute 'step'") def CheckAggrNoFinalize(self): cur = self.con.cursor() @@ -306,7 +307,7 @@ class AggregateTests(unittest.TestCase): cur.execute("select nofinalize(t) from test") val = cur.fetchone()[0] self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: + except sqlite.OperationalError as e: self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") def CheckAggrExceptionInInit(self): @@ -315,7 +316,7 @@ class AggregateTests(unittest.TestCase): cur.execute("select excInit(t) from test") val = cur.fetchone()[0] self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: + except sqlite.OperationalError as e: self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error") def CheckAggrExceptionInStep(self): @@ -324,7 +325,7 @@ class AggregateTests(unittest.TestCase): cur.execute("select excStep(t) from test") val = cur.fetchone()[0] self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: + except sqlite.OperationalError as e: self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error") def CheckAggrExceptionInFinalize(self): @@ -333,7 +334,7 @@ class AggregateTests(unittest.TestCase): cur.execute("select excFinalize(t) from test") val = cur.fetchone()[0] self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: + except sqlite.OperationalError as e: self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") def CheckAggrCheckParamStr(self): @@ -362,7 +363,7 @@ class AggregateTests(unittest.TestCase): def CheckAggrCheckParamBlob(self): cur = self.con.cursor() - cur.execute("select checkType('blob', ?)", (buffer("blob"),)) + cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),)) val = cur.fetchone()[0] self.assertEqual(val, 1) @@ -402,7 +403,7 @@ class AuthorizerTests(unittest.TestCase): def CheckTableAccess(self): try: self.con.execute("select * from t2") - except sqlite.DatabaseError, e: + except sqlite.DatabaseError as e: if not e.args[0].endswith("prohibited"): self.fail("wrong exception text: %s" % e.args[0]) return @@ -411,7 +412,7 @@ class AuthorizerTests(unittest.TestCase): def CheckColumnAccess(self): try: self.con.execute("select c2 from t1") - except sqlite.DatabaseError, e: + except sqlite.DatabaseError as e: if not e.args[0].endswith("prohibited"): self.fail("wrong exception text: %s" % e.args[0]) return |