""" | |
csv.py - read/write/investigate CSV files | |
""" | |
import re | |
from functools import reduce | |
from _csv import Error, __version__, writer, reader, register_dialect, \ | |
unregister_dialect, get_dialect, list_dialects, \ | |
field_size_limit, \ | |
QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \ | |
__doc__ | |
from _csv import Dialect as _Dialect | |
try: | |
from cStringIO import StringIO | |
except ImportError: | |
from StringIO import StringIO | |
__all__ = [ "QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE", | |
"Error", "Dialect", "__doc__", "excel", "excel_tab", | |
"field_size_limit", "reader", "writer", | |
"register_dialect", "get_dialect", "list_dialects", "Sniffer", | |
"unregister_dialect", "__version__", "DictReader", "DictWriter" ] | |

class Dialect:
    """Describe an Excel dialect.

    This must be subclassed (see csv.excel).  Valid attributes are:
    delimiter, quotechar, escapechar, doublequote, skipinitialspace,
    lineterminator, quoting.

    """
    _name = ""
    _valid = False
    # placeholders
    delimiter = None
    quotechar = None
    escapechar = None
    doublequote = None
    skipinitialspace = None
    lineterminator = None
    quoting = None

    def __init__(self):
        if self.__class__ != Dialect:
            self._valid = True
        self._validate()

    def _validate(self):
        try:
            _Dialect(self)
        except TypeError, e:
            # We do this for compatibility with py2.3
            raise Error(str(e))

class excel(Dialect):
    """Describe the usual properties of Excel-generated CSV files."""
    delimiter = ','
    quotechar = '"'
    doublequote = True
    skipinitialspace = False
    lineterminator = '\r\n'
    quoting = QUOTE_MINIMAL
register_dialect("excel", excel)

class excel_tab(excel):
    """Describe the usual properties of Excel-generated TAB-delimited files."""
    delimiter = '\t'
register_dialect("excel-tab", excel_tab)

class DictReader:
    def __init__(self, f, fieldnames=None, restkey=None, restval=None,
                 dialect="excel", *args, **kwds):
        self._fieldnames = fieldnames   # list of keys for the dict
        self.restkey = restkey          # key to catch long rows
        self.restval = restval          # default value for short rows
        self.reader = reader(f, dialect, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        if self._fieldnames is None:
            try:
                self._fieldnames = self.reader.next()
            except StopIteration:
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def next(self):
        if self.line_num == 0:
            # Used only for its side effect.
            self.fieldnames
        row = self.reader.next()
        self.line_num = self.reader.line_num

        # unlike the basic reader, we prefer not to return blanks,
        # because we will typically wind up with a dict full of None
        # values
        while row == []:
            row = self.reader.next()
        d = dict(zip(self.fieldnames, row))
        lf = len(self.fieldnames)
        lr = len(row)
        if lf < lr:
            d[self.restkey] = row[lf:]
        elif lf > lr:
            for key in self.fieldnames[lr:]:
                d[key] = self.restval
        return d
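
# A minimal DictReader usage sketch (the file name and column names are
# hypothetical):
#
#     with open('people.csv', 'rb') as f:
#         for row in DictReader(f):
#             # With fieldnames=None, the first row supplies the keys,
#             # e.g. row == {'name': 'Alice', 'age': '30'}
#             print row['name'], row['age']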

class DictWriter:
    def __init__(self, f, fieldnames, restval="", extrasaction="raise",
                 dialect="excel", *args, **kwds):
        self.fieldnames = fieldnames    # list of keys for the dict
        self.restval = restval          # for writing short dicts
        if extrasaction.lower() not in ("raise", "ignore"):
            raise ValueError, \
                  ("extrasaction (%s) must be 'raise' or 'ignore'" %
                   extrasaction)
        self.extrasaction = extrasaction
        self.writer = writer(f, dialect, *args, **kwds)

    def writeheader(self):
        header = dict(zip(self.fieldnames, self.fieldnames))
        self.writerow(header)

    def _dict_to_list(self, rowdict):
        if self.extrasaction == "raise":
            wrong_fields = [k for k in rowdict if k not in self.fieldnames]
            if wrong_fields:
                raise ValueError("dict contains fields not in fieldnames: " +
                                 ", ".join(wrong_fields))
        return [rowdict.get(key, self.restval) for key in self.fieldnames]

    def writerow(self, rowdict):
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        rows = []
        for rowdict in rowdicts:
            rows.append(self._dict_to_list(rowdict))
        return self.writer.writerows(rows)
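
# A minimal DictWriter usage sketch (file and field names are hypothetical):
#
#     with open('people.csv', 'wb') as f:
#         w = DictWriter(f, fieldnames=['name', 'age'])
#         w.writeheader()
#         # Missing keys are filled with restval; unexpected keys raise
#         # ValueError unless extrasaction='ignore'.
#         w.writerow({'name': 'Alice', 'age': 30})
#         w.writerows([{'name': 'Bob'}, {'name': 'Eve', 'age': 99}])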

# Guard Sniffer's type checking against builds that exclude complex()
try:
    complex
except NameError:
    complex = float

class Sniffer:
    '''
    "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
    Returns a Dialect object.
    '''
    def __init__(self):
        # in case there is more than one possible delimiter
        self.preferred = [',', '\t', ';', ' ', ':']

    def sniff(self, sample, delimiters=None):
        """
        Returns a dialect corresponding to the sample, or raises Error
        if a delimiter cannot be determined.
        """

        quotechar, doublequote, delimiter, skipinitialspace = \
                   self._guess_quote_and_delimiter(sample, delimiters)
        if not delimiter:
            delimiter, skipinitialspace = self._guess_delimiter(sample,
                                                                delimiters)

        if not delimiter:
            raise Error, "Could not determine delimiter"

        class dialect(Dialect):
            _name = "sniffed"
            lineterminator = '\r\n'
            quoting = QUOTE_MINIMAL
            # escapechar = ''

        dialect.doublequote = doublequote
        dialect.delimiter = delimiter
        # _csv.reader won't accept a quotechar of ''
        dialect.quotechar = quotechar or '"'
        dialect.skipinitialspace = skipinitialspace

        return dialect
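
    # A minimal Sniffer usage sketch (the sample string is assumed for
    # illustration):
    #
    #     sample = "name;age\nAlice;30\nBob;25\n"
    #     dialect = Sniffer().sniff(sample)
    #     # dialect.delimiter == ';'
    #     for row in reader(StringIO(sample), dialect):
    #         print row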

    def _guess_quote_and_delimiter(self, data, delimiters):
        """
        Looks for text enclosed between two identical quotes
        (the probable quotechar) which are preceded and followed
        by the same character (the probable delimiter).
        For example:
                         ,'some text',
        The quote character with the most matches wins; the same goes
        for the delimiter.  If there is no quotechar the delimiter
        can't be determined this way.
        """
        matches = []
        for restr in ('(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
                      '(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   #  ".*?",
                      '(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
                      '(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            #  ".*?" (no delim, no space)
            regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
            matches = regexp.findall(data)
            if matches:
                break

        if not matches:
            # (quotechar, doublequote, delimiter, skipinitialspace)
            return ('', False, None, 0)

        quotes = {}
        delims = {}
        spaces = 0
        for m in matches:
            n = regexp.groupindex['quote'] - 1
            key = m[n]
            if key:
                quotes[key] = quotes.get(key, 0) + 1
            try:
                n = regexp.groupindex['delim'] - 1
                key = m[n]
            except KeyError:
                continue
            if key and (delimiters is None or key in delimiters):
                delims[key] = delims.get(key, 0) + 1
            try:
                n = regexp.groupindex['space'] - 1
            except KeyError:
                continue
            if m[n]:
                spaces += 1

        quotechar = reduce(lambda a, b, quotes = quotes:
                           (quotes[a] > quotes[b]) and a or b, quotes.keys())

        if delims:
            delim = reduce(lambda a, b, delims = delims:
                           (delims[a] > delims[b]) and a or b, delims.keys())
            skipinitialspace = delims[delim] == spaces
            if delim == '\n': # most likely a file with a single column
                delim = ''
        else:
            # there is *no* delimiter, it's a single column of quoted data
            delim = ''
            skipinitialspace = 0

        # if we see an extra quote between delimiters, we've got a
        # double quoted format
        dq_regexp = re.compile(r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
                               {'delim':delim, 'quote':quotechar}, re.MULTILINE)

        if dq_regexp.search(data):
            doublequote = True
        else:
            doublequote = False

        return (quotechar, doublequote, delim, skipinitialspace)
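
    # For instance, given a toy sample '"x","y"\n"1","2"\n' (assumed for
    # illustration, not from a test suite), the first pattern finds a
    # quoted field bracketed by commas, so this returns roughly
    # ('"', False, ',', False): quotechar '"', no doubled quotes,
    # delimiter ',', and no space following the delimiter.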

    def _guess_delimiter(self, data, delimiters):
        """
        The delimiter /should/ occur the same number of times on
        each row. However, due to malformed data, it may not. We don't want
        an all or nothing approach, so we allow for small variations in this
        number.
          1) build a table of the frequency of each character on every line.
          2) build a table of frequencies of this frequency (meta-frequency?),
             e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
             7 times in 2 rows'
          3) use the mode of the meta-frequency to determine the /expected/
             frequency for that character
          4) find out how often the character actually meets that goal
          5) the character that best meets its goal is the delimiter
        For performance reasons, the data is evaluated in chunks, so it can
        try and evaluate the smallest portion of the data possible, evaluating
        additional chunks as necessary.
        """
        data = filter(None, data.split('\n'))

        ascii = [chr(c) for c in range(127)] # 7-bit ASCII

        # build frequency tables
        chunkLength = min(10, len(data))
        iteration = 0
        charFrequency = {}
        modes = {}
        delims = {}
        start, end = 0, min(chunkLength, len(data))
        while start < len(data):
            iteration += 1
            for line in data[start:end]:
                for char in ascii:
                    metaFrequency = charFrequency.get(char, {})
                    # must count even if frequency is 0
                    freq = line.count(char)
                    # value is the mode
                    metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
                    charFrequency[char] = metaFrequency

            for char in charFrequency.keys():
                items = charFrequency[char].items()
                if len(items) == 1 and items[0][0] == 0:
                    continue
                # get the mode of the frequencies
                if len(items) > 1:
                    modes[char] = reduce(lambda a, b: a[1] > b[1] and a or b,
                                         items)
                    # adjust the mode - subtract the sum of all
                    # other frequencies
                    items.remove(modes[char])
                    modes[char] = (modes[char][0], modes[char][1]
                                   - reduce(lambda a, b: (0, a[1] + b[1]),
                                            items)[1])
                else:
                    modes[char] = items[0]

            # build a list of possible delimiters
            modeList = modes.items()
            total = float(chunkLength * iteration)
            # (rows of consistent data) / (number of rows) = 100%
            consistency = 1.0
            # minimum consistency threshold
            threshold = 0.9
            while len(delims) == 0 and consistency >= threshold:
                for k, v in modeList:
                    if v[0] > 0 and v[1] > 0:
                        if ((v[1]/total) >= consistency and
                            (delimiters is None or k in delimiters)):
                            delims[k] = v
                consistency -= 0.01

            if len(delims) == 1:
                delim = delims.keys()[0]
                skipinitialspace = (data[0].count(delim) ==
                                    data[0].count("%c " % delim))
                return (delim, skipinitialspace)

            # analyze another chunkLength lines
            start = end
            end += chunkLength

        if not delims:
            return ('', 0)

        # if there's more than one, fall back to a 'preferred' list
        if len(delims) > 1:
            for d in self.preferred:
                if d in delims.keys():
                    skipinitialspace = (data[0].count(d) ==
                                        data[0].count("%c " % d))
                    return (d, skipinitialspace)

        # nothing else indicates a preference, pick the character that
        # dominates(?)
        items = [(v,k) for (k,v) in delims.items()]
        items.sort()
        delim = items[-1][1]
        skipinitialspace = (data[0].count(delim) ==
                            data[0].count("%c " % delim))
        return (delim, skipinitialspace)

    def has_header(self, sample):
        # Creates a dictionary of types of data in each column. If any
        # column is of a single type (say, integers), *except* for the first
        # row, then the first row is presumed to be labels. If the type
        # can't be determined, it is assumed to be a string in which case
        # the length of the string is the determining factor: if all of the
        # rows except for the first are the same length, it's a header.
        # Finally, a 'vote' is taken at the end for each column, adding or
        # subtracting from the likelihood of the first row being a header.

        rdr = reader(StringIO(sample), self.sniff(sample))

        header = rdr.next() # assume first row is header

        columns = len(header)
        columnTypes = {}
        for i in range(columns): columnTypes[i] = None

        checked = 0
        for row in rdr:
            # arbitrary number of rows to check, to keep it sane
            if checked > 20:
                break
            checked += 1

            if len(row) != columns:
                continue # skip rows that have irregular number of columns

            for col in columnTypes.keys():

                for thisType in [int, long, float, complex]:
                    try:
                        thisType(row[col])
                        break
                    except (ValueError, OverflowError):
                        pass
                else:
                    # fallback to length of string
                    thisType = len(row[col])

                # treat longs as ints
                if thisType == long:
                    thisType = int

                if thisType != columnTypes[col]:
                    if columnTypes[col] is None: # add new column type
                        columnTypes[col] = thisType
                    else:
                        # type is inconsistent, remove column from
                        # consideration
                        del columnTypes[col]

        # finally, compare results against first row and "vote"
        # on whether it's a header
        hasHeader = 0
        for col, colType in columnTypes.items():
            if type(colType) == type(0): # it's a length
                if len(header[col]) != colType:
                    hasHeader += 1
                else:
                    hasHeader -= 1
            else: # attempt typecast
                try:
                    colType(header[col])
                except (ValueError, TypeError):
                    hasHeader += 1
                else:
                    hasHeader -= 1

        return hasHeader > 0
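
# A minimal self-check sketch, runnable as a script (the sample data below
# is assumed for illustration and is not part of the module):
if __name__ == '__main__':
    _sample = "name,age\nAlice,30\nBob,25\n"
    _sniffer = Sniffer()
    _dialect = _sniffer.sniff(_sample)
    # The sniffed dialect is a Dialect subclass; its attributes describe
    # the sample's format.
    print "delimiter:", repr(_dialect.delimiter)
    # 'age' doesn't parse as an int while the column below it does, so the
    # first row should be voted a header.
    print "has header:", _sniffer.has_header(_sample)
    for _row in reader(StringIO(_sample), _dialect):
        print _row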