from functools import cached_property
from onegov.core.orm import orm_cached
from onegov.core.request import CoreRequest
from onegov.core.security import Private
from onegov.core.utils import normalize_for_url
from onegov.org.models import News, TANAccessCollection, Topic
from onegov.page import Page, PageCollection
from onegov.user import User
from sedate import utcnow
from sqlalchemy.orm import noload
from typing import Any, NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Generator, Iterable
from onegov.org.app import OrgApp
from onegov.ticket import Ticket
[docs]
class PageMeta(NamedTuple):
[docs]
is_visible_on_homepage: bool | None
[docs]
children: tuple['PageMeta', ...]
[docs]
def link(
self,
request: 'OrgRequest',
variables: dict[str, Any] | None = None,
name: str = '',
) -> str:
if variables is not None:
variables['absorb'] = self.path
else:
variables = {'absorb': self.path}
return request.class_link(
Topic if self.type == 'topic' else News,
variables,
name
)
class OrgRequest(CoreRequest):
    """ Request class which extends the core request with helpers for
    roles, the current user, ticket handling, mTAN access and the cached
    pages tree.

    """

    if TYPE_CHECKING:
        # NOTE(review): the body of this guard was missing. ``OrgApp`` is
        # imported for type checking only and ``self.app.org`` is used
        # throughout, so this is presumably the narrowed app annotation --
        # confirm against the canonical file.
        app: OrgApp

    @cached_property
    def is_manager(self) -> bool:
        """ Returns true if the current user is logged in, and has the role
        editor or admin.

        """
        return self.has_role('admin', 'editor')

    def is_manager_for_model(self, model: object) -> bool:
        """ Returns true if the request has the ``Private`` permission on
        the given model.

        """
        return self.has_permission(model, Private)

    @cached_property
    def is_admin(self) -> bool:
        """ Returns true if the current user is an admin.

        """
        return self.has_role('admin')

    @cached_property
    def is_editor(self) -> bool:
        """ Returns true if the current user is an editor.

        """
        return self.has_role('editor')

    @property
    def current_username(self) -> str | None:
        """ Returns the userid of the current identity, or None if the
        request has no identity.

        """
        return self.identity.userid if self.identity else None

    @cached_property
    def current_user(self) -> User | None:
        """ Returns the user whose username matches the userid of the
        current identity.

        None is returned if there is no identity, or if no such user
        exists.

        """
        if not self.identity:
            return None

        return (
            self.session.query(User)
            .filter_by(username=self.identity.userid)
            .first()
        )

    @cached_property
    def first_admin_available(self) -> User | None:
        """ Returns the first user with the role admin, ordered by
        ``User.created``, or None if there is no admin.

        """
        return (
            self.session.query(User)
            .filter_by(role='admin')
            .order_by(User.created)
            .first()
        )

    @cached_property
    def auto_accept_user(self) -> User | None:
        """ Returns the admin configured as ``auto_closing_user`` on the
        organisation.

        If no username is configured, or if no admin with that username
        exists, the first available admin is returned instead.

        """
        username = self.app.org.auto_closing_user
        user: User | None = None

        if username:
            user = (
                self.session.query(User)
                .filter_by(username=username, role='admin')
                .first()
            )

        return user or self.first_admin_available

    @cached_property
    def email_for_new_tickets(self) -> str | None:
        """ Returns the ``email_for_new_tickets`` setting of the
        organisation.

        """
        return self.app.org.email_for_new_tickets

    @cached_property
    def active_mtan_session(self) -> bool:
        """ Returns true if the browser session holds an 'mtan_verified'
        value and the configured mTAN session duration added to it has not
        yet passed.

        """
        mtan_verified = self.browser_session.get('mtan_verified')
        if mtan_verified is None:
            return False

        session_duration = self.app.org.mtan_session_duration
        # the session is active up to and including the moment it expires
        return mtan_verified + session_duration >= utcnow()

    @cached_property
    def mtan_accesses(self) -> TANAccessCollection:
        """ Returns the collection of TAN accesses for the ``mtan_number``
        of the browser session and the access window of the organisation.

        """
        return TANAccessCollection(
            self.session,
            session_id=self.browser_session.mtan_number,
            access_window=self.app.org.mtan_access_window
        )

    @cached_property
    def mtan_access_limit_exceeded(self) -> bool:
        """ Returns true if accessing the current url would exceed the
        number of requests allowed per mTAN access window.

        """
        limit = self.app.org.mtan_access_window_requests
        if limit is None:
            # no limit so we can't exceed it
            return False

        # if we're below the limit we're fine
        if self.mtan_accesses.count() < limit:
            return False

        # if we already accessed this url we are also still fine
        return self.mtan_accesses.by_url(self.path_url) is None

    def auto_accept(self, ticket: 'Ticket') -> bool:
        """ Returns true if the given ticket should be accepted
        automatically.

        With the auto accept style 'role' this depends on the roles of the
        current user (never true if no roles are configured). Otherwise it
        depends on the handler code of the ticket being listed in the
        ``ticket_auto_accepts`` setting of the organisation.

        """
        if self.app.org.ticket_auto_accept_style == 'role':
            roles = self.app.org.ticket_auto_accept_roles
            if not roles:
                return False
            return self.has_role(*roles)

        return ticket.handler_code in (self.app.org.ticket_auto_accepts or ())

    @orm_cached(policy='on-table-change:pages', by_role=True)
    def pages_tree(self) -> tuple[PageMeta, ...]:
        """
        This is the entire pages tree preloaded into the individual
        parent/children attributes. We optimize this as much as possible
        by performing the recursive join in Python, rather than SQL.

        Pages which are not visible for the current request are left out
        together with their descendants, the same goes for unpublished
        pages unless the current user is a manager.

        """
        query = PageCollection(self.session).query(ordered=False)
        query = query.options(
            # we populate these relationship ourselves
            noload(Page.parent),
            noload(Page.children),
        )
        query = query.order_by(Page.order)

        # first we build a map from parent_ids to their children
        parent_to_child: dict[int | None, list[Page]] = {}
        for page in query:
            parent_to_child.setdefault(page.parent_id, []).append(page)

        def extend_path(page: Page, path: str | None) -> str:
            if page.type == 'news' and path is None:
                # the root news page is not part of the path
                return ''
            return f'{path}/{page.name}' if path else page.name

        def generate_subtree(
            parent_id: int | None,
            path: str | None
        ) -> tuple[PageMeta, ...]:
            # the walrus assignments make sure that ``published`` and
            # ``subpath`` are computed once per page: ``published`` in the
            # filter below, ``subpath`` before the children are generated
            return tuple(
                PageMeta(
                    id=page.id,
                    type=page.type,
                    title=page.title,
                    access=page.meta.get('access', 'public'),
                    published=published,
                    path=(subpath := extend_path(page, path)),
                    is_visible_on_homepage=page.meta.get(
                        'is_visible_on_homepage'
                    ),
                    children=generate_subtree(page.id, subpath)
                )
                for page in parent_to_child.get(parent_id, ())
                if self.is_visible(page)
                if (published := getattr(page, 'published', True))
                or self.is_manager
            )

        # we return the root pages which should contain references to all
        # the child pages
        return generate_subtree(None, None)

    @orm_cached(policy='on-table-change:pages', by_role=True)
    def root_pages(self) -> tuple[PageMeta, ...]:
        """ Returns the root entries of the pages tree, leaving out root
        news pages which have no children.

        """
        return tuple(
            page for page in self.pages_tree
            if page.type != 'news' or page.children
        )

    @orm_cached(policy='on-table-change:pages', by_role=True)
    def homepage_pages(self) -> dict[int, list[PageMeta]]:
        """ Returns the topics flagged as visible on the homepage, grouped
        by the id of the root topic they descend from.

        Only topics are visited, other pages and everything below them are
        skipped. The root topics themselves are never part of the result.
        Each list is sorted by the url-normalized title of its topics.

        """

        def visit_topics(
            pages: 'Iterable[PageMeta]',
            root_id: int | None = None
        ) -> 'Generator[tuple[int, PageMeta]]':
            for page in pages:
                if page.type != 'topic':
                    continue

                if root_id is not None and page.is_visible_on_homepage:
                    yield root_id, page

                yield from visit_topics(
                    page.children,
                    # only the top level has no root yet, compare against
                    # None (like above), so a falsy id stays a valid root
                    root_id=page.id if root_id is None else root_id
                )

        result: dict[int, list[PageMeta]] = {}
        for root_id, meta in visit_topics(self.root_pages):
            result.setdefault(root_id, []).append(meta)

        for topics in result.values():
            topics.sort(
                key=lambda p: normalize_for_url(p.title)
            )

        return result