from __future__ import annotations
from collections import defaultdict
import uuid
from datetime import datetime
from sqlalchemy import distinct
from onegov.core.elements import BackLink, Confirm, Intercooler, Link
from onegov.core.security import Private
from onegov.pas import _
from onegov.pas import PasApp
from onegov.pas.collections import (
AttendenceCollection,
PASParliamentarianCollection,
SettlementRunCollection,
)
from onegov.pas.custom import (
validate_attendance_date,
notify_admins_finalized,
)
from onegov.pas.forms import AttendenceAddCommissionBulkForm, AttendenceAddForm
from onegov.pas.forms import AttendenceAddPlenaryForm
from onegov.pas.forms import AttendenceForm
from onegov.pas.forms.attendence import (AttendenceCommissionBulkEditForm,
AttendencePlenaryBulkEditForm)
from onegov.pas.layouts import AttendenceCollectionLayout
from onegov.pas.layouts import AttendenceLayout
from onegov.pas.models import Attendence
from onegov.pas.models import Change
from onegov.pas.models import SettlementRun
from onegov.pas.models.attendence import TYPES
from onegov.pas.models.commission_membership import PASCommissionMembership
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from onegov.core.types import RenderData
from onegov.town6.request import TownRequest
from onegov.pas.request import PasRequest
from webob import Response
@PasApp.html(
    model=AttendenceCollection,
    template='attendences.pt',
    permission=Private
)
def view_attendences(
    self: AttendenceCollection,
    request: PasRequest
) -> RenderData:
    """Render the attendance overview together with its filter links.

    Loads the attendances visible to the current user, groups those that
    share a ``bulk_edit_id``, and builds three sets of filter links (type,
    settlement run and, for admins only, plenary date). Every link points
    at this collection with exactly one filter criterion swapped out.

    Returns the variables consumed by ``attendences.pt``.
    """
    # Function-scope import: only this view needs it.
    from functools import partial

    layout = AttendenceCollectionLayout(self, request)

    # Materialise once. The records are walked several times below; if the
    # collection hands back a lazy query, every extra iteration would be
    # another round trip to the database.
    attendences = list(self.query_for_current_user(request))

    groups: dict[str, list[Attendence]] = defaultdict(list)
    for a in attendences:
        if a.bulk_edit_id:
            groups[str(a.bulk_edit_id)].append(a)

    # Bulk sessions ordered by their most recently touched record.
    bulk_edit_groups = sorted(
        groups.values(),
        key=lambda g: max(a.modified or a.created for a in g),
        reverse=True,
    )
    attendences_sorted = sorted(
        attendences,
        key=lambda a: a.created or a.modified or datetime.min,
        reverse=True,
    )

    # The currently active filters. Keyword arguments passed when calling
    # the partial override the ones bound here, so each link below only
    # names the single criterion it changes.
    with_current_filters = partial(
        self.for_filter,
        settlement_run_id=self.settlement_run_id,
        date_from=self.date_from,
        date_to=self.date_to,
        type=self.type,
        parliamentarian_id=self.parliamentarian_id,
        commission_id=self.commission_id,
        party_id=self.party_id,
        plenary_date=self.plenary_date,
    )

    type_filters = [
        Link(
            text=request.translate(_('All')),
            active=self.type is None,
            url=request.link(with_current_filters(type=None)),
        )
    ]
    for key, label in TYPES.items():
        # The plenary type is only offered to admins.
        if not request.is_admin and key == 'plenary':
            continue
        type_filters.append(
            Link(
                text=request.translate(label),
                active=self.type == key,
                url=request.link(with_current_filters(type=key)),
            )
        )

    settlement_runs = (
        SettlementRunCollection(request.session).query().all()
    )
    run_filters = [
        Link(
            text=request.translate(_('All')),
            active=self.settlement_run_id is None,
            # 'all' is passed on verbatim; how the collection interprets
            # it is defined in ``for_filter``.
            url=request.link(
                with_current_filters(settlement_run_id='all')
            ),
        )
    ]
    for run in settlement_runs:
        run_id = str(run.id)
        run_filters.append(
            Link(
                text=run.name,
                active=self.settlement_run_id == run_id,
                url=request.link(
                    with_current_filters(settlement_run_id=run_id)
                ),
            )
        )

    plenary_filters: list[Link] = []
    if request.is_admin:
        plenary_dates = (
            request.session.query(distinct(Attendence.date))
            .filter(Attendence.type == 'plenary')
            .order_by(Attendence.date.desc())
            .all()
        )
        plenary_filters = [
            Link(
                text=request.translate(_('All')),
                active=self.plenary_date is None,
                url=request.link(with_current_filters(plenary_date=None)),
            )
        ]
        for (d,) in plenary_dates:
            plenary_filters.append(
                Link(
                    text=layout.format_date(d, 'date'),
                    active=self.plenary_date == d,
                    url=request.link(with_current_filters(plenary_date=d)),
                )
            )

    # Records created in bulk are edited through the matching bulk view,
    # all others link to their own detail page.
    edit_links: dict[uuid.UUID, str] = {}
    for a in attendences_sorted:
        if a.bulk_edit_id:
            name = (
                'edit-plenary-bulk-attendences'
                if a.type == 'plenary'
                else 'edit-commission-bulk-attendences'
            )
            edit_links[a.id] = request.link(a, name)
        else:
            edit_links[a.id] = request.link(a)

    return {
        'add_link': request.link(self, name='new'),
        'layout': layout,
        'attendences': attendences_sorted,
        'edit_links': edit_links,
        'title': layout.title,
        'bulk_edit_groups': bulk_edit_groups,
        'can_edit_attendences': request.is_admin,
        'filters': {
            'settlement_run': run_filters,
            'type': type_filters,
            'plenary_session': plenary_filters,
        },
    }
@PasApp.form(
    model=AttendenceCollection,
    name='new',
    template='form.pt',
    # Declared explicitly like on the other views of this module; the
    # body performs no role check of its own.
    permission=Private,
    form=AttendenceAddForm
)
def add_attendence(
    self: AttendenceCollection,
    request: PasRequest,
    form: AttendenceAddForm
) -> RenderData | Response:
    """Show the form for a single attendance and store it on submit.

    A submitted date is first run through ``validate_attendance_date``;
    if that returns an error the form is rendered again with an alert and
    nothing is stored. Otherwise the record is added to the collection,
    logged via ``Change.add`` and the user is redirected to the new
    attendance.
    """
    request.include('custom')
    title = _('New attendence')

    if form.submitted(request):
        if form.date.data:
            if error := validate_attendance_date(
                request.session, form.date.data
            ):
                request.alert(error)
                return {
                    'layout': AttendenceCollectionLayout(self, request),
                    'title': title,
                    'form': form,
                    'form_width': 'large',
                }

        attendence = self.add(**form.get_useful_data())
        Change.add(request, 'add', attendence)
        request.success(_('Added a new attendence'))
        return request.redirect(request.link(attendence))

    layout = AttendenceCollectionLayout(self, request)
    layout.breadcrumbs.append(Link(_('New'), '#'))
    layout.edit_mode = True
    return {
        'layout': layout,
        'title': title,
        'form': form,
        'form_width': 'large'
    }
@PasApp.form(
    model=AttendenceCollection,
    name='new-commission-bulk',
    template='form.pt',
    permission=Private,
    form=AttendenceAddCommissionBulkForm
)
def add_bulk_attendence(
    self: AttendenceCollection,
    request: PasRequest,
    form: AttendenceAddCommissionBulkForm
) -> RenderData | Response:
    """Record one commission session for several parliamentarians.

    Only admins and commission presidents get past the first check. On a
    valid submit one attendance is added per posted ``parliamentarian_id``;
    all of them receive the same freshly generated ``bulk_edit_id``. If
    any created record has ``abschluss`` set, ``notify_admins_finalized``
    is called once with the first such record.
    """
    request.include('custom')

    may_add = request.is_admin or request.is_commission_president
    if not may_add:
        request.alert(_('Only admins can add bulk attendance records.'))
        return request.redirect(request.class_link(AttendenceCollection))

    title = _('New commission session (bulk)')

    # Nothing (valid) submitted yet: just show the form.
    if not form.submitted(request):
        layout = AttendenceCollectionLayout(self, request)
        layout.breadcrumbs.append(Link(title, '#'))
        layout.edit_mode = True
        return {
            'layout': layout,
            'title': title,
            'form': form,
            'form_width': 'large',
        }

    if form.date.data:
        error = validate_attendance_date(request.session, form.date.data)
        if error:
            request.alert(error)
            return {
                'layout': AttendenceCollectionLayout(self, request),
                'title': title,
                'form': form,
                'form_width': 'large',
            }

    data = form.get_useful_data()
    selected_ids = request.POST.getall('parliamentarian_id')
    if not selected_ids:
        request.warning(_('No parliamentarians selected'))
        return request.redirect(request.class_link(AttendenceCollection))

    # The ids are taken from the raw POST data, one record per id, so the
    # form's own value for that field must not be passed along as well.
    data.pop('parliamentarian_id', None)
    session_id = uuid.uuid4()
    first_finalized = None
    for selected_id in selected_ids:
        record = self.add(parliamentarian_id=selected_id, **data)
        record.bulk_edit_id = session_id
        Change.add(request, 'add', record)
        if first_finalized is None and record.abschluss:
            first_finalized = record

    if first_finalized is not None:
        notify_admins_finalized(request, first_finalized)

    request.success(_('Added session'))
    return request.redirect(request.link(self))
@PasApp.form(
    model=Attendence,
    name='edit-plenary-bulk-attendences',
    template='form.pt',
    permission=Private,
    form=AttendencePlenaryBulkEditForm
)
def edit_plenary_bulk_attendence(
    self: Attendence,
    request: TownRequest,
    form: AttendencePlenaryBulkEditForm
) -> RenderData | Response:
    """Edit all attendances of one plenary session at once (admins only).

    On a valid submit the bulk group identified by the form's
    ``bulk_edit_id`` is synchronised with the posted selection: records
    of active parliamentarians that are no longer selected are deleted,
    selected ones are updated from the form or created if missing. Every
    change is logged via ``Change.add``.
    """
    request.include('custom')

    if not request.is_admin:
        request.alert(
            _('You do not have permission to edit plenary sessions.')
        )
        return request.redirect(request.class_link(AttendenceCollection))

    title = _('Edit bulk: plenary session')
    active_ids = [
        str(person.id)
        for person in PASParliamentarianCollection(
            request.app, active=[True]
        ).query()
    ]

    if form.submitted(request):
        # Check if attendance date is in a closed settlement run
        if form.date.data:
            error = validate_attendance_date(
                request.session, form.date.data
            )
            if error:
                request.alert(error)
                return {
                    'layout': AttendenceCollectionLayout(self, request),
                    'title': title,
                    'form': form,
                    'form_width': 'large'
                }

        data = form.get_useful_data()
        selected_ids = request.POST.getall('parliamentarian_id')
        data.pop('parliamentarian_id', None)
        collection = AttendenceCollection(request.session)

        def record_in_group(person_id: str) -> Attendence | None:
            # The record of this parliamentarian within the edited group.
            return collection.query().filter(
                Attendence.parliamentarian_id == person_id,
                Attendence.bulk_edit_id == form.bulk_edit_id.data,
            ).first()

        # Drop the records of everyone who got deselected.
        for person_id in active_ids:
            if person_id in selected_ids:
                continue
            obsolete = record_in_group(person_id)
            if obsolete:
                Change.add(request, 'delete', obsolete)
                collection.delete(obsolete)

        # Update or create the records of everyone who is selected.
        for person_id in selected_ids:
            record = record_in_group(person_id)
            if record:
                form.populate_obj(record)
                Change.add(request, 'edit', record)
            else:
                record = collection.add(
                    parliamentarian_id=person_id, **data
                )
                Change.add(request, 'add', record)

        request.success(_('Edited attendences'))
        return request.redirect(request.class_link(AttendenceCollection))

    if not request.POST:
        # Plain page load: prefill the form from the stored record.
        form.process(obj=self)

    collection_url = request.class_link(AttendenceCollection)
    layout = AttendenceCollectionLayout(self, request)
    layout.breadcrumbs = [
        Link(_('Homepage'), layout.homepage_url),
        Link(_('Attendences'), collection_url),
        Link(_('Edit session'), '#')
    ]
    layout.edit_mode = True
    layout.editmode_links[1] = BackLink(
        attrs={'class': 'cancel-link'},
    )

    delete_link = Link(
        text=_('Delete'),
        url=layout.csrf_protected_url(
            request.link(self, name='delete-attendences')
        ),
        traits=(
            Confirm(
                _('Do you really want to delete this bulk edit?'),
                _('This cannot be undone.'),
                _('Delete Attendencess'),
                _('Cancel')
            ),
            Intercooler(
                request_method='DELETE',
                redirect_after=request.class_link(AttendenceCollection)
            ),
        ),
        attrs={'class': 'delete-link'},
    )
    layout.editmode_links.append(delete_link)

    return {
        'layout': layout,
        'title': title,
        'form': form,
        'form_width': 'large'
    }
@PasApp.form(
    model=Attendence,
    name='edit-commission-bulk-attendences',
    template='form.pt',
    permission=Private,
    form=AttendenceCommissionBulkEditForm
)
def edit_commission_bulk_attendence(
    self: Attendence,
    request: TownRequest,
    form: AttendenceCommissionBulkEditForm
) -> RenderData | Response:
    """Edit all attendances of one commission session at once (admins only).

    On a valid submit the bulk group identified by the form's
    ``bulk_edit_id`` is synchronised with the posted selection: records
    of members of the form's commission that are no longer selected are
    deleted, selected ones are updated from the form or created if
    missing. ``notify_admins_finalized`` is called once if a record is
    created with ``abschluss`` set or has it newly set by this edit.
    """
    request.include('custom')

    if not request.is_admin:
        request.alert(_('Only admins can edit attendance records.'))
        return request.redirect(request.class_link(AttendenceCollection))

    type_label = request.translate(self.type_label)
    title = _('Edit bulk: ${type}', mapping={'type': type_label})

    if form.submitted(request):
        if form.date.data:
            error = validate_attendance_date(
                request.session, form.date.data
            )
            if error:
                request.alert(error)
                return {
                    'layout': AttendenceCollectionLayout(self, request),
                    'title': title,
                    'form': form,
                    'form_width': 'large'
                }

        data = form.get_useful_data()
        selected_ids = request.POST.getall('parliamentarian_id')
        if selected_ids:
            data.pop('parliamentarian_id', None)

        collection = AttendenceCollection(request.session)

        def record_in_group(person_id: str) -> Attendence | None:
            # The record of this parliamentarian within the edited group.
            return collection.query().filter(
                Attendence.parliamentarian_id == person_id,
                Attendence.bulk_edit_id == form.bulk_edit_id.data,
            ).first()

        memberships = request.session.query(
            PASCommissionMembership
        ).filter(
            PASCommissionMembership.commission_id
            == form.commission_id.data
        ).all()

        # Drop the records of commission members who got deselected.
        for membership in memberships:
            member_id = str(membership.parliamentarian_id)
            if member_id in selected_ids:
                continue
            obsolete = record_in_group(member_id)
            if obsolete:
                Change.add(request, 'delete', obsolete)
                collection.delete(obsolete)

        # Update or create the records of everyone who is selected and
        # remember the first record that became finalized on the way.
        first_finalized = None
        for person_id in selected_ids:
            record = record_in_group(person_id)
            if record:
                was_abschluss = record.abschluss
                form.populate_obj(record)
                Change.add(request, 'edit', record)
                newly_finalized = record.abschluss and not was_abschluss
            else:
                record = collection.add(
                    parliamentarian_id=person_id, **data
                )
                Change.add(request, 'add', record)
                newly_finalized = record.abschluss

            if newly_finalized and first_finalized is None:
                first_finalized = record

        if first_finalized is not None:
            notify_admins_finalized(request, first_finalized)

        request.success(_('Edited attendences'))
        return request.redirect(request.class_link(AttendenceCollection))

    if not request.POST:
        # Plain page load: prefill the form from the stored record.
        form.process(obj=self)

    collection_url = request.class_link(AttendenceCollection)
    layout = AttendenceCollectionLayout(self, request)
    layout.breadcrumbs = [
        Link(_('Homepage'), layout.homepage_url),
        Link(_('Attendences'), collection_url),
        Link(_('Edit session'), '#')
    ]
    layout.edit_mode = True
    layout.editmode_links[1] = BackLink(
        attrs={'class': 'cancel-link'},
    )

    delete_link = Link(
        text=_('Delete'),
        url=layout.csrf_protected_url(
            request.link(self, name='delete-attendences')
        ),
        traits=(
            Confirm(
                _('Do you really want to delete this bulk edit?'),
                _('This cannot be undone.'),
                _('Delete Attendencess'),
                _('Cancel')
            ),
            Intercooler(
                request_method='DELETE',
                redirect_after=request.class_link(AttendenceCollection)
            ),
        ),
        attrs={'class': 'delete-link'},
    )
    layout.editmode_links.append(delete_link)

    return {
        'layout': layout,
        'title': title,
        'form': form,
        'form_width': 'large'
    }
@PasApp.form(
    model=AttendenceCollection,
    name='new-bulk',
    template='form.pt',
    # Declared explicitly like on the other views of this module.
    permission=Private,
    form=AttendenceAddPlenaryForm
)
def add_plenary_attendence(
    self: AttendenceCollection,
    request: PasRequest,
    form: AttendenceAddPlenaryForm
) -> RenderData | Response:
    """Record one plenary session for several parliamentarians (admins only).

    On a valid submit one attendance is added per posted
    ``parliamentarian_id``; all of them receive the same freshly
    generated ``bulk_edit_id`` and are logged via ``Change.add``. If no
    parliamentarian was posted nothing is created and a warning is shown
    instead of the success message.
    """
    request.include('custom')

    if not request.is_admin:
        request.alert(
            _('You do not have permission to add plenary sessions.')
        )
        return request.redirect(request.link(self))

    title = _('New plenary session')

    if form.submitted(request):
        # Check if attendance date is in a closed settlement run
        if form.date.data:
            if error := validate_attendance_date(
                request.session, form.date.data
            ):
                request.alert(error)
                return {
                    'layout': AttendenceCollectionLayout(self, request),
                    'title': title,
                    'form': form,
                    'form_width': 'large'
                }

        data = form.get_useful_data()
        raw_parl_ids = request.POST.getall('parliamentarian_id')
        if not raw_parl_ids:
            # Nothing would be created, so do not report success. Same
            # message as the commission bulk view uses for this case.
            request.warning(_('No parliamentarians selected'))
            return request.redirect(request.link(self))

        # Remove static field; choices are set dynamically via JS
        data.pop('parliamentarian_id', None)
        bulk_edit_id = uuid.uuid4()
        for parliamentarian_id in raw_parl_ids:
            attendence = self.add(
                parliamentarian_id=parliamentarian_id, **data
            )
            attendence.bulk_edit_id = bulk_edit_id
            Change.add(request, 'add', attendence)

        request.success(_('Added plenary session'))
        return request.redirect(request.link(self))

    layout = AttendenceCollectionLayout(self, request)
    layout.breadcrumbs.append(Link(title, '#'))
    layout.edit_mode = True
    return {
        'layout': layout,
        'title': title,
        'form': form,
        'form_width': 'large'
    }
@PasApp.html(
    model=Attendence,
    template='attendence.pt',
    permission=Private
)
def view_attendence(
    self: Attendence,
    request: PasRequest
) -> RenderData:
    """Render the detail page of a single attendance."""
    page_layout = AttendenceLayout(self, request)
    context: RenderData = {'layout': page_layout}
    context['attendence'] = self
    context['title'] = page_layout.title
    return context
@PasApp.form(
    model=Attendence,
    name='edit',
    template='form.pt',
    form=AttendenceForm,
    permission=Private
)
def edit_attendence(
    self: Attendence,
    request: PasRequest,
    form: AttendenceForm
) -> RenderData | Response:
    """Edit a single attendance (admins only).

    Records that belong to a bulk group are redirected to the matching
    bulk edit view. On a valid submit the record is updated, logged via
    ``Change.add`` and, if ``abschluss`` was newly set,
    ``notify_admins_finalized`` is called. If ``validate_attendance_date``
    rejects the date, the edit form is shown again with an alert and the
    submitted values intact.
    """
    if not request.is_admin:
        request.alert(_('Only admins can edit attendance records.'))
        return request.redirect(request.class_link(AttendenceCollection))

    if self.bulk_edit_id:
        name = (
            'edit-plenary-bulk-attendences'
            if self.type == 'plenary'
            else 'edit-commission-bulk-attendences'
        )
        return request.redirect(request.link(self, name))

    if form.submitted(request):
        date_error = (
            validate_attendance_date(request.session, form.date.data)
            if form.date.data
            else None
        )
        if not date_error:
            was_abschluss = self.abschluss
            form.populate_obj(self)
            Change.add(request, 'edit', self)
            # Only notify on the transition to finalized.
            if self.abschluss and not was_abschluss:
                notify_admins_finalized(request, self)
            request.success(_('Your changes were saved'))
            return request.redirect(request.link(self))

        # Fall through to the regular edit page below, so that the
        # rejected form is shown with the edit layout and title rather
        # than a title borrowed from another view. The form is not
        # re-processed, which keeps the submitted values.
        request.alert(date_error)
    else:
        form.process(obj=self)

    layout = AttendenceLayout(self, request)
    layout.breadcrumbs.append(Link(_('Edit'), '#'))
    layout.edit_mode = True
    return {
        'layout': layout,
        'title': layout.title,
        'form': form,
        'form_width': 'large'
    }
@PasApp.view(
    model=Attendence,
    request_method='DELETE',
    # Declared explicitly like on the bulk delete view of this module.
    permission=Private
)
def delete_attendence(
    self: Attendence,
    request: PasRequest
) -> None:
    """Delete a single, stand-alone attendance (admins only).

    The deletion is refused with an alert if the record belongs to a bulk
    group or if its date lies within a closed settlement run. Otherwise
    the deletion is logged via ``Change.add`` and the record is removed.
    """
    request.assert_valid_csrf_token()

    if not request.is_admin:
        request.alert(_('Only admins can delete attendance records.'))
        return

    if self.bulk_edit_id:
        request.alert(_('Cannot delete individual bulk attendance.'))
        return

    # Check if attendance is in a closed settlement run
    settlement_run = request.session.query(SettlementRun).filter(
        SettlementRun.start <= self.date,
        SettlementRun.end >= self.date,
        SettlementRun.closed == True  # noqa: E712 (SQL expression)
    ).first()
    if settlement_run:
        request.alert(
            _('Cannot delete attendance in closed settlement run.')
        )
        return

    Change.add(request, 'delete', self)
    collection = AttendenceCollection(request.session)
    collection.delete(self)
@PasApp.view(
    model=Attendence,
    name='delete-attendences',
    request_method='DELETE',
    permission=Private
)
def delete_attendences(
    self: Attendence,
    request: TownRequest
) -> None:
    """Delete every attendance of the bulk group ``self`` belongs to.

    Admins only. Nothing is deleted if any affected date lies within a
    closed settlement run. Each deletion is logged via ``Change.add``.
    If ``self`` has no ``bulk_edit_id`` it is treated as a group of one.
    """
    request.assert_valid_csrf_token()

    if not request.is_admin:
        request.alert(_('Only admins can delete attendance records.'))
        return

    if self.bulk_edit_id is None:
        # Comparing the column with None renders as ``IS NULL`` and would
        # select every stand-alone attendance, not just this one.
        attendences = [self]
    else:
        attendences = request.session.query(Attendence).filter(
            Attendence.bulk_edit_id == self.bulk_edit_id
        ).all()

    # Check if any attendance is in a closed settlement run. Records of
    # one group may share a date, so each distinct date is checked once.
    for attendance_date in {a.date for a in attendences}:
        settlement_run = request.session.query(SettlementRun).filter(
            SettlementRun.start <= attendance_date,
            SettlementRun.end >= attendance_date,
            SettlementRun.closed == True  # noqa: E712 (SQL expression)
        ).first()
        if settlement_run:
            request.alert(
                _('Cannot delete attendance in closed settlement run.')
            )
            return

    collection = AttendenceCollection(request.session)
    for attendence in attendences:
        Change.add(request, 'delete', attendence)
        collection.delete(attendence)

    request.success(_('Deleted ${count} attendeces', mapping={
        'count': len(attendences)
    }))