import inspect

from email_validator import validate_email
from enum import Enum
from onegov.core.cache import instance_lru_cache
from onegov.core.cache import lru_cache
from onegov.core.crypto import random_token
from onegov.core.orm import Base, observes
from onegov.core.orm.mixins import ContentMixin
from onegov.core.orm.mixins import TimestampMixin
from onegov.core.orm.types import UUID
from onegov.core.utils import normalize_for_url
from import ValidationError, DuplicateEntryError
from import DirectoryMigration
from import DirectoryConfigurationStorage
from onegov.file import File, MultiAssociatedFiles
from onegov.file.utils import as_fileintent
from onegov.form import flatten_fieldsets, parse_formcode, parse_form
from import SearchableContent
from sedate import to_timezone
from sqlalchemy import Boolean, Column
from sqlalchemy import func, exists, and_
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy.orm import object_session
from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy_utils import aggregated
from uuid import uuid4
from wtforms import FieldList

from typing import Any, Literal, TYPE_CHECKING
    import uuid
    from builtins import type as _type  # type is shadowed in model
    from import Mapping, Sequence
    from onegov.form import Form
    from onegov.form.parser.core import (
        BasicParsedField, FileParsedField, ParsedField)
    from sqlalchemy.sql import ColumnElement
    from typing import type_check_only, TypeAlias
    from .directory_entry import DirectoryEntry
    from ..types import DirectoryConfiguration

InheritType: TypeAlias = 'Literal[_Sentinel.INHERIT]'
@type_check_only class DirectoryEntryForm(Form): # original form code _source: str @property def mixed_data(self) -> dict[str, Any]: ... def populate_obj( # type:ignore[override] self, obj: DirectoryEntry, directory_update: bool = True ) -> None: ... def process_obj( self, obj: DirectoryEntry # type:ignore[override] ) -> None: ...
class _Sentinel(Enum):
INHERIT = object()
INHERIT = _Sentinel.INHERIT
class DirectoryFile(File):
__mapper_args__ = {'polymorphic_identity': 'directory'}
if TYPE_CHECKING: # NOTE: this should always be exactly one entry, since we use # a one-to-many relationship on DirectoryEntry. Technically # it's possible to create a DirectoryFile, that isn't linked # to any directory entry, but generally this shouldn't happen
linked_directory_entries: relationship[list[DirectoryEntry]]
def directory_entry(self) -> 'DirectoryEntry | None':
        # we gracefully handle if there are no linked entries, even though
        # there should always be exactly one
        entries = self.linked_directory_entries
        return entries[0] if entries else None
def access(self) -> str:
        # we don't want these files to show up in search engines
        return 'secret' if self.published else 'private'
class Directory(Base, ContentMixin, TimestampMixin, SearchableContent,
                 MultiAssociatedFiles):
    """ A directory of entries that share a common data structure.

    For example, a directory of people, of emergency services or playgrounds.

    """
__tablename__ = 'directories'
es_properties = {
        'title': {'type': 'localized'},
        'lead': {'type': 'localized'}
    }
def es_public(self) -> bool:
        return False  # to be overridden downstream
#: An interal id for references (not public)
id: 'Column[uuid.UUID]' = Column(
        UUID,  # type:ignore[arg-type]
        primary_key=True,
        default=uuid4
    )
#: The public, unique name of the directory
name: 'Column[str]' = Column(Text, nullable=False, unique=True)
#: The title of the directory
title: 'Column[str]' = Column(Text, nullable=False)
#: Describes the directory briefly
lead: 'Column[str | None]' = Column(Text, nullable=True)
#: The normalized title for sorting
order: 'Column[str]' = Column(Text, nullable=False, index=True)
#: The polymorphic type of the directory
type: 'Column[str]' = Column(
        Text,
        nullable=False,
        default=lambda: 'generic'
    )
#: The data structure of the contained entries
structure: 'Column[str]' = Column(Text, nullable=False)
#: The configuration of the contained entries
configuration: 'Column[DirectoryConfiguration]' = Column(
        DirectoryConfigurationStorage,
        nullable=False
    )
#: The number of entries in the directory @aggregated('entries', Column(Integer, nullable=False, default=0))
def count(self) -> 'ColumnElement[int]':
        return func.count('1')
__mapper_args__ = {
        'polymorphic_on': type,
        'polymorphic_identity': 'generic'
    }
entries: 'relationship[list[DirectoryEntry]]' = relationship(
        'DirectoryEntry',
        order_by='DirectoryEntry.order',
        back_populates='directory'
    )
def entry_cls_name(self) -> str:
        return 'DirectoryEntry'
def entry_cls(self) -> '_type[DirectoryEntry]':
        return self.__class__._decl_class_registry[  # type:ignore
            self.entry_cls_name
        ]
def add(
        self,
        values: dict[str, Any],
        type: 'str | InheritType' = INHERIT
    ) -> 'DirectoryEntry':

        start = values.pop('publication_start', None)
        end = values.pop('publication_end', None)

        # Not converting to UTC here led to returning non-UTC datetimes
        # from UTCDateTimeField, not triggering `def process_result_value`
        if start:
            start = to_timezone(start, 'UTC')
        if end:
            end = to_timezone(end, 'UTC')

        entry = self.entry_cls(
            directory=self,
            type=self.type if type is INHERIT else type,
            meta={},
            publication_start=start,
            publication_end=end,
        )

        return self.update(entry, values, set_name=True)
def add_by_form(
        self,
        form: 'DirectoryEntryForm',
        type: 'str | InheritType' = INHERIT
    ) -> 'DirectoryEntry':

        entry = self.add(form.mixed_data, type)

        # certain features, like mixin-forms require the form population
        # code to run - it ain't pretty but it avoids a lot of headaches
        form.populate_obj(entry, directory_update=False)

        return entry
[docs] def update( self, entry: 'DirectoryEntry', values: 'Mapping[str, Any]', set_name: bool = False, force_update: bool = False ) -> 'DirectoryEntry': session = object_session(self) # replace all existing basic fields updated = { values[] for f in self.basic_fields} # treat file fields differently known_file_ids = { if idx is None else f'{}:{idx}' for f in self.file_fields # add an id for each file in a multiple upload field for idx in ( range(len(values[])) if hasattr(values[], '__len__') else [None] ) } if self.file_fields: def get_value_field_from_note(file_id: str) -> Any: id, __, idx = file_id.rpartition(':') if idx is None or not idx.isdigit(): return values[file_id] return values[id][int(idx)] # files which are not given or whose value is {} are removed # (this is in line with onegov.form's file upload field+widget) for f in entry.files: # this indicates that the file has been renamed if f.note is None or f.note not in known_file_ids: continue value_field = get_value_field_from_note(f.note) if isinstance(value_field, dict): continue delete = ( value_field is None or == {} or is not None ) if delete: session.delete(f) for field in self.file_fields: field_values = values[] if not field_values: updated[] = field_values continue # migrate files during an entry migration if isinstance(field_values, dict): updated[] = field_values file_id = field_values['data'].lstrip('@') with session.no_autoflush: f = session.query(File).filter_by(id=file_id).first() if f and f.type != 'directory': new = DirectoryFile( # type:ignore[misc] id=random_token(),, note=f.note, reference=f.reference ) entry.files.append(new) updated[].update({'data': f'@{}'}) continue elif isinstance(field_values, list): updated[] = field_values for idx, field_value in enumerate(field_values): file_id = field_value['data'].lstrip('@') with session.no_autoflush: f = session.query(File).filter_by( id=file_id ).first() if f and f.type != 'directory': new = DirectoryFile( # type:ignore[misc] id=random_token(),, note=f.note, reference=f.reference ) entry.files.append(new) updated[][idx].update( {'data': f'@{}'} ) continue elif field.type == 'fileinput': # keep files if selected in the dialog if getattr(field_values, 'action', None) == 'keep': original = (entry.values or {}).get(, {}) updated[] = original continue # delete files if selected in the dialog if getattr(field_values, 'action', None) == 'delete': updated[] = {} continue # if there was no file supplied, we can't add it if not getattr(field_values, 'file', None): updated[] = {} continue # create a new file new_file = DirectoryFile( # type:ignore[misc] id=random_token(), name=field_values.filename,, reference=as_fileintent( content=field_values.file, filename=field_values.filename ) ) entry.files.append(new_file) # keep a reference to the file in the values updated[] = { 'data': '@' +, 'filename': field_values.filename, 'mimetype': new_file.reference.file.content_type, 'size': new_file.reference.file.content_length } continue # FIXME: there's quite a bit of copy pasta between the # filefield and multiplefilefield case, we should # try to refactor this so we can handle both more # easily new_idx = 0 updated[] = [] for old_idx, subfield_values in enumerate(field_values): old_values = (entry.values or {}).get( or [] # keep files if selected in the dialog if getattr(subfield_values, 'action', None) == 'keep': if len(old_values) <= old_idx: # it doesn't exist so we can't keep it continue original = old_values[old_idx] updated[].append(original) # update the file.note so it points to the correct # index in the list if necessary file_id = original['data'].lstrip('@') for file in entry.files: if == file_id: new_key = f'{}:{new_idx}' if file.note != new_key: file.note = new_key break new_idx += 1 continue # delete files if selected in the dialog if getattr(subfield_values, 'action', None) == 'delete': continue # if there was no file supplied, we can't add it if not getattr(subfield_values, 'file', None): continue # create a new file new_file = DirectoryFile( # type:ignore[misc] id=random_token(), name=subfield_values.filename, note=f'{}:{new_idx}', reference=as_fileintent( content=subfield_values.file, filename=subfield_values.filename ) ) entry.files.append(new_file) # keep a reference to the file in the values updated[].append({ 'data': '@' +, 'filename': subfield_values.filename, 'mimetype': new_file.reference.file.content_type, 'size': new_file.reference.file.content_length }) new_idx += 1 # update the values if force_update or entry.values != updated: entry.values = updated # mark the values as dirty (required because values is only part # of the actual content dictionary) entry.content.changed() # type:ignore[attr-defined] # update the metadata for attr in ('title', 'lead', 'order', 'keywords'): new = getattr(self.configuration, f'extract_{attr}')(values) if new != getattr(entry, attr): setattr(entry, attr, new) # update the title if set_name: name = self.configuration.extract_name(values) if session: with session.no_autoflush: if self.entry_with_name_exists(name): # FIXME: I don't think this is necessary, the expunge # should already remove the relationship = None # type:ignore[assignment] session.expunge(entry) raise DuplicateEntryError(name) = name # validate the values form = self.form_obj form.process(data=entry.values) if not form.validate(): # if the validation error is captured, the entry is added # to the directory, unless we expunge it = None # type:ignore[assignment] session and session.expunge(entry) raise ValidationError(entry, form.errors) if session and not session._flushing: session.flush() return entry
def title_observer(self, title: str) -> None:
        self.order = normalize_for_url(title)
@observes('structure', 'configuration')
def structure_configuration_observer(
        self,
        structure: str,
        configuration: 'DirectoryConfiguration'
    ) -> None:
        self.migration(structure, configuration).execute()
def entry_with_name_exists(self, name: str) -> bool:
        return object_session(self).query(exists().where(and_(
   == name,
            self.entry_cls.directory_id ==
        ))).scalar()
def migration(
        self,
        new_structure: str,
        new_configuration: 'DirectoryConfiguration'
    ) -> DirectoryMigration:

        return DirectoryMigration(
            directory=self,
            new_structure=new_structure,
            new_configuration=new_configuration
        )
def fields(self) -> 'Sequence[ParsedField]':
        return self.fields_from_structure(self.structure)
@staticmethod @lru_cache(maxsize=1)
def fields_from_structure(structure: str) -> 'Sequence[ParsedField]':
        return tuple(flatten_fieldsets(parse_formcode(structure)))
def basic_fields(self) -> 'Sequence[BasicParsedField]':
        return tuple(
            f for f in self.fields
            if f.type != 'fileinput' and f.type != 'multiplefileinput'
        )
def file_fields(self) -> 'Sequence[FileParsedField]':
        return tuple(
            f for f in self.fields
            if f.type == 'fileinput' or f.type == 'multiplefileinput'
        )
def field_by_id(self, id: str) -> 'ParsedField | None':
        query = (f for f in self.fields if f.human_id == id or == id)
        return next(query, None)
def form_obj(self) -> 'DirectoryEntryForm':
        return self.form_obj_from_structure(self.structure)
def form_class(self) -> '_type[DirectoryEntryForm]':
        return self.form_class_from_structure(self.structure)
def form_obj_from_structure(self, structure: str) -> 'DirectoryEntryForm':
        return self.form_class_from_structure(structure)()
[docs] def form_class_from_structure( self, structure: str ) -> '_type[DirectoryEntryForm]': directory = self class DirectoryEntryForm(parse_form(self.structure)): # type:ignore @property def mixed_data(self) -> dict[str, Any]: # use the field data for non-file fields data = { k: v for k, v in if k not in { for f in directory.file_fields } } # use the field objects for file-fields for field in directory.file_fields: data[] = self[] return data def populate_obj( self, obj: 'DirectoryEntry', directory_update: bool = True ) -> None: exclude = {k for k, v in inspect.getmembers( obj.__class__, lambda v: isinstance(v, InstrumentedAttribute) )} include = ('publication_start', 'publication_end') exclude = {k for k in exclude if k not in include} super().populate_obj(obj, exclude=exclude) if directory_update: directory.update(obj, self.mixed_data) def process_obj(self, obj: 'DirectoryEntry') -> None: super().process_obj(obj) for field in directory.fields: form_field = getattr(self, if form_field is None: continue data = obj.values.get( if isinstance(form_field, FieldList): for subdata in data or (): form_field.append_entry(subdata) else: = data return DirectoryEntryForm
class EntryRecipient(Base, TimestampMixin):
    """ Represents a single recipient. """
__tablename__ = 'entry_recipients'
#: the id of the recipient, used in the url
id: 'Column[uuid.UUID]' = Column(
        UUID,  # type:ignore[arg-type]
        primary_key=True,
        default=uuid4
    )
#: the email address of the recipient
address: 'Column[str]' = Column(Text, nullable=False)
def validate_address(self, key: str, address: str) -> str:
        assert validate_email(address)
        return address
#: this token is used for confirm and unsubscribe
token: 'Column[str]' = Column(Text, nullable=False, default=random_token)
#: when recipients are added, they are unconfirmed. At this point they get #: one e-mail with a confirmation link. If they ignore said e-mail they #: should not get another one.
confirmed: 'Column[bool]' = Column(Boolean, nullable=False, default=False)
def subscription(self) -> 'EntrySubscription':
        return EntrySubscription(self, self.token)
directory_id: 'Column[uuid.UUID]' = Column(
        UUID,  # type:ignore[arg-type]
        nullable=False
    )
class EntrySubscription:
    """ Adds subscription management to a recipient. """

    def __init__(self, recipient: EntryRecipient, token: str):
self.recipient = recipient
self.token = token
def recipient_id(self) -> 'uuid.UUID':
        # even though this seems redundant, we need this property
        # for morepath, so it can match it to the path variable
        return
def confirm(self) -> bool:
        if self.recipient.token != self.token:
            return False

        self.recipient.confirmed = True
        return True
def unsubscribe(self) -> bool:
        if self.recipient.token != self.token:
            return False

        session = object_session(self.recipient)
        session.delete(self.recipient)
        session.flush()

        return True