import mimetypes
import shutil
import os
from collections import OrderedDict
from enum import Enum
from onegov.core.csv import convert_list_of_dicts_to_csv
from onegov.core.csv import convert_list_of_dicts_to_xlsx
from onegov.core.csv import convert_excel_to_csv
from onegov.core.csv import CSVFile
from onegov.core.custom import json
from onegov.core.utils import Bunch, is_subpath
from onegov.directory.errors import MissingColumnError, MissingFileError
from onegov.directory.models import Directory, DirectoryEntry
from onegov.directory.types import DirectoryConfiguration
from onegov.file import File
from onegov.form import as_internal_id
from pathlib import Path
from sqlalchemy.orm import object_session
from tempfile import TemporaryDirectory, NamedTemporaryFile
from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import StrPath, SupportsItems, SupportsRead
from collections.abc import Callable, Iterable, Iterator, Sequence
from onegov.core.types import JSON_ro
from onegov.form.parser.core import FileinputField
from onegov.form.parser.core import MultipleFileinputField
from onegov.form.parser.core import ParsedField
from sqlalchemy.orm import Query, Session
from typing import Protocol, Self, TypeAlias
UnknownFieldType: TypeAlias = 'Literal[_Sentinel.UNKNOWN_FIELD]'
DirectoryEntryFilter: TypeAlias = Callable[
[Iterable[DirectoryEntry]],
Iterable[DirectoryEntry]
]
FieldValueTransform: TypeAlias = Callable[
[str, Any],
tuple[str, Any | None]
]
class SupportsReadAndSeek(SupportsRead[bytes], Protocol):
def seek(self, offset: int, /) -> object: ...
class _Sentinel(Enum):
UNKNOWN_FIELD = object()
UNKNOWN_FIELD = _Sentinel.UNKNOWN_FIELD
class DirectoryFileNotFound(FileNotFoundError):
    def __init__(self, file_id: str, entry_name: str, filename: str) -> None:
        self.file_id = file_id
self.entry_name = entry_name
self.filename = filename
class FieldParser:
""" Parses records read by the directory archive reader. """
def __init__(self, directory: Directory, archive_path: Path) -> None:
self.fields_by_human_id = {f.human_id: f for f in directory.fields}
self.fields_by_id = {f.id: f for f in directory.fields}
self.archive_path = archive_path
def get_field(self, key: str) -> 'ParsedField | None':
"""
        CSV file headers are parsed inconsistently with the internal id
        (field.id) of the field: the headers are lowercased, so the first
        lookup will not yield the field, and the second one will not succeed
        either, because characters like '(' are not replaced by underscores.
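
        Example (assuming a hypothetical field labelled "Postal Code", whose
        internal id is ``postal_code``)::

            parser.get_field('Postal Code')   # found via fields_by_human_id
            parser.get_field('postal code')   # lowercased CSV header, only
                                              # found via as_internal_id(key)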
"""
return (
self.fields_by_human_id.get(key)
or self.fields_by_id.get(key)
or self.fields_by_id.get(as_internal_id(key))
)
def parse_generic(
self,
key: str,
value: str,
field: 'ParsedField'
) -> object:
return field.parse(value)
def parse_item(
self,
key: str,
value: str
) -> 'tuple[str, Any | None] | UnknownFieldType':
field = self.get_field(key)
if not field:
return UNKNOWN_FIELD
parser = getattr(self, 'parse_' + field.type, self.parse_generic)
try:
result = parser(key, value, field)
except ValueError:
result = None
return as_internal_id(key), result
def parse(
self,
record: 'SupportsItems[str, str]'
) -> dict[str, Any | None]:
return dict(
parsed
for k, v in record.items()
if (parsed := self.parse_item(k, v)) is not UNKNOWN_FIELD
)
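
    # Example (assuming a hypothetical directory with a single text field
    # labelled "Name"; unknown columns are silently dropped):
    #
    #   parser = FieldParser(directory, archive_path)
    #   parser.parse({'name': 'Town Hall', 'unknown column': 'ignored'})
    #   # => {'name': 'Town Hall'}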
class DirectoryArchiveReader:
""" Reading part of :class:`DirectoryArchive`. """
def read(
self,
target: Directory | None = None,
skip_existing: bool = True,
limit: int = 0,
apply_metadata: bool = True,
after_import: 'Callable[[DirectoryEntry], Any] | None' = None
) -> Directory:
""" Reads the archive resulting in a dictionary and entries.
:param target:
Uses the given directory as a target for the read. Otherwise,
a new directory is created in memory (default).
:param skip_existing:
Excludes already existing entries from being added to the
directory. Only applies if target is not None.
:param limit:
Limits the number of records which are imported. If the limit
is reached, the read process silently ignores all extra items.
:param apply_metadata:
True if the metadata found in the archive should be applied
to the directory.
:param after_import:
Called with the newly added entry, right after it has been added.
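
        A minimal sketch (assuming an archive previously written to
        ``/tmp/directory`` and an open database ``session``)::

            archive = DirectoryArchive('/tmp/directory')
            directory = archive.read()
            session.add(directory)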
"""
meta_data = self.read_metadata()
directory = target or Directory.get_polymorphic_class(
meta_data.get('type', 'generic'),
Directory
)()
if apply_metadata:
directory = self.apply_metadata(directory, meta_data)
if skip_existing and target:
existing = {
e.name for e in object_session(target).query(DirectoryEntry)
.filter_by(directory_id=target.id)
.with_entities(DirectoryEntry.name)
}
else:
existing = set()
parser = FieldParser(directory, self.path)
amount = 0
for record in self.read_data():
if limit and amount >= limit:
break
values = parser.parse(record)
if skip_existing:
name = directory.configuration.extract_name(values)
if name in existing:
continue
existing.add(name)
try:
entry = directory.add(values)
except KeyError as exception:
raise MissingColumnError(
column=exception.args[0]
) from exception
names = (
('latitude', 'longitude'),
('Latitude', 'Longitude')
)
for lat, lon in names:
if record.get(lat) and record.get(lon):
entry.content['coordinates'] = {
'lon': record[lon],
'lat': record[lat],
'zoom': None
}
amount += 1
if after_import is not None:
after_import(entry)
return directory
def read_data(self) -> 'Sequence[dict[str, Any]]':
""" Returns the entries as a sequence of dictionaries. """
if (self.path / 'data.json').exists():
return self.read_data_from_json()
if (self.path / 'data.csv').exists():
return self.read_data_from_csv()
if (self.path / 'data.xlsx').exists():
return self.read_data_from_xlsx()
raise NotImplementedError
def read_data_from_json(self) -> list[dict[str, Any]]:
with (self.path / 'data.json').open('r') as f:
return json.loads(f.read())
def read_data_from_csv(self) -> tuple[dict[str, Any], ...]:
with (self.path / 'data.csv').open('rb') as f:
rows = tuple(CSVFile(f, rowtype=dict).lines)
return tuple(row for row in rows if any(row.values()))
def read_data_from_xlsx(self) -> tuple[dict[str, Any], ...]:
with (self.path / 'data.xlsx').open('rb') as f:
return tuple(CSVFile(
convert_excel_to_csv(f), rowtype=dict, dialect='excel'
).lines)
class DirectoryArchiveWriter:
""" Writing part of :class:`DirectoryArchive`. """
def write(
self,
directory: Directory,
*args: Any,
entry_filter: 'DirectoryEntryFilter | None' = None,
query: 'Query[DirectoryEntry] | None' = None,
**kwargs: Any
) -> None:
""" Writes the given directory. """
assert self.format in ('xlsx', 'csv', 'json')
self.write_directory_metadata(directory)
self.write_directory_entries(directory, entry_filter, query)
def write_directory_entries(
self,
directory: Directory,
entry_filter: 'DirectoryEntryFilter | None' = None,
query: 'Query[DirectoryEntry] | None' = None
) -> None:
""" Writes the directory entries. Allows filtering with custom
entry_filter function as well as passing a query object """
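        # A hypothetical entry_filter that only exports entries whose name
        # starts with 'a':
        #
        #   archive.write_directory_entries(
        #       directory,
        #       entry_filter=lambda entries: (
        #           e for e in entries if e.name.startswith('a')))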
fields = directory.fields
paths: dict[str, str] = {}
fid_to_entry = {}
def file_path(
entry: DirectoryEntry,
field: 'ParsedField',
value: dict[str, Any],
suffix: str = ''
) -> str:
return '{folder}/{name}{suffix}{ext}'.format(
folder=field.id,
name=entry.name,
suffix=suffix,
ext=mimetypes.guess_extension(value['mimetype']) or '')
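        # e.g. a PNG attached to a hypothetical entry named 'town-hall'
        # through a field with the id 'picture' ends up under
        # 'picture/town-hall.png' within the archive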
def as_tuples(
entry: DirectoryEntry
) -> 'Iterator[tuple[str, Any | None]]':
for field in fields:
value = entry.values.get(field.id)
if field.type == 'fileinput':
if value:
file_id = value['data'].lstrip('@')
value = paths[file_id] = file_path(entry, field, value)
fid_to_entry[file_id] = entry.name
else:
value = None
elif field.type == 'multiplefileinput':
if value:
for idx, val in enumerate(value):
file_id = val['data'].lstrip('@')
value[idx] = paths[file_id] = file_path(
entry,
field,
val,
f'_{idx + 1}'
)
fid_to_entry[file_id] = entry.name
# turn it into a scalar value
value = os.pathsep.join(value)
else:
value = None
yield self.transform(field.human_id, value)
def as_dict(entry: DirectoryEntry) -> dict[str, Any | None]:
data = OrderedDict(as_tuples(entry))
coordinates = entry.content.get('coordinates', {})
if isinstance(coordinates, dict):
data['Latitude'] = coordinates.get('lat')
data['Longitude'] = coordinates.get('lon')
else:
data['Latitude'] = coordinates.lat
data['Longitude'] = coordinates.lon
return data
entries: Iterable[DirectoryEntry]
entries = query.all() if query else directory.entries
if entry_filter:
entries = entry_filter(entries)
data = tuple(as_dict(e) for e in entries)
write = getattr(self, f'write_{self.format}')
write(self.path / f'data.{self.format}', data)
self.write_paths(object_session(directory), paths, fid_to_entry)
def write_paths(
self,
session: 'Session',
paths: dict[str, str],
fid_to_entry: dict[str, str] | None = None
) -> None:
""" Writes the given files to the archive path.
:param session:
The database session in use.
:param paths:
A dictionary with each key being a file id and each value
being a path where this file id should be written to.
:param fid_to_entry:
            A dictionary mapping each file id to the name of its directory
            entry (used for error reporting).
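
        Example ``paths`` value (hypothetical)::

            {'<file id>': 'picture/town-hall.png'}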
"""
files: Iterable[File]
if paths:
files = session.query(File).filter(File.id.in_(paths))
else:
files = ()
# keep the temp files around so they don't get GC'd prematurely
tempfiles = []
try:
for f in files:
relfolder, name = paths[f.id].split('/', 1)
folder = self.path / relfolder
if not folder.exists():
folder.mkdir()
# support both local files and others (memory/remote)
try:
if hasattr(f.reference.file, '_file_path'):
src = os.path.abspath(f.reference.file._file_path)
else:
tmp = NamedTemporaryFile() # noqa: SIM115
tmp.write(f.reference.file.read())
tempfiles.append(tmp)
src = tmp.name
except OSError as exception:
if fid_to_entry is None:
entry_name = 'unknown'
else:
entry_name = fid_to_entry[f.id]
raise DirectoryFileNotFound(
file_id=f.id,
entry_name=entry_name,
filename=name
) from exception
dst = str(folder / name)
try:
os.link(src, dst) # prefer links if possible (faster)
except OSError:
shutil.copyfile(src, dst)
finally:
for tempfile in tempfiles:
tempfile.close()
def write_json(self, path: Path, data: 'JSON_ro') -> None:
with open(str(path), 'w') as f:
json.dump(data, f, sort_keys=True, indent=2)
def write_xlsx(self, path: Path, data: 'Iterable[dict[str, Any]]') -> None:
with open(str(path), 'wb') as f:
f.write(convert_list_of_dicts_to_xlsx(data))
def write_csv(self, path: Path, data: 'Iterable[dict[str, Any]]') -> None:
with open(str(path), 'w') as f:
f.write(convert_list_of_dicts_to_csv(data))
class DirectoryArchive(DirectoryArchiveReader, DirectoryArchiveWriter):
""" Offers the ability to read/write a directory and its entries to a
folder.
Usage::
archive = DirectoryArchive('/tmp/directory')
        archive.write(directory)
archive = DirectoryArchive('/tmp/directory')
        directory = archive.read()
The archive content is as follows:
- metadata.json (contains the directory data)
- data.json/data.csv/data.xlsx (contains the directory entries)
- ./<field_id>/<entry_id>.<ext> (files referenced by the directory entries)
The directory entries are stored as json, csv or xlsx. Json is preferred.
"""
def __init__(
self,
path: 'StrPath',
format: Literal['json', 'csv', 'xlsx'] = 'json',
        transform: 'FieldValueTransform | None' = None
):
""" Initialise the archive at the given path (must exist).
:param path:
The target path of this archive.
:param format:
The format of the entries (json, csv or xlsx)
:param transform:
A transform function called with key and value for each entry
that is about to be written when creating an archive. Use this
to format values (for example datetime to string for json).
Note that transformed fields are read by onegov.form. So if the
transformed values cannot be parsed again by onegov.form, you
cannot import the resulting archive.
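
        A hypothetical transform that serialises dates for the json
        format::

            from datetime import date

            def transform(key, value):
                if isinstance(value, date):
                    return key, value.isoformat()
                return key, value

            archive = DirectoryArchive('/tmp/directory', 'json', transform)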
"""
class DirectoryZipArchive:
""" Offers the same interface as the DirectoryArchive, additionally
zipping the folder on write and extracting the zip on read.
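
    Usage (a sketch, assuming an existing ``directory``)::

        archive = DirectoryZipArchive('/tmp/directory.zip')
        archive.write(directory)

        archive = DirectoryZipArchive('/tmp/directory.zip')
        directory = archive.read()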
"""
def __init__(
self,
path: 'StrPath',
*args: Any,
**kwargs: Any
    ):
        self.path = path
self.temp = TemporaryDirectory()
self.archive = DirectoryArchive(self.temp.name, *args, **kwargs)
@classmethod
def from_buffer(cls, buffer: 'SupportsReadAndSeek') -> 'Self':
""" Creates a zip archive instance from a file object in memory. """
f = NamedTemporaryFile() # noqa: SIM115
buffer.seek(0)
while f.write(buffer.read(1024 * 1024)):
pass
f.flush()
obj = cls(f.name)
        # keep the tempfile around until the zip archive itself is GC'd
obj.file = f # type:ignore[attr-defined]
return obj
def write(self, directory: Directory, *args: Any, **kwargs: Any) -> None:
self.archive.write(directory, *args, **kwargs)
self.compress()
def read(self, *args: Any, **kwargs: Any) -> Directory:
self.extract()
return self.archive.read(*args, **kwargs)
def compress(self) -> None:
# make_archive expects a path without extension
output_file = str(self.path).removesuffix('.' + self.format)
shutil.make_archive(output_file, self.format, str(self.archive.path))