diff options
Diffstat (limited to 'Lib/test/test_csv.py')
-rw-r--r-- | Lib/test/test_csv.py | 369 |
1 files changed, 126 insertions, 243 deletions
diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py index 53ca5ab12f1..55796a204ad 100644 --- a/Lib/test/test_csv.py +++ b/Lib/test/test_csv.py @@ -1,16 +1,15 @@ -# -*- coding: iso-8859-1 -*- # Copyright (C) 2001,2002 Python Software Foundation # csv package unit tests +import io import sys import os import unittest -from StringIO import StringIO -import tempfile +from io import StringIO +from tempfile import TemporaryFile import csv import gc -import io -from test import test_support +from test import support class Test_Csv(unittest.TestCase): """ @@ -54,8 +53,8 @@ class Test_Csv(unittest.TestCase): self.assertEqual(obj.dialect.skipinitialspace, False) self.assertEqual(obj.dialect.strict, False) # Try deleting or changing attributes (they are read-only) - self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter') - self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':') + self.assertRaises(AttributeError, delattr, obj.dialect, 'delimiter') + self.assertRaises(AttributeError, setattr, obj.dialect, 'delimiter', ':') self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting') self.assertRaises(AttributeError, setattr, obj.dialect, 'quoting', None) @@ -118,17 +117,12 @@ class Test_Csv(unittest.TestCase): def _write_test(self, fields, expect, **kwargs): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj, **kwargs) writer.writerow(fields) fileobj.seek(0) self.assertEqual(fileobj.read(), expect + writer.dialect.lineterminator) - finally: - fileobj.close() - os.unlink(name) def test_write_arg_valid(self): self.assertRaises(csv.Error, self._write_test, None, '') @@ -195,29 +189,13 @@ class Test_Csv(unittest.TestCase): raise IOError writer = csv.writer(BrokenFile()) self.assertRaises(IOError, writer.writerows, [['a']]) - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj) self.assertRaises(TypeError, writer.writerows, None) writer.writerows([['a','b'],['c','d']]) fileobj.seek(0) self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n") - finally: - fileobj.close() - os.unlink(name) - - def test_write_float(self): - # Issue 13573: loss of precision because csv.writer - # uses str() for floats instead of repr() - orig_row = [1.234567890123, 1.0/7.0, 'abc'] - f = StringIO() - c = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC) - c.writerow(orig_row) - f.seek(0) - c = csv.reader(f, quoting=csv.QUOTE_NONNUMERIC) - new_row = next(c) - self.assertEqual(orig_row, new_row) def _read_test(self, input, expect, **kwargs): reader = csv.reader(input, **kwargs) @@ -234,6 +212,10 @@ class Test_Csv(unittest.TestCase): ['ab\0c'], None, strict = 1) self._read_test(['"ab"c'], [['abc']], doublequote = 0) + self.assertRaises(csv.Error, self._read_test, + [b'ab\0c'], None) + + def test_read_eol(self): self._read_test(['a,b'], [['a','b']]) self._read_test(['a,b\n'], [['a','b']]) @@ -294,23 +276,19 @@ class Test_Csv(unittest.TestCase): csv.field_size_limit(limit) def test_read_linenum(self): - for r in (csv.reader(['line,1', 'line,2', 'line,3']), - csv.DictReader(['line,1', 'line,2', 'line,3'], - fieldnames=['a', 'b', 'c'])): - self.assertEqual(r.line_num, 0) - r.next() - self.assertEqual(r.line_num, 1) - r.next() - self.assertEqual(r.line_num, 2) - r.next() - self.assertEqual(r.line_num, 3) - self.assertRaises(StopIteration, r.next) - self.assertEqual(r.line_num, 3) + r = csv.reader(['line,1', 'line,2', 'line,3']) + self.assertEqual(r.line_num, 0) + next(r) + self.assertEqual(r.line_num, 1) + next(r) + self.assertEqual(r.line_num, 2) + next(r) + self.assertEqual(r.line_num, 3) + self.assertRaises(StopIteration, next, r) + self.assertEqual(r.line_num, 3) def test_roundtrip_quoteed_newlines(self): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj) self.assertRaises(TypeError, writer.writerows, None) rows = [['a\nb','b'],['c','x\r\nd']] @@ -318,9 +296,6 @@ class Test_Csv(unittest.TestCase): fileobj.seek(0) for i, row in enumerate(csv.reader(fileobj)): self.assertEqual(row, rows[i]) - finally: - fileobj.close() - os.unlink(name) class TestDialectRegistry(unittest.TestCase): def test_registry_badargs(self): @@ -370,17 +345,21 @@ class TestDialectRegistry(unittest.TestCase): quoting = csv.QUOTE_NONE escapechar = "\\" - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+") as fileobj: fileobj.write("abc def\nc1ccccc1 benzene\n") fileobj.seek(0) - rdr = csv.reader(fileobj, dialect=space()) - self.assertEqual(rdr.next(), ["abc", "def"]) - self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"]) - finally: - fileobj.close() - os.unlink(name) + reader = csv.reader(fileobj, dialect=space()) + self.assertEqual(next(reader), ["abc", "def"]) + self.assertEqual(next(reader), ["c1ccccc1", "benzene"]) + + def compare_dialect_123(self, expected, *writeargs, **kwwriteargs): + + with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj: + + writer = csv.writer(fileobj, *writeargs, **kwwriteargs) + writer.writerow([1,2,3]) + fileobj.seek(0) + self.assertEqual(fileobj.read(), expected) def test_dialect_apply(self): class testA(csv.excel): @@ -389,63 +368,19 @@ class TestDialectRegistry(unittest.TestCase): delimiter = ":" class testC(csv.excel): delimiter = "|" + class testUni(csv.excel): + delimiter = "\u039B" csv.register_dialect('testC', testC) try: - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: - writer = csv.writer(fileobj) - writer.writerow([1,2,3]) - fileobj.seek(0) - self.assertEqual(fileobj.read(), "1,2,3\r\n") - finally: - fileobj.close() - os.unlink(name) - - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: - writer = csv.writer(fileobj, testA) - writer.writerow([1,2,3]) - fileobj.seek(0) - self.assertEqual(fileobj.read(), "1\t2\t3\r\n") - finally: - fileobj.close() - os.unlink(name) - - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: - writer = csv.writer(fileobj, dialect=testB()) - writer.writerow([1,2,3]) - fileobj.seek(0) - self.assertEqual(fileobj.read(), "1:2:3\r\n") - finally: - fileobj.close() - os.unlink(name) - - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: - writer = csv.writer(fileobj, dialect='testC') - writer.writerow([1,2,3]) - fileobj.seek(0) - self.assertEqual(fileobj.read(), "1|2|3\r\n") - finally: - fileobj.close() - os.unlink(name) - - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: - writer = csv.writer(fileobj, dialect=testA, delimiter=';') - writer.writerow([1,2,3]) - fileobj.seek(0) - self.assertEqual(fileobj.read(), "1;2;3\r\n") - finally: - fileobj.close() - os.unlink(name) + self.compare_dialect_123("1,2,3\r\n") + self.compare_dialect_123("1\t2\t3\r\n", testA) + self.compare_dialect_123("1:2:3\r\n", dialect=testB()) + self.compare_dialect_123("1|2|3\r\n", dialect='testC') + self.compare_dialect_123("1;2;3\r\n", dialect=testA, + delimiter=';') + self.compare_dialect_123("1\u039B2\u039B3\r\n", + dialect=testUni) finally: csv.unregister_dialect('testC') @@ -460,29 +395,19 @@ class TestDialectRegistry(unittest.TestCase): class TestCsvBase(unittest.TestCase): def readerAssertEqual(self, input, expected_result): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+", newline='') as fileobj: fileobj.write(input) fileobj.seek(0) reader = csv.reader(fileobj, dialect = self.dialect) fields = list(reader) self.assertEqual(fields, expected_result) - finally: - fileobj.close() - os.unlink(name) def writerAssertEqual(self, input, expected_result): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj, dialect = self.dialect) writer.writerows(input) fileobj.seek(0) self.assertEqual(fileobj.read(), expected_result) - finally: - fileobj.close() - os.unlink(name) class TestDialectExcel(TestCsvBase): dialect = 'excel' @@ -594,6 +519,15 @@ class TestEscapedExcel(TestCsvBase): def test_read_escape_fieldsep(self): self.readerAssertEqual('abc\\,def\r\n', [['abc,def']]) +class TestDialectUnix(TestCsvBase): + dialect = 'unix' + + def test_simple_writer(self): + self.writerAssertEqual([[1, 'abc def', 'abc']], '"1","abc def","abc"\n') + + def test_simple_reader(self): + self.readerAssertEqual('"1","abc def","abc"\n', [['1', 'abc def', 'abc']]) + class QuotedEscapedExcel(csv.excel): quoting = csv.QUOTE_NONNUMERIC escapechar = '\\' @@ -611,9 +545,7 @@ class TestDictFields(unittest.TestCase): ### "long" means the row is longer than the number of fieldnames ### "short" means there are fewer elements in the row than fieldnames def test_write_simple_dict(self): - fd, name = tempfile.mkstemp() - fileobj = io.open(fd, 'w+b') - try: + with TemporaryFile("w+", newline='') as fileobj: writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"]) writer.writeheader() fileobj.seek(0) @@ -622,130 +554,88 @@ class TestDictFields(unittest.TestCase): fileobj.seek(0) fileobj.readline() # header self.assertEqual(fileobj.read(), "10,,abc\r\n") - finally: - fileobj.close() - os.unlink(name) def test_write_no_fields(self): fileobj = StringIO() self.assertRaises(TypeError, csv.DictWriter, fileobj) def test_read_dict_fields(self): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+") as fileobj: fileobj.write("1,2,abc\r\n") fileobj.seek(0) reader = csv.DictReader(fileobj, fieldnames=["f1", "f2", "f3"]) - self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'}) - finally: - fileobj.close() - os.unlink(name) + self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'}) def test_read_dict_no_fieldnames(self): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+") as fileobj: fileobj.write("f1,f2,f3\r\n1,2,abc\r\n") fileobj.seek(0) reader = csv.DictReader(fileobj) + self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'}) self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"]) - self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'}) - finally: - fileobj.close() - os.unlink(name) # Two test cases to make sure existing ways of implicitly setting # fieldnames continue to work. Both arise from discussion in issue3436. def test_read_dict_fieldnames_from_file(self): - fd, name = tempfile.mkstemp() - f = os.fdopen(fd, "w+b") - try: - f.write("f1,f2,f3\r\n1,2,abc\r\n") - f.seek(0) - reader = csv.DictReader(f, fieldnames=csv.reader(f).next()) + with TemporaryFile("w+") as fileobj: + fileobj.write("f1,f2,f3\r\n1,2,abc\r\n") + fileobj.seek(0) + reader = csv.DictReader(fileobj, + fieldnames=next(csv.reader(fileobj))) self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"]) - self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'}) - finally: - f.close() - os.unlink(name) + self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'}) def test_read_dict_fieldnames_chain(self): import itertools - fd, name = tempfile.mkstemp() - f = os.fdopen(fd, "w+b") - try: - f.write("f1,f2,f3\r\n1,2,abc\r\n") - f.seek(0) - reader = csv.DictReader(f) + with TemporaryFile("w+") as fileobj: + fileobj.write("f1,f2,f3\r\n1,2,abc\r\n") + fileobj.seek(0) + reader = csv.DictReader(fileobj) first = next(reader) for row in itertools.chain([first], reader): self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"]) self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'}) - finally: - f.close() - os.unlink(name) def test_read_long(self): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+") as fileobj: fileobj.write("1,2,abc,4,5,6\r\n") fileobj.seek(0) reader = csv.DictReader(fileobj, fieldnames=["f1", "f2"]) - self.assertEqual(reader.next(), {"f1": '1', "f2": '2', + self.assertEqual(next(reader), {"f1": '1', "f2": '2', None: ["abc", "4", "5", "6"]}) - finally: - fileobj.close() - os.unlink(name) def test_read_long_with_rest(self): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+") as fileobj: fileobj.write("1,2,abc,4,5,6\r\n") fileobj.seek(0) reader = csv.DictReader(fileobj, fieldnames=["f1", "f2"], restkey="_rest") - self.assertEqual(reader.next(), {"f1": '1', "f2": '2', + self.assertEqual(next(reader), {"f1": '1', "f2": '2', "_rest": ["abc", "4", "5", "6"]}) - finally: - fileobj.close() - os.unlink(name) def test_read_long_with_rest_no_fieldnames(self): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+") as fileobj: fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n") fileobj.seek(0) reader = csv.DictReader(fileobj, restkey="_rest") self.assertEqual(reader.fieldnames, ["f1", "f2"]) - self.assertEqual(reader.next(), {"f1": '1', "f2": '2', + self.assertEqual(next(reader), {"f1": '1', "f2": '2', "_rest": ["abc", "4", "5", "6"]}) - finally: - fileobj.close() - os.unlink(name) def test_read_short(self): - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+") as fileobj: fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n") fileobj.seek(0) reader = csv.DictReader(fileobj, fieldnames="1 2 3 4 5 6".split(), restval="DEFAULT") - self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc', + self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc', "4": '4', "5": '5', "6": '6'}) - self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc', + self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc', "4": 'DEFAULT', "5": 'DEFAULT', "6": 'DEFAULT'}) - finally: - fileobj.close() - os.unlink(name) def test_read_multi(self): sample = [ @@ -756,7 +646,7 @@ class TestDictFields(unittest.TestCase): reader = csv.DictReader(sample, fieldnames="i1 float i2 s1 s2".split()) - self.assertEqual(reader.next(), {"i1": '2147483648', + self.assertEqual(next(reader), {"i1": '2147483648', "float": '43.0e12', "i2": '17', "s1": 'abc', @@ -766,16 +656,16 @@ class TestDictFields(unittest.TestCase): reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n", "1,2,abc,4,5,6\r\n"], fieldnames="1 2 3 4 5 6".split()) - self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc', + self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc', "4": '4', "5": '5', "6": '6'}) - self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc', + self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc', "4": '4', "5": '5', "6": '6'}) def test_read_semi_sep(self): reader = csv.DictReader(["1;2;abc;4;5;6\r\n"], fieldnames="1 2 3 4 5 6".split(), delimiter=';') - self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc', + self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc', "4": '4', "5": '5', "6": '6'}) class TestArrayWrites(unittest.TestCase): @@ -784,64 +674,45 @@ class TestArrayWrites(unittest.TestCase): contents = [(20-i) for i in range(20)] a = array.array('i', contents) - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj, dialect="excel") writer.writerow(a) expected = ",".join([str(i) for i in a])+"\r\n" fileobj.seek(0) self.assertEqual(fileobj.read(), expected) - finally: - fileobj.close() - os.unlink(name) def test_double_write(self): import array contents = [(20-i)*0.1 for i in range(20)] a = array.array('d', contents) - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj, dialect="excel") writer.writerow(a) - expected = ",".join([repr(i) for i in a])+"\r\n" + expected = ",".join([str(i) for i in a])+"\r\n" fileobj.seek(0) self.assertEqual(fileobj.read(), expected) - finally: - fileobj.close() - os.unlink(name) def test_float_write(self): import array contents = [(20-i)*0.1 for i in range(20)] a = array.array('f', contents) - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj, dialect="excel") writer.writerow(a) - expected = ",".join([repr(i) for i in a])+"\r\n" + expected = ",".join([str(i) for i in a])+"\r\n" fileobj.seek(0) self.assertEqual(fileobj.read(), expected) - finally: - fileobj.close() - os.unlink(name) def test_char_write(self): import array, string - a = array.array('c', string.letters) - fd, name = tempfile.mkstemp() - fileobj = os.fdopen(fd, "w+b") - try: + a = array.array('u', string.ascii_letters) + + with TemporaryFile("w+", newline='') as fileobj: writer = csv.writer(fileobj, dialect="excel") writer.writerow(a) expected = ",".join(a)+"\r\n" fileobj.seek(0) self.assertEqual(fileobj.read(), expected) - finally: - fileobj.close() - os.unlink(name) class TestDialectValidity(unittest.TestCase): def test_quoting(self): @@ -979,7 +850,7 @@ Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back self.assertTrue(dialect.doublequote) if not hasattr(sys, "gettotalrefcount"): - if test_support.verbose: print "*** skipping leakage tests ***" + if support.verbose: print("*** skipping leakage tests ***") else: class NUL: def write(s, *args): @@ -990,7 +861,7 @@ else: def test_create_read(self): delta = 0 lastrc = sys.gettotalrefcount() - for i in xrange(20): + for i in range(20): gc.collect() self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount() @@ -1006,7 +877,7 @@ else: delta = 0 lastrc = sys.gettotalrefcount() s = NUL() - for i in xrange(20): + for i in range(20): gc.collect() self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount() @@ -1022,7 +893,7 @@ else: delta = 0 rows = ["a,b,c\r\n"]*5 lastrc = sys.gettotalrefcount() - for i in xrange(20): + for i in range(20): gc.collect() self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount() @@ -1039,7 +910,7 @@ else: rows = [[1,2,3]]*5 s = NUL() lastrc = sys.gettotalrefcount() - for i in xrange(20): + for i in range(20): gc.collect() self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount() @@ -1051,24 +922,36 @@ else: # if writer leaks during write, last delta should be 5 or more self.assertEqual(delta < 5, True) -# commented out for now - csv module doesn't yet support Unicode -## class TestUnicode(unittest.TestCase): -## def test_unicode_read(self): -## import codecs -## f = codecs.EncodedFile(StringIO("Martin von Löwis," -## "Marc André Lemburg," -## "Guido van Rossum," -## "François Pinard\r\n"), -## data_encoding='iso-8859-1') -## reader = csv.reader(f) -## self.assertEqual(list(reader), [[u"Martin von Löwis", -## u"Marc André Lemburg", -## u"Guido van Rossum", -## u"François Pinardn"]]) +class TestUnicode(unittest.TestCase): + + names = ["Martin von Löwis", + "Marc André Lemburg", + "Guido van Rossum", + "François Pinard"] + + def test_unicode_read(self): + import io + with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj: + fileobj.write(",".join(self.names) + "\r\n") + fileobj.seek(0) + reader = csv.reader(fileobj) + self.assertEqual(list(reader), [self.names]) + + + def test_unicode_write(self): + import io + with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj: + writer = csv.writer(fileobj) + writer.writerow(self.names) + expected = ",".join(self.names)+"\r\n" + fileobj.seek(0) + self.assertEqual(fileobj.read(), expected) + + def test_main(): mod = sys.modules[__name__] - test_support.run_unittest( + support.run_unittest( *[getattr(mod, name) for name in dir(mod) if name.startswith('Test')] ) |