from __future__ import annotations
from datetime import datetime
from onegov.core.collection import Pagination
from onegov.core.orm.mixins import (
content_property, dict_markup_property, dict_property, meta_property)
from onegov.form import Form, move_fields
from onegov.org import _
from onegov.org.forms import LinkForm, PageForm, IframeForm
from onegov.org.models.atoz import AtoZ
from onegov.org.models.extensions import (
InheritableContactExtension, ContactHiddenOnPageExtension,
PeopleShownOnMainPageExtension, ImageExtension,
NewsletterExtension, PublicationExtension, DeletableContentExtension,
InlinePhotoAlbumExtension
)
from onegov.org.models.extensions import AccessExtension
from onegov.org.models.extensions import CoordinatesExtension
from onegov.org.models.extensions import GeneralFileLinkExtension
from onegov.org.models.extensions import PersonLinkExtension
from onegov.org.models.extensions import VisibleOnHomepageExtension
from onegov.org.models.extensions import SidebarLinksExtension
from onegov.org.models.traitinfo import TraitInfo
from onegov.org.observer import observes
from onegov.page import Page
from onegov.page.collection import AdjacencyListCollection, PageCollection
from onegov.search import SearchableContent
from sedate import replace_timezone
from sqlalchemy import desc, func, or_, and_
from sqlalchemy.dialects.postgresql import array, JSON
from sqlalchemy.orm import undefer, object_session
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    # These names are only referenced inside annotations. Keeping them under
    # the guard means they are never imported when the module is executed.
    from onegov.org.request import OrgRequest, PageMeta
    from sqlalchemy.orm import Query, Session
    from typing import Self
class Topic(Page, TraitInfo, SearchableContent, AccessExtension,
            PublicationExtension, VisibleOnHomepageExtension,
            InheritableContactExtension, ContactHiddenOnPageExtension,
            PeopleShownOnMainPageExtension, PersonLinkExtension,
            CoordinatesExtension, ImageExtension,
            GeneralFileLinkExtension, SidebarLinksExtension,
            InlinePhotoAlbumExtension):
    """ A page with the polymorphic identity ``topic``.

    A topic carries one of the traits ``page``, ``link`` or ``iframe`` (see
    :meth:`is_supported_trait`). The trait decides which children may be
    added below it and where pasted pages end up.

    """

    __mapper_args__ = {'polymorphic_identity': 'topic'}

    es_type_name = 'topics'

    lead: dict_property[str | None] = content_property()
    text = dict_markup_property('content')
    url: dict_property[str | None] = content_property()
    as_card: dict_property[str | None] = content_property()
    height: dict_property[str | None] = content_property()

    # Show the lead on topics page
    lead_when_child: dict_property[bool] = content_property(default=True)

    @property
    def es_skip(self) -> bool:
        """ Returns true if this topic has the ``link`` trait. """
        return self.meta.get('trait') == 'link'  # do not index links

    @property
    def es_public(self) -> bool:
        """ Returns true if the topic is publicly accessible and published.
        """
        return self.access == 'public' and self.published

    @property
    def deletable(self) -> bool:
        """ Returns true if this page may be deleted. """
        return True

    @property
    def editable(self) -> bool:
        """ Returns true if this page may be edited. """
        return True

    @property
    def url_changeable(self) -> bool:
        """Open for all topics, even root ones."""
        return True

    @property
    def paste_target(self) -> Topic | News:
        """ The page below which a pasted page is inserted.

        Topics with the ``page`` trait accept the paste themselves. The
        ``link`` and ``iframe`` traits allow no children (see
        :attr:`allowed_subtraits`), so the paste goes to their parent, or to
        the topic itself if it has no parent.

        :raises NotImplementedError: for any other trait.

        """
        if self.trait == 'page':
            return self

        if self.trait in ('link', 'iframe'):
            return self.parent or self  # type:ignore[return-value]

        raise NotImplementedError

    @property
    def allowed_subtraits(self) -> tuple[str, ...]:
        """ The traits that may be added as children of this topic.

        :raises NotImplementedError: for an unknown trait.

        """
        if self.trait == 'link':
            return ()

        if self.trait == 'page':
            return ('page', 'link', 'iframe')

        if self.trait == 'iframe':
            return ()

        raise NotImplementedError

    def is_supported_trait(self, trait: str) -> bool:
        """ Returns true if the given trait is one a topic can carry. """
        return trait in {'link', 'page', 'iframe'}
class News(Page, TraitInfo, SearchableContent, NewsletterExtension,
           AccessExtension, PublicationExtension, VisibleOnHomepageExtension,
           InheritableContactExtension, ContactHiddenOnPageExtension,
           PeopleShownOnMainPageExtension, PersonLinkExtension,
           CoordinatesExtension, ImageExtension, GeneralFileLinkExtension,
           DeletableContentExtension, InlinePhotoAlbumExtension):
    """ A page with the polymorphic identity ``news``.

    Only one level of nesting is allowed: a news page without a parent may
    have ``news`` children, those children may have none (see
    :attr:`allowed_subtraits`).

    """

    __mapper_args__ = {'polymorphic_identity': 'news'}

    lead: dict_property[str | None] = content_property()
    text = dict_markup_property('content')
    url: dict_property[str | None] = content_property()

    # Years used by :meth:`news_query_for` to narrow down the result.
    filter_years: list[int] = []

    # Tags used by :meth:`news_query_for` to narrow down the result.
    # NOTE(review): no declaration of ``filter_tags`` was visible in this
    # module although ``for_year``/``for_tag`` read it unconditionally. This
    # default mirrors ``filter_years`` - confirm it is not declared elsewhere.
    filter_tags: list[str] = []

    # NOTE(review): ``content_observer`` assigns ``hashtags`` and
    # ``news_query_for`` filters on ``meta['hashtags']``, but no declaration
    # was visible in this module. A meta-backed property is assumed here -
    # confirm the key and the default against the original definition.
    hashtags: dict_property[list[str]] = meta_property(default=list)

    @property
    def es_public(self) -> bool:
        """ Returns true if the news is publicly accessible and published.
        """
        return self.access == 'public' and self.published

    @observes('content')
    def content_observer(self, content: dict[str, Any]) -> None:
        """ Refreshes the hashtags from ``es_tags`` when the content changes.
        """
        self.hashtags = self.es_tags or []

    @property
    def absorb(self) -> str:  # type:ignore[override]
        """ The path without its first segment (empty if there is only one).
        """
        return ''.join(self.path.split('/', 1)[1:])

    @property
    def deletable(self) -> bool:
        """ Returns true if this news has a parent, i.e. is not the root. """
        return self.parent_id is not None

    @property
    def editable(self) -> bool:
        """ Returns true if this page may be edited. """
        return True

    @property
    def url_changeable(self) -> bool:
        """ Returns true if this news has a parent. The URL of a news page
        without a parent cannot be changed.
        """
        return self.parent_id is not None

    @property
    def paste_target(self) -> Topic | News:
        """ The page below which a pasted page is inserted: the parent if
        there is one, otherwise the news page itself.
        """
        if self.parent:
            return self.parent  # type:ignore[return-value]
        else:
            return self

    @property
    def allowed_subtraits(self) -> tuple[str, ...]:
        """ The traits that may be added as children of this news page. """
        # only allow one level of news
        if self.parent is None:
            return ('news', )
        else:
            return ()

    def is_supported_trait(self, trait: str) -> bool:
        """ Returns true if the given trait is one a news page can carry. """
        return trait in {'news'}

    def get_root_page_form_class(self, request: OrgRequest) -> type[Form]:
        """ Builds the form class for the root page out of a plain ``Form``
        and a fixed subset of the content extensions.
        """
        return self.with_content_extensions(
            Form, request, extensions=(
                InheritableContactExtension, ContactHiddenOnPageExtension,
                PersonLinkExtension, AccessExtension
            )
        )

    def for_year(self, year: int) -> News:
        """ Returns a copy of this news page with the given year toggled in
        ``filter_years``: removed if present, added otherwise.
        """
        years_ = set(self.filter_years)
        years = list(years_ - {year} if year in years_ else years_ | {year})
        return News(  # type:ignore[misc]
            id=self.id,
            title=self.title,
            name=self.name,
            filter_years=sorted(years),
            filter_tags=sorted(self.filter_tags)
        )

    def for_tag(self, tag: str) -> News:
        """ Returns a copy of this news page with the given tag toggled in
        ``filter_tags``: removed if present, added otherwise.
        """
        tags_ = set(self.filter_tags)
        tags = list(tags_ - {tag} if tag in tags_ else tags_ | {tag})
        return News(  # type:ignore[misc]
            id=self.id,
            title=self.title,
            name=self.name,
            filter_years=sorted(self.filter_years),
            filter_tags=sorted(tags)
        )

    @classmethod
    def news_query_for(
        cls,
        self: News | PageMeta,
        limit: int | None = 2,
        published_only: bool = True,
        session: Session | None = None,
    ) -> Query[News]:
        """ Returns the news below the given page, newest first.

        :param self: the parent, either a ``News`` instance or an object
            providing at least ``id``. ``filter_years`` and ``filter_tags``
            are honoured if the object has them.
        :param limit: maximum number of regular results, ``None`` for all.
        :param published_only: only include news whose publication has
            started and not ended.
        :param session: the session to query; defaults to the session the
            given object is attached to.

        News flagged as visible on the homepage are added to the result
        through a union that ignores ``limit``, so more than ``limit`` rows
        may be returned.

        """
        if session is None:
            session = object_session(self)

        news = session.query(News)
        if isinstance(self, News):
            # avoid a redundant relationship load when we can
            news = news.filter(Page.parent == self)
        else:
            news = news.filter(Page.parent_id == self.id)

        if published_only:
            # the comparisons are SQL expressions, ``is`` would not work
            news = news.filter(
                News.publication_started == True,  # noqa: E712
                News.publication_ended == False  # noqa: E712
            )

        # one half-open [Jan 1st, Jan 1st of the next year) range per year
        year_filters = []
        for year in getattr(self, 'filter_years', ()):
            start = replace_timezone(datetime(year, 1, 1), 'UTC')
            year_filters.append(
                and_(
                    News.published_or_created >= start,
                    News.published_or_created < start.replace(year=year + 1)
                )
            )

        if year_filters:
            news = news.filter(or_(*year_filters))

        if filter_tags := getattr(self, 'filter_tags', None):
            news = news.filter(
                News.meta['hashtags'].has_any(array(filter_tags))
            )

        news = news.order_by(desc(News.published_or_created))
        news = news.options(undefer('created'))
        news = news.options(undefer('content'))
        news = news.limit(limit)

        # same filters, but without the limit and restricted to the news
        # whose meta has 'is_visible_on_homepage' set to true
        sticky = func.json_extract_path_text(
            func.cast(News.meta, JSON), 'is_visible_on_homepage') == 'true'
        sticky_news = news.limit(None)
        sticky_news = sticky_news.filter(sticky)

        return news.union(sticky_news).order_by(
            desc(News.published_or_created))

    def news_query(
        self,
        limit: int | None = 2,
        published_only: bool = True
    ) -> Query[News]:
        """ Shortcut for :meth:`news_query_for` with this page as parent. """
        return self.news_query_for(self, limit, published_only)

    @property
    def all_years(self) -> list[int]:
        """ The distinct years of ``published_or_created`` of the news below
        this page, newest first.
        """
        query = object_session(self).query(News)
        query = query.with_entities(
            func.date_part('year', Page.published_or_created))
        query = query.group_by(
            func.date_part('year', Page.published_or_created))
        query = query.filter(Page.parent == self)

        return sorted((int(year) for year, in query), reverse=True)
class NewsCollection(Pagination[News], AdjacencyListCollection[News]):
    """ A paginated collection of news.

    Use it like this::

        from onegov.page import NewsCollection
        news = NewsCollection(session)

    """

    def __init__(
        self,
        session: Session,
        page: int = 0,
    ):
        # NOTE(review): the body of this constructor was not visible. The
        # two attributes below are the ones the other methods of this class
        # read; confirm whether the base classes need further set-up.
        self.session = session
        self.page = page

    def subset(self) -> Query[News]:
        """ Returns the news whose publication has started and not ended,
        newest first.

        If a news page is found at ``/news/``, only its direct children are
        included.

        """
        parent = PageCollection(self.session).by_path(
            '/news/', ensure_type='news')

        news = self.session.query(News)
        if parent:
            news = news.filter(Page.parent_id == parent.id)

        # the comparisons are SQL expressions, ``is`` would not work
        news = news.filter(
            News.publication_started == True,  # noqa: E712
            News.publication_ended == False  # noqa: E712
        )
        news = news.order_by(desc(News.published_or_created))
        news = news.options(undefer('created'))
        news = news.options(undefer('content'))
        return news

    @property
    def page_index(self) -> int:
        """ The index of the current page. """
        return self.page

    def page_by_index(self, index: int) -> Self:
        """ Returns a new collection on the same session for the given page.
        """
        return self.__class__(
            self.session,
            page=index
        )
class AtoZPages(AtoZ[Topic]):
    """ The A to Z listing of all topics with the ``page`` trait. """

    def get_title(self, item: Topic) -> str:
        """ Returns the title of the given topic. """
        return item.title

    def get_items(self) -> list[Topic]:
        """ Returns the topics with the ``page`` trait, ordered by
        ``sortkey``.

        Managers get all of them, everybody else only those whose access is
        ``public``.

        """
        show_all = self.request.is_manager

        # Filter first so the sort key is only computed for the topics that
        # are kept. The sort is stable, so the order matches sorting first.
        pages = [
            topic for topic in self.request.session.query(Topic)
            if topic.trait == 'page'
            and (show_all or topic.access == 'public')
        ]

        # XXX implement correct collation support on the database level
        return sorted(pages, key=self.sortkey)