from __future__ import annotations
from functools import cached_property
from onegov.api.models import ApiEndpoint, ApiEndpointItem
from onegov.event.collections import OccurrenceCollection
from onegov.gis import Coordinates
from sqlalchemy.exc import SQLAlchemyError
from typing import Any, Self
from typing import TYPE_CHECKING
from onegov.org.models.directory import (ExtendedDirectory,
ExtendedDirectoryEntry,
ExtendedDirectoryEntryCollection)
from onegov.org.models.page import News, NewsCollection, Topic, TopicCollection
if TYPE_CHECKING:
from onegov.town6.app import TownApp
from onegov.town6.request import TownRequest
from onegov.event.models import Occurrence
from onegov.core.orm.mixins import ContentMixin
from onegov.core.orm.mixins import TimestampMixin
from typing import TypeVar
from onegov.core.collection import PKType
[docs]
def get_geo_location(item: ContentMixin) -> dict[str, Any]:
geo = item.content.get('coordinates', Coordinates()) or Coordinates()
return {'lon': geo.lon, 'lat': geo.lat, 'zoom': geo.zoom}
[docs]
class EventApiEndpoint(ApiEndpoint['Occurrence']):
@property
[docs]
def collection(self) -> Any:
result = OccurrenceCollection(
self.session,
page=self.page or 0,
only_public=True
)
result.batch_size = self.batch_size
return result
[docs]
def item_data(self, item: Occurrence) -> dict[str, Any]:
return {
'title': item.title,
'description': item.event.description,
'organizer': item.event.organizer,
'organizer_email': item.event.organizer_email,
'organizer_phone': item.event.organizer_phone,
'external_event_url': item.event.external_event_url,
'event_registration_url': item.event.event_registration_url,
'price': item.event.price,
'tags': item.event.tags,
'start': item.start.isoformat(),
'end': item.end.isoformat(),
'location': item.location,
'coordinates': get_geo_location(item),
'created': item.created.isoformat(),
'modified': get_modified_iso_format(item),
}
[docs]
def item_links(self, item: Occurrence) -> dict[str, Any]:
return {
'html': item,
'image': item.event.image,
'pfd': item.event.pdf
}
[docs]
class NewsApiEndpoint(ApiEndpoint[News]):
@property
[docs]
def collection(self) -> Any:
result = NewsCollection(
self.request,
page=self.page or 0,
)
result.batch_size = 25
return result
[docs]
def item_data(self, item: News) -> dict[str, Any]:
if item.publication_start:
publication_start = item.publication_start.isoformat()
else:
publication_start = None
if item.publication_end:
publication_end = item.publication_end.isoformat()
else:
publication_end = None
return {
'title': item.title,
'lead': item.lead,
'text': item.text,
'hashtags': item.hashtags,
'publication_start': publication_start,
'publication_end': publication_end,
'created': item.created.isoformat(),
'modified': get_modified_iso_format(item),
}
[docs]
def item_links(self, item: News) -> dict[str, Any]:
return {
'html': item,
'image': item.page_image or None,
}
[docs]
class TopicApiEndpoint(ApiEndpoint[Topic]):
@property
[docs]
def collection(self) -> Any:
result = TopicCollection(
self.session,
page=self.page or 0,
only_public=True
)
result.batch_size = 25
return result
[docs]
def item_data(self, item: Topic) -> dict[str, Any]:
if item.publication_start:
publication_start = item.publication_start.isoformat()
else:
publication_start = None
if item.publication_end:
publication_end = item.publication_end.isoformat()
else:
publication_end = None
return {
'title': item.title,
'lead': item.lead,
'text': item.text,
'publication_start': publication_start,
'publication_end': publication_end,
'created': item.created.isoformat(),
'modified': get_modified_iso_format(item),
}
[docs]
def item_links(self, item: Topic) -> dict[str, Any]:
return {
'html': item,
'image': item.page_image or None,
'parent': ApiEndpointItem(
self.request, self.endpoint, str(item.parent_id)
) if item.parent_id is not None else None,
}
[docs]
class DirectoryEntryApiEndpoint(ApiEndpoint[ExtendedDirectoryEntry]):
[docs]
request: TownRequest
def __init__(
self,
request: TownRequest,
name: str,
extra_parameters: dict[str, str | None] | None = None,
page: int | None = None,
):
super().__init__(request, extra_parameters, page)
self.endpoint = name
@cached_property
[docs]
def directory(self) -> ExtendedDirectory:
return self.request.session.query(ExtendedDirectory).filter_by(
name=self.endpoint
).one()
@property
[docs]
def collection(self) -> Any:
result = ExtendedDirectoryEntryCollection(
self.directory,
page=self.page or 0,
published_only=True
)
result.batch_size = 25
return result
[docs]
def for_page(self, page: int | None) -> DirectoryEntryApiEndpoint:
""" Return a new endpoint instance with the given page while keeping
the current filters.
"""
return self.__class__(self.request, self.endpoint,
self.extra_parameters, page)
[docs]
def for_filter(self, **filters: Any) -> Self:
""" Return a new endpoint instance with the given filters while
discarding the current filters and page.
"""
return self.__class__(self.request, self.endpoint, filters)
[docs]
def by_id(
self, id: PKType
) -> ExtendedDirectoryEntry | None:
""" Return the item with the given ID from the collection. """
try:
return self.collection.by_id(id)
except SQLAlchemyError:
return None
[docs]
def item_data(self, item: ExtendedDirectoryEntry) -> dict[str, Any]:
data: dict[str, Any] = {}
data['title'] = item.title
data['lead'] = item.lead
if item.access == 'public':
if item.content_fields:
data = {f.name: f.object_data for f in item.content_fields}
for field in item.directory.fields:
if any(field_type in field.type for field_type in [
'fileinput', 'url']):
data.pop(field.id, None)
if any(field_type in field.type for field_type in [
'date', 'time']):
if data.get(field.id):
data[field.id] = data[field.id].isoformat()
if 'decimal_range' in field.type:
if data.get(field.id) is not None:
data[field.id] = float(data[field.id])
data['coordinates'] = get_geo_location(item)
data['contact'] = item.contact
return data
[docs]
def item_links(self, item: ExtendedDirectoryEntry) -> dict[str, Any]:
data = {}
if item.access == 'public':
content_field_names = []
if item.content_fields:
content_field_names = [i.name for i in item.content_fields]
data = {
(file.note or 'file'): file
for file in item.files
if file.note
if file.note.split(':', 1)[0] in content_field_names
}
for field in item.content_fields or []:
if field.type == 'URLField':
data[field.name] = field.data
data['html'] = item # type: ignore
return data