Source code for user.auth.core

from __future__ import annotations

import morepath

from itsdangerous import URLSafeSerializer, BadData
from itsdangerous.encoding import base64_encode, base64_decode
from secrets import token_bytes
from onegov.core.utils import relative_url
from onegov.user import log
from onegov.user.auth.second_factor import SECOND_FACTORS
from onegov.user.collections import UserCollection
from onegov.user.errors import ExpiredSignupLinkError
from sedate import utcnow
from webob import Response


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from morepath.authentication import Identity, NoIdentity
    from onegov.core.request import CoreRequest
    from onegov.user import User, UserApp
    from onegov.user.forms import RegistrationForm
    from typing_extensions import TypedDict
    from typing import Self

[docs] class SignupToken(TypedDict):
[docs] role: str
[docs] max_uses: int
[docs] expires: int
[docs] class Auth: """ Defines a model for authentication methods like login/logout. Applications should use this model to implement authentication views. """
[docs] identity_class = morepath.Identity
def __init__( self, app: UserApp, # FIXME: For now we allow None, because purl.URL will default # to '/' for None, which is used by relative_url, but # we should probably be more vigilant about this... to: str | None = '/', skip: bool = False, signup_token: str | None = None, signup_token_secret: str | None = None ):
[docs] self.app = app
[docs] self.session = app.session()
[docs] self.application_id = app.application_id
[docs] self.signup_token = signup_token
[docs] self.signup_token_secret = signup_token_secret or getattr( app, 'identity_secret', None)
# never redirect to an external page, this might potentially be used # to trick the user into thinking he's on our page after entering his # password and being redirected to a phishing site.
[docs] self.to = relative_url(to)
[docs] self.skip = skip
# initialize nth factors
[docs] self.factors = {}
for type, cls in SECOND_FACTORS.items(): obj = cls.configure(**cls.args_from_app(app)) if obj is not None: self.factors[type] = obj @classmethod
[docs] def from_request( cls, request: CoreRequest, to: str | None = '/', skip: bool = False, signup_token: str | None = None ) -> Self: return cls( request.app, # type:ignore[arg-type] to, skip, signup_token )
@classmethod
[docs] def from_request_path( cls, request: CoreRequest, skip: bool = False, signup_token: str | None = None ) -> Self: return cls.from_request( request, request.transform(request.path), skip, signup_token)
@property
[docs] def users(self) -> UserCollection: return UserCollection(self.session)
[docs] def redirect(self, request: CoreRequest, path: str) -> Response: return morepath.redirect(request.transform(path))
[docs] def skippable(self, request: CoreRequest) -> bool: """ Returns true if the login for the current `to` target is optional (i.e. it is not required to access the page). This should only be used on protected pages as public pages would always be skipppable. Therefore it has to be enabled manually by specifying `skip=True` on the :class:`Auth` class. """ if not self.skip: return False # this is the default parameter, we won't skip to it in any case if self.to == '/': return False try: return request.has_access_to_url(self.to) except KeyError: return False
[docs] def apply_second_factor( self, request: CoreRequest, user: User, second_factor_value: str | None ) -> Response | bool: """ Applies the second factor if applicable. :return: false if the second factor was invalid, a response if the second factor needs to be activated or requires a two step process and true otherwise """ if not user.second_factor: for factor in self.factors.values(): if factor.self_activation: response = factor.start_activation(request, self) if response is not None: return response return True try: factor = self.factors[user.second_factor['type']] except KeyError as exc: raise NotImplementedError from exc if factor.kind == 'single_step': if not second_factor_value: return False return factor.is_valid( request=request, user=user, factor=second_factor_value ) else: return factor.send_challenge( request=request, user=user, auth=self )
[docs] def authenticate( self, request: CoreRequest, username: str, password: str, client: str = 'unknown', second_factor: str | None = None, skip_providers: bool = False ) -> User | Response | None: """ Takes the given username and password and matches them against the users collection. This does not login the user, use :meth:`login_to` to accomplish that. :param username: The username to authenticate. :param password: The password of the user (clear-text). :param client: The client address of the user (i.e. his IP address). :param second_factor: The value of the second factor or None. :param skip_providers: In special cases where e.g. an LDAP-Provider is a source of users but can't offer the password for authentication, you can login using the application database. :return: The matched user or a response to complete the second factor authentication, if successful, or None. """ from onegov.user.integration import UserApp # circular import user = None source = None if isinstance(self.app, UserApp) and not skip_providers: for provider in self.app.providers.values(): if not provider.available(self.app): continue if provider.kind == 'integrated': user = provider.authenticate_user( request=request, username=username, password=password) if user: source = user.source break # fall back to default, only if it didn't work otherwise user = user or self.users.by_username_and_password(username, password) def fail() -> None: log.info(f'Failed login by {client} ({username})') return None if user is None: return fail() # type:ignore[func-returns-value] if not user.active: return fail() # type:ignore[func-returns-value] # only built-in users currently support second factors if source is None: try: response = self.apply_second_factor( request, user, second_factor ) if response is True: pass elif response is False: return fail() # type:ignore[func-returns-value] else: # we need to remember which user we already authenticated # the username and password for, so the second step can # be completed without entering the password again request.browser_session['pending_username'] = user.username return response except Exception as e: log.info(f'Second factor exception for user {user.username}: ' f'{e.args[0]}') return None # users from external authentication providers may not login using # a regular login - if for some reason the source is false (if the # authentication system is switched) - the source column has to be # set to NULL if user.source != source: return fail() # type:ignore[func-returns-value] log.info(f'Successful login by {client} ({username})') return user
[docs] def as_identity(self, user: User) -> Identity: """ Returns the morepath identity of the given user. """ return self.identity_class( # FIXME: We should consider switching to user.id.hex # for userid and provide username in a separate field # instead, but this was the less intrusive step # in the meantime. We use identity.userid in quite # a few places after all. userid=user.username, uid=user.id.hex, groupid=user.group_id.hex if user.group_id else '', role=user.role, application_id=self.application_id )
[docs] def by_identity(self, identity: Identity | NoIdentity) -> User | None: """ Returns the user record of the given identity. """ if identity.userid is None: return None return self.users.by_username(identity.userid)
[docs] def login_to( self, username: str, password: str, request: CoreRequest, second_factor: str | None = None, skip_providers: bool = False ) -> Response | None: """ Takes a user login request and remembers the user if the authentication completes successfully. :param username: The username to log in. :param password: The password to log in (cleartext). :param request: The request of the user. :param second_factor: The second factor, if any. :skip_providers: Pass option skip_providers to skip any configured auth providers. :return: A redirect response to ``self.to`` with the identity remembered as a cookie. If not successful, None is returned. """ user = self.authenticate( request=request, username=username, password=password, client=request.client_addr or 'unknown', second_factor=second_factor, skip_providers=skip_providers ) if user is None or isinstance(user, Response): return user return self.complete_login(user, request)
[docs] def complete_login( self, user: User, request: CoreRequest ) -> Response: """ Takes a user record, remembers its session and returns a proper redirect response to complete the login. This method is mostly useful inside onegov.user. You probably want to use :meth:`complete_login` outside of that. """ assert user is not None identity = self.as_identity(user) to: str | None assert hasattr(request.app, 'redirect_after_login') to = request.app.redirect_after_login(identity, request, self.to) to = to or self.to response = self.redirect(request, to) # Rotate the session ID if 'session_id' in request.cookies: del request.cookies['session_id'] if 'browser_session' in request.__dict__: del request.__dict__['browser_session'] request.app.remember_identity(response, request, identity) if hasattr(request.app, 'on_login'): request.app.on_login(request, user) user.save_current_session(request) response.completed_login = True # type:ignore[attr-defined] return response
[docs] def logout_to( self, request: CoreRequest, to: str | None = None ) -> Response: """ Logs the current user out and redirects to ``to`` or ``self.to``. :return: A response redirecting to ``self.to`` with the identity forgotten. """ user = self.by_identity(request.identity) if user is not None: user.remove_current_session(request) response = self.redirect(request, to or self.to) request.app.forget_identity(response, request) return response
[docs] def new_signup_token( self, role: str, max_age: int = 24 * 60 * 60, max_uses: int = 1 ) -> str: """ Returns a signup token which can be used for users to register themselves, directly gaining the given role. Signup tokens are recorded on the user to make sure that only the requested amount of uses is allowed. """ serializer = self.signup_token_serializer serialized = serializer.dumps({ 'role': role, 'max_uses': max_uses, 'expires': int(utcnow().replace(tzinfo=None).timestamp()) + max_age }) assert serializer.salt is not None encoded_salt = base64_encode(serializer.salt).decode('ascii') return f'{serialized}.{encoded_salt}'
@property
[docs] def signup_token_serializer(self) -> URLSafeSerializer: assert self.signup_token_secret return URLSafeSerializer( self.signup_token_secret, salt=token_bytes(16) )
[docs] def decode_signup_token(self, token: str) -> SignupToken | None: try: serialized, _, encoded_salt = token.rpartition('.') if not serialized: # the separator wasn't part of the token return None salt = base64_decode(encoded_salt) return self.signup_token_serializer.loads(serialized, salt=salt) except BadData: return None
@property
[docs] def permitted_role_for_registration(self) -> str | None: """ Returns the permitted role for the current signup token. """ if not self.signup_token: return 'member' assert self.signup_token_secret params = self.decode_signup_token(self.signup_token) if not params: return None if params['expires'] < int(utcnow().replace(tzinfo=None).timestamp()): return None signups = ( UserCollection(self.session) .by_signup_token(self.signup_token).count() ) if signups >= params['max_uses']: return None return params['role']
[docs] def register( self, form: RegistrationForm, request: CoreRequest ) -> User: """ Registers the user using the information on the registration form. Takes the signup token into account to provide the user with the proper role. See :meth:`onegov.user.collections.UserCollection.register_user` for more information. """ role = self.permitted_role_for_registration if role is None: raise ExpiredSignupLinkError() assert form.username.data is not None assert form.password.data is not None return UserCollection(self.session).register( username=form.username.data, password=form.password.data, request=request, role=role, signup_token=self.signup_token )