| # Adapted from test_file.py by Daniel Stutzbach | |
| from __future__ import unicode_literals | |
| import sys | |
| import os | |
| import errno | |
| import unittest | |
| from array import array | |
| from weakref import proxy | |
| from functools import wraps | |
| from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd | |
| from test.test_support import py3k_bytes as bytes | |
| from test.script_helper import run_python | |
| from _io import FileIO as _FileIO | |
class AutoFileTests(unittest.TestCase):
    """FileIO tests that run against an automatically opened test file.

    setUp() opens TESTFN for writing as ``self.f``; tearDown() closes it
    (unless a test has replaced ``self.f`` with None) and removes TESTFN.
    """
    # NOTE: test methods carry comments instead of docstrings because
    # unittest reports the first docstring line in place of the test name.

    def setUp(self):
        self.f = _FileIO(TESTFN, 'w')

    def tearDown(self):
        # testWeakRefs sets self.f to None, so test for None explicitly
        # instead of relying on the truthiness of the file object.
        if self.f is not None:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write(bytes(range(10)))
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so the proxy becomes dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testSeekTell(self):
        # seek() with whence 0 (absolute), 1 (relative) and 2 (from end)
        # must be reflected by tell().
        self.f.write(bytes(range(20)))
        self.assertEqual(self.f.tell(), 20)
        self.f.seek(0)
        self.assertEqual(self.f.tell(), 0)
        self.f.seek(10)
        self.assertEqual(self.f.tell(), 10)
        self.f.seek(5, 1)
        self.assertEqual(self.f.tell(), 15)
        self.f.seek(-5, 1)
        self.assertEqual(self.f.tell(), 10)
        self.f.seek(-5, 2)
        self.assertEqual(self.f.tell(), 15)

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f

        self.assertEqual(f.mode, "wb")
        self.assertEqual(f.closed, False)

        # verify the attributes are readonly
        for attr in 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write(b"\x01\x02")
        self.f.close()
        a = array(b'b', b'x'*10)
        # Rebind self.f so that tearDown closes the reopened file.
        self.f = _FileIO(TESTFN, 'r')
        n = self.f.readinto(a)
        self.assertEqual(array(b'b', [1, 2]), a[:n])

    def test_none_args(self):
        # read(), readline() and readlines() must accept None as the
        # size/hint argument.
        self.f.write(b"hi\nbye\nabc")
        self.f.close()
        self.f = _FileIO(TESTFN, 'r')
        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
        self.f.seek(0)
        self.assertEqual(self.f.readline(None), b"hi\n")
        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])

    def testRepr(self):
        # repr() shows the name while it exists, falls back to the fd once
        # the name attribute is deleted, and reports a closed file.
        self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
                                       % (self.f.name, self.f.mode))
        del self.f.name
        self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
                                       % (self.f.fileno(), self.f.mode))
        self.f.close()
        self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")

    def testErrors(self):
        f = self.f
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)
        # setUp opened the file for writing only, so reading must fail.
        self.assertRaises(ValueError, f.read, 10)
        f.close()
        self.assertTrue(f.closed)

        f = _FileIO(TESTFN, 'r')
        try:
            self.assertRaises(TypeError, f.readinto, "")
            self.assertFalse(f.closed)
        finally:
            # This handle is not self.f, so tearDown will not close it;
            # release it here even when an assertion above fails.
            f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        methods = ['fileno', 'isatty', 'read', 'readinto',
                   'seek', 'tell', 'truncate', 'write', 'seekable',
                   'readable', 'writable']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        self.f.close()
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)

    def testOpendir(self):
        # Issue 3703: opening a directory should fill the errno
        # Windows always returns "[Errno 13]: Permission denied
        # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
        try:
            _FileIO('.', 'r')
        except IOError as e:
            self.assertNotEqual(e.errno, 0)
            self.assertEqual(e.filename, ".")
        else:
            self.fail("Should have raised IOError")

    # A set of functions testing that we get expected behaviour if someone
    # has manually closed the internal file descriptor.  First, two
    # decorators.  They are kept self-contained because names defined in
    # the class body are not visible from inside the nested wrappers.

    def ClosedFD(func):
        """Run *func*(self, f) after closing f's descriptor behind its back.

        The wrapped test passes as long as *func* does not raise.
        """
        @wraps(func)
        def wrapper(self):
            # forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            finally:
                # Closing may fail because the fd is already gone; that
                # failure is not what the wrapped test is checking.
                try:
                    self.f.close()
                except IOError:
                    pass
        return wrapper

    def ClosedFDRaises(func):
        """Like ClosedFD, but require *func* to raise IOError with EBADF."""
        @wraps(func)
        def wrapper(self):
            # forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            except IOError as e:
                self.assertEqual(e.errno, errno.EBADF)
            else:
                self.fail("Should have raised IOError")
            finally:
                # Same best-effort cleanup as in ClosedFD.
                try:
                    self.f.close()
                except IOError:
                    pass
        return wrapper

    @ClosedFDRaises
    def testErrnoOnClose(self, f):
        f.close()

    @ClosedFDRaises
    def testErrnoOnClosedWrite(self, f):
        f.write('a')

    @ClosedFDRaises
    def testErrnoOnClosedSeek(self, f):
        f.seek(0)

    @ClosedFDRaises
    def testErrnoOnClosedTell(self, f):
        f.tell()

    @ClosedFDRaises
    def testErrnoOnClosedTruncate(self, f):
        f.truncate(0)

    @ClosedFD
    def testErrnoOnClosedSeekable(self, f):
        f.seekable()

    @ClosedFD
    def testErrnoOnClosedReadable(self, f):
        f.readable()

    @ClosedFD
    def testErrnoOnClosedWritable(self, f):
        f.writable()

    @ClosedFD
    def testErrnoOnClosedFileno(self, f):
        f.fileno()

    @ClosedFD
    def testErrnoOnClosedIsatty(self, f):
        self.assertEqual(f.isatty(), False)

    def ReopenForRead(self):
        # Replace self.f with a read-mode FileIO on TESTFN whose descriptor
        # has been closed underneath it, and return that object.
        try:
            self.f.close()
        except IOError:
            pass
        self.f = _FileIO(TESTFN, 'r')
        os.close(self.f.fileno())
        return self.f

    @ClosedFDRaises
    def testErrnoOnClosedRead(self, f):
        f = self.ReopenForRead()
        f.read(1)

    @ClosedFDRaises
    def testErrnoOnClosedReadall(self, f):
        f = self.ReopenForRead()
        f.readall()

    @ClosedFDRaises
    def testErrnoOnClosedReadinto(self, f):
        f = self.ReopenForRead()
        a = array(b'b', b'x'*10)
        f.readinto(a)
class OtherFileTests(unittest.TestCase):
    """FileIO tests that open (and clean up) their own files."""
    # NOTE: test methods carry comments instead of docstrings because
    # unittest reports the first docstring line in place of the test name.

    def testAbles(self):
        # readable()/writable()/seekable()/isatty() for several modes, and
        # for /dev/tty where it can be opened.
        try:
            f = _FileIO(TESTFN, "w")
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            f.close()

            f = _FileIO(TESTFN, "r")
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            f.close()

            f = _FileIO(TESTFN, "a+")
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.assertEqual(f.isatty(), False)
            f.close()

            if sys.platform != "win32":
                try:
                    f = _FileIO("/dev/tty", "a")
                except EnvironmentError:
                    # When run in a cron job there just aren't any
                    # ttys, so skip the test.  This also handles other
                    # OS'es that don't support /dev/tty.
                    pass
                else:
                    self.assertEqual(f.readable(), False)
                    self.assertEqual(f.writable(), True)
                    if (sys.platform != "darwin" and
                            'bsd' not in sys.platform and
                            not sys.platform.startswith('sunos')):
                        # Somehow /dev/tty appears seekable on some BSDs
                        self.assertEqual(f.seekable(), False)
                    self.assertEqual(f.isatty(), True)
                    f.close()
        finally:
            os.unlink(TESTFN)

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+", "rw", "rt"):
            try:
                f = _FileIO(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testUnicodeOpen(self):
        # verify that a str()-converted filename can be opened for writing
        # (only open/close is exercised here; repr() is not checked)
        f = _FileIO(str(TESTFN), "w")
        f.close()
        os.unlink(TESTFN)

    def testBytesOpen(self):
        # Opening a bytes filename
        try:
            fn = TESTFN.encode("ascii")
        except UnicodeEncodeError:
            # Skip test
            return
        f = _FileIO(fn, "w")
        try:
            try:
                f.write(b"abc")
            finally:
                # Close even if the write fails, so the handle is not
                # still open when TESTFN is unlinked below.
                f.close()
            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b"abc")
        finally:
            os.unlink(TESTFN)

    def testInvalidFd(self):
        # A negative fd is rejected with ValueError, a bad fd with OSError.
        self.assertRaises(ValueError, _FileIO, -10)
        self.assertRaises(OSError, _FileIO, make_bad_fd())
        if sys.platform == 'win32':
            import msvcrt
            self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = _FileIO(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testTruncate(self):
        # truncate() changes the file size but leaves the position alone.
        f = _FileIO(TESTFN, 'w')
        try:
            f.write(bytes(bytearray(range(10))))
            self.assertEqual(f.tell(), 10)
            f.truncate(5)
            self.assertEqual(f.tell(), 10)
            self.assertEqual(f.seek(0, os.SEEK_END), 5)
            f.truncate(15)
            self.assertEqual(f.tell(), 5)
            self.assertEqual(f.seek(0, os.SEEK_END), 15)
        finally:
            # Release the handle and remove the file even when an
            # assertion fails, so this test leaves nothing behind.
            f.close()
            os.unlink(TESTFN)

    def testTruncateOnWindows(self):
        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = _FileIO(TESTFN, 'w')
            f.write(bytes(range(11)))
            f.close()

            f = _FileIO(TESTFN,'r+')
            data = f.read(5)
            if data != bytes(range(5)):
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testAppend(self):
        # Data written in 'ab' mode must land after the existing content.
        try:
            with open(TESTFN, 'wb') as f:
                f.write(b'spam')
            with open(TESTFN, 'ab') as f:
                f.write(b'eggs')
            with open(TESTFN, 'rb') as f:
                d = f.read()
            self.assertEqual(d, b'spameggs')
        finally:
            # Best-effort cleanup: the file may not exist if the first
            # open failed.  Only swallow the error unlink itself raises.
            try:
                os.unlink(TESTFN)
            except OSError:
                pass

    def testInvalidInit(self):
        self.assertRaises(TypeError, _FileIO, "1", 0, 0)

    def testWarnings(self):
        # Failed constructor calls must not emit any warnings.
        with check_warnings(quiet=True) as w:
            self.assertEqual(w.warnings, [])
            self.assertRaises(TypeError, _FileIO, [])
            self.assertEqual(w.warnings, [])
            self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
            self.assertEqual(w.warnings, [])

    def test_surrogates(self):
        # Issue #8438: try to open a filename containing surrogates.
        # It should either fail because the file doesn't exist or the filename
        # can't be represented using the filesystem encoding, but not because
        # of a LookupError for the error handler "surrogateescape".
        filename = u'\udc80.txt'
        try:
            with _FileIO(filename):
                pass
        except (UnicodeEncodeError, IOError):
            pass
        # Spawn a separate Python process with a different "file system
        # default encoding", to exercise this further.
        env = dict(os.environ)
        env[b'LC_CTYPE'] = b'C'
        _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
        if ('UnicodeEncodeError' not in out and
                'IOError: [Errno 2] No such file or directory' not in out):
            self.fail('Bad output: %r' % out)
def test_main():
    """Run both FileIO test classes, then remove any leftover TESTFN."""
    try:
        run_unittest(AutoFileTests, OtherFileTests)
    finally:
        # Individual tests have not always removed TESTFN themselves, so
        # get rid of it here regardless of how the run ended.
        leftover = os.path.exists(TESTFN)
        if leftover:
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()