from __future__ import annotations
from datetime import datetime
from onegov.core.crypto import hash_password, verify_password
from onegov.core.orm import Base
from onegov.core.orm.mixins import data_property, dict_property, TimestampMixin
from onegov.core.orm.types import LowercaseText
from onegov.core.security import forget, remembered
from onegov.core.utils import is_valid_yubikey_format
from onegov.core.utils import remove_repeated_dots
from onegov.core.utils import remove_repeated_spaces
from onegov.core.utils import yubikey_otp_to_serial
from onegov.search import ORMSearchable
from onegov.user.i18n import _
from onegov.user.models.group import UserGroup
from sedate import utcnow
from sqlalchemy import func
from sqlalchemy import Index, UniqueConstraint
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import mapped_column, relationship, DynamicMapped, Mapped
from uuid import uuid4, UUID
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Sequence
    from onegov.api.models import ApiKey
    from onegov.core.framework import Framework
    from onegov.core.request import CoreRequest
    from onegov.ticket import Ticket
    from onegov.user import RoleMapping
    from sqlalchemy import ColumnElement, Table
    from typing_extensions import TypedDict

    class SessionDict(TypedDict):
        """ Describes a single entry of ``User.sessions``, i.e. the record
        that ``User.save_current_session`` stores per browser session.

        """

        # NOTE(review): the keys mirror the dictionary literal written by
        # ``User.save_current_session``. ``timestamp`` is the result of
        # ``isoformat()`` and therefore a string; the optional string types
        # of ``address`` and ``agent`` are assumed from the request
        # attributes they are copied from -- confirm.
        address: str | None
        timestamp: str
        agent: str | None

    # HACK: We experienced flaky behavior with mypy when importing this
    #       symbol normally, so for now we'll just declare what this
    #       symbol is, so it doesn't have to be retrieved from the other
    #       module. Eventually we can hopefully get rid of this again and
    #       just import normally.
    group_association_table: Table
else:
    from onegov.user.models.group import group_association_table
class User(Base, TimestampMixin, ORMSearchable):
    """ Defines a generic user.

    Besides the mapped columns, the class offers helpers for password
    handling, second factors (yubikey / mtan) and for keeping track of the
    browser sessions of the user.

    """

    __tablename__ = 'users'

    #: the type of the item, this can be used to create custom polymorphic
    #: subclasses of this class. See
    #: `<https://docs.sqlalchemy.org/en/improve_toc/\
    #: orm/extensions/declarative/inheritance.html>`_.
    type: Mapped[str] = mapped_column(default=lambda: 'generic')

    __mapper_args__ = {
        'polymorphic_on': type,
        'polymorphic_identity': 'generic',
    }

    # search related configuration (presumably consumed by ORMSearchable;
    # verify against onegov.search)
    fts_type_title = _('Users')
    fts_title_property = 'title'
    fts_properties = {
        'username': {'type': 'text', 'weight': 'A'},
        'realname': {'type': 'text', 'weight': 'A'},
        'userprofile': {'type': 'text', 'weight': 'B'}
    }

    @property
    def fts_suggestion(self) -> tuple[str, str]:
        """ Returns the realname (or the username if there is no realname)
        together with the username.

        """
        return (self.realname or self.username, self.username)

    @property
    def userprofile(self) -> list[str]:
        """ Returns all non-empty string values found in :attr:`data`.

        An empty list is returned if there is no data at all.

        """
        if not self.data:
            return []

        return [
            value
            for value in self.data.values()
            if value and isinstance(value, str)
        ]

    #: the user id is a uuid because that's more secure (no id guessing)
    id: Mapped[UUID] = mapped_column(
        primary_key=True,
        default=uuid4
    )

    #: the username may be any string, but will usually be an email address
    username: Mapped[str] = mapped_column(
        LowercaseText,
        unique=True
    )

    #: the password is stored with the hashing algorithm defined by onegov.core
    password_hash: Mapped[str]

    #: the role is relevant for security in onegov.core
    # NOTE(review): no ``role`` attribute follows this comment in the
    # reviewed version of the file -- confirm that it was not lost.

    #: the group this user belongs to
    groups: Mapped[list[UserGroup]] = relationship(
        secondary=group_association_table,
        back_populates='users',
        passive_deletes=True,
    )

    #: the real name of the user for display (use the :attr:`name` property
    #: to automatically get the name or the username)
    realname: Mapped[str | None]

    #: extra data that may be stored with this user, the format and content
    #: of this data is defined by the consumer of onegov.user
    #: by default, this data is only loaded by request, so if you need to
    #: load a lot of data columns, do something like this::
    #:
    #:     session.query(User).options(undefer("data"))
    #:
    data: Mapped[dict[str, Any]] = mapped_column(deferred=True)

    #: two-factor authentication schemes are enabled with this property
    #: if no two-factor auth is used, the value is NULL, if one *is* used,
    #: there should be a dictionary with the type of the two-factor
    #: authentication as well as custom values required by the two-factor
    #: implementation.
    #:
    #: e.g.::
    #:
    #:      {
    #:          'type': 'yubikey',
    #:          'data': 'ccccccbcgujh'
    #:      }
    #:
    #: Note that 'data' could also be a nested dictionary!
    #:
    second_factor: Mapped[dict[str, Any] | None]

    #: A string describing where the user came from, None if internal.
    #
    #: Internal users may login using a password, which they may also change.
    #
    #: External users may not login using a password, nor can they ask for one.
    #
    #: A user can technically come from changing providers - the source refers
    #: to the last provider he used.
    source: Mapped[str | None]

    #: A string describing the user id on the source, which is an id that is
    #: supposed never change (unlike the username, which may change).
    #:
    #: If set, the source_id is unique per source.
    source_id: Mapped[str | None]

    #: true if the user is active
    active: Mapped[bool] = mapped_column(default=True)

    #: timestamp of the last successful login
    last_login: Mapped[datetime | None]

    #: the signup token used by the user
    signup_token: Mapped[str | None]

    __table_args__ = (
        Index('lowercase_username', func.lower(username), unique=True),
        UniqueConstraint('source', 'source_id', name='unique_source_id'),
    )

    #: the role mappings associated with this user
    role_mappings: DynamicMapped[RoleMapping] = relationship(
        back_populates='user'
    )

    #: the api keys associated with this user
    api_keys: DynamicMapped[ApiKey] = relationship(
        cascade='all,delete-orphan',
        back_populates='user'
    )

    #: the tickets assigned to this user
    tickets: Mapped[list[Ticket]] = relationship(back_populates='user')

    @hybrid_property
    def title(self) -> str:
        """ Returns the realname or the username of the user, depending on
        what's available first. """
        if self.realname is None:
            return self.username

        # a realname consisting of whitespace only counts as missing
        if self.realname.strip():
            return self.realname

        return self.username

    @title.inplace.expression
    @classmethod
    def _title_expression(cls) -> ColumnElement[str]:
        """ The SQL counterpart of :attr:`title`: the realname unless it is
        NULL or empty after trimming, in which case the username is used.

        """
        return func.coalesce(
            func.nullif(func.trim(cls.realname), ''), cls.username
        )

    @hybrid_property
    def password(self) -> str:
        """ An alias for :attr:`password_hash`. """
        return self.password_hash

    @password.inplace.setter
    def _password_setter(self, value: str) -> None:
        """ When set, the given password in cleartext is hashed using
        onegov.core's default hashing algorithm.

        """
        self.password_hash = hash_password(value)

    def is_matching_password(self, password: str) -> bool:
        """ Returns True if the given password (cleartext) matches the
        stored password hash.

        """
        return verify_password(password, self.password_hash)

    @classmethod
    def get_initials(cls, username: str, realname: str | None = None) -> str:
        """ Takes the name and returns initials which are at most two
        characters wide.

        Examples:

            admin => A
            nathan.drake@example.org => ND
            Victor Sullivan => VS
            Charles Montgomery Burns => CB

        :param username: used if there is no usable realname
        :param realname: preferred over the username, unless it is None or
            consists of whitespace only
        :return: the initials in uppercase, or '?' if none can be derived

        """
        parts: Sequence[str]

        # for e-mail addresses assume the dot splits the name and use
        # the first two parts of said split (probably won't have a middle
        # name in the e-mail address)
        if realname is None or not realname.strip():
            username = username.split('@')[0]
            parts = remove_repeated_dots(username.strip('.')).split('.')[:2]

        # for real names split by space and assume that with more than one
        # part that the first and last part are the most important to get rid
        # of middlenames
        else:
            parts = remove_repeated_spaces(realname.strip()).split(' ')

            if len(parts) > 2:
                parts = (parts[0], parts[-1])

        # empty parts are skipped, so a name without usable characters
        # ends up as '?'
        return ''.join(p[0] for p in parts if p).upper() or '?'

    @property
    def initials(self) -> str:
        """ The initials of this user, see :meth:`get_initials`. """
        return self.get_initials(self.username, self.realname)

    @property
    def has_yubikey(self) -> bool:
        """ True if the second factor of this user is of type 'yubikey'. """
        if not self.second_factor:
            return False

        return self.second_factor.get('type') == 'yubikey'

    @property
    def yubikey(self) -> str | None:
        """ The 'data' value of the second factor if it is a yubikey,
        None otherwise.

        Setting a falsy value removes the second factor. Any other value
        replaces the second factor with a yubikey entry that holds the
        first twelve characters of the given value.

        """
        if not self.has_yubikey:
            return None

        assert self.second_factor is not None
        return self.second_factor.get('data')

    @yubikey.setter
    def yubikey(self, yubikey: str | None) -> None:
        if not yubikey:
            self.second_factor = None
        else:
            # NOTE(review): assertions are skipped when Python runs with
            # -O, so callers should validate the format beforehand. The
            # assert is kept, as replacing it would change the exception
            # type callers may rely on.
            assert is_valid_yubikey_format(yubikey)

            self.second_factor = {
                'type': 'yubikey',
                # only the first twelve characters are stored
                'data': yubikey[:12]
            }

    @property
    def yubikey_serial(self) -> int | None:
        """ Returns the yubikey serial of the yubikey associated with this
        user (if any).

        """
        yubikey = self.yubikey

        # a conditional expression instead of `a and b or None`, which
        # would turn a serial of 0 into None
        return yubikey_otp_to_serial(yubikey) if yubikey else None

    @property
    def mtan_phone_number(self) -> str | None:
        """ The 'data' value of the second factor if it is of type 'mtan',
        None otherwise.

        """
        if not self.second_factor:
            return None

        if self.second_factor.get('type') != 'mtan':
            return None

        return self.second_factor.get('data')

    #: sessions of this user
    sessions: dict_property[dict[str, SessionDict] | None] = data_property()

    #: tags of this user
    # NOTE(review): no ``tags`` attribute follows this comment in the
    # reviewed version of the file -- confirm that it was not lost.

    #: the phone number of this user
    phone_number: dict_property[str | None] = data_property()

    def cleanup_sessions(self, app: Framework) -> None:
        """ Removes stored sessions not valid anymore.

        A session is dropped if ``remembered(app, session_id)`` is falsy.
        :attr:`sessions` is initialised with an empty dictionary if unset.

        """
        self.sessions = self.sessions or {}

        # iterate over a copy of the keys, as entries are deleted meanwhile
        for session_id in list(self.sessions):
            if not remembered(app, session_id):
                del self.sessions[session_id]

    def save_current_session(self, request: CoreRequest) -> None:
        """ Stores the current browser session.

        The entry is keyed by the token of the browser session and holds
        the client address, a naive UTC timestamp in ISO format and the
        user agent of the request. Stale sessions are cleaned up afterwards.

        """
        self.sessions = self.sessions or {}
        self.sessions[request.browser_session._token] = {
            'address': request.client_addr,
            'timestamp': utcnow().replace(tzinfo=None).isoformat(),
            'agent': request.user_agent
        }

        self.cleanup_sessions(request.app)

    def remove_current_session(self, request: CoreRequest) -> None:
        """ Removes the current browser session.

        Nothing is removed if the token of the browser session is unknown.
        Stale sessions are cleaned up in any case.

        """
        token = request.browser_session._token
        if self.sessions and token and token in self.sessions:
            del self.sessions[token]

        self.cleanup_sessions(request.app)

    def logout_all_sessions(self, app: Framework) -> int:
        """ Terminates all open browser sessions.

        :return: the number of sessions that were stored before they were
            terminated

        """
        self.sessions = self.sessions or {}
        count = len(self.sessions)

        for session_id in self.sessions:
            forget(app, session_id)

        # drops the entries of the sessions that are not remembered anymore
        self.cleanup_sessions(app)

        return count