"""Macintosh binhex compression/decompression. | |
easy interface: | |
binhex(inputfilename, outputfilename) | |
hexbin(inputfilename, outputfilename) | |
""" | |
# | |
# Jack Jansen, CWI, August 1995. | |
# | |
# The module is supposed to be as compatible as possible. Especially the | |
# easy interface should work "as expected" on any platform. | |
# XXXX Note: currently, textfiles appear in mac-form on all platforms. | |
# We seem to lack a simple character-translate in python. | |
# (we should probably use ISO-Latin-1 on all but the mac platform). | |
# XXXX The simple routines are too simple: they expect to hold the complete | |
# files in-core. Should be fixed. | |
# XXXX It would be nice to handle AppleDouble format on unix | |
# (for servers serving macs). | |
# XXXX I don't understand what happens when you get 0x90 times the same byte on | |
# input. The resulting code (xx 90 90) would appear to be interpreted as an | |
# escaped *value* of 0x90. All coders I've seen appear to ignore this nicety... | |
# | |
import sys | |
import os | |
import struct | |
import binascii | |
__all__ = ["binhex","hexbin","Error"] | |
class Error(Exception): | |
pass | |
# States (what have we written) | |
[_DID_HEADER, _DID_DATA, _DID_RSRC] = range(3) | |
# Various constants | |
REASONABLY_LARGE=32768 # Minimal amount we pass the rle-coder | |
LINELEN=64 | |
RUNCHAR=chr(0x90) # run-length introducer | |
# | |
# This code is no longer byte-order dependent | |
# | |
# Workarounds for non-mac machines. | |
try: | |
from Carbon.File import FSSpec, FInfo | |
from MacOS import openrf | |
def getfileinfo(name): | |
finfo = FSSpec(name).FSpGetFInfo() | |
dir, file = os.path.split(name) | |
# XXX Get resource/data sizes | |
fp = open(name, 'rb') | |
fp.seek(0, 2) | |
dlen = fp.tell() | |
fp = openrf(name, '*rb') | |
fp.seek(0, 2) | |
rlen = fp.tell() | |
return file, finfo, dlen, rlen | |
def openrsrc(name, *mode): | |
if not mode: | |
mode = '*rb' | |
else: | |
mode = '*' + mode[0] | |
return openrf(name, mode) | |
except ImportError: | |
# | |
# Glue code for non-macintosh usage | |
# | |
class FInfo: | |
def __init__(self): | |
self.Type = '????' | |
self.Creator = '????' | |
self.Flags = 0 | |
def getfileinfo(name): | |
finfo = FInfo() | |
# Quick check for textfile | |
fp = open(name) | |
data = open(name).read(256) | |
for c in data: | |
if not c.isspace() and (c<' ' or ord(c) > 0x7f): | |
break | |
else: | |
finfo.Type = 'TEXT' | |
fp.seek(0, 2) | |
dsize = fp.tell() | |
fp.close() | |
dir, file = os.path.split(name) | |
file = file.replace(':', '-', 1) | |
return file, finfo, dsize, 0 | |
class openrsrc: | |
def __init__(self, *args): | |
pass | |
def read(self, *args): | |
return '' | |
def write(self, *args): | |
pass | |
def close(self): | |
pass | |
class _Hqxcoderengine: | |
"""Write data to the coder in 3-byte chunks""" | |
def __init__(self, ofp): | |
self.ofp = ofp | |
self.data = '' | |
self.hqxdata = '' | |
self.linelen = LINELEN-1 | |
def write(self, data): | |
self.data = self.data + data | |
datalen = len(self.data) | |
todo = (datalen//3)*3 | |
data = self.data[:todo] | |
self.data = self.data[todo:] | |
if not data: | |
return | |
self.hqxdata = self.hqxdata + binascii.b2a_hqx(data) | |
self._flush(0) | |
def _flush(self, force): | |
first = 0 | |
while first <= len(self.hqxdata)-self.linelen: | |
last = first + self.linelen | |
self.ofp.write(self.hqxdata[first:last]+'\n') | |
self.linelen = LINELEN | |
first = last | |
self.hqxdata = self.hqxdata[first:] | |
if force: | |
self.ofp.write(self.hqxdata + ':\n') | |
def close(self): | |
if self.data: | |
self.hqxdata = \ | |
self.hqxdata + binascii.b2a_hqx(self.data) | |
self._flush(1) | |
self.ofp.close() | |
del self.ofp | |
class _Rlecoderengine: | |
"""Write data to the RLE-coder in suitably large chunks""" | |
def __init__(self, ofp): | |
self.ofp = ofp | |
self.data = '' | |
def write(self, data): | |
self.data = self.data + data | |
if len(self.data) < REASONABLY_LARGE: | |
return | |
rledata = binascii.rlecode_hqx(self.data) | |
self.ofp.write(rledata) | |
self.data = '' | |
def close(self): | |
if self.data: | |
rledata = binascii.rlecode_hqx(self.data) | |
self.ofp.write(rledata) | |
self.ofp.close() | |
del self.ofp | |
class BinHex: | |
def __init__(self, name_finfo_dlen_rlen, ofp): | |
name, finfo, dlen, rlen = name_finfo_dlen_rlen | |
if type(ofp) == type(''): | |
ofname = ofp | |
ofp = open(ofname, 'w') | |
ofp.write('(This file must be converted with BinHex 4.0)\n\n:') | |
hqxer = _Hqxcoderengine(ofp) | |
self.ofp = _Rlecoderengine(hqxer) | |
self.crc = 0 | |
if finfo is None: | |
finfo = FInfo() | |
self.dlen = dlen | |
self.rlen = rlen | |
self._writeinfo(name, finfo) | |
self.state = _DID_HEADER | |
def _writeinfo(self, name, finfo): | |
nl = len(name) | |
if nl > 63: | |
raise Error, 'Filename too long' | |
d = chr(nl) + name + '\0' | |
d2 = finfo.Type + finfo.Creator | |
# Force all structs to be packed with big-endian | |
d3 = struct.pack('>h', finfo.Flags) | |
d4 = struct.pack('>ii', self.dlen, self.rlen) | |
info = d + d2 + d3 + d4 | |
self._write(info) | |
self._writecrc() | |
def _write(self, data): | |
self.crc = binascii.crc_hqx(data, self.crc) | |
self.ofp.write(data) | |
def _writecrc(self): | |
# XXXX Should this be here?? | |
# self.crc = binascii.crc_hqx('\0\0', self.crc) | |
if self.crc < 0: | |
fmt = '>h' | |
else: | |
fmt = '>H' | |
self.ofp.write(struct.pack(fmt, self.crc)) | |
self.crc = 0 | |
def write(self, data): | |
if self.state != _DID_HEADER: | |
raise Error, 'Writing data at the wrong time' | |
self.dlen = self.dlen - len(data) | |
self._write(data) | |
def close_data(self): | |
if self.dlen != 0: | |
raise Error, 'Incorrect data size, diff=%r' % (self.rlen,) | |
self._writecrc() | |
self.state = _DID_DATA | |
def write_rsrc(self, data): | |
if self.state < _DID_DATA: | |
self.close_data() | |
if self.state != _DID_DATA: | |
raise Error, 'Writing resource data at the wrong time' | |
self.rlen = self.rlen - len(data) | |
self._write(data) | |
def close(self): | |
if self.state < _DID_DATA: | |
self.close_data() | |
if self.state != _DID_DATA: | |
raise Error, 'Close at the wrong time' | |
if self.rlen != 0: | |
raise Error, \ | |
"Incorrect resource-datasize, diff=%r" % (self.rlen,) | |
self._writecrc() | |
self.ofp.close() | |
self.state = None | |
del self.ofp | |
def binhex(inp, out): | |
"""(infilename, outfilename) - Create binhex-encoded copy of a file""" | |
finfo = getfileinfo(inp) | |
ofp = BinHex(finfo, out) | |
ifp = open(inp, 'rb') | |
# XXXX Do textfile translation on non-mac systems | |
while 1: | |
d = ifp.read(128000) | |
if not d: break | |
ofp.write(d) | |
ofp.close_data() | |
ifp.close() | |
ifp = openrsrc(inp, 'rb') | |
while 1: | |
d = ifp.read(128000) | |
if not d: break | |
ofp.write_rsrc(d) | |
ofp.close() | |
ifp.close() | |
class _Hqxdecoderengine: | |
"""Read data via the decoder in 4-byte chunks""" | |
def __init__(self, ifp): | |
self.ifp = ifp | |
self.eof = 0 | |
def read(self, totalwtd): | |
"""Read at least wtd bytes (or until EOF)""" | |
decdata = '' | |
wtd = totalwtd | |
# | |
# The loop here is convoluted, since we don't really now how | |
# much to decode: there may be newlines in the incoming data. | |
while wtd > 0: | |
if self.eof: return decdata | |
wtd = ((wtd+2)//3)*4 | |
data = self.ifp.read(wtd) | |
# | |
# Next problem: there may not be a complete number of | |
# bytes in what we pass to a2b. Solve by yet another | |
# loop. | |
# | |
while 1: | |
try: | |
decdatacur, self.eof = \ | |
binascii.a2b_hqx(data) | |
break | |
except binascii.Incomplete: | |
pass | |
newdata = self.ifp.read(1) | |
if not newdata: | |
raise Error, \ | |
'Premature EOF on binhex file' | |
data = data + newdata | |
decdata = decdata + decdatacur | |
wtd = totalwtd - len(decdata) | |
if not decdata and not self.eof: | |
raise Error, 'Premature EOF on binhex file' | |
return decdata | |
def close(self): | |
self.ifp.close() | |
class _Rledecoderengine: | |
"""Read data via the RLE-coder""" | |
def __init__(self, ifp): | |
self.ifp = ifp | |
self.pre_buffer = '' | |
self.post_buffer = '' | |
self.eof = 0 | |
def read(self, wtd): | |
if wtd > len(self.post_buffer): | |
self._fill(wtd-len(self.post_buffer)) | |
rv = self.post_buffer[:wtd] | |
self.post_buffer = self.post_buffer[wtd:] | |
return rv | |
def _fill(self, wtd): | |
self.pre_buffer = self.pre_buffer + self.ifp.read(wtd+4) | |
if self.ifp.eof: | |
self.post_buffer = self.post_buffer + \ | |
binascii.rledecode_hqx(self.pre_buffer) | |
self.pre_buffer = '' | |
return | |
# | |
# Obfuscated code ahead. We have to take care that we don't | |
# end up with an orphaned RUNCHAR later on. So, we keep a couple | |
# of bytes in the buffer, depending on what the end of | |
# the buffer looks like: | |
# '\220\0\220' - Keep 3 bytes: repeated \220 (escaped as \220\0) | |
# '?\220' - Keep 2 bytes: repeated something-else | |
# '\220\0' - Escaped \220: Keep 2 bytes. | |
# '?\220?' - Complete repeat sequence: decode all | |
# otherwise: keep 1 byte. | |
# | |
mark = len(self.pre_buffer) | |
if self.pre_buffer[-3:] == RUNCHAR + '\0' + RUNCHAR: | |
mark = mark - 3 | |
elif self.pre_buffer[-1] == RUNCHAR: | |
mark = mark - 2 | |
elif self.pre_buffer[-2:] == RUNCHAR + '\0': | |
mark = mark - 2 | |
elif self.pre_buffer[-2] == RUNCHAR: | |
pass # Decode all | |
else: | |
mark = mark - 1 | |
self.post_buffer = self.post_buffer + \ | |
binascii.rledecode_hqx(self.pre_buffer[:mark]) | |
self.pre_buffer = self.pre_buffer[mark:] | |
def close(self): | |
self.ifp.close() | |
class HexBin: | |
def __init__(self, ifp): | |
if type(ifp) == type(''): | |
ifp = open(ifp) | |
# | |
# Find initial colon. | |
# | |
while 1: | |
ch = ifp.read(1) | |
if not ch: | |
raise Error, "No binhex data found" | |
# Cater for \r\n terminated lines (which show up as \n\r, hence | |
# all lines start with \r) | |
if ch == '\r': | |
continue | |
if ch == ':': | |
break | |
if ch != '\n': | |
dummy = ifp.readline() | |
hqxifp = _Hqxdecoderengine(ifp) | |
self.ifp = _Rledecoderengine(hqxifp) | |
self.crc = 0 | |
self._readheader() | |
def _read(self, len): | |
data = self.ifp.read(len) | |
self.crc = binascii.crc_hqx(data, self.crc) | |
return data | |
def _checkcrc(self): | |
filecrc = struct.unpack('>h', self.ifp.read(2))[0] & 0xffff | |
#self.crc = binascii.crc_hqx('\0\0', self.crc) | |
# XXXX Is this needed?? | |
self.crc = self.crc & 0xffff | |
if filecrc != self.crc: | |
raise Error, 'CRC error, computed %x, read %x' \ | |
%(self.crc, filecrc) | |
self.crc = 0 | |
def _readheader(self): | |
len = self._read(1) | |
fname = self._read(ord(len)) | |
rest = self._read(1+4+4+2+4+4) | |
self._checkcrc() | |
type = rest[1:5] | |
creator = rest[5:9] | |
flags = struct.unpack('>h', rest[9:11])[0] | |
self.dlen = struct.unpack('>l', rest[11:15])[0] | |
self.rlen = struct.unpack('>l', rest[15:19])[0] | |
self.FName = fname | |
self.FInfo = FInfo() | |
self.FInfo.Creator = creator | |
self.FInfo.Type = type | |
self.FInfo.Flags = flags | |
self.state = _DID_HEADER | |
def read(self, *n): | |
if self.state != _DID_HEADER: | |
raise Error, 'Read data at wrong time' | |
if n: | |
n = n[0] | |
n = min(n, self.dlen) | |
else: | |
n = self.dlen | |
rv = '' | |
while len(rv) < n: | |
rv = rv + self._read(n-len(rv)) | |
self.dlen = self.dlen - n | |
return rv | |
def close_data(self): | |
if self.state != _DID_HEADER: | |
raise Error, 'close_data at wrong time' | |
if self.dlen: | |
dummy = self._read(self.dlen) | |
self._checkcrc() | |
self.state = _DID_DATA | |
def read_rsrc(self, *n): | |
if self.state == _DID_HEADER: | |
self.close_data() | |
if self.state != _DID_DATA: | |
raise Error, 'Read resource data at wrong time' | |
if n: | |
n = n[0] | |
n = min(n, self.rlen) | |
else: | |
n = self.rlen | |
self.rlen = self.rlen - n | |
return self._read(n) | |
def close(self): | |
if self.rlen: | |
dummy = self.read_rsrc(self.rlen) | |
self._checkcrc() | |
self.state = _DID_RSRC | |
self.ifp.close() | |
def hexbin(inp, out): | |
"""(infilename, outfilename) - Decode binhexed file""" | |
ifp = HexBin(inp) | |
finfo = ifp.FInfo | |
if not out: | |
out = ifp.FName | |
ofp = open(out, 'wb') | |
# XXXX Do translation on non-mac systems | |
while 1: | |
d = ifp.read(128000) | |
if not d: break | |
ofp.write(d) | |
ofp.close() | |
ifp.close_data() | |
d = ifp.read_rsrc(128000) | |
if d: | |
ofp = openrsrc(out, 'wb') | |
ofp.write(d) | |
while 1: | |
d = ifp.read_rsrc(128000) | |
if not d: break | |
ofp.write(d) | |
ofp.close() | |
ifp.close() | |
def _test(): | |
fname = sys.argv[1] | |
binhex(fname, fname+'.hqx') | |
hexbin(fname+'.hqx', fname+'.viahqx') | |
#hexbin(fname, fname+'.unpacked') | |
sys.exit(1) | |
if __name__ == '__main__': | |
_test() |