""" Offers tools to deal with csv (and xls, xlsx) files. """
import codecs
import openpyxl
import re
import sys
import tempfile
import xlrd  # type:ignore[import-untyped]
from collections import namedtuple, OrderedDict
from csv import DictWriter, Sniffer, QUOTE_ALL
from csv import Error as CsvError
from csv import reader as csv_reader
from csv import writer as csv_writer
from datetime import datetime
from editdistance import eval as distance
from io import BytesIO, StringIO
from itertools import permutations
from onegov.core import errors
from onegov.core.cache import lru_cache
from ordered_set import OrderedSet
from unidecode import unidecode
from xlsxwriter.workbook import Workbook  # type:ignore[import-untyped]
from onegov.core.utils import normalize_for_url
from typing import overload, Any, Generic, IO, TypeVar, TYPE_CHECKING

if TYPE_CHECKING:
    from _typeshed import SupportsRichComparison
    from collections.abc import (
        Callable, Collection, Iterable, Iterator, Sequence)
    from csv import Dialect
    from openpyxl.worksheet.worksheet import Worksheet
    from typing import Protocol, TypeAlias

    _T = TypeVar('_T')
    _T_co = TypeVar('_T_co', covariant=True)
    _SupportsRichComparisonT = TypeVar(
        '_SupportsRichComparisonT',
        bound=SupportsRichComparison
    )

    class _RowType(Protocol[_T_co]):
        def __call__(self, rownumber: int, **kwargs: str) -> _T_co: ...

    class DefaultRow(Protocol):
        @property
        def rownumber(self) -> int: ...
        def __getattr__(self, name: str) -> str: ...

    KeyFunc: TypeAlias = Callable[[_T], SupportsRichComparison]
    DefaultCSVFile: TypeAlias = 'CSVFile[DefaultRow]'
_RowT = TypeVar('_RowT')
VALID_CSV_DELIMITERS = ',;\t'
WHITESPACE = re.compile(r'\s+')
INVALID_XLSX_TITLE = re.compile(r'[\\*?:/\[\]]')
small_chars = 'fijlrt:,;.+i '
# NOTE: ``large_chars`` and ``max_width`` are referenced by
# ``character_width`` and ``estimate_width`` below but were missing here;
# the exact values are assumptions chosen to match that heuristic.
large_chars = 'GHMWQ_'
max_width = 75

class CSVFile(Generic[_RowT]):
""" Provides access to a csv file.
:param csvfile:
The csv file to be accessed. Must be an open file (not a path), opened
in binary mode. For example::
with open(path, 'rb') as f:
csv = CSVFile(f)
:param expected_headers:
The expected headers if known. Expected headers are headers which
*must* exist in the CSV file. There may be additional headers.
If the headers are slightly misspelled, a matching algorithm tries to
guess the correct header, without accidentally matching the wrong
headers.
See :func:`match_headers` for more information.
If the no expected_headers are passed, no checks are done, but the
headers are still available. Headers matching is useful if a user
provides the CSV and it might be wrong.
If it is impossible for misspellings to occurr, the expected headers
don't have to be specified.
:param dialect:
The CSV dialect to expect. By default, the dialect will be guessed
using Python's heuristic.
:param encoding:
The CSV encoding to expect. By default, the encoding will be guessed
and will either be UTF-8 or CP1252.
:param rename_duplicate_column_names:
It is possible to rename duplicate column names to deal with super
wacky files. If this option is set and a duplicate header is found,
a suffix is appended to the column name rather than throwing a
DuplicateColumnNamesError.
:param rowtype:
An alternative rowtype for the resulting rows. This should be a
callable that receives a `rownumber` key/value and all the other
keys/values found in the csv. The keys are normalized and are valid
Python identifiers usable as attribute names.
Defaults to a namedtuple created using the found headers.
Once the csv file is open, the records can be acceessed as follows::
with open(path, 'rb') as f:
csv = CSVFile(f)
for line in csv.lines:
csv.my_field # access the column with the 'my_field' header
"""
    rowtype: '_RowType[_RowT]'

    @overload
    def __init__(
        self: 'DefaultCSVFile',
        csvfile: IO[bytes],
        expected_headers: 'Collection[str] | None' = None,
        dialect: 'type[Dialect] | Dialect | str | None' = None,
        encoding: str | None = None,
        rename_duplicate_column_names: bool = False,
        rowtype: None = None
    ): ...

    @overload
    def __init__(
        self: 'CSVFile[_RowT]',
        csvfile: IO[bytes],
        expected_headers: 'Collection[str] | None' = None,
        dialect: 'type[Dialect] | Dialect | str | None' = None,
        encoding: str | None = None,
        rename_duplicate_column_names: bool = False,
        *,
        rowtype: '_RowType[_RowT]'
    ): ...

    def __init__(
        self,
        csvfile: IO[bytes],
        expected_headers: 'Collection[str] | None' = None,
        dialect: 'type[Dialect] | Dialect | str | None' = None,
        encoding: str | None = None,
        rename_duplicate_column_names: bool = False,
        rowtype: '_RowType[_RowT] | None' = None
    ):
        # guess the encoding if not already provided
        encoding = encoding or detect_encoding(csvfile)
        if encoding is None:
            raise errors.InvalidFormatError()
        self.csvfile = codecs.getreader(encoding)(csvfile)

        # sniff the dialect if not already provided
        try:
            self.csvfile.seek(0)
            self.dialect = dialect or sniff_dialect(self.csvfile.read(1024))
        except (CsvError, errors.InvalidFormatError):
            self.csvfile.seek(0)
            self.dialect = sniff_dialect(self.csvfile.read())

        # match the headers
        self.csvfile.seek(0)

        # if no expected headers were given, we just take what we can get
        headers = parse_header(
            self.csvfile.readline(),
            self.dialect,
            rename_duplicate_column_names
        )
        expected_headers = expected_headers or headers

        # map each matched header to its column index
        self.headers = OrderedDict(
            (h, c) for c, h in enumerate(
                match_headers(headers, expected_headers)
            )
        )

        # create an output type
        assert 'rownumber' not in expected_headers, """
            rownumber can't be used as a header
        """

        self.rowtype = rowtype or namedtuple(  # type:ignore # noqa: PYI024
            'CSVFileRow', ['rownumber', *(
                self.as_valid_identifier(k)
                for k in self.headers.keys()
            )]
        )
    @staticmethod
    @lru_cache(maxsize=128)
    def as_valid_identifier(value: str) -> str:
        result = normalize_header(value)
        for invalid in '- .%/,;()':
            result = result.replace(invalid, '_')
        while result and result[0] in '_0123456789':
            result = result[1:]
        return result
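
    # Example (sketch): assuming ``normalize_header`` lower-cases and strips
    # the value (it is defined elsewhere in this module), punctuation becomes
    # underscores and leading digits/underscores are dropped, so
    # ``CSVFile.as_valid_identifier('1. Date of Birth')`` would yield
    # something like 'date_of_birth'.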

    def __iter__(self) -> 'Iterator[_RowT]':
        yield from self.lines

    @property
    def lines(self) -> 'Iterator[_RowT]':
        self.csvfile.seek(0)

        encountered_empty_line = False

        for ix, line in enumerate(csv_reader(self.csvfile, self.dialect)):

            # raise an empty line error if we found one somewhere in the
            # middle -> at the end they don't count
            if not line:
                encountered_empty_line = True
                continue

            if line and encountered_empty_line:
                raise errors.EmptyLineInFileError()

            # the first line is the header
            if ix == 0:
                continue

            yield self.rowtype(
                rownumber=ix + 1,  # row numbers are for customers, not coders
                **{
                    self.as_valid_identifier(header): line[column].strip()
                    for header, column in self.headers.items()
                }
            )
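
# Example (sketch): reading a user-supplied csv with expected headers and a
# custom rowtype. The file name and the ``Point`` type are hypothetical:
#
#     from collections import namedtuple
#     Point = namedtuple('Point', ['rownumber', 'x', 'y'])
#
#     with open('points.csv', 'rb') as f:
#         csv = CSVFile(f, expected_headers=['x', 'y'], rowtype=Point)
#
#         for point in csv.lines:
#             print(point.rownumber, point.x, point.y)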
def detect_encoding(csvfile: IO[bytes]) -> str:
    """ Since encoding detection is hard to get right (and work correctly
    every time), we limit ourselves here to UTF-8 or CP1252, whichever works
    first. CP1252 is basically the csv format you get if you use windows and
    excel and it is a superset of ISO-8859-1/LATIN1.

    """
    csvfile.seek(0)

    try:
        for line in csvfile.readlines():
            line.decode('utf-8')

        return 'utf-8'
    except UnicodeDecodeError:
        return 'cp1252'
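
# Example (sketch): a CP1252-encoded umlaut is not valid UTF-8, so the
# fallback kicks in:
#
#     from io import BytesIO
#     assert detect_encoding(BytesIO('Zürich'.encode('utf-8'))) == 'utf-8'
#     assert detect_encoding(BytesIO('Zürich'.encode('cp1252'))) == 'cp1252'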
def sniff_dialect(csv: str) -> type['Dialect']:
    """ Takes the given csv string and returns the dialect or raises an error.
    Works just like Python's built in sniffer, just that it is a bit more
    conservative and doesn't just accept any kind of character as csv
    delimiter.

    """
    if not csv:
        raise errors.EmptyFileError()

    try:
        dialect = Sniffer().sniff(csv, VALID_CSV_DELIMITERS)
    except CsvError as exception:
        # sometimes we can get away with an extra pass just over the first
        # line (the header tends to contain fewer special cases)
        if '\n' in csv:
            return sniff_dialect(csv[:csv.find('\n')])
        raise errors.InvalidFormatError() from exception

    return dialect
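
# Example (sketch): the sniffer restricts delimiters to
# ``VALID_CSV_DELIMITERS``, so a semicolon-separated file is detected as
# such:
#
#     dialect = sniff_dialect('a;b;c\n1;2;3\n')
#     assert dialect.delimiter == ';'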
def convert_xlsx_to_csv(
    xlsx: IO[bytes],
    sheet_name: str | None = None
) -> BytesIO:
    """ Takes an XLSX file and returns a csv file using the given worksheet
    name or the first worksheet found.

    """
    xlsx.seek(0)

    try:
        excel = openpyxl.load_workbook(xlsx, data_only=True)
    except Exception as exception:
        raise OSError('Could not read XLSX file') from exception

    sheet: Worksheet
    if sheet_name:
        try:
            sheet = excel[sheet_name]
        except KeyError as exception:
            raise KeyError(
                'Could not find the given sheet in this excel file!'
            ) from exception
    else:
        sheet = excel.worksheets[0]

    # FIXME: We should probably do this check at runtime eventually since
    # Workbook[name] might return a Worksheet, ReadOnlyWorksheet or a
    # WriteOnlyWorksheet. Workbook.worksheets[index] might additionally
    # return a Chartsheet.
    if TYPE_CHECKING:
        assert isinstance(sheet, Worksheet)

    text_output = StringIO()
    writecsv = csv_writer(text_output, quoting=QUOTE_ALL)

    for row in range(1, sheet.max_row + 1):
        values = []

        for column in range(1, sheet.max_column + 1):
            cell = sheet.cell(row, column)

            if cell.value is None:
                value = ''
            elif cell.data_type == 's':
                value = cell.value  # type:ignore[assignment]
            elif cell.data_type == 'n':
                if (int_value := int(cell.value)) == cell.value:  # type:ignore
                    value = str(int_value)
                else:
                    value = str(cell.value)
            elif cell.data_type == 'd':
                value = cell.value.isoformat()  # type:ignore[union-attr]
            elif cell.data_type == 'b':
                value = '1' if cell.value else '0'
            else:
                raise NotImplementedError

            values.append(value)

        if any(values):
            writecsv.writerow(values)

    text_output.seek(0)

    # FIXME: We could use TextIOWrapper around a BytesIO, then we wouldn't
    # need to convert at the end!
    output = BytesIO()
    for line in text_output.readlines():
        output.write(line.encode('utf-8'))

    return output
def convert_xls_to_csv(
    xls: IO[bytes],
    sheet_name: str | None = None
) -> BytesIO:
    """ Takes an XLS file and returns a csv file using the given worksheet
    name or the first worksheet found.

    """
    xls.seek(0)

    try:
        excel = xlrd.open_workbook(file_contents=xls.read())
    except Exception as exception:
        raise OSError('Could not read XLS file') from exception

    if sheet_name:
        try:
            sheet = excel.sheet_by_name(sheet_name)
        except xlrd.XLRDError as exception:
            raise KeyError(
                'Could not find the given sheet in this excel file!'
            ) from exception
    else:
        sheet = excel.sheet_by_index(0)

    text_output = StringIO()
    writecsv = csv_writer(text_output, quoting=QUOTE_ALL)

    for rownum in range(sheet.nrows):
        values = []

        for cell in sheet.row(rownum):
            if cell.ctype == xlrd.XL_CELL_TEXT:
                value = cell.value
            elif cell.ctype in (xlrd.XL_CELL_EMPTY, xlrd.XL_CELL_BLANK):
                value = ''
            elif cell.ctype == xlrd.XL_CELL_NUMBER:
                if cell.value.is_integer():
                    value = str(int(cell.value))
                else:
                    value = str(cell.value)
            elif cell.ctype == xlrd.XL_CELL_DATE:
                value = xlrd.xldate_as_tuple(cell.value, excel.datemode)
                value = datetime(*value).isoformat()
            elif cell.ctype == xlrd.XL_CELL_BOOLEAN:
                value = str(cell.value)
            else:
                raise NotImplementedError

            values.append(value)

        writecsv.writerow(values)

    text_output.seek(0)

    # FIXME: We could use TextIOWrapper around a BytesIO, then we wouldn't
    # need to convert at the end!
    output = BytesIO()
    for line in text_output.readlines():
        output.write(line.encode('utf-8'))

    return output
def convert_excel_to_csv(
    file: IO[bytes],
    sheet_name: str | None = None
) -> BytesIO:
    """ Takes an XLS/XLSX file and returns a csv file using the given
    worksheet name or the first worksheet found.

    """
    try:
        return convert_xlsx_to_csv(file, sheet_name)
    except OSError:
        return convert_xls_to_csv(file, sheet_name)
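
# Example (sketch): convert an uploaded excel file and read it back through
# CSVFile. The file name and the header are hypothetical:
#
#     with open('data.xlsx', 'rb') as f:
#         csv = CSVFile(convert_excel_to_csv(f), expected_headers=['name'])
#         for line in csv.lines:
#             print(line.name)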
def character_width(char: str) -> float:
    # those numbers have been acquired by chasing unicorns
    # and fairies in the magic forest of Excel
    #
    # tweak them as needed, but know that there's no correct answer,
    # as each excel version on each platform or os-version will render
    # the fonts used at different widths
    if char in small_chars:
        return 0.75
    elif char in large_chars:
        return 1.2
    else:
        return 1
def estimate_width(text: str) -> float:
    if not text:
        return 0

    width = max(
        sum(character_width(c) for c in line)
        for line in text.splitlines()
    )

    return min(width, max_width)
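
# Example (sketch): 'fit' consists of three characters from ``small_chars``,
# so its estimated width is 3 * 0.75 = 2.25 cells, well below ``max_width``:
#
#     assert estimate_width('fit') == 2.25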
@overload
def get_keys_from_list_of_dicts(
    rows: 'Iterable[dict[_SupportsRichComparisonT, Any]]',
    key: None = None,
    reverse: bool = False
) -> tuple['_SupportsRichComparisonT', ...]: ...


@overload
def get_keys_from_list_of_dicts(
    rows: 'Iterable[dict[_T, Any]]',
    key: 'KeyFunc[_T]',
    reverse: bool = False
) -> tuple['_T', ...]: ...


def get_keys_from_list_of_dicts(
    rows: 'Iterable[dict[Any, Any]]',
    key: 'KeyFunc[Any] | None' = None,
    reverse: bool = False
) -> tuple[Any, ...]:
""" Returns all keys of a list of dicts in an ordered tuple.
If the list of dicts is irregular, the keys found in later rows are
added at the end of the list.
Note that the order of keys is otherwise defined by the order of the keys
of the dictionaries. So if ordered dictionaries are used, the order is
defined. If regular dictionaries are used, the order is undefined.
Alternatively, a key and a reverse flag may be provided which will be
used to order the fields. If the list of fields is specified, the key and
the reverse flag is ignored.
"""
fields_set: OrderedSet[str] = OrderedSet()
for dictionary in rows:
fields_set.update(dictionary.keys())
if key:
fields = tuple(sorted(fields_set, key=key, reverse=reverse))
else:
fields = tuple(fields_set)
return fields
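
# Example (sketch): keys are collected in first-seen order, with keys from
# later rows appended at the end:
#
#     rows = [{'a': 1, 'b': 2}, {'b': 3, 'c': 4}]
#     assert get_keys_from_list_of_dicts(rows) == ('a', 'b', 'c')
#     get_keys_from_list_of_dicts(rows, key=lambda k: k, reverse=True)
#     # ('c', 'b', 'a')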
def convert_list_of_dicts_to_csv(
    rows: 'Iterable[dict[str, Any]]',
    fields: 'Sequence[str] | None' = None,
    key: 'KeyFunc[str] | None' = None,
    reverse: bool = False
) -> str:
    """ Takes a list of dictionaries and returns a csv.

    If no fields are provided, all fields are included in the order of the
    keys of the first dict (dicts preserve insertion order since Python 3.7;
    alternatively, provide a list of fields to fix the order explicitly).

    Alternatively, a key and a reverse flag may be provided which will be
    used to order the fields. If the list of fields is specified, the key
    and the reverse flag are ignored.

    The function returns a string created in memory. Therefore this function
    is limited to small-ish datasets.

    """
    if not rows:
        return ''

    fields = fields or get_keys_from_list_of_dicts(rows, key, reverse)

    output = StringIO()
    writer = DictWriter(output, fieldnames=fields)

    writer.writeheader()
    for row in rows:
        writer.writerow({field: row.get(field, '') for field in fields})

    output.seek(0)
    return output.read()
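
# Example (sketch): the csv dialect is Python's default (comma-separated,
# '\r\n' line endings):
#
#     convert_list_of_dicts_to_csv([{'a': 1, 'b': 2}])
#     # 'a,b\r\n1,2\r\n'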
def convert_list_of_dicts_to_xlsx(
    rows: 'Iterable[dict[str, Any]]',
    fields: 'Sequence[str] | None' = None,
    key: 'KeyFunc[str] | None' = None,
    reverse: bool = False
) -> bytes:
    """ Takes a list of dictionaries and returns a xlsx.

    This behaves the same way as :func:`convert_list_of_dicts_to_csv`.

    """
    with tempfile.NamedTemporaryFile() as file:
        workbook = Workbook(file.name, options={'constant_memory': True})
        cellformat = workbook.add_format({'text_wrap': True})
        worksheet = workbook.add_worksheet()

        fields_ = fields or get_keys_from_list_of_dicts(rows, key, reverse)

        # write the header
        worksheet.write_row(0, 0, fields_, cellformat)

        # keep track of the maximum character width
        column_widths = [estimate_width(field) for field in fields_]

        def values(row: dict[str, Any]) -> 'Iterator[str]':
            for ix, field in enumerate(fields_):
                value = row.get(field, '')
                column_widths[ix] = max(
                    column_widths[ix],
                    estimate_width(str(value))
                )
                if isinstance(value, str):
                    value = value.replace('\r', '')
                yield value

        # write the rows
        for r, row in enumerate(rows, start=1):
            worksheet.write_row(r, 0, values(row), cellformat)

        # set the column widths
        for col, width in enumerate(column_widths):
            worksheet.set_column(col, col, width)

        workbook.close()

        file.seek(0)
        return file.read()
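
# Example (sketch): the returned bytes can be written to disk or served
# directly. The file name is hypothetical:
#
#     xlsx = convert_list_of_dicts_to_xlsx([{'name': 'Berta', 'town': 'Bern'}])
#     with open('export.xlsx', 'wb') as f:
#         f.write(xlsx)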
def convert_list_of_list_of_dicts_to_xlsx(
    row_list: 'Sequence[Iterable[dict[str, Any]]]',
    titles_list: 'Sequence[str]',
    key_list: 'Sequence[KeyFunc[str] | None] | None' = None,
    reverse: bool = False
) -> bytes:
    """ Like :func:`convert_list_of_dicts_to_xlsx`, but operates on a list
    of row sets instead of a single one, writing one worksheet per item.

    """
    titles_list = normalize_sheet_titles(titles_list)

    with tempfile.NamedTemporaryFile() as file:
        workbook = Workbook(file.name, options={'constant_memory': True})

        if key_list is None:
            key_list = [None] * len(titles_list)

        for rows, title, key in zip(row_list, titles_list, key_list):
            cellformat = workbook.add_format({'text_wrap': True})
            worksheet = workbook.add_worksheet(title)
            fields = get_keys_from_list_of_dicts(rows, key, reverse)

            # write the header
            worksheet.write_row(0, 0, fields, cellformat)

            # keep track of the maximum character width
            column_widths = [estimate_width(field) for field in fields]

            def values(
                row: dict[str, Any],
                fields: tuple[str, ...] = fields,
                column_widths: list[float] = column_widths
            ) -> 'Iterator[str]':
                for ix, field in enumerate(fields):
                    value = row.get(field, '')
                    column_widths[ix] = max(
                        column_widths[ix],
                        estimate_width(str(value))
                    )
                    if isinstance(value, str):
                        value = value.replace('\r', '')
                    yield value

            # write the rows
            for r, row in enumerate(rows, start=1):
                worksheet.write_row(r, 0, values(row), cellformat)

            # set the column widths
            for col, width in enumerate(column_widths):
                worksheet.set_column(col, col, width)

        workbook.close()

        file.seek(0)
        return file.read()
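
# Example (sketch): one worksheet per list of rows, with titles normalized
# to valid, unique sheet names:
#
#     convert_list_of_list_of_dicts_to_xlsx(
#         [[{'a': 1}], [{'b': 2}]],
#         titles_list=['First Sheet', 'Second Sheet']
#     )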
def normalize_sheet_titles(titles: 'Sequence[str]') -> list[str]:
    """ Ensures the given xlsx sheet titles are valid. """

    def valid_characters_or_raise(title: str) -> None:
        m = INVALID_XLSX_TITLE.search(title)
        if m:
            msg = f'Invalid character {m.group(0)} found in xlsx sheet title'
            raise ValueError(msg)

    titles = [normalize_for_url(title) for title in titles]
    duplicate_idxs = list_duplicates_index(titles)

    # rename duplicate sheet names
    for index in duplicate_idxs:
        current_title = titles[index]
        valid_characters_or_raise(current_title)
        titles[index] = avoid_duplicate_name(titles, current_title)

    # trim titles exceeding the 31 character sheet name limit
    for index, item in enumerate(titles):
        if len(item) > 31:
            while len(titles[index]) > 31:
                titles[index] = remove_first_word(titles[index])

    return titles
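
# Example (sketch, assuming ``normalize_for_url`` lower-cases and hyphenates
# the title, as defined in onegov.core.utils):
#
#     normalize_sheet_titles(['Results 2024', 'Results 2024'])
#     # ['results-2024', 'results-2024_1']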
def avoid_duplicate_name(titles: 'Sequence[str]', title: str) -> str:
    """ Naive check to see whether the name already exists.

    If the name does exist, a new name is suggested using an incrementing
    suffix. Duplicates are compared case-insensitively.

    """
    # check for an absolute match in which case we need an alternative
    match = [n for n in titles if n.lower() == title.lower()]
    if match:
        joined_titles = ','.join(titles)
        sheet_title_regex = re.compile(
            f'(?P<title>{re.escape(title)})(?P<count>\\d*),?', re.I
        )
        matches = sheet_title_regex.findall(joined_titles)
        if matches:
            # use the name, but append the next highest integer
            counts = [int(idx) for (t, idx) in matches if idx.isdigit()]
            highest = 0
            if counts:
                highest = max(counts)
            title = f'{title}_{highest + 1}'
    return title
def remove_first_word(title: str) -> str:
    """ Removes all chars from the beginning up to and including the
    first "-".

    """
    return re.sub(r'^.*?-', '', title)
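
# Example (sketch):
#
#     assert remove_first_word('very-long-sheet-title') == 'long-sheet-title'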
def has_duplicates(a_list: 'Sequence[Any]') -> bool:
    return len(a_list) != len(set(a_list))
def list_duplicates_index(a: 'Sequence[Any]') -> list[int]:
    """ Returns a list of indexes of duplicates in a list.

    For example::

        a = [1, 2, 3, 2, 1, 5, 6, 5, 5, 5]
        list_duplicates_index(a) == [3, 4, 7, 8, 9]

    """
    return [idx for idx, item in enumerate(a) if item in a[:idx]]