Source code for org.forms.user
from __future__ import annotations
import re
from onegov.core.utils import is_valid_yubikey_format
from onegov.directory.models.directory import Directory
from onegov.form import Form, merge_forms
from onegov.form import FormDefinition
from onegov.form.fields import ChosenSelectMultipleField
from onegov.form.fields import TagsField
from onegov.form.filters import yubikey_identifier
from onegov.org import _
from onegov.ticket import handlers
from onegov.user import User, UserGroup
from onegov.ticket import Ticket, TicketPermission
from onegov.user import UserCollection
from wtforms.fields import BooleanField
from wtforms.fields import EmailField
from wtforms.fields import RadioField
from wtforms.fields import StringField
from wtforms.validators import Email
from wtforms.validators import InputRequired
from wtforms.validators import ValidationError
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from onegov.org.request import OrgRequest
from wtforms.fields.choices import _Choice
[docs]
AVAILABLE_ROLES = [
('admin', _('Admin')),
('editor', _('Editor')),
('supporter', _('Supporter')),
('member', _('Member')),
]
[docs]
class ManageUserForm(Form):
""" Defines the edit user form. """
if TYPE_CHECKING:
[docs]
state = RadioField(
label=_('State'),
fieldset=_('General'),
default='active',
choices=(
('active', _('Active')),
('inactive', _('Inactive'))
),
)
[docs]
role = RadioField(
label=_('Role'),
fieldset=_('General'),
choices=AVAILABLE_ROLES,
default='member',
)
[docs]
yubikey = StringField(
label=_('Yubikey'),
fieldset=_('General'),
description=_('Plug your YubiKey into a USB slot and press it.'),
filters=(yubikey_identifier, ),
render_kw={'autocomplete': 'off'}
)
@property
@active.setter
def active(self, value: bool) -> None:
self.state.data = value and 'active' or 'inactive'
[docs]
def on_request(self) -> None:
# hide roles that are not configured for the current app
roles_setting = self.request.app.settings.roles
self.role.choices = [
(role, label)
for role, label in AVAILABLE_ROLES
if hasattr(roles_setting, role)
]
self.request.include('tags-input')
[docs]
def populate_obj(self, model: User) -> None: # type:ignore
if (
model.role != self.role.data
or model.active != self.active
):
model.logout_all_sessions(self.request.app)
super().populate_obj(model)
model.active = self.active
[docs]
def process_obj(self, model: User) -> None: # type:ignore
super().process_obj(model)
self.active = model.active
[docs]
def validate_yubikey(self, field: StringField) -> None:
if not self.active:
return
if not field.data:
if not self.request.app.enable_yubikey:
return
if self.role.data in ('admin', 'editor'):
raise ValidationError(_(
'Administrators and editors must use a Yubikey'
))
else:
return
if not is_valid_yubikey_format(field.data):
raise ValidationError(_('Invalid Yubikey'))
users = UserCollection(self.request.session)
user = users.by_yubikey(field.data)
if not hasattr(self, 'current_username'):
raise NotImplementedError()
if user and user.username != self.current_username:
raise ValidationError(
_('This Yubikey is already used by ${username}', mapping={
'username': user.username
})
)
[docs]
class PartialNewUserForm(Form):
""" Defines parts of the new user form not found in the manage user form.
"""
[docs]
username = EmailField(
label=_('E-Mail'),
description=_('The users e-mail address (a.k.a. username)'),
validators=[InputRequired(), Email()]
)
[docs]
send_activation_email = BooleanField(
label=_('Send Activation E-Mail with Instructions'),
default=True
)
@property
[docs]
def current_username(self) -> str:
assert self.username.data is not None
return self.username.data
[docs]
def validate_username(self, field: EmailField) -> None:
assert field.data is not None
if UserCollection(self.request.session).by_username(field.data):
raise ValidationError(
_('A user with this e-mail address exists already'))
if TYPE_CHECKING:
else:
NewUserForm = merge_forms(PartialNewUserForm, ManageUserForm)
[docs]
class ManageUserGroupForm(Form):
if TYPE_CHECKING:
[docs]
ticket_permissions = ChosenSelectMultipleField(
label=_('Ticket permissions'),
description=_(
'Restricts access and gives permission to these ticket categories'
),
choices=[],
)
[docs]
immediate_notification = ChosenSelectMultipleField(
label=_(
'Immediate e-mail notification to members upon ticket submission'
),
choices=[],
)
[docs]
def on_request(self) -> None:
self.users.choices = [
(str(u.id), u.title)
for u in UserCollection(self.request.session).query()
]
ticket_choices: list[_Choice] = [
(key, key)
for key in handlers.registry.keys()
]
ticket_choices.extend(
(f'DIR-{group}', f'DIR: {group}')
for group, in self.request.session.query(
Directory.title.label('group')
# some groups may get deleted, but as long as there are tickets
# we need a corresponding permission
).union(
self.request.session.query(
Ticket.group.label('group')
)
.filter(Ticket.handler_code == 'DIR')
.filter(Ticket.group.isnot(None))
.distinct()
).order_by('group').distinct()
)
ticket_choices.extend(
(f'FRM-{group}', f'FRM: {group}')
for group, in self.request.session.query(
FormDefinition.title.label('group')
# some groups may get deleted, but as long as there are tickets
# we need a corresponding permission
).union(
self.request.session.query(Ticket.group.label('group'))
.filter(Ticket.handler_code == 'FRM')
.filter(Ticket.group.isnot(None))
.distinct()
).order_by('group').distinct()
)
ticket_choices.sort()
self.ticket_permissions.choices = ticket_choices
if isinstance(self.immediate_notification, ChosenSelectMultipleField):
self.immediate_notification.choices = ticket_choices[:]
@property
[docs]
def exclusive_permissions(self) -> set[tuple[str, str | None]]:
return {
(match.group(1), match.group(2))
for permission in self.ticket_permissions.data or ()
if (match := TICKET_PERMISSION_RE.match(permission))
}
@property
[docs]
def immediate_notifications(self) -> set[tuple[str, str | None]]:
if not isinstance(self.immediate_notification.data, list):
return set()
return {
(match.group(1), match.group(2))
for permission in self.immediate_notification.data
if (match := TICKET_PERMISSION_RE.match(permission))
}
[docs]
def update_model(self, model: UserGroup) -> None:
session = self.request.session
# Logout the new and old users
user_ids = {str(r.id) for r in model.users.with_entities(User.id)}
user_ids |= set(self.users.data or ())
users = UserCollection(session).query()
users = users.filter(User.id.in_(user_ids))
for user in users:
if user != self.request.current_user:
user.logout_all_sessions(self.request.app)
# Update model
model.name = self.name.data
if self.users.data:
users = UserCollection(session).query()
users = users.filter(User.id.in_(self.users.data))
model.users = users.all() # type:ignore[assignment]
else:
model.users = [] # type:ignore[assignment]
exclusive_permissions = self.exclusive_permissions
immediate_notifications = self.immediate_notifications
missing = exclusive_permissions | immediate_notifications
# Update ticket permissions
# FIXME: backref across module boundaries
assert hasattr(model, 'ticket_permissions')
permissions = []
permission: TicketPermission
for permission in model.ticket_permissions:
key = (permission.handler_code, permission.group)
exclusive = key in exclusive_permissions
notification = key in immediate_notifications
if not exclusive and not notification:
# no permission object should exist
session.delete(permission)
continue
# so we don't add this permission twice
missing.discard(key)
permission.exclusive = exclusive
permission.immediate_notification = notification
permissions.append(permission)
for key in missing:
handler_code, group = key
permission = TicketPermission(
handler_code=handler_code,
group=group or None,
user_group=model,
exclusive=key in exclusive_permissions,
immediate_notification=key in immediate_notifications,
)
session.add(permission)
permissions.append(permission)
model.ticket_permissions = permissions
[docs]
def apply_model(self, model: UserGroup) -> None:
self.name.data = model.name
self.users.data = [str(u.id) for u in model.users]
# FIXME: backref across module boundaries
assert hasattr(model, 'ticket_permissions')
self.ticket_permissions.data = [
f'{permission.handler_code}-{permission.group}'
if permission.group else permission.handler_code
for permission in model.ticket_permissions
if permission.exclusive
]
if isinstance(self.immediate_notification, ChosenSelectMultipleField):
self.immediate_notification.data = [
f'{permission.handler_code}-{permission.group}'
if permission.group else permission.handler_code
for permission in model.ticket_permissions
if permission.immediate_notification
]