| """Test script for poplib module.""" | |
| # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL | |
| # a real test suite | |
| import poplib | |
| import asyncore | |
| import asynchat | |
| import socket | |
| import os | |
| import time | |
| import errno | |
| from unittest import TestCase | |
| from test import test_support | |
| from test.test_support import HOST | |
| threading = test_support.import_module('threading') | |
| # the dummy data returned by server when LIST and RETR commands are issued | |
| LIST_RESP = '1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' | |
| RETR_RESP = """From: postmaster@python.org\ | |
| \r\nContent-Type: text/plain\r\n\ | |
| MIME-Version: 1.0\r\n\ | |
| Subject: Dummy\r\n\ | |
| \r\n\ | |
| line1\r\n\ | |
| line2\r\n\ | |
| line3\r\n\ | |
| .\r\n""" | |
| class DummyPOP3Handler(asynchat.async_chat): | |
| def __init__(self, conn): | |
| asynchat.async_chat.__init__(self, conn) | |
| self.set_terminator("\r\n") | |
| self.in_buffer = [] | |
| self.push('+OK dummy pop3 server ready.') | |
| def collect_incoming_data(self, data): | |
| self.in_buffer.append(data) | |
| def found_terminator(self): | |
| line = ''.join(self.in_buffer) | |
| self.in_buffer = [] | |
| cmd = line.split(' ')[0].lower() | |
| space = line.find(' ') | |
| if space != -1: | |
| arg = line[space + 1:] | |
| else: | |
| arg = "" | |
| if hasattr(self, 'cmd_' + cmd): | |
| method = getattr(self, 'cmd_' + cmd) | |
| method(arg) | |
| else: | |
| self.push('-ERR unrecognized POP3 command "%s".' %cmd) | |
| def handle_error(self): | |
| raise | |
| def push(self, data): | |
| asynchat.async_chat.push(self, data + '\r\n') | |
| def cmd_echo(self, arg): | |
| # sends back the received string (used by the test suite) | |
| self.push(arg) | |
| def cmd_user(self, arg): | |
| if arg != "guido": | |
| self.push("-ERR no such user") | |
| self.push('+OK password required') | |
| def cmd_pass(self, arg): | |
| if arg != "python": | |
| self.push("-ERR wrong password") | |
| self.push('+OK 10 messages') | |
| def cmd_stat(self, arg): | |
| self.push('+OK 10 100') | |
| def cmd_list(self, arg): | |
| if arg: | |
| self.push('+OK %s %s' %(arg, arg)) | |
| else: | |
| self.push('+OK') | |
| asynchat.async_chat.push(self, LIST_RESP) | |
| cmd_uidl = cmd_list | |
| def cmd_retr(self, arg): | |
| self.push('+OK %s bytes' %len(RETR_RESP)) | |
| asynchat.async_chat.push(self, RETR_RESP) | |
| cmd_top = cmd_retr | |
| def cmd_dele(self, arg): | |
| self.push('+OK message marked for deletion.') | |
| def cmd_noop(self, arg): | |
| self.push('+OK done nothing.') | |
| def cmd_rpop(self, arg): | |
| self.push('+OK done nothing.') | |
| class DummyPOP3Server(asyncore.dispatcher, threading.Thread): | |
| handler = DummyPOP3Handler | |
| def __init__(self, address, af=socket.AF_INET): | |
| threading.Thread.__init__(self) | |
| asyncore.dispatcher.__init__(self) | |
| self.create_socket(af, socket.SOCK_STREAM) | |
| self.bind(address) | |
| self.listen(5) | |
| self.active = False | |
| self.active_lock = threading.Lock() | |
| self.host, self.port = self.socket.getsockname()[:2] | |
| def start(self): | |
| assert not self.active | |
| self.__flag = threading.Event() | |
| threading.Thread.start(self) | |
| self.__flag.wait() | |
| def run(self): | |
| self.active = True | |
| self.__flag.set() | |
| while self.active and asyncore.socket_map: | |
| self.active_lock.acquire() | |
| asyncore.loop(timeout=0.1, count=1) | |
| self.active_lock.release() | |
| asyncore.close_all(ignore_all=True) | |
| def stop(self): | |
| assert self.active | |
| self.active = False | |
| self.join() | |
| def handle_accept(self): | |
| conn, addr = self.accept() | |
| self.handler = self.handler(conn) | |
| self.close() | |
| def handle_connect(self): | |
| self.close() | |
| handle_read = handle_connect | |
| def writable(self): | |
| return 0 | |
| def handle_error(self): | |
| raise | |
| class TestPOP3Class(TestCase): | |
| def assertOK(self, resp): | |
| self.assertTrue(resp.startswith("+OK")) | |
| def setUp(self): | |
| self.server = DummyPOP3Server((HOST, 0)) | |
| self.server.start() | |
| self.client = poplib.POP3(self.server.host, self.server.port) | |
| def tearDown(self): | |
| self.client.quit() | |
| self.server.stop() | |
| def test_getwelcome(self): | |
| self.assertEqual(self.client.getwelcome(), '+OK dummy pop3 server ready.') | |
| def test_exceptions(self): | |
| self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') | |
| def test_user(self): | |
| self.assertOK(self.client.user('guido')) | |
| self.assertRaises(poplib.error_proto, self.client.user, 'invalid') | |
| def test_pass_(self): | |
| self.assertOK(self.client.pass_('python')) | |
| self.assertRaises(poplib.error_proto, self.client.user, 'invalid') | |
| def test_stat(self): | |
| self.assertEqual(self.client.stat(), (10, 100)) | |
| def test_list(self): | |
| self.assertEqual(self.client.list()[1:], | |
| (['1 1', '2 2', '3 3', '4 4', '5 5'], 25)) | |
| self.assertTrue(self.client.list('1').endswith("OK 1 1")) | |
| def test_retr(self): | |
| expected = ('+OK 116 bytes', | |
| ['From: postmaster@python.org', 'Content-Type: text/plain', | |
| 'MIME-Version: 1.0', 'Subject: Dummy', | |
| '', 'line1', 'line2', 'line3'], | |
| 113) | |
| self.assertEqual(self.client.retr('foo'), expected) | |
| def test_dele(self): | |
| self.assertOK(self.client.dele('foo')) | |
| def test_noop(self): | |
| self.assertOK(self.client.noop()) | |
| def test_rpop(self): | |
| self.assertOK(self.client.rpop('foo')) | |
| def test_top(self): | |
| expected = ('+OK 116 bytes', | |
| ['From: postmaster@python.org', 'Content-Type: text/plain', | |
| 'MIME-Version: 1.0', 'Subject: Dummy', '', | |
| 'line1', 'line2', 'line3'], | |
| 113) | |
| self.assertEqual(self.client.top(1, 1), expected) | |
| def test_uidl(self): | |
| self.client.uidl() | |
| self.client.uidl('foo') | |
| SUPPORTS_SSL = False | |
| if hasattr(poplib, 'POP3_SSL'): | |
| import ssl | |
| SUPPORTS_SSL = True | |
| CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem") | |
| class DummyPOP3_SSLHandler(DummyPOP3Handler): | |
| def __init__(self, conn): | |
| asynchat.async_chat.__init__(self, conn) | |
| self.socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, | |
| server_side=True, | |
| do_handshake_on_connect=False) | |
| # Must try handshake before calling push() | |
| self._ssl_accepting = True | |
| self._do_ssl_handshake() | |
| self.set_terminator("\r\n") | |
| self.in_buffer = [] | |
| self.push('+OK dummy pop3 server ready.') | |
| def _do_ssl_handshake(self): | |
| try: | |
| self.socket.do_handshake() | |
| except ssl.SSLError, err: | |
| if err.args[0] in (ssl.SSL_ERROR_WANT_READ, | |
| ssl.SSL_ERROR_WANT_WRITE): | |
| return | |
| elif err.args[0] == ssl.SSL_ERROR_EOF: | |
| return self.handle_close() | |
| raise | |
| except socket.error, err: | |
| if err.args[0] == errno.ECONNABORTED: | |
| return self.handle_close() | |
| else: | |
| self._ssl_accepting = False | |
| def handle_read(self): | |
| if self._ssl_accepting: | |
| self._do_ssl_handshake() | |
| else: | |
| DummyPOP3Handler.handle_read(self) | |
| class TestPOP3_SSLClass(TestPOP3Class): | |
| # repeat previous tests by using poplib.POP3_SSL | |
| def setUp(self): | |
| self.server = DummyPOP3Server((HOST, 0)) | |
| self.server.handler = DummyPOP3_SSLHandler | |
| self.server.start() | |
| self.client = poplib.POP3_SSL(self.server.host, self.server.port) | |
| def test__all__(self): | |
| self.assertIn('POP3_SSL', poplib.__all__) | |
| class TestTimeouts(TestCase): | |
| def setUp(self): | |
| self.evt = threading.Event() | |
| self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.sock.settimeout(3) | |
| self.port = test_support.bind_port(self.sock) | |
| threading.Thread(target=self.server, args=(self.evt,self.sock)).start() | |
| time.sleep(.1) | |
| def tearDown(self): | |
| self.evt.wait() | |
| def server(self, evt, serv): | |
| serv.listen(5) | |
| try: | |
| conn, addr = serv.accept() | |
| except socket.timeout: | |
| pass | |
| else: | |
| conn.send("+ Hola mundo\n") | |
| conn.close() | |
| finally: | |
| serv.close() | |
| evt.set() | |
| def testTimeoutDefault(self): | |
| self.assertTrue(socket.getdefaulttimeout() is None) | |
| socket.setdefaulttimeout(30) | |
| try: | |
| pop = poplib.POP3("localhost", self.port) | |
| finally: | |
| socket.setdefaulttimeout(None) | |
| self.assertEqual(pop.sock.gettimeout(), 30) | |
| pop.sock.close() | |
| def testTimeoutNone(self): | |
| self.assertTrue(socket.getdefaulttimeout() is None) | |
| socket.setdefaulttimeout(30) | |
| try: | |
| pop = poplib.POP3(HOST, self.port, timeout=None) | |
| finally: | |
| socket.setdefaulttimeout(None) | |
| self.assertTrue(pop.sock.gettimeout() is None) | |
| pop.sock.close() | |
| def testTimeoutValue(self): | |
| pop = poplib.POP3("localhost", self.port, timeout=30) | |
| self.assertEqual(pop.sock.gettimeout(), 30) | |
| pop.sock.close() | |
| def test_main(): | |
| tests = [TestPOP3Class, TestTimeouts] | |
| if SUPPORTS_SSL: | |
| tests.append(TestPOP3_SSLClass) | |
| thread_info = test_support.threading_setup() | |
| try: | |
| test_support.run_unittest(*tests) | |
| finally: | |
| test_support.threading_cleanup(*thread_info) | |
| if __name__ == '__main__': | |
| test_main() |