| """Generic socket server classes. | |
| This module tries to capture the various aspects of defining a server: | |
| For socket-based servers: | |
| - address family: | |
| - AF_INET{,6}: IP (Internet Protocol) sockets (default) | |
| - AF_UNIX: Unix domain sockets | |
| - others, e.g. AF_DECNET, are conceivable (see <socket.h>) | |
| - socket type: | |
| - SOCK_STREAM (reliable stream, e.g. TCP) | |
| - SOCK_DGRAM (datagrams, e.g. UDP) | |
| For request-based servers (including socket-based): | |
| - client address verification before further looking at the request | |
| (This is actually a hook for any processing that needs to look | |
| at the request before anything else, e.g. logging) | |
| - how to handle multiple requests: | |
| - synchronous (one request is handled at a time) | |
| - forking (each request is handled by a new process) | |
| - threading (each request is handled by a new thread) | |
| The classes in this module favor the server type that is simplest to | |
| write: a synchronous TCP/IP server. This is bad class design, but | |
| saves some typing. (There's also the issue that a deep class hierarchy | |
| slows down method lookups.) | |
| There are five classes in an inheritance diagram, four of which represent | |
| synchronous servers of four types: | |
|         +------------+ | |
|         | BaseServer | | |
|         +------------+ | |
|               | | |
|               v | |
|         +-----------+        +------------------+ | |
|         | TCPServer |------->| UnixStreamServer | | |
|         +-----------+        +------------------+ | |
|               | | |
|               v | |
|         +-----------+        +--------------------+ | |
|         | UDPServer |------->| UnixDatagramServer | | |
|         +-----------+        +--------------------+ | |
| Note that UnixDatagramServer derives from UDPServer, not from | |
| UnixStreamServer -- the only difference between an IP and a Unix | |
| stream server is the address family, which is simply repeated in both | |
| Unix server classes. | |
| Forking and threading versions of each type of server can be created | |
| using the ForkingMixIn and ThreadingMixIn mix-in classes. For | |
| instance, a threading UDP server class is created as follows: | |
| class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass | |
| The mix-in class must come first, since it overrides a method defined | |
| in UDPServer! Setting the various member variables also changes | |
| the behavior of the underlying server mechanism. | |
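A minimal sketch of why the base-class order matters. The helper `defining_class` and the class `WrongOrderUDPServer` are hypothetical names introduced only for this illustration; the import fallback is an assumption so the snippet runs under either module name.

```python
import inspect

try:
    import socketserver                    # Python 3 name of this module
except ImportError:
    import SocketServer as socketserver    # Python 2 name of this module


class ThreadingUDPServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
    pass


class WrongOrderUDPServer(socketserver.UDPServer, socketserver.ThreadingMixIn):
    # Hypothetical counter-example: the mix-in is listed last.
    pass


def defining_class(cls, name):
    # Return the first class in the method resolution order whose own
    # namespace defines `name`; that is the definition callers will get.
    for klass in inspect.getmro(cls):
        if name in vars(klass):
            return klass
    return None


# With the mix-in first, process_request() resolves to the mix-in's version.
print(defining_class(ThreadingUDPServer, "process_request").__name__)
# With the mix-in last, the synchronous version in BaseServer wins.
print(defining_class(WrongOrderUDPServer, "process_request").__name__)
```

With the mix-in listed last, the class still builds without error, but requests would be handled synchronously, which is why the mistake is easy to miss.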
| To implement a service, you must derive a class from | |
| BaseRequestHandler and redefine its handle() method. You can then run | |
| various versions of the service by combining one of the server classes | |
| with your request handler class. | |
| The request handler class must be different for datagram or stream | |
| services. This can be hidden by using the request handler | |
| subclasses StreamRequestHandler or DatagramRequestHandler. | |
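The recipe above can be sketched as follows. `UpperCaseHandler` and `serve_one_request` are hypothetical names, and the loopback address with port 0 is an assumption chosen so the example does not depend on a fixed free port. The sketch serves exactly one request through handle_request() rather than serve_forever().

```python
import socket
import threading

try:
    import socketserver                    # Python 3 name of this module
except ImportError:
    import SocketServer as socketserver    # Python 2 name of this module


class UpperCaseHandler(socketserver.StreamRequestHandler):
    # Hypothetical service: reply with the request line in upper case.
    def handle(self):
        line = self.rfile.readline()
        self.wfile.write(line.upper())


def serve_one_request(message):
    # `message` must be bytes ending in a newline, because the handler
    # reads one line. Port 0 asks the OS for any free port; the actual
    # address is read back from server.server_address after binding.
    server = socketserver.TCPServer(("127.0.0.1", 0), UpperCaseHandler)
    try:
        worker = threading.Thread(target=server.handle_request)
        worker.daemon = True
        worker.start()
        client = socket.create_connection(server.server_address)
        try:
            client.sendall(message)
            reply = b""
            while not reply.endswith(b"\n"):
                chunk = client.recv(1024)
                if not chunk:
                    break
                reply += chunk
        finally:
            client.close()
        worker.join()
        return reply
    finally:
        server.server_close()
```

Swapping `TCPServer` for `ThreadingTCPServer` or `ForkingTCPServer` should leave the handler unchanged, which is the point of keeping the service logic in the request handler class.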
| Of course, you still have to use your head! | |
| For instance, it makes no sense to use a forking server if the service | |
| contains state in memory that can be modified by requests (since the | |
| modifications in the child process would never reach the initial state | |
| kept in the parent process and passed to each child). In this case, | |
| you can use a threading server, but you will probably have to use | |
| locks to prevent two requests that arrive nearly simultaneously from | |
| applying conflicting changes to the server state. | |
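A minimal sketch of the locking pattern, assuming the shared state is a simple hit counter. `CountingServer`, `CountingHandler`, `hits` and `hits_lock` are hypothetical names; they are not part of this module.

```python
import threading

try:
    import socketserver                    # Python 3 name of this module
except ImportError:
    import SocketServer as socketserver    # Python 2 name of this module


class CountingServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    # Hypothetical threading server that keeps a hit counter in memory.
    def __init__(self, server_address, RequestHandlerClass):
        socketserver.TCPServer.__init__(self, server_address,
                                        RequestHandlerClass)
        self.hits = 0
        self.hits_lock = threading.Lock()


class CountingHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Each request runs in its own thread, so the read-modify-write
        # on the shared counter is done while holding the server's lock.
        with self.server.hits_lock:
            self.server.hits += 1
```

The state lives on the server object because each request gets a fresh handler instance; anything stored on the handler itself is gone once the request finishes.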
| On the other hand, if you are building e.g. an HTTP server, where all | |
| data is stored externally (e.g. in the file system), a synchronous | |
| class will essentially render the service "deaf" while one request is | |
| being handled -- which may be for a very long time if a client is slow | |
| to read all the data it has requested. Here a threading or forking | |
| server is appropriate. | |
| In some cases, it may be appropriate to process part of a request | |
| synchronously, but to finish processing in a forked child depending on | |
| the request data. This can be implemented by using a synchronous | |
| server and doing an explicit fork in the request handler class | |
| handle() method. | |
| Another approach to handling multiple simultaneous requests in an | |
| environment that supports neither threads nor fork (or where these are | |
| too expensive or inappropriate for the service) is to maintain an | |
| explicit table of partially finished requests and to use select() to | |
| decide which request to work on next (or whether to handle a new | |
| incoming request). This is particularly important for stream services | |
| where each client can potentially be connected for a long time (if | |
| threads or subprocesses cannot be used). | |
| Future work: | |
| - Standard classes for Sun RPC (which uses either UDP or TCP) | |
| - Standard mix-in classes to implement various authentication | |
| and encryption schemes | |
| - Standard framework for select-based multiplexing | |
| XXX Open problems: | |
| - What to do with out-of-band data? | |
| BaseServer: | |
| - split generic "request" functionality out into BaseServer class. | |
| Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org> | |
| example: read entries from a SQL database (requires overriding | |
| get_request() to return a table entry from the database). | |
| Each entry is processed by a RequestHandlerClass. | |
| """ | |
| # Author of the BaseServer patch: Luke Kenneth Casson Leighton | |
| # XXX Warning! | |
| # There is a test suite for this module, but it cannot be run by the | |
| # standard regression test. | |
| # To run it manually, run Lib/test/test_socketserver.py. | |
| __version__ = "0.4" | |
| import socket | |
| import select | |
| import sys | |
| import os | |
| try: | |
| import threading | |
| except ImportError: | |
| import dummy_threading as threading | |
| __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer", | |
| "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler", | |
| "StreamRequestHandler","DatagramRequestHandler", | |
| "ThreadingMixIn", "ForkingMixIn"] | |
| if hasattr(socket, "AF_UNIX"): | |
| __all__.extend(["UnixStreamServer","UnixDatagramServer", | |
| "ThreadingUnixStreamServer", | |
| "ThreadingUnixDatagramServer"]) | |
| class BaseServer: | |
| """Base class for server classes. | |
| Methods for the caller: | |
| - __init__(server_address, RequestHandlerClass) | |
| - serve_forever(poll_interval=0.5) | |
| - shutdown() | |
| - handle_request() # if you do not use serve_forever() | |
| - fileno() -> int # for select() | |
| Methods that may be overridden: | |
| - server_bind() | |
| - server_activate() | |
| - get_request() -> request, client_address | |
| - handle_timeout() | |
| - verify_request(request, client_address) | |
| - server_close() | |
| - process_request(request, client_address) | |
| - shutdown_request(request) | |
| - close_request(request) | |
| - handle_error(request, client_address) | |
| Methods for derived classes: | |
| - finish_request(request, client_address) | |
| Class variables that may be overridden by derived classes or | |
| instances: | |
| - timeout | |
| - address_family | |
| - socket_type | |
| - allow_reuse_address | |
| Instance variables: | |
| - RequestHandlerClass | |
| - socket | |
| """ | |
| timeout = None | |
| def __init__(self, server_address, RequestHandlerClass): | |
| """Constructor. May be extended, do not override.""" | |
| self.server_address = server_address | |
| self.RequestHandlerClass = RequestHandlerClass | |
| self.__is_shut_down = threading.Event() | |
| self.__shutdown_request = False | |
| def server_activate(self): | |
| """Called by constructor to activate the server. | |
| May be overridden. | |
| """ | |
| pass | |
| def serve_forever(self, poll_interval=0.5): | |
| """Handle one request at a time until shutdown. | |
| Polls for shutdown every poll_interval seconds. Ignores | |
| self.timeout. If you need to do periodic tasks, do them in | |
| another thread. | |
| """ | |
| self.__is_shut_down.clear() | |
| try: | |
| while not self.__shutdown_request: | |
| # XXX: Consider using another file descriptor or | |
| # connecting to the socket to wake this up instead of | |
| # polling. Polling reduces our responsiveness to a | |
| # shutdown request and wastes cpu at all other times. | |
| r, w, e = select.select([self], [], [], poll_interval) | |
| if self in r: | |
| self._handle_request_noblock() | |
| finally: | |
| self.__shutdown_request = False | |
| self.__is_shut_down.set() | |
| def shutdown(self): | |
| """Stops the serve_forever loop. | |
| Blocks until the loop has finished. This must be called while | |
| serve_forever() is running in another thread, or it will | |
| deadlock. | |
| """ | |
| self.__shutdown_request = True | |
| self.__is_shut_down.wait() | |
| # The distinction between handling, getting, processing and | |
| # finishing a request is fairly arbitrary. Remember: | |
| # | |
| # - handle_request() is the top-level call. It calls | |
| # select, get_request(), verify_request() and process_request() | |
| # - get_request() is different for stream or datagram sockets | |
| # - process_request() is the place that may fork a new process | |
| # or create a new thread to finish the request | |
| # - finish_request() instantiates the request handler class; | |
| # this constructor will handle the request all by itself | |
| def handle_request(self): | |
| """Handle one request, possibly blocking. | |
| Respects self.timeout. | |
| """ | |
| # Support people who used socket.settimeout() to escape | |
| # handle_request before self.timeout was available. | |
| timeout = self.socket.gettimeout() | |
| if timeout is None: | |
| timeout = self.timeout | |
| elif self.timeout is not None: | |
| timeout = min(timeout, self.timeout) | |
| fd_sets = select.select([self], [], [], timeout) | |
| if not fd_sets[0]: | |
| self.handle_timeout() | |
| return | |
| self._handle_request_noblock() | |
| def _handle_request_noblock(self): | |
| """Handle one request, without blocking. | |
| I assume that select.select has returned that the socket is | |
| readable before this function was called, so there should be | |
| no risk of blocking in get_request(). | |
| """ | |
| try: | |
| request, client_address = self.get_request() | |
| except socket.error: | |
| return | |
| if self.verify_request(request, client_address): | |
| try: | |
| self.process_request(request, client_address) | |
| except: | |
| self.handle_error(request, client_address) | |
| self.shutdown_request(request) | |
| def handle_timeout(self): | |
| """Called if no new request arrives within self.timeout. | |
| Overridden by ForkingMixIn. | |
| """ | |
| pass | |
| def verify_request(self, request, client_address): | |
| """Verify the request. May be overridden. | |
| Return True if we should proceed with this request. | |
| """ | |
| return True | |
| def process_request(self, request, client_address): | |
| """Call finish_request. | |
| Overridden by ForkingMixIn and ThreadingMixIn. | |
| """ | |
| self.finish_request(request, client_address) | |
| self.shutdown_request(request) | |
| def server_close(self): | |
| """Called to clean up the server. | |
| May be overridden. | |
| """ | |
| pass | |
| def finish_request(self, request, client_address): | |
| """Finish one request by instantiating RequestHandlerClass.""" | |
| self.RequestHandlerClass(request, client_address, self) | |
| def shutdown_request(self, request): | |
| """Called to shut down and close an individual request.""" | |
| self.close_request(request) | |
| def close_request(self, request): | |
| """Called to clean up an individual request.""" | |
| pass | |
| def handle_error(self, request, client_address): | |
| """Handle an error gracefully. May be overridden. | |
| The default is to print a traceback and continue. | |
| """ | |
| print '-'*40 | |
| print 'Exception happened during processing of request from', | |
| print client_address | |
| import traceback | |
| traceback.print_exc() # XXX But this goes to stderr! | |
| print '-'*40 | |
| class TCPServer(BaseServer): | |
| """Base class for various socket-based server classes. | |
| Defaults to synchronous IP stream (i.e., TCP). | |
| Methods for the caller: | |
| - __init__(server_address, RequestHandlerClass, bind_and_activate=True) | |
| - serve_forever(poll_interval=0.5) | |
| - shutdown() | |
| - handle_request() # if you don't use serve_forever() | |
| - fileno() -> int # for select() | |
| Methods that may be overridden: | |
| - server_bind() | |
| - server_activate() | |
| - get_request() -> request, client_address | |
| - handle_timeout() | |
| - verify_request(request, client_address) | |
| - process_request(request, client_address) | |
| - shutdown_request(request) | |
| - close_request(request) | |
| - handle_error(request, client_address) | |
| Methods for derived classes: | |
| - finish_request(request, client_address) | |
| Class variables that may be overridden by derived classes or | |
| instances: | |
| - timeout | |
| - address_family | |
| - socket_type | |
| - request_queue_size (only for stream sockets) | |
| - allow_reuse_address | |
| Instance variables: | |
| - server_address | |
| - RequestHandlerClass | |
| - socket | |
| """ | |
| address_family = socket.AF_INET | |
| socket_type = socket.SOCK_STREAM | |
| request_queue_size = 5 | |
| allow_reuse_address = False | |
| def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): | |
| """Constructor. May be extended, do not override.""" | |
| BaseServer.__init__(self, server_address, RequestHandlerClass) | |
| self.socket = socket.socket(self.address_family, | |
| self.socket_type) | |
| if bind_and_activate: | |
| self.server_bind() | |
| self.server_activate() | |
| def server_bind(self): | |
| """Called by constructor to bind the socket. | |
| May be overridden. | |
| """ | |
| if self.allow_reuse_address: | |
| self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| self.socket.bind(self.server_address) | |
| self.server_address = self.socket.getsockname() | |
| def server_activate(self): | |
| """Called by constructor to activate the server. | |
| May be overridden. | |
| """ | |
| self.socket.listen(self.request_queue_size) | |
| def server_close(self): | |
| """Called to clean up the server. | |
| May be overridden. | |
| """ | |
| self.socket.close() | |
| def fileno(self): | |
| """Return socket file number. | |
| Interface required by select(). | |
| """ | |
| return self.socket.fileno() | |
| def get_request(self): | |
| """Get the request and client address from the socket. | |
| May be overridden. | |
| """ | |
| return self.socket.accept() | |
| def shutdown_request(self, request): | |
| """Called to shut down and close an individual request.""" | |
| try: | |
| # Explicitly shut down. socket.close() merely releases | |
| # the socket and waits for GC to perform the actual close. | |
| request.shutdown(socket.SHUT_WR) | |
| except socket.error: | |
| pass  # Some platforms may raise ENOTCONN here. | |
| self.close_request(request) | |
| def close_request(self, request): | |
| """Called to clean up an individual request.""" | |
| request.close() | |
| class UDPServer(TCPServer): | |
| """UDP server class.""" | |
| allow_reuse_address = False | |
| socket_type = socket.SOCK_DGRAM | |
| max_packet_size = 8192 | |
| def get_request(self): | |
| data, client_addr = self.socket.recvfrom(self.max_packet_size) | |
| return (data, self.socket), client_addr | |
| def server_activate(self): | |
| # No need to call listen() for UDP. | |
| pass | |
| def shutdown_request(self, request): | |
| # No need to shutdown anything. | |
| self.close_request(request) | |
| def close_request(self, request): | |
| # No need to close anything. | |
| pass | |
| class ForkingMixIn: | |
| """Mix-in class to handle each request in a new process.""" | |
| timeout = 300 | |
| active_children = None | |
| max_children = 40 | |
| def collect_children(self): | |
| """Internal routine to wait for children that have exited.""" | |
| if self.active_children is None: return | |
| while len(self.active_children) >= self.max_children: | |
| # XXX: This will wait for any child process, not just ones | |
| # spawned by this library. This could confuse other | |
| # libraries that expect to be able to wait for their own | |
| # children. | |
| try: | |
| pid, status = os.waitpid(0, 0) | |
| except os.error: | |
| pid = None | |
| if pid not in self.active_children: continue | |
| self.active_children.remove(pid) | |
| # XXX: This loop runs more system calls than it ought | |
| # to. There should be a way to put the active_children into a | |
| # process group and then use os.waitpid(-pgid) to wait for any | |
| # of that set, but I couldn't find a way to allocate pgids | |
| # that couldn't collide. | |
| for child in self.active_children: | |
| try: | |
| pid, status = os.waitpid(child, os.WNOHANG) | |
| except os.error: | |
| pid = None | |
| if not pid: continue | |
| try: | |
| self.active_children.remove(pid) | |
| except ValueError, e: | |
| raise ValueError('%s. x=%d and list=%r' % (e.message, pid, | |
| self.active_children)) | |
| def handle_timeout(self): | |
| """Wait for zombies after self.timeout seconds of inactivity. | |
| May be extended, do not override. | |
| """ | |
| self.collect_children() | |
| def process_request(self, request, client_address): | |
| """Fork a new subprocess to process the request.""" | |
| self.collect_children() | |
| pid = os.fork() | |
| if pid: | |
| # Parent process | |
| if self.active_children is None: | |
| self.active_children = [] | |
| self.active_children.append(pid) | |
| self.close_request(request) #close handle in parent process | |
| return | |
| else: | |
| # Child process. | |
| # This must never return, hence os._exit()! | |
| try: | |
| self.finish_request(request, client_address) | |
| self.shutdown_request(request) | |
| os._exit(0) | |
| except: | |
| try: | |
| self.handle_error(request, client_address) | |
| self.shutdown_request(request) | |
| finally: | |
| os._exit(1) | |
| class ThreadingMixIn: | |
| """Mix-in class to handle each request in a new thread.""" | |
| # Decides how threads will act upon termination of the | |
| # main process | |
| daemon_threads = False | |
| def process_request_thread(self, request, client_address): | |
| """Same as in BaseServer but as a thread. | |
| In addition, exception handling is done here. | |
| """ | |
| try: | |
| self.finish_request(request, client_address) | |
| self.shutdown_request(request) | |
| except: | |
| self.handle_error(request, client_address) | |
| self.shutdown_request(request) | |
| def process_request(self, request, client_address): | |
| """Start a new thread to process the request.""" | |
| t = threading.Thread(target = self.process_request_thread, | |
| args = (request, client_address)) | |
| if self.daemon_threads: | |
| t.setDaemon(True) | |
| t.start() | |
| class ForkingUDPServer(ForkingMixIn, UDPServer): pass | |
| class ForkingTCPServer(ForkingMixIn, TCPServer): pass | |
| class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass | |
| class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass | |
| if hasattr(socket, 'AF_UNIX'): | |
| class UnixStreamServer(TCPServer): | |
| address_family = socket.AF_UNIX | |
| class UnixDatagramServer(UDPServer): | |
| address_family = socket.AF_UNIX | |
| class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass | |
| class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass | |
| class BaseRequestHandler: | |
| """Base class for request handler classes. | |
| This class is instantiated for each request to be handled. The | |
| constructor sets the instance variables request, client_address | |
| and server, and then calls the handle() method. To implement a | |
| specific service, all you need to do is to derive a class which | |
| defines a handle() method. | |
| The handle() method can find the request as self.request, the | |
| client address as self.client_address, and the server (in case it | |
| needs access to per-server information) as self.server. Since a | |
| separate instance is created for each request, the handle() method | |
| can define arbitrary other instance variables. | |
| """ | |
| def __init__(self, request, client_address, server): | |
| self.request = request | |
| self.client_address = client_address | |
| self.server = server | |
| self.setup() | |
| try: | |
| self.handle() | |
| finally: | |
| self.finish() | |
| def setup(self): | |
| pass | |
| def handle(self): | |
| pass | |
| def finish(self): | |
| pass | |
| # The following two classes make it possible to use the same service | |
| # class for stream or datagram servers. | |
| # Each class sets up these instance variables: | |
| # - rfile: a file object from which the request is read | |
| # - wfile: a file object to which the reply is written | |
| # When the handle() method returns, wfile is flushed properly | |
| class StreamRequestHandler(BaseRequestHandler): | |
| """Define self.rfile and self.wfile for stream sockets.""" | |
| # Default buffer sizes for rfile, wfile. | |
| # We default rfile to buffered because otherwise it could be | |
| # really slow for large data (a getc() call per byte); we make | |
| # wfile unbuffered because (a) often after a write() we want to | |
| # read and we need to flush the line; (b) big writes to unbuffered | |
| # files are typically optimized by stdio even when big reads | |
| # aren't. | |
| rbufsize = -1 | |
| wbufsize = 0 | |
| # A timeout to apply to the request socket, if not None. | |
| timeout = None | |
| # Disable nagle algorithm for this socket, if True. | |
| # Use only when wbufsize != 0, to avoid small packets. | |
| disable_nagle_algorithm = False | |
| def setup(self): | |
| self.connection = self.request | |
| if self.timeout is not None: | |
| self.connection.settimeout(self.timeout) | |
| if self.disable_nagle_algorithm: | |
| self.connection.setsockopt(socket.IPPROTO_TCP, | |
| socket.TCP_NODELAY, True) | |
| self.rfile = self.connection.makefile('rb', self.rbufsize) | |
| self.wfile = self.connection.makefile('wb', self.wbufsize) | |
| def finish(self): | |
| if not self.wfile.closed: | |
| self.wfile.flush() | |
| self.wfile.close() | |
| self.rfile.close() | |
| class DatagramRequestHandler(BaseRequestHandler): | |
| # XXX Regrettably, I cannot get this working on Linux; | |
| # s.recvfrom() doesn't return a meaningful client address. | |
| """Define self.rfile and self.wfile for datagram sockets.""" | |
| def setup(self): | |
| try: | |
| from cStringIO import StringIO | |
| except ImportError: | |
| from StringIO import StringIO | |
| self.packet, self.socket = self.request | |
| self.rfile = StringIO(self.packet) | |
| self.wfile = StringIO() | |
| def finish(self): | |
| self.socket.sendto(self.wfile.getvalue(), self.client_address) |