Source code for user.cli

""" Provides commands used to manage users. """

import click
import phonenumbers
import pyotp

from getpass import getpass
from onegov.user import User, UserCollection
from onegov.core.cli import command_group, abort
from onegov.core.crypto import random_password


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable
    from onegov.core.framework import Framework
    from onegov.core.request import CoreRequest
    from sqlalchemy.orm import Query


[docs] cli = command_group()
@cli.command(context_settings={'singular': True}) @click.argument('role') @click.argument('username') @click.option('--password', default=None, help='Password to give the user') @click.option('--yubikey', default=None, help='The yubikey code to use for 2fa') @click.option('--realname', default=None, help="First name and last name, for example 'Jane Doe'") @click.option('--phone_number', default=None, help='Sets the phone number') @click.option('--no-prompt', default=False, help='If no questions should be asked', is_flag=True)
[docs] def add( role: str, username: str, password: str | None, yubikey: str | None, no_prompt: bool, realname: str | None, phone_number: str | None ) -> 'Callable[[CoreRequest, Framework], None]': """ Adds a user with the given name to the database. """ def add_user(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) print('Adding {} to {}'.format(username, app.application_id)) if users.exists(username): abort('{} already exists'.format(username)) nonlocal password if not password: password = random_password(16) print() print('Using the following random password:') click.secho(password, fg='green') print() nonlocal yubikey if not yubikey and not no_prompt: yubikey = getpass( 'Optionally plug in your yubi-key and press the button: ' ) yubikey = yubikey.strip() if yubikey: second_factor = { 'type': 'yubikey', 'data': yubikey[:12] } else: second_factor = None users.add(username, password, role, second_factor=second_factor, phone_number=phone_number, realname=realname) click.secho('{} was added'.format(username), fg='green') return add_user
@cli.command(context_settings={'singular': True}) @click.argument('username')
[docs] def delete(username: str) -> 'Callable[[CoreRequest, Framework], None]': """ Removes the given user from the database. """ def delete_user(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) if not users.exists(username): abort('{} does not exist'.format(username)) users.delete(username) click.secho('{} was deleted'.format(username), fg='green') return delete_user
@cli.command(context_settings={'default_selector': '*'}) @click.argument('username') @click.option('-r', '--recursive', is_flag=True, default=False)
[docs] def exists(username: str, recursive: bool) -> ('Callable[[CoreRequest, ' 'Framework], None]'): """ Returns 0 if the user exists, 1 if it doesn't when recursive equals to False. If the recursive flag is set, it will loop over all schemas and print the result for each schema without return value.""" def find_user(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) if users.exists(username): click.secho(f'{app.schema} {username} exists', fg='green') else: if recursive: click.secho(f'{app.schema} {username} does not exist', fg='yellow') else: abort(f'{app.schema} {username} does not exist') return find_user
@cli.command(context_settings={'singular': True}) @click.argument('username')
[docs] def activate(username: str) -> 'Callable[[CoreRequest, Framework], None]': """ Activates the given user. """ def activate_user(request: 'CoreRequest', app: 'Framework') -> None: user = UserCollection(app.session()).by_username(username) if user is None: abort('{} does not exist'.format(username)) user.active = True click.secho('{} was activated'.format(username), fg='green') return activate_user
@cli.command(context_settings={'singular': True}) @click.argument('username')
[docs] def deactivate(username: str) -> 'Callable[[CoreRequest, Framework], None]': """ Deactivates the given user. """ def deactivate_user(request: 'CoreRequest', app: 'Framework') -> None: user = UserCollection(app.session()).by_username(username) if not user: abort('{} does not exist'.format(username)) user.active = False user.logout_all_sessions(request.app) click.secho('{} was deactivated'.format(username), fg='green') return deactivate_user
@cli.command(context_settings={'singular': True}) @click.argument('username')
[docs] def logout(username: str) -> 'Callable[[CoreRequest, Framework], None]': """ Logs out the given user on all sessions. """ def logout_user(request: 'CoreRequest', app: 'Framework') -> None: user = UserCollection(app.session()).by_username(username) if not user: abort('{} does not exist'.format(username)) user.logout_all_sessions(request.app) click.secho('{} logged out'.format(username), fg='green') return logout_user
@cli.command(name='logout-all', context_settings={'singular': True})
[docs] def logout_all() -> 'Callable[[CoreRequest, Framework], None]': """ Logs out all users on all sessions. """ def logout_user(request: 'CoreRequest', app: 'Framework') -> None: for user in UserCollection(app.session()).query(): count = user.logout_all_sessions(request.app) if count: click.secho('{} logged out'.format(user.username), fg='green') return logout_user
@cli.command(context_settings={'singular': True}) @click.option('--active-only', help='Only show active users', is_flag=True) @click.option('--inactive-only', help='Only show inactive users', is_flag=True) @click.option('--sources', help='Display sources', is_flag=True, default=False)
[docs] def list( active_only: bool, inactive_only: bool, sources: bool ) -> 'Callable[[CoreRequest, Framework], None]': """ Lists all users. """ assert not all((active_only, inactive_only)) def list_users(request: 'CoreRequest', app: 'Framework') -> None: users: Query[tuple[str, str, bool, str | None]] users = UserCollection(app.session()).query().with_entities( User.username, User.role, User.active, User.source ) users = users.order_by(User.username, User.role) for username, role, active, source in users.all(): if active_only and not active: continue if inactive_only and active: continue print( '{active} {username} [{role}]{source}'.format( active='✔︎' if active else '✘', username=username, role=role, source=f' {{{source}}}' if sources else '' ) ) return list_users
@cli.command(name='change-password', context_settings={'singular': True}) @click.argument('username') @click.option('--password', help='Password to use', default=None)
[docs] def change_password( username: str, password: str | None ) -> 'Callable[[CoreRequest, Framework], None]': """ Changes the password of the given username. """ def change(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) user = users.by_username(username) if user is None: abort('{} does not exist'.format(username)) nonlocal password password = password or getpass('Enter password: ') user.password = password user.logout_all_sessions(request.app) click.secho("{}'s password was changed".format(username), fg='green') return change
@cli.command(name='change-yubikey', context_settings={'singular': True}) @click.argument('username') @click.option('--yubikey', help='Yubikey to use', default=None)
[docs] def change_yubikey( username: str, yubikey: str | None ) -> 'Callable[[CoreRequest, Framework], None]': """ Changes the yubikey of the given username. """ def change(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) user = users.by_username(username) if user is None: abort('{} does not exist'.format(username)) nonlocal yubikey yubikey = (yubikey or getpass('Enter yubikey: ')).strip()[:12] yubikey = yubikey.strip() if yubikey: user.second_factor = { 'type': 'yubikey', 'data': yubikey } else: user.second_factor = None user.logout_all_sessions(request.app) click.secho("{}'s yubikey was changed".format(username), fg='green') return change
@cli.command(name='change-mtan', context_settings={'singular': True}) @click.argument('username') @click.option('--phone-number', help='Phone number to use', default=None)
[docs] def change_mtan( username: str, phone_number: str | None ) -> 'Callable[[CoreRequest, Framework], None]': """ Changes the yubikey of the given username. """ def change(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) user = users.by_username(username) if user is None: abort(f'{username} does not exist') nonlocal phone_number phone_number = (phone_number or getpass('Enter phone number: ')) phone_number = phone_number.strip() if phone_number: try: number_obj = phonenumbers.parse(phone_number, 'CH') except Exception: abort(f'Failed to parse {phone_number} as a phone number') if not ( phonenumbers.is_valid_number(number_obj) and phonenumbers.is_possible_number(number_obj) ): abort(f'{phone_number} is not a valid phone number') phone_number = phonenumbers.format_number( number_obj, phonenumbers.PhoneNumberFormat.E164 ) user.second_factor = { 'type': 'mtan', 'data': phone_number } else: user.second_factor = None user.logout_all_sessions(request.app) click.secho(f"{username}'s phone number was changed", fg='green') return change
@cli.command(name='change-totp', context_settings={'singular': True}) @click.argument('username') @click.option('--secret', help='TOTP secret to use', default=None) @click.option( '--generate', help='Generate a new TOTP secret to use', is_flag=True, default=False )
[docs] def change_totp( username: str, secret: str | None, generate: bool ) -> 'Callable[[CoreRequest, Framework], None]': """ Changes the yubikey of the given username. """ def change(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) user = users.by_username(username) if user is None: abort(f'{username} does not exist') if generate: totp_secret = pyotp.random_base32() elif secret is None: totp_secret = getpass('Enter secret: ').strip() else: totp_secret = secret.strip() if totp_secret: user.second_factor = { 'type': 'totp', 'data': totp_secret } else: user.second_factor = None user.logout_all_sessions(request.app) click.secho(f"{username}'s TOTP secret was changed", fg='green') if generate: click.echo(f'Generated secret: {totp_secret}') return change
@cli.command(name='transfer-yubikey', context_settings={'singular': True}) @click.argument('source') @click.argument('target')
[docs] def transfer_yubikey( source: str, target: str ) -> 'Callable[[CoreRequest, Framework], None]': """ Transfers the Yubikey from one user to another. """ def transfer(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) source_user = users.by_username(source) if source_user is None: abort('{} does not exist'.format(source)) target_user = users.by_username(target) if target_user is None: abort('{} does not exist'.format(target)) if not source_user.second_factor: abort('{} is not linked to a yubikey'.format(source)) if target_user.second_factor: abort('{} is already linked to a yubikey'.format(target)) target_user.second_factor = source_user.second_factor source_user.second_factor = None target_user.logout_all_sessions(request.app) source_user.logout_all_sessions(request.app) click.secho( 'yubikey was transferred from {} to {}'.format(source, target), fg='green' ) return transfer
@cli.command(name='change-role', context_settings={'singular': True}) @click.argument('username') @click.argument('role')
[docs] def change_role( username: str, role: str ) -> 'Callable[[CoreRequest, Framework], None]': """ Changes the role of the given username. """ def change(request: 'CoreRequest', app: 'Framework') -> None: users = UserCollection(app.session()) user = users.by_username(username) if user is None: abort('{} does not exist'.format(username)) user.role = role user.logout_all_sessions(request.app) click.secho("{}'s role was changed".format(username), fg='green') return change
@cli.command(name='list-sessions', context_settings={'singular': True})
[docs] def list_sessions() -> 'Callable[[CoreRequest, Framework], None]': """ Lists all sessions of all users. """ def list_sessions(request: 'CoreRequest', app: 'Framework') -> None: for user in UserCollection(app.session()).query(): if user.sessions: click.secho('{}'.format(user.username), fg='yellow') for session in user.sessions.values(): session = session or {} # type:ignore[unreachable] print('{} [{}] "{}"'.format( session.get('address') or '?', session.get('timestamp') or '?', session.get('agent') or '?', )) return list_sessions