aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/sqlite3
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/sqlite3')
-rw-r--r--Lib/sqlite3/__init__.py5
-rw-r--r--Lib/sqlite3/dbapi2.py15
-rw-r--r--Lib/sqlite3/test/dbapi.py58
-rw-r--r--Lib/sqlite3/test/dump.py2
-rw-r--r--Lib/sqlite3/test/factory.py60
-rw-r--r--Lib/sqlite3/test/hooks.py27
-rw-r--r--Lib/sqlite3/test/py25tests.py80
-rw-r--r--Lib/sqlite3/test/regression.py51
-rw-r--r--Lib/sqlite3/test/transactions.py3
-rw-r--r--Lib/sqlite3/test/types.py71
-rw-r--r--Lib/sqlite3/test/userfunctions.py55
11 files changed, 174 insertions, 253 deletions
diff --git a/Lib/sqlite3/__init__.py b/Lib/sqlite3/__init__.py
index 41ef2b76dfc..6c91df27cca 100644
--- a/Lib/sqlite3/__init__.py
+++ b/Lib/sqlite3/__init__.py
@@ -1,7 +1,6 @@
-#-*- coding: ISO-8859-1 -*-
# pysqlite2/__init__.py: the pysqlite2 package.
#
-# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de>
+# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
@@ -21,4 +20,4 @@
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
-from dbapi2 import *
+from sqlite3.dbapi2 import *
diff --git a/Lib/sqlite3/dbapi2.py b/Lib/sqlite3/dbapi2.py
index 7eb28e87b1b..6c121a5c06d 100644
--- a/Lib/sqlite3/dbapi2.py
+++ b/Lib/sqlite3/dbapi2.py
@@ -1,7 +1,6 @@
-#-*- coding: ISO-8859-1 -*-
# pysqlite2/dbapi2.py: the DB-API 2.0 interface
#
-# Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de>
+# Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
@@ -50,7 +49,7 @@ def TimestampFromTicks(ticks):
version_info = tuple([int(x) for x in version.split(".")])
sqlite_version_info = tuple([int(x) for x in sqlite_version.split(".")])
-Binary = buffer
+Binary = memoryview
def register_adapters_and_converters():
def adapt_date(val):
@@ -60,13 +59,13 @@ def register_adapters_and_converters():
return val.isoformat(" ")
def convert_date(val):
- return datetime.date(*map(int, val.split("-")))
+ return datetime.date(*map(int, val.split(b"-")))
def convert_timestamp(val):
- datepart, timepart = val.split(" ")
- year, month, day = map(int, datepart.split("-"))
- timepart_full = timepart.split(".")
- hours, minutes, seconds = map(int, timepart_full[0].split(":"))
+ datepart, timepart = val.split(b" ")
+ year, month, day = map(int, datepart.split(b"-"))
+ timepart_full = timepart.split(b".")
+ hours, minutes, seconds = map(int, timepart_full[0].split(b":"))
if len(timepart_full) == 2:
microseconds = int(timepart_full[1])
else:
diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py
index c356d4706c4..202bd388760 100644
--- a/Lib/sqlite3/test/dbapi.py
+++ b/Lib/sqlite3/test/dbapi.py
@@ -22,7 +22,6 @@
# 3. This notice may not be removed or altered from any source distribution.
import unittest
-import sys
import sqlite3 as sqlite
try:
import threading
@@ -44,12 +43,12 @@ class ModuleTests(unittest.TestCase):
sqlite.paramstyle)
def CheckWarning(self):
- self.assertTrue(issubclass(sqlite.Warning, StandardError),
- "Warning is not a subclass of StandardError")
+ self.assertTrue(issubclass(sqlite.Warning, Exception),
+ "Warning is not a subclass of Exception")
def CheckError(self):
- self.assertTrue(issubclass(sqlite.Error, StandardError),
- "Error is not a subclass of StandardError")
+ self.assertTrue(issubclass(sqlite.Error, Exception),
+ "Error is not a subclass of Exception")
def CheckInterfaceError(self):
self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
@@ -85,6 +84,7 @@ class ModuleTests(unittest.TestCase):
"NotSupportedError is not a subclass of DatabaseError")
class ConnectionTests(unittest.TestCase):
+
def setUp(self):
self.cx = sqlite.connect(":memory:")
cu = self.cx.cursor()
@@ -141,6 +141,28 @@ class ConnectionTests(unittest.TestCase):
self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
+ def CheckInTransaction(self):
+ # Can't use db from setUp because we want to test initial state.
+ cx = sqlite.connect(":memory:")
+ cu = cx.cursor()
+ self.assertEqual(cx.in_transaction, False)
+ cu.execute("create table transactiontest(id integer primary key, name text)")
+ self.assertEqual(cx.in_transaction, False)
+ cu.execute("insert into transactiontest(name) values (?)", ("foo",))
+ self.assertEqual(cx.in_transaction, True)
+ cu.execute("select name from transactiontest where name=?", ["foo"])
+ row = cu.fetchone()
+ self.assertEqual(cx.in_transaction, True)
+ cx.commit()
+ self.assertEqual(cx.in_transaction, False)
+ cu.execute("select name from transactiontest where name=?", ["foo"])
+ row = cu.fetchone()
+ self.assertEqual(cx.in_transaction, False)
+
+ def CheckInTransactionRO(self):
+ with self.assertRaises(AttributeError):
+ self.cx.in_transaction = True
+
class CursorTests(unittest.TestCase):
def setUp(self):
self.cx = sqlite.connect(":memory:")
@@ -260,10 +282,6 @@ class CursorTests(unittest.TestCase):
self.assertEqual(row[0], "foo")
def CheckExecuteDictMapping_Mapping(self):
- # Test only works with Python 2.5 or later
- if sys.version_info < (2, 5, 0):
- return
-
class D(dict):
def __missing__(self, key):
return "foo"
@@ -339,7 +357,7 @@ class CursorTests(unittest.TestCase):
def __init__(self):
self.value = 5
- def next(self):
+ def __next__(self):
if self.value == 10:
raise StopIteration
else:
@@ -379,8 +397,8 @@ class CursorTests(unittest.TestCase):
self.fail("should have raised a TypeError")
except TypeError:
return
- except Exception, e:
- print "raised", e.__class__
+ except Exception as e:
+ print("raised", e.__class__)
self.fail("raised wrong exception.")
def CheckFetchIter(self):
@@ -653,7 +671,7 @@ class ConstructorTests(unittest.TestCase):
ts = sqlite.TimestampFromTicks(42)
def CheckBinary(self):
- b = sqlite.Binary(chr(0) + "'")
+ b = sqlite.Binary(b"\0'")
class ExtensionTests(unittest.TestCase):
def CheckScriptStringSql(self):
@@ -669,20 +687,6 @@ class ExtensionTests(unittest.TestCase):
res = cur.fetchone()[0]
self.assertEqual(res, 5)
- def CheckScriptStringUnicode(self):
- con = sqlite.connect(":memory:")
- cur = con.cursor()
- cur.executescript(u"""
- create table a(i);
- insert into a(i) values (5);
- select i from a;
- delete from a;
- insert into a(i) values (6);
- """)
- cur.execute("select i from a")
- res = cur.fetchone()[0]
- self.assertEqual(res, 6)
-
def CheckScriptSyntaxError(self):
con = sqlite.connect(":memory:")
cur = con.cursor()
diff --git a/Lib/sqlite3/test/dump.py b/Lib/sqlite3/test/dump.py
index 2e9b436f30a..b200333afac 100644
--- a/Lib/sqlite3/test/dump.py
+++ b/Lib/sqlite3/test/dump.py
@@ -47,7 +47,7 @@ class DumpTests(unittest.TestCase):
expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \
['COMMIT;']
[self.assertEqual(expected_sqls[i], actual_sqls[i])
- for i in xrange(len(expected_sqls))]
+ for i in range(len(expected_sqls))]
def suite():
return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check"))
diff --git a/Lib/sqlite3/test/factory.py b/Lib/sqlite3/test/factory.py
index 52854be2b99..7f6f3473f3e 100644
--- a/Lib/sqlite3/test/factory.py
+++ b/Lib/sqlite3/test/factory.py
@@ -159,31 +159,31 @@ class TextFactoryTests(unittest.TestCase):
self.con = sqlite.connect(":memory:")
def CheckUnicode(self):
- austria = unicode("Österreich", "latin1")
+ austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone()
- self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode")
+ self.assertTrue(type(row[0]) == str, "type of row[0] must be unicode")
def CheckString(self):
- self.con.text_factory = str
- austria = unicode("Österreich", "latin1")
+ self.con.text_factory = bytes
+ austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone()
- self.assertTrue(type(row[0]) == str, "type of row[0] must be str")
+ self.assertTrue(type(row[0]) == bytes, "type of row[0] must be bytes")
self.assertTrue(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8")
def CheckCustom(self):
- self.con.text_factory = lambda x: unicode(x, "utf-8", "ignore")
- austria = unicode("Österreich", "latin1")
- row = self.con.execute("select ?", (austria.encode("latin1"),)).fetchone()
- self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode")
- self.assertTrue(row[0].endswith(u"reich"), "column must contain original data")
+ self.con.text_factory = lambda x: str(x, "utf-8", "ignore")
+ austria = "Österreich"
+ row = self.con.execute("select ?", (austria,)).fetchone()
+ self.assertTrue(type(row[0]) == str, "type of row[0] must be unicode")
+ self.assertTrue(row[0].endswith("reich"), "column must contain original data")
def CheckOptimizedUnicode(self):
self.con.text_factory = sqlite.OptimizedUnicode
- austria = unicode("Österreich", "latin1")
- germany = unicode("Deutchland")
+ austria = "Österreich"
+ germany = "Deutchland"
a_row = self.con.execute("select ?", (austria,)).fetchone()
d_row = self.con.execute("select ?", (germany,)).fetchone()
- self.assertTrue(type(a_row[0]) == unicode, "type of non-ASCII row must be unicode")
+ self.assertTrue(type(a_row[0]) == str, "type of non-ASCII row must be str")
self.assertTrue(type(d_row[0]) == str, "type of ASCII-only row must be str")
def tearDown(self):
@@ -196,33 +196,29 @@ class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase):
self.con.execute("insert into test (value) values (?)", ("a\x00b",))
def CheckString(self):
- # text_factory defaults to unicode
+ # text_factory defaults to str
row = self.con.execute("select value from test").fetchone()
- self.assertIs(type(row[0]), unicode)
+ self.assertIs(type(row[0]), str)
self.assertEqual(row[0], "a\x00b")
- def CheckCustom(self):
- # A custom factory should receive an str argument
- self.con.text_factory = lambda x: x
+ def CheckBytes(self):
+ self.con.text_factory = bytes
row = self.con.execute("select value from test").fetchone()
- self.assertIs(type(row[0]), str)
- self.assertEqual(row[0], "a\x00b")
+ self.assertIs(type(row[0]), bytes)
+ self.assertEqual(row[0], b"a\x00b")
- def CheckOptimizedUnicodeAsString(self):
- # ASCII -> str argument
- self.con.text_factory = sqlite.OptimizedUnicode
+ def CheckBytearray(self):
+ self.con.text_factory = bytearray
row = self.con.execute("select value from test").fetchone()
- self.assertIs(type(row[0]), str)
- self.assertEqual(row[0], "a\x00b")
+ self.assertIs(type(row[0]), bytearray)
+ self.assertEqual(row[0], b"a\x00b")
- def CheckOptimizedUnicodeAsUnicode(self):
- # Non-ASCII -> unicode argument
- self.con.text_factory = sqlite.OptimizedUnicode
- self.con.execute("delete from test")
- self.con.execute("insert into test (value) values (?)", (u'ä\0ö',))
+ def CheckCustom(self):
+ # A custom factory should receive a bytes argument
+ self.con.text_factory = lambda x: x
row = self.con.execute("select value from test").fetchone()
- self.assertIs(type(row[0]), unicode)
- self.assertEqual(row[0], u"ä\x00ö")
+ self.assertIs(type(row[0]), bytes)
+ self.assertEqual(row[0], b"a\x00b")
def tearDown(self):
self.con.close()
diff --git a/Lib/sqlite3/test/hooks.py b/Lib/sqlite3/test/hooks.py
index b798e749add..a92e8387861 100644
--- a/Lib/sqlite3/test/hooks.py
+++ b/Lib/sqlite3/test/hooks.py
@@ -21,7 +21,7 @@
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
-import os, unittest
+import unittest
import sqlite3 as sqlite
class CollationTests(unittest.TestCase):
@@ -36,15 +36,15 @@ class CollationTests(unittest.TestCase):
try:
con.create_collation("X", 42)
self.fail("should have raised a TypeError")
- except TypeError, e:
+ except TypeError as e:
self.assertEqual(e.args[0], "parameter must be callable")
def CheckCreateCollationNotAscii(self):
con = sqlite.connect(":memory:")
try:
- con.create_collation("collä", cmp)
+ con.create_collation("collä", lambda x, y: (x > y) - (x < y))
self.fail("should have raised a ProgrammingError")
- except sqlite.ProgrammingError, e:
+ except sqlite.ProgrammingError as e:
pass
def CheckCollationIsUsed(self):
@@ -52,7 +52,7 @@ class CollationTests(unittest.TestCase):
return
def mycoll(x, y):
# reverse order
- return -cmp(x, y)
+ return -((x > y) - (x < y))
con = sqlite.connect(":memory:")
con.create_collation("mycoll", mycoll)
@@ -73,7 +73,7 @@ class CollationTests(unittest.TestCase):
try:
result = con.execute(sql).fetchall()
self.fail("should have raised an OperationalError")
- except sqlite.OperationalError, e:
+ except sqlite.OperationalError as e:
self.assertEqual(e.args[0].lower(), "no such collation sequence: mycoll")
def CheckCollationRegisterTwice(self):
@@ -82,8 +82,8 @@ class CollationTests(unittest.TestCase):
Verify that the last one is actually used.
"""
con = sqlite.connect(":memory:")
- con.create_collation("mycoll", cmp)
- con.create_collation("mycoll", lambda x, y: -cmp(x, y))
+ con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
+ con.create_collation("mycoll", lambda x, y: -((x > y) - (x < y)))
result = con.execute("""
select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
""").fetchall()
@@ -96,12 +96,12 @@ class CollationTests(unittest.TestCase):
to use it.
"""
con = sqlite.connect(":memory:")
- con.create_collation("mycoll", cmp)
+ con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
con.create_collation("mycoll", None)
try:
con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
self.fail("should have raised an OperationalError")
- except sqlite.OperationalError, e:
+ except sqlite.OperationalError as e:
if not e.args[0].startswith("no such collation sequence"):
self.fail("wrong OperationalError raised")
@@ -166,14 +166,15 @@ class ProgressTests(unittest.TestCase):
Test that setting the progress handler to None clears the previously set handler.
"""
con = sqlite.connect(":memory:")
- action = []
+ action = 0
def progress():
- action.append(1)
+ nonlocal action
+ action = 1
return 0
con.set_progress_handler(progress, 1)
con.set_progress_handler(None, 1)
con.execute("select 1 union select 2 union select 3").fetchall()
- self.assertEqual(len(action), 0, "progress handler was not cleared")
+ self.assertEqual(action, 0, "progress handler was not cleared")
def suite():
collation_suite = unittest.makeSuite(CollationTests, "Check")
diff --git a/Lib/sqlite3/test/py25tests.py b/Lib/sqlite3/test/py25tests.py
deleted file mode 100644
index 7fd3c0e0cf8..00000000000
--- a/Lib/sqlite3/test/py25tests.py
+++ /dev/null
@@ -1,80 +0,0 @@
-#-*- coding: ISO-8859-1 -*-
-# pysqlite2/test/regression.py: pysqlite regression tests
-#
-# Copyright (C) 2007 Gerhard Häring <gh@ghaering.de>
-#
-# This file is part of pysqlite.
-#
-# This software is provided 'as-is', without any express or implied
-# warranty. In no event will the authors be held liable for any damages
-# arising from the use of this software.
-#
-# Permission is granted to anyone to use this software for any purpose,
-# including commercial applications, and to alter it and redistribute it
-# freely, subject to the following restrictions:
-#
-# 1. The origin of this software must not be misrepresented; you must not
-# claim that you wrote the original software. If you use this software
-# in a product, an acknowledgment in the product documentation would be
-# appreciated but is not required.
-# 2. Altered source versions must be plainly marked as such, and must not be
-# misrepresented as being the original software.
-# 3. This notice may not be removed or altered from any source distribution.
-
-from __future__ import with_statement
-import unittest
-import sqlite3 as sqlite
-
-did_rollback = False
-
-class MyConnection(sqlite.Connection):
- def rollback(self):
- global did_rollback
- did_rollback = True
- sqlite.Connection.rollback(self)
-
-class ContextTests(unittest.TestCase):
- def setUp(self):
- global did_rollback
- self.con = sqlite.connect(":memory:", factory=MyConnection)
- self.con.execute("create table test(c unique)")
- did_rollback = False
-
- def tearDown(self):
- self.con.close()
-
- def CheckContextManager(self):
- """Can the connection be used as a context manager at all?"""
- with self.con:
- pass
-
- def CheckContextManagerCommit(self):
- """Is a commit called in the context manager?"""
- with self.con:
- self.con.execute("insert into test(c) values ('foo')")
- self.con.rollback()
- count = self.con.execute("select count(*) from test").fetchone()[0]
- self.assertEqual(count, 1)
-
- def CheckContextManagerRollback(self):
- """Is a rollback called in the context manager?"""
- global did_rollback
- self.assertEqual(did_rollback, False)
- try:
- with self.con:
- self.con.execute("insert into test(c) values (4)")
- self.con.execute("insert into test(c) values (4)")
- except sqlite.IntegrityError:
- pass
- self.assertEqual(did_rollback, True)
-
-def suite():
- ctx_suite = unittest.makeSuite(ContextTests, "Check")
- return unittest.TestSuite((ctx_suite,))
-
-def test():
- runner = unittest.TextTestRunner()
- runner.run(suite())
-
-if __name__ == "__main__":
- test()
diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py
index eec2fcde951..c7551e35a1b 100644
--- a/Lib/sqlite3/test/regression.py
+++ b/Lib/sqlite3/test/regression.py
@@ -1,7 +1,7 @@
#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
-# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
+# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
@@ -52,10 +52,10 @@ class RegressionTests(unittest.TestCase):
# reset before a rollback, but only those that are still in the
# statement cache. The others are not accessible from the connection object.
con = sqlite.connect(":memory:", cached_statements=5)
- cursors = [con.cursor() for x in xrange(5)]
+ cursors = [con.cursor() for x in range(5)]
cursors[0].execute("create table test(x)")
for i in range(10):
- cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)])
+ cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
for i in range(5):
cursors[i].execute(" " * i + "select x from test")
@@ -116,18 +116,6 @@ class RegressionTests(unittest.TestCase):
"""
self.con.execute("")
- def CheckUnicodeConnect(self):
- """
- With pysqlite 2.4.0 you needed to use a string or a APSW connection
- object for opening database connections.
-
- Formerly, both bytestrings and unicode strings used to work.
-
- Let's make sure unicode strings work in the future.
- """
- con = sqlite.connect(u":memory:")
- con.close()
-
def CheckTypeMapUsage(self):
"""
pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
@@ -143,6 +131,21 @@ class RegressionTests(unittest.TestCase):
con.execute("insert into foo(bar) values (5)")
con.execute(SELECT)
+ def CheckErrorMsgDecodeError(self):
+ # When porting the module to Python 3.0, the error message about
+ # decoding errors disappeared. This verifies they're back again.
+ failure = None
+ try:
+ self.con.execute("select 'xxx' || ? || 'yyy' colname",
+ (bytes(bytearray([250])),)).fetchone()
+ failure = "should have raised an OperationalError with detailed description"
+ except sqlite.OperationalError as e:
+ msg = e.args[0]
+ if not msg.startswith("Could not decode to UTF-8 column 'colname' with text 'xxx"):
+ failure = "OperationalError did not have expected description text"
+ if failure:
+ self.fail(failure)
+
def CheckRegisterAdapter(self):
"""
See issue 3312.
@@ -154,8 +157,7 @@ class RegressionTests(unittest.TestCase):
See issue 3312.
"""
con = sqlite.connect(":memory:")
- self.assertRaises(UnicodeEncodeError, setattr, con,
- "isolation_level", u"\xe9")
+ setattr(con, "isolation_level", "\xe9")
def CheckCursorConstructorCallCheck(self):
"""
@@ -175,6 +177,14 @@ class RegressionTests(unittest.TestCase):
except:
self.fail("should have raised ProgrammingError")
+
+ def CheckStrSubclass(self):
+ """
+ The Python 3.0 port of the module didn't cope with values of subclasses of str.
+ """
+ class MyStr(str): pass
+ self.con.execute("select ?", (MyStr("abc"),))
+
def CheckConnectionConstructorCallCheck(self):
"""
Verifies that connection methods check wether base class __init__ was called.
@@ -264,6 +274,13 @@ class RegressionTests(unittest.TestCase):
"""
self.assertRaises(sqlite.Warning, self.con, 1)
+ def CheckCollation(self):
+ def collation_cb(a, b):
+ return 1
+ self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
+ # Lone surrogate cannot be encoded to the default encoding (utf8)
+ "\uDC80", collation_cb)
+
def CheckRecursiveCursorUse(self):
"""
http://bugs.python.org/issue10811
diff --git a/Lib/sqlite3/test/transactions.py b/Lib/sqlite3/test/transactions.py
index a732251b2d4..70e96a12ed3 100644
--- a/Lib/sqlite3/test/transactions.py
+++ b/Lib/sqlite3/test/transactions.py
@@ -21,7 +21,6 @@
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
-import sys
import os, unittest
import sqlite3 as sqlite
@@ -163,7 +162,7 @@ class TransactionTests(unittest.TestCase):
try:
cur.fetchall()
self.fail("InterfaceError should have been raised")
- except sqlite.InterfaceError, e:
+ except sqlite.InterfaceError as e:
pass
except:
self.fail("InterfaceError should have been raised")
diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py
index c5ab39bb771..29413e14ec3 100644
--- a/Lib/sqlite3/test/types.py
+++ b/Lib/sqlite3/test/types.py
@@ -1,7 +1,7 @@
#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/types.py: tests for type conversion and detection
#
-# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
+# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
@@ -41,10 +41,10 @@ class SqliteTypeTests(unittest.TestCase):
self.con.close()
def CheckString(self):
- self.cur.execute("insert into test(s) values (?)", (u"Österreich",))
+ self.cur.execute("insert into test(s) values (?)", ("Österreich",))
self.cur.execute("select s from test")
row = self.cur.fetchone()
- self.assertEqual(row[0], u"Österreich")
+ self.assertEqual(row[0], "Österreich")
def CheckSmallInt(self):
self.cur.execute("insert into test(i) values (?)", (42,))
@@ -67,47 +67,25 @@ class SqliteTypeTests(unittest.TestCase):
self.assertEqual(row[0], val)
def CheckBlob(self):
- val = buffer("Guglhupf")
+ sample = b"Guglhupf"
+ val = memoryview(sample)
self.cur.execute("insert into test(b) values (?)", (val,))
self.cur.execute("select b from test")
row = self.cur.fetchone()
- self.assertEqual(row[0], val)
+ self.assertEqual(row[0], sample)
def CheckUnicodeExecute(self):
- self.cur.execute(u"select 'Österreich'")
+ self.cur.execute("select 'Österreich'")
row = self.cur.fetchone()
- self.assertEqual(row[0], u"Österreich")
-
- def CheckNonUtf8_Default(self):
- try:
- self.cur.execute("select ?", (chr(150),))
- self.fail("should have raised a ProgrammingError")
- except sqlite.ProgrammingError:
- pass
-
- def CheckNonUtf8_TextFactoryString(self):
- orig_text_factory = self.con.text_factory
- try:
- self.con.text_factory = str
- self.cur.execute("select ?", (chr(150),))
- finally:
- self.con.text_factory = orig_text_factory
-
- def CheckNonUtf8_TextFactoryOptimizedUnicode(self):
- orig_text_factory = self.con.text_factory
- try:
- try:
- self.con.text_factory = sqlite.OptimizedUnicode
- self.cur.execute("select ?", (chr(150),))
- self.fail("should have raised a ProgrammingError")
- except sqlite.ProgrammingError:
- pass
- finally:
- self.con.text_factory = orig_text_factory
+ self.assertEqual(row[0], "Österreich")
class DeclTypesTests(unittest.TestCase):
class Foo:
def __init__(self, _val):
+ if isinstance(_val, bytes):
+ # sqlite3 always calls __init__ with a bytes created from a
+ # UTF-8 string when __conform__ was used to store the object.
+ _val = _val.decode('utf8')
self.val = _val
def __cmp__(self, other):
@@ -118,6 +96,12 @@ class DeclTypesTests(unittest.TestCase):
else:
return 1
+ def __eq__(self, other):
+ c = self.__cmp__(other)
+ if c is NotImplemented:
+ return c
+ return c == 0
+
def __conform__(self, protocol):
if protocol is sqlite.PrepareProtocol:
return self.val
@@ -194,7 +178,7 @@ class DeclTypesTests(unittest.TestCase):
def CheckUnicode(self):
# default
- val = u"\xd6sterreich"
+ val = "\xd6sterreich"
self.cur.execute("insert into test(u) values (?)", (val,))
self.cur.execute("select u from test")
row = self.cur.fetchone()
@@ -231,11 +215,12 @@ class DeclTypesTests(unittest.TestCase):
def CheckBlob(self):
# default
- val = buffer("Guglhupf")
+ sample = b"Guglhupf"
+ val = memoryview(sample)
self.cur.execute("insert into test(bin) values (?)", (val,))
self.cur.execute("select bin from test")
row = self.cur.fetchone()
- self.assertEqual(row[0], val)
+ self.assertEqual(row[0], sample)
def CheckNumber1(self):
self.cur.execute("insert into test(n1) values (5)")
@@ -256,9 +241,9 @@ class ColNamesTests(unittest.TestCase):
self.cur = self.con.cursor()
self.cur.execute("create table test(x foo)")
- sqlite.converters["FOO"] = lambda x: "[%s]" % x
- sqlite.converters["BAR"] = lambda x: "<%s>" % x
- sqlite.converters["EXC"] = lambda x: 5 // 0
+ sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii")
+ sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii")
+ sqlite.converters["EXC"] = lambda x: 5/0
sqlite.converters["B1B1"] = lambda x: "MARKER"
def tearDown(self):
@@ -296,7 +281,7 @@ class ColNamesTests(unittest.TestCase):
self.assertEqual(self.cur.description[0][0], "x")
def CheckCaseInConverterName(self):
- self.cur.execute("""select 'other' as "x [b1b1]\"""")
+ self.cur.execute("select 'other' as \"x [b1b1]\"")
val = self.cur.fetchone()[0]
self.assertEqual(val, "MARKER")
@@ -346,8 +331,8 @@ class BinaryConverterTests(unittest.TestCase):
self.con.close()
def CheckBinaryInputForConverter(self):
- testdata = "abcdefg" * 10
- result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0]
+ testdata = b"abcdefg" * 10
+ result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0]
self.assertEqual(testdata, result)
class DateTimeTests(unittest.TestCase):
diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py
index 2db3a610b34..e01341e9159 100644
--- a/Lib/sqlite3/test/userfunctions.py
+++ b/Lib/sqlite3/test/userfunctions.py
@@ -28,7 +28,7 @@ import sqlite3 as sqlite
def func_returntext():
return "foo"
def func_returnunicode():
- return u"bar"
+ return "bar"
def func_returnint():
return 42
def func_returnfloat():
@@ -36,14 +36,14 @@ def func_returnfloat():
def func_returnnull():
return None
def func_returnblob():
- return buffer("blob")
+ return b"blob"
def func_returnlonglong():
return 1<<31
def func_raiseexception():
- 5 // 0
+ 5/0
def func_isstring(v):
- return type(v) is unicode
+ return type(v) is str
def func_isint(v):
return type(v) is int
def func_isfloat(v):
@@ -51,9 +51,9 @@ def func_isfloat(v):
def func_isnone(v):
return type(v) is type(None)
def func_isblob(v):
- return type(v) is buffer
+ return isinstance(v, (bytes, memoryview))
def func_islonglong(v):
- return isinstance(v, (int, long)) and v >= 1<<31
+ return isinstance(v, int) and v >= 1<<31
class AggrNoStep:
def __init__(self):
@@ -71,7 +71,7 @@ class AggrNoFinalize:
class AggrExceptionInInit:
def __init__(self):
- 5 // 0
+ 5/0
def step(self, x):
pass
@@ -84,7 +84,7 @@ class AggrExceptionInStep:
pass
def step(self, x):
- 5 // 0
+ 5/0
def finalize(self):
return 42
@@ -97,14 +97,15 @@ class AggrExceptionInFinalize:
pass
def finalize(self):
- 5 // 0
+ 5/0
class AggrCheckType:
def __init__(self):
self.val = None
def step(self, whichType, val):
- theType = {"str": unicode, "int": int, "float": float, "None": type(None), "blob": buffer}
+ theType = {"str": str, "int": int, "float": float, "None": type(None),
+ "blob": bytes}
self.val = int(theType[whichType] is type(val))
def finalize(self):
@@ -166,15 +167,15 @@ class FunctionTests(unittest.TestCase):
cur = self.con.cursor()
cur.execute("select returntext()")
val = cur.fetchone()[0]
- self.assertEqual(type(val), unicode)
+ self.assertEqual(type(val), str)
self.assertEqual(val, "foo")
def CheckFuncReturnUnicode(self):
cur = self.con.cursor()
cur.execute("select returnunicode()")
val = cur.fetchone()[0]
- self.assertEqual(type(val), unicode)
- self.assertEqual(val, u"bar")
+ self.assertEqual(type(val), str)
+ self.assertEqual(val, "bar")
def CheckFuncReturnInt(self):
cur = self.con.cursor()
@@ -202,8 +203,8 @@ class FunctionTests(unittest.TestCase):
cur = self.con.cursor()
cur.execute("select returnblob()")
val = cur.fetchone()[0]
- self.assertEqual(type(val), buffer)
- self.assertEqual(val, buffer("blob"))
+ self.assertEqual(type(val), bytes)
+ self.assertEqual(val, b"blob")
def CheckFuncReturnLongLong(self):
cur = self.con.cursor()
@@ -217,7 +218,7 @@ class FunctionTests(unittest.TestCase):
cur.execute("select raiseexception()")
cur.fetchone()
self.fail("should have raised OperationalError")
- except sqlite.OperationalError, e:
+ except sqlite.OperationalError as e:
self.assertEqual(e.args[0], 'user-defined function raised exception')
def CheckParamString(self):
@@ -246,7 +247,7 @@ class FunctionTests(unittest.TestCase):
def CheckParamBlob(self):
cur = self.con.cursor()
- cur.execute("select isblob(?)", (buffer("blob"),))
+ cur.execute("select isblob(?)", (memoryview(b"blob"),))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
@@ -270,7 +271,7 @@ class AggregateTests(unittest.TestCase):
)
""")
cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)",
- ("foo", 5, 3.14, None, buffer("blob"),))
+ ("foo", 5, 3.14, None, memoryview(b"blob"),))
self.con.create_aggregate("nostep", 1, AggrNoStep)
self.con.create_aggregate("nofinalize", 1, AggrNoFinalize)
@@ -297,8 +298,8 @@ class AggregateTests(unittest.TestCase):
try:
cur.execute("select nostep(t) from test")
self.fail("should have raised an AttributeError")
- except AttributeError, e:
- self.assertEqual(e.args[0], "AggrNoStep instance has no attribute 'step'")
+ except AttributeError as e:
+ self.assertEqual(e.args[0], "'AggrNoStep' object has no attribute 'step'")
def CheckAggrNoFinalize(self):
cur = self.con.cursor()
@@ -306,7 +307,7 @@ class AggregateTests(unittest.TestCase):
cur.execute("select nofinalize(t) from test")
val = cur.fetchone()[0]
self.fail("should have raised an OperationalError")
- except sqlite.OperationalError, e:
+ except sqlite.OperationalError as e:
self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
def CheckAggrExceptionInInit(self):
@@ -315,7 +316,7 @@ class AggregateTests(unittest.TestCase):
cur.execute("select excInit(t) from test")
val = cur.fetchone()[0]
self.fail("should have raised an OperationalError")
- except sqlite.OperationalError, e:
+ except sqlite.OperationalError as e:
self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error")
def CheckAggrExceptionInStep(self):
@@ -324,7 +325,7 @@ class AggregateTests(unittest.TestCase):
cur.execute("select excStep(t) from test")
val = cur.fetchone()[0]
self.fail("should have raised an OperationalError")
- except sqlite.OperationalError, e:
+ except sqlite.OperationalError as e:
self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error")
def CheckAggrExceptionInFinalize(self):
@@ -333,7 +334,7 @@ class AggregateTests(unittest.TestCase):
cur.execute("select excFinalize(t) from test")
val = cur.fetchone()[0]
self.fail("should have raised an OperationalError")
- except sqlite.OperationalError, e:
+ except sqlite.OperationalError as e:
self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
def CheckAggrCheckParamStr(self):
@@ -362,7 +363,7 @@ class AggregateTests(unittest.TestCase):
def CheckAggrCheckParamBlob(self):
cur = self.con.cursor()
- cur.execute("select checkType('blob', ?)", (buffer("blob"),))
+ cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
@@ -402,7 +403,7 @@ class AuthorizerTests(unittest.TestCase):
def CheckTableAccess(self):
try:
self.con.execute("select * from t2")
- except sqlite.DatabaseError, e:
+ except sqlite.DatabaseError as e:
if not e.args[0].endswith("prohibited"):
self.fail("wrong exception text: %s" % e.args[0])
return
@@ -411,7 +412,7 @@ class AuthorizerTests(unittest.TestCase):
def CheckColumnAccess(self):
try:
self.con.execute("select c2 from t1")
- except sqlite.DatabaseError, e:
+ except sqlite.DatabaseError as e:
if not e.args[0].endswith("prohibited"):
self.fail("wrong exception text: %s" % e.args[0])
return