| """Test script for ftplib module.""" | |
| # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS | |
| # environment | |
| import ftplib | |
| import asyncore | |
| import asynchat | |
| import socket | |
| import StringIO | |
| import errno | |
| import os | |
| try: | |
| import ssl | |
| except ImportError: | |
| ssl = None | |
| from unittest import TestCase | |
| from test import test_support | |
| from test.test_support import HOST | |
| threading = test_support.import_module('threading') | |
# Canned payloads that the dummy server writes to the data channel in
# reply to the RETR, LIST and NLST commands respectively.
RETR_DATA = 1000 * 'abcde12345\r\n'
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'
class DummyDTPHandler(asynchat.async_chat):
    """Data-channel half of the dummy FTP server.

    Whatever is read from the data connection is appended to the
    ``last_received_data`` attribute of the owning control-channel
    handler (``baseclass``); when the data connection goes away the
    "226" reply is pushed onto that same handler.
    """

    # flipped to True by the first effective handle_close() call
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        # each new data connection starts with an empty receive buffer
        baseclass.last_received_data = ''
        self.baseclass = baseclass

    def handle_read(self):
        chunk = self.recv(1024)
        self.baseclass.last_received_data += chunk

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode
        # (behaviour witnessed with test_data_connection), hence the
        # guard flag: only the first call does any work.
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def handle_error(self):
        # re-raise the exception that is currently being handled
        raise
class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Every CRLF-terminated line received is split into a command name and
    its argument and dispatched to the matching ``cmd_<name>`` method;
    unknown commands are answered with a "550" reply.

    Attributes the test cases inspect or set:

    * ``last_received_cmd``  -- lower-cased name of the last command seen
    * ``last_received_data`` -- data received over the data channel
    * ``next_response``      -- if non-empty, pushed (once) ahead of the
      regular reply to the next command
    * ``rest``               -- argument of the last REST command, reset
      to None by RETR
    """

    # class used to wrap data connections; overridable by subclasses
    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator("\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.rest = None
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        """Buffer a fragment of the command line being received."""
        self.in_buffer.append(data)

    def found_terminator(self):
        """Process one complete command line."""
        line = ''.join(self.in_buffer)
        self.in_buffer = []
        if self.next_response:
            # one-shot canned reply requested by a test; note that the
            # command is still dispatched normally afterwards
            self.push(self.next_response)
            self.next_response = ''
        # everything before the first space is the command, everything
        # after it (possibly nothing) is the argument
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' %cmd)

    def handle_error(self):
        # re-raise the exception that is currently being handled
        raise

    def push(self, data):
        """Send *data* to the client as one CRLF-terminated line."""
        asynchat.async_chat.push(self, data + '\r\n')

    def cmd_port(self, arg):
        """Connect back to the client ("h1,h2,h3,h4,p1,p2" argument)."""
        addr = [int(field) for field in arg.split(',')]
        ip = '%d.%d.%d.%d' % tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        """Listen on an ephemeral port and accept one data connection."""
        sock = socket.socket()
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            ip, port = sock.getsockname()[:2]
            ip = ip.replace('.', ',')
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            conn, addr = sock.accept()
        finally:
            # only one data connection is ever accepted; don't leave the
            # listening socket open (also when accept() times out)
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        """Connect back to the client ("|af|ip|port|" style argument)."""
        # the first character of the argument is the field delimiter
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        """IPv6 flavour of cmd_pasv."""
        sock = socket.socket(socket.AF_INET6)
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            conn, addr = sock.accept()
        finally:
            # see cmd_pasv: the listening socket is single-use
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # remembered (as received) until the next RETR consumes it
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        """Send RETR_DATA, starting at the offset of a preceding REST."""
        self.push('125 retr ok')
        offset = int(self.rest) if self.rest is not None else 0
        self.dtp.push(RETR_DATA[offset:])
        self.dtp.close_when_done()
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Single-connection FTP server running asyncore in its own thread.

    The listening socket is bound at construction time; ``host`` and
    ``port`` expose the address actually bound.  After the first
    connection has been accepted, the ``handler`` attribute of the
    instance refers to the handler *instance* serving that connection
    (before that it is the handler class).
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]

    def start(self):
        """Start the server thread and wait until run() has begun."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        """Poll asyncore until stop() is called or no channel is left."""
        self.active = True
        self.__flag.set()
        try:
            while self.active and asyncore.socket_map:
                # handle_error() re-raises in every handler, so loop()
                # may raise: "with" guarantees the lock is released
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            # clean up the channels even if the loop died on an error
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the server thread to exit and wait for it."""
        assert self.active
        self.active = False
        self.join()

    def handle_accept(self):
        conn, addr = self.accept()
        # from now on "handler" is the instance serving the connection,
        # which lets the tests reach into it through self.server.handler
        self.handler = self.handler(conn)
        # a single connection is served: stop listening
        self.close()

    def handle_connect(self):
        self.close()

    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # re-raise the exception that is currently being handled
        raise
if ssl is not None:

    # certificate/key file handed to ssl.wrap_socket() for the server
    # side of TLS connections; looked up next to this file
    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert.pem")

    class SSLConnection(object, asyncore.dispatcher):
        """An asyncore.dispatcher subclass supporting TLS/SSL."""

        # True while the TLS handshake is in progress
        _ssl_accepting = False
        # True while the TLS shutdown (unwrap) is in progress
        _ssl_closing = False

        def secure_connection(self):
            """Wrap the socket in TLS; the handshake is done lazily."""
            self.socket = ssl.wrap_socket(self.socket, suppress_ragged_eofs=False,
                                          certfile=CERTFILE, server_side=True,
                                          do_handshake_on_connect=False,
                                          ssl_version=ssl.PROTOCOL_SSLv23)
            self._ssl_accepting = True

        def _do_ssl_handshake(self):
            """Advance the handshake; called on read and write events."""
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # not finished yet: retry on the next event
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                raise
            except socket.error as err:
                # any other socket error is ignored and the handshake
                # stays pending
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self._ssl_accepting = False

        def _do_ssl_shutdown(self):
            """Shut the TLS layer down, then close the connection."""
            self._ssl_closing = True
            try:
                self.socket = self.socket.unwrap()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # not finished yet: retry on the next event
                    return
            except socket.error as err:
                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
                # from OpenSSL's SSL_shutdown(), corresponding to a
                # closed socket condition. See also:
                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
                pass
            self._ssl_closing = False
            super(SSLConnection, self).close()

        def handle_read_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_read_event()

        def handle_write_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_write_event()

        def send(self, data):
            """Like dispatcher.send(), reporting 0 bytes on TLS retry/EOF."""
            try:
                return super(SSLConnection, self).send(data)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
                                   ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return 0
                raise

        def recv(self, buffer_size):
            """Like dispatcher.recv(), returning '' on TLS retry/EOF."""
            try:
                return super(SSLConnection, self).recv(buffer_size)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return ''
                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                    self.handle_close()
                    return ''
                raise

        def handle_error(self):
            # re-raise the exception that is currently being handled
            raise

        def close(self):
            """Close the connection, shutting TLS down first if active."""
            if (isinstance(self.socket, ssl.SSLSocket) and
                    self.socket._sslobj is not None):
                # the actual close happens at the end of the shutdown
                self._do_ssl_shutdown()
            else:
                # clear-text (or already unwrapped) socket: without this
                # branch the connection would never be closed at all
                super(SSLConnection, self).close()


    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
        """A DummyDTPHandler subclass supporting TLS/SSL."""

        def __init__(self, conn, baseclass):
            DummyDTPHandler.__init__(self, conn, baseclass)
            # the data channel is secured only after a "PROT P"
            if self.baseclass.secure_data_channel:
                self.secure_connection()


    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
        """A DummyFTPHandler subclass supporting TLS/SSL."""

        dtp_handler = DummyTLS_DTPHandler

        def __init__(self, conn):
            DummyFTPHandler.__init__(self, conn)
            self.secure_data_channel = False

        def cmd_auth(self, line):
            """Set up secure control channel."""
            self.push('234 AUTH TLS successful')
            self.secure_connection()

        def cmd_pbsz(self, line):
            """Negotiate size of buffer for secure data transfer.
            For TLS/SSL the only valid value for the parameter is '0'.
            Any other value is accepted but ignored.
            """
            self.push('200 PBSZ=0 successful.')

        def cmd_prot(self, line):
            """Setup un/secure data channel."""
            arg = line.upper()
            if arg == 'C':
                self.push('200 Protection set to Clear')
                self.secure_data_channel = False
            elif arg == 'P':
                self.push('200 Protection set to Private')
                self.secure_data_channel = True
            else:
                self.push("502 Unrecognized PROT type (use C or P).")


    class DummyTLS_FTPServer(DummyFTPServer):
        """A DummyFTPServer whose connections are TLS-capable."""
        handler = DummyTLS_FTPHandler
class TestFTPClass(TestCase):
    """ftplib.FTP exercised against a DummyFTPServer over IPv4."""

    def setUp(self):
        self.server = server = DummyFTPServer((HOST, 0))
        server.start()
        self.client = client = ftplib.FTP(timeout=10)
        client.connect(server.host, server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_getwelcome(self):
        welcome = self.client.getwelcome()
        self.assertEqual(welcome, '220 welcome')

    def test_sanitize(self):
        # the argument of PASS must be masked, whatever the case
        cases = (
            ('foo', 'foo'),
            ('pass 12345', 'pass *****'),
            ('PASS 12345', 'PASS *****'),
        )
        for raw, masked in cases:
            self.assertEqual(self.client.sanitize(raw), repr(masked))

    def test_exceptions(self):
        # reply code (echoed back by the server) -> exception raised
        cases = (
            (ftplib.error_temp, '400'),
            (ftplib.error_temp, '499'),
            (ftplib.error_perm, '500'),
            (ftplib.error_perm, '599'),
            (ftplib.error_proto, '999'),
        )
        for exc_class, code in cases:
            self.assertRaises(exc_class, self.client.sendcmd, 'echo ' + code)

    def test_all_errors(self):
        covered = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                   ftplib.error_proto, ftplib.Error, IOError, EOFError)
        for exc_class in covered:
            try:
                raise exc_class('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        client = self.client
        # passive mode is supposed to be enabled by default
        self.assertTrue(client.passiveserver)
        client.set_pasv(True)
        self.assertTrue(client.passiveserver)
        client.set_pasv(False)
        self.assertFalse(client.passiveserver)

    def test_voidcmd(self):
        # only 2xx replies are acceptable to voidcmd()
        for accepted in ('200', '299'):
            self.client.voidcmd('echo ' + accepted)
        for rejected in ('199', '300'):
            self.assertRaises(ftplib.error_reply,
                              self.client.voidcmd, 'echo ' + rejected)

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        rename = self.client.rename
        rename('a', 'b')
        # an unexpected reply to RNFR must be reported
        self.server.handler.next_response = '200'
        self.assertRaises(ftplib.error_reply, rename, 'a', 'b')

    def test_delete(self):
        delete = self.client.delete
        delete('foo')
        # an unexpected reply to DELE must be reported
        self.server.handler.next_response = '199'
        self.assertRaises(ftplib.error_reply, delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        created = self.client.mkd('/foo')
        self.assertEqual(created, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_pwd(self):
        current = self.client.pwd()
        self.assertEqual(current, 'pwd ok')

    def test_quit(self):
        reply = self.client.quit()
        self.assertEqual(reply, '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertEqual(self.client.sock, None)

    def test_retrbinary(self):
        chunks = []
        self.client.retrbinary('retr', chunks.append)
        self.assertEqual(''.join(chunks), RETR_DATA)

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            chunks = []
            self.client.retrbinary('retr', chunks.append, rest=rest)
            expected = RETR_DATA[rest:]
            details = 'rest test case %d %d %d' % (rest,
                                                   len(''.join(chunks)),
                                                   len(expected))
            self.assertEqual(''.join(chunks), expected, msg=details)

    def test_storbinary(self):
        source = StringIO.StringIO(RETR_DATA)
        self.client.storbinary('stor', source)
        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
        # test new callback arg
        calls = []

        def note_block(block):
            calls.append(None)

        source.seek(0)
        self.client.storbinary('stor', source, callback=note_block)
        self.assertTrue(calls)

    def test_storbinary_rest(self):
        source = StringIO.StringIO(RETR_DATA)
        # the offset may be given as a number or as a string
        for offset in (30, '30'):
            source.seek(0)
            self.client.storbinary('stor', source, rest=offset)
            self.assertEqual(self.server.handler.rest, str(offset))

    def test_storlines(self):
        source = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
        self.client.storlines('stor', source)
        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
        # test new callback arg
        calls = []

        def note_line(line):
            calls.append(None)

        source.seek(0)
        self.client.storlines('stor foo', source, callback=note_line)
        self.assertTrue(calls)

    def test_nlst(self):
        self.client.nlst()
        names = self.client.nlst()
        self.assertEqual(names, NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        lines = []
        self.client.dir(lines.append)
        self.assertEqual(''.join(lines), LIST_DATA.replace('\r\n', ''))

    def test_makeport(self):
        self.client.makeport()
        # IPv4 is in use, just make sure send_eprt has not been used
        last_cmd = self.server.handler.last_received_cmd
        self.assertEqual(last_cmd, 'port')

    def test_makepasv(self):
        address = self.client.makepasv()
        host, port = address
        data_conn = socket.create_connection((host, port), 10)
        data_conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        last_cmd = self.server.handler.last_received_cmd
        self.assertEqual(last_cmd, 'pasv')
class TestIPv6Environment(TestCase):
    """ftplib.FTP exercised against a DummyFTPServer bound over IPv6."""

    def setUp(self):
        self.server = server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
        server.start()
        self.client = client = ftplib.FTP()
        client.connect(server.host, server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_af(self):
        family = self.client.af
        self.assertEqual(family, socket.AF_INET6)

    def test_makeport(self):
        self.client.makeport()
        # over IPv6 the extended command must have been used
        last_cmd = self.server.handler.last_received_cmd
        self.assertEqual(last_cmd, 'eprt')

    def test_makepasv(self):
        address = self.client.makepasv()
        host, port = address
        data_conn = socket.create_connection((host, port), 10)
        data_conn.close()
        # over IPv6 the extended command must have been used
        last_cmd = self.server.handler.last_received_cmd
        self.assertEqual(last_cmd, 'epsv')

    def test_transfer(self):
        # same download, first in passive and then in active mode
        for passive in (True, False):
            self.client.set_pasv(passive)
            chunks = []
            self.client.retrbinary('retr', chunks.append)
            self.assertEqual(''.join(chunks), RETR_DATA)
class TestTLS_FTPClassMixin(TestFTPClass):
    """Run every TestFTPClass test again, this time with the TLS layer
    started on both the control and the data connection beforehand.
    """

    def setUp(self):
        self.server = server = DummyTLS_FTPServer((HOST, 0))
        server.start()
        self.client = client = ftplib.FTP_TLS(timeout=10)
        client.connect(server.host, server.port)
        # enable TLS: control channel first, then the data channel
        client.auth()
        client.prot_p()
class TestTLS_FTPClass(TestCase):
    """Tests specific to the FTP_TLS class."""

    def setUp(self):
        self.server = server = DummyTLS_FTPServer((HOST, 0))
        server.start()
        self.client = client = ftplib.FTP_TLS(timeout=10)
        client.connect(server.host, server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_control_connection(self):
        client = self.client
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.auth()
        self.assertIsInstance(client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        def list_and_check(type_assertion):
            # open a data connection, check whether it is an SSL socket,
            # then close it and consume the final reply
            sock = self.client.transfercmd('list')
            type_assertion(sock, ssl.SSLSocket)
            sock.close()
            self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # clear text
        list_and_check(self.assertNotIsInstance)

        # secured, after PROT P
        self.client.prot_p()
        list_and_check(self.assertIsInstance)

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        list_and_check(self.assertNotIsInstance)

    def test_login(self):
        client = self.client
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.login()
        self.assertIsInstance(client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        client.login()

    def test_auth_issued_twice(self):
        auth = self.client.auth
        auth()
        self.assertRaises(ValueError, auth)

    def test_auth_ssl(self):
        client = self.client
        try:
            client.ssl_version = ssl.PROTOCOL_SSLv3
            client.auth()
            self.assertRaises(ValueError, client.auth)
        finally:
            client.ssl_version = ssl.PROTOCOL_TLSv1
class TestTimeouts(TestCase):
    """Check how the timeout ends up on the control socket of ftplib.FTP.

    Each test talks to a throw-away server thread (see server()) that
    accepts a single connection and sends one line.  The server's port
    is installed as the default port of ftplib.FTP for the duration of
    the test, so that the tests don't need to pass it explicitly.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(3)
        self.port = test_support.bind_port(self.sock)
        self.server_thread = threading.Thread(
            target=self.server, args=(self.evt, self.sock))
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # remember the default port so that tearDown() can put it back
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        try:
            self.evt.wait()
            # the event alone doesn't prove that the thread is over
            self.server_thread.join()
        finally:
            # don't leak the patched class attribute into other tests
            ftplib.FTP.port = self.old_port

    def server(self, evt, serv):
        # This method sets the evt 3 times:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        #  3) when we have closed the socket
        serv.listen(5)
        # (1) Signal the caller that we are ready to accept the connection.
        evt.set()
        try:
            conn, addr = serv.accept()
        except socket.timeout:
            pass
        else:
            conn.send("1 Hola mundo\n")
            # (2) Signal the caller that it is safe to close the socket.
            evt.set()
            conn.close()
        finally:
            serv.close()
            # (3) Signal the caller that we are done.
            evt.set()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            # connect to HOST (not a literal host name): that is the
            # address family the server socket is created with
            ftp = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertTrue(ftp.sock.gettimeout() is None)
        self.evt.wait()
        ftp.close()

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutConnect(self):
        # timeout given to connect()
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDifferentOrder(self):
        # timeout given to the constructor, host given to connect()
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDirectAccess(self):
        # timeout set through the attribute
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()
def test_main():
    """Run the test cases that make sense in the current environment.

    The IPv6 tests are included only if an IPv6 server socket can
    actually be bound, the TLS ones only if the ssl module is available.
    """
    tests = [TestFTPClass, TestTimeouts]
    if socket.has_ipv6:
        try:
            probe = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
        except socket.error:
            pass
        else:
            # the probe was only needed to see whether binding works:
            # don't leave its listening socket open
            probe.close()
            tests.append(TestIPv6Environment)
    if ssl is not None:
        tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        test_support.threading_cleanup(*thread_info)


if __name__ == '__main__':
    test_main()