from onegov.api import ApiApp
from onegov.api.models import ApiEndpoint, ApiException, AuthEndpoint
from onegov.api.models import ApiEndpointCollection
from onegov.api.models import ApiEndpointItem
from onegov.api.token import get_token
from onegov.api.utils import authenticate, check_rate_limit
from onegov.core.security import Public
from onegov.form.fields import HoneyPotField
from webob.exc import HTTPMethodNotAllowed, HTTPNotFound, HTTPUnauthorized
from wtforms import HiddenField
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Generator, Sequence
from onegov.core.request import CoreRequest
from onegov.core.types import JSONObject
from morepath.request import Response
from wtforms.form import _FormErrors
@ApiApp.json(model=ApiException, permission=Public)
[docs]
def handle_exception(
self: ApiException, request: 'CoreRequest'
) -> dict[str, dict[str, dict[str, Any] | str]]:
@request.after
def add_headers(response: 'Response') -> None:
response.status_code = self.status_code
response.headers['Content-Type'] = 'application/vnd.collection+json'
for name, value in self.headers.items():
response.headers.add(name, value)
return {
'collection': {
'version': '1.0',
'href': request.url,
'error': {'message': self.message}
}
}
@ApiApp.json(
model=ApiEndpointCollection,
permission=Public
)
[docs]
def view_api_endpoints(
self: ApiEndpointCollection, request: 'CoreRequest'
) -> dict[str, Any]:
@request.after
def add_headers(response: 'Response') -> None:
response.headers['Content-Type'] = 'application/vnd.collection+json'
return {
'collection': {
'version': '1.0',
'href': request.link(self),
'queries': [
{
'href': request.link(endpoint(request.app)),
'rel': endpoint.endpoint,
'data': [
{'name': name}
for name in getattr(endpoint, 'filters', [])
]
}
for endpoint in self.endpoints.values()
]
}
}
@ApiApp.json(
model=ApiEndpoint,
permission=Public
)
[docs]
def view_api_endpoint(
self: 'ApiEndpoint[Any]', request: 'CoreRequest'
) -> dict[str, Any]:
headers = check_rate_limit(request)
@request.after
def add_headers(response: 'Response') -> None:
response.headers['Content-Type'] = 'application/vnd.collection+json'
try:
payload: dict[str, JSONObject] = {
'collection': {
'version': '1.0',
'href': request.link(self.for_filter()),
'links': [
{
'rel': rel,
'href': request.link(item) if item else None
}
for rel, item in self.links.items()
],
'items': [
{
'href': request.link(target),
'data': [
{
'name': name,
'value': value
}
for name, value in self.item_data(item).items()
],
'links': [
{
'rel': name,
'href': (
link if not link or isinstance(link, str)
else request.link(link)
),
}
for name, link in self.item_links(item).items()
]
}
for target, item in self.batch.items()
],
}
}
if form := self.form(None, request):
payload['collection']['template'] = {
'data': [
{
'name': field.name,
'prompt': field.gettext(field.label.text)
}
for field in form
if not isinstance(field, (HiddenField, HoneyPotField))
]
}
return payload
except Exception as exception:
raise ApiException(exception=exception, headers=headers) from exception
@ApiApp.json(
model=ApiEndpointItem,
permission=Public
)
[docs]
def view_api_endpoint_item(
self: ApiEndpointItem[Any], request: 'CoreRequest'
) -> dict[str, Any]:
headers = check_rate_limit(request)
@request.after
def add_headers(response: 'Response') -> None:
response.headers['Content-Type'] = 'application/vnd.collection+json'
try:
endpoint = self.api_endpoint
assert endpoint is not None
links = self.links or {}
data = self.data or {}
# make sure we are actually supposed to be able to see this
# the API shouldn't include invisible items either (for now)
if (item := self.item) and not request.is_visible(item):
raise HTTPNotFound()
payload: dict[str, JSONObject] = {
'collection': {
'version': '1.0',
'href': request.link(endpoint),
'items': [
{
'href': request.link(self),
'data': [
{
'name': name,
'value': value
}
for name, value in data.items()
],
'links': [
{
'rel': rel,
'href': (
link if not link or isinstance(link, str)
else request.link(link)
),
}
for rel, link in links.items()
]
}
],
}
}
if form := self.form(request):
payload['collection']['template'] = {
'data': [
{
'name': field.name,
'prompt': field.gettext(field.label.text)
}
for field in form
if not isinstance(field, (HiddenField, HoneyPotField))
]
}
return payload
except Exception as exception:
raise ApiException(exception=exception, headers=headers) from exception
@ApiApp.json(
model=ApiEndpointItem,
permission=Public,
request_method='PUT'
)
[docs]
def edit_api_endpoint_item(
self: ApiEndpointItem[Any], request: 'CoreRequest'
) -> None:
endpoint = self.api_endpoint
assert endpoint is not None
try:
form = self.form(request)
if form is None:
raise HTTPMethodNotAllowed()
if not request.is_logged_in:
api_key = authenticate(request)
if api_key.read_only:
raise HTTPUnauthorized()
# make sure we are actually supposed to be able to see this
# the API shouldn't include invisible items either (for now)
if (item := self.item) and not request.is_visible(item):
raise HTTPNotFound()
def walk_errors(
errors: 'Sequence[str] | _FormErrors',
prefix: str | None
) -> 'Generator[tuple[str | None, str]]':
if isinstance(errors, dict):
for suffix, errs in errors.items():
yield from walk_errors(
errs,
suffix if prefix is None else f'{prefix}.{suffix}'
)
else:
for error in errors:
yield prefix, error
if not form.validate():
raise ApiException(
', '.join(
f'{field_name}: {error}' if field_name else error
for prefix, errors in form.errors.items()
for field_name, error in walk_errors(errors, prefix)
),
status_code=400
)
endpoint.apply_changes(self.item, form)
except Exception as exception:
raise ApiException(exception=exception) from exception
@ApiApp.json(model=AuthEndpoint, permission=Public)
[docs]
def get_time_restricted_token(
self: AuthEndpoint, request: 'CoreRequest'
) -> dict[str, str]:
try:
if request.authorization is None:
raise HTTPUnauthorized()
return get_token(request)
except Exception as exception:
raise ApiException(exception=exception) from exception