| """Supporting definitions for the Python regression tests.""" | |
# This module is only importable under its package-qualified name.
if not __name__ == 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
| import contextlib | |
| import errno | |
| import functools | |
| import gc | |
| import socket | |
| import sys | |
| import os | |
| import platform | |
| import shutil | |
| import warnings | |
| import unittest | |
| import importlib | |
| import UserDict | |
| import re | |
| import time | |
| try: | |
| import thread | |
| except ImportError: | |
| thread = None | |
# Names exported by ``from test.test_support import *``.
__all__ = [
    "Error", "TestFailed", "ResourceDenied",
    "import_module",
    "verbose", "use_resources", "max_memuse",
    "record_original_stdout", "get_original_stdout",
    "unload", "unlink", "rmtree", "forget",
    "is_resource_enabled", "requires",
    "find_unused_port", "bind_port",
    "fcmp", "have_unicode", "is_jython",
    "TESTFN", "HOST", "FUZZ", "SAVEDCWD",
    "temp_cwd", "findfile", "sortdict", "check_syntax_error",
    "open_urlresource",
    "check_warnings", "check_py3k_warnings",
    "CleanImport", "EnvironmentVarGuard",
    "captured_output", "captured_stdout",
    "TransientResource", "transient_internet",
    "run_with_locale",
    "set_memlimit", "bigmemtest", "bigaddrspacetest",
    "BasicTestRunner", "run_unittest", "run_doctest",
    "threading_setup", "threading_cleanup", "reap_children",
    "cpython_only", "check_impl_detail",
    "get_attribute", "py3k_bytes",
    "import_fresh_module",
]
class Error(Exception):
    """Common base class of the regression-test exceptions."""
class TestFailed(Error):
    """Signals that a test failed."""
class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    Raised when a test calls requires() for a resource that has not been
    enabled.  Having a dedicated type makes it possible to tell expected
    skips apart from unexpected ones.
    """
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager that silences module/package deprecation warnings
    emitted while importing.

    With a false *ignore* the context manager does nothing at all."""
    if not ignore:
        yield
        return
    with warnings.catch_warnings():
        # Only DeprecationWarnings whose text mentions a module or a
        # package are filtered out.
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
def import_module(name, deprecated=False):
    """Import module *name* and return it.

    An ImportError is converted into unittest.SkipTest carrying the same
    message.  When *deprecated* is true, module and package deprecation
    warnings raised by the import are suppressed."""
    with _ignore_deprecated_imports(deprecated):
        try:
            module = importlib.import_module(name)
        except ImportError as exc:
            raise unittest.SkipTest(str(exc))
        return module
def _save_and_remove_module(name, orig_modules):
    """Move *name* and its submodules from sys.modules into *orig_modules*.

    Raise ImportError if the module can't be imported."""
    if name not in sys.modules:
        # Importing is only a check that the module exists; the entry it
        # creates is dropped again right away.
        __import__(name)
        del sys.modules[name]
    submodule_prefix = name + '.'
    # Iterate over a snapshot: sys.modules is modified inside the loop.
    for loaded in list(sys.modules):
        if loaded == name or loaded.startswith(submodule_prefix):
            orig_modules[loaded] = sys.modules.pop(loaded)
def _save_and_block_module(name, orig_modules):
    """Store the sys.modules entry for *name* (if any) in *orig_modules*
    and replace it with None.

    Return True if the module was in sys.modules, False otherwise."""
    was_loaded = name in sys.modules
    if was_loaded:
        orig_modules[name] = sys.modules[name]
    sys.modules[name] = None
    return was_loaded
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a fresh copy of module *name*, bypassing the
    sys.modules cache; the cache is put back as it was afterwards.

    Modules listed in *fresh* are removed from the cache as well, so that
    they are imported anew if the import needs them.  If the import raises
    ImportError, None is returned.

    Modules listed in *blocked* cannot be imported while the fresh import
    takes place.

    If *deprecated* is true, module and package deprecation messages are
    suppressed."""
    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
    # checks to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        saved_modules = {}        # entries to put back afterwards
        blocking_entries = []     # names that only received a None entry
        _save_and_remove_module(name, saved_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, saved_modules)
            for blocked_name in blocked:
                if not _save_and_block_module(blocked_name, saved_modules):
                    blocking_entries.append(blocked_name)
            fresh_module = importlib.import_module(name)
        except ImportError:
            fresh_module = None
        finally:
            sys.modules.update(saved_modules)
            for blocked_name in blocking_entries:
                del sys.modules[blocked_name]
        return fresh_module
def get_attribute(obj, name):
    """Return attribute *name* of *obj*; a missing attribute is reported
    as unittest.SkipTest instead of AttributeError."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("module %s has no attribute %s" % (
            obj.__name__, name))
# Flag set to 0 by regrtest.py
verbose = 1
# Flag set to [] by regrtest.py
use_resources = None
# A max_memuse of 0 disables the bigmem tests (they will still be run with
# small sizes, to make sure they work.)
max_memuse = real_max_memuse = 0

# _original_stdout holds the stdout that was current when regrtest began.
# That may be the real stdout, IDLE's emulation of it, or something else;
# what matters is that it is a stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Store *stdout* in the module-level _original_stdout."""
    global _original_stdout
    _original_stdout = stdout
def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none (or a
    false value) has been recorded."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
def unload(name):
    """Drop *name* from sys.modules; do nothing if it is not there."""
    sys.modules.pop(name, None)
def unlink(filename):
    """Delete *filename*, silently ignoring any OSError."""
    try:
        os.remove(filename)
    except OSError:
        return
def rmtree(path):
    """Remove the directory tree at *path*; a tree that does not exist is
    not an error."""
    try:
        shutil.rmtree(path)
    except OSError as exc:
        # Unix returns ENOENT, Windows returns ESRCH.
        if exc.errno in (errno.ENOENT, errno.ESRCH):
            return
        raise
def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules
    and deleting any .pyc and .pyo files found for it along sys.path.'''
    unload(modname)
    for dirname in sys.path:
        # Both compiled forms are tried independently: a .pyo file may
        # exist even when there is no .pyc file.
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(dirname, modname + os.extsep + suffix))
def is_resource_enabled(resource):
    """Return whether *resource* is listed in use_resources.

    Known resources are set by regrtest.py; while use_resources is None no
    resource counts as enabled."""
    if use_resources is None:
        return False
    return resource in use_resources
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    When the caller's module is __main__ the resource is treated as enabled
    and the function simply returns.  *msg* overrides the default message
    of the ResourceDenied exception.  The function returns None when it
    does not raise."""
    # The frame inspected is the direct caller's, so this lookup has to stay
    # in this function.
    caller_module = sys._getframe(1).f_globals.get("__name__")
    if caller_module == "__main__" or is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
# Default host used by bind_port().
HOST = "localhost"
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an unused port that should be suitable for binding.

    A temporary socket of the given *family* and *socktype* (default
    AF_INET, SOCK_STREAM) is created and passed to bind_port(), which binds
    it with the port set to 0, eliciting an unused ephemeral port from the
    OS.  The temporary socket is then closed and deleted, and the ephemeral
    port is returned.  The socket is closed even if bind_port() raises.

    Either this function or bind_port() should be used for any test where a
    server socket needs to be bound to a particular port for the duration
    of the test.  Which one to use depends on whether the calling code is
    creating a python socket, or whether an unused port needs to be provided
    in a constructor or passed to an external program (e.g. the -accept
    argument to openssl's s_server mode).  Always prefer bind_port() over
    find_unused_port() where possible.

    Hard coded ports should *NEVER* be used.  As soon as a server socket is
    bound to a hard coded port, the ability to run multiple instances of
    the test simultaneously on the same host is compromised, which makes
    the test a ticking time bomb in a buildbot environment.  On Unix
    buildbots this may simply manifest as a failed test, which can usually
    be recovered from without intervention; on Windows, the entire python
    process can wedge completely, requiring someone to log in to the
    buildbot and manually kill the affected process.

    (This is easy to reproduce on Windows and can be traced to the
    SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, two AF_INET SOCK_STREAM sockets cannot bind,
    listen and then accept connections on identical host/ports: an
    EADDRINUSE socket.error is raised at some point, depending on the
    platform and the order in which bind and listen were called on each
    socket.

    On Windows, however, if SO_REUSEADDR is set on the sockets, no
    EADDRINUSE is ever raised when binding two identical host/ports.  When
    accept() is called on each socket, the second caller's process steals
    the port from the first caller, leaving both in a wedged state where
    they no longer respond to signals or graceful kills and must be
    forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics
    as SO_REUSEADDR on Unix.  This is a common mistake; OpenSSL's 0.9.8g
    source, for example, uses SO_REUSEADDR when openssl.exe is called with
    the 's_server' option.  See http://bugs.python.org/issue2550 for more
    info.  The following site also has a very thorough description of the
    implications of both REUSEADDR and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts
    to elicit unused ports, it rests heavily on the assumption that the
    ephemeral port returned to us by the OS won't immediately be dished
    back out to some other process when we close and delete our temporary
    socket but before our calling code has a chance to bind the returned
    port.  We can deal with this issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    try:
        port = bind_port(tempsock)
    finally:
        # Release the descriptor on the failure path as well.
        tempsock.close()
        del tempsock
    return port
def bind_port(sock, host=HOST):
    """Bind *sock* to a free port on *host* and return the port number.

    An ephemeral port (port 0) is requested so that the port is known to be
    unbound.  This is important as many tests may be running
    simultaneously, especially in a buildbot environment.

    TestFailed is raised if sock.family is AF_INET and sock.type is
    SOCK_STREAM, *and* the socket has SO_REUSEADDR or SO_REUSEPORT set on
    it.  Tests should *never* set these socket options for TCP/IP sockets;
    the only case for setting them is testing multicasting via multiple UDP
    sockets.

    Additionally, for such a socket, if the SO_EXCLUSIVEADDRUSE socket
    option is available (i.e. on Windows), it is set on the socket.  This
    prevents anyone else from bind()'ing to our host/port for the duration
    of the test.
    """
    is_tcp = (sock.family == socket.AF_INET and
              sock.type == socket.SOCK_STREAM)
    if is_tcp:
        if (hasattr(socket, 'SO_REUSEADDR') and
                sock.getsockopt(socket.SOL_SOCKET,
                                socket.SO_REUSEADDR) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        if (hasattr(socket, 'SO_REUSEPORT') and
                sock.getsockopt(socket.SOL_SOCKET,
                                socket.SO_REUSEPORT) == 1):
            raise TestFailed("tests should never set the SO_REUSEPORT "
                             "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
    sock.bind((host, 0))
    return sock.getsockname()[1]
# Relative tolerance used by fcmp() when a float is involved.
FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Compare *x* and *y* cmp()-style, treating nearly equal floats as equal.

    Returns 0, a positive or a negative number.  If either argument is a
    float, the two compare equal when their difference is at most
    (abs(x) + abs(y)) * FUZZ.  Tuples or lists of the same type are compared
    item by item with fcmp(), the shorter sequence ordering first when one
    is a prefix of the other.  Everything else uses ordinary comparison.
    """
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x - y) <= fuzz:
                return 0
        except Exception:
            # Operands that do not support the arithmetic fall back to the
            # plain comparison below.  KeyboardInterrupt and SystemExit
            # are deliberately not swallowed here.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for left, right in zip(x, y):
            outcome = fcmp(left, right)
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
# have_unicode records whether the ``unicode`` builtin exists.
try:
    unicode
except NameError:
    have_unicode = False
else:
    have_unicode = True
is_jython = sys.platform.startswith('java')

# Filename used for testing.  Jython ('java') disallows @ in module names.
TESTFN = {'java': '$test', 'riscos': 'testfile'}.get(os.name, '@test')
# Unicode name only used if TESTFN_ENCODING exists for the platform.
if have_unicode:
    # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
    # TESTFN_UNICODE is a filename that can be encoded using the
    # file system encoding, but *not* with the default (ascii) encoding
    if isinstance('', unicode):
        # python -U
        # XXX perhaps unicode() should accept Unicode strings?
        TESTFN_UNICODE = "@test-\xe0\xf2"
    else:
        # 2 latin characters.
        TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
    TESTFN_ENCODING = sys.getfilesystemencoding()
    # TESTFN_UNENCODABLE is a filename that should *not* be
    # able to be encoded by *either* the default or filesystem encoding.
    # This test really only makes sense on Windows NT platforms
    # which have special Unicode support in posixmodule.
    if (hasattr(sys, "getwindowsversion") and
            sys.getwindowsversion()[3] >= 2):  # 0=win32s or 1=9x/ME
        # Japanese characters (I think - from bug 846133)
        TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
        try:
            # XXX - Note - should be using TESTFN_ENCODING here - but for
            # Windows, "mbcs" currently always operates as if in
            # 'errors=ignore' mode - hence we get '?' characters rather
            # than the exception.  'Latin1' operates as we expect - ie,
            # fails.  See [ 850997 ] mbcs encoding ignores errors
            TESTFN_UNENCODABLE.encode("Latin1")
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem. '
                  'Unicode filename tests may not be effective'
                  % TESTFN_UNENCODABLE)
    else:
        TESTFN_UNENCODABLE = None
# Append the process id to TESTFN so that test runs executing in parallel
# do not share a name; the result is still a valid module name.
TESTFN = "%s_%s_tmp" % (TESTFN, os.getpid())

# Working directory at import time.
SAVEDCWD = os.getcwd()
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    The directory is created inside the current directory and is called
    *name*.  The context manager yields the working directory in effect
    inside the block.  On exit the previous CWD is restored and, if the
    temporary directory was created and entered, it is removed.

    If *quiet* is False (default) and the directory cannot be created or
    entered, the error is raised.  If *quiet* is True only a RuntimeWarning
    is issued and the original CWD is used.
    """
    if isinstance(name, unicode):
        fs_encoding = sys.getfilesystemencoding() or 'ascii'
        try:
            name = name.encode(fs_encoding)
        except UnicodeEncodeError:
            if not quiet:
                raise unittest.SkipTest('unable to encode the cwd name with '
                                        'the filesystem encoding.')
    original_cwd = os.getcwd()
    entered = False
    try:
        os.mkdir(name)
        os.chdir(name)
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + name,
                      RuntimeWarning, stacklevel=3)
    else:
        entered = True
    try:
        yield os.getcwd()
    finally:
        os.chdir(original_cwd)
        if entered:
            rmtree(name)
def findfile(file, here=__file__, subdir=None):
    """Look for *file* in the directory of *here* and then along sys.path.

    An absolute *file* is returned untouched.  When *subdir* is given it is
    prepended to *file* before searching.  The first existing candidate is
    returned; if there is none, the (possibly subdir-prefixed) name is
    returned as is -- this does not necessarily signal failure, it could
    still be the legitimate path."""
    if os.path.isabs(file):
        return file
    if subdir is not None:
        file = os.path.join(subdir, file)
    search_dirs = [os.path.dirname(here)] + sys.path
    for directory in search_dirs:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
def sortdict(dict):
    """Like repr(dict), but with the items in sorted order.

    Works with any mapping whose items() returns an iterable; the result
    of items() does not have to be a list."""
    pairs = sorted(dict.items())
    return "{%s}" % ", ".join(["%r: %r" % pair for pair in pairs])
def make_bad_fd():
    """
    Return a file descriptor that is no longer valid: TESTFN is opened, its
    descriptor noted, and the file is then closed and removed.
    """
    handle = open(TESTFN, "wb")
    try:
        stale_fd = handle.fileno()
    finally:
        handle.close()
        unlink(TESTFN)
    return stale_fd
def check_syntax_error(testcase, statement):
    """Have *testcase* assert that compiling *statement* in 'exec' mode
    raises SyntaxError."""
    compile_args = (statement, '<test string>', 'exec')
    testcase.assertRaises(SyntaxError, compile, *compile_args)
def open_urlresource(url, check=None):
    """Return an open file for the resource at *url*, fetching it if needed.

    The file is kept in the "data" directory next to this module, under the
    last path component of *url*.  An existing copy is reused when *check*
    is None or check(fileobj) is true; otherwise it is deleted and the
    resource is downloaded, which requires the 'urlfetch' resource.
    TestFailed is raised if the downloaded file does not pass *check*.
    """
    import urlparse
    import urllib2

    url_path = urlparse.urlparse(url)[2]
    filename = url_path.split('/')[-1]  # '/': it's URL!
    fn = os.path.join(os.path.dirname(__file__), "data", filename)

    def check_valid_file(fn):
        # Return the opened file if it is acceptable, else close it and
        # return None.
        f = open(fn)
        if check is None:
            return f
        if check(f):
            f.seek(0)
            return f
        f.close()
        return None

    if os.path.exists(fn):
        cached = check_valid_file(fn)
        if cached is not None:
            return cached
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print >> get_original_stdout(), '\tfetching %s ...' % url
    remote = urllib2.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            chunk = remote.read()
            while chunk:
                out.write(chunk)
                chunk = remote.read()
    finally:
        remote.close()

    fetched = check_valid_file(fn)
    if fetched is not None:
        return fetched
    raise TestFailed('invalid resource "%s"' % fn)
class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
    entry to the warnings.catch_warnings() context manager.

    Unknown attributes are looked up on the most recent warning, provided
    at least one warning was recorded since the last reset().
    """

    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning recorded after the last reset().
        self._last = 0

    def __getattr__(self, attr):
        if self._last < len(self._warnings):
            return getattr(self._warnings[-1], attr)
        if attr in warnings.WarningMessage._WARNING_DETAILS:
            # Nothing recorded yet: warning details read as None.
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        """The warnings recorded since the last reset()."""
        return self._warnings[self._last:]

    def reset(self):
        """Mark every warning recorded so far as seen."""
        self._last = len(self._warnings)
def _filterwarnings(filters, quiet=False):
    """Generator that records warnings, then checks them against *filters*.

    *filters* is a sequence of (message regexp, category) pairs.  A
    WarningsRecorder is yielded while recording.  Afterwards an
    AssertionError is raised for the first recorded warning that no filter
    matched and, unless *quiet* is true, for the first filter that matched
    nothing.
    """
    # Clear the warning registry of the calling module (two frames up)
    # in order to re-raise the warnings.
    caller_globals = sys._getframe(2).f_globals
    registry = caller_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as recorded:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swap the module, we need to look up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(recorded)
    # Sort the recorded warnings into handled and unhandled ones.
    unhandled = [entry.message for entry in recorded]
    missing = []
    for pattern, category in filters:
        matched = [exc for exc in unhandled
                   if re.match(pattern, str(exc), re.I) and
                   issubclass(exc.__class__, category)]
        for exc in matched:
            unhandled.remove(exc)
        if not matched and not quiet:
            # This filter caught nothing
            missing.append((pattern, category.__name__))
    if unhandled:
        raise AssertionError("unhandled warning %r" % unhandled[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default True without argument,
         default False if some filters are defined)

    Without argument, it defaults to:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if filters:
        return _filterwarnings(filters, quiet)
    # Preserve backward compatibility: without filters the check is quiet
    # unless the caller said otherwise.
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
@contextlib.contextmanager
def check_py3k_warnings(*filters, **kwargs):
    """Context manager to silence py3k warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default False)

    Without argument, it defaults to:
        check_py3k_warnings(("", DeprecationWarning), quiet=False)
    """
    if not sys.py3kwarning:
        # It should not raise any py3k warning
        filters = ()
    elif not filters:
        filters = (("", DeprecationWarning),)
    return _filterwarnings(filters, kwargs.get('quiet'))
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    The named modules are removed from sys.modules when the object is
    created; the sys.modules entries saved at that time are written back
    on exit.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for alias in module_names:
            try:
                module = sys.modules[alias]
            except KeyError:
                continue
            # It is possible that the name is just an alias for another
            # module (e.g. stub for modules renamed in 3.x).  In that
            # case the real module must be deleted too, to clear the
            # import cache.
            real_name = module.__name__
            if real_name != alias:
                del sys.modules[real_name]
            del sys.modules[alias]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
class EnvironmentVarGuard(UserDict.DictMixin):
    """Mapping over os.environ that remembers the original value of every
    variable it changes and restores them in __exit__.  Can be used as a
    context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps each modified variable to its initial value (None if unset).
        self._changed = {}

    def _remember(self, envvar):
        # Record the initial value the first time a variable is touched.
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        self._remember(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        self._remember(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, initial in self._changed.items():
            if initial is not None:
                self._environ[name] = initial
            elif name in self._environ:
                del self._environ[name]
        os.environ = self._environ
class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    This makes a copy of sys.path, appends any directories given
    as positional arguments, then reverts sys.path to the copied
    settings when the context ends.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
    """

    def __init__(self, *paths):
        path_list = sys.path
        self.original_value = list(path_list)
        self.original_object = path_list
        path_list.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore the contents and the identity of the original list.
        path_list = self.original_object
        path_list[:] = self.original_value
        sys.path = path_list
class TransientResource(object):
    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    *exc* is the exception class to look for; the keyword arguments are
    attribute names and values the exception instance must carry."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be self.exc or one of its subclasses.
        if type_ is None or not issubclass(type_, self.exc):
            return
        for attr, expected in self.attrs.items():
            if not hasattr(value, attr) or getattr(value, attr) != expected:
                return
        raise ResourceDenied("an optional resource is not available")
@contextlib.contextmanager
def transient_internet(resource_name, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    While the block runs, the default socket timeout is set to *timeout*
    (unless it is None); the previous default is restored afterwards.
    *errnos* replaces the built-in list of errno values that count as a
    connection problem."""
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    default_gai_errnos = [
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
    ]

    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        # Use the platform's value for each name, falling back to the
        # number listed above when the name is not defined.
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise `denied` if err looks like a connection problem.
        n = getattr(err, 'errno', None)
        transient = (isinstance(err, socket.timeout) or
                     (isinstance(err, socket.gaierror) and n in gai_errnos) or
                     n in captured_errnos)
        if not transient:
            return
        if not verbose:
            sys.stderr.write(denied.args[0] + "\n")
        raise denied

    def innermost_error(err):
        # urllib can wrap original socket errors multiple times (!), we
        # must unwrap to get at the original error.
        while True:
            details = err.args
            if len(details) >= 1 and isinstance(details[0], IOError):
                err = details[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(details) >= 2 and isinstance(details[1], IOError):
                err = details[1]
            else:
                return err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except IOError as err:
        filter_error(innermost_error(err))
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout and captured_stdin
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    The StringIO object is what the context manager yields; the original
    stream is put back on exit."""
    import StringIO
    saved_stream = getattr(sys, stream_name)
    replacement = StringIO.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, saved_stream)
def captured_stdout():
    r"""Capture the output of sys.stdout:

       with captured_stdout() as s:
           print "hello"
       self.assertEqual(s.getvalue(), "hello\n")
    """
    stream_name = "stdout"
    return captured_output(stream_name)
def captured_stdin():
    """Replace sys.stdin with a StringIO for the duration of the block;
    see captured_output()."""
    stream_name = "stdin"
    return captured_output(stream_name)
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in
    CPython this can be the case with reference cycles.)  This means that
    __del__ methods may be called later than expected and weakrefs may
    remain alive for longer than expected.  This function tries its best to
    force all garbage objects to disappear.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    # Two more passes after the first one.
    for _ in range(2):
        gc.collect()
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    """Return a decorator that runs the function under another locale.

    *catstr* is the name of a locale category attribute (e.g. 'LC_ALL').
    On each call, the first entry of *locales* that can be set is
    installed, the function is run, and the original locale is restored.
    If the original locale cannot be determined, the function runs
    without any locale change.

    Raises AttributeError at call time if *catstr* is not an attribute of
    the locale module.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # this locale is unavailable; try the next one
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator
#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use
# should be configurable.

# Handy shorthands, used in the various bigmem tests both for byte-limits
# and for size-limits.
_1M = 1024 ** 2
_1G = 1024 ** 3
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize
def set_memlimit(limit):
    """Set the memory budget for big-memory tests from a string like '2.5Gb'.

    'limit' is a decimal number followed by a unit letter K, M, G or T
    (case-insensitive) and an optional trailing 'b'.  On success two
    module globals are updated: real_max_memuse receives the requested
    number of bytes and max_memuse receives that number capped at
    MAX_Py_ssize_t.

    Raises ValueError if the string cannot be parsed or if the capped
    limit is below 2G - 1 bytes.  In both cases neither global is
    modified, so a rejected limit never leaves a half-updated state.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the blank between the number and the unit in the
    # pattern insignificant, so '4G' (no space) is what actually matches.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    capped = min(requested, MAX_Py_ssize_t)
    if capped < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    # Validate first, publish afterwards: both globals change together.
    real_max_memuse = requested
    max_memuse = capped
def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped by
    raising unittest.SkipTest, so the skip shows up in the test result
    instead of being counted as a pass.  Otherwise, minsize is adjusted
    upward to use up to max_memuse.

    The wrapper exposes 'minsize', 'memuse' and 'overhead' as attributes.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    raise unittest.SkipTest(
                        "%s needs more memory than the configured limit "
                        "allows" % (f.__name__,))
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def precisionbigmemtest(size, memuse, overhead=5*_1M):
    """Decorator for bigmem tests that need one exact size.

    When no memory limit has been configured (real_max_memuse is 0) the
    test runs with a small size of 5147.  Otherwise it runs with exactly
    'size', provided size * memuse fits within real_max_memuse; if it
    does not fit, the test is skipped by raising unittest.SkipTest so
    the skip is visible in the test result.

    'overhead' is not part of the fit check; it is only recorded, along
    with 'size' and 'memuse', as an attribute of the wrapper.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(self):
            if not real_max_memuse:
                maxsize = 5147
            else:
                maxsize = size
                if real_max_memuse < maxsize * memuse:
                    raise unittest.SkipTest(
                        "%s needs more memory than the configured limit "
                        "allows" % (f.__name__,))
            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse has reached MAX_Py_ssize_t.
    Otherwise it is skipped by raising unittest.SkipTest, so the skip is
    recorded in the test result rather than being counted as a pass.
    """
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            raise unittest.SkipTest(
                "%s needs the memory limit raised to the full address "
                "space" % (f.__name__,))
        return f(self)
    return wrapper
| #======================================================================= | |
| # unittest integration. | |
class BasicTestRunner:
    """Minimal runner that collects results without printing anything."""

    def run(self, test):
        """Run 'test' against a fresh unittest.TestResult and return it."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
def _id(obj):
    """Identity function: hand 'obj' back unchanged.

    Used as the no-op decorator when a test does not need to be skipped.
    """
    return obj
def requires_resource(resource):
    """Return a decorator that skips the test unless 'resource' is enabled.

    When is_resource_enabled(resource) is true the identity decorator is
    returned; otherwise a unittest.skip decorator naming the resource.
    """
    if not is_resource_enabled(resource):
        reason = "resource {0!r} is not enabled".format(resource)
        return unittest.skip(reason)
    return _id
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    # Build the cpython=True guard first, then apply it to the test.
    cpython_guard = impl_detail(cpython=True)
    return cpython_guard(test)
def impl_detail(msg=None, **guards):
    """Return a decorator that skips tests guarded by implementation checks.

    'guards' are passed to check_impl_detail(); when that check succeeds
    the identity decorator is returned.  Otherwise a unittest.skip
    decorator is returned, using 'msg' as the reason or, if 'msg' is None,
    a reason generated from the sorted guard names.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}" if default
                    else "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(guardnames)))
    return unittest.skip(msg)
def _parse_guards(guards):
    """Normalise a guards dict for the implementation-detail checks.

    Returns a tuple ({platform_name: run_me}, default_value).  With no
    guards the result selects CPython only.  All guard values must be
    equal (all true or all false); the default is the negation of that
    shared value.
    """
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    is_true = flags[0]
    assert flags.count(is_true) == len(flags)   # all True or all False
    return (guards, not is_true)
| # Use the following check to guard CPython's implementation-specific tests -- | |
| # or to run them only on the implementation(s) guarded by the arguments. | |
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.

    Examples:
        if check_impl_detail():               # only on CPython (default)
        if check_impl_detail(jython=True):    # only on Jython
        if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    parsed, default = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return parsed.get(implementation, default)
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Uses a verbose text runner when 'verbose' is set and the silent
    BasicTestRunner otherwise.  Raises TestFailed if the run was not
    successful: with the single traceback when there is exactly one error
    or failure, or with a generic message when there are several.
    """
    runner = (unittest.TextTestRunner(sys.stdout, verbosity=2)
              if verbose else BasicTestRunner())
    outcome = runner.run(suite)
    if outcome.wasSuccessful():
        return
    errors, failures = outcome.errors, outcome.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a module name present in sys.modules (its test
    cases are collected), a TestSuite or TestCase instance (added as is),
    or anything else, which is handed to unittest.makeSuite().  Raises
    ValueError for a string that is not a key in sys.modules.
    """
    ready_made = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for item in classes:
        if isinstance(item, str):
            if item not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            tests = unittest.findTestCases(sys.modules[item])
        elif isinstance(item, ready_made):
            tests = item
        else:
            tests = unittest.makeSuite(item)
        suite.addTest(tests)
    _run_suite(suite)
| #======================================================================= | |
| # doctest driver. | |
def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """
    import doctest

    doctest_verbose = verbose if verbosity is None else None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    redirected = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        failed, attempted = doctest.testmod(module, verbose=doctest_verbose)
        if failed:
            raise TestFailed("%d of %d doctests failed" % (failed, attempted))
    finally:
        sys.stdout = redirected
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, attempted))
    return failed, attempted
| #======================================================================= | |
| # Threading support to prevent reporting refleaks when running regrtest.py -R | |
| # NOTE: we use thread._count() rather than threading.enumerate() (or the | |
| # moral equivalent thereof) because a threading.Thread object is still alive | |
| # until its __bootstrap() method has returned, even after it has been | |
| # unregistered from the threading module. | |
| # thread._count(), on the other hand, only gets decremented *after* the | |
| # __bootstrap() method has returned, which gives us reliable reference counts | |
| # at the end of a test run. | |
def threading_setup():
    """Return a 1-tuple with the current thread count.

    The count is thread._count() when the thread module is available and
    1 otherwise.  The tuple is meant to be unpacked into
    threading_cleanup().
    """
    count = thread._count() if thread else 1
    return (count,)
def threading_cleanup(nb_threads):
    """Wait briefly for the thread count to drop back to 'nb_threads'.

    Polls thread._count() up to ten times, sleeping 0.1 seconds after each
    mismatch, and returns as soon as the count matches.  Does nothing when
    the thread module is unavailable.  A count that never matches is not
    reported.
    """
    if not thread:
        return
    attempts_left = 10
    while attempts_left:
        attempts_left -= 1
        if thread._count() == nb_threads:
            return
        time.sleep(0.1)
    # XXX print a warning in case of failure?
def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.

    The returned wrapper forwards both positional and keyword arguments
    to 'func' and returns its result.
    """
    if not thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwds):
        key = threading_setup()
        try:
            return func(*args, **kwds)
        finally:
            # Runs whether func returned or raised.
            threading_cleanup(*key)
    return decorator
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Best effort: any ordinary error from waitpid() ends the loop quietly,
    but KeyboardInterrupt and SystemExit are allowed to propagate.
    """
    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if not hasattr(os, 'waitpid'):
        return
    any_process = -1
    while True:
        try:
            # This will raise an exception on Windows.  That's ok.
            pid = os.waitpid(any_process, os.WNOHANG)[0]
        except Exception:
            break
        if pid == 0:
            # Children exist but none has exited yet.
            break
def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    Tries, in order: b.tobytes(), joining chr() of each item of b, and
    finally bytes(b).

    NOTE: This is only a best effort function.
    """
    # memoryview?
    try:
        return b.tobytes()
    except AttributeError:
        pass
    # iterable of ints?
    try:
        return b"".join([chr(item) for item in b])
    except TypeError:
        pass
    return bytes(b)
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags.

    Each flag with a positive value contributes one argument made of '-'
    followed by its option letter repeated that many times.  Flags that
    this interpreter's sys.flags does not provide are treated as unset
    rather than raising AttributeError.
    """
    flag_opt_map = {
        'bytes_warning': 'b',
        'dont_write_bytecode': 'B',
        'ignore_environment': 'E',
        'no_user_site': 's',
        'no_site': 'S',
        'optimize': 'O',
        'py3k_warning': '3',
        'verbose': 'v',
    }
    args = []
    for flag, opt in flag_opt_map.items():
        # Default to 0 so a flag missing from sys.flags is simply skipped.
        v = getattr(sys.flags, flag, 0)
        if v > 0:
            args.append('-' + opt * v)
    return args
def strip_python_stderr(stderr):
    """Strip the stderr of a Python process from potential debug output
    emitted by the interpreter.

    Removes a trailing "[N refs]" line, then surrounding whitespace.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    refcount_trailer = br"\[\d+ refs\]\r?\n?$"
    cleaned = re.sub(refcount_trailer, b"", stderr)
    return cleaned.strip()