from __future__ import annotations
import morepath
import ua_parser
from datetime import timedelta
from functools import cached_property
from onegov.core.cache import instance_lru_cache
from onegov.core.utils import append_query_param
from itsdangerous import (
BadSignature,
SignatureExpired,
TimestampSigner,
URLSafeSerializer,
URLSafeTimedSerializer
)
from more.content_security import ContentSecurityRequest
from more.webassets.core import IncludeRequest
from morepath.authentication import NO_IDENTITY
from morepath.request import SAME_APP
from onegov.core import utils
from onegov.core.crypto import random_token
from webob.exc import HTTPForbidden
from wtforms.csrf.session import SessionCSRF
from typing import overload, Any, NamedTuple, TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import SupportsItems
from collections.abc import Callable, Iterable, Iterator
from dectate import Sentinel
from gettext import GNUTranslations
from markupsafe import Markup
from morepath.authentication import Identity, NoIdentity
from onegov.core import Framework
from onegov.core.browser_session import BrowserSession
from onegov.core.i18n.translation_string import TranslationMarkup
from onegov.core.security.permissions import Intent
from onegov.core.types import MessageType
from sqlalchemy import Column
from sqlalchemy.orm import Session
from translationstring import _ChameleonTranslate
from typing import Literal, Protocol, TypeGuard
from webob import Response
from webob.multidict import MultiDict
from webob.request import _FieldStorageWithFile
from wtforms import Form
from uuid import UUID
from .templates import TemplateLoader
[docs]
_BaseRequest = morepath.Request
# NOTE: To avoid a dependency between onegov.core and onegov.user
# we use a UserLike Protocol to define the properties we need
# to be present on a user.
class UserLike(Protocol):
    """ Structural type listing the user attributes this module reads.

    Every attribute may either be a plain value or the matching
    SQLAlchemy column, as expressed by the annotations below.
    """

    @property
    def id(self) -> UUID | Column[UUID]: ...

    @property
    def username(self) -> str | Column[str]: ...

    @property
    def group_id(self) -> UUID | Column[UUID | None] | None: ...

    @property
    def role(self) -> str | Column[str]: ...
else:
_BaseRequest = object
[docs]
# Generic type variables used by the annotations in this module.
# `_T` is unconstrained (used by `link` and `exclude_invisible`),
# `_F` is bound to wtforms' `Form`.
_T = TypeVar('_T')
_F = TypeVar('_F', bound='Form')
[docs]
class Message(NamedTuple):
    """ A single notification stored in the browser session.

    The field order matches the ``Message(text, type)`` calls made by
    the request's ``message`` method.
    """

    # The text shown to the user.
    text: str
    # The kind of message, e.g. 'success', 'warning', 'info' or 'alert'.
    type: MessageType
[docs]
class ReturnToMixin(_BaseRequest):
    """ Provides a safe and convenient way of using return-to links.

    Return-to links are links with an added 'return-to' query parameter
    which points to the url a specific view (usually with a form) should
    return to, once all is said and done.

    There's no magic involved. If a view should honor the return-to
    parameter, it should use request.redirect instead of morepath.redirect.

    If no return-to parameter was specified, request.redirect is a
    transparent proxy to morepath.redirect.

    To create a link::

        url = request.return_to(original_url, redirect)

    To honor the parameter in a view, if present::

        return request.redirect(default_url)

    *Do not use the return-to parameter directly*. Redirect parameters
    are notorious for being used in phishing attacks. By using
    ``return_to`` and ``redirect`` you are kept safe from these attacks as
    the redirect url is signed and verified.

    For the same reason you should not allow user-data for return-to
    links. Those are meant for internally generated links!

    """

    @property
    def identity_secret(self) -> str:
        """ The secret used for signing; must be provided by subclasses. """
        raise NotImplementedError

    @property
    def redirect_signer(self) -> URLSafeSerializer:
        """ Serializer which signs/verifies return-to urls. """
        secret = self.identity_secret
        return URLSafeSerializer(secret, 'return-to')

    @instance_lru_cache(maxsize=16)
    def sign_url_for_redirect(self, url: str) -> str:
        """ Returns the signed representation of the given url. """
        signer = self.redirect_signer
        return signer.dumps(url)

    def return_to(self, url: str, redirect: str) -> str:
        """ Adds a signed 'return-to' parameter pointing to `redirect`
        to the given url.

        """
        token = self.sign_url_for_redirect(redirect)
        return utils.append_query_param(url, 'return-to', token)

    def return_here(self, url: str) -> str:
        """ Like :meth:`return_to`, using the current url as target. """
        here = self.url
        return self.return_to(url, here)

    def redirect(self, url: str) -> Response:
        """ Redirects to the signed 'return-to' url of the request if there
        is a valid one, otherwise to the given url.

        """
        if 'return-to' not in self.GET:
            return morepath.redirect(url)

        try:
            target = self.redirect_signer.loads(self.GET['return-to'])
        except BadSignature:
            # an invalid signature is ignored, the given url is used
            target = url

        return morepath.redirect(target)
[docs]
def is_logged_in(identity: Identity | NoIdentity) -> TypeGuard[Identity]:
    """ Returns True if the given identity is not ``NO_IDENTITY``. """
    anonymous = identity is NO_IDENTITY
    return not anonymous
[docs]
class CoreRequest(IncludeRequest, ContentSecurityRequest, ReturnToMixin):
""" Extends the default Morepath request with virtual host support and
other useful methods.
Virtual hosting might be supported by Morepath directly in the future:
https://github.com/morepath/morepath/issues/185
"""
@cached_property
def identity_secret(self) -> str:
    """ Returns the identity secret of the request's application. """
    application = self.app
    return application.identity_secret
@cached_property
def session(self) -> Session:
    """ Returns the session obtained by calling the application's
    ``session()``.

    """
    open_session = self.app.session
    return open_session()
[docs]
def link_prefix(
    self,
    app: Framework | None = None  # type:ignore[override]
) -> str:
    """ Override the `link_prefix` with the application base path provided
    by onegov.server, because the default link_prefix contains the
    hostname, which is not useful in our case - we'll add the hostname
    ourselves later.

    Falls back to the request's own application if no `app` is given and
    to an empty string if the application has no base path.

    """
    target = app or self.app
    return getattr(target, 'application_base_path', '')
@cached_property
def x_vhm_host(self) -> str:
    """ Return the X_VHM_HOST variable or an empty string.

    X_VHM_HOST acts like a prefix to all links generated by Morepath.
    If this variable is not empty, it will be added in front of all
    generated urls.

    Trailing slashes are stripped.

    """
    host = self.headers.get('X_VHM_HOST', '')
    return host.rstrip('/')
@cached_property
def x_vhm_root(self) -> str:
    """ Return the X_VHM_ROOT variable or an empty string.

    X_VHM_ROOT is a bit more tricky than X_VHM_HOST. It tells Morepath
    where the root of the application is situated. This means that the
    value of X_VHM_ROOT must be an existing path inside of Morepath.

    We can understand this best with an example. Let's say you have a
    Morepath application that serves a blog under /blog. You now want to
    serve the blog under a separate domain, say blog.example.org.

    If we just served Morepath under blog.example.org, we'd get urls like
    this one::

        blog.example.org/blog/posts/2014-11-17-16:00

    In effect, this subdomain would be no different from example.org
    (without the blog subdomain). However, we want the root of the host to
    point to /blog.

    To do this we set X_VHM_ROOT to /blog. Morepath will then automatically
    return urls like this::

        blog.example.org/posts/2014-11-17-16:00

    Trailing slashes are stripped.

    """
    root = self.headers.get('X_VHM_ROOT', '')
    return root.rstrip('/')
@cached_property
def path_url(self) -> str:
    """ Returns the path_url, taking the virtual hosting into account. """
    path = self.path
    return self.transform(path)
@cached_property
def application_url(self) -> str:
    """ Extends the default application_url with virtual host support.

    The result never ends with a slash.

    """
    # FIXME: Technically this is not guaranteed to be URL safe, but the
    #        same is already true for X_VHM_ROOT and X_VHM_HOST, if we
    #        want to be able to deal with this properly we should add
    #        a function that does the same thing webob does internally
    transformed = self.transform(self.script_name)
    return transformed.rstrip('/')
@overload  # type:ignore[override]
def link(
    self,
    obj: None,
    name: str = ...,
    default: None = ...,
    app: Framework | Sentinel = ...,
    query_params: SupportsItems[str, str] | None = ...,
    fragment: str | None = ...,
) -> None: ...

@overload
def link(
    self,
    obj: None,
    name: str,
    default: _T,
    app: Framework | Sentinel = ...,
    query_params: SupportsItems[str, str] | None = ...,
    fragment: str | None = ...,
) -> _T: ...

@overload
def link(
    self,
    obj: object,
    name: str = ...,
    default: Any = ...,
    app: Framework | Sentinel = ...,
    query_params: SupportsItems[str, str] | None = ...,
    fragment: str | None = ...,
) -> str: ...

def link(
    self,
    obj: object,
    name: str = '',
    default: _T | None = None,
    app: Framework | Sentinel = SAME_APP,
    query_params: SupportsItems[str, str] | None = None,
    fragment: str | None = None,
) -> str | _T | None:
    """ Extends the default link generating function of Morepath.

    The link produced by the parent class is passed through
    ``self.transform``. Afterwards the given query parameters are appended
    one by one, followed by the fragment (prefixed with '#'), if any.

    """
    base_link = super().link(obj, name=name, default=default, app=app)
    result = self.transform(base_link)

    if query_params:
        for key, value in query_params.items():
            result = append_query_param(result, key, value)

    if fragment:
        result = result + f'#{fragment}'

    return result
[docs]
def class_link(
    self,
    model: type[Any],
    variables: dict[str, Any] | None = None,
    name: str = '',
    app: Framework | Sentinel = SAME_APP,  # type:ignore[override]
) -> str:
    """ Extends the default class link generating function of Morepath.

    The link produced by the parent class is passed through
    ``self.transform``.

    """
    base_link = super().class_link(
        model,
        variables=variables,
        name=name,
        app=app
    )
    return self.transform(base_link)
[docs]
def filestorage_link(self, path: str) -> str | None:
    """ Takes the given filestorage path and returns an url if the path
    exists. The url might point to the local server or it might point to
    somewhere else on the web.

    Returns None if the application has no filestorage or if the path
    does not exist in it.

    """
    application = self.app

    if application.filestorage is None \
            or not application.filestorage.exists(path):
        return None

    if application.filestorage.hasurl(path):
        external = application.filestorage.geturl(path)

        # file:// urls are of no use to a browser, those are served locally
        if not external.startswith('file://'):
            return external

    local_file = application.modules.filestorage.FilestorageFile(path)
    return self.link(local_file)
@cached_property
def theme_link(self) -> str:
    """ Returns the link to the current theme. Computed once per request.

    The theme is automatically compiled and stored if it doesn't exist yet,
    or if it is outdated.

    Must only be used if the application has a theme configured.

    """
    active_theme = self.app.settings.core.theme
    assert active_theme is not None, 'Do not call if no theme is used'

    # a compilation is forced if the app always compiles, or if it allows
    # it for requests sent with 'no-cache' which are not XMLHttpRequests
    force_compile = self.app.always_compile_theme or (
        self.app.allow_shift_f5_compile
        and self.headers.get('cache-control') == 'no-cache'
        and self.headers.get('x-requested-with') != 'XMLHttpRequest'
    )

    compiled_name = self.app.modules.theme.compile(
        self.app.themestorage,
        active_theme,
        self.app.theme_options,
        force=force_compile
    )
    theme_file = self.app.modules.theme.ThemeFile(compiled_name)

    return self.link(theme_file)
@cached_property
def browser_session(self) -> BrowserSession:
    """ Returns a browser_session bound to the request. Works via cookies,
    so requests without cookies won't be able to use the browser_session.

    The browser session is bound to the application (by id), so no session
    data is shared between the applications.

    If no data is written to the browser_session, no session_id cookie
    is created.

    The session_id is rotated when users log in but not when they log out,
    that way we can still identify them and send messages when they log
    out.

    """
    session_id = None

    if 'session_id' in self.cookies:
        session_id = self.app.unsign(self.cookies['session_id'])

        if session_id is None:
            # NOTE: this ensures the new session_id actually gets stored
            #       since on_dirty does nothing if the cookie exists
            #       otherwise we'll be stuck with an invalid session_id
            #       until we delete the cookie manually and will get
            #       infinite CSRF errors
            del self.cookies['session_id']

    if session_id is None:
        session_id = random_token()

    def on_dirty(session: BrowserSession, token: str) -> None:
        # the cookie is only ever stored once
        if 'session_id' not in self.cookies:
            self.cookies['session_id'] = self.app.sign(token)

            @self.after
            def store_session(response: morepath.Response) -> None:
                response.set_cookie(
                    'session_id',
                    self.cookies['session_id'],
                    secure=self.app.identity_secure,
                    httponly=True,
                    samesite=self.app.same_site_cookie_policy  # type:ignore
                )

    session_factory = self.app.modules.browser_session.BrowserSession
    return session_factory(
        cache=self.app.session_cache,
        token=session_id,
        on_dirty=on_dirty
    )
@overload
def translate(self, text: Markup | TranslationMarkup) -> Markup: ...

@overload
def translate(self, text: str) -> str: ...

def translate(self, text: str) -> str:
    """ Translates the given text, if it's a translatable text. Also
    translates mappings.

    A text is considered translatable if it has a ``domain`` attribute,
    anything else is returned unchanged. If the text carries a
    ``mapping``, each translatable value of that mapping is replaced
    in-place by its translation before the text itself is translated.

    """
    if not hasattr(text, 'domain'):
        return text

    if (mapping := getattr(text, 'mapping', None)) is not None:
        for key, value in mapping.items():
            # check the *value*, not the text: the text is known to have
            # a domain at this point, so testing it again is always true
            if hasattr(value, 'domain'):
                mapping[key] = self.translator(value)

    return self.translator(text)
@cached_property
def translator(self) -> Callable[[str], str]:
    """ Returns the translate function for basic string translations.

    The returned function leaves texts without an ``interpolate`` method
    untouched. Other texts are interpolated using their translation, or
    using themselves if there are no translations for this request.

    """
    translations = self.get_translate()

    def translate(text: str) -> str:
        if not hasattr(text, 'interpolate'):
            return text

        template = translations.gettext(text) if translations else text
        return text.interpolate(template)

    return translate
@cached_property
def default_locale(self) -> str | None:
    """ Returns the default locale of the application. """
    application = self.app
    return application.default_locale
@cached_property
def locale(self) -> str | None:
    """ Returns the current locale of this request.

    The locale is determined by the configured locale negotiator. If it
    comes up empty, the application's default locale is used.

    """
    negotiate = self.app.settings.i18n.locale_negotiator
    negotiated = negotiate(self.app.locales, self)

    return negotiated or self.app.default_locale
@cached_property
def html_lang(self) -> str:
    """ The language code for the html tag.

    This is the current locale with underscores replaced by dashes, or
    an empty string if there is no locale.

    """
    if not self.locale:
        return ''

    return self.locale.replace('_', '-') or ''
@overload
def get_translate(
    self,
    for_chameleon: Literal[False] = False
) -> GNUTranslations | None: ...

@overload
def get_translate(
    self,
    for_chameleon: Literal[True]
) -> _ChameleonTranslate | None: ...

def get_translate(
    self,
    for_chameleon: bool = False
) -> GNUTranslations | _ChameleonTranslate | None:
    """ Returns the translate method to the given request, or None
    if no such method is available.

    :for_chameleon:
        True if the translate instance is used for chameleon (which is
        special).

    """
    if not self.app.locales:
        return None

    current = self.locale
    if current is None:
        return None

    if for_chameleon:
        catalogs = self.app.chameleon_translations
    else:
        catalogs = self.app.translations

    return catalogs.get(current)
[docs]
def message(self, text: str, type: MessageType) -> None:
    """ Adds a message with the given type to the messages list. This
    messages list may then be displayed by an application building on
    onegov.core.

    For example:

        http://foundation.zurb.com/docs/components/alert_boxes.html

    Four default types are defined on the request for easier use:

    :meth:`success`
    :meth:`warning`
    :meth:`info`
    :meth:`alert`

    The messages are stored with the session and to display them, the
    template using the messages should call :meth:`consume_messages`.

    """
    if self.browser_session.has('messages'):
        # the list is always re-assigned as a whole instead of being
        # appended to (usually there's going to be one message only)
        self.browser_session.messages = [
            *self.browser_session.messages,
            Message(text, type)
        ]
    else:
        self.browser_session.messages = [Message(text, type)]
[docs]
def consume_messages(self) -> Iterator[Message]:
    """ Returns the messages, removing them from the session in the
    process. Call only if you can be reasonably sure that the user
    will see the messages.

    This is a generator, the messages are only removed from the session
    once iteration starts.

    """
    pending = self.browser_session.pop('messages', ())
    yield from pending
[docs]
def success(self, text: str) -> None:
    """ Adds a message of the type 'success'. """
    message_type = 'success'
    self.message(text, message_type)
[docs]
def warning(self, text: str) -> None:
    """ Adds a message of the type 'warning'. """
    message_type = 'warning'
    self.message(text, message_type)
[docs]
def info(self, text: str) -> None:
    """ Adds a message of the type 'info'. """
    message_type = 'info'
    self.message(text, message_type)
[docs]
def alert(self, text: str) -> None:
    """ Adds a message of the type 'alert'. """
    message_type = 'alert'
    self.message(text, message_type)
@cached_property
def is_logged_in(self) -> bool:
    """ Returns True if the current request is logged in at all, that is
    if its identity is not ``NO_IDENTITY``.

    """
    anonymous = self.identity is NO_IDENTITY
    return not anonymous
@cached_property
def agent(self) -> ua_parser.DefaultedResult:
    """ Returns the user agent, parsed by ua-parser.

    A missing user agent is parsed like an empty string.

    """
    raw_agent = self.user_agent or ''
    parsed = ua_parser.parse(raw_agent)
    return parsed.with_defaults()
[docs]
def has_permission(
    self,
    model: object,
    permission: type[Intent] | None,
    user: UserLike | None = None
) -> bool:
    """ Returns True if the current or given user has the given permission
    on the given model.

    A permission of None is always granted. If a user is given, an
    application bound identity is built from its username, id, group_id
    and role. Otherwise the identity of the request is used.

    """
    if permission is None:
        return True

    if user:
        group_id = user.group_id.hex if user.group_id else None
        identity = self.app.application_bound_identity(
            user.username,
            user.id.hex,
            group_id,
            user.role
        )
    else:
        identity = self.identity

    return self.app._permits(identity, model, permission)
[docs]
def has_access_to_url(self, url: str) -> bool:
    """ Returns true if the current user has access to the given url.

    The domain part of the url is completely ignored. This method should
    only be used if you have no other choice. Loading the object by
    url first is slower than if you can get the object otherwise.

    The initial use-case for this function is the to parameter in the
    login view. If the to-url is accessible anyway, we skip the login
    view.

    If we can't find a view for the url, a KeyError is thrown.

    """
    found, view_name = self.app.object_by_path(url, with_view_name=True)

    if found is None:
        error = "Could not find view for '{}'".format(url)
        raise KeyError(error)

    required = self.app.permission_by_view(found, view_name)
    return self.has_permission(found, required)
[docs]
def exclude_invisible(self, models: Iterable[_T]) -> list[_T]:
    """ Returns a list of the given models without those which are
    invisible to the current user (see :meth:`is_visible`).

    """
    return list(filter(self.is_visible, models))
[docs]
def is_visible(self, model: object) -> bool:
    """ Returns True if the given model is visible to the current user.

    In addition to the `is_public` check, this checks if the model is
    secret and should therefore not be visible (though it can still be
    reached via URL).

    """
    if not self.is_public(model):
        return False

    # models with a secret access are hidden from everyone without the
    # private permission
    hidden = (
        not self.is_private(model)
        and hasattr(model, 'access')
        and model.access in ('secret', 'secret_mtan')
    )
    return not hidden
[docs]
def is_public(self, model: object) -> bool:
    """ Returns True if the current user has the Public permission for
    the given model.

    """
    public = self.app.modules.security.Public
    return self.has_permission(model, public)
[docs]
def is_personal(self, model: object) -> bool:
    """ Returns True if the current user has the Personal permission for
    the given model.

    """
    personal = self.app.modules.security.Personal
    return self.has_permission(model, personal)
[docs]
def is_private(self, model: object) -> bool:
    """ Returns True if the current user has the Private permission for
    the given model.

    """
    private = self.app.modules.security.Private
    return self.has_permission(model, private)
[docs]
def is_secret(self, model: object) -> bool:
    """ Returns True if the current user has the Secret permission for
    the given model.

    """
    secret = self.app.modules.security.Secret
    return self.has_permission(model, secret)
@cached_property
def current_role(self) -> str | None:
    """ Returns the user-role of the current request, if logged in.
    Otherwise, None is returned.

    """
    if not is_logged_in(self.identity):
        return None

    return self.identity.role
[docs]
def has_role(self, *roles: str) -> bool:
    """ Returns true if the current user has any of the given roles.

    At least one role has to be given and none of them may be empty.

    """
    assert roles and all(roles)

    role = self.current_role
    return role in roles
@cached_property
def csrf_salt(self) -> str:
    """ Returns the CSRF salt stored in the browser session. A new random
    token is stored first if the session has none yet.

    """
    session = self.browser_session

    if not session.has('csrf_salt'):
        session['csrf_salt'] = random_token()

    return session['csrf_salt']
[docs]
def new_csrf_token(self, salt: str | bytes | None = None) -> bytes:
    """ Returns a new CSRF token. A CSRF token can be verified
    using :meth:`is_valid_csrf_token`.

    Note that forms do their own CSRF protection. This is meant
    for CSRF protection outside of forms.

    onegov.core uses the Synchronizer Token Pattern for CSRF protection:
    `<https://www.owasp.org/index.php/\
    Cross-Site_Request_Forgery_%28CSRF%29_Prevention_Cheat_Sheet>`_

    New CSRF tokens are signed using a secret attached to the session (but
    not sent out to the user). Clients have to return the CSRF token they
    are given. The token has to match the secret, which the client doesn't
    know. So an attacker would have to get access to both the cookie and
    the html source to be able to forge a request.

    Since cookies are marked as HTTP only (no javascript access), this
    even prevents CSRF attack combined with XSS.

    If no salt is given, the :attr:`csrf_salt` of the request is used.

    """
    effective_salt = salt or self.csrf_salt
    assert effective_salt

    # use app.identity_secret here, because that's being used for
    # more.itsdangerous, which uses the same algorithm
    signer = TimestampSigner(self.identity_secret, salt=effective_salt)
    token = random_token()

    return signer.sign(token)
[docs]
def assert_valid_csrf_token(
    self,
    signed_value: str | bytes | None = None,
    salt: str | bytes | None = None
) -> None:
    """ Validates the given CSRF token and returns if it was
    created by :meth:`new_csrf_token`. If there's a mismatch, a 403 is
    raised.

    If no signed_value is passed, it is taken from
    request.params.get('csrf-token').

    If no salt is passed, the :attr:`csrf_salt` of the request is used.

    """
    token = signed_value or self.params.get('csrf-token')
    salt = salt or self.csrf_salt

    # params on request could contain a cgi.FieldStorage, so lets make
    # sure we are dealing with str or bytes
    usable = token and isinstance(token, (str, bytes)) and salt
    if not usable:
        raise HTTPForbidden()

    signer = TimestampSigner(self.identity_secret, salt=salt)
    time_limit = self.app.csrf_time_limit

    try:
        signer.unsign(token, max_age=time_limit)
    except (SignatureExpired, BadSignature) as exception:
        raise HTTPForbidden() from exception
[docs]
def new_url_safe_token(
    self,
    data: object,
    salt: str | bytes | None = None
) -> str:
    """ Returns a new URL safe token. A token can be deserialized
    using :meth:`load_url_safe_token`.

    The token is created with the identity secret of the request and the
    given salt.

    """
    secret = self.identity_secret
    return URLSafeTimedSerializer(secret).dumps(data, salt=salt)
[docs]
def load_url_safe_token(
    self,
    data: str | bytes | None,
    salt: str | bytes | None = None,
    max_age: int = 3600
) -> Any | None:
    """ Deserialize a token created by :meth:`new_url_safe_token`.

    If the token is missing, invalid or older than `max_age`, None is
    returned.

    """
    if not data:
        return None

    serializer = URLSafeTimedSerializer(self.identity_secret)

    try:
        payload = serializer.loads(data, salt=salt, max_age=max_age)
    except (SignatureExpired, BadSignature):
        return None
    else:
        return payload
@cached_property
def template_loader(self) -> TemplateLoader:
    """ Returns the chameleon template loader, which is the loader
    registered for '.pt' files in the template engine registry.

    """
    loaders = self.app.config.template_engine_registry._template_loaders
    return loaders['.pt']