""" Contains the base application used by other applications. """
from __future__ import annotations
import re
import yaml
import base64
import hashlib
import morepath
from dectate import directive
from email.headerregistry import Address
from functools import wraps
from more.content_security import SELF
from more.content_security import NONE
from more.content_security.core import content_security_policy_tween_factory
from onegov.core import Framework, utils
from onegov.core.framework import default_content_security_policy
from onegov.core.i18n import default_locale_negotiator
from onegov.core.orm.cache import orm_cached, request_cached
from onegov.core.templates import PageTemplate
from onegov.core.widgets import transform_structure
from onegov.file import DepotApp
from onegov.form import FormApp
from onegov.gis import MapboxApp
from onegov.org import directives
from onegov.org.auth import MTANAuth
from onegov.org.initial_content import create_new_organisation
from onegov.org.models import Dashboard, Organisation, PublicationCollection
from onegov.org.request import OrgRequest
from onegov.org.theme import OrgTheme
from onegov.pay import PayApp
from onegov.reservation import LibresIntegration
from onegov.search import ElasticsearchApp
from onegov.ticket import TicketCollection
from onegov.ticket import TicketPermission
from onegov.user import UserApp
from onegov.websockets import WebsocketsApp
from purl import URL
from types import MethodType
from webob.exc import WSGIHTTPException, HTTPTooManyRequests
from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import StrPath
from collections.abc import (
Callable, Collection, Iterable, Iterator, Sequence)
from more.content_security import ContentSecurityPolicy
from morepath.authentication import Identity, NoIdentity
from onegov.core.mail import Attachment
from onegov.core.types import EmailJsonDict, SequenceOrScalar
from onegov.pay import Price
from onegov.ticket.collection import TicketCount
from reg.dispatch import _KeyLookup
from webob import Response
[docs]
class OrgApp(Framework, LibresIntegration, ElasticsearchApp, MapboxApp,
DepotApp, PayApp, FormApp, UserApp, WebsocketsApp):
[docs]
# NOTE(review): presumably switches on static file serving in the underlying
# framework - confirm against onegov.core
serve_static_files = True
[docs]
# request class assigned to this application (imported from onegov.org.request)
request_class = OrgRequest
#: org directives
[docs]
# NOTE(review): each ``directive(...)`` call below wraps one action class from
# onegov.org.directives; presumably this makes the action available as a
# configuration decorator on the application class - confirm against dectate
homepage_widget = directive(directives.HomepageWidgetAction)
[docs]
export = directive(directives.ExportAction)
[docs]
userlinks = directive(directives.UserlinkAction)
[docs]
settings_view = directive(directives.SettingsView)
[docs]
boardlet = directive(directives.Boardlet)
#: cronjob settings
[docs]
# NOTE(review): presumably consulted by a cronjob to decide whether ticket
# statistics are sent - the consumer is not visible here, confirm
send_ticket_statistics = True
[docs]
def is_allowed_application_id(self, application_id: str) -> bool:
    """ Decides whether an organisation exists for the application id.

    The schema name is ``<namespace>-<application_id>``. Requests are only
    passed on to the org application if that schema exists, which allows
    hosting onegov.org for all of ``*.example.org``: a request to
    ``newyork.example.org`` is handled if its schema exists and is not
    handled otherwise.

    """
    schema = '-'.join((self.namespace, application_id))
    known = self.known_schemas

    # schemas confirmed earlier are accepted without any further checks
    if schema in known:
        return True

    manager = self.session_manager

    # invalid schema names are rejected before the database is consulted
    if not manager.is_valid_schema(schema):
        return False

    if not manager.is_schema_found_on_database(schema):
        return False

    # the schema exists, remember it for subsequent calls
    known.add(schema)
    return True
# NOTE: Organisation is probably the one model we could get away with
# caching long-term without causing too many issues, but we
# would first have to prove that we actually save a significant
# amount of resources by skipping this query
@request_cached
[docs]
def org(self) -> Organisation:
    """ Returns the first Organisation found in the current session. """
    # The query can only come back empty during setup, before the first
    # Organisation has been added. Outside of that narrow window a row
    # always exists, so the return type is deliberately not optional.
    organisations = self.session().query(Organisation)
    return organisations.first()  # type:ignore
@orm_cached(policy='on-table-change:organisations')
[docs]
def homepage_template(self) -> PageTemplate:
    """ Builds the page template for the organisation's homepage.

    The template is created from the ``homepage_structure`` stored in the
    organisation's meta data, transformed using the registered homepage
    widgets. Without a structure an empty template is returned.

    """
    structure = self.org.meta.get('homepage_structure')
    if not structure:
        return PageTemplate('')

    registered_widgets = self.config.homepage_widget_registry.values()
    markup = transform_structure(registered_widgets, structure)
    return PageTemplate(markup)
@orm_cached(policy='on-table-change:tickets')
[docs]
def ticket_count(self) -> TicketCount:
    """ Returns the count provided by the ticket collection. """
    tickets = TicketCollection(self.session())
    return tickets.get_count()
@orm_cached(policy='on-table-change:ticket_permissions')
[docs]
def ticket_permissions(self) -> dict[str, dict[str | None, list[str]]]:
    """ Returns the ticket permissions as a nested mapping.

    The outer key is the handler code, the inner key the group of the
    permission and the values are the hex representations of the user
    group ids.

    """
    rows = self.session().query(TicketPermission).with_entities(
        TicketPermission.handler_code,
        TicketPermission.group,
        TicketPermission.user_group_id
    )

    permissions: dict[str, dict[str | None, list[str]]] = {}
    for row in rows:
        by_group = permissions.setdefault(row.handler_code, {})
        by_group.setdefault(row.group, []).append(row.user_group_id.hex)

    return permissions
@orm_cached(policy='on-table-change:files')
[docs]
def publications_count(self) -> int:
    """ Returns the number of rows in the publication collection query. """
    publications = PublicationCollection(self.session())
    return publications.query().count()
[docs]
def prepare_email(
    self,
    reply_to: Address | str | None = None,
    category: Literal['marketing', 'transactional'] = 'marketing',
    receivers: SequenceOrScalar[Address | str] = (),
    cc: SequenceOrScalar[Address | str] = (),
    bcc: SequenceOrScalar[Address | str] = (),
    subject: str | None = None,
    content: str | None = None,
    attachments: Iterable[Attachment | StrPath] = (),
    headers: dict[str, str] | None = None,
    plaintext: str | None = None
) -> EmailJsonDict:
    """ Wraps :meth:`onegov.core.framework.Framework.prepare_email` and
    fills in the reply_to address.

    The address is taken from the first source that provides one: the
    ``reply_to`` argument, the ``reply_to`` entry of the organisation's
    meta data, the sender configured for the mail ``category``.

    """
    if not reply_to:
        reply_to = self.org.meta.get('reply_to', None)

    if not reply_to:
        assert self.mail is not None
        reply_to = self.mail[category]['sender']

    # plain addresses get the organisation title as display name
    if isinstance(reply_to, str):
        reply_to = Address(self.org.title, addr_spec=reply_to)

    return super().prepare_email(
        reply_to=reply_to,
        category=category,
        receivers=receivers,
        cc=cc,
        bcc=bcc,
        subject=subject,
        content=content,
        attachments=attachments,
        headers=headers,
        plaintext=plaintext,
    )
@property
[docs]
def theme_options(self) -> dict[str, Any]:
    """ Returns the organisation's theme options, or an empty dictionary
    if there are none.

    """
    options = self.org.theme_options
    return options if options else {}
@property
[docs]
def font_family(self) -> str | None:
    """ Returns the ``font-family-sans-serif`` theme option, if set. """
    options = self.theme_options
    return options.get('font-family-sans-serif')
@property
@property
[docs]
def custom_texts(self) -> dict[str, str] | None:
    """ Returns the custom texts through the application cache.

    The cache entry ``custom_texts`` is created with
    :meth:`load_custom_texts` and an expiration time of 3600.

    """
    loader = self.load_custom_texts
    return self.cache.get_or_create(
        'custom_texts',
        loader,
        expiration_time=3600
    )
[docs]
def load_custom_texts(self) -> dict[str, str] | None:
    """ Loads the customer specific texts from ``customtexts.yml`` in the
    application's filestorage.

    Customer specific texts are specified in the `puppet` repo, see loxo
    https://gitea.seantis.ch/operations/puppet/src/branch/master/nodes/loxo.seantis.ch.yaml#L183,193

    Remember to create customtexts.yml in your local dev setup
    `/usr/local/var/onegov/files/<org>/customtexts.yml`

    Example customtexts.yml:

    ```yaml
    custom texts:
      (en) Custom admission course agreement: I agree to attend the ..
      (de) Custom admission course agreement: Ich erkläre mich bereit, ..
    ```

    :return: The value of the ``custom texts`` key. An empty dictionary
        is returned if the file does not exist, is empty or lacks the key.

    """
    fs = self.filestorage
    assert fs is not None

    if not fs.exists('customtexts.yml'):
        return {}

    with fs.open('customtexts.yml', 'r') as f:
        # safe_load returns None for an empty file, which would otherwise
        # fail on the ``.get`` below
        data = yaml.safe_load(f) or {}

    return data.get('custom texts', {})
@property
[docs]
def allowed_iframe_domains(self) -> list[str]:
    """ Returns the allowed iframe domains through the application cache.

    The cache entry ``allowed_iframe_domains`` is created with
    :meth:`load_allowed_iframe_domains`.

    """
    loader = self.load_allowed_iframe_domains
    return self.cache.get_or_create('allowed_iframe_domains', loader)
[docs]
def load_allowed_iframe_domains(self) -> list[str] | None:
    """ Loads the allowed iframe domains from ``allowed_iframe_domains.yml``
    in the application's filestorage.

    :return: The value of the ``allowed_domains`` key. An empty list is
        returned if the file does not exist, is empty or lacks the key.

    """
    fs = self.filestorage
    assert fs is not None

    if not fs.exists('allowed_iframe_domains.yml'):
        return []

    with fs.open('allowed_iframe_domains.yml', 'r') as f:
        # safe_load returns None for an empty file, which would otherwise
        # fail on the ``.get`` below
        data = yaml.safe_load(f) or {}

    return data.get('allowed_domains', [])
@property
[docs]
def hashed_identity_key(self) -> bytes:
    """ Returns the base64 encoded sha-256 digest of the identity secret.

    The sha-256 is taken because its digest is 32 bytes long. Note that
    the returned value is the base64 encoding of that digest.

    """
    secret = self.identity_secret.encode('utf-8')
    digest = hashlib.sha256(secret).digest()
    return base64.b64encode(digest)
@property
[docs]
def redirect_after_login(
    self,
    identity: Identity | NoIdentity,
    request: OrgRequest,  # type:ignore[override]
    default: str
) -> str | None:
    """ Returns the path to redirect to after login.

    :param identity: The identity the dashboard permission is checked for.
    :param request: The current request.
    :param default: The default login path, usually the current path.
    :return: The path of the dashboard, or None if the default should be
        used.

    """
    # an existing target other than the root is never changed
    if default != '/':
        return None

    # the dashboard is the target, provided that it is available…
    dashboard = Dashboard(request)
    if not dashboard.is_available:
        return None

    # …and that the identity is permitted to view it
    permission = self.permission_by_view(dashboard, view_name=None)
    permitted = self._permits(identity, dashboard, permission)
    if not permitted:
        return None

    target = request.link(dashboard)
    return URL(target).path()
@OrgApp.webasset_path()
[docs]
def get_shared_assets_path() -> str:
    """ Returns the ``assets/js`` path of the ``onegov.shared`` module. """
    package, subpath = 'onegov.shared', 'assets/js'
    return utils.module_path(package, subpath)
@OrgApp.setting(section='i18n', name='locales')
[docs]
def get_i18n_used_locales() -> set[str]:
    """ Returns the locales used by the application. """
    locales = ('de_CH', 'fr_CH')
    return set(locales)
@OrgApp.setting(section='i18n', name='localedirs')
[docs]
def get_i18n_localedirs() -> list[str]:
    """ Returns the ``locale`` directories of the org, form and user
    modules, in that order.

    """
    packages = ('onegov.org', 'onegov.form', 'onegov.user')
    return [utils.module_path(package, 'locale') for package in packages]
@OrgApp.setting(section='i18n', name='default_locale')
[docs]
def get_i18n_default_locale() -> str:
    """ Returns the default locale of the application. """
    default_locale = 'de_CH'
    return default_locale
@OrgApp.setting(section='i18n', name='locale_negotiator')
[docs]
def get_locale_negotiator(
) -> Callable[[Sequence[str], OrgRequest], str | None]:
    """ Returns the locale negotiator used by the application.

    If an organisation exists, the negotiation is limited to the locales
    of the organisation (or the default locale, if it has none) and falls
    back to the first of those locales. Without an organisation the
    default negotiator is used with the given locales.

    """

    def locale_negotiator(
        locales: Sequence[str],
        request: OrgRequest
    ) -> str | None:
        org = request.app.org
        if not org:
            return default_locale_negotiator(locales, request)

        locales = org.locales or get_i18n_default_locale()

        # a single locale is turned into a sequence of one
        if isinstance(locales, str):
            locales = (locales, )

        negotiated = default_locale_negotiator(locales, request)
        return negotiated or locales[0]

    return locale_negotiator
@OrgApp.static_directory()
[docs]
def get_static_directory() -> str:
    """ Returns the name of the static directory. """
    directory = 'static'
    return directory
@OrgApp.template_directory()
[docs]
def get_template_directory() -> str:
    """ Returns the name of the template directory. """
    directory = 'templates'
    return directory
@OrgApp.setting(section='core', name='theme')
[docs]
def get_theme() -> OrgTheme:
    """ Returns a new instance of the org theme. """
    theme = OrgTheme()
    return theme
@OrgApp.setting(section='content_security_policy', name='default')
[docs]
def org_content_security_policy() -> ContentSecurityPolicy:
    """ Returns the default content security policy of the application.

    Starts from the policy of the core framework and adds the child and
    connect sources listed below, as well as ``https:`` as script source.

    """
    policy = default_content_security_policy()

    child_sources = (
        SELF,
        'https://*.youtube.com',
        'https://*.vimeo.com',
        'https://*.infomaniak.com',
        'https://checkout.stripe.com',
    )
    for source in child_sources:
        policy.child_src.add(source)

    connect_sources = (
        SELF,
        'https://checkout.stripe.com',
        'https://*.google-analytics.com',
        'https://stats.g.doubleclick.net',
        'https://map.geo.bs.ch',
        'https://wmts.geo.bs.ch',
        'https://maps.zg.ch',
        'https://api.mapbox.com',
        'https://stats.seantis.ch',
        'https://analytics.seantis.ch',
        'https://geodesy.geo.admin.ch',
        'https://wms.geo.admin.ch/',
        'https://*.projuventute.ch',
        'https://cdn.jsdelivr.net',
        'https://*.usercentrics.eu',
    )
    for source in connect_sources:
        policy.connect_src.add(source)

    policy.script_src.add('https:')

    return policy
@OrgApp.setting(section='org', name='create_new_organisation')
[docs]
def get_create_new_organisation_factory(
) -> Callable[[OrgApp, str], Organisation]:
    """ Returns the function used to create a new organisation. """
    factory = create_new_organisation
    return factory
@OrgApp.setting(section='org', name='status_mail_roles')
[docs]
def get_status_mail_roles() -> Collection[str]:
    """ Returns the roles of the ``status_mail_roles`` setting. """
    roles = ('admin', 'editor')
    return roles
@OrgApp.setting(section='org', name='ticket_manager_roles')
[docs]
def get_ticket_manager_roles() -> Collection[str]:
    """ Returns the roles of the ``ticket_manager_roles`` setting. """
    roles = ('admin', 'editor')
    return roles
@OrgApp.setting(section='org', name='require_complete_userprofile')
[docs]
def get_require_complete_userprofile() -> bool:
    """ Returns the ``require_complete_userprofile`` setting, which is
    off by default.

    """
    required = False
    return required
@OrgApp.setting(section='org', name='is_complete_userprofile')
[docs]
def get_is_complete_userprofile_handler(
) -> Callable[[OrgRequest, str], bool]:
    """ Returns the default handler of the ``is_complete_userprofile``
    setting, which accepts every request/username combination.

    """

    def is_complete_userprofile(request: OrgRequest, username: str) -> bool:
        # the default handler regards every userprofile as complete
        return True

    return is_complete_userprofile
@OrgApp.setting(section='org', name='default_directory_search_widget')
@OrgApp.setting(section='org', name='default_event_search_widget')
@OrgApp.setting(section='org', name='public_ticket_messages')
[docs]
def get_public_ticket_messages() -> Collection[str]:
    """ Returns the message types which are available on the ticket
    status page, visible to anyone that knows the unguessable url.

    """
    # do *not* add ticket_note here, those are private!
    message_types = (
        'directory',
        'event',
        'payment',
        'reservation',
        'submission',
        'ticket',
        'ticket_chat',
        'translator_mutation',
    )
    return message_types
@OrgApp.setting(section='org', name='disabled_extensions')
[docs]
def get_disabled_extensions() -> Collection[str]:
    """ Returns the disabled extensions, none by default. """
    disabled: tuple[str, ...] = ()
    return disabled
@OrgApp.tween_factory(under=content_security_policy_tween_factory)
[docs]
def enable_iframes_tween_factory(
    app: OrgApp,
    handler: Callable[[OrgRequest], Response]
) -> Callable[[OrgRequest], Response]:
    """ Creates the tween which adjusts the content security policy of a
    request with regards to iframes, depending on the requested path.

    :param app: The application providing ``allowed_iframe_domains``.
    :param handler: The handler wrapped by the tween.
    :return: The tween, which returns the response of the handler.

    """

    def any_of(*patterns: str) -> re.Pattern[str]:
        # a single group of alternatives, matched at the start of the path
        return re.compile('(' + '|'.join(patterns) + ')')

    never_framed = any_of(
        r'/auth/.*',
        r'/manage/.*',
    )
    framed_anywhere = any_of(
        r'/events/.*',
        r'/event/.*',
        r'/news/.*',
        r'/directories/.*',
        r'/resources/.*',
        r'/resource/.*',
        r'/topics/.*',
    )

    def enable_iframes_tween(
        request: OrgRequest
    ) -> Response:
        """ Enables iframes. """

        response = handler(request)
        path = request.path_info or '/'

        # Allow iframes for other pages for certain paths
        if never_framed.match(path):
            request.content_security_policy.frame_ancestors = {NONE}
        elif framed_anywhere.match(path):
            ancestors = request.content_security_policy.frame_ancestors
            ancestors.add('http://*')
            ancestors.add('https://*')

        # Allow certain domains as iframes on our pages
        for domain in (app.allowed_iframe_domains or []):
            request.content_security_policy.child_src.add(domain)

        return response

    return enable_iframes_tween
@OrgApp.webasset_path()
[docs]
def get_js_path() -> str:
    """ Returns the webasset path of the javascript files. """
    path = 'assets/js'
    return path
@OrgApp.webasset_path()
[docs]
def get_css_path() -> str:
    """ Returns the webasset path of the css files. """
    path = 'assets/css'
    return path
@OrgApp.webasset_output()
[docs]
def get_webasset_output() -> str:
    """ Returns the webasset output path. """
    path = 'assets/bundles'
    return path
@OrgApp.webasset('sortable')
[docs]
def get_sortable_asset() -> Iterator[str]:
    """ Yields the files of the ``sortable`` asset, in order. """
    yield from (
        'sortable.js',
        'sortable_custom.js',
    )
@OrgApp.webasset('fullcalendar')
[docs]
def get_fullcalendar_asset() -> Iterator[str]:
    """ Yields the files of the ``fullcalendar`` asset, in order. """
    yield from (
        'fullcalendar.css',
        'fullcalendar.js',
        'fullcalendar.de.js',
        'fullcalendar.fr.js',
        'reservationcalendar.jsx',
        'reservationcalendar_custom.js',
    )
@OrgApp.webasset('reservationlist')
[docs]
def get_reservation_list_asset() -> Iterator[str]:
    """ Yields the files of the ``reservationlist`` asset, in order. """
    yield from (
        'reservationlist.jsx',
        'reservationlist_custom.js',
    )
@OrgApp.webasset('code_editor')
[docs]
def get_code_editor_asset() -> Iterator[str]:
    """ Yields the entries of the ``code_editor`` asset, in order. """
    yield from (
        'ace.js',
        'ace-mode-form.js',
        'ace-mode-markdown.js',
        'ace-mode-xml.js',
        'ace-mode-yaml.js',
        'ace-theme-tomorrow.js',
        'formcode',
        'code_editor.js',
    )
@OrgApp.webasset('editor')
[docs]
def get_editor_asset() -> Iterator[str]:
    """ Yields the files of the ``editor`` asset, in order. """
    yield from (
        'bufferbuttons.js',
        'definedlinks.js',
        'filemanager.js',
        'imagemanager.js',
        'table.js',
        'redactor.de.js',
        'redactor.fr.js',
        'redactor.it.js',
        'input_with_button.js',
        'editor.js',
    )
@OrgApp.webasset('timeline')
[docs]
def get_timeline_asset() -> Iterator[str]:
    """ Yields the single file of the ``timeline`` asset. """
    yield from ('timeline.jsx', )
# do NOT minify the redactor, or the copyright notice goes away, which
# is something we are not allowed to do per our license
@OrgApp.webasset('redactor', filters={'js': None})
[docs]
def get_redactor_asset() -> Iterator[str]:
    """ Yields the files of the ``redactor`` asset, in order. """
    yield from (
        'redactor.js',
        'redactor.css',
    )
@OrgApp.webasset('upload')
[docs]
def get_upload_asset() -> Iterator[str]:
    """ Yields the single file of the ``upload`` asset. """
    yield from ('upload.js', )
@OrgApp.webasset('editalttext')
[docs]
def get_editalttext_asset() -> Iterator[str]:
    """ Yields the single file of the ``editalttext`` asset. """
    yield from ('editalttext.js', )
@OrgApp.webasset('prompt')
[docs]
def get_prompt() -> Iterator[str]:
    """ Yields the single file of the ``prompt`` asset. """
    yield from ('prompt.jsx', )
@OrgApp.webasset('photoswipe')
[docs]
def get_photoswipe_asset() -> Iterator[str]:
    """ Yields the files of the ``photoswipe`` asset, in order. """
    yield from (
        'photoswipe.css',
        'photoswipe-skin.css',
        'photoswipe.js',
        'photoswipe-ui.js',
        'photoswipe-custom.js',
    )
@OrgApp.webasset('tags-input')
@OrgApp.webasset('filedigest')
[docs]
def get_filehash() -> Iterator[str]:
    """ Yields the files of the ``filedigest`` asset, in order. """
    yield from (
        'asmcrypto-lite.js',
        'filedigest.js',
    )
@OrgApp.webasset('monthly-view')
[docs]
def get_monthly_view() -> Iterator[str]:
    """ Yields the files of the ``monthly-view`` asset, in order. """
    yield from (
        'daypicker.js',
        'monthly-view.jsx',
    )
@OrgApp.webasset('common')
[docs]
def get_common_asset() -> Iterator[str]:
    """ Yields the entries of the ``common`` asset, in order. """
    yield from (
        'global.js',
        'polyfills.js',
        'jquery.datetimepicker.css',
        'locale.js',
        'modernizr.js',
        'clipboard.js',
        'jquery.js',
        'foundation.js',
        'foundation.alert.js',
        'foundation.dropdown.js',
        'foundation.orbit.js',
        'foundation.reveal.js',
        'foundation.topbar.js',
        'intercooler.js',
        'underscore.js',
        'react.js',
        'react-dom.js',
        'form_dependencies.js',
        'confirm.jsx',
        'typeahead.jsx',
        'moment.js',
        'moment.de-ch.js',
        'moment.fr-ch.js',
        'jquery.datetimepicker.js',
        'datetimepicker.js',
        'many.jsx',
        'pay',
        'jquery.mousewheel.js',
        'jquery.popupoverlay.js',
        'jquery.load.js',
        'videoframe.js',
        'url.js',
        'date-range-selector.js',
        'lazyalttext.js',
        'lazysizes.js',
        'toggle.js',
        'common.js',
        '_blank.js',
        'forms.js',
        'internal_link_check.js',
        'tickets.js',
        'items_selectable.js',
        'notifications.js',
        'foundation.accordion.js',
    )
@OrgApp.webasset('fontpreview')
[docs]
def get_fontpreview_asset() -> Iterator[str]:
    """ Yields the single file of the ``fontpreview`` asset. """
    yield from ('fontpreview.js', )
@OrgApp.webasset('scroll-to-username')
@OrgApp.webasset('all_blank')
[docs]
def get_all_blank_asset() -> Iterator[str]:
    """ Yields the single file of the ``all_blank`` asset. """
    yield from ('all_blank.js', )
@OrgApp.webasset('people-select')
[docs]
def people_select_asset() -> Iterator[str]:
    """ Yields the single file of the ``people-select`` asset. """
    yield from ('people-select.js', )
[docs]
def wrap_with_mtan_hook(
    func: Callable[[OrgApp, Any, OrgRequest], Any]
) -> Callable[[OrgApp, Any, OrgRequest], Any]:
    """ Wraps the given view function with the mTAN access checks.

    The wrapped function is always called first. Its response is returned
    unchanged, unless the object has ``mtan`` or ``secret_mtan`` access,
    the request is not made by a manager and the response is not a client
    or server error. In that case the request is redirected to the mTAN
    auth view if there is no active mTAN session, answered with
    ``HTTPTooManyRequests`` if the access limit is exceeded, or recorded
    as mTAN access.

    """

    @wraps(func)
    def wrapped(self: OrgApp, obj: Any, request: OrgRequest) -> Any:
        response = func(self, obj, request)

        # only do the mTAN redirection stuff if the original view didn't
        # return a client or server error
        failed = (
            isinstance(response, WSGIHTTPException)
            and response.code >= 400
        )
        if failed:
            return response

        if getattr(obj, 'access', None) not in ('mtan', 'secret_mtan'):
            return response

        # managers don't require mtan authentication
        if request.is_manager:
            return response

        # no active mtan session, redirect to mtan auth view
        if not request.active_mtan_session:
            auth = MTANAuth(self, request.path_url)
            target = request.link(auth, name='request')
            return morepath.redirect(target)

        # access limit exceeded
        if request.mtan_access_limit_exceeded:
            return HTTPTooManyRequests()

        # record access
        request.mtan_accesses.add(url=request.path_url)
        return response

    return wrapped
[docs]
class KeyLookupWithMTANHook:
    """ Delegates to the given key lookup and wraps the results of
    ``component`` and ``fallback`` with the mTAN hook.

    The results of ``all`` are passed through as they are.

    """

    def __init__(self, key_lookup: _KeyLookup):
        self.key_lookup = key_lookup

    def component(
        self,
        key: Sequence[Any]
    ) -> Callable[..., Any] | None:
        found = self.key_lookup.component(key)
        return None if found is None else wrap_with_mtan_hook(found)

    def fallback(
        self,
        key: Sequence[Any]
    ) -> Callable[..., Any] | None:
        found = self.key_lookup.fallback(key)
        return None if found is None else wrap_with_mtan_hook(found)

    def all(
        self,
        key: Sequence[Any]
    ) -> Iterator[Callable[..., Any]]:
        lookup = self.key_lookup
        return lookup.all(key)