Source code for org.cronjobs

from __future__ import annotations
import traceback
from collections import OrderedDict

import requests
import logging
from babel.dates import get_month_names
from datetime import datetime, timedelta
from functools import lru_cache
from itertools import groupby

from onegov.chat.collections import ChatCollection
from onegov.chat.models import Chat
from onegov.core.orm import find_models, Base
from onegov.core.orm.mixins.publication import UTCPublicationMixin
from onegov.core.templates import render_template
from onegov.directory.collections.directory import EntryRecipientCollection
from onegov.event import Occurrence, Event
from onegov.file import FileCollection
from onegov.form import FormSubmission, parse_form, Form
from onegov.newsletter.models import Recipient
from onegov.org.mail import send_ticket_mail
from onegov.newsletter import (Newsletter, NewsletterCollection,
                               RecipientCollection)
from onegov.org import _, OrgApp
from onegov.org.layout import DefaultMailLayout
from onegov.org.models import (
    ResourceRecipient,
    ResourceRecipientCollection,
    TANAccess,
    News,
    PushNotification
)
from onegov.org.models.extensions import (
    GeneralFileLinkExtension, DeletableContentExtension)
from onegov.org.models.ticket import ReservationHandler
from onegov.gever.encrypt import decrypt_symmetric
from cryptography.fernet import InvalidToken
from sqlalchemy.exc import IntegrityError
from onegov.org.views.allocation import handle_rules_cronjob
from onegov.org.views.directory import (
    send_email_notification_for_directory_entry)
from onegov.org.views.newsletter import send_newsletter
from onegov.org.views.ticket import delete_tickets_and_related_data
from onegov.reservation import Reservation, Resource, ResourceCollection
from onegov.search import Searchable
from onegov.ticket import Ticket, TicketCollection
from onegov.org.models import TicketMessage, ExtendedDirectoryEntry
from onegov.user import User, UserCollection
from onegov.user.models import TAN
from sedate import to_timezone, utcnow, align_date_to_day
from sqlalchemy import and_, or_, func
from sqlalchemy.orm import undefer
from uuid import UUID
from onegov.org.notification_service import (
    get_notification_service,
)


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterator
    from onegov.core.types import RenderData
    from sqlalchemy.orm import Session
    from sqlalchemy.orm import Query

    from onegov.org.request import OrgRequest


[docs] log = logging.getLogger('onegov.org.cronjobs')
[docs] MON = 0
[docs] TUE = 1
[docs] WED = 2
[docs] THU = 3
[docs] FRI = 4
[docs] SAT = 5
[docs] SUN = 6
[docs] WEEKDAYS = ( 'MO', 'TU', 'WE', 'TH', 'FR', 'SA', 'SU', )
@OrgApp.cronjob(hour='*', minute=0, timezone='UTC')
[docs] def hourly_maintenance_tasks(request: OrgRequest) -> None: publish_files(request) handle_publication_models(request) send_scheduled_newsletter(request) delete_old_tans(request) delete_old_tan_accesses(request)
[docs] def send_scheduled_newsletter(request: OrgRequest) -> None: newsletters = NewsletterCollection(request.session).query().filter(and_( Newsletter.scheduled != None, Newsletter.scheduled <= (utcnow() + timedelta(seconds=60)), )) for newsletter in newsletters: send_newsletter(request, newsletter, newsletter.open_recipients) newsletter.scheduled = None
[docs] def publish_files(request: OrgRequest) -> None: FileCollection(request.session).publish_files()
[docs] def handle_publication_models(request: OrgRequest) -> None: """ Reindexes all recently published/unpublished objects in the elasticsearch database. For pages it also updates the propagated access to any associated files. For directory entries it also sends out e-mail notifications if published within the last hour. """ if not hasattr(request.app, 'es_client'): return def publication_models( base: type[Base] # NOTE: This should be Iterator[type[Base & UTCPublicationMixin]] ) -> Iterator[type[UTCPublicationMixin]]: yield from find_models(base, lambda cls: issubclass( # type:ignore cls, UTCPublicationMixin) ) objects = set() session = request.app.session() now = utcnow() then = request.app.org.meta.get('hourly_maintenance_tasks_last_run', now - timedelta(hours=1)) for base in request.app.session_manager.bases: for model in publication_models(base): query = session.query(model).filter( or_( and_( then <= model.publication_start, now >= model.publication_start ), and_( then <= model.publication_end, now >= model.publication_end ) ) ) objects.update(query.all()) for obj in objects: if isinstance(obj, GeneralFileLinkExtension): # manually invoke the files observer which updates access obj.files_observer(obj.files, set(), None, None) if isinstance(obj, Searchable): request.app.es_orm_events.index(request.app.schema, obj) if (isinstance(obj, ExtendedDirectoryEntry) and obj.published and obj.access in ('public', 'mtan') and obj.directory.enable_update_notifications): send_email_notification_for_directory_entry( obj.directory, obj, request) request.app.org.meta['hourly_maintenance_tasks_last_run'] = now
[docs] def delete_old_tans(request: OrgRequest) -> None: """ Deletes TANs that are older than half a year. Technically we could delete them as soon as they expire but for debugging purposes it makes sense to keep them around a while longer. """ cutoff = utcnow() - timedelta(days=180) query = request.session.query(TAN).filter(TAN.created < cutoff) # cronjobs happen outside a regular request, so we don't need # to synchronize with the session query.delete(synchronize_session=False)
[docs] def delete_old_tan_accesses(request: OrgRequest) -> None: """ Deletes TAN accesses that are older than half a year. Technically we could delete them as soon as they expire but for debugging purposes it makes sense to keep them around a while longer. """ cutoff = utcnow() - timedelta(days=180) query = request.session.query(TANAccess).filter(TANAccess.created < cutoff) # cronjobs happen outside a regular request, so we don't need # to synchronize with the session query.delete(synchronize_session=False)
@OrgApp.cronjob(hour=23, minute=45, timezone='Europe/Zurich')
[docs] def process_resource_rules(request: OrgRequest) -> None: resources = ResourceCollection(request.app.libres_context) for resource in resources.query(): handle_rules_cronjob(resources.bind(resource), request)
[docs] def ticket_statistics_common_template_args( request: OrgRequest, collection: TicketCollection ) -> dict[str, Any]: args: dict[str, Any] = {} layout = DefaultMailLayout(object(), request) # get the current ticket count count = collection.get_count() args['currently_open'] = count.open args['currently_pending'] = count.pending args['currently_closed'] = count.closed # FIXME: a owner of None is not actually valid at runtime # we use this only for generating a link where # owner is not part of the query string, we should # probably come up with a more clean way to handle # situations like that. Ideally morepath would elide # query parameters if they're at their default value. args['open_link'] = request.link( collection.for_state('open').for_owner(None)) # type:ignore args['pending_link'] = request.link( collection.for_state('pending').for_owner(None)) # type:ignore args['closed_link'] = request.link( collection.for_state('closed').for_owner(None)) # type:ignore args['title'] = request.translate( _('${org} OneGov Cloud Status', mapping={ 'org': request.app.org.title }) ) args['layout'] = layout args['org'] = request.app.org.title return args
[docs] def ticket_statistics_users(app: OrgApp) -> list[User]: users = UserCollection(app.session()).query() users = users.filter(User.active == True) users = users.filter(User.role.in_(app.settings.org.status_mail_roles)) users = users.options(undefer('data')) return users.all()
@OrgApp.cronjob(hour=8, minute=30, timezone='Europe/Zurich')
[docs] def send_daily_ticket_statistics(request: OrgRequest) -> None: today = to_timezone(utcnow(), 'Europe/Zurich') if today.weekday() in (SAT, SUN): return if not request.app.send_ticket_statistics: return app = request.app collection = TicketCollection(app.session()) args = ticket_statistics_common_template_args(request, collection) # get tickets created yesterday or on the weekend end = datetime(today.year, today.month, today.day, tzinfo=today.tzinfo) if today.weekday() == MON: start = end - timedelta(days=2) else: start = end - timedelta(days=1) query = collection.query() query = query.filter(Ticket.created >= start) query = query.filter(Ticket.created <= end) args['opened'] = query.count() query = collection.query() query = query.filter(Ticket.modified >= start) query = query.filter(Ticket.modified <= end) query = query.filter(Ticket.state == 'pending') args['pending'] = query.count() query = collection.query() query = query.filter(Ticket.modified >= start) query = query.filter(Ticket.modified <= end) query = query.filter(Ticket.state == 'closed') args['closed'] = query.count() args['is_monday'] = today.weekday() == MON for user in ticket_statistics_users(app): if not user.data or user.data.get('ticket_statistics') != 'daily': continue unsubscribe = args['layout'].unsubscribe_link(user.username) args['username'] = user.username args['unsubscribe'] = unsubscribe content = render_template( 'mail_daily_ticket_statistics.pt', request, args ) app.send_marketing_email( subject=args['title'], receivers=(user.username, ), content=content, headers={ 'List-Unsubscribe': f'<{unsubscribe}>', 'List-Unsubscribe-Post': 'List-Unsubscribe=One-Click' } )
@OrgApp.cronjob(hour=8, minute=45, timezone='Europe/Zurich')
[docs] def send_weekly_ticket_statistics(request: OrgRequest) -> None: today = to_timezone(utcnow(), 'Europe/Zurich') if today.weekday() != MON: return if not request.app.send_ticket_statistics: return app = request.app collection = TicketCollection(app.session()) args = ticket_statistics_common_template_args(request, collection) # get tickets created in the last week end = datetime(today.year, today.month, today.day, tzinfo=today.tzinfo) start = end - timedelta(days=7) query = collection.query() query = query.filter(Ticket.created >= start) query = query.filter(Ticket.created <= end) args['opened'] = query.count() query = collection.query() query = query.filter(Ticket.modified >= start) query = query.filter(Ticket.modified <= end) query = query.filter(Ticket.state == 'pending') args['pending'] = query.count() query = collection.query() query = query.filter(Ticket.modified >= start) query = query.filter(Ticket.modified <= end) query = query.filter(Ticket.state == 'closed') args['closed'] = query.count() # send one e-mail per user for user in ticket_statistics_users(app): if user.data and user.data.get('ticket_statistics') != 'weekly': continue unsubscribe = args['layout'].unsubscribe_link(user.username) args['username'] = user.username args['unsubscribe'] = unsubscribe content = render_template( 'mail_weekly_ticket_statistics.pt', request, args ) app.send_marketing_email( subject=args['title'], receivers=(user.username, ), content=content, headers={ 'List-Unsubscribe': f'<{unsubscribe}>', 'List-Unsubscribe-Post': 'List-Unsubscribe=One-Click' } )
@OrgApp.cronjob(hour=9, minute=0, timezone='Europe/Zurich')
[docs] def send_monthly_ticket_statistics(request: OrgRequest) -> None: today = to_timezone(utcnow(), 'Europe/Zurich') if today.weekday() != MON or today.day > 7: return if not request.app.send_ticket_statistics: return args = {} app = request.app collection = TicketCollection(app.session()) args = ticket_statistics_common_template_args(request, collection) # get tickets created in the last four or five weeks # depending on when the first monday was last month end = datetime(today.year, today.month, today.day, tzinfo=today.tzinfo) start = end - timedelta(days=28) if start.day > 7: start -= timedelta(days=7) query = collection.query() query = query.filter(Ticket.created >= start) query = query.filter(Ticket.created <= end) args['opened'] = query.count() query = collection.query() query = query.filter(Ticket.modified >= start) query = query.filter(Ticket.modified <= end) query = query.filter(Ticket.state == 'pending') args['pending'] = query.count() query = collection.query() query = query.filter(Ticket.modified >= start) query = query.filter(Ticket.modified <= end) query = query.filter(Ticket.state == 'closed') args['closed'] = query.count() # send one e-mail per user for user in ticket_statistics_users(app): if not user.data or user.data.get('ticket_statistics') != 'monthly': continue unsubscribe = args['layout'].unsubscribe_link(user.username) args['username'] = user.username args['unsubscribe'] = unsubscribe content = render_template( 'mail_monthly_ticket_statistics.pt', request, args ) app.send_marketing_email( subject=args['title'], receivers=(user.username, ), content=content, headers={ 'List-Unsubscribe': f'<{unsubscribe}>', 'List-Unsubscribe-Post': 'List-Unsubscribe=One-Click' } )
@OrgApp.cronjob(hour=6, minute=5, timezone='Europe/Zurich')
[docs] def send_daily_resource_usage_overview(request: OrgRequest) -> None: today = to_timezone(utcnow(), 'Europe/Zurich') weekday = WEEKDAYS[today.weekday()] # get all recipients which require an e-mail today recipients_q = ( ResourceRecipientCollection(request.session).query() .filter(ResourceRecipient.medium == 'email') .order_by(None) .order_by(ResourceRecipient.address) .with_entities( ResourceRecipient.address, ResourceRecipient.content ) ) # If the key 'daily_reservations' doesn't exist, the recipient was # created before anything else was an option, therefore it must be true recipients = [ (address, content['resources']) for address, content in recipients_q if content.get('daily_reservations', True) and weekday in content['send_on'] ] if not recipients: return # extract a list of all required resource ids resource_ids = { UUID(rid) for _, resources in recipients for rid in resources } # get the resource titles and ids default_group = request.translate(_('General')) all_resources = tuple( ResourceCollection(request.app.libres_context).query() .filter(Resource.id.in_(resource_ids)) .with_entities( Resource.id, Resource.group, Resource.title, Resource.definition ) .order_by(Resource.group, Resource.name, Resource.id) ) resources = OrderedDict( (r.id.hex, f'{r.group or default_group} - {r.title}') for r in all_resources ) @lru_cache(maxsize=128) def form(definition: str) -> type[Form]: return parse_form(definition) # get the reservations of this day start = align_date_to_day(today, 'Europe/Zurich', 'down') end = align_date_to_day(today, 'Europe/Zurich', 'up') # load all approved reservations for all required resources all_reservations = [ r for r in request.session.query(Reservation) .filter(Reservation.resource.in_(resource_ids)) .filter(Reservation.status == 'approved') .filter(Reservation.data != None) .filter(and_(start <= Reservation.start, Reservation.start <= end)) .order_by(Reservation.resource, Reservation.start) if r.data and r.data.get('accepted') ] # load all linked form submissions if all_reservations: q = request.session.query(FormSubmission) q = q.filter(FormSubmission.id.in_( {r.token for r in all_reservations} )) submissions = {submission.id: submission for submission in q} for reservation in all_reservations: submission = submissions.get(reservation.token) # FIXME: Is this an actual relationship that exists or do # we set this attribute temporarily for the mail # template? It might be cleaner to do this lookup # inside the template, rather than rely on a # temporary attribute reservation.submission = submission # type:ignore # group th reservations by resource reservations = { resid.hex: tuple(reservations) for resid, reservations in groupby( all_reservations, key=lambda r: r.resource ) } # send out the e-mails args: RenderData = { 'layout': DefaultMailLayout(object(), request), 'title': request.translate( _('${org} Reservation Overview', mapping={ 'org': request.app.org.title }) ), 'organisation': request.app.org.title, 'resources': resources, 'parse_form': form } for address, included_resources in recipients: args['included_resources'] = included_resources args['reservations'] = reservations content = render_template( 'mail_daily_resource_usage_overview.pt', request, args ) request.app.send_transactional_email( subject=args['title'], receivers=(address, ), content=content )
@OrgApp.cronjob(hour='*', minute='*/30', timezone='UTC')
[docs] def end_chats_and_create_tickets(request: OrgRequest) -> None: half_hour_ago = utcnow() - timedelta(minutes=30) chats = ChatCollection(request.session).query().filter( Chat.active == True).filter(Chat.chat_history != []).filter( Chat.last_change < half_hour_ago) for chat in chats: chat.active = False with chats.session.no_autoflush: ticket = TicketCollection(request.session).open_ticket( handler_code='CHT', handler_id=chat.id.hex ) TicketMessage.create(ticket, request, 'opened') send_ticket_mail( request=request, template='mail_turned_chat_into_ticket.pt', subject=_('Your Chat has been turned into a ticket'), receivers=(chat.email, ), ticket=ticket, content={ 'model': chats, 'ticket': ticket, 'chat': chat, 'organisation': request.app.org.title, } )
@OrgApp.cronjob(hour=4, minute=30, timezone='Europe/Zurich')
[docs] def archive_old_tickets(request: OrgRequest) -> None: archive_timespan = request.app.org.auto_archive_timespan session = request.session if archive_timespan is None: return # type:ignore[unreachable] if archive_timespan == 0: return cutoff_date = utcnow() - timedelta(days=archive_timespan) query = session.query(Ticket) query = query.filter(Ticket.state == 'closed') query = query.filter(Ticket.last_change <= cutoff_date) further_back = cutoff_date - timedelta(days=712) for ticket in query: if isinstance(ticket.handler, ReservationHandler): if ticket.handler.has_future_reservation: continue most_future_reservation = ticket.handler.most_future_reservation if ( most_future_reservation is not None and most_future_reservation.end is not None and most_future_reservation.end > further_back ): continue ticket.archive_ticket()
@OrgApp.cronjob(hour=5, minute=30, timezone='Europe/Zurich')
[docs] def delete_old_tickets(request: OrgRequest) -> None: delete_timespan = request.app.org.auto_delete_timespan session = request.session if delete_timespan is None: return # type:ignore[unreachable] if delete_timespan == 0: return cutoff_date = utcnow() - timedelta(days=delete_timespan) query = session.query(Ticket) query = query.filter(Ticket.state == 'archived') query = query.filter(Ticket.last_change <= cutoff_date) delete_tickets_and_related_data(request, query)
@OrgApp.cronjob(hour=9, minute=30, timezone='Europe/Zurich')
[docs] def send_monthly_mtan_statistics(request: OrgRequest) -> None: today = to_timezone(utcnow(), 'Europe/Zurich') if today.weekday() != MON or today.day > 7: return year = today.year month = today.month # rewind to previous month if month == 1: month = 12 year -= 1 else: month -= 1 # count all the mTAN created in that period # we use UTC as a reference for day boundaries so we don't have to # calculate the boundaries ourselves and risk creating overlapping # intervals mtan_count: int = request.session.query(func.count(TAN.id)).filter(and_( func.extract('year', TAN.created) == year, func.extract('month', TAN.created) == month, TAN.meta['mobile_number'].isnot(None) )).scalar() if not mtan_count: # don't send a mail if we generated no mTANs return month_name = get_month_names('wide', locale='de_CH')[month] org_name = request.app.org.name # FIXME: Make e-mail configurable and text translatable # TODO: Include more detailed stats? E.g. volume per country code # or numbers that triggered more than a configured amount # to catch suspicious activity request.app.send_transactional_email( receivers='info@seantis.ch', subject=f'{org_name}: mTAN Statistik {month_name} {year}', plaintext=( f'{org_name} hatte im {month_name} {year}\n' f'{mtan_count} mTAN SMS versendet' ) )
@OrgApp.cronjob(hour=4, minute=0, timezone='Europe/Zurich')
[docs] def delete_content_marked_deletable(request: OrgRequest) -> None: """ Find all models inheriting from DeletableContentExtension, iterate over objects marked as `deletable` and delete them if expired. Currently extended directory entries, news, events and occurrences. """ now = to_timezone(utcnow(), 'Europe/Zurich') count = 0 for base in request.app.session_manager.bases: for model in find_models(base, lambda cls: issubclass( cls, DeletableContentExtension)): query = request.session.query(model) query = query.filter(model.delete_when_expired == True) for obj in query: # delete entry if end date passed if isinstance(obj, (News, ExtendedDirectoryEntry)): if obj.publication_end and obj.publication_end < now: request.session.delete(obj) count += 1 # check on past events and its occurrences if request.app.org.delete_past_events: query = request.session.query(Occurrence) query = query.filter(Occurrence.end < now) for obj in query: request.session.delete(obj) count += 1 query = request.session.query(Event) for obj in query: if not obj.future_occurrences(limit=1).all(): request.session.delete(obj) count += 1 if count: print(f'Cron: Deleted {count} expired deletable objects in db')
@OrgApp.cronjob(hour=7, minute=0, timezone='Europe/Zurich')
[docs] def update_newsletter_email_bounce_statistics( request: OrgRequest ) -> None: # I choose hour=7 as the maximum time difference between Eastern Standard # Time (EST) and Central European Summer Time (CEST) is 7 hours. This # occurs when EST is observing standard time (UTC-5) and CEST is observing # daylight saving time (UTC+2). # Postmark uses EST in `fromdate` and `todate`, see # https://postmarkapp.com/developer/api/bounce-api. def get_postmark_token() -> str: # read postmark token from the applications configuration mail_config = request.app.mail if mail_config: mailer = mail_config.get('marketing', {}).get('mailer', None) if mailer == 'postmark': return mail_config.get('marketing', {}).get('token', '') return '' def get_bounces() -> list[dict[str, Any]]: token = get_postmark_token() yesterday = utcnow() - timedelta(days=1) r = None try: r = requests.get( 'https://api.postmarkapp.com/bounces?count=500&offset=0', f'fromDate={yesterday.date()}&toDate=' f'{yesterday.date()}&inactive=true', headers={ 'Accept': 'application/json', 'X-Postmark-Server-Token': token, }, timeout=30, ) r.raise_for_status() bounces = r.json().get('Bounces', []) except requests.exceptions.HTTPError as http_err: if r and r.status_code == 401: raise RuntimeWarning( f'Postmark API token is not set or invalid: {http_err}' ) from None else: raise return bounces postmark_bounces = get_bounces() collections = (RecipientCollection, EntryRecipientCollection) for collection in collections: recipients = collection(request.session) for bounce in postmark_bounces: email = bounce.get('Email', '') inactive = bounce.get('Inactive', False) recipient = recipients.by_address(email) if recipient and inactive: print(f'Mark recipient {recipient.address} as inactive') recipient.mark_inactive()
@OrgApp.cronjob(hour=4, minute=30, timezone='Europe/Zurich')
[docs] def delete_unconfirmed_newsletter_subscriptions(request: OrgRequest) -> None: """ Delete unconfirmed newsletter subscriptions older than 7 days. """ now = to_timezone(utcnow(), 'Europe/Zurich') cutoff_date = now - timedelta(days=7) count = 0 query = request.session.query(Recipient) query = query.filter(Recipient.confirmed == False) query = query.filter(Recipient.created < cutoff_date) for obj in query: request.session.delete(obj) count += 1 if count: print(f'Cron: Deleted {count} unconfirmed newsletter subscriptions')
[docs] def get_news_for_push_notification(session: Session) -> Query[News]: # Use UTC time for database comparisons since publication_start is stored # in UTC now = utcnow() # Get all news items that should trigger push notifications query = session.query(News) query = query.filter(News.published.is_(True)) query = query.filter(News.publication_start <= now) news_with_sent_notifications = session.query( PushNotification.news_id ).subquery() query = query.filter(~News.id.in_(news_with_sent_notifications)) only_public_news = query.filter( or_( News.meta['access'].astext == 'public', News.meta['access'].astext.is_(None) ) ) only_public_with_send_push_notification = only_public_news.filter( News.meta['send_push_notifications_to_app'].astext == 'true' ) return only_public_with_send_push_notification
@OrgApp.cronjob(hour='*', minute='*/10', timezone='UTC')
[docs] def send_push_notifications_for_news(request: OrgRequest) -> None: """ Cronjob that runs every 10 minutes to send push notifications for news. It collects all news items with: - Publication start date within the last 10 minutes - send_push_notifications_to_app flag enabled - Defined push_notifications topics Then uses Firebase to send notifications to the corresponding topics. """ session = request.session org = request.app.org # Skip if no Firebase credentials are configured if not org.firebase_adminsdk_credential: return news_items = get_news_for_push_notification(session).all() if not news_items: return # Get the mapping topic_mapping = org.meta.get('selectable_push_notification_options', []) if not topic_mapping: return # Decrypt the Firebase credentials key_base64 = request.app.hashed_identity_key encrypted_creds = org.firebase_adminsdk_credential if not encrypted_creds: return try: firebase_creds_json = decrypt_symmetric( encrypted_creds.encode('utf-8'), key_base64 ) except InvalidToken: log.warning('Failed to decrypt Firebase credentials: InvalidToken') return try: # Get notification service notification_service = get_notification_service(firebase_creds_json) # Process each news item for notifications sent_count = 0 duplicate_count = 0 for news in news_items: # Get the topics to send to topics = news.meta.get('push_notifications', []) if not topics: print(f'No topics configured for news item: {news.title}') continue print( f'Processing notification for news: {news.title} to ' f'{len(topics)} topics' ) for topic_id in topics: if not isinstance(topic_id, str): continue # Check if notification was already sent if PushNotification.was_notification_sent( session, news.id, topic_id ): print( f"Skipping duplicate notification to topic " f"'{topic_id}' for news '{news.title}'." ) duplicate_count += 1 continue notification_title = news.title notification_body = news.lead or '' notification_data = { 'id': str(request.link(news)), 'title': news.title, 'lead': notification_body, } try: # Create a "pending" notification record notification = PushNotification( news_id=news.id, topic_id=topic_id, sent_at=utcnow(), response_data={'status': 'pending'}, ) session.add(notification) session.flush() # This will raise IntegrityError if record # exists # If we got here, we're the first process to try sending # this notification response = notification_service.send_notification( topic=topic_id, title=notification_title, body=notification_body, data=notification_data, ) # Update the record with actual response data notification.response_data = { 'message_id': response, 'timestamp': utcnow().isoformat(), 'status': 'sent', } session.flush() sent_count += 1 log.debug( f"Successfully sent notification to topic '{topic_id}'" f" for news '{news.title}'. Response: {response}" ) except IntegrityError: # Another process probably already created a record for # this notification session.rollback() duplicate_count += 1 log.info( f"Skipping duplicate notification to topic " f"'{topic_id}' for news '{news.title}'. " ) except Exception as e: # For other exceptions (like notification service failures) error_details = str(e) log.error( f"Error sending notification to topic '{topic_id}' " f"for news '{news.title}': {error_details}" ) if sent_count: print(f'Cron: Sent {sent_count} push notifications for news items') if duplicate_count: print(f'Cron: Skipped {duplicate_count} duplicate notifications') if not sent_count and not duplicate_count: print('No notifications were sent') except Exception as e: # Rollback in case of error session.rollback() print(traceback.format_exc()) print(f'Error sending notifications: {e}')