| # Test the support for SSL and sockets | |
| import sys | |
| import unittest | |
| from test import test_support | |
| import asyncore | |
| import socket | |
| import select | |
| import time | |
| import gc | |
| import os | |
| import errno | |
| import pprint | |
| import urllib, urlparse | |
| import traceback | |
| import weakref | |
| import functools | |
| import platform | |
| from BaseHTTPServer import HTTPServer | |
| from SimpleHTTPServer import SimpleHTTPRequestHandler | |
| ssl = test_support.import_module("ssl") | |
| HOST = test_support.HOST | |
| CERTFILE = None | |
| SVN_PYTHON_ORG_ROOT_CERT = None | |
| def handle_error(prefix): | |
| exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) | |
| if test_support.verbose: | |
| sys.stdout.write(prefix + exc_format) | |
| class BasicTests(unittest.TestCase): | |
| def test_sslwrap_simple(self): | |
| # A crude test for the legacy API | |
| try: | |
| ssl.sslwrap_simple(socket.socket(socket.AF_INET)) | |
| except IOError, e: | |
| if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that | |
| pass | |
| else: | |
| raise | |
| try: | |
| ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) | |
| except IOError, e: | |
| if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that | |
| pass | |
| else: | |
| raise | |
| # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 | |
| def skip_if_broken_ubuntu_ssl(func): | |
| if hasattr(ssl, 'PROTOCOL_SSLv2'): | |
| # We need to access the lower-level wrapper in order to create an | |
| # implicit SSL context without trying to connect or listen. | |
| try: | |
| import _ssl | |
| except ImportError: | |
| # The returned function won't get executed, just ignore the error | |
| pass | |
| @functools.wraps(func) | |
| def f(*args, **kwargs): | |
| try: | |
| s = socket.socket(socket.AF_INET) | |
| _ssl.sslwrap(s._sock, 0, None, None, | |
| ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None) | |
| except ssl.SSLError as e: | |
| if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and | |
| platform.linux_distribution() == ('debian', 'squeeze/sid', '') | |
| and 'Invalid SSL protocol variant specified' in str(e)): | |
| raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") | |
| return func(*args, **kwargs) | |
| return f | |
| else: | |
| return func | |
| class BasicSocketTests(unittest.TestCase): | |
| def test_constants(self): | |
| #ssl.PROTOCOL_SSLv2 | |
| ssl.PROTOCOL_SSLv23 | |
| ssl.PROTOCOL_SSLv3 | |
| ssl.PROTOCOL_TLSv1 | |
| ssl.CERT_NONE | |
| ssl.CERT_OPTIONAL | |
| ssl.CERT_REQUIRED | |
| def test_random(self): | |
| v = ssl.RAND_status() | |
| if test_support.verbose: | |
| sys.stdout.write("\n RAND_status is %d (%s)\n" | |
| % (v, (v and "sufficient randomness") or | |
| "insufficient randomness")) | |
| try: | |
| ssl.RAND_egd(1) | |
| except TypeError: | |
| pass | |
| else: | |
| print "didn't raise TypeError" | |
| ssl.RAND_add("this is a random string", 75.0) | |
| def test_parse_cert(self): | |
| # note that this uses an 'unofficial' function in _ssl.c, | |
| # provided solely for this test, to exercise the certificate | |
| # parsing code | |
| p = ssl._ssl._test_decode_cert(CERTFILE, False) | |
| if test_support.verbose: | |
| sys.stdout.write("\n" + pprint.pformat(p) + "\n") | |
| def test_DER_to_PEM(self): | |
| with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: | |
| pem = f.read() | |
| d1 = ssl.PEM_cert_to_DER_cert(pem) | |
| p2 = ssl.DER_cert_to_PEM_cert(d1) | |
| d2 = ssl.PEM_cert_to_DER_cert(p2) | |
| self.assertEqual(d1, d2) | |
| if not p2.startswith(ssl.PEM_HEADER + '\n'): | |
| self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) | |
| if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): | |
| self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) | |
| def test_openssl_version(self): | |
| n = ssl.OPENSSL_VERSION_NUMBER | |
| t = ssl.OPENSSL_VERSION_INFO | |
| s = ssl.OPENSSL_VERSION | |
| self.assertIsInstance(n, (int, long)) | |
| self.assertIsInstance(t, tuple) | |
| self.assertIsInstance(s, str) | |
| # Some sanity checks follow | |
| # >= 0.9 | |
| self.assertGreaterEqual(n, 0x900000) | |
| # < 2.0 | |
| self.assertLess(n, 0x20000000) | |
| major, minor, fix, patch, status = t | |
| self.assertGreaterEqual(major, 0) | |
| self.assertLess(major, 2) | |
| self.assertGreaterEqual(minor, 0) | |
| self.assertLess(minor, 256) | |
| self.assertGreaterEqual(fix, 0) | |
| self.assertLess(fix, 256) | |
| self.assertGreaterEqual(patch, 0) | |
| self.assertLessEqual(patch, 26) | |
| self.assertGreaterEqual(status, 0) | |
| self.assertLessEqual(status, 15) | |
| # Version string as returned by OpenSSL, the format might change | |
| self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), | |
| (s, t)) | |
| def test_ciphers(self): | |
| if not test_support.is_resource_enabled('network'): | |
| return | |
| remote = ("svn.python.org", 443) | |
| with test_support.transient_internet(remote[0]): | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_NONE, ciphers="ALL") | |
| s.connect(remote) | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") | |
| s.connect(remote) | |
| # Error checking occurs when connecting, because the SSL context | |
| # isn't created before. | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") | |
| with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): | |
| s.connect(remote) | |
| @test_support.cpython_only | |
| def test_refcycle(self): | |
| # Issue #7943: an SSL object doesn't create reference cycles with | |
| # itself. | |
| s = socket.socket(socket.AF_INET) | |
| ss = ssl.wrap_socket(s) | |
| wr = weakref.ref(ss) | |
| del ss | |
| self.assertEqual(wr(), None) | |
| def test_wrapped_unconnected(self): | |
| # The _delegate_methods in socket.py are correctly delegated to by an | |
| # unconnected SSLSocket, so they will raise a socket.error rather than | |
| # something unexpected like TypeError. | |
| s = socket.socket(socket.AF_INET) | |
| ss = ssl.wrap_socket(s) | |
| self.assertRaises(socket.error, ss.recv, 1) | |
| self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) | |
| self.assertRaises(socket.error, ss.recvfrom, 1) | |
| self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) | |
| self.assertRaises(socket.error, ss.send, b'x') | |
| self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) | |
| class NetworkedTests(unittest.TestCase): | |
| def test_connect(self): | |
| with test_support.transient_internet("svn.python.org"): | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_NONE) | |
| s.connect(("svn.python.org", 443)) | |
| c = s.getpeercert() | |
| if c: | |
| self.fail("Peer cert %s shouldn't be here!") | |
| s.close() | |
| # this should fail because we have no verification certs | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_REQUIRED) | |
| try: | |
| s.connect(("svn.python.org", 443)) | |
| except ssl.SSLError: | |
| pass | |
| finally: | |
| s.close() | |
| # this should succeed because we specify the root cert | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_REQUIRED, | |
| ca_certs=SVN_PYTHON_ORG_ROOT_CERT) | |
| try: | |
| s.connect(("svn.python.org", 443)) | |
| finally: | |
| s.close() | |
| def test_connect_ex(self): | |
| # Issue #11326: check connect_ex() implementation | |
| with test_support.transient_internet("svn.python.org"): | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_REQUIRED, | |
| ca_certs=SVN_PYTHON_ORG_ROOT_CERT) | |
| try: | |
| self.assertEqual(0, s.connect_ex(("svn.python.org", 443))) | |
| self.assertTrue(s.getpeercert()) | |
| finally: | |
| s.close() | |
| def test_non_blocking_connect_ex(self): | |
| # Issue #11326: non-blocking connect_ex() should allow handshake | |
| # to proceed after the socket gets ready. | |
| with test_support.transient_internet("svn.python.org"): | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_REQUIRED, | |
| ca_certs=SVN_PYTHON_ORG_ROOT_CERT, | |
| do_handshake_on_connect=False) | |
| try: | |
| s.setblocking(False) | |
| rc = s.connect_ex(('svn.python.org', 443)) | |
| # EWOULDBLOCK under Windows, EINPROGRESS elsewhere | |
| self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) | |
| # Wait for connect to finish | |
| select.select([], [s], [], 5.0) | |
| # Non-blocking handshake | |
| while True: | |
| try: | |
| s.do_handshake() | |
| break | |
| except ssl.SSLError as err: | |
| if err.args[0] == ssl.SSL_ERROR_WANT_READ: | |
| select.select([s], [], [], 5.0) | |
| elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: | |
| select.select([], [s], [], 5.0) | |
| else: | |
| raise | |
| # SSL established | |
| self.assertTrue(s.getpeercert()) | |
| finally: | |
| s.close() | |
| @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") | |
| def test_makefile_close(self): | |
| # Issue #5238: creating a file-like object with makefile() shouldn't | |
| # delay closing the underlying "real socket" (here tested with its | |
| # file descriptor, hence skipping the test under Windows). | |
| with test_support.transient_internet("svn.python.org"): | |
| ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) | |
| ss.connect(("svn.python.org", 443)) | |
| fd = ss.fileno() | |
| f = ss.makefile() | |
| f.close() | |
| # The fd is still open | |
| os.read(fd, 0) | |
| # Closing the SSL socket should close the fd too | |
| ss.close() | |
| gc.collect() | |
| with self.assertRaises(OSError) as e: | |
| os.read(fd, 0) | |
| self.assertEqual(e.exception.errno, errno.EBADF) | |
| def test_non_blocking_handshake(self): | |
| with test_support.transient_internet("svn.python.org"): | |
| s = socket.socket(socket.AF_INET) | |
| s.connect(("svn.python.org", 443)) | |
| s.setblocking(False) | |
| s = ssl.wrap_socket(s, | |
| cert_reqs=ssl.CERT_NONE, | |
| do_handshake_on_connect=False) | |
| count = 0 | |
| while True: | |
| try: | |
| count += 1 | |
| s.do_handshake() | |
| break | |
| except ssl.SSLError, err: | |
| if err.args[0] == ssl.SSL_ERROR_WANT_READ: | |
| select.select([s], [], []) | |
| elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: | |
| select.select([], [s], []) | |
| else: | |
| raise | |
| s.close() | |
| if test_support.verbose: | |
| sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) | |
| def test_get_server_certificate(self): | |
| with test_support.transient_internet("svn.python.org"): | |
| pem = ssl.get_server_certificate(("svn.python.org", 443)) | |
| if not pem: | |
| self.fail("No server certificate on svn.python.org:443!") | |
| try: | |
| pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) | |
| except ssl.SSLError: | |
| #should fail | |
| pass | |
| else: | |
| self.fail("Got server certificate %s for svn.python.org!" % pem) | |
| pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) | |
| if not pem: | |
| self.fail("No server certificate on svn.python.org:443!") | |
| if test_support.verbose: | |
| sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) | |
| def test_algorithms(self): | |
| # Issue #8484: all algorithms should be available when verifying a | |
| # certificate. | |
| # SHA256 was added in OpenSSL 0.9.8 | |
| if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): | |
| self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) | |
| # NOTE: https://sha256.tbs-internet.com is another possible test host | |
| remote = ("sha256.tbs-internet.com", 443) | |
| sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") | |
| with test_support.transient_internet("sha256.tbs-internet.com"): | |
| s = ssl.wrap_socket(socket.socket(socket.AF_INET), | |
| cert_reqs=ssl.CERT_REQUIRED, | |
| ca_certs=sha256_cert,) | |
| try: | |
| s.connect(remote) | |
| if test_support.verbose: | |
| sys.stdout.write("\nCipher with %r is %r\n" % | |
| (remote, s.cipher())) | |
| sys.stdout.write("Certificate is:\n%s\n" % | |
| pprint.pformat(s.getpeercert())) | |
| finally: | |
| s.close() | |
| try: | |
| import threading | |
| except ImportError: | |
| _have_threads = False | |
| else: | |
| _have_threads = True | |
| class ThreadedEchoServer(threading.Thread): | |
| class ConnectionHandler(threading.Thread): | |
| """A mildly complicated class, because we want it to work both | |
| with and without the SSL wrapper around the socket connection, so | |
| that we can test the STARTTLS functionality.""" | |
| def __init__(self, server, connsock): | |
| self.server = server | |
| self.running = False | |
| self.sock = connsock | |
| self.sock.setblocking(1) | |
| self.sslconn = None | |
| threading.Thread.__init__(self) | |
| self.daemon = True | |
| def show_conn_details(self): | |
| if self.server.certreqs == ssl.CERT_REQUIRED: | |
| cert = self.sslconn.getpeercert() | |
| if test_support.verbose and self.server.chatty: | |
| sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") | |
| cert_binary = self.sslconn.getpeercert(True) | |
| if test_support.verbose and self.server.chatty: | |
| sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") | |
| cipher = self.sslconn.cipher() | |
| if test_support.verbose and self.server.chatty: | |
| sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") | |
| def wrap_conn(self): | |
| try: | |
| self.sslconn = ssl.wrap_socket(self.sock, server_side=True, | |
| certfile=self.server.certificate, | |
| ssl_version=self.server.protocol, | |
| ca_certs=self.server.cacerts, | |
| cert_reqs=self.server.certreqs, | |
| ciphers=self.server.ciphers) | |
| except ssl.SSLError: | |
| # XXX Various errors can have happened here, for example | |
| # a mismatching protocol version, an invalid certificate, | |
| # or a low-level bug. This should be made more discriminating. | |
| if self.server.chatty: | |
| handle_error("\n server: bad connection attempt from " + | |
| str(self.sock.getpeername()) + ":\n") | |
| self.close() | |
| self.running = False | |
| self.server.stop() | |
| return False | |
| else: | |
| return True | |
| def read(self): | |
| if self.sslconn: | |
| return self.sslconn.read() | |
| else: | |
| return self.sock.recv(1024) | |
| def write(self, bytes): | |
| if self.sslconn: | |
| return self.sslconn.write(bytes) | |
| else: | |
| return self.sock.send(bytes) | |
| def close(self): | |
| if self.sslconn: | |
| self.sslconn.close() | |
| else: | |
| self.sock._sock.close() | |
| def run(self): | |
| self.running = True | |
| if not self.server.starttls_server: | |
| if isinstance(self.sock, ssl.SSLSocket): | |
| self.sslconn = self.sock | |
| elif not self.wrap_conn(): | |
| return | |
| self.show_conn_details() | |
| while self.running: | |
| try: | |
| msg = self.read() | |
| if not msg: | |
| # eof, so quit this handler | |
| self.running = False | |
| self.close() | |
| elif msg.strip() == 'over': | |
| if test_support.verbose and self.server.connectionchatty: | |
| sys.stdout.write(" server: client closed connection\n") | |
| self.close() | |
| return | |
| elif self.server.starttls_server and msg.strip() == 'STARTTLS': | |
| if test_support.verbose and self.server.connectionchatty: | |
| sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") | |
| self.write("OK\n") | |
| if not self.wrap_conn(): | |
| return | |
| elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS': | |
| if test_support.verbose and self.server.connectionchatty: | |
| sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") | |
| self.write("OK\n") | |
| self.sslconn.unwrap() | |
| self.sslconn = None | |
| if test_support.verbose and self.server.connectionchatty: | |
| sys.stdout.write(" server: connection is now unencrypted...\n") | |
| else: | |
| if (test_support.verbose and | |
| self.server.connectionchatty): | |
| ctype = (self.sslconn and "encrypted") or "unencrypted" | |
| sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n" | |
| % (repr(msg), ctype, repr(msg.lower()), ctype)) | |
| self.write(msg.lower()) | |
| except ssl.SSLError: | |
| if self.server.chatty: | |
| handle_error("Test server failure:\n") | |
| self.close() | |
| self.running = False | |
| # normally, we'd just stop here, but for the test | |
| # harness, we want to stop the server | |
| self.server.stop() | |
| def __init__(self, certificate, ssl_version=None, | |
| certreqs=None, cacerts=None, | |
| chatty=True, connectionchatty=False, starttls_server=False, | |
| wrap_accepting_socket=False, ciphers=None): | |
| if ssl_version is None: | |
| ssl_version = ssl.PROTOCOL_TLSv1 | |
| if certreqs is None: | |
| certreqs = ssl.CERT_NONE | |
| self.certificate = certificate | |
| self.protocol = ssl_version | |
| self.certreqs = certreqs | |
| self.cacerts = cacerts | |
| self.ciphers = ciphers | |
| self.chatty = chatty | |
| self.connectionchatty = connectionchatty | |
| self.starttls_server = starttls_server | |
| self.sock = socket.socket() | |
| self.flag = None | |
| if wrap_accepting_socket: | |
| self.sock = ssl.wrap_socket(self.sock, server_side=True, | |
| certfile=self.certificate, | |
| cert_reqs = self.certreqs, | |
| ca_certs = self.cacerts, | |
| ssl_version = self.protocol, | |
| ciphers = self.ciphers) | |
| if test_support.verbose and self.chatty: | |
| sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock)) | |
| self.port = test_support.bind_port(self.sock) | |
| self.active = False | |
| threading.Thread.__init__(self) | |
| self.daemon = True | |
| def start(self, flag=None): | |
| self.flag = flag | |
| threading.Thread.start(self) | |
| def run(self): | |
| self.sock.settimeout(0.05) | |
| self.sock.listen(5) | |
| self.active = True | |
| if self.flag: | |
| # signal an event | |
| self.flag.set() | |
| while self.active: | |
| try: | |
| newconn, connaddr = self.sock.accept() | |
| if test_support.verbose and self.chatty: | |
| sys.stdout.write(' server: new connection from ' | |
| + str(connaddr) + '\n') | |
| handler = self.ConnectionHandler(self, newconn) | |
| handler.start() | |
| except socket.timeout: | |
| pass | |
| except KeyboardInterrupt: | |
| self.stop() | |
| self.sock.close() | |
| def stop(self): | |
| self.active = False | |
| class AsyncoreEchoServer(threading.Thread): | |
| class EchoServer(asyncore.dispatcher): | |
| class ConnectionHandler(asyncore.dispatcher_with_send): | |
| def __init__(self, conn, certfile): | |
| asyncore.dispatcher_with_send.__init__(self, conn) | |
| self.socket = ssl.wrap_socket(conn, server_side=True, | |
| certfile=certfile, | |
| do_handshake_on_connect=False) | |
| self._ssl_accepting = True | |
| def readable(self): | |
| if isinstance(self.socket, ssl.SSLSocket): | |
| while self.socket.pending() > 0: | |
| self.handle_read_event() | |
| return True | |
| def _do_ssl_handshake(self): | |
| try: | |
| self.socket.do_handshake() | |
| except ssl.SSLError, err: | |
| if err.args[0] in (ssl.SSL_ERROR_WANT_READ, | |
| ssl.SSL_ERROR_WANT_WRITE): | |
| return | |
| elif err.args[0] == ssl.SSL_ERROR_EOF: | |
| return self.handle_close() | |
| raise | |
| except socket.error, err: | |
| if err.args[0] == errno.ECONNABORTED: | |
| return self.handle_close() | |
| else: | |
| self._ssl_accepting = False | |
| def handle_read(self): | |
| if self._ssl_accepting: | |
| self._do_ssl_handshake() | |
| else: | |
| data = self.recv(1024) | |
| if data and data.strip() != 'over': | |
| self.send(data.lower()) | |
| def handle_close(self): | |
| self.close() | |
| if test_support.verbose: | |
| sys.stdout.write(" server: closed connection %s\n" % self.socket) | |
| def handle_error(self): | |
| raise | |
| def __init__(self, certfile): | |
| self.certfile = certfile | |
| asyncore.dispatcher.__init__(self) | |
| self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.port = test_support.bind_port(self.socket) | |
| self.listen(5) | |
| def handle_accept(self): | |
| sock_obj, addr = self.accept() | |
| if test_support.verbose: | |
| sys.stdout.write(" server: new connection from %s:%s\n" %addr) | |
| self.ConnectionHandler(sock_obj, self.certfile) | |
| def handle_error(self): | |
| raise | |
| def __init__(self, certfile): | |
| self.flag = None | |
| self.active = False | |
| self.server = self.EchoServer(certfile) | |
| self.port = self.server.port | |
| threading.Thread.__init__(self) | |
| self.daemon = True | |
| def __str__(self): | |
| return "<%s %s>" % (self.__class__.__name__, self.server) | |
| def start(self, flag=None): | |
| self.flag = flag | |
| threading.Thread.start(self) | |
| def run(self): | |
| self.active = True | |
| if self.flag: | |
| self.flag.set() | |
| while self.active: | |
| asyncore.loop(0.05) | |
| def stop(self): | |
| self.active = False | |
| self.server.close() | |
| class SocketServerHTTPSServer(threading.Thread): | |
| class HTTPSServer(HTTPServer): | |
| def __init__(self, server_address, RequestHandlerClass, certfile): | |
| HTTPServer.__init__(self, server_address, RequestHandlerClass) | |
| # we assume the certfile contains both private key and certificate | |
| self.certfile = certfile | |
| self.allow_reuse_address = True | |
| def __str__(self): | |
| return ('<%s %s:%s>' % | |
| (self.__class__.__name__, | |
| self.server_name, | |
| self.server_port)) | |
| def get_request(self): | |
| # override this to wrap socket with SSL | |
| sock, addr = self.socket.accept() | |
| sslconn = ssl.wrap_socket(sock, server_side=True, | |
| certfile=self.certfile) | |
| return sslconn, addr | |
| class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): | |
| # need to override translate_path to get a known root, | |
| # instead of using os.curdir, since the test could be | |
| # run from anywhere | |
| server_version = "TestHTTPS/1.0" | |
| root = None | |
| def translate_path(self, path): | |
| """Translate a /-separated PATH to the local filename syntax. | |
| Components that mean special things to the local file system | |
| (e.g. drive or directory names) are ignored. (XXX They should | |
| probably be diagnosed.) | |
| """ | |
| # abandon query parameters | |
| path = urlparse.urlparse(path)[2] | |
| path = os.path.normpath(urllib.unquote(path)) | |
| words = path.split('/') | |
| words = filter(None, words) | |
| path = self.root | |
| for word in words: | |
| drive, word = os.path.splitdrive(word) | |
| head, word = os.path.split(word) | |
| if word in self.root: continue | |
| path = os.path.join(path, word) | |
| return path | |
| def log_message(self, format, *args): | |
| # we override this to suppress logging unless "verbose" | |
| if test_support.verbose: | |
| sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % | |
| (self.server.server_address, | |
| self.server.server_port, | |
| self.request.cipher(), | |
| self.log_date_time_string(), | |
| format%args)) | |
| def __init__(self, certfile): | |
| self.flag = None | |
| self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] | |
| self.server = self.HTTPSServer( | |
| (HOST, 0), self.RootedHTTPRequestHandler, certfile) | |
| self.port = self.server.server_port | |
| threading.Thread.__init__(self) | |
| self.daemon = True | |
| def __str__(self): | |
| return "<%s %s>" % (self.__class__.__name__, self.server) | |
| def start(self, flag=None): | |
| self.flag = flag | |
| threading.Thread.start(self) | |
| def run(self): | |
| if self.flag: | |
| self.flag.set() | |
| self.server.serve_forever(0.05) | |
| def stop(self): | |
| self.server.shutdown() | |
| def bad_cert_test(certfile): | |
| """ | |
| Launch a server with CERT_REQUIRED, and check that trying to | |
| connect to it with the given client certificate fails. | |
| """ | |
| server = ThreadedEchoServer(CERTFILE, | |
| certreqs=ssl.CERT_REQUIRED, | |
| cacerts=CERTFILE, chatty=False) | |
| flag = threading.Event() | |
| server.start(flag) | |
| # wait for it to start | |
| flag.wait() | |
| # try to connect | |
| try: | |
| try: | |
| s = ssl.wrap_socket(socket.socket(), | |
| certfile=certfile, | |
| ssl_version=ssl.PROTOCOL_TLSv1) | |
| s.connect((HOST, server.port)) | |
| except ssl.SSLError, x: | |
| if test_support.verbose: | |
| sys.stdout.write("\nSSLError is %s\n" % x[1]) | |
| except socket.error, x: | |
| if test_support.verbose: | |
| sys.stdout.write("\nsocket.error is %s\n" % x[1]) | |
| else: | |
| raise AssertionError("Use of invalid cert should have failed!") | |
| finally: | |
| server.stop() | |
| server.join() | |
| def server_params_test(certfile, protocol, certreqs, cacertsfile, | |
| client_certfile, client_protocol=None, indata="FOO\n", | |
| ciphers=None, chatty=True, connectionchatty=False, | |
| wrap_accepting_socket=False): | |
| """ | |
| Launch a server, connect a client to it and try various reads | |
| and writes. | |
| """ | |
| server = ThreadedEchoServer(certfile, | |
| certreqs=certreqs, | |
| ssl_version=protocol, | |
| cacerts=cacertsfile, | |
| ciphers=ciphers, | |
| chatty=chatty, | |
| connectionchatty=connectionchatty, | |
| wrap_accepting_socket=wrap_accepting_socket) | |
| flag = threading.Event() | |
| server.start(flag) | |
| # wait for it to start | |
| flag.wait() | |
| # try to connect | |
| if client_protocol is None: | |
| client_protocol = protocol | |
| try: | |
| s = ssl.wrap_socket(socket.socket(), | |
| certfile=client_certfile, | |
| ca_certs=cacertsfile, | |
| ciphers=ciphers, | |
| cert_reqs=certreqs, | |
| ssl_version=client_protocol) | |
| s.connect((HOST, server.port)) | |
| for arg in [indata, bytearray(indata), memoryview(indata)]: | |
| if connectionchatty: | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " client: sending %s...\n" % (repr(arg))) | |
| s.write(arg) | |
| outdata = s.read() | |
| if connectionchatty: | |
| if test_support.verbose: | |
| sys.stdout.write(" client: read %s\n" % repr(outdata)) | |
| if outdata != indata.lower(): | |
| raise AssertionError( | |
| "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" | |
| % (outdata[:min(len(outdata),20)], len(outdata), | |
| indata[:min(len(indata),20)].lower(), len(indata))) | |
| s.write("over\n") | |
| if connectionchatty: | |
| if test_support.verbose: | |
| sys.stdout.write(" client: closing connection.\n") | |
| s.close() | |
| finally: | |
| server.stop() | |
| server.join() | |
| def try_protocol_combo(server_protocol, | |
| client_protocol, | |
| expect_success, | |
| certsreqs=None): | |
| if certsreqs is None: | |
| certsreqs = ssl.CERT_NONE | |
| certtype = { | |
| ssl.CERT_NONE: "CERT_NONE", | |
| ssl.CERT_OPTIONAL: "CERT_OPTIONAL", | |
| ssl.CERT_REQUIRED: "CERT_REQUIRED", | |
| }[certsreqs] | |
| if test_support.verbose: | |
| formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" | |
| sys.stdout.write(formatstr % | |
| (ssl.get_protocol_name(client_protocol), | |
| ssl.get_protocol_name(server_protocol), | |
| certtype)) | |
| try: | |
| # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client | |
| # will send an SSLv3 hello (rather than SSLv2) starting from | |
| # OpenSSL 1.0.0 (see issue #8322). | |
| server_params_test(CERTFILE, server_protocol, certsreqs, | |
| CERTFILE, CERTFILE, client_protocol, | |
| ciphers="ALL", chatty=False) | |
| # Protocol mismatch can result in either an SSLError, or a | |
| # "Connection reset by peer" error. | |
| except ssl.SSLError: | |
| if expect_success: | |
| raise | |
| except socket.error as e: | |
| if expect_success or e.errno != errno.ECONNRESET: | |
| raise | |
| else: | |
| if not expect_success: | |
| raise AssertionError( | |
| "Client protocol %s succeeded with server protocol %s!" | |
| % (ssl.get_protocol_name(client_protocol), | |
| ssl.get_protocol_name(server_protocol))) | |
| class ThreadedTests(unittest.TestCase): | |
| def test_rude_shutdown(self): | |
| """A brutal shutdown of an SSL server should raise an IOError | |
| in the client when attempting handshake. | |
| """ | |
| listener_ready = threading.Event() | |
| listener_gone = threading.Event() | |
| s = socket.socket() | |
| port = test_support.bind_port(s, HOST) | |
| # `listener` runs in a thread. It sits in an accept() until | |
| # the main thread connects. Then it rudely closes the socket, | |
| # and sets Event `listener_gone` to let the main thread know | |
| # the socket is gone. | |
| def listener(): | |
| s.listen(5) | |
| listener_ready.set() | |
| s.accept() | |
| s.close() | |
| listener_gone.set() | |
| def connector(): | |
| listener_ready.wait() | |
| c = socket.socket() | |
| c.connect((HOST, port)) | |
| listener_gone.wait() | |
| try: | |
| ssl_sock = ssl.wrap_socket(c) | |
| except IOError: | |
| pass | |
| else: | |
| self.fail('connecting to closed SSL socket should have failed') | |
| t = threading.Thread(target=listener) | |
| t.start() | |
| try: | |
| connector() | |
| finally: | |
| t.join() | |
| @skip_if_broken_ubuntu_ssl | |
| def test_echo(self): | |
| """Basic test of an SSL client connecting to a server""" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, | |
| CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, | |
| chatty=True, connectionchatty=True) | |
| def test_getpeercert(self): | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| s2 = socket.socket() | |
| server = ThreadedEchoServer(CERTFILE, | |
| certreqs=ssl.CERT_NONE, | |
| ssl_version=ssl.PROTOCOL_SSLv23, | |
| cacerts=CERTFILE, | |
| chatty=False) | |
| flag = threading.Event() | |
| server.start(flag) | |
| # wait for it to start | |
| flag.wait() | |
| # try to connect | |
| try: | |
| s = ssl.wrap_socket(socket.socket(), | |
| certfile=CERTFILE, | |
| ca_certs=CERTFILE, | |
| cert_reqs=ssl.CERT_REQUIRED, | |
| ssl_version=ssl.PROTOCOL_SSLv23) | |
| s.connect((HOST, server.port)) | |
| cert = s.getpeercert() | |
| self.assertTrue(cert, "Can't get peer certificate.") | |
| cipher = s.cipher() | |
| if test_support.verbose: | |
| sys.stdout.write(pprint.pformat(cert) + '\n') | |
| sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') | |
| if 'subject' not in cert: | |
| self.fail("No subject field in certificate: %s." % | |
| pprint.pformat(cert)) | |
| if ((('organizationName', 'Python Software Foundation'),) | |
| not in cert['subject']): | |
| self.fail( | |
| "Missing or invalid 'organizationName' field in certificate subject; " | |
| "should be 'Python Software Foundation'.") | |
| s.close() | |
| finally: | |
| server.stop() | |
| server.join() | |
| def test_empty_cert(self): | |
| """Connecting with an empty cert file""" | |
| bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, | |
| "nullcert.pem")) | |
| def test_malformed_cert(self): | |
| """Connecting with a badly formatted certificate (syntax error)""" | |
| bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, | |
| "badcert.pem")) | |
| def test_nonexisting_cert(self): | |
| """Connecting with a non-existing cert file""" | |
| bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, | |
| "wrongcert.pem")) | |
| def test_malformed_key(self): | |
| """Connecting with a badly formatted key (syntax error)""" | |
| bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, | |
| "badkey.pem")) | |
| @skip_if_broken_ubuntu_ssl | |
| def test_protocol_sslv2(self): | |
| """Connecting to an SSLv2 server with various client options""" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) | |
| @skip_if_broken_ubuntu_ssl | |
| def test_protocol_sslv23(self): | |
| """Connecting to an SSLv23 server with various client options""" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| try: | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) | |
| except (ssl.SSLError, socket.error), x: | |
| # this fails on some older versions of OpenSSL (0.9.7l, for instance) | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" | |
| % str(x)) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) | |
| @skip_if_broken_ubuntu_ssl | |
| def test_protocol_sslv3(self): | |
| """Connecting to an SSLv3 server with various client options""" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) | |
| if hasattr(ssl, 'PROTOCOL_SSLv2'): | |
| try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False) | |
| try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) | |
| @skip_if_broken_ubuntu_ssl | |
| def test_protocol_tlsv1(self): | |
| """Connecting to a TLSv1 server with various client options""" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) | |
| try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) | |
| try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) | |
| if hasattr(ssl, 'PROTOCOL_SSLv2'): | |
| try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) | |
| try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) | |
| try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False) | |
| def test_starttls(self): | |
| """Switching from clear text to encrypted and back again.""" | |
| msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6") | |
| server = ThreadedEchoServer(CERTFILE, | |
| ssl_version=ssl.PROTOCOL_TLSv1, | |
| starttls_server=True, | |
| chatty=True, | |
| connectionchatty=True) | |
| flag = threading.Event() | |
| server.start(flag) | |
| # wait for it to start | |
| flag.wait() | |
| # try to connect | |
| wrapped = False | |
| try: | |
| s = socket.socket() | |
| s.setblocking(1) | |
| s.connect((HOST, server.port)) | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| for indata in msgs: | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " client: sending %s...\n" % repr(indata)) | |
| if wrapped: | |
| conn.write(indata) | |
| outdata = conn.read() | |
| else: | |
| s.send(indata) | |
| outdata = s.recv(1024) | |
| if (indata == "STARTTLS" and | |
| outdata.strip().lower().startswith("ok")): | |
| # STARTTLS ok, switch to secure mode | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " client: read %s from server, starting TLS...\n" | |
| % repr(outdata)) | |
| conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) | |
| wrapped = True | |
| elif (indata == "ENDTLS" and | |
| outdata.strip().lower().startswith("ok")): | |
| # ENDTLS ok, switch back to clear text | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " client: read %s from server, ending TLS...\n" | |
| % repr(outdata)) | |
| s = conn.unwrap() | |
| wrapped = False | |
| else: | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " client: read %s from server\n" % repr(outdata)) | |
| if test_support.verbose: | |
| sys.stdout.write(" client: closing connection.\n") | |
| if wrapped: | |
| conn.write("over\n") | |
| else: | |
| s.send("over\n") | |
| s.close() | |
| finally: | |
| server.stop() | |
| server.join() | |
| def test_socketserver(self): | |
| """Using a SocketServer to create and manage SSL connections.""" | |
| server = SocketServerHTTPSServer(CERTFILE) | |
| flag = threading.Event() | |
| server.start(flag) | |
| # wait for it to start | |
| flag.wait() | |
| # try to connect | |
| try: | |
| if test_support.verbose: | |
| sys.stdout.write('\n') | |
| with open(CERTFILE, 'rb') as f: | |
| d1 = f.read() | |
| d2 = '' | |
| # now fetch the same data from the HTTPS server | |
| url = 'https://127.0.0.1:%d/%s' % ( | |
| server.port, os.path.split(CERTFILE)[1]) | |
| with test_support.check_py3k_warnings(): | |
| f = urllib.urlopen(url) | |
| dlen = f.info().getheader("content-length") | |
| if dlen and (int(dlen) > 0): | |
| d2 = f.read(int(dlen)) | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " client: read %d bytes from remote server '%s'\n" | |
| % (len(d2), server)) | |
| f.close() | |
| self.assertEqual(d1, d2) | |
| finally: | |
| server.stop() | |
| server.join() | |
| def test_wrapped_accept(self): | |
| """Check the accept() method on SSL sockets.""" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, | |
| CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, | |
| chatty=True, connectionchatty=True, | |
| wrap_accepting_socket=True) | |
| def test_asyncore_server(self): | |
| """Check the example asyncore integration.""" | |
| indata = "TEST MESSAGE of mixed case\n" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| server = AsyncoreEchoServer(CERTFILE) | |
| flag = threading.Event() | |
| server.start(flag) | |
| # wait for it to start | |
| flag.wait() | |
| # try to connect | |
| try: | |
| s = ssl.wrap_socket(socket.socket()) | |
| s.connect(('127.0.0.1', server.port)) | |
| if test_support.verbose: | |
| sys.stdout.write( | |
| " client: sending %s...\n" % (repr(indata))) | |
| s.write(indata) | |
| outdata = s.read() | |
| if test_support.verbose: | |
| sys.stdout.write(" client: read %s\n" % repr(outdata)) | |
| if outdata != indata.lower(): | |
| self.fail( | |
| "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" | |
| % (outdata[:min(len(outdata),20)], len(outdata), | |
| indata[:min(len(indata),20)].lower(), len(indata))) | |
| s.write("over\n") | |
| if test_support.verbose: | |
| sys.stdout.write(" client: closing connection.\n") | |
| s.close() | |
| finally: | |
| server.stop() | |
| # wait for server thread to end | |
| server.join() | |
| def test_recv_send(self): | |
| """Test recv(), send() and friends.""" | |
| if test_support.verbose: | |
| sys.stdout.write("\n") | |
| server = ThreadedEchoServer(CERTFILE, | |
| certreqs=ssl.CERT_NONE, | |
| ssl_version=ssl.PROTOCOL_TLSv1, | |
| cacerts=CERTFILE, | |
| chatty=True, | |
| connectionchatty=False) | |
| flag = threading.Event() | |
| server.start(flag) | |
| # wait for it to start | |
| flag.wait() | |
| # try to connect | |
| s = ssl.wrap_socket(socket.socket(), | |
| server_side=False, | |
| certfile=CERTFILE, | |
| ca_certs=CERTFILE, | |
| cert_reqs=ssl.CERT_NONE, | |
| ssl_version=ssl.PROTOCOL_TLSv1) | |
| s.connect((HOST, server.port)) | |
| try: | |
| # helper methods for standardising recv* method signatures | |
| def _recv_into(): | |
| b = bytearray("\0"*100) | |
| count = s.recv_into(b) | |
| return b[:count] | |
| def _recvfrom_into(): | |
| b = bytearray("\0"*100) | |
| count, addr = s.recvfrom_into(b) | |
| return b[:count] | |
| # (name, method, whether to expect success, *args) | |
| send_methods = [ | |
| ('send', s.send, True, []), | |
| ('sendto', s.sendto, False, ["some.address"]), | |
| ('sendall', s.sendall, True, []), | |
| ] | |
| recv_methods = [ | |
| ('recv', s.recv, True, []), | |
| ('recvfrom', s.recvfrom, False, ["some.address"]), | |
| ('recv_into', _recv_into, True, []), | |
| ('recvfrom_into', _recvfrom_into, False, []), | |
| ] | |
| data_prefix = u"PREFIX_" | |
| for meth_name, send_meth, expect_success, args in send_methods: | |
| indata = data_prefix + meth_name | |
| try: | |
| send_meth(indata.encode('ASCII', 'strict'), *args) | |
| outdata = s.read() | |
| outdata = outdata.decode('ASCII', 'strict') | |
| if outdata != indata.lower(): | |
| self.fail( | |
| "While sending with <<%s>> bad data " | |
| "<<%r>> (%d) received; " | |
| "expected <<%r>> (%d)\n" % ( | |
| meth_name, outdata[:20], len(outdata), | |
| indata[:20], len(indata) | |
| ) | |
| ) | |
| except ValueError as e: | |
| if expect_success: | |
| self.fail( | |
| "Failed to send with method <<%s>>; " | |
| "expected to succeed.\n" % (meth_name,) | |
| ) | |
| if not str(e).startswith(meth_name): | |
| self.fail( | |
| "Method <<%s>> failed with unexpected " | |
| "exception message: %s\n" % ( | |
| meth_name, e | |
| ) | |
| ) | |
| for meth_name, recv_meth, expect_success, args in recv_methods: | |
| indata = data_prefix + meth_name | |
| try: | |
| s.send(indata.encode('ASCII', 'strict')) | |
| outdata = recv_meth(*args) | |
| outdata = outdata.decode('ASCII', 'strict') | |
| if outdata != indata.lower(): | |
| self.fail( | |
| "While receiving with <<%s>> bad data " | |
| "<<%r>> (%d) received; " | |
| "expected <<%r>> (%d)\n" % ( | |
| meth_name, outdata[:20], len(outdata), | |
| indata[:20], len(indata) | |
| ) | |
| ) | |
| except ValueError as e: | |
| if expect_success: | |
| self.fail( | |
| "Failed to receive with method <<%s>>; " | |
| "expected to succeed.\n" % (meth_name,) | |
| ) | |
| if not str(e).startswith(meth_name): | |
| self.fail( | |
| "Method <<%s>> failed with unexpected " | |
| "exception message: %s\n" % ( | |
| meth_name, e | |
| ) | |
| ) | |
| # consume data | |
| s.read() | |
| s.write("over\n".encode("ASCII", "strict")) | |
| s.close() | |
| finally: | |
| server.stop() | |
| server.join() | |
| def test_handshake_timeout(self): | |
| # Issue #5103: SSL handshake must respect the socket timeout | |
| server = socket.socket(socket.AF_INET) | |
| host = "127.0.0.1" | |
| port = test_support.bind_port(server) | |
| started = threading.Event() | |
| finish = False | |
| def serve(): | |
| server.listen(5) | |
| started.set() | |
| conns = [] | |
| while not finish: | |
| r, w, e = select.select([server], [], [], 0.1) | |
| if server in r: | |
| # Let the socket hang around rather than having | |
| # it closed by garbage collection. | |
| conns.append(server.accept()[0]) | |
| t = threading.Thread(target=serve) | |
| t.start() | |
| started.wait() | |
| try: | |
| try: | |
| c = socket.socket(socket.AF_INET) | |
| c.settimeout(0.2) | |
| c.connect((host, port)) | |
| # Will attempt handshake and time out | |
| self.assertRaisesRegexp(ssl.SSLError, "timed out", | |
| ssl.wrap_socket, c) | |
| finally: | |
| c.close() | |
| try: | |
| c = socket.socket(socket.AF_INET) | |
| c.settimeout(0.2) | |
| c = ssl.wrap_socket(c) | |
| # Will attempt handshake and time out | |
| self.assertRaisesRegexp(ssl.SSLError, "timed out", | |
| c.connect, (host, port)) | |
| finally: | |
| c.close() | |
| finally: | |
| finish = True | |
| t.join() | |
| server.close() | |
| def test_main(verbose=False): | |
| global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT | |
| CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, | |
| "keycert.pem") | |
| SVN_PYTHON_ORG_ROOT_CERT = os.path.join( | |
| os.path.dirname(__file__) or os.curdir, | |
| "https_svn_python_org_root.pem") | |
| if (not os.path.exists(CERTFILE) or | |
| not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)): | |
| raise test_support.TestFailed("Can't read certificate files!") | |
| tests = [BasicTests, BasicSocketTests] | |
| if test_support.is_resource_enabled('network'): | |
| tests.append(NetworkedTests) | |
| if _have_threads: | |
| thread_info = test_support.threading_setup() | |
| if thread_info and test_support.is_resource_enabled('network'): | |
| tests.append(ThreadedTests) | |
| try: | |
| test_support.run_unittest(*tests) | |
| finally: | |
| if _have_threads: | |
| test_support.threading_cleanup(*thread_info) | |
| if __name__ == "__main__": | |
| test_main() |