Source code for org.forms.newsletter

from datetime import timedelta
import transaction
from wtforms.validators import DataRequired
from onegov.core.csv import convert_excel_to_csv, CSVFile
from onegov.form.fields import UploadField
from onegov.form.validators import FileSizeLimit
from onegov.form.validators import WhitelistedMimeType
from wtforms.fields import BooleanField
from onegov.core.layout import Layout
from onegov.file.utils import name_without_extension
from onegov.form import Form
from onegov.form.fields import ChosenSelectField
from onegov.form.fields import DateTimeLocalField
from onegov.form.fields import MultiCheckboxField
from onegov.newsletter import Recipient, RecipientCollection
from onegov.org import _
from sedate import replace_timezone, to_timezone, utcnow
from wtforms.fields import RadioField
from wtforms.fields import StringField
from wtforms.fields import TextAreaField
from wtforms.validators import InputRequired
from wtforms.validators import ValidationError
from markupsafe import Markup


from typing import Any, TYPE_CHECKING

from onegov.org.utils import extract_categories_and_subcategories

if TYPE_CHECKING:
    from collections.abc import Iterable
    from onegov.core.csv import DefaultRow
    from onegov.file import File
    from onegov.event.models import Occurrence
    from onegov.org.models import News
    from onegov.org.request import OrgRequest
    from onegov.newsletter.models import Newsletter
    from typing import Self
    from wtforms.fields.choices import _Choice


[docs] class NewsletterForm(Form):
[docs] title = StringField( label=_('Title'), description=_('Used in the overview and the e-mail subject'), validators=[InputRequired()])
[docs] lead = TextAreaField( label=_('Editorial'), description=_('A few words about this edition of the newsletter'), render_kw={'rows': 6})
# FIXME: Why are we passing the request in? It should alread be stored on # the form itself.
[docs] def update_model(self, model: 'Newsletter', request: 'OrgRequest') -> None: assert self.title.data is not None model.title = self.title.data model.lead = self.lead.data model.html = self.get_html(request)
[docs] def apply_model(self, model: 'Newsletter') -> None: self.title.data = model.title self.lead.data = model.lead
# FIXME: same here
[docs] def get_html(self, request: 'OrgRequest') -> str: return ''
@classmethod
[docs] def with_news( cls, request: 'OrgRequest', news: 'Iterable[News]' ) -> type['Self']: # FIXME: using a layout just for format_date seems bad, we should # probably extract these functions into util modules layout = Layout(None, request) choices = tuple( ( str(item.id), Markup( '<div class="title">{}</div>' '<div class="date">{}</div>' ).format( item.title, layout.format_date(item.created, 'relative') ) ) for item in news ) if not choices: return cls class NewsletterWithNewsForm(cls): # type:ignore news = MultiCheckboxField( label=_('Latest news'), choices=choices, render_kw={ 'prefix_label': False, 'class_': 'recommended' } ) show_news_as_tiles = BooleanField( label=_('Show news as tiles'), description=_( 'If checked, news are displayed as tiles. Otherwise, ' 'news are listed in full length.'), default=True ) def update_model( self, model: 'Newsletter', request: 'OrgRequest' # FIXME: same here ) -> None: super().update_model(model, request) model.content['news'] = self.news.data model.content['show_news_as_tiles'] = ( self.show_news_as_tiles.data) def apply_model(self, model: 'Newsletter') -> None: super().apply_model(model) self.news.data = model.content.get('news') self.show_news_as_tiles.data = model.content.get( 'show_news_as_tiles', True) return NewsletterWithNewsForm
@classmethod
[docs] def with_occurrences( cls, request: 'OrgRequest', occurrences: 'Iterable[Occurrence]' ) -> type['Self']: # FIXME: another use of layout for format_date layout = Layout(None, request) choices = tuple( ( str(item.id), Markup( '<div class="title">{}</div>' '<div class="date">{}</div>' ).format( item.title, layout.format_date(item.localized_start, 'datetime') ) ) for item in occurrences ) if not choices: return cls class NewsletterWithOccurrencesForm(cls): # type:ignore occurrences = MultiCheckboxField( label=_('Events'), choices=choices, render_kw={ 'prefix_label': False, 'class_': 'recommended' } ) def update_model( self, model: 'Newsletter', request: 'OrgRequest' # FIXME: same here ) -> None: super().update_model(model, request) model.content['occurrences'] = self.occurrences.data def apply_model(self, model: 'Newsletter') -> None: super().apply_model(model) self.occurrences.data = model.content.get('occurrences') return NewsletterWithOccurrencesForm
@classmethod
[docs] def with_publications( cls, request: 'OrgRequest', publications: 'Iterable[File]' ) -> type['Self']: # FIXME: another use of layout for format_date layout = Layout(None, request) choices = tuple( ( str(item.id), Markup( '<div class="title">{}</div>' '<div class="date">{}</div>' ).format( name_without_extension(item.name), layout.format_date(item.created, 'date') ) ) for item in publications ) if not choices: return cls class NewsletterWithPublicationsForm(cls): # type:ignore publications = MultiCheckboxField( label=_('Publications'), choices=choices, render_kw={ 'prefix_label': False, 'class_': 'recommended' } ) def update_model( self, model: 'Newsletter', request: 'OrgRequest' # FIXME: same here ) -> None: super().update_model(model, request) model.content['publications'] = self.publications.data def apply_model(self, model: 'Newsletter') -> None: super().apply_model(model) self.publications.data = model.content.get('publications') return NewsletterWithPublicationsForm
[docs] class NewsletterSendForm(Form): if TYPE_CHECKING:
[docs] request: OrgRequest
[docs] categories = MultiCheckboxField( label=_('Categories'), description=_('Select categories the newsletter reports on. The ' 'users will receive the newsletter only if it ' 'reports on at least one of the categories the user ' 'subscribed to.'), choices=[] )
[docs] send = RadioField( _('Send'), choices=( ('now', _('Now')), ('specify', _('At a specified time')) ), default='now' )
[docs] time = DateTimeLocalField( label=_('Time'), validators=[InputRequired()], depends_on=('send', 'specify') )
[docs] def validate_time(self, field: DateTimeLocalField) -> None: if not field.data: return # FIXME: We should probably store the timezone on the request # and make the attribute on Layout a pure forwarding # of the one on the request from onegov.org.layout import DefaultLayout # XXX circular import layout = DefaultLayout(self.model, self.request) if self.send.data == 'specify': time = replace_timezone(field.data, layout.timezone) time = to_timezone(time, 'UTC') if time < (utcnow() + timedelta(seconds=60 * 5)): raise ValidationError(_( 'Scheduled time must be at least 5 minutes in the future' )) if time.minute != 0: raise ValidationError(_( 'Newsletters can only be sent on the hour ' '(10:00, 11:00, etc.)' )) self.time.data = time
[docs] def on_request(self) -> None: choices: list[_Choice] = [] categories, subcategories = extract_categories_and_subcategories( self.request.app.org.newsletter_categories) for cat, sub in zip(categories, subcategories): choices.append((cat, cat)) choices.extend((s, f'\xa0\xa0\xa0{s}') for s in sub) self.categories.choices = choices
[docs] class NewsletterTestForm(Form):
[docs] selected_recipient = ChosenSelectField( label=_('Recipient'), choices=[], )
@property
[docs] def recipient(self) -> Recipient: return ( self.request.session.query(Recipient) .filter_by(id=self.selected_recipient.data) .one() )
[docs] def on_request(self) -> None: recipients = ( self.request.session.query(Recipient) .with_entities(Recipient.id, Recipient.address) .filter_by(confirmed=True) ) self.selected_recipient.choices = [ (r.id.hex, r.address) for r in recipients ]
[docs] class NewsletterSubscriberImportExportForm(Form):
[docs] dry_run = BooleanField( label=_('Dry Run'), description=_('Do not actually import the newsletter subscribers'), default=False )
[docs] file = UploadField( label=_('Import'), validators=[ DataRequired(), WhitelistedMimeType({ 'application/excel', 'application/vnd.ms-excel', ( 'application/' 'vnd.openxmlformats-officedocument.spreadsheetml.sheet' ), 'application/vnd.ms-office', 'application/octet-stream', 'application/zip', 'text/csv', 'text/plain', }), FileSizeLimit(10 * 1024 * 1024) ], render_kw={'force_simple': True} )
@property
[docs] def headers(self) -> dict[str, str]: return { 'address': self.request.translate(_('Address')), 'confirmed': self.request.translate(_('Confirmed')), }
[docs] def run_export(self) -> list[dict[str, Any]]: recipients = RecipientCollection( self.request.session).ordered_by_status_address() headers = self.headers def get(recipient: Recipient, attribute: str) -> Any: result = getattr(recipient, attribute, '') if isinstance(result, str): return result.strip() elif attribute == 'confirmed': return bool(result) else: return result return [ { v: get(recipient, k) for k, v in headers.items() } for recipient in recipients ]
[docs] def run_import(self) -> tuple[int, list[str]]: headers = self.headers session = self.request.session recipients = RecipientCollection(session) try: assert self.file.file is not None csvfile = convert_excel_to_csv(self.file.file) except Exception: return 0, ['Error converting file'] try: # dialect needs to be set, else error csv = CSVFile(csvfile, dialect='excel') except Exception: return 0, ['Error reading CSV file'] lines = list(csv.lines) columns = { key: csv.as_valid_identifier(value) for key, value in headers.items() } def get(line: 'DefaultRow', column: str) -> Any: return getattr(line, column) count = 0 skipped = 0 errors = [] for number, line in enumerate(lines, start=1): try: kwargs = { attribute: get(line, column) for attribute, column in columns.items() } kwargs['confirmed'] = True address = next(iter(kwargs.values())) if recipients.by_address(address): # silently skip duplicates skipped += 1 continue recipients.add(**kwargs) count += 1 except Exception as e: error_msg = f'Error on line {number}: {e!s}' errors.append(error_msg) if self.dry_run.data: transaction.abort() summary = [f'Imported: {count}, Skipped duplicates: {skipped}'] if errors: summary.append(f'Errors: {len(errors)}') summary.extend(errors) return count, summary