""" Python 'utf-8-sig' Codec | |
This works similarly to UTF-8, with the following changes:
* On encoding/writing, a UTF-8 encoded BOM will be prepended/written as the
  first three bytes.
* On decoding/reading, if the first three bytes are a UTF-8 encoded BOM, these
  bytes will be skipped.
""" | |
import codecs | |
### Codec APIs
def encode(input, errors='strict'):
    """Encode *input* as UTF-8 with a leading UTF-8 BOM.

    Returns a ``(data, length)`` tuple: ``data`` is ``codecs.BOM_UTF8``
    followed by the UTF-8 encoding of *input*, and ``length`` is
    ``len(input)`` (the BOM is not counted).  *errors* is forwarded to
    ``codecs.utf_8_encode``.
    """
    payload = codecs.utf_8_encode(input, errors)[0]
    return (codecs.BOM_UTF8 + payload, len(input))
def decode(input, errors='strict'):
    """Decode UTF-8 *input*, skipping a leading UTF-8 BOM if present.

    Returns an ``(output, consumed)`` tuple.  When the first three bytes
    equal ``codecs.BOM_UTF8`` they are dropped before decoding and are
    included in ``consumed``.  The underlying decoder is always called
    with ``final=True``; *errors* is forwarded to it unchanged.
    """
    has_bom = input[:3] == codecs.BOM_UTF8
    skipped = 3 if has_bom else 0
    payload = input[3:] if has_bom else input
    text, used = codecs.utf_8_decode(payload, errors, True)
    return (text, used + skipped)
class IncrementalEncoder(codecs.IncrementalEncoder):
    """Incremental UTF-8 encoder that emits a BOM before the first output.

    ``self.first`` is 1 while the BOM has not been written yet and 0
    afterwards; it is also the value exposed through ``getstate()`` and
    accepted by ``setstate()``.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalEncoder.__init__(self, errors)
        self.first = 1

    def encode(self, input, final=False):
        """Return the UTF-8 bytes for *input*, BOM-prefixed on the first call."""
        emit_bom = bool(self.first)
        if emit_bom:
            # The flag is cleared before encoding, so it stays cleared even
            # if the encoding step below raises.
            self.first = 0
        payload = codecs.utf_8_encode(input, self.errors)[0]
        if emit_bom:
            return codecs.BOM_UTF8 + payload
        return payload

    def reset(self):
        """Reset the encoder so that the next ``encode()`` emits a BOM again."""
        codecs.IncrementalEncoder.reset(self)
        self.first = 1

    def getstate(self):
        """Return the current "BOM still pending" flag."""
        return self.first

    def setstate(self, state):
        """Restore the "BOM still pending" flag from *state*."""
        self.first = state
class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
    """Incremental UTF-8 decoder that skips a leading UTF-8 BOM.

    ``self.first`` is True until the decoder has decided whether the
    stream starts with a BOM; it is set to None once that decision has
    been made.
    """

    def __init__(self, errors='strict'):
        codecs.BufferedIncrementalDecoder.__init__(self, errors)
        self.first = True

    def _buffer_decode(self, input, errors, final):
        """Decode *input* and return an ``(output, consumed)`` tuple.

        While the BOM decision is still open and *input* is shorter than
        three bytes but could be the start of a BOM, nothing is consumed
        so the caller can retry with more data.  If *final* is true and
        such a partial BOM is all that is left, no more data can arrive:
        the bytes are handed to the UTF-8 decoder so the truncation is
        reported through *errors* instead of being silently discarded.
        """
        if self.first:
            if len(input) < 3:
                if codecs.BOM_UTF8.startswith(input) and not (final and input):
                    # not enough data to decide if this really is a BOM
                    # => try again on the next call
                    return (u"", 0)
                # Either the bytes cannot start a BOM, or this is the last
                # chunk and a truncated BOM is simply malformed UTF-8.
                self.first = None
            else:
                self.first = None
                if input[:3] == codecs.BOM_UTF8:
                    (output, consumed) = codecs.utf_8_decode(input[3:], errors, final)
                    # Count the skipped BOM bytes as consumed.
                    return (output, consumed + 3)
        return codecs.utf_8_decode(input, errors, final)

    def reset(self):
        """Reset the decoder so that a BOM is looked for again."""
        codecs.BufferedIncrementalDecoder.reset(self)
        self.first = True
class StreamWriter(codecs.StreamWriter):
    """Stream writer that prefixes the first encoded chunk with a BOM.

    The first call to ``encode()`` stores ``codecs.utf_8_encode`` as an
    instance attribute named ``encode``, so later calls on the same
    instance use the plain UTF-8 encoder.  ``reset()`` removes that
    instance attribute so the BOM-writing method is used again.
    """

    def reset(self):
        """Reset the writer and re-arm BOM output for the next write."""
        codecs.StreamWriter.reset(self)
        # Drop the per-instance override if one was installed.
        self.__dict__.pop("encode", None)

    def encode(self, input, errors='strict'):
        """Encode *input* with a leading BOM and switch to plain UTF-8."""
        self.encode = codecs.utf_8_encode
        result = encode(input, errors)
        return result
class StreamReader(codecs.StreamReader): | |
def reset(self): | |
codecs.StreamReader.reset(self) | |
try: | |
del self.decode | |
except AttributeError: | |
pass | |
def decode(self, input, errors='strict'): | |
if len(input) < 3: | |
if codecs.BOM_UTF8.startswith(input): | |
# not enough data to decide if this is a BOM | |
# => try again on the next call | |
return (u"", 0) | |
elif input[:3] == codecs.BOM_UTF8: | |
self.decode = codecs.utf_8_decode | |
(output, consumed) = codecs.utf_8_decode(input[3:],errors) | |
return (output, consumed+3) | |
# (else) no BOM present | |
self.decode = codecs.utf_8_decode | |
return codecs.utf_8_decode(input, errors) | |
### encodings module API
def getregentry():
    """Return the ``codecs.CodecInfo`` describing the 'utf-8-sig' codec.

    The entry wires up this module's stateless ``encode``/``decode``
    functions together with its incremental and stream classes.
    """
    entry = codecs.CodecInfo(
        encode,
        decode,
        streamreader=StreamReader,
        streamwriter=StreamWriter,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        name='utf-8-sig',
    )
    return entry