| # Tests of the full ZIP64 functionality of zipfile | |
| # The test_support.requires call is the only reason for keeping this separate | |
| # from test_zipfile | |
| from test import test_support | |
| # XXX(nnorwitz): disable this test by looking for extra largfile resource | |
| # which doesn't exist. This test takes over 30 minutes to run in general | |
| # and requires more disk space than most of the buildbots. | |
| test_support.requires( | |
| 'extralargefile', | |
| 'test requires loads of disk-space bytes and a long time to run' | |
| ) | |
| # We can test part of the module without zlib. | |
| try: | |
| import zlib | |
| except ImportError: | |
| zlib = None | |
| import zipfile, os, unittest | |
| import time | |
| import sys | |
| from tempfile import TemporaryFile | |
| from test.test_support import TESTFN, run_unittest | |
| TESTFN2 = TESTFN + "2" | |
| # How much time in seconds can pass before we print a 'Still working' message. | |
| _PRINT_WORKING_MSG_INTERVAL = 5 * 60 | |
| class TestsWithSourceFile(unittest.TestCase): | |
| def setUp(self): | |
| # Create test data. | |
| # xrange() is important here -- don't want to create immortal space | |
| # for a million ints. | |
| line_gen = ("Test of zipfile line %d." % i for i in xrange(1000000)) | |
| self.data = '\n'.join(line_gen) | |
| # And write it to a file. | |
| fp = open(TESTFN, "wb") | |
| fp.write(self.data) | |
| fp.close() | |
| def zipTest(self, f, compression): | |
| # Create the ZIP archive. | |
| zipfp = zipfile.ZipFile(f, "w", compression, allowZip64=True) | |
| # It will contain enough copies of self.data to reach about 6GB of | |
| # raw data to store. | |
| filecount = 6*1024**3 // len(self.data) | |
| next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL | |
| for num in range(filecount): | |
| zipfp.writestr("testfn%d" % num, self.data) | |
| # Print still working message since this test can be really slow | |
| if next_time <= time.time(): | |
| next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL | |
| print >>sys.__stdout__, ( | |
| ' zipTest still writing %d of %d, be patient...' % | |
| (num, filecount)) | |
| sys.__stdout__.flush() | |
| zipfp.close() | |
| # Read the ZIP archive | |
| zipfp = zipfile.ZipFile(f, "r", compression) | |
| for num in range(filecount): | |
| self.assertEqual(zipfp.read("testfn%d" % num), self.data) | |
| # Print still working message since this test can be really slow | |
| if next_time <= time.time(): | |
| next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL | |
| print >>sys.__stdout__, ( | |
| ' zipTest still reading %d of %d, be patient...' % | |
| (num, filecount)) | |
| sys.__stdout__.flush() | |
| zipfp.close() | |
| def testStored(self): | |
| # Try the temp file first. If we do TESTFN2 first, then it hogs | |
| # gigabytes of disk space for the duration of the test. | |
| for f in TemporaryFile(), TESTFN2: | |
| self.zipTest(f, zipfile.ZIP_STORED) | |
| if zlib: | |
| def testDeflated(self): | |
| # Try the temp file first. If we do TESTFN2 first, then it hogs | |
| # gigabytes of disk space for the duration of the test. | |
| for f in TemporaryFile(), TESTFN2: | |
| self.zipTest(f, zipfile.ZIP_DEFLATED) | |
| def tearDown(self): | |
| for fname in TESTFN, TESTFN2: | |
| if os.path.exists(fname): | |
| os.remove(fname) | |
| class OtherTests(unittest.TestCase): | |
| def testMoreThan64kFiles(self): | |
| # This test checks that more than 64k files can be added to an archive, | |
| # and that the resulting archive can be read properly by ZipFile | |
| zipf = zipfile.ZipFile(TESTFN, mode="w") | |
| zipf.debug = 100 | |
| numfiles = (1 << 16) * 3/2 | |
| for i in xrange(numfiles): | |
| zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) | |
| self.assertEqual(len(zipf.namelist()), numfiles) | |
| zipf.close() | |
| zipf2 = zipfile.ZipFile(TESTFN, mode="r") | |
| self.assertEqual(len(zipf2.namelist()), numfiles) | |
| for i in xrange(numfiles): | |
| self.assertEqual(zipf2.read("foo%08d" % i), "%d" % (i**3 % 57)) | |
| zipf.close() | |
| def tearDown(self): | |
| test_support.unlink(TESTFN) | |
| test_support.unlink(TESTFN2) | |
| def test_main(): | |
| run_unittest(TestsWithSourceFile, OtherTests) | |
| if __name__ == "__main__": | |
| test_main() |