| import unittest | |
| from test import test_support | |
| from contextlib import closing | |
| import gc | |
| import pickle | |
| import select | |
| import signal | |
| import subprocess | |
| import traceback | |
| import sys, os, time, errno | |
| if sys.platform in ('os2', 'riscos'): | |
| raise unittest.SkipTest("Can't test signal on %s" % sys.platform) | |
| class HandlerBCalled(Exception): | |
| pass | |
| def exit_subprocess(): | |
| """Use os._exit(0) to exit the current subprocess. | |
| Otherwise, the test catches the SystemExit and continues executing | |
| in parallel with the original test, so you wind up with an | |
| exponential number of tests running concurrently. | |
| """ | |
| os._exit(0) | |
| def ignoring_eintr(__func, *args, **kwargs): | |
| try: | |
| return __func(*args, **kwargs) | |
| except EnvironmentError as e: | |
| if e.errno != errno.EINTR: | |
| raise | |
| return None | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | |
| class InterProcessSignalTests(unittest.TestCase): | |
| MAX_DURATION = 20 # Entire test should last at most 20 sec. | |
| def setUp(self): | |
| self.using_gc = gc.isenabled() | |
| gc.disable() | |
| def tearDown(self): | |
| if self.using_gc: | |
| gc.enable() | |
| def format_frame(self, frame, limit=None): | |
| return ''.join(traceback.format_stack(frame, limit=limit)) | |
| def handlerA(self, signum, frame): | |
| self.a_called = True | |
| if test_support.verbose: | |
| print "handlerA invoked from signal %s at:\n%s" % ( | |
| signum, self.format_frame(frame, limit=1)) | |
| def handlerB(self, signum, frame): | |
| self.b_called = True | |
| if test_support.verbose: | |
| print "handlerB invoked from signal %s at:\n%s" % ( | |
| signum, self.format_frame(frame, limit=1)) | |
| raise HandlerBCalled(signum, self.format_frame(frame)) | |
| def wait(self, child): | |
| """Wait for child to finish, ignoring EINTR.""" | |
| while True: | |
| try: | |
| child.wait() | |
| return | |
| except OSError as e: | |
| if e.errno != errno.EINTR: | |
| raise | |
| def run_test(self): | |
| # Install handlers. This function runs in a sub-process, so we | |
| # don't worry about re-setting the default handlers. | |
| signal.signal(signal.SIGHUP, self.handlerA) | |
| signal.signal(signal.SIGUSR1, self.handlerB) | |
| signal.signal(signal.SIGUSR2, signal.SIG_IGN) | |
| signal.signal(signal.SIGALRM, signal.default_int_handler) | |
| # Variables the signals will modify: | |
| self.a_called = False | |
| self.b_called = False | |
| # Let the sub-processes know who to send signals to. | |
| pid = os.getpid() | |
| if test_support.verbose: | |
| print "test runner's pid is", pid | |
| child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) | |
| if child: | |
| self.wait(child) | |
| if not self.a_called: | |
| time.sleep(1) # Give the signal time to be delivered. | |
| self.assertTrue(self.a_called) | |
| self.assertFalse(self.b_called) | |
| self.a_called = False | |
| # Make sure the signal isn't delivered while the previous | |
| # Popen object is being destroyed, because __del__ swallows | |
| # exceptions. | |
| del child | |
| try: | |
| child = subprocess.Popen(['kill', '-USR1', str(pid)]) | |
| # This wait should be interrupted by the signal's exception. | |
| self.wait(child) | |
| time.sleep(1) # Give the signal time to be delivered. | |
| self.fail('HandlerBCalled exception not thrown') | |
| except HandlerBCalled: | |
| self.assertTrue(self.b_called) | |
| self.assertFalse(self.a_called) | |
| if test_support.verbose: | |
| print "HandlerBCalled exception caught" | |
| child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) | |
| if child: | |
| self.wait(child) # Nothing should happen. | |
| try: | |
| signal.alarm(1) | |
| # The race condition in pause doesn't matter in this case, | |
| # since alarm is going to raise a KeyboardException, which | |
| # will skip the call. | |
| signal.pause() | |
| # But if another signal arrives before the alarm, pause | |
| # may return early. | |
| time.sleep(1) | |
| except KeyboardInterrupt: | |
| if test_support.verbose: | |
| print "KeyboardInterrupt (the alarm() went off)" | |
| except: | |
| self.fail("Some other exception woke us from pause: %s" % | |
| traceback.format_exc()) | |
| else: | |
| self.fail("pause returned of its own accord, and the signal" | |
| " didn't arrive after another second.") | |
| # Issue 3864. Unknown if this affects earlier versions of freebsd also. | |
| @unittest.skipIf(sys.platform=='freebsd6', | |
| 'inter process signals not reliable (do not mix well with threading) ' | |
| 'on freebsd6') | |
| def test_main(self): | |
| # This function spawns a child process to insulate the main | |
| # test-running process from all the signals. It then | |
| # communicates with that child process over a pipe and | |
| # re-raises information about any exceptions the child | |
| # throws. The real work happens in self.run_test(). | |
| os_done_r, os_done_w = os.pipe() | |
| with closing(os.fdopen(os_done_r)) as done_r, \ | |
| closing(os.fdopen(os_done_w, 'w')) as done_w: | |
| child = os.fork() | |
| if child == 0: | |
| # In the child process; run the test and report results | |
| # through the pipe. | |
| try: | |
| done_r.close() | |
| # Have to close done_w again here because | |
| # exit_subprocess() will skip the enclosing with block. | |
| with closing(done_w): | |
| try: | |
| self.run_test() | |
| except: | |
| pickle.dump(traceback.format_exc(), done_w) | |
| else: | |
| pickle.dump(None, done_w) | |
| except: | |
| print 'Uh oh, raised from pickle.' | |
| traceback.print_exc() | |
| finally: | |
| exit_subprocess() | |
| done_w.close() | |
| # Block for up to MAX_DURATION seconds for the test to finish. | |
| r, w, x = select.select([done_r], [], [], self.MAX_DURATION) | |
| if done_r in r: | |
| tb = pickle.load(done_r) | |
| if tb: | |
| self.fail(tb) | |
| else: | |
| os.kill(child, signal.SIGKILL) | |
| self.fail('Test deadlocked after %d seconds.' % | |
| self.MAX_DURATION) | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | |
| class BasicSignalTests(unittest.TestCase): | |
| def trivial_signal_handler(self, *args): | |
| pass | |
| def test_out_of_range_signal_number_raises_error(self): | |
| self.assertRaises(ValueError, signal.getsignal, 4242) | |
| self.assertRaises(ValueError, signal.signal, 4242, | |
| self.trivial_signal_handler) | |
| def test_setting_signal_handler_to_none_raises_error(self): | |
| self.assertRaises(TypeError, signal.signal, | |
| signal.SIGUSR1, None) | |
| def test_getsignal(self): | |
| hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) | |
| self.assertEqual(signal.getsignal(signal.SIGHUP), | |
| self.trivial_signal_handler) | |
| signal.signal(signal.SIGHUP, hup) | |
| self.assertEqual(signal.getsignal(signal.SIGHUP), hup) | |
| @unittest.skipUnless(sys.platform == "win32", "Windows specific") | |
| class WindowsSignalTests(unittest.TestCase): | |
| def test_issue9324(self): | |
| # Updated for issue #10003, adding SIGBREAK | |
| handler = lambda x, y: None | |
| for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, | |
| signal.SIGILL, signal.SIGINT, signal.SIGSEGV, | |
| signal.SIGTERM): | |
| # Set and then reset a handler for signals that work on windows | |
| signal.signal(sig, signal.signal(sig, handler)) | |
| with self.assertRaises(ValueError): | |
| signal.signal(-1, handler) | |
| with self.assertRaises(ValueError): | |
| signal.signal(7, handler) | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | |
| class WakeupSignalTests(unittest.TestCase): | |
| TIMEOUT_FULL = 10 | |
| TIMEOUT_HALF = 5 | |
| def test_wakeup_fd_early(self): | |
| import select | |
| signal.alarm(1) | |
| before_time = time.time() | |
| # We attempt to get a signal during the sleep, | |
| # before select is called | |
| time.sleep(self.TIMEOUT_FULL) | |
| mid_time = time.time() | |
| self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF) | |
| select.select([self.read], [], [], self.TIMEOUT_FULL) | |
| after_time = time.time() | |
| self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) | |
| def test_wakeup_fd_during(self): | |
| import select | |
| signal.alarm(1) | |
| before_time = time.time() | |
| # We attempt to get a signal during the select call | |
| self.assertRaises(select.error, select.select, | |
| [self.read], [], [], self.TIMEOUT_FULL) | |
| after_time = time.time() | |
| self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) | |
| def setUp(self): | |
| import fcntl | |
| self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) | |
| self.read, self.write = os.pipe() | |
| flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0) | |
| flags = flags | os.O_NONBLOCK | |
| fcntl.fcntl(self.write, fcntl.F_SETFL, flags) | |
| self.old_wakeup = signal.set_wakeup_fd(self.write) | |
| def tearDown(self): | |
| signal.set_wakeup_fd(self.old_wakeup) | |
| os.close(self.read) | |
| os.close(self.write) | |
| signal.signal(signal.SIGALRM, self.alrm) | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | |
| class SiginterruptTest(unittest.TestCase): | |
| def setUp(self): | |
| """Install a no-op signal handler that can be set to allow | |
| interrupts or not, and arrange for the original signal handler to be | |
| re-installed when the test is finished. | |
| """ | |
| self.signum = signal.SIGUSR1 | |
| oldhandler = signal.signal(self.signum, lambda x,y: None) | |
| self.addCleanup(signal.signal, self.signum, oldhandler) | |
| def readpipe_interrupted(self): | |
| """Perform a read during which a signal will arrive. Return True if the | |
| read is interrupted by the signal and raises an exception. Return False | |
| if it returns normally. | |
| """ | |
| # Create a pipe that can be used for the read. Also clean it up | |
| # when the test is over, since nothing else will (but see below for | |
| # the write end). | |
| r, w = os.pipe() | |
| self.addCleanup(os.close, r) | |
| # Create another process which can send a signal to this one to try | |
| # to interrupt the read. | |
| ppid = os.getpid() | |
| pid = os.fork() | |
| if pid == 0: | |
| # Child code: sleep to give the parent enough time to enter the | |
| # read() call (there's a race here, but it's really tricky to | |
| # eliminate it); then signal the parent process. Also, sleep | |
| # again to make it likely that the signal is delivered to the | |
| # parent process before the child exits. If the child exits | |
| # first, the write end of the pipe will be closed and the test | |
| # is invalid. | |
| try: | |
| time.sleep(0.2) | |
| os.kill(ppid, self.signum) | |
| time.sleep(0.2) | |
| finally: | |
| # No matter what, just exit as fast as possible now. | |
| exit_subprocess() | |
| else: | |
| # Parent code. | |
| # Make sure the child is eventually reaped, else it'll be a | |
| # zombie for the rest of the test suite run. | |
| self.addCleanup(os.waitpid, pid, 0) | |
| # Close the write end of the pipe. The child has a copy, so | |
| # it's not really closed until the child exits. We need it to | |
| # close when the child exits so that in the non-interrupt case | |
| # the read eventually completes, otherwise we could just close | |
| # it *after* the test. | |
| os.close(w) | |
| # Try the read and report whether it is interrupted or not to | |
| # the caller. | |
| try: | |
| d = os.read(r, 1) | |
| return False | |
| except OSError, err: | |
| if err.errno != errno.EINTR: | |
| raise | |
| return True | |
| def test_without_siginterrupt(self): | |
| """If a signal handler is installed and siginterrupt is not called | |
| at all, when that signal arrives, it interrupts a syscall that's in | |
| progress. | |
| """ | |
| i = self.readpipe_interrupted() | |
| self.assertTrue(i) | |
| # Arrival of the signal shouldn't have changed anything. | |
| i = self.readpipe_interrupted() | |
| self.assertTrue(i) | |
| def test_siginterrupt_on(self): | |
| """If a signal handler is installed and siginterrupt is called with | |
| a true value for the second argument, when that signal arrives, it | |
| interrupts a syscall that's in progress. | |
| """ | |
| signal.siginterrupt(self.signum, 1) | |
| i = self.readpipe_interrupted() | |
| self.assertTrue(i) | |
| # Arrival of the signal shouldn't have changed anything. | |
| i = self.readpipe_interrupted() | |
| self.assertTrue(i) | |
| def test_siginterrupt_off(self): | |
| """If a signal handler is installed and siginterrupt is called with | |
| a false value for the second argument, when that signal arrives, it | |
| does not interrupt a syscall that's in progress. | |
| """ | |
| signal.siginterrupt(self.signum, 0) | |
| i = self.readpipe_interrupted() | |
| self.assertFalse(i) | |
| # Arrival of the signal shouldn't have changed anything. | |
| i = self.readpipe_interrupted() | |
| self.assertFalse(i) | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | |
| class ItimerTest(unittest.TestCase): | |
| def setUp(self): | |
| self.hndl_called = False | |
| self.hndl_count = 0 | |
| self.itimer = None | |
| self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) | |
| def tearDown(self): | |
| signal.signal(signal.SIGALRM, self.old_alarm) | |
| if self.itimer is not None: # test_itimer_exc doesn't change this attr | |
| # just ensure that itimer is stopped | |
| signal.setitimer(self.itimer, 0) | |
| def sig_alrm(self, *args): | |
| self.hndl_called = True | |
| if test_support.verbose: | |
| print("SIGALRM handler invoked", args) | |
| def sig_vtalrm(self, *args): | |
| self.hndl_called = True | |
| if self.hndl_count > 3: | |
| # it shouldn't be here, because it should have been disabled. | |
| raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " | |
| "timer.") | |
| elif self.hndl_count == 3: | |
| # disable ITIMER_VIRTUAL, this function shouldn't be called anymore | |
| signal.setitimer(signal.ITIMER_VIRTUAL, 0) | |
| if test_support.verbose: | |
| print("last SIGVTALRM handler call") | |
| self.hndl_count += 1 | |
| if test_support.verbose: | |
| print("SIGVTALRM handler invoked", args) | |
| def sig_prof(self, *args): | |
| self.hndl_called = True | |
| signal.setitimer(signal.ITIMER_PROF, 0) | |
| if test_support.verbose: | |
| print("SIGPROF handler invoked", args) | |
| def test_itimer_exc(self): | |
| # XXX I'm assuming -1 is an invalid itimer, but maybe some platform | |
| # defines it ? | |
| self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) | |
| # Negative times are treated as zero on some platforms. | |
| if 0: | |
| self.assertRaises(signal.ItimerError, | |
| signal.setitimer, signal.ITIMER_REAL, -1) | |
| def test_itimer_real(self): | |
| self.itimer = signal.ITIMER_REAL | |
| signal.setitimer(self.itimer, 1.0) | |
| if test_support.verbose: | |
| print("\ncall pause()...") | |
| signal.pause() | |
| self.assertEqual(self.hndl_called, True) | |
| # Issue 3864. Unknown if this affects earlier versions of freebsd also. | |
| @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'), | |
| 'itimer not reliable (does not mix well with threading) on some BSDs.') | |
| def test_itimer_virtual(self): | |
| self.itimer = signal.ITIMER_VIRTUAL | |
| signal.signal(signal.SIGVTALRM, self.sig_vtalrm) | |
| signal.setitimer(self.itimer, 0.3, 0.2) | |
| start_time = time.time() | |
| while time.time() - start_time < 60.0: | |
| # use up some virtual time by doing real work | |
| _ = pow(12345, 67890, 10000019) | |
| if signal.getitimer(self.itimer) == (0.0, 0.0): | |
| break # sig_vtalrm handler stopped this itimer | |
| else: # Issue 8424 | |
| self.skipTest("timeout: likely cause: machine too slow or load too " | |
| "high") | |
| # virtual itimer should be (0.0, 0.0) now | |
| self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) | |
| # and the handler should have been called | |
| self.assertEqual(self.hndl_called, True) | |
| # Issue 3864. Unknown if this affects earlier versions of freebsd also. | |
| @unittest.skipIf(sys.platform=='freebsd6', | |
| 'itimer not reliable (does not mix well with threading) on freebsd6') | |
| def test_itimer_prof(self): | |
| self.itimer = signal.ITIMER_PROF | |
| signal.signal(signal.SIGPROF, self.sig_prof) | |
| signal.setitimer(self.itimer, 0.2, 0.2) | |
| start_time = time.time() | |
| while time.time() - start_time < 60.0: | |
| # do some work | |
| _ = pow(12345, 67890, 10000019) | |
| if signal.getitimer(self.itimer) == (0.0, 0.0): | |
| break # sig_prof handler stopped this itimer | |
| else: # Issue 8424 | |
| self.skipTest("timeout: likely cause: machine too slow or load too " | |
| "high") | |
| # profiling itimer should be (0.0, 0.0) now | |
| self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) | |
| # and the handler should have been called | |
| self.assertEqual(self.hndl_called, True) | |
| def test_main(): | |
| test_support.run_unittest(BasicSignalTests, InterProcessSignalTests, | |
| WakeupSignalTests, SiginterruptTest, | |
| ItimerTest, WindowsSignalTests) | |
| if __name__ == "__main__": | |
| test_main() |