from itertools import groupby
import re
import morepath
import transaction
from collections import OrderedDict, defaultdict
from onegov.core.html import html_to_text
from onegov.core.security import Public, Private, Secret
from onegov.core.templates import render_template
from onegov.core.utils import render_file
from onegov.directory import Directory
from onegov.directory import DirectoryCollection
from onegov.directory import DirectoryEntry
from onegov.directory import DirectoryZipArchive
from onegov.directory.archive import DirectoryFileNotFound
from onegov.directory.collections.directory import EntryRecipientCollection
from onegov.directory.errors import DuplicateEntryError
from onegov.directory.errors import MissingColumnError
from onegov.directory.errors import MissingFileError
from onegov.directory.errors import ValidationError
from onegov.directory.models.directory import EntrySubscription
from onegov.form import FormCollection, as_internal_id, move_fields
from onegov.form.errors import (
InvalidFormSyntax, MixedTypeError, DuplicateLabelError)
from onegov.form.fields import UploadField
from onegov.org import OrgApp, _
from onegov.org.forms import DirectoryForm, DirectoryImportForm
from onegov.org.forms.directory import DirectoryRecipientForm, DirectoryUrlForm
from onegov.org.forms.generic import ExportForm
from onegov.org.layout import (DefaultMailLayout, DirectoryCollectionLayout,
DefaultLayout)
from onegov.org.layout import DirectoryEntryCollectionLayout
from onegov.org.layout import DirectoryEntryLayout
from onegov.org.models import DirectorySubmissionAction
from onegov.org.models import ExtendedDirectory, ExtendedDirectoryEntry
from onegov.core.elements import Link
from purl import URL
from tempfile import NamedTemporaryFile
from webob import Response
from webob.exc import HTTPForbidden
from wtforms import TextAreaField
from wtforms.validators import InputRequired
from onegov.org.models.directory import ExtendedDirectoryEntryCollection
from typing import cast, Any, NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence, Iterator
from onegov.core.types import JSON_ro, RenderData, EmailJsonDict
from onegov.directory.models.directory import DirectoryEntryForm
from onegov.org.models.directory import ExtendedDirectoryEntryForm
from onegov.org.request import OrgRequest
from typing import type_check_only
@type_check_only
[docs]
class DirectoryEntryWithNumber(ExtendedDirectoryEntry):
[docs]
number: str | int | None
[docs]
def get_directory_entry_form_class(
model: ExtendedDirectoryEntry,
request: 'OrgRequest'
) -> type['DirectoryEntryForm']:
form_class = ExtendedDirectoryEntry().with_content_extensions(
model.directory.form_class, request)
class InternalNotesAndOptionalMapPublicationForm(
form_class # type:ignore
):
internal_notes = TextAreaField(
label=_('Internal Notes'),
fieldset=_('Administrative'),
render_kw={'rows': 7}
)
def on_request(self) -> None:
# just a little safety guard so we for sure don't skip
# an on_request call that should have been called
if hasattr(super(), 'on_request'):
super().on_request()
if not self.request.is_manager:
self.delete_field('internal_notes')
if model.directory.enable_map == 'no':
self.delete_field('coordinates')
if not model.directory.enable_publication and not request.is_admin:
self.delete_field('publication_start')
self.delete_field('publication_end')
elif model.directory.required_publication:
self.publication_start.validators[0] = InputRequired()
self.publication_end.validators[0] = InputRequired()
move_fields(
InternalNotesAndOptionalMapPublicationForm,
fields=(
'delete_when_expired',
),
after='publication_end'
)
return InternalNotesAndOptionalMapPublicationForm
@OrgApp.html(
model=DirectoryCollection,
template='directories.pt',
permission=Public)
[docs]
def view_directories(
self: DirectoryCollection[Any],
request: 'OrgRequest',
layout: DirectoryCollectionLayout | None = None
) -> 'RenderData':
return {
'title': _('Directories'),
'layout': layout or DirectoryCollectionLayout(self, request),
'directories': request.exclude_invisible(self.query()),
'link': lambda directory: request.link(
ExtendedDirectoryEntryCollection(
directory,
published_only=not request.is_manager
)
)
}
@OrgApp.view(model=Directory, permission=Public)
[docs]
def view_directory_redirect(
self: Directory,
request: 'OrgRequest'
) -> Response:
return request.redirect(request.class_link(
ExtendedDirectoryEntryCollection, {'directory_name': self.name}
))
@OrgApp.form(model=DirectoryCollection, name='new', template='form.pt',
permission=Secret, form=get_directory_form_class)
[docs]
def handle_new_directory(
self: DirectoryCollection[Any],
request: 'OrgRequest',
form: DirectoryForm,
layout: DirectoryCollectionLayout | None = None
) -> 'RenderData | Response':
if form.submitted(request):
try:
directory = self.add_by_form(form, properties=('configuration', ))
except DuplicateEntryError as e:
request.alert(_('The entry ${name} exists twice', mapping={
'name': e.name
}))
transaction.abort()
return request.redirect(request.link(self))
request.success(_('Added a new directory'))
return request.redirect(
request.link(ExtendedDirectoryEntryCollection(directory)))
layout = layout or DirectoryCollectionLayout(self, request)
layout.breadcrumbs = [
Link(_('Homepage'), layout.homepage_url),
Link(_('Directories'), request.link(self)),
Link(_('New'), request.link(self, name='new'))
]
layout.edit_mode = True
return {
'layout': layout,
'title': _('New Directory'),
'form': form,
'form_width': 'huge',
}
@OrgApp.form(model=ExtendedDirectoryEntryCollection, name='edit',
template='directory_form.pt', permission=Secret,
form=get_directory_form_class)
[docs]
def handle_edit_directory(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
form: DirectoryForm,
layout: DirectoryCollectionLayout | None = None
) -> 'RenderData | Response':
migration = None
error = None
try:
if form.submitted(request):
save_changes = True
if self.directory.entries:
assert form.structure.data is not None
migration = self.directory.migration(
form.structure.data,
form.configuration
)
if migration.changes:
if not migration.possible:
save_changes = False
request.alert(_(
'The requested change cannot be performed, '
'as it is incompatible with existing entries'
))
else:
if not request.params.get('confirm'):
form.action += '&confirm=1'
save_changes = False
if save_changes:
form.populate_obj(self.directory)
try:
self.session.flush()
except ValidationError as e:
error = e
error.link = request.class_link( # type:ignore
DirectoryEntry,
{
'directory_name': self.directory.name,
'name': e.entry.name
}
)
transaction.abort()
else:
request.success(_('Your changes were saved'))
return request.redirect(request.link(self))
elif not request.POST:
form.process(obj=self.directory)
except InvalidFormSyntax as e:
request.warning(
_('Syntax Error in line ${line}', mapping={'line': e.line})
)
except AttributeError:
request.warning(_('Syntax error in form'))
except MixedTypeError as e:
request.warning(
_('Syntax error in field ${field_name}',
mapping={'field_name': e.field_name})
)
except DuplicateLabelError as e:
request.warning(
_('Error: Duplicate label ${label}', mapping={'label': e.label})
)
layout = layout or DirectoryCollectionLayout(self, request)
layout.edit_mode = True
layout.breadcrumbs = [
Link(_('Homepage'), layout.homepage_url),
Link(_('Directories'), request.link(self)),
Link(_(self.directory.title), request.link(self)),
Link(_('Edit'), '#')
]
return {
'layout': layout,
'title': self.directory.title,
'form': form,
'form_width': 'large',
'migration': migration,
'model': self,
'error': error,
'error_translate': lambda text: request.translate(_(text)),
'directory': self.directory,
}
@OrgApp.view(
model=ExtendedDirectoryEntryCollection,
permission=Secret,
request_method='DELETE'
)
[docs]
def delete_directory(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest'
) -> None:
request.assert_valid_csrf_token()
session = request.session
if hasattr(self.directory, 'files'):
# unlink any linked files
self.directory.files = []
session.flush()
for entry in self.directory.entries:
session.delete(entry)
DirectoryCollection(session).delete(self.directory)
request.success(_('The directory was deleted'))
@OrgApp.form(
model=Directory,
name='change-url',
template='form.pt',
permission=Private,
form=DirectoryUrlForm
)
[docs]
def change_directory_url(
self: Directory,
request: 'OrgRequest',
form: DirectoryUrlForm,
layout: DefaultLayout | None = None
) -> 'RenderData | Response':
layout = layout or DefaultLayout(self, request)
assert isinstance(layout.breadcrumbs, list)
layout.breadcrumbs.append(Link(_('Change URL'), '#'))
form.delete_field('test')
if form.submitted(request):
assert form.name.data is not None
self.name = form.name.data
request.success(_('Your changes were saved'))
return morepath.redirect(request.link(self))
elif not request.POST:
form.process(obj=self)
return {
'layout': layout,
'form': form,
'title': _('Change URL'),
'callout': _(
'Stable URLs are important. Here you can change the path to your '
'site independently from the title.'
),
}
[docs]
class Filter(NamedTuple):
[docs]
def get_filters(
request: 'OrgRequest',
self: ExtendedDirectoryEntryCollection,
keyword_counts: 'Mapping[str, Mapping[str, int]] | None' = None,
view_name: str = ''
) -> list[Filter]:
filters = []
empty = ()
# FIXME: It seems kind of strange to make this dependent on the fields
# of the directory, shouldn't this depend on the type of the
# filter instead? Even if a directory can only have one value
# you should still be able to filter for two distinct types of
# entries. One could even argue that this should always be a
# multi-select, regardless of what the filter form declares.
radio_fields = {
f.id for f in self.directory.fields if f.type == 'radio'
}
def link_title(field_id: str, value: str) -> str:
if keyword_counts is None:
return value
count = keyword_counts.get(field_id, {}).get(value, 0)
return f'{value} ({count})'
for keyword, title, values in self.available_filters(sort_choices=False):
singular = keyword in radio_fields
filters.append(Filter(title=title, tags=tuple(
Link(
text=link_title(keyword, value),
active=value in self.keywords.get(keyword, empty),
url=request.link(
self.for_toggled_keyword_value(
keyword,
value,
singular=singular
),
name=view_name
),
rounded=singular
)
for value in values
if keyword_counts.get( # type:ignore[union-attr]
keyword, {}).get(value, 0)
)))
return filters
[docs]
def keyword_count(
request: 'OrgRequest',
collection: ExtendedDirectoryEntryCollection
) -> dict[str, dict[str, int]]:
self = collection
keywords = tuple(
as_internal_id(k)
for k in self.directory.configuration.keywords or ()
)
fields = {f.id: f for f in self.directory.fields if f.id in keywords}
counts: dict[str, dict[str, int]] = {}
# NOTE: The counting can get incredibly expensive with many entries
# so we should skip it when we know we can skip it
if not fields:
return counts
# FIXME: This is incredibly slow. We need to think of a better way.
for model in request.exclude_invisible(self.without_keywords().query()):
for entry in model.keywords:
field_id, value = entry.split(':', 1)
if field_id in fields:
f_count = counts.setdefault(field_id, defaultdict(int))
f_count[value] += 1
return counts
@OrgApp.html(
model=ExtendedDirectoryEntryCollection,
permission=Public,
template='directory.pt')
[docs]
def view_directory(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
layout: DirectoryEntryCollectionLayout | None = None
) -> 'RenderData':
entries = request.exclude_invisible(self.query())
for i, e in enumerate(entries):
e = cast('DirectoryEntryWithNumber', e)
if self.directory.numbering == 'custom':
assert isinstance(self.directory.numbers, str)
e.number = e.content['values'].get(
as_internal_id(self.directory.numbers)) or 'x'
elif self.directory.numbering == 'standard':
e.number = i + 1
else:
e.number = None
keyword_counts = keyword_count(request, self)
filters = get_filters(request, self, keyword_counts)
layout = layout or DirectoryEntryCollectionLayout(self, request)
if request.is_manager:
layout.editbar_links.append(
Link(_('Recipients'), request.link(self, '+recipients'),
attrs={'class': 'manage-subscribers'}))
new_recipient_link = request.class_link(
ExtendedDirectoryEntryCollection, {
'directory_name': self.directory_name
}, name='+new-recipient'
)
return {
'layout': layout,
'title': self.directory.title,
'entries': entries,
'directory': self.directory,
'files': getattr(self.directory, 'files', None),
'search_widget': self.search_widget,
'new_recipient_link': new_recipient_link,
'filters': filters,
'geojson': request.link(self, name='+geojson'),
'submit': request.link(self, name='+submit'),
'show_thumbnails': layout.thumbnail_field_id and True or False,
'thumbnail_link': layout.thumbnail_link,
'overview_two_columns': self.directory.overview_two_columns
}
@OrgApp.json(
model=ExtendedDirectoryEntryCollection,
permission=Public,
name='geojson')
[docs]
def view_geojson(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest'
) -> 'JSON_ro':
q = self.query().with_entities(
DirectoryEntry.id,
DirectoryEntry.name,
DirectoryEntry.title,
DirectoryEntry.lead,
DirectoryEntry.content['coordinates']['lat'].label('lat'),
DirectoryEntry.content['coordinates']['lon'].label('lon'),
DirectoryEntry.meta['access'].label('access'),
)
q = q.filter(DirectoryEntry.content['coordinates']['lat'] != None)
with_categories = request.params.get('with-categories', False)
if with_categories:
q = q.add_column(DirectoryEntry._keywords)
# this could be done using a query, but that seems to be more verbose
# FIXME: We should create a utility function that yields visibility
# based on role and access
if request.is_manager:
entries = q
else:
# guests are allowed to see public and mtan views
accesses = {
'public',
'mtan'
}
if request.current_username:
# but members can also see member views
accesses.add('member')
entries = (c for c in q if not c.access or c.access in accesses)
url_prefix = request.class_link(DirectoryEntry, {
'directory_name': self.directory.name,
'name': ''
})
# FIXME: For better type safety we should define a NamedTuple that
# matches our query above
def as_dict(entry: Any) -> dict[str, Any]:
result: dict[str, Any] = {
'type': 'Feature',
'properties': {
'name': entry.name,
'title': entry.title,
'lead': entry.lead,
'link': url_prefix + entry.name
},
'geometry': {
'coordinates': (entry.lon, entry.lat),
'type': 'Point'
}
}
if with_categories:
categories = defaultdict(list)
for item in entry._keywords.keys():
k, v = item.split(':', 1)
categories[k].append(v)
result['properties']['categories'] = categories
return result
return tuple(as_dict(e) for e in entries)
@OrgApp.form(
model=ExtendedDirectoryEntryCollection,
permission=Private,
template='form.pt',
form=get_directory_entry_form_class,
name='new')
[docs]
def handle_new_directory_entry(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
form: 'DirectoryEntryForm',
layout: DirectoryEntryCollectionLayout | None = None
) -> 'RenderData | Response':
if form.submitted(request):
entry: ExtendedDirectoryEntry
try:
entry = self.directory.add_by_form( # type:ignore[assignment]
form,
type='extended'
)
except DuplicateEntryError as e:
request.alert(_('The entry ${name} exists twice', mapping={
'name': e.name
}))
transaction.abort()
return request.redirect(request.link(self))
# FIXME: if this entry is not yet published we will need to send
# a notification using some kind of cronjob, but we need
# to take care to only send it once, so we probably need
# to add a marker to entries to indicate that notifications
# have already been sent.
if self.directory.enable_update_notifications and entry.access in (
'public',
'mtan'
) and entry.published:
title = request.translate(_(
'${org}: New Entry in "${directory}"',
mapping={'org': request.app.org.title,
'directory': self.directory.title},
))
entry_link = request.link(entry)
recipients = EntryRecipientCollection(request.session).query(
).filter_by(directory_id=self.directory.id).filter_by(
confirmed=True).all()
def email_iter() -> 'Iterator[EmailJsonDict]':
for recipient in recipients:
unsubscribe = request.link(
recipient.subscription, 'unsubscribe')
content = render_template(
'mail_new_directory_entry.pt',
request,
{
'layout': DefaultMailLayout(object(), request),
'title': title,
'directory': self.directory,
'entry_title': entry.title,
'entry_link': entry_link,
'unsubscribe': unsubscribe
},
)
plaintext = html_to_text(content)
yield request.app.prepare_email(
receivers=(recipient.address,),
subject=title,
content=content,
plaintext=plaintext,
category='transactional',
attachments=(),
headers={
'List-Unsubscribe': f'<{unsubscribe}>',
'List-Unsubscribe-Post': (
'List-Unsubscribe=One-Click')
}
)
request.app.send_transactional_email_batch(email_iter())
request.success(_('Added a new directory entry'))
return request.redirect(request.link(entry))
if form.errors:
for field in form.match_fields(include_classes=(UploadField, )):
getattr(form, field).data = {}
layout = layout or DirectoryEntryCollectionLayout(self, request)
layout.include_code_editor()
layout.breadcrumbs.append(Link(_('New'), '#'))
layout.editbar_links = []
layout.edit_mode = True
return {
'layout': layout,
'title': _('New Directory Entry'),
'form': form,
}
@OrgApp.form(
model=DirectoryEntry,
permission=Private,
template='form.pt',
form=get_directory_entry_form_class,
name='edit')
[docs]
def handle_edit_directory_entry(
self: DirectoryEntry,
request: 'OrgRequest',
form: 'DirectoryEntryForm',
layout: DirectoryEntryLayout | None = None
) -> 'RenderData | Response':
if form.submitted(request):
form.populate_obj(self)
request.success(_('Your changes were saved'))
return request.redirect(request.link(self))
elif not request.POST:
form.process(obj=self)
# FIXME: Should we only register this view for ExtendedDirectoryEntry?
layout = layout or DirectoryEntryLayout(self, request) # type:ignore
layout.include_code_editor()
layout.breadcrumbs.append(Link(_('Edit'), '#'))
layout.editbar_links = []
layout.edit_mode = True
return {
'layout': layout,
'title': self.title,
'form': form,
}
@OrgApp.form(model=ExtendedDirectoryEntryCollection,
permission=Public,
template='directory_entry_submission_form.pt',
form=get_submission_form_class,
name='submit')
[docs]
def handle_submit_directory_entry(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
form: 'ExtendedDirectoryEntryForm',
layout: DirectoryEntryCollectionLayout | None = None
) -> 'RenderData | Response':
title = _('Submit a New Directory Entry')
if form.submitted(request):
forms = FormCollection(request.session)
# required by the form submissions collection
form._source = self.directory.structure
# the price per submission
if self.directory.price == 'paid':
amount = self.directory.price_per_submission
else:
amount = 0.0
submission = forms.submissions.add_external(
form=form,
state='pending',
payment_method=self.directory.payment_method,
minimum_price_total=self.directory.minimum_price_total,
email=form.submitter.data,
meta={
'handler_code': 'DIR',
'directory': self.directory.id.hex,
'price': {
'amount': amount,
'currency': self.directory.currency
},
'extensions': tuple(
ext for ext in self.directory.extensions
if ext != 'submitter'
),
**form.submitter_meta
},
)
# remove old submission while we are at it
self.directory.remove_old_pending_submissions()
url = URL(request.link(submission))
url = url.query_param('title', request.translate(title))
return request.redirect(url.as_string())
layout = layout or DirectoryEntryCollectionLayout(self, request)
layout.include_code_editor()
layout.breadcrumbs.append(Link(title, '#'))
return {
'directory': self.directory,
'form': form,
'layout': layout,
'title': title,
'guideline': self.directory.submissions_guideline,
'button_text': _('Continue')
}
@OrgApp.form(model=ExtendedDirectoryEntry,
permission=Public,
template='directory_entry_submission_form.pt',
form=get_change_request_form_class,
name='change-request')
[docs]
def handle_change_request(
self: ExtendedDirectoryEntry,
request: 'OrgRequest',
form: 'ExtendedDirectoryEntryForm',
layout: DirectoryEntryLayout | None = None
) -> 'RenderData | Response':
title = _('Propose a change')
if form.submitted(request):
forms = FormCollection(request.session)
# required by the form submissions collection
form._source = self.directory.structure
extensions = [
ext for ext in self.directory.extensions if ext != 'submitter']
extensions.append('change-request')
submission = forms.submissions.add_external(
form=form,
state='pending',
email=form.submitter.data,
meta={
'handler_code': 'DIR',
'directory': self.directory.id.hex,
'directory_entry': self.id.hex,
'extensions': extensions,
**form.submitter_meta
}
)
# remove old submission while we are at it
self.directory.remove_old_pending_submissions()
url = URL(request.link(submission))
url = url.query_param('title', request.translate(title))
return request.redirect(url.as_string())
elif not request.POST:
form.process(obj=self)
layout = layout or DirectoryEntryLayout(self, request)
layout.include_code_editor()
layout.breadcrumbs.append(Link(title, '#'))
layout.editbar_links = []
return {
'directory': self.directory,
'form': form,
'layout': layout,
'title': title,
'hint': _(
'To request a change, edit the fields you would like to change, '
'leaving the other fields intact. Then submit your request.'
),
'guideline': self.directory.change_requests_guideline,
'button_text': _('Continue')
}
@OrgApp.html(
model=ExtendedDirectoryEntry,
permission=Public,
template='directory_entry.pt')
[docs]
def view_directory_entry(
self: ExtendedDirectoryEntry,
request: 'OrgRequest',
layout: DirectoryEntryLayout | None = None
) -> 'RenderData':
directory = self.directory
siblings = request.exclude_invisible(ExtendedDirectoryEntryCollection(
directory,
published_only=not request.is_manager
).query())
prev_entry: ExtendedDirectoryEntry | bool
next_entry: ExtendedDirectoryEntry | bool
more_entries: bool
if self not in siblings:
# we don't know where we're at within the collection so don't
# show anything
prev_entry = next_entry = more_entries = False
else:
entry_index = siblings.index(self)
prev_entry = siblings[entry_index - 1] if entry_index != 0 else False
next_entry = siblings[
entry_index + 1] if entry_index != len(siblings) - 1 else False
more_entries = bool(prev_entry or next_entry)
return {
'layout': layout or DirectoryEntryLayout(self, request),
'title': self.title,
'entry': self,
'more_entries': more_entries,
'prev_entry': prev_entry,
'next_entry': next_entry
}
@OrgApp.view(
model=DirectoryEntry,
permission=Private,
request_method='DELETE')
[docs]
def delete_directory_entry(
self: DirectoryEntry,
request: 'OrgRequest'
) -> None:
request.assert_valid_csrf_token()
session = request.session
session.delete(self)
request.success(_('The entry was deleted'))
@OrgApp.form(model=ExtendedDirectoryEntryCollection,
permission=Public, name='export',
template='export.pt', form=ExportForm)
[docs]
def view_export(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
form: ExportForm,
layout: DirectoryEntryCollectionLayout | None = None
) -> 'RenderData | Response':
if not request.is_visible(self.directory):
return HTTPForbidden()
layout = layout or DirectoryEntryCollectionLayout(self, request)
layout.breadcrumbs.append(Link(_('Export'), '#'))
layout.editbar_links = None # type:ignore[assignment]
if form.submitted(request):
url = URL(request.link(self, '+zip'))
url = url.query_param('format', form.format)
return request.redirect(url.as_string())
filters = get_filters(request, self, keyword_count(request, self),
view_name='+export')
if filters:
pretext = _('On the right side, you can filter the entries of this '
'directory to export.')
else:
pretext = _('Exports all entries of this directory.')
return {
'layout': layout,
'title': _('Export'),
'form': form,
'explanation': f'{request.translate(pretext)} ' + request.translate(_(
'The resulting zipfile contains the selected format as well '
'as metadata and images/files if the directory contains any.'
)),
'filters': filters,
'count': len(request.exclude_invisible(self.query().all()))
}
@OrgApp.view(model=ExtendedDirectoryEntryCollection,
permission=Public, name='zip')
[docs]
def view_zip_file(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest'
) -> Response:
if not request.is_visible(self.directory):
return HTTPForbidden()
layout = DirectoryEntryCollectionLayout(self, request)
format = request.params.get('format')
if not isinstance(format, str):
format = 'json'
formatter = layout.export_formatter(format)
def transform(key: object, value: object) -> tuple[Any, Any]:
return formatter(key), formatter(value)
with NamedTemporaryFile() as f:
archive = DirectoryZipArchive(f.name + '.zip', format, transform)
try:
archive.write(
self.directory,
entry_filter=request.exclude_invisible,
query=self.query()
)
except DirectoryFileNotFound as err:
entry = self.by_name(err.entry_name)
entry_url = request.link(entry, name='edit')
request.alert(
_('You have been redirect to this entry because '
'it could not be exported due to missing file ${name}. '
'Please re-upload them and try again',
mapping={'name': err.filename})
)
return request.redirect(entry_url)
response = render_file(str(archive.path), request)
filename = ' '.join((
self.directory.name, layout.format_date(layout.now(), 'datetime')))
filename = re.sub(r'[\.:]+', '-', filename)
filename = filename + '.zip'
response.headers['Content-Disposition'] = (
f'attachment; filename="{filename}"')
return response
@OrgApp.form(model=ExtendedDirectoryEntryCollection,
permission=Private, name='import',
template='directory_import.pt', form=DirectoryImportForm)
[docs]
def view_import(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
form: DirectoryImportForm,
layout: DirectoryEntryCollectionLayout | None = None
) -> 'RenderData | Response':
error = None
layout = layout or DirectoryEntryCollectionLayout(self, request)
layout.breadcrumbs.append(Link(_('Import'), '#'))
layout.editbar_links = None # type:ignore[assignment]
if form.submitted(request):
try:
imported = form.run_import(target=self.directory)
except MissingColumnError as e:
field = self.directory.field_by_id(e.column)
assert field is not None
request.alert(_('The column ${name} is missing', mapping={
'name': field.human_id
}))
except MissingFileError as e:
request.alert(_('The file ${name} is missing', mapping={
'name': e.name
}))
except DuplicateEntryError as e:
request.alert(_('The entry ${name} exists twice', mapping={
'name': e.name
}))
except ValidationError as e:
error = e
except NotImplementedError:
request.alert(_(
'The given file is invalid, does it include a metadata.json '
'with a data.xlsx, data.csv, or data.json?'
))
else:
notify = request.success if imported else request.warning
notify(_('Imported ${count} entries', mapping={
'count': imported
}))
return request.redirect(request.link(self))
# no success if we land here
transaction.abort()
return {
'layout': layout,
'title': _('Import'),
'form': form,
'explanation': _(
'Updates the directory configuration and imports all entries '
'given in the ZIP file. The format is the same as produced by '
'the export function. Note that only 100 items are imported at a '
'time. To import more items repeat the import accordingly.'
),
'directory': self.directory,
'error': error,
'error_translate': lambda text: request.translate(_(text)),
}
@OrgApp.view(
model=DirectorySubmissionAction,
permission=Private,
request_method='POST'
)
[docs]
def execute_submission_action(
self: DirectorySubmissionAction,
request: 'OrgRequest'
) -> None:
self.execute(request)
@OrgApp.form(model=ExtendedDirectoryEntryCollection,
permission=Public, name='new-recipient',
template='form.pt', form=DirectoryRecipientForm)
[docs]
def new_recipient(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
form: DirectoryRecipientForm,
layout: DirectoryEntryCollectionLayout | None = None,
mail_layout: DefaultMailLayout | None = None
) -> 'RenderData | Response':
layout = layout or DirectoryEntryCollectionLayout(self, request)
layout.breadcrumbs.append(Link(_('New Recipient'), '#'))
layout.editbar_links = []
if form.submitted(request):
assert form.address.data is not None
recipients = EntryRecipientCollection(request.session)
recipient = recipients.query().filter_by(
directory_id=self.directory.id).filter_by(
address=form.address.data).first()
# do not show a specific error message if the user already signed up,
# just pretend like everything worked correctly - if someone signed up
# or not is private
if not recipient:
recipient = recipients.add(address=form.address.data,
directory_id=self.directory.id)
unsubscribe = request.link(recipient.subscription, 'unsubscribe')
title = request.translate(
_('Registration for notifications on new entries in the '
'directory "${directory}"',
mapping={
'directory': self.directory.title
})
)
confirm_mail = render_template(
'mail_confirm_directory_subscription.pt',
request, {
'layout': mail_layout or DefaultMailLayout(self, request),
'directory': self.directory,
'subscription': recipient.subscription,
'title': title,
'unsubscribe': unsubscribe
})
request.app.send_transactional_email(
subject=title,
receivers=(recipient.address, ),
content=confirm_mail,
headers={
'List-Unsubscribe': f'<{unsubscribe}>',
'List-Unsubscribe-Post': 'List-Unsubscribe=One-Click'
},
)
request.success(_((
"Success! We have sent a confirmation link to "
"${address}, if we didn't send you one already."
), mapping={'address': form.address.data}))
return request.redirect(request.link(self))
return {
'layout': layout,
'title': _('Notification for new entries'),
'form': form,
}
@OrgApp.html(
model=ExtendedDirectoryEntryCollection,
permission=Private, name='recipients',
template='directory_entry_recipients.pt'
)
[docs]
def view_directory_entry_update_recipients(
self: ExtendedDirectoryEntryCollection,
request: 'OrgRequest',
layout: DirectoryEntryCollectionLayout | None = None
) -> 'RenderData | Response':
# i18n:attributes translations do not support variables, so we need
# to do this ourselves
warning = request.translate(_('Do you really want to unsubscribe "{}"?'))
recipients = EntryRecipientCollection(request.session).query().filter_by(
directory_id=self.directory.id).filter_by(confirmed=True).all()
by_letter = OrderedDict()
for key, values in groupby(recipients, key=lambda r: r.address[0].upper()):
by_letter[key] = list(values)
layout = layout or DirectoryEntryCollectionLayout(self, request)
layout.breadcrumbs.append(Link(_('Recipients of new entry updates'), '#'))
layout.editbar_links = []
return {
'layout': layout,
'title': _('Recipients of new entry updates'),
'recipients': recipients,
'warning': warning,
'by_letter': by_letter,
}
# use an english name for this view, so robots know what we use it for
@OrgApp.view(model=EntrySubscription, name='confirm', permission=Public)
[docs]
def view_confirm(
self: EntrySubscription, request: 'OrgRequest'
) -> Response:
if self.confirm():
request.success(_(
'the subscription for ${address} was successfully confirmed',
mapping={'address': self.recipient.address}
))
else:
request.alert(_(
'the subscription for ${address} could not be confirmed, '
'wrong token',
mapping={'address': self.recipient.address}
))
return morepath.redirect(
request.link(DirectoryCollection(request.session).by_id(
self.recipient.directory_id
))
)
# use an english name for this view, so robots know what we use it for
@OrgApp.view(model=EntrySubscription, name='unsubscribe', permission=Public)
[docs]
def view_unsubscribe(
self: EntrySubscription,
request: 'OrgRequest'
) -> Response:
# RFC-8058: just return an empty response on a POST request
# don't check for success
if request.method == 'POST':
self.unsubscribe()
return Response()
address = self.recipient.address
if self.unsubscribe():
request.success(_(
'${address} successfully unsubscribed',
mapping={'address': address}
))
else:
request.alert(_(
'${address} could not be unsubscribed, wrong token',
mapping={'address': address}
))
return morepath.redirect(
request.link(DirectoryCollection(request.session).by_id(
self.recipient.directory_id
))
)
# RFC-8058: respond to POST requests as well
@OrgApp.view(
model=EntrySubscription,
name='unsubscribe',
permission=Public,
request_method='POST'
)
[docs]
def view_unsubscribe_rfc8058(
self: EntrySubscription,
request: 'OrgRequest'
) -> Response:
# it doesn't really make sense to check for success here
# since this is an automated action without verficiation
self.unsubscribe()
return Response()