Source code for org.request

from functools import cached_property
from onegov.core.orm import orm_cached
from onegov.core.request import CoreRequest
from onegov.core.security import Private
from onegov.core.utils import normalize_for_url
from onegov.org.models import News, TANAccessCollection, Topic
from onegov.page import Page, PageCollection
from onegov.user import User
from sedate import utcnow
from sqlalchemy.orm import noload


from typing import Any, NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Generator, Iterable
    from onegov.org.app import OrgApp
    from onegov.ticket import Ticket


[docs] class PageMeta(NamedTuple):
[docs] id: int
[docs] type: str
[docs] title: str
[docs] access: str
[docs] published: bool
[docs] is_visible_on_homepage: bool | None
[docs] path: str
[docs] children: tuple['PageMeta', ...]
[docs] class OrgRequest(CoreRequest): if TYPE_CHECKING:
[docs] app: 'OrgApp'
@cached_property
[docs] def is_manager(self) -> bool: """ Returns true if the current user is logged in, and has the role editor or admin. """ return self.has_role('admin', 'editor')
[docs] def is_manager_for_model(self, model: object) -> bool: return self.has_permission(model, Private)
@cached_property
[docs] def is_admin(self) -> bool: """ Returns true if the current user is an admin. """ return self.has_role('admin')
@cached_property
[docs] def is_editor(self) -> bool: """ Returns true if the current user is an editor. """ return self.has_role('editor')
@property
[docs] def current_username(self) -> str | None: return self.identity .userid if self.identity else None
@cached_property
[docs] def current_user(self) -> User | None: if not self.identity: return None return ( self.session.query(User) .filter_by(username=self.identity.userid) .first() )
@cached_property
[docs] def first_admin_available(self) -> User | None: return self.session.query(User).filter_by(role='admin').order_by( User.created).first()
@cached_property
[docs] def auto_accept_user(self) -> User | None: username = self.app.org.auto_closing_user user: User | None = None if username: user = ( self.session.query(User) .filter_by(username=username, role='admin') .first() ) return user or self.first_admin_available
@cached_property
[docs] def email_for_new_tickets(self) -> str | None: return self.app.org.email_for_new_tickets
@cached_property
[docs] def active_mtan_session(self) -> bool: mtan_verified = self.browser_session.get('mtan_verified') if mtan_verified is None: return False session_duration = self.app.org.mtan_session_duration if mtan_verified + session_duration < utcnow(): return False return True
@cached_property
[docs] def mtan_accesses(self) -> TANAccessCollection: return TANAccessCollection( self.session, session_id=self.browser_session.mtan_number, access_window=self.app.org.mtan_access_window )
@cached_property
[docs] def mtan_access_limit_exceeded(self) -> bool: limit = self.app.org.mtan_access_window_requests if limit is None: # no limit so we can't exceed it return False # if we're below the limit we're fine if self.mtan_accesses.count() < limit: return False # if we already accessed this url we are also still fine return self.mtan_accesses.by_url(self.path_url) is None
[docs] def auto_accept(self, ticket: 'Ticket') -> bool: if self.app.org.ticket_auto_accept_style == 'role': roles = self.app.org.ticket_auto_accept_roles if not roles: return False return self.has_role(*roles) return ticket.handler_code in (self.app.org.ticket_auto_accepts or ())
@orm_cached(policy='on-table-change:pages', by_role=True)
[docs] def pages_tree(self) -> tuple[PageMeta, ...]: """ This is the entire pages tree preloaded into the individual parent/children attributes. We optimize this as much as possible by performing the recursive join in Python, rather than SQL. """ query = PageCollection(self.session).query(ordered=False) query = query.options( # we populate these relationship ourselves noload(Page.parent), noload(Page.children), ) query = query.order_by(Page.order) # first we build a map from parent_ids to their children parent_to_child: dict[int | None, list[Page]] = {} for page in query: parent_to_child.setdefault(page.parent_id, []).append(page) def extend_path(page: Page, path: str | None) -> str: if page.type == 'news' and path is None: # the root news page is not part of the path return '' return f'{path}/{page.name}' if path else page.name def generate_subtree( parent_id: int | None, path: str | None ) -> tuple[PageMeta, ...]: return tuple( PageMeta( id=page.id, type=page.type, title=page.title, access=page.meta.get('access', 'public'), published=published, path=(subpath := extend_path(page, path)), is_visible_on_homepage=page.meta.get('is_visible_on_homepage'), children=tuple(generate_subtree(page.id, subpath)) ) for page in parent_to_child.get(parent_id, ()) if self.is_visible(page) if (published := getattr(page, 'published', True)) or self.is_manager ) # we return the root pages which should contain references to all # the child pages return generate_subtree(None, None)
@orm_cached(policy='on-table-change:pages', by_role=True)
[docs] def root_pages(self) -> tuple[PageMeta, ...]: def include(page: PageMeta) -> bool: if page.type != 'news': return True return True if page.children else False return tuple(p for p in self.pages_tree if include(p))
@orm_cached(policy='on-table-change:pages', by_role=True)
[docs] def homepage_pages(self) -> dict[int, list[PageMeta]]: def visit_topics( pages: 'Iterable[PageMeta]', root_id: int | None = None ) -> 'Generator[tuple[int, PageMeta]]': for page in pages: if page.type != 'topic': continue if root_id is not None and page.is_visible_on_homepage: yield root_id, page yield from visit_topics( page.children, root_id=root_id or page.id ) result: dict[int, list[PageMeta]] = {} for root_id, meta in visit_topics(self.root_pages): result.setdefault(root_id, []).append(meta) for topics in result.values(): topics.sort( key=lambda p: normalize_for_url(p.title) ) return result