| """Various tools used by MIME-reading or MIME-writing programs.""" | |
| import os | |
| import sys | |
| import tempfile | |
| from warnings import filterwarnings, catch_warnings | |
| with catch_warnings(): | |
| if sys.py3kwarning: | |
| filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning) | |
| import rfc822 | |
| from warnings import warnpy3k | |
| warnpy3k("in 3.x, mimetools has been removed in favor of the email package", | |
| stacklevel=2) | |
| __all__ = ["Message","choose_boundary","encode","decode","copyliteral", | |
| "copybinary"] | |
| class Message(rfc822.Message): | |
| """A derived class of rfc822.Message that knows about MIME headers and | |
| contains some hooks for decoding encoded and multipart messages.""" | |
| def __init__(self, fp, seekable = 1): | |
| rfc822.Message.__init__(self, fp, seekable) | |
| self.encodingheader = \ | |
| self.getheader('content-transfer-encoding') | |
| self.typeheader = \ | |
| self.getheader('content-type') | |
| self.parsetype() | |
| self.parseplist() | |
| def parsetype(self): | |
| str = self.typeheader | |
| if str is None: | |
| str = 'text/plain' | |
| if ';' in str: | |
| i = str.index(';') | |
| self.plisttext = str[i:] | |
| str = str[:i] | |
| else: | |
| self.plisttext = '' | |
| fields = str.split('/') | |
| for i in range(len(fields)): | |
| fields[i] = fields[i].strip().lower() | |
| self.type = '/'.join(fields) | |
| self.maintype = fields[0] | |
| self.subtype = '/'.join(fields[1:]) | |
| def parseplist(self): | |
| str = self.plisttext | |
| self.plist = [] | |
| while str[:1] == ';': | |
| str = str[1:] | |
| if ';' in str: | |
| # XXX Should parse quotes! | |
| end = str.index(';') | |
| else: | |
| end = len(str) | |
| f = str[:end] | |
| if '=' in f: | |
| i = f.index('=') | |
| f = f[:i].strip().lower() + \ | |
| '=' + f[i+1:].strip() | |
| self.plist.append(f.strip()) | |
| str = str[end:] | |
| def getplist(self): | |
| return self.plist | |
| def getparam(self, name): | |
| name = name.lower() + '=' | |
| n = len(name) | |
| for p in self.plist: | |
| if p[:n] == name: | |
| return rfc822.unquote(p[n:]) | |
| return None | |
| def getparamnames(self): | |
| result = [] | |
| for p in self.plist: | |
| i = p.find('=') | |
| if i >= 0: | |
| result.append(p[:i].lower()) | |
| return result | |
| def getencoding(self): | |
| if self.encodingheader is None: | |
| return '7bit' | |
| return self.encodingheader.lower() | |
| def gettype(self): | |
| return self.type | |
| def getmaintype(self): | |
| return self.maintype | |
| def getsubtype(self): | |
| return self.subtype | |
| # Utility functions | |
| # ----------------- | |
| try: | |
| import thread | |
| except ImportError: | |
| import dummy_thread as thread | |
| _counter_lock = thread.allocate_lock() | |
| del thread | |
| _counter = 0 | |
| def _get_next_counter(): | |
| global _counter | |
| _counter_lock.acquire() | |
| _counter += 1 | |
| result = _counter | |
| _counter_lock.release() | |
| return result | |
| _prefix = None | |
| def choose_boundary(): | |
| """Return a string usable as a multipart boundary. | |
| The string chosen is unique within a single program run, and | |
| incorporates the user id (if available), process id (if available), | |
| and current time. So it's very unlikely the returned string appears | |
| in message text, but there's no guarantee. | |
| The boundary contains dots so you have to quote it in the header.""" | |
| global _prefix | |
| import time | |
| if _prefix is None: | |
| import socket | |
| try: | |
| hostid = socket.gethostbyname(socket.gethostname()) | |
| except socket.gaierror: | |
| hostid = '127.0.0.1' | |
| try: | |
| uid = repr(os.getuid()) | |
| except AttributeError: | |
| uid = '1' | |
| try: | |
| pid = repr(os.getpid()) | |
| except AttributeError: | |
| pid = '1' | |
| _prefix = hostid + '.' + uid + '.' + pid | |
| return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter()) | |
| # Subroutines for decoding some common content-transfer-types | |
| def decode(input, output, encoding): | |
| """Decode common content-transfer-encodings (base64, quopri, uuencode).""" | |
| if encoding == 'base64': | |
| import base64 | |
| return base64.decode(input, output) | |
| if encoding == 'quoted-printable': | |
| import quopri | |
| return quopri.decode(input, output) | |
| if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'): | |
| import uu | |
| return uu.decode(input, output) | |
| if encoding in ('7bit', '8bit'): | |
| return output.write(input.read()) | |
| if encoding in decodetab: | |
| pipethrough(input, decodetab[encoding], output) | |
| else: | |
| raise ValueError, \ | |
| 'unknown Content-Transfer-Encoding: %s' % encoding | |
| def encode(input, output, encoding): | |
| """Encode common content-transfer-encodings (base64, quopri, uuencode).""" | |
| if encoding == 'base64': | |
| import base64 | |
| return base64.encode(input, output) | |
| if encoding == 'quoted-printable': | |
| import quopri | |
| return quopri.encode(input, output, 0) | |
| if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'): | |
| import uu | |
| return uu.encode(input, output) | |
| if encoding in ('7bit', '8bit'): | |
| return output.write(input.read()) | |
| if encoding in encodetab: | |
| pipethrough(input, encodetab[encoding], output) | |
| else: | |
| raise ValueError, \ | |
| 'unknown Content-Transfer-Encoding: %s' % encoding | |
| # The following is no longer used for standard encodings | |
| # XXX This requires that uudecode and mmencode are in $PATH | |
| uudecode_pipe = '''( | |
| TEMP=/tmp/@uu.$$ | |
| sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode | |
| cat $TEMP | |
| rm $TEMP | |
| )''' | |
| decodetab = { | |
| 'uuencode': uudecode_pipe, | |
| 'x-uuencode': uudecode_pipe, | |
| 'uue': uudecode_pipe, | |
| 'x-uue': uudecode_pipe, | |
| 'quoted-printable': 'mmencode -u -q', | |
| 'base64': 'mmencode -u -b', | |
| } | |
| encodetab = { | |
| 'x-uuencode': 'uuencode tempfile', | |
| 'uuencode': 'uuencode tempfile', | |
| 'x-uue': 'uuencode tempfile', | |
| 'uue': 'uuencode tempfile', | |
| 'quoted-printable': 'mmencode -q', | |
| 'base64': 'mmencode -b', | |
| } | |
| def pipeto(input, command): | |
| pipe = os.popen(command, 'w') | |
| copyliteral(input, pipe) | |
| pipe.close() | |
| def pipethrough(input, command, output): | |
| (fd, tempname) = tempfile.mkstemp() | |
| temp = os.fdopen(fd, 'w') | |
| copyliteral(input, temp) | |
| temp.close() | |
| pipe = os.popen(command + ' <' + tempname, 'r') | |
| copybinary(pipe, output) | |
| pipe.close() | |
| os.unlink(tempname) | |
| def copyliteral(input, output): | |
| while 1: | |
| line = input.readline() | |
| if not line: break | |
| output.write(line) | |
| def copybinary(input, output): | |
| BUFSIZE = 8192 | |
| while 1: | |
| line = input.read(BUFSIZE) | |
| if not line: break | |
| output.write(line) |