Source code for agency.utils

import operator
from email.headerregistry import Address
from markupsafe import Markup
from sqlalchemy import func

from onegov.core.mail import coerce_address
from onegov.people.models import Agency, Person

from typing import Literal
from typing import TYPE_CHECKING
    from import Iterable
    from import Iterator
    from datetime import datetime
    from import AgencyRequest
    from onegov.core.orm.mixins import TimestampMixin
    from onegov.user import RoleMapping
    from sqlalchemy.orm import Query
    from typing import TypeVar

[docs] _T = TypeVar('_T')
[docs] def handle_empty_p_tags(html: Markup) -> Markup: return html if html != Markup('<p></p>') else Markup('')
[docs] def emails_for_new_ticket( model: Agency | Person, request: 'AgencyRequest' ) -> 'Iterator[Address]': """ Returns an iterator with all the unique email addresses that need to be notified for a new ticket of this type """ agencies: Iterable[Agency] if isinstance(model, Agency): agencies = (model, ) handler_code = 'AGN' elif isinstance(model, Person): agencies = ( for membership in model.memberships) handler_code = 'PER' else: raise NotImplementedError() seen = set() if request.email_for_new_tickets: # adding this to seen ensures it does not receive two emails address = coerce_address(request.email_for_new_tickets) seen.add(address.addr_spec) yield address # we need to match the access permission behavior of the tickets # so we can figure out if a group filter needs to be applied # we do this by getting the relevant list of groupids, if there # is no relevant list groupids will be None, and we don't need # to filter the groups at all. permissions =, {}) if hasattr(model, 'group') and in permissions: groupids: list[str] | None = permissions[] else: groupids = permissions.get(None) # we try to minimize the amount of e-mail address parsing we # perform by de-duplicating the raw usernames as we get them role_mapping: RoleMapping for agency in agencies: for role_mapping in getattr(agency, 'role_mappings', ()): if role_mapping.role != 'editor': continue # we only care about group role mappings group = if group is None: continue # if the group does not have permission to manage this # type of ticket then we need to skip it if groupids is not None and not in groupids: continue # if the group does not have immediate notification # turned on, then skip it if not group.meta: continue if group.meta.get('immediate_notification') != 'yes': continue for user in group.users: # we already yielded this Address if user.username in seen: continue seen.add(user.username) try: yield Address( display_name=user.realname or '', addr_spec=user.username ) except ValueError: # if it's not a valid address then skip it pass
[docs] def get_html_paragraph_with_line_breaks(text: str | None) -> Markup: if not text: return Markup('') return Markup('<p>{}</p>').format( Markup('<br>').join(line for line in str(text).splitlines()) )
[docs] def filter_modified_or_created( query: 'Query[_T]', relate: Literal['>', '<', '>=', '<=', '=='], # FIXME: This is a bit lax about types, SQLAlchemy is doing the heavy # lifting here, auto casting ISO formatted date strings comparison_property: 'datetime | str', collection_class: type['TimestampMixin'] ) -> 'Query[_T]': ops = { '>':, '<':, '>=':, '<=': operator.le, '==': operator.eq, } return query.filter( ops[relate]( func.date_trunc('minute', collection_class.last_change), comparison_property ) )