"""
Send SMS through ASPSMS
Adapted from `repoze.sendmail<https://github.com/repoze/repoze.sendmail>`_.
Usage::
qp = SmsQueueProcessor(sms_directory)
qp.send_messages()
"""
import errno
import logging
import json
import os
import pycurl
import stat
import time
from io import BytesIO
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Sequence
from onegov.core.framework import Framework
from onegov.core.types import JSON_ro
[docs]
log = logging.getLogger('onegov.core')
# The below diagram depicts the operations performed while sending a message.
# This sequence of operations will be performed for each file in the maildir
# on which ``send_message`` is called.
#
# Any error conditions not depected on the diagram will provoke the catch-all
# exception logging of the ``send_message`` method.
#
# In the diagram the "message file" is the file in the maildir's "cur"
# directory that contains the message and "tmp file" is a hard link to the
# message file created in the maildir's "tmp" directory.
#
# ( start trying to deliver a message )
# |
# |
# V
# +-----( get tmp file mtime )
# | |
# | | file exists
# | V
# | ( check age )-----------------------------+
# tmp file | | file is new |
# does not | | file is old |
# exist | | |
# | ( unlink tmp file )-----------------------+ |
# | | file does | |
# | | file unlinked not exist | |
# | V | |
# +---->( touch message file )------------------+ | |
# | file does | | |
# | not exist | | |
# V | | |
# ( link message file to tmp file )----------+ | | |
# | tmp file | | | |
# | already exists | | | |
# | | | | |
# V V V V V
# ( send message ) ( skip this message )
# |
# V
# ( unlink message file )---------+
# | |
# | file unlinked | file no longer exists
# | |
# | +-----------------+
# | |
# | V
# ( unlink tmp file )------------+
# | |
# | file unlinked | file no longer exists
# V |
# ( message delivered )<---------+
# The longest time sending a file is expected to take. Longer than this and
# the send attempt will be assumed to have failed. This means that sending
# very large files or using very slow mail servers could result in duplicate
# messages sent.
[docs]
MAX_SEND_TIME = 60 * 60 * 3
[docs]
class SmsQueueProcessor:
def __init__(
self,
path: str,
username: str,
password: str,
originator: str | None = None
):
[docs]
self.username = username
[docs]
self.password = password
[docs]
self.originator = originator or 'OneGov'
# Keep a pycurl object around, to use HTTP keep-alive - though pycurl
# is much worse in terms of it's API, the performance is *much* better
# than requests and it supports modern features like HTTP/2 or HTTP/3
[docs]
self.url = 'https://json.aspsms.com/SendSimpleTextSMS'
[docs]
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.TCP_KEEPALIVE, 1)
self.curl.setopt(pycurl.URL, self.url)
self.curl.setopt(pycurl.HTTPHEADER, ['Content-Type:application/json'])
self.curl.setopt(pycurl.POST, 1)
[docs]
def split(self, filename: str) -> tuple[str, str, str]:
""" Returns the path, the name and the suffix of the given path. """
if '/' in filename:
path, name = filename.rsplit('/', 1)
else:
path = ''
name = filename
if '.' in name:
name, suffix = name.split('.', 1)
else:
suffix = ''
return path, name, suffix
[docs]
def message_files(self) -> tuple[str, ...]:
""" Returns a tuple of full paths that need processing.
The file names in the directory usually look like this:
* 0.1571822840.745629
* 1.1571822743.595377
The part before the first dot is the batch number the rest is
the timestamp at time of calling app.send_sms.
The messages are sorted by suffix, so by default the sorting
happens from oldest to newest message.
"""
files = []
for f in os.scandir(self.path):
if not f.is_file():
continue
# ignore .sending- .rejected- files
if f.name.startswith('.'):
continue
files.append(f)
files.sort(key=lambda i: self.split(i.name)[-1])
return tuple(os.path.join(self.path, f.name) for f in files)
[docs]
def send(
self,
numbers: 'Sequence[str]',
content: str
) -> dict[str, Any] | None:
""" Sends the SMS and returns the API response on error.
On success this returns None.
"""
code, body = self.send_request({
'UserName': self.username,
'Password': self.password,
'Originator': self.originator,
'Recipients': numbers,
'MessageText': content,
})
if 400 <= code < 600:
raise RuntimeError(f'{code} calling {self.url}: {body}')
result = json.loads(body)
if result.get('StatusInfo') != 'OK' or result.get('StatusCode') != '1':
return result
return None
[docs]
def send_request(self, parameters: 'JSON_ro') -> tuple[int, str]:
""" Performes the API request using the given parameters. """
body = BytesIO()
self.curl.setopt(pycurl.WRITEDATA, body)
self.curl.setopt(pycurl.POSTFIELDS, json.dumps(parameters))
self.curl.perform()
code = self.curl.getinfo(pycurl.RESPONSE_CODE)
body.seek(0)
body_str = body.read().decode('utf-8')
return code, body_str
[docs]
def parse(
self,
filename: str
) -> tuple[tuple[str, ...] | None, str | None]:
with open(filename) as f:
try:
data = json.loads(f.read())
except json.JSONDecodeError:
return None, None
if not isinstance(data, dict):
return None, None
receivers = data.get('receivers')
content = data.get('content')
if not isinstance(receivers, list):
return None, content
# NOTE: For now we silently drop invalid numbers in a batch
# maybe we want to fail instead in this case.
# This should only really come into play if someone
# messes with the file contents in an editor.
# Numbers stored in the system are pre-validated.
receivers = tuple(
r for r in receivers
if isinstance(r, str) and r.lstrip('+').isdigit()
)
return receivers, content
[docs]
def send_messages(self) -> None:
for filename in self.message_files():
self.send_message(filename)
[docs]
def send_message(self, filename: str) -> None:
head, tail = os.path.split(filename)
tmp_filename = os.path.join(head, f'.sending-{tail}')
rejected_filename = os.path.join(head, f'.rejected-{tail}')
failed_filename = os.path.join(head, f'.failed-{tail}')
# perform a series of operations in an attempt to ensure
# that no two threads/processes send this message
# simultaneously as well as attempting to not generate
# spurious failure messages in the log; a diagram that
# represents these operations is included in a
# comment above this class
try:
# find the age of the tmp file (if it exists)
mtime = os.stat(tmp_filename)[stat.ST_MTIME]
except OSError as e:
if e.errno == errno.ENOENT:
# file does not exist
# the tmp file could not be stated because it
# doesn't exist, that's fine, keep going
age = None
else:
# the tmp file could not be stated for some reason
# other than not existing; we'll report the error
raise
else:
age = time.time() - mtime
# if the tmp file exists, check it's age
if age is not None:
try:
if age > MAX_SEND_TIME:
# the tmp file is "too old"; this suggests
# that during an attemt to send it, the
# process died; remove the tmp file so we
# can try again
os.remove(tmp_filename)
else:
# the tmp file is "new", so someone else may
# be sending this message, try again later
return
# if we get here, the file existed, but was too
# old, so it was unlinked
except OSError as e:
if e.errno == errno.ENOENT:
# file does not exist
# it looks like someone else removed the tmp
# file, that's fine, we'll try to deliver the
# message again later
return
# now we know that the tmp file doesn't exist, we need to
# "touch" the message before we create the tmp file so the
# mtime will reflect the fact that the file is being
# processed (there is a race here, but it's OK for two or
# more processes to touch the file "simultaneously")
try:
os.utime(filename, None)
except OSError as e:
if e.errno == errno.ENOENT:
# file does not exist
# someone removed the message before we could
# touch it, no need to complain, we'll just keep
# going
return
else:
# Some other error, propogate it
raise
# creating this hard link will fail if another process is
# also sending this message
try:
os.link(filename, tmp_filename)
except OSError as e:
if e.errno == errno.EEXIST:
# file exists, *nix
# it looks like someone else is sending this
# message too; we'll try again later
return
else:
# Some other error, propogate it
raise
# read message file and send contents
numbers, message = self.parse(filename)
if numbers and message:
status = self.send(numbers, message)
if status is None:
log.info('SMS to {} sent.'.format(', '.join(numbers)))
else:
# this should cause stderr output, which
# will write the cronjob output to chat
log.error(
f'Failed sending SMS batch {filename} with '
f'API response {status}'
)
os.link(filename, failed_filename)
else:
# this should cause stderr output, which
# will write the cronjob output to chat
log.error(
f'Discarding SMS batch {filename} due to invalid '
'content/numbers'
)
os.link(filename, rejected_filename)
try:
os.remove(filename)
except OSError as e:
if e.errno == errno.ENOENT:
# file does not exist
# someone else unlinked the file; oh well
pass
else:
# something bad happend, log it
raise
try:
os.remove(tmp_filename)
except OSError as e:
if e.errno == errno.ENOENT:
# file does not exist
# someone else unlinked the file; oh well
pass
else:
# something bad happened, log it
raise
[docs]
def get_sms_queue_processor(
app: 'Framework',
missing_path_ok: bool = False
) -> SmsQueueProcessor | None:
if not app.can_deliver_sms:
return None
username = app.sms.get('user')
password = app.sms.get('password')
originator = app.sms.get('originator')
tenants = app.sms.get('tenants', {})
sms = tenants.get(app.application_id, tenants.get(app.namespace))
if sms is not None:
username = sms.get('user', username)
password = sms.get('password', password)
originator = sms.get('originator', originator)
elif username is None:
return None
assert app.sms_directory
assert username is not None and password is not None
path = os.path.join(app.sms_directory, app.schema)
path = os.path.abspath(path)
if missing_path_ok or os.path.exists(path):
return SmsQueueProcessor(
path,
username,
password,
originator
)
return None