from __future__ import annotations
from onegov.directory.models.directory_entry import DirectoryEntry
from onegov.form import as_internal_id
from onegov.form import flatten_fieldsets
from onegov.form import parse_form
from onegov.form import parse_formcode
from sqlalchemy.orm import object_session, joinedload, undefer
from sqlalchemy.orm.attributes import get_history
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable, Iterable
    from datetime import date, datetime, time
    from onegov.directory.models import Directory
    from onegov.directory.types import DirectoryConfiguration
[docs]
class DirectoryMigration:
    """ Takes a directory and the structure/configuration it should have in
    the future.
    It then migrates the existing directory entries, if possible.
    """
    def __init__(
        self,
        directory: Directory,
        new_structure: str | None = None,
        new_configuration: DirectoryConfiguration | None = None,
        old_structure: str | None = None
    ):
[docs]
        self.directory = directory 
[docs]
        self.old_structure = old_structure or self.old_directory_structure 
[docs]
        self.new_structure = new_structure or directory.structure 
[docs]
        self.new_configuration = new_configuration or directory.configuration 
[docs]
        self.fieldtype_migrations = FieldTypeMigrations() 
[docs]
        self.changes = StructuralChanges(
            self.old_structure,
            self.new_structure
        ) 
    @property
[docs]
    def old_directory_structure(self) -> str:
        history = get_history(self.directory, 'structure')
        if history.deleted:
            return history.deleted[0]
        else:
            return self.directory.structure 
    @property
[docs]
    def possible(self) -> bool:
        if not self.directory.entries:
            return True
        if not self.changes:
            return True
        if not self.changes.changed_fields:
            return True
        for changed in self.changes.changed_fields:
            old = self.changes.old[changed]
            new = self.changes.new[changed]
            # we can turn required into optional fields and vice versa
            # (the form validation takes care of validating the requirements)
            if old.required != new.required and old.type == new.type:
                continue
            # we can only convert certain types
            if old.required == new.required and old.type != new.type:
                if not self.fieldtype_migrations.possible(old.type, new.type):
                    break
        else:
            return True
        return False 
    @property
[docs]
    def entries(self) -> Iterable[DirectoryEntry]:
        session = object_session(self.directory)
        if not session:
            return self.directory.entries
        e = session.query(DirectoryEntry)
        e = e.filter_by(directory_id=self.directory.id)
        e = e.options(joinedload(DirectoryEntry.files))
        e = e.options(undefer(DirectoryEntry.content))
        return e 
[docs]
    def execute(self) -> None:
        """ To run the migration, run this method. The other methods below
        should only be used if you know what you are doing.
        """
        assert self.possible
        self.migrate_directory()
        # Triggers the observer to func::structure_configuration_observer()
        # and executing this very function because of an autoflush event
        # in a new instance.
        for entry in self.entries:
            self.migrate_entry(entry) 
[docs]
    def migrate_directory(self) -> None:
        self.directory.structure = self.new_structure
        self.directory.configuration = self.new_configuration 
[docs]
    def migrate_entry(self, entry: DirectoryEntry) -> None:
        """
        This function is called after an update to the directory structure.
        During execution of self.execute(), the directory is migrated.
        On start of looping trough the entries, an auto_flush occurs, calling
        the migration observer for the directory, which will instantiate yet
        another instance of the migration. After this inside execute(),
        the session is not flusing anymore, and we have to skip,
        since the values are already migrated and migration will
        fail when removing fieldsets.
        """
        update = self.changes and True or False
        session = object_session(entry)
        if not session._flushing:
            return
        self.migrate_values(entry.values)
        self.directory.update(entry, entry.values, force_update=update) 
[docs]
    def migrate_values(self, values: dict[str, Any]) -> None:
        self.add_new_fields(values)
        self.remove_old_fields(values)
        self.rename_fields(values)
        self.convert_fields(values) 
[docs]
    def add_new_fields(self, values: dict[str, Any]) -> None:
        for added in self.changes.added_fields:
            added = as_internal_id(added)
            values[added] = None 
[docs]
    def remove_old_fields(self, values: dict[str, Any]) -> None:
        for removed in self.changes.removed_fields:
            removed = as_internal_id(removed)
            del values[removed] 
[docs]
    def rename_fields(self, values: dict[str, Any]) -> None:
        for old, new in self.changes.renamed_fields.items():
            old, new = as_internal_id(old), as_internal_id(new)
            values[new] = values[old]
            del values[old] 
[docs]
    def convert_fields(self, values: dict[str, Any]) -> None:
        for changed in self.changes.changed_fields:
            convert = self.fieldtype_migrations.get_converter(
                self.changes.old[changed].type,
                self.changes.new[changed].type
            )
            assert convert is not None
            changed = as_internal_id(changed)
            values[changed] = convert(values[changed]) 
 
[docs]
class FieldTypeMigrations:
    """ Contains methods to migrate fields from one type to another. """
[docs]
    def possible(self, old_type: str, new_type: str) -> bool:
        return self.get_converter(old_type, new_type) is not None 
[docs]
    def get_converter(
        self,
        old_type: str,
        new_type: str
    ) -> Callable[[Any], Any] | None:
        if old_type == 'password':
            return None  # disabled to avoid accidental leaks
        if old_type == new_type:
            return lambda v: v
        explicit = f'{old_type}_to_{new_type}'
        generic = f'any_to_{new_type}'
        return getattr(self, explicit, getattr(self, generic, None)) 
    # FIXME: A lot of these converters currently don't work if the value
    #        happens to be None, which should be possible for every field
    #        as long as its optional, or do we skip converting None
    #        explicitly somewhere?!
[docs]
    def any_to_text(self, value: Any) -> str:
        return str(value if value is not None else '').strip() 
[docs]
    def any_to_textarea(self, value: Any) -> str:
        return self.any_to_text(value) 
[docs]
    def textarea_to_text(self, value: str) -> str:
        return value.replace('\n', ' ').strip() 
[docs]
    def textarea_to_code(self, value: str) -> str:
        return value 
[docs]
    def text_to_code(self, value: str) -> str:
        return value 
[docs]
    def date_to_text(self, value: date) -> str:
        return '{:%d.%m.%Y}'.format(value) 
[docs]
    def datetime_to_text(self, value: datetime) -> str:
        return '{:%d.%m.%Y %H:%M}'.format(value) 
[docs]
    def time_to_text(self, value: time) -> str:
        return '{:%H:%M}'.format(value) 
[docs]
    def radio_to_checkbox(self, value: str) -> list[str]:
        return [value] 
[docs]
    def text_to_url(self, value: str) -> str:
        return value 
 
[docs]
class StructuralChanges:
    """ Tries to detect structural changes between two formcode blocks.
    Can only be trusted if the ``detection_successful`` property is True. If it
    is not, the detection failed because the changes were too great.
    """
    def __init__(self, old_structure: str, new_structure: str) -> None:
        old_fieldsets = parse_formcode(old_structure)
        new_fieldsets = parse_formcode(new_structure)
[docs]
        self.old = {
            f.human_id: f for f in flatten_fieldsets(old_fieldsets)
        } 
[docs]
        self.new = {
            f.human_id: f for f in flatten_fieldsets(new_fieldsets)
        } 
[docs]
        self.old_fieldsets = old_fieldsets 
[docs]
        self.new_fieldsets = new_fieldsets 
        self.detect_added_fieldsets()
        self.detect_removed_fieldsets()
        self.detect_added_fields()
        self.detect_removed_fields()
        self.detect_renamed_fields()  # modifies added/removed fields
        self.detect_changed_fields()
[docs]
    def __bool__(self) -> bool:
        return bool(
            self.added_fields
            or self.removed_fields
            or self.renamed_fields
            or self.changed_fields
        ) 
[docs]
    def detect_removed_fieldsets(self) -> None:
        new_ids = tuple(f.human_id for f in self.new_fieldsets if f.human_id)
        self.removed_fieldsets = [
            f.human_id for f in self.old_fieldsets
            if f.human_id and f.human_id not in new_ids
        ] 
[docs]
    def detect_added_fieldsets(self) -> None:
        old_ids = tuple(f.human_id for f in self.old_fieldsets if f.human_id)
        self.added_fieldsets = [
            f.human_id for f in self.new_fieldsets
            if f.human_id and f.human_id not in old_ids
        ] 
[docs]
    def detect_added_fields(self) -> None:
        self.added_fields = [
            f.human_id for f in self.new.values()
            if f.human_id not in {f.human_id for f in self.old.values()}
        ] 
[docs]
    def detect_removed_fields(self) -> None:
        self.removed_fields = [
            f.human_id for f in self.old.values()
            if f.human_id not in {f.human_id for f in self.new.values()}
        ] 
[docs]
    def do_rename(self, removed: str, added: str) -> bool:
        if removed in self.renamed_fields:
            return False
        if added in set(self.renamed_fields.values()):
            return False
        same_type = self.old[removed].type == self.new[added].type
        if not same_type:
            return False
        added_fs = '/'.join(added.split('/')[:-1])
        removed_fs = '/'.join(removed.split('/')[:-1])
        # has no fieldset
        if not added_fs and not removed_fs:
            return same_type
        # case fieldset/Oldname --> Oldname
        if removed_fs and not added_fs:
            if f'{removed_fs}/{added}' == removed:
                return True
        # case Oldname --> fieldset/Name
        if added_fs and not removed_fs:
            if f'{added_fs}/{removed}' == added:
                return True
        # case fieldset rename and field rename
        in_removed = any(s == removed_fs for s in self.removed_fieldsets)
        in_added = any(s == added_fs for s in self.added_fieldsets)
        # Fieldset rename
        expected = f'{added_fs}/{removed.split("/")[-1]}'
        if in_added and in_removed:
            if expected == added:
                return True
            if expected in self.added_fields:
                return False
            if added not in self.renamed_fields.values():
                # Prevent assigning same new field twice
                return True
        # Fieldset has been deleted
        if (in_removed and not in_added) or (in_added and not in_removed):
            if expected == added:
                # It matches exactly
                return True
            if expected in self.added_fields:
                # there is another field that matches better
                return False
        # if len(self.added_fields) == len(self.removed_fields) == 1:
        #     return True
        return True 
[docs]
    def detect_renamed_fields(self) -> None:
        # renames are detected aggressively - we rather have an incorrect
        # rename than an add/remove combo. Renames lead to no data loss, while
        # a add/remove combo does.
        self.renamed_fields = {}
        for r in self.removed_fields:
            for a in self.added_fields:
                if self.do_rename(r, a):
                    self.renamed_fields[r] = a
        self.added_fields = [
            f for f in self.added_fields
            if f not in set(self.renamed_fields.values())
        ]
        self.removed_fields = [
            f for f in self.removed_fields
            if f not in self.renamed_fields
        ] 
[docs]
    def detect_changed_fields(self) -> None:
        self.changed_fields = []
        for old_id, old in self.old.items():
            if old_id in self.renamed_fields:
                new_id = self.renamed_fields[old_id]
            elif old_id in self.new:
                new_id = old_id
            else:
                continue
            new = self.new[new_id]
            if old.required != new.required or old.type != new.type:
                self.changed_fields.append(new_id)