# Source code for directory.migration

from __future__ import annotations

from onegov.directory.models.directory_entry import DirectoryEntry
from onegov.form import as_internal_id
from onegov.form import flatten_fieldsets
from onegov.form import parse_form
from onegov.form import parse_formcode
from onegov.form.parser.core import OptionsField
from sqlalchemy.orm import object_session, joinedload, undefer
from sqlalchemy.orm.attributes import get_history
from wtforms import ValidationError

from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Callable, Iterable
    from datetime import date, datetime, time
    from onegov.directory.models import Directory
    from onegov.directory.types import DirectoryConfiguration


class DirectoryMigration:
    """ Takes a directory and the structure/configuration it should have in
    the future.

    It then migrates the existing directory entries, if possible.

    """

    def __init__(
        self,
        directory: Directory,
        new_structure: str | None = None,
        new_configuration: DirectoryConfiguration | None = None,
        old_structure: str | None = None
    ):
        """ Prepares the migration.

        :param directory:
            The directory whose entries should be migrated.

        :param new_structure:
            The target formcode, defaults to the directory's structure.

        :param new_configuration:
            The target configuration, defaults to the directory's
            configuration.

        :param old_structure:
            The formcode to migrate from, defaults to
            :attr:`old_directory_structure`.

        """
        # has to be set first, `old_directory_structure` reads it
        self.directory = directory
        self.old_structure = old_structure or self.old_directory_structure
        self.new_structure = new_structure or directory.structure
        self.new_configuration = new_configuration or directory.configuration
        self.new_form_class = parse_form(self.new_structure)
        self.fieldtype_migrations = FieldTypeMigrations()
        self.changes = StructuralChanges(
            self.old_structure,
            self.new_structure
        )

    @property
    def old_directory_structure(self) -> str:
        """ The structure recorded as deleted in the attribute history of
        ``directory.structure`` or, if there is none, the current structure.

        """
        history = get_history(self.directory, 'structure')

        if history.deleted:
            return history.deleted[0]

        return self.directory.structure

    @property
    def possible(self) -> bool:
        """ Returns True if the migration is possible, False otherwise. """
        if not self.directory.entries:
            return True

        if not self.changes:
            return True

        if len(self.changes.renamed_options) > 1:
            return False

        if self.multiple_option_changes_in_one_step():
            return False

        if self.added_required_fields():
            return False

        for changed in self.changes.changed_fields:
            old = self._old_field(changed)
            new = self.changes.new[changed]

            # we can turn required into optional fields and vice versa
            # (the form validation takes care of validating the requirements)
            if old.type == new.type:
                continue

            # we can only convert certain types - this is checked whenever
            # the type differs, even if the required flag changed as well,
            # because convert_fields needs a converter in both cases
            if not self.fieldtype_migrations.possible(old.type, new.type):
                return False

        return True

    @property
    def entries(self) -> Iterable[DirectoryEntry]:
        """ The entries of the directory.

        If the directory is attached to a session, the entries are queried
        with their files joined and their content undeferred. Otherwise
        ``directory.entries`` is returned as is.

        """
        session = object_session(self.directory)

        if not session:
            return self.directory.entries

        query = session.query(DirectoryEntry)
        query = query.filter_by(directory_id=self.directory.id)
        query = query.options(joinedload(DirectoryEntry.files))
        query = query.options(undefer(DirectoryEntry.content))

        return query

    def execute(self) -> None:
        """ To run the migration, run this method. The other methods below
        should only be used if you know what you are doing.

        :raises ValidationError:
            If fields were added that are required right away.

        :raises AssertionError:
            If the migration is not :attr:`possible`.

        """
        if self.added_required_fields():
            fields = ', '.join(
                f'"{f}"' for f in self.get_added_required_field_ids()
            )

            # previously this was '${fields}' combined with str.format,
            # which left a stray '$' in front of the field names
            raise ValidationError(
                f'{fields}: New fields cannot be required initially. '
                'Require them in a separate migration step.'
            )

        assert self.possible

        self.migrate_directory()

        # Triggers the observer to func::structure_configuration_observer()
        # and executing this very function because of an autoflush event
        # in a new instance.
        for entry in self.entries:
            self.migrate_entry(entry)

    def migrate_directory(self) -> None:
        """ Applies the new structure and configuration to the directory. """
        self.directory.structure = self.new_structure
        self.directory.configuration = self.new_configuration

    def migrate_entry(self, entry: DirectoryEntry) -> None:
        """ This function is called after an update to the directory
        structure.

        During execution of self.execute(), the directory is migrated.
        On start of looping through the entries, an auto_flush occurs,
        calling the migration observer for the directory, which will
        instantiate yet another instance of the migration. After this, inside
        execute(), the session is not flushing anymore, and we have to skip,
        since the values are already migrated and migration will fail when
        removing fieldsets.

        """
        update = bool(self.changes)
        session = object_session(entry)

        # NOTE(review): relies on the private `_flushing` attribute of the
        # session and on the entry being attached to one
        if not session._flushing:
            return

        self.migrate_values(entry.values)
        self.directory.update(entry, entry.values, force_update=update)

    def migrate_values(self, values: dict[str, Any]) -> None:
        """ Migrates the given values in place, applying all detected
        changes in a fixed order.

        """
        self.add_new_fields(values)
        self.remove_old_fields(values)
        self.rename_fields(values)
        self.convert_fields(values)
        self.rename_options(values)
        self.remove_old_options(values)

    def add_new_fields(self, values: dict[str, Any]) -> None:
        """ Adds a ``None`` value for each added field. """
        for added in self.changes.added_fields:
            values[as_internal_id(added)] = None

    def remove_old_fields(self, values: dict[str, Any]) -> None:
        """ Removes the values of removed fields.

        :raises KeyError: If a removed field is missing in the values.

        """
        for removed in self.changes.removed_fields:
            del values[as_internal_id(removed)]

    def rename_fields(self, values: dict[str, Any]) -> None:
        """ Moves the values of renamed fields to their new key. """
        for old, new in self.changes.renamed_fields.items():
            old, new = as_internal_id(old), as_internal_id(new)
            values[new] = values[old]
            del values[old]

    def convert_fields(self, values: dict[str, Any]) -> None:
        """ Converts the values of fields whose type has changed.

        Has to run after :meth:`rename_fields`, as the values are looked up
        by the new id of the field.

        """
        for changed in self.changes.changed_fields:
            old = self._old_field(changed)
            new = self.changes.new[changed]

            # only the required flag changed, the value stays as it is
            # (this also covers password fields, which have no converter)
            if old.type == new.type:
                continue

            convert = self.fieldtype_migrations.get_converter(
                old.type,
                new.type
            )
            assert convert is not None

            key = as_internal_id(changed)
            values[key] = convert(values[key])

    def rename_options(self, values: dict[str, Any]) -> None:
        """ Replaces the labels of renamed options with their new label.

        NOTE(review): the label is replaced in every value that matches it,
        not only in the field the option belongs to - confirm that this is
        intended.

        """
        for old_option, new_option in self.changes.renamed_options.items():
            old_label = old_option[1]
            new_label = new_option[1]

            for key, val in list(values.items()):
                if isinstance(val, list):
                    values[key] = [
                        new_label if v == old_label else v for v in val
                    ]
                elif val == old_label:
                    values[key] = new_label

    def remove_old_options(self, values: dict[str, Any]) -> None:
        """ Removes options that no longer exist from the values.

        Lists lose the removed label, single values are reset to ``None``.

        """
        for human_id, label in self.changes.removed_options:
            field_id = as_internal_id(human_id)

            if field_id not in values:
                continue

            if isinstance(values[field_id], list):
                values[field_id] = [
                    v for v in values[field_id] if v != label
                ]
            elif values[field_id] == label:
                values[field_id] = None

    def multiple_option_changes_in_one_step(self) -> bool:
        """ Returns True if there are multiple changes e.g. added and
        removed options.

        """
        changes = self.changes

        return bool(
            (changes.added_options and changes.removed_options)
            or (changes.added_options and changes.renamed_options)
            or (changes.removed_options and changes.renamed_options)
        )

    def added_required_fields(self) -> bool:
        """ Identify newly added fields that are set to be required. Newly
        added fields shall not be required if entries exist, make them
        required in a separate migration step.

        """
        if not self.directory.entries:
            return False

        return any(
            f.required for f in self.changes.new.values()
            if f.human_id in self.changes.added_fields
        )

    def get_added_required_field_ids(self) -> list[str]:
        """ Returns the human ids of added fields that are required. """
        return [
            f.human_id for f in self.changes.new.values()
            if f.required and f.human_id in self.changes.added_fields
        ]

    def _old_field(self, changed: str) -> Any:
        """ Returns the old definition of a field in ``changed_fields``.

        Changed fields are listed by their new id. If the field was renamed
        as well, the old definition is stored under its old id.

        """
        for old_id, new_id in self.changes.renamed_fields.items():
            if new_id == changed:
                return self.changes.old[old_id]

        return self.changes.old[changed]
class FieldTypeMigrations:
    """ Contains methods to migrate fields from one type to another.

    Converters are looked up by name. A method named ``<old>_to_<new>`` is
    preferred over the generic ``any_to_<new>`` method.

    """

    def possible(self, old_type: str, new_type: str) -> bool:
        """ Returns True if a converter exists for the given types. """
        return self.get_converter(old_type, new_type) is not None

    def get_converter(
        self,
        old_type: str,
        new_type: str
    ) -> Callable[[Any], Any] | None:
        """ Returns the function converting values from the old to the new
        type, or ``None`` if there is no such function.

        """
        if old_type == 'password':
            return None  # disabled to avoid accidental leaks

        if old_type == new_type:
            return lambda v: v

        explicit = f'{old_type}_to_{new_type}'
        generic = f'any_to_{new_type}'

        return getattr(self, explicit, getattr(self, generic, None))

    def any_to_text(self, value: Any) -> str:
        """ Stringifies the value, ``None`` becomes an empty string. """
        return str(value if value is not None else '').strip()

    def any_to_textarea(self, value: Any) -> str:
        """ Same conversion as :meth:`any_to_text`. """
        return self.any_to_text(value)

    def textarea_to_text(self, value: str | None) -> str:
        """ Joins the lines of the value with spaces.

        Empty values (including ``None``, which is what newly added fields
        are filled with) result in an empty string.

        """
        return (value or '').replace('\n', ' ').strip()

    def textarea_to_code(self, value: str) -> str:
        """ Keeps the value as is. """
        return value

    def text_to_code(self, value: str) -> str:
        """ Keeps the value as is. """
        return value

    def date_to_text(self, value: date) -> str:
        """ Formats the date as ``dd.mm.YYYY``, empty values as ``''``. """
        return '{:%d.%m.%Y}'.format(value) if value else ''

    def datetime_to_text(self, value: datetime) -> str:
        """ Formats the datetime as ``dd.mm.YYYY HH:MM``, empty values
        as ``''``.

        """
        return '{:%d.%m.%Y %H:%M}'.format(value) if value else ''

    def time_to_text(self, value: time) -> str:
        """ Formats the time as ``HH:MM``, empty values as ``''``. """
        return '{:%H:%M}'.format(value) if value else ''

    def radio_to_checkbox(self, value: str) -> list[str]:
        """ Wraps the selected value in a list, empty values result in an
        empty list.

        """
        return [value] if value else []

    def text_to_url(self, value: str) -> str:
        """ Keeps the value as is. """
        return value
class StructuralChanges:
    """ Tries to detect structural changes between two formcode blocks.

    The detected changes are available as attributes: added, removed,
    renamed and changed fields, added and removed fieldsets, as well as
    added, removed and renamed options. Fields are referred to by their
    human id.

    NOTE(review): earlier documentation referred to a
    ``detection_successful`` property, which this class does not define.

    """

    def __init__(self, old_structure: str, new_structure: str) -> None:
        old_fieldsets = parse_formcode(old_structure)
        new_fieldsets = parse_formcode(new_structure)

        # the fields of both structures, keyed by their human id
        self.old = {
            f.human_id: f for f in flatten_fieldsets(old_fieldsets)
        }
        self.new = {
            f.human_id: f for f in flatten_fieldsets(new_fieldsets)
        }
        self.old_fieldsets = old_fieldsets
        self.new_fieldsets = new_fieldsets

        # the order matters, later steps build upon the results of the
        # earlier ones
        self.detect_added_fieldsets()
        self.detect_removed_fieldsets()
        self.detect_added_fields()
        self.detect_removed_fields()
        self.detect_renamed_fields()  # modifies added/removed fields
        self.detect_changed_fields()
        self.detect_added_options()
        self.detect_removed_options()
        self.detect_renamed_options()

    def __bool__(self) -> bool:
        """ True if any field or option change was detected. Added and
        removed fieldsets alone do not count.

        """
        return bool(
            self.added_fields
            or self.removed_fields
            or self.renamed_fields
            or self.changed_fields
            or self.added_options
            or self.removed_options
            or self.renamed_options  # radio and checkboxes
        )

    def detect_removed_fieldsets(self) -> None:
        """ Stores the named fieldsets only found in the old structure. """
        new_ids = tuple(f.human_id for f in self.new_fieldsets if f.human_id)

        self.removed_fieldsets = [
            f.human_id for f in self.old_fieldsets
            if f.human_id and f.human_id not in new_ids
        ]

    def detect_added_fieldsets(self) -> None:
        """ Stores the named fieldsets only found in the new structure. """
        old_ids = tuple(f.human_id for f in self.old_fieldsets if f.human_id)

        self.added_fieldsets = [
            f.human_id for f in self.new_fieldsets
            if f.human_id and f.human_id not in old_ids
        ]

    def detect_added_fields(self) -> None:
        """ Stores the ids of fields only found in the new structure. """
        # both dictionaries are keyed by human id, so a key lookup replaces
        # rebuilding the set of old ids for every single field
        self.added_fields = [
            human_id for human_id in self.new if human_id not in self.old
        ]

    def detect_removed_fields(self) -> None:
        """ Stores the ids of fields only found in the old structure. """
        self.removed_fields = [
            human_id for human_id in self.old if human_id not in self.new
        ]

    def do_rename(self, removed: str, added: str) -> bool:
        """ Returns True if the removed field and the added field should be
        treated as one renamed field.

        Only fields of the same type that are not part of another rename
        are considered. The fieldset is the part of the human id in front
        of the last slash.

        """
        if removed in self.renamed_fields:
            return False

        if added in set(self.renamed_fields.values()):
            return False

        if self.old[removed].type != self.new[added].type:
            return False

        added_fs = '/'.join(added.split('/')[:-1])
        removed_fs = '/'.join(removed.split('/')[:-1])

        # has no fieldset
        if not added_fs and not removed_fs:
            return True

        # case fieldset/Oldname --> Oldname
        if removed_fs and not added_fs:
            if f'{removed_fs}/{added}' == removed:
                return True

        # case Oldname --> fieldset/Name
        if added_fs and not removed_fs:
            if f'{added_fs}/{removed}' == added:
                return True

        # case fieldset rename and field rename
        in_removed = any(s == removed_fs for s in self.removed_fieldsets)
        in_added = any(s == added_fs for s in self.added_fieldsets)

        # the id the field would have if only its fieldset had changed
        expected = f'{added_fs}/{removed.split("/")[-1]}'

        # Fieldset rename
        if in_added and in_removed:
            if expected == added:
                return True
            if expected in self.added_fields:
                return False
            if added not in self.renamed_fields.values():
                # Prevent assigning same new field twice
                return True

        # Fieldset has been deleted
        if (in_removed and not in_added) or (in_added and not in_removed):
            if expected == added:
                # It matches exactly
                return True
            if expected in self.added_fields:
                # there is another field that matches better
                return False

        return True

    def detect_renamed_fields(self) -> None:
        """ Pairs removed with added fields and takes the resulting renames
        out of the added and removed fields.

        """
        # renames are detected aggressively - we rather have an incorrect
        # rename than an add/remove combo. Renames lead to no data loss, while
        # a add/remove combo does.
        self.renamed_fields = {}

        for r in self.removed_fields:
            for a in self.added_fields:
                if self.do_rename(r, a):
                    self.renamed_fields[r] = a

        renamed_to = set(self.renamed_fields.values())

        self.added_fields = [
            f for f in self.added_fields if f not in renamed_to
        ]
        self.removed_fields = [
            f for f in self.removed_fields if f not in self.renamed_fields
        ]

    def detect_changed_fields(self) -> None:
        """ Stores the new ids of the fields whose type or required flag
        differs between the old and the new structure.

        """
        self.changed_fields = []

        for old_id, old in self.old.items():
            if old_id in self.renamed_fields:
                new_id = self.renamed_fields[old_id]
            elif old_id in self.new:
                new_id = old_id
            else:
                continue

            new = self.new[new_id]

            if old.required != new.required or old.type != new.type:
                self.changed_fields.append(new_id)

    def detect_added_options(self) -> None:
        """ Stores the added options as ``(field id, label)`` tuples. """
        self.added_options = [
            (field_id, label)
            for field_id, old_labels, new_labels in self._option_labels()
            for label in new_labels
            if label not in old_labels
        ]

    def detect_removed_options(self) -> None:
        """ Stores the removed options as ``(field id, label)`` tuples. """
        self.removed_options = [
            (field_id, label)
            for field_id, old_labels, new_labels in self._option_labels()
            for label in old_labels
            if label not in new_labels
        ]

    def detect_renamed_options(self) -> None:
        """ Detects renamed options by comparing the labels position by
        position and takes them out of the added and removed options.

        """
        self.renamed_options = {}

        for field_id, old_labels, new_labels in self._option_labels():
            if old_labels == new_labels:
                continue

            # test if re-ordered
            if set(old_labels) == set(new_labels):
                continue

            # only consider renames if the number of options
            # remains the same
            if len(old_labels) != len(new_labels):
                continue

            for o_label, n_label in zip(old_labels, new_labels):
                if o_label != n_label:
                    self.renamed_options[(field_id, o_label)] = (
                        field_id,
                        n_label
                    )

        renamed_to = set(self.renamed_options.values())

        self.added_options = [
            ao for ao in self.added_options if ao not in renamed_to
        ]
        self.removed_options = [
            ro for ro in self.removed_options
            if ro not in self.renamed_options
        ]

    def _option_labels(self) -> Iterable[tuple[str, list[str], list[str]]]:
        """ Yields the id, the old labels and the new labels of each field
        that is an options field in both structures under the same id.

        """
        for field_id, old_field in self.old.items():
            if not isinstance(old_field, OptionsField):
                continue

            new_field = self.new.get(field_id)

            if not isinstance(new_field, OptionsField):
                continue

            yield (
                field_id,
                [choice.label for choice in old_field.choices],
                [choice.label for choice in new_field.choices],
            )