Source code for org.request

from __future__ import annotations

from functools import cached_property
from onegov.core.custom import msgpack
from onegov.core.orm import orm_cached
from onegov.core.request import CoreRequest
from onegov.core.security import Private
from onegov.core.utils import normalize_for_url
from onegov.org.models import News, TANAccessCollection, Topic
from onegov.page import Page, PageCollection
from onegov.user import User
from sedate import utcnow
from sqlalchemy import inspect
from sqlalchemy.orm import noload


from typing import Any, NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Generator, Iterable
    from onegov.core.analytics import AnalyticsProvider
    from onegov.org.app import OrgApp
    from onegov.ticket import Ticket


@msgpack.make_serializable(tag=20)
[docs] class PageMeta(NamedTuple):
[docs] id: int
[docs] type: str
[docs] title: str
[docs] access: str
[docs] published: bool
[docs] is_visible_on_homepage: bool | None
[docs] path: str
[docs] children: tuple[PageMeta, ...]
[docs] class OrgRequest(CoreRequest): if TYPE_CHECKING:
[docs] app: OrgApp
@cached_property
[docs] def is_manager(self) -> bool: """ Returns true if the current user is logged in, and has the role editor or admin. """ return self.has_role('admin', 'editor')
[docs] def is_manager_for_model(self, model: object) -> bool: return self.has_permission(model, Private)
@cached_property
[docs] def is_admin(self) -> bool: """ Returns true if the current user is an admin. """ return self.has_role('admin')
@cached_property
[docs] def is_editor(self) -> bool: """ Returns true if the current user is an editor. """ return self.has_role('editor')
@cached_property
[docs] def is_supporter(self) -> bool: """ Returns true if the current user is a supporter. """ return self.has_role('supporter')
@cached_property
[docs] def is_member(self) -> bool: """ Returns true if the current user is a member. """ return self.has_role('member')
@property
[docs] def current_username(self) -> str | None: return self.identity.userid if self.identity else None
# NOTE: Internal cache, don't use directly!
[docs] _current_user: User | None
@property
[docs] def current_user(self) -> User | None: if not self.identity: return None if not hasattr(self, '_current_user'): self._current_user = ( self.session.query(User) .filter_by(username=self.identity.userid) .first() ) return self._current_user if self._current_user is None: return None # NOTE: In rare cases our cached version of the User object # gets detached, in which case we need to merge it # back into the current session, when we try to access it # otherwise accessing deferred attributes can result in # exceptions. if inspect(self._current_user).detached: self._current_user = self.session.merge(self._current_user) return self._current_user
@cached_property
[docs] def authenticated_email(self) -> str | None: """ Used for granting access to private information that isn't necessarily tied to a registered user. """ return self.browser_session.get('authenticated_email')
@cached_property
[docs] def first_admin_available(self) -> User | None: return self.session.query(User).filter_by(role='admin').order_by( User.created).first()
@cached_property
[docs] def auto_accept_user(self) -> User | None: username = self.app.org.auto_closing_user user: User | None = None if username: user = ( self.session.query(User) .filter_by(username=username, role='admin') .first() ) return user or self.first_admin_available
@cached_property
[docs] def email_for_new_tickets(self) -> str | None: return self.app.org.email_for_new_tickets
@cached_property
[docs] def active_mtan_session(self) -> bool: mtan_verified = self.browser_session.get('mtan_verified') if mtan_verified is None: return False session_duration = self.app.org.mtan_session_duration if mtan_verified + session_duration < utcnow(): return False return True
@cached_property
[docs] def mtan_accesses(self) -> TANAccessCollection: return TANAccessCollection( self.session, session_id=self.browser_session.mtan_number, access_window=self.app.org.mtan_access_window )
@cached_property
[docs] def mtan_access_limit_exceeded(self) -> bool: limit = self.app.org.mtan_access_window_requests if limit is None: # no limit so we can't exceed it return False # if we're below the limit we're fine if self.mtan_accesses.count() < limit: return False # if we already accessed this url we are also still fine return self.mtan_accesses.by_url(self.path_url) is None
[docs] def auto_accept(self, ticket: Ticket) -> bool: if self.app.org.ticket_auto_accept_style == 'role': roles = self.app.org.ticket_auto_accept_roles if not roles: return False return self.has_role(*roles) return ticket.handler_code in (self.app.org.ticket_auto_accepts or ())
@orm_cached(policy='on-table-change:pages', by_role=True)
[docs] def pages_tree(self) -> tuple[PageMeta, ...]: """ This is the entire pages tree preloaded into the individual parent/children attributes. We optimize this as much as possible by performing the recursive join in Python, rather than SQL. """ query = PageCollection(self.session).query(ordered=False) query = query.options( # we populate these relationship ourselves noload(Page.parent), noload(Page.children), ) query = query.order_by(Page.order) # first we build a map from parent_ids to their children parent_to_child: dict[int | None, list[Page]] = {} for page in query: parent_to_child.setdefault(page.parent_id, []).append(page) def extend_path(page: Page, path: str | None) -> str: if page.type == 'news' and path is None: # the root news page is not part of the path return '' return f'{path}/{page.name}' if path else page.name def generate_subtree( parent_id: int | None, path: str | None ) -> tuple[PageMeta, ...]: return tuple( PageMeta( id=page.id, type=page.type, title=page.title, access=page.meta.get('access', 'public'), published=published, path=(subpath := extend_path(page, path)), is_visible_on_homepage=page.meta.get('is_visible_on_homepage'), children=tuple(generate_subtree(page.id, subpath)) ) for page in parent_to_child.get(parent_id, ()) if self.is_visible(page) if (published := getattr(page, 'published', True)) or self.is_manager ) # we return the root pages which should contain references to all # the child pages return generate_subtree(None, None)
@orm_cached(policy='on-table-change:pages', by_role=True)
[docs] def root_pages(self) -> tuple[PageMeta, ...]: def include(page: PageMeta) -> bool: if page.type != 'news': return True return True if page.children else False return tuple(p for p in self.pages_tree if include(p))
@orm_cached(policy='on-table-change:pages', by_role=True)
[docs] def homepage_pages(self) -> dict[int, list[PageMeta]]: def visit_topics( pages: Iterable[PageMeta], root_id: int | None = None ) -> Generator[tuple[int, PageMeta]]: for page in pages: if page.type != 'topic': continue if root_id is not None and page.is_visible_on_homepage: yield root_id, page yield from visit_topics( page.children, root_id=root_id or page.id ) result: dict[int, list[PageMeta]] = {} for root_id, meta in visit_topics(self.root_pages): result.setdefault(root_id, []).append(meta) for topics in result.values(): topics.sort( key=lambda p: normalize_for_url(p.title) ) return result
@property
[docs] def analytics_provider(self) -> AnalyticsProvider | None: """ Returns the active analytics provider. """ if name := self.app.org.analytics_provider_name: return self.app.available_analytics_providers.get(name) return None