| import os | |
| import pprint | |
| import unittest | |
| import tempfile | |
| import _hotshot | |
| import gc | |
| from test import test_support | |
| # Silence Py3k warning | |
| hotshot = test_support.import_module('hotshot', deprecated=True) | |
| from hotshot.log import ENTER, EXIT, LINE | |
| from hotshot import stats | |
| def shortfilename(fn): | |
| # We use a really shortened filename since an exact match is made, | |
| # and the source may be either a Python source file or a | |
| # pre-compiled bytecode file. | |
| if fn: | |
| return os.path.splitext(os.path.basename(fn))[0] | |
| else: | |
| return fn | |
| class UnlinkingLogReader(hotshot.log.LogReader): | |
| """Extend the LogReader so the log file is unlinked when we're | |
| done with it.""" | |
| def __init__(self, logfn): | |
| self.__logfn = logfn | |
| hotshot.log.LogReader.__init__(self, logfn) | |
| def next(self, index=None): | |
| try: | |
| return hotshot.log.LogReader.next(self) | |
| except StopIteration: | |
| self.close() | |
| os.unlink(self.__logfn) | |
| raise | |
| class HotShotTestCase(unittest.TestCase): | |
| def new_profiler(self, lineevents=0, linetimings=1): | |
| self.logfn = test_support.TESTFN | |
| return hotshot.Profile(self.logfn, lineevents, linetimings) | |
| def get_logreader(self): | |
| return UnlinkingLogReader(self.logfn) | |
| def get_events_wotime(self): | |
| L = [] | |
| for event in self.get_logreader(): | |
| what, (filename, lineno, funcname), tdelta = event | |
| L.append((what, (shortfilename(filename), lineno, funcname))) | |
| return L | |
| def check_events(self, expected): | |
| events = self.get_events_wotime() | |
| if events != expected: | |
| self.fail( | |
| "events did not match expectation; got:\n%s\nexpected:\n%s" | |
| % (pprint.pformat(events), pprint.pformat(expected))) | |
| def run_test(self, callable, events, profiler=None): | |
| if profiler is None: | |
| profiler = self.new_profiler() | |
| self.assertTrue(not profiler._prof.closed) | |
| profiler.runcall(callable) | |
| self.assertTrue(not profiler._prof.closed) | |
| profiler.close() | |
| self.assertTrue(profiler._prof.closed) | |
| self.check_events(events) | |
| def test_addinfo(self): | |
| def f(p): | |
| p.addinfo("test-key", "test-value") | |
| profiler = self.new_profiler() | |
| profiler.runcall(f, profiler) | |
| profiler.close() | |
| log = self.get_logreader() | |
| info = log._info | |
| list(log) | |
| self.assertTrue(info["test-key"] == ["test-value"]) | |
| def test_line_numbers(self): | |
| def f(): | |
| y = 2 | |
| x = 1 | |
| def g(): | |
| f() | |
| f_lineno = f.func_code.co_firstlineno | |
| g_lineno = g.func_code.co_firstlineno | |
| events = [(ENTER, ("test_hotshot", g_lineno, "g")), | |
| (LINE, ("test_hotshot", g_lineno+1, "g")), | |
| (ENTER, ("test_hotshot", f_lineno, "f")), | |
| (LINE, ("test_hotshot", f_lineno+1, "f")), | |
| (LINE, ("test_hotshot", f_lineno+2, "f")), | |
| (EXIT, ("test_hotshot", f_lineno, "f")), | |
| (EXIT, ("test_hotshot", g_lineno, "g")), | |
| ] | |
| self.run_test(g, events, self.new_profiler(lineevents=1)) | |
| def test_start_stop(self): | |
| # Make sure we don't return NULL in the start() and stop() | |
| # methods when there isn't an error. Bug in 2.2 noted by | |
| # Anthony Baxter. | |
| profiler = self.new_profiler() | |
| profiler.start() | |
| profiler.stop() | |
| profiler.close() | |
| os.unlink(self.logfn) | |
| def test_bad_sys_path(self): | |
| import sys | |
| import os | |
| orig_path = sys.path | |
| coverage = hotshot._hotshot.coverage | |
| try: | |
| # verify we require a list for sys.path | |
| sys.path = 'abc' | |
| self.assertRaises(RuntimeError, coverage, test_support.TESTFN) | |
| # verify that we require sys.path exists | |
| del sys.path | |
| self.assertRaises(RuntimeError, coverage, test_support.TESTFN) | |
| finally: | |
| sys.path = orig_path | |
| if os.path.exists(test_support.TESTFN): | |
| os.remove(test_support.TESTFN) | |
| def test_logreader_eof_error(self): | |
| emptyfile = tempfile.NamedTemporaryFile() | |
| try: | |
| self.assertRaises((IOError, EOFError), _hotshot.logreader, | |
| emptyfile.name) | |
| finally: | |
| emptyfile.close() | |
| gc.collect() | |
| def test_load_stats(self): | |
| def start(prof): | |
| prof.start() | |
| # Make sure stats can be loaded when start and stop of profiler | |
| # are not executed in the same stack frame. | |
| profiler = self.new_profiler() | |
| start(profiler) | |
| profiler.stop() | |
| profiler.close() | |
| stats.load(self.logfn) | |
| os.unlink(self.logfn) | |
| def test_main(): | |
| test_support.run_unittest(HotShotTestCase) | |
| if __name__ == "__main__": | |
| test_main() |