# Source code for user.upgrade

""" Contains upgrade tasks that are executed when the application is being
upgraded on the server. See :class:`onegov.core.upgrade.upgrade_task`.

"""
from __future__ import annotations

from collections import defaultdict
from onegov.core.upgrade import upgrade_task
from onegov.core.orm.types import JSON, UUID
from onegov.user import User, UserCollection
from sqlalchemy import Boolean, Column, Text
from sqlalchemy.sql import text


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    import uuid
    from datetime import datetime
    from onegov.core.upgrade import UpgradeContext


@upgrade_task('Add second_factor column')
def add_second_factor_column(context: UpgradeContext) -> None:
    """ Adds the nullable JSON column ``second_factor`` to ``users``. """
    second_factor = Column('second_factor', JSON, nullable=True)
    context.operations.add_column('users', second_factor)
@upgrade_task('Add active column')
def add_active_column(context: UpgradeContext) -> None:
    """ Adds the boolean ``active`` column to ``users`` and sets it to
    ``True`` for every existing user.

    The column is created nullable so that existing rows can be filled in
    first; afterwards it is altered to be non-nullable.

    """
    operations, session = context.operations, context.session

    active = Column('active', Boolean, nullable=True, default=True)
    operations.add_column('users', active)

    # existing users have no value yet, mark all of them as active
    for existing in session.query(User).all():
        existing.active = True
    session.flush()

    operations.alter_column('users', 'active', nullable=False)
@upgrade_task('Add realname column')
def add_realname_column(context: UpgradeContext) -> None:
    """ Adds the nullable text column ``realname`` to ``users`` and fills it
    with the ``name`` entry of each user's ``data`` dictionary.

    Users without ``data`` or without a ``name`` entry get ``None``.

    """
    context.operations.add_column(
        'users', Column('realname', Text, nullable=True))

    for user in context.session.query(User).all():
        # data may be None or empty, in which case there is no name to copy
        user.realname = (user.data or {}).get('name')

    context.session.flush()
def change_ownership_by_name(
    context: UpgradeContext,
    old_username: str,
    new_username: str
) -> None:
    """ Rewrites the ``username`` column of all ownership tables from
    ``old_username`` to ``new_username``.

    The tables ``activities``, ``attendees``, ``bookings`` and
    ``invoice_items`` are updated in that order; a table that does not
    exist is skipped.

    :param context: the upgrade context providing ``has_table`` and
        ``operations.execute``.
    :param old_username: the username currently stored in the rows.
    :param new_username: the username to store instead.

    """
    # transfer all ownership without using models (which might or
    # might not be available here)
    owned_tables = ('activities', 'attendees', 'bookings', 'invoice_items')

    for table in owned_tables:
        if not context.has_table(table):
            continue

        # The table name comes from the fixed tuple above. The usernames are
        # data (they may contain quotes) and are therefore passed as bound
        # parameters instead of being pasted into the statement.
        # NOTE(review): assumes operations.execute accepts a SQLAlchemy
        # text() construct, as Alembic's Operations.execute does - confirm.
        statement = text(f"""
            UPDATE {table}
            SET username = :new_username
            WHERE username = :old_username
        """).bindparams(  # nosec: B608
            new_username=new_username,
            old_username=old_username,
        )
        context.operations.execute(statement)
def change_ownership_by_id(
    context: UpgradeContext,
    old_userid: uuid.UUID,
    new_userid: uuid.UUID
) -> None:
    """ Points all tickets owned by ``old_userid`` to ``new_userid``.

    Does nothing if the ``tickets`` table does not exist. Both ids are
    written into the statement as their ``hex`` representation.

    """
    if not context.has_table('tickets'):
        return

    statement = """
        UPDATE tickets
        SET user_id = '{new_userid}'
        WHERE user_id = '{old_userid}'
    """.format(new_userid=new_userid.hex, old_userid=old_userid.hex)

    context.operations.execute(statement)
@upgrade_task('Force lowercase usernames')
def force_lowercase_usernames(context: UpgradeContext) -> None:
    """ Lowercases all usernames and merges users that collide afterwards.

    Users are grouped by their lowercased username. A group with a single
    user is simply renamed. In a group with several users one user is kept:
    the ticket and username ownership of the others is moved to it, values
    missing from its ``data`` are copied over and the other users are
    deleted. Finally a unique index on ``lower("username")`` is created.

    Ownership is always moved through the temporary ``temp`` user, which is
    created at the start and deleted at the end.
    NOTE(review): presumably this keeps references to the username valid
    while it changes - confirm against the schema.

    """
    users = defaultdict(list)

    for user in context.session.query(User).all():
        users[user.username.lower()].append(user)

    temp_user = UserCollection(context.session).add(
        username='temp',
        password='temp',  # nosec: B106
        active=False,
        role='member',
    )

    # The user that remains after a merge is the one with the highest sort
    # key: active users win over inactive ones, then the higher role wins,
    # then the most recently changed user wins.
    role_hierarchy = ['member', 'editor', 'admin']

    def sort_key(
        user: User,
        role_hierarchy: list[str] = role_hierarchy
    ) -> tuple[bool, int, datetime]:
        return (
            user.active,
            role_hierarchy.index(user.role),
            user.last_change,
        )

    for users_ in users.values():

        # simply change usernames that don't conflict with others
        if len(users_) == 1:
            only = users_[0]

            # NOTE(review): the extent of the no_autoflush block is assumed
            # to cover the whole rename sequence - confirm.
            with context.session.no_autoflush:
                change_ownership_by_name(context, only.username, 'temp')

                only.username = only.username.lower()
                context.session.flush()

                change_ownership_by_name(context, 'temp', only.username)

            continue

        # from others select one user, move ownership to it and then
        # remove the other users -> that means that some of our users need
        # to reset their password...

        # FIXME: This never actually worked before because it was using
        #        the dictionary instead of the list, was this maybe hotfixed
        #        and then just never merged into the codebase?
        remaining = sorted(users_, key=sort_key)[-1]
        remaining_data = remaining.data or {}
        others = [u for u in users_ if u.id != remaining.id]

        for other in others:

            # keep as much user data as possible
            for key, value in (other.data or {}).items():
                # Look at the merged dictionary, not at remaining.data:
                # the latter may be None (which would raise) or an empty
                # dict that never sees the values merged so far.
                if value and not remaining_data.get(key):
                    remaining_data[key] = value

            # change the userids
            change_ownership_by_id(context, other.id, remaining.id)

            # change the username
            change_ownership_by_name(context, other.username, 'temp')

            # delete the other user
            context.operations.execute("""
                DELETE from users WHERE id = '{}'
            """.format(other.id.hex))

        # switch the remaining user
        change_ownership_by_name(context, remaining.username, 'temp')

        remaining.username = remaining.username.lower()
        remaining.data = remaining_data
        context.session.flush()

        change_ownership_by_name(context, 'temp', remaining.username)

    # enforce the lowercase rule
    context.operations.create_index(
        'lowercase_username', 'users', [
            text('lower("username")')
        ], unique=True
    )

    # remove the temporary user
    context.session.delete(temp_user)
@upgrade_task('Add singup_token column')
def add_signup_token_column(context: UpgradeContext) -> None:
    """ Adds the nullable text column ``signup_token`` to ``users``. """
    # NOTE(review): the task name above is misspelled ("singup"); it is
    # presumably the identifier under which the task is recorded, so do not
    # correct it without confirming how upgrade_task uses the name.
    signup_token = Column('signup_token', Text, nullable=True)
    context.operations.add_column('users', signup_token)
@upgrade_task('Add group_id column')
def add_group_id_column(context: UpgradeContext) -> None:
    """ Adds the nullable UUID column ``group_id`` to ``users``, unless the
    column already exists.

    """
    if context.has_column('users', 'group_id'):
        return

    group_id = Column('group_id', UUID, nullable=True)
    context.operations.add_column('users', group_id)
@upgrade_task('Add type column')
def add_type_column(context: UpgradeContext) -> None:
    """ Adds the nullable text column ``type`` to ``users``, unless the
    column already exists.

    """
    if context.has_column('users', 'type'):
        return

    type_column = Column('type', Text, nullable=True)
    context.operations.add_column('users', type_column)
@upgrade_task('Add authentication_provider column')
def add_authentication_provider_column(context: UpgradeContext) -> None:
    """ Adds the nullable JSON column ``authentication_provider`` to
    ``users``.

    """
    provider = Column('authentication_provider', JSON, nullable=True)
    context.operations.add_column('users', provider)
@upgrade_task('Drop authentication_provider column')
def drop_authentication_provider_column(context: UpgradeContext) -> None:
    """ Drops the ``authentication_provider`` column from ``users``. """
    table, column = 'users', 'authentication_provider'
    context.operations.drop_column(table, column)
@upgrade_task('Add source column')
def add_source_column(context: UpgradeContext) -> None:
    """ Adds the nullable text column ``source`` to ``users``. """
    source = Column('source', Text, nullable=True, default=None)
    context.operations.add_column('users', source)
@upgrade_task('Add source_id column')
def add_source_id_column(context: UpgradeContext) -> None:
    """ Adds the nullable text column ``source_id`` to ``users`` and creates
    the unique constraint ``unique_source_id`` over ``source`` and
    ``source_id``.

    """
    operations = context.operations

    source_id = Column('source_id', Text, nullable=True, default=None)
    operations.add_column('users', source_id)

    constrained_columns = ('source', 'source_id')
    operations.create_unique_constraint(
        'unique_source_id', 'users', constrained_columns)
@upgrade_task('Make user models polymorphic type non-nullable')
def make_user_models_polymorphic_type_non_nullable(
    context: UpgradeContext
) -> None:
    """ Makes the ``type`` column of ``users``, ``groups`` and
    ``role_mappings`` non-nullable.

    Rows without a type are set to ``'generic'`` first. Tables that do not
    exist are skipped.

    """
    operations = context.operations

    for table in ('users', 'groups', 'role_mappings'):
        if not context.has_table(table):
            continue

        # fill in the missing types before NULL is disallowed
        operations.execute(f"""
            UPDATE {table} SET type = 'generic'
            WHERE type IS NULL;
        """)

        operations.alter_column(table, 'type', nullable=False)
@upgrade_task('Add scope column')
def add_scope_column(context: UpgradeContext) -> None:
    """ Adds the indexed text column ``scope`` to ``tans``, if that table
    exists.

    Rows without a scope are set to ``'mtan_access'`` before the column is
    made non-nullable.

    """
    if context.has_table('tans'):
        operations = context.operations

        scope = Column('scope', Text, nullable=True, index=True)
        operations.add_column('tans', scope)

        # fill in the missing scopes before NULL is disallowed
        operations.execute(
            "UPDATE tans set scope = 'mtan_access' WHERE scope IS NULL;"
        )
        context.session.flush()

        operations.alter_column('tans', 'scope', nullable=False)