Source code for core.directives

from __future__ import annotations

import os.path

from dectate import Action, Query
from itertools import count
from morepath.directive import HtmlAction
from onegov.core.utils import Bunch


from typing import Any, ClassVar, TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
    from _typeshed import StrOrBytesPath
    from collections.abc import Callable
    from webob import Response
    from wtforms import Form

    from .request import CoreRequest


[docs] _T = TypeVar('_T')
[docs] _FormT = TypeVar('_FormT', bound='Form')
[docs] _RequestT = TypeVar('_RequestT', bound='CoreRequest')
[docs] class HtmlHandleFormAction(HtmlAction): """ Register Form view. Basically wraps the Morepath's ``html`` directive, registering both POST and GET (if no specific request method is given) and wrapping the view handler with :func:`wrap_with_generic_form_handler`. The form is either a class or a function. If it's a function, it is expected to return a form class when given an instance of the model. The form may also be None, which is useful under special circumstances. Generally you don't want that though. Example: .. code-block:: python @App.form(model=Root, template='form.pt', permission=Public, form=LoginForm) def handle_form(self, request, form): if form.submitted(): # do something if the form was submitted with valid data else: # do something if the form was not submitted or not # submitted correctly return {} # template variables """ def __init__( self, model: type | str, form: type[Form] | Callable[[Any, _RequestT], type[Form]], render: Callable[[Any, _RequestT], Response] | str | None = None, template: StrOrBytesPath | None = None, load: Callable[[_RequestT], Any] | str | None = None, permission: object | str | None = None, internal: bool = False, **predicates: Any ):
[docs] self.form = form
super().__init__(model, render, template, load, permission, internal, **predicates)
[docs] def perform( self, obj: Callable[[Any, _RequestT, Any], Any], *args: Any, **kwargs: Any ) -> None: wrapped = wrap_with_generic_form_handler(obj, self.form) # if a request method is given explicitly, we honor it if 'request_method' in self.predicates: return super().perform(wrapped, *args, **kwargs) # otherwise we register ourselves twice, once for each method predicates = self.predicates.copy() self.predicates['request_method'] = 'GET' super().perform(wrapped, *args, **kwargs) self.predicates['request_method'] = 'POST' super().perform(wrapped, *args, **kwargs) self.predicates = predicates
[docs] def fetch_form_class( form_class: type[_FormT] | Callable[[Any, _RequestT], type[_FormT]], model: object, request: _RequestT ) -> type[_FormT]: """ Given the form_class defined with the form action, together with model and request, this function returns the actual class to be used. """ if isinstance(form_class, type): return form_class else: return form_class(model, request)
[docs] def query_form_class( request: _RequestT, model: object, name: str | None = None ) -> type[Form] | None: """ Queries the app configuration for the form class associated with the given model and name. Take this configuration for example:: @App.form(model=Model, form_class=Form, name='foobar') ... The form class defined here can be retrieved as follows: query_form_class(request, model=Model, name='foobar') """ appcls = request.app.__class__ action = appcls.form.action_factory assert issubclass(action, HtmlHandleFormAction) for a, fn in Query(action)(appcls): if not isinstance(a, action): continue if a.key_dict().get('name') == name: return fetch_form_class( a.form, # type:ignore[arg-type] model, request ) return None
[docs] def wrap_with_generic_form_handler( obj: Callable[[_T, _RequestT, _FormT], Any], form_class: type[_FormT] | Callable[[_T, _RequestT], type[_FormT]] ) -> Callable[[_T, _RequestT], Any]: """ Wraps a view handler with generic form handling. This includes instantiating the form with translations/csrf protection and setting the correct action. """ def handle_form(self: _T, request: _RequestT) -> Any: _class = fetch_form_class(form_class, self, request) if _class: form = request.get_form(_class, model=self) form.action = request.url # type: ignore[attr-defined] else: # FIXME: This seems potentially bad, do we actually ever want # to handle a missing form within the view? If we don't # we could just throw an exception here... form = None return obj(self, request, form) # type:ignore[arg-type] return handle_form
[docs] class CronjobAction(Action): """ Register a cronjob. """
[docs] config = { 'cronjob_registry': Bunch }
[docs] counter: ClassVar = count(1)
def __init__( self, hour: int | str, minute: int | str, timezone: str, once: bool = False ):
[docs] self.hour = hour
[docs] self.minute = minute
[docs] self.timezone = timezone
[docs] self.name = next(self.counter)
[docs] self.once = once
[docs] def identifier(self, **kw: Any) -> int: return self.name
[docs] def perform( # type:ignore[override] self, func: Callable[[CoreRequest], Any], cronjob_registry: Bunch ) -> None: from onegov.core.cronjobs import register_cronjob register_cronjob( registry=cronjob_registry, function=func, hour=self.hour, minute=self.minute, timezone=self.timezone, once=self.once)
[docs] class StaticDirectoryAction(Action): """ Registers a static files directory. """
[docs] config = { 'staticdirectory_registry': Bunch }
[docs] counter: ClassVar = count(1)
def __init__(self) -> None:
[docs] self.name = next(self.counter)
[docs] def identifier( # type:ignore[override] self, staticdirectory_registry: Bunch ) -> int: return self.name
[docs] def perform( # type:ignore[override] self, func: Callable[..., Any], staticdirectory_registry: Bunch ) -> None: if not hasattr(staticdirectory_registry, 'paths'): staticdirectory_registry.paths = [] path = func() if not os.path.isabs(path): assert self.code_info is not None path = os.path.join(os.path.dirname(self.code_info.path), path) staticdirectory_registry.paths.append(path)
[docs] class TemplateVariablesRegistry:
[docs] __slots__ = ('callbacks',)
def __init__(self) -> None:
[docs] self.callbacks: list[Callable[[CoreRequest], dict[str, Any]]] = []
[docs] def get_variables( self, request: CoreRequest, base: dict[str, Any] | None = None ) -> dict[str, Any]: base = base or {} for callback in self.callbacks: base.update(callback(request)) return base
[docs] class TemplateVariablesAction(Action): """ Registers a set of global template variables for chameleon templates. Only exists once per application. Template variables with conflicting keys defined in child applications override the keys with the same name in the parent application. Non-conflicting keys are kept individually. Example:: @App.template_variables() def get_template_variables(request): return { 'foo': 'bar' } """
[docs] config = { 'templatevariables_registry': TemplateVariablesRegistry }
[docs] counter: ClassVar = count(1)
def __init__(self) -> None: # XXX I would expect this to work with a static name (and it does in # tests), but in real world usage the same name leads to overriden # paths
[docs] self.name = next(self.counter)
[docs] def identifier( # type:ignore[override] self, templatevariables_registry: TemplateVariablesRegistry ) -> int: return self.name
[docs] def perform( # type:ignore[override] self, func: Callable[[CoreRequest], dict[str, Any]], templatevariables_registry: TemplateVariablesRegistry ) -> None: templatevariables_registry.callbacks.append(func)