import inspect
from email_validator import validate_email
from enum import Enum
from onegov.core.cache import instance_lru_cache
from onegov.core.cache import lru_cache
from onegov.core.crypto import random_token
from onegov.core.orm import Base, observes
from onegov.core.orm.mixins import ContentMixin
from onegov.core.orm.mixins import TimestampMixin
from onegov.core.orm.types import UUID
from onegov.core.utils import normalize_for_url
from onegov.directory.errors import ValidationError, DuplicateEntryError
from onegov.directory.migration import DirectoryMigration
from onegov.directory.types import DirectoryConfigurationStorage
from onegov.file import File, MultiAssociatedFiles
from onegov.file.utils import as_fileintent
from onegov.form import flatten_fieldsets, parse_formcode, parse_form
from onegov.search import SearchableContent
from sedate import to_timezone
from sqlalchemy import Boolean, Column
from sqlalchemy import func, exists, and_
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy.orm import object_session
from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy_utils import aggregated
from uuid import uuid4
from wtforms import FieldList
from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
import uuid
from builtins import type as _type # type is shadowed in model
from collections.abc import Mapping, Sequence
from onegov.form import Form
from onegov.form.parser.core import (
BasicParsedField, FileParsedField, ParsedField)
from sqlalchemy.sql import ColumnElement
from typing import type_check_only, TypeAlias
from .directory_entry import DirectoryEntry
from ..types import DirectoryConfiguration
[docs]
InheritType: TypeAlias = 'Literal[_Sentinel.INHERIT]'
@type_check_only
class DirectoryEntryForm(Form):
# original form code
_source: str
@property
def mixed_data(self) -> dict[str, Any]: ...
def populate_obj( # type:ignore[override]
self,
obj: DirectoryEntry,
directory_update: bool = True
) -> None: ...
def process_obj(
self,
obj: DirectoryEntry # type:ignore[override]
) -> None: ...
[docs]
INHERIT = _Sentinel.INHERIT
[docs]
class DirectoryFile(File):
[docs]
__mapper_args__ = {'polymorphic_identity': 'directory'}
if TYPE_CHECKING:
# NOTE: this should always be exactly one entry, since we use
# a one-to-many relationship on DirectoryEntry. Technically
# it's possible to create a DirectoryFile, that isn't linked
# to any directory entry, but generally this shouldn't happen
[docs]
linked_directory_entries: relationship[list[DirectoryEntry]]
@property
[docs]
def directory_entry(self) -> 'DirectoryEntry | None':
# we gracefully handle if there are no linked entries, even though
# there should always be exactly one
entries = self.linked_directory_entries
return entries[0] if entries else None
@property
[docs]
def access(self) -> str:
# we don't want these files to show up in search engines
return 'secret' if self.published else 'private'
[docs]
class Directory(Base, ContentMixin, TimestampMixin,
SearchableContent, MultiAssociatedFiles):
""" A directory of entries that share a common data structure. For example,
a directory of people, of emergency services or playgrounds.
"""
[docs]
__tablename__ = 'directories'
[docs]
es_properties = {
'title': {'type': 'localized'},
'lead': {'type': 'localized'}
}
@property
[docs]
def es_public(self) -> bool:
return False # to be overridden downstream
#: An interal id for references (not public)
[docs]
id: 'Column[uuid.UUID]' = Column(
UUID, # type:ignore[arg-type]
primary_key=True,
default=uuid4
)
#: The public, unique name of the directory
[docs]
name: 'Column[str]' = Column(Text, nullable=False, unique=True)
#: The title of the directory
[docs]
title: 'Column[str]' = Column(Text, nullable=False)
#: Describes the directory briefly
[docs]
lead: 'Column[str | None]' = Column(Text, nullable=True)
#: The normalized title for sorting
[docs]
order: 'Column[str]' = Column(Text, nullable=False, index=True)
#: The polymorphic type of the directory
[docs]
type: 'Column[str]' = Column(
Text,
nullable=False,
default=lambda: 'generic'
)
#: The data structure of the contained entries
[docs]
structure: 'Column[str]' = Column(Text, nullable=False)
#: The configuration of the contained entries
[docs]
configuration: 'Column[DirectoryConfiguration]' = Column(
DirectoryConfigurationStorage,
nullable=False
)
#: The number of entries in the directory
@aggregated('entries', Column(Integer, nullable=False, default=0))
[docs]
def count(self) -> 'ColumnElement[int]':
return func.count('1')
[docs]
__mapper_args__ = {
'polymorphic_on': type,
'polymorphic_identity': 'generic'
}
[docs]
entries: 'relationship[list[DirectoryEntry]]' = relationship(
'DirectoryEntry',
order_by='DirectoryEntry.order',
back_populates='directory'
)
@property
[docs]
def entry_cls_name(self) -> str:
return 'DirectoryEntry'
@property
[docs]
def entry_cls(self) -> '_type[DirectoryEntry]':
return self.__class__._decl_class_registry[ # type:ignore
self.entry_cls_name
]
[docs]
def add(
self,
values: dict[str, Any],
type: 'str | InheritType' = INHERIT
) -> 'DirectoryEntry':
start = values.pop('publication_start', None)
end = values.pop('publication_end', None)
# Not converting to UTC here led to returning non-UTC datetimes
# from UTCDateTimeField, not triggering `def process_result_value`
if start:
start = to_timezone(start, 'UTC')
if end:
end = to_timezone(end, 'UTC')
entry = self.entry_cls(
directory=self,
type=self.type if type is INHERIT else type,
meta={},
publication_start=start,
publication_end=end,
)
return self.update(entry, values, set_name=True)
[docs]
def update(
self,
entry: 'DirectoryEntry',
values: 'Mapping[str, Any]',
set_name: bool = False,
force_update: bool = False
) -> 'DirectoryEntry':
session = object_session(self)
# replace all existing basic fields
updated = {f.id: values[f.id] for f in self.basic_fields}
# treat file fields differently
known_file_ids = {
f.id if idx is None else f'{f.id}:{idx}'
for f in self.file_fields
# add an id for each file in a multiple upload field
for idx in (
range(len(values[f.id]))
if hasattr(values[f.id], '__len__')
else [None]
)
}
if self.file_fields:
def get_value_field_from_note(file_id: str) -> Any:
id, __, idx = file_id.rpartition(':')
if idx is None or not idx.isdigit():
return values[file_id]
return values[id][int(idx)]
# files which are not given or whose value is {} are removed
# (this is in line with onegov.form's file upload field+widget)
for f in entry.files:
# this indicates that the file has been renamed
if f.note is None or f.note not in known_file_ids:
continue
value_field = get_value_field_from_note(f.note)
if isinstance(value_field, dict):
continue
delete = (
value_field is None
or value_field.data == {}
or value_field.data is not None
)
if delete:
session.delete(f)
for field in self.file_fields:
field_values = values[field.id]
if not field_values:
updated[field.id] = field_values
continue
# migrate files during an entry migration
if isinstance(field_values, dict):
updated[field.id] = field_values
file_id = field_values['data'].lstrip('@')
with session.no_autoflush:
f = session.query(File).filter_by(id=file_id).first()
if f and f.type != 'directory':
new = DirectoryFile( # type:ignore[misc]
id=random_token(),
name=f.name,
note=f.note,
reference=f.reference
)
entry.files.append(new)
updated[field.id].update({'data': f'@{new.id}'})
continue
elif isinstance(field_values, list):
updated[field.id] = field_values
for idx, field_value in enumerate(field_values):
file_id = field_value['data'].lstrip('@')
with session.no_autoflush:
f = session.query(File).filter_by(
id=file_id
).first()
if f and f.type != 'directory':
new = DirectoryFile( # type:ignore[misc]
id=random_token(),
name=f.name,
note=f.note,
reference=f.reference
)
entry.files.append(new)
updated[field.id][idx].update(
{'data': f'@{new.id}'}
)
continue
elif field.type == 'fileinput':
# keep files if selected in the dialog
if getattr(field_values, 'action', None) == 'keep':
original = (entry.values or {}).get(field.id, {})
updated[field.id] = original
continue
# delete files if selected in the dialog
if getattr(field_values, 'action', None) == 'delete':
updated[field.id] = {}
continue
# if there was no file supplied, we can't add it
if not getattr(field_values, 'file', None):
updated[field.id] = {}
continue
# create a new file
new_file = DirectoryFile( # type:ignore[misc]
id=random_token(),
name=field_values.filename,
note=field.id,
reference=as_fileintent(
content=field_values.file,
filename=field_values.filename
)
)
entry.files.append(new_file)
# keep a reference to the file in the values
updated[field.id] = {
'data': '@' + new_file.id,
'filename': field_values.filename,
'mimetype': new_file.reference.file.content_type,
'size': new_file.reference.file.content_length
}
continue
# FIXME: there's quite a bit of copy pasta between the
# filefield and multiplefilefield case, we should
# try to refactor this so we can handle both more
# easily
new_idx = 0
updated[field.id] = []
for old_idx, subfield_values in enumerate(field_values):
old_values = (entry.values or {}).get(field.id) or []
# keep files if selected in the dialog
if getattr(subfield_values, 'action', None) == 'keep':
if len(old_values) <= old_idx:
# it doesn't exist so we can't keep it
continue
original = old_values[old_idx]
updated[field.id].append(original)
# update the file.note so it points to the correct
# index in the list if necessary
file_id = original['data'].lstrip('@')
for file in entry.files:
if file.id == file_id:
new_key = f'{field.id}:{new_idx}'
if file.note != new_key:
file.note = new_key
break
new_idx += 1
continue
# delete files if selected in the dialog
if getattr(subfield_values, 'action', None) == 'delete':
continue
# if there was no file supplied, we can't add it
if not getattr(subfield_values, 'file', None):
continue
# create a new file
new_file = DirectoryFile( # type:ignore[misc]
id=random_token(),
name=subfield_values.filename,
note=f'{field.id}:{new_idx}',
reference=as_fileintent(
content=subfield_values.file,
filename=subfield_values.filename
)
)
entry.files.append(new_file)
# keep a reference to the file in the values
updated[field.id].append({
'data': '@' + new_file.id,
'filename': subfield_values.filename,
'mimetype': new_file.reference.file.content_type,
'size': new_file.reference.file.content_length
})
new_idx += 1
# update the values
if force_update or entry.values != updated:
entry.values = updated
# mark the values as dirty (required because values is only part
# of the actual content dictionary)
entry.content.changed() # type:ignore[attr-defined]
# update the metadata
for attr in ('title', 'lead', 'order', 'keywords'):
new = getattr(self.configuration, f'extract_{attr}')(values)
if new != getattr(entry, attr):
setattr(entry, attr, new)
# update the title
if set_name:
name = self.configuration.extract_name(values)
if session:
with session.no_autoflush:
if self.entry_with_name_exists(name):
# FIXME: I don't think this is necessary, the expunge
# should already remove the relationship
entry.directory = None # type:ignore[assignment]
session.expunge(entry)
raise DuplicateEntryError(name)
entry.name = name
# validate the values
form = self.form_obj
form.process(data=entry.values)
if not form.validate():
# if the validation error is captured, the entry is added
# to the directory, unless we expunge it
entry.directory = None # type:ignore[assignment]
session and session.expunge(entry)
raise ValidationError(entry, form.errors)
if session and not session._flushing:
session.flush()
return entry
@observes('title')
[docs]
def title_observer(self, title: str) -> None:
self.order = normalize_for_url(title)
@observes('structure', 'configuration')
[docs]
def structure_configuration_observer(
self,
structure: str,
configuration: 'DirectoryConfiguration'
) -> None:
self.migration(structure, configuration).execute()
[docs]
def entry_with_name_exists(self, name: str) -> bool:
return object_session(self).query(exists().where(and_(
self.entry_cls.name == name,
self.entry_cls.directory_id == self.id
))).scalar()
[docs]
def migration(
self,
new_structure: str,
new_configuration: 'DirectoryConfiguration'
) -> DirectoryMigration:
return DirectoryMigration(
directory=self,
new_structure=new_structure,
new_configuration=new_configuration
)
@property
[docs]
def fields(self) -> 'Sequence[ParsedField]':
return self.fields_from_structure(self.structure)
@staticmethod
@lru_cache(maxsize=1)
[docs]
def fields_from_structure(structure: str) -> 'Sequence[ParsedField]':
return tuple(flatten_fieldsets(parse_formcode(structure)))
@property
[docs]
def basic_fields(self) -> 'Sequence[BasicParsedField]':
return tuple(
f for f in self.fields
if f.type != 'fileinput' and f.type != 'multiplefileinput'
)
@property
[docs]
def file_fields(self) -> 'Sequence[FileParsedField]':
return tuple(
f for f in self.fields
if f.type == 'fileinput' or f.type == 'multiplefileinput'
)
[docs]
def field_by_id(self, id: str) -> 'ParsedField | None':
query = (f for f in self.fields if f.human_id == id or f.id == id)
return next(query, None)
@property
@property
@instance_lru_cache(maxsize=1)
@instance_lru_cache(maxsize=1)
[docs]
class EntryRecipient(Base, TimestampMixin):
""" Represents a single recipient.
"""
[docs]
__tablename__ = 'entry_recipients'
#: the id of the recipient, used in the url
[docs]
id: 'Column[uuid.UUID]' = Column(
UUID, # type:ignore[arg-type]
primary_key=True,
default=uuid4
)
#: the email address of the recipient
[docs]
address: 'Column[str]' = Column(Text, nullable=False)
@validates('address')
[docs]
def validate_address(self, key: str, address: str) -> str:
assert validate_email(address)
return address
#: this token is used for confirm and unsubscribe
[docs]
token: 'Column[str]' = Column(Text, nullable=False, default=random_token)
#: when recipients are added, they are unconfirmed. At this point they get
#: one e-mail with a confirmation link. If they ignore said e-mail they
#: should not get another one.
[docs]
confirmed: 'Column[bool]' = Column(Boolean, nullable=False, default=False)
@property
[docs]
def subscription(self) -> 'EntrySubscription':
return EntrySubscription(self, self.token)
[docs]
directory_id: 'Column[uuid.UUID]' = Column(
UUID, # type:ignore[arg-type]
nullable=False
)
[docs]
class EntrySubscription:
""" Adds subscription management to a recipient. """
def __init__(self, recipient: EntryRecipient, token: str):
[docs]
self.recipient = recipient
[docs]
self.token = token
@property
[docs]
def recipient_id(self) -> 'uuid.UUID':
# even though this seems redundant, we need this property
# for morepath, so it can match it to the path variable
return self.recipient.id
[docs]
def confirm(self) -> bool:
if self.recipient.token != self.token:
return False
self.recipient.confirmed = True
return True
[docs]
def unsubscribe(self) -> bool:
if self.recipient.token != self.token:
return False
session = object_session(self.recipient)
session.delete(self.recipient)
session.flush()
return True