# Source code for file.attachments

from __future__ import annotations

import pdftotext  # type:ignore

from depot.fields.upload import UploadedFile
from depot.io import utils
from depot.io.interfaces import FileStorage
from depot.io.utils import INMEMORY_FILESIZE
from io import BytesIO
from onegov.core.html import sanitize_svg
from onegov.file.utils import digest
from onegov.file.utils import get_image_size
from onegov.file.utils import get_svg_size
from onegov.file.utils import IMAGE_MIME_TYPES
from onegov.file.utils import word_count
from PIL import Image, ImageOps, UnidentifiedImageError
from tempfile import SpooledTemporaryFile
from onegov.pdf.utils import extract_pdf_info


from typing import IO, TYPE_CHECKING
if TYPE_CHECKING:
    from depot.io.interfaces import _FileContent
    from typing import NotRequired, TypedDict

    # NOTE: this class has to stay inside the ``TYPE_CHECKING`` guard,
    #       ``TypedDict`` and ``NotRequired`` are only imported here and
    #       do not exist at runtime.
    class _ImageSaveOptionalParams(TypedDict):
        """ Optional keyword arguments forwarded to ``image.save``.

        Only used as a static type for the ``params`` dictionary built in
        :func:`strip_exif_and_limit_and_store_image_size`.

        """

        # an (empty) EXIF section which replaces the one of the image
        exif: NotRequired[Image.Exif]
# Upper bound for the longer edge of a stored raster image. Images above
# this limit are scaled down; it also provides the '<n>px' fallback for
# SVGs without a usable width/height.
IMAGE_MAX_SIZE = 2048

# The ``quality`` value handed to ``image.save`` when an image is re-encoded.
IMAGE_QUALITY = 85

# The value passed as ``type`` to ``digest`` when the checksum of an
# upload is calculated.
CHECKSUM_FUNCTION = 'md5'
def get_svg_size_or_default(content: IO[bytes]) -> tuple[str, str]:
    """ Returns the ``(width, height)`` reported by ``get_svg_size``.

    Any of the two values which is ``None`` is substituted with
    ``IMAGE_MAX_SIZE`` formatted as a pixel value (e.g. ``'2048px'``).

    """
    svg_width, svg_height = get_svg_size(content)

    if svg_width is None:
        svg_width = f'{IMAGE_MAX_SIZE}px'

    if svg_height is None:
        svg_height = f'{IMAGE_MAX_SIZE}px'

    return svg_width, svg_height
def strip_exif_and_limit_and_store_image_size(
    file: ProcessedUploadedFile,
    content: IO[bytes],
    content_type: str | None
) -> IO[bytes] | None:
    """ Normalises uploaded images and records their dimensions.

    For SVGs the size is read from the markup (with a default) and stored
    on ``file.size``. For content types listed in ``IMAGE_MIME_TYPES`` the
    image is loaded with Pillow, and if it is larger than
    ``IMAGE_MAX_SIZE`` on either edge or carries EXIF data, it is
    re-encoded: the EXIF orientation is baked into the pixels, the EXIF
    section is replaced by an empty one and the image is scaled down to
    fit ``IMAGE_MAX_SIZE``.

    :param file: The upload being processed, receives the ``size``.
    :param content: The binary stream holding the uploaded data.
    :param content_type: The mime type determined for the upload.

    :return: ``None`` if the content type is not in ``IMAGE_MIME_TYPES``,
        otherwise the stream to continue with. That is a new temporary
        file if the image was re-encoded, else the given ``content``.

    :raises Image.DecompressionBombError: Passed through from Pillow.
    :raises UnidentifiedImageError: For any other error while opening or
        loading the image.

    """

    if content_type == 'image/svg+xml':
        file.size = get_svg_size_or_default(content)

    # NOTE: SVGs only continue past this point if 'image/svg+xml' is part
    #       of IMAGE_MIME_TYPES - presumably it is not, verify in
    #       onegov.file.utils.
    if content_type not in IMAGE_MIME_TYPES:
        return None

    try:
        # TODO: We should consider opening the image twice, and calling
        #       verify on the first open, just in case the verify
        #       is more thorough than a plain load.
        image = Image.open(content)
        # we perform a manual load so some of the verification happens
        # earlier than it otherwise would, so we can catch all the
        # exceptions here in one place.
        image.load()
    except Image.DecompressionBombError:
        # reraise this one specifically
        raise
    except Exception:
        # treat any other internal error as an UnidentifiedImageError
        raise UnidentifiedImageError from None

    needs_resample = max(image.size) > IMAGE_MAX_SIZE
    has_exif = bool(hasattr(image, 'getexif') and image.getexif())

    if needs_resample or has_exif:
        params: _ImageSaveOptionalParams = {}

        if has_exif:
            # bake EXIF orientation into the image
            ImageOps.exif_transpose(image, in_place=True)
            # replace EXIF section with an empty one
            params['exif'] = Image.Exif()

        if needs_resample:
            image.thumbnail(
                (IMAGE_MAX_SIZE, IMAGE_MAX_SIZE),
                Image.Resampling.LANCZOS
            )

        content = SpooledTemporaryFile(INMEMORY_FILESIZE)  # noqa: SIM115
        try:
            # Quality is only supported by jpeg
            image.save(
                content, image.format, quality=IMAGE_QUALITY, **params)
        except ValueError:
            # the failed attempt may already have written some bytes
            # (e.g. a file header), drop them, otherwise the second save
            # would be appended to that partial output
            content.seek(0)
            content.truncate()
            image.save(content, image.format, **params)

    # the file size is stored in pixel as string (for browser usage)
    file.size = get_image_size(image)

    return content
def store_checksum(
    file: ProcessedUploadedFile,
    content: IO[bytes],
    content_type: str | None
) -> None:
    """ Stores the digest of the content on ``file.checksum``.

    The digest is calculated by ``digest`` using ``CHECKSUM_FUNCTION`` as
    its ``type``. The ``content_type`` is accepted to match the signature
    shared by all processors, it is not used.

    """
    checksum = digest(content, type=CHECKSUM_FUNCTION)
    file.checksum = checksum
def sanitize_svg_images(
    file: ProcessedUploadedFile,
    content: IO[bytes],
    content_type: str | None
) -> IO[bytes]:
    """ Runs SVG uploads through ``sanitize_svg``.

    Content with any other content type is handed back untouched. For
    SVGs the stream is read, decoded as UTF-8, sanitized and returned as
    a new in-memory stream holding the UTF-8 encoded result.

    """
    if content_type != 'image/svg+xml':
        return content

    markup = content.read().decode('utf-8')
    sanitized_markup = sanitize_svg(markup)
    return BytesIO(sanitized_markup.encode('utf-8'))
def store_extract_and_pages(
    file: ProcessedUploadedFile,
    content: IO[bytes],
    content_type: str | None
) -> None:
    """ Stores the text extract and some statistics of PDF uploads.

    For 'application/pdf' content the result of ``extract_pdf_info`` is
    split into the page count and the extract. The extract is stored on
    ``file.extract``, the page count together with the word count of the
    extract on ``file.stats``. Other content types are ignored.

    """
    if content_type != 'application/pdf':
        return

    page_count, file.extract = extract_pdf_info(content)

    stats = {}
    stats['pages'] = page_count
    stats['words'] = word_count(file.extract)
    file.stats = stats
class ProcessedUploadedFile(UploadedFile):
    """ An uploaded file whose content is passed through a chain of
    processors before it is handed to ``UploadedFile.process_content``.

    """

    # Called in this order with ``(file, content, content_type)``. A
    # processor may return a replacement stream; a falsy return value
    # keeps the current one.
    processors = (
        store_checksum,
        strip_exif_and_limit_and_store_image_size,
        sanitize_svg_images,
        store_extract_and_pages,
    )

    def process_content(
        self,
        content: _FileContent,
        filename: str | None = None,
        content_type: str | None = None
    ) -> None:
        """ Runs all ``processors`` over the content and stores the result.

        :param content: The uploaded content in any form accepted by
            ``FileStorage.fileinfo`` and ``utils.file_from_content``.
        :param filename: Optional filename override. If omitted the
            filename is determined by ``FileStorage.fileinfo``.
        :param content_type: Optional content type override. If omitted
            the content type is determined by ``FileStorage.fileinfo``.

        Images which trigger a ``DecompressionBombError`` or cannot be
        identified are replaced by empty content with a marker content
        type. A ``pdftotext.Error`` is tolerated and the content is stored
        as is.

        """
        # pass the arguments on as overrides, so values given by the
        # caller are respected instead of being silently discarded
        filename, content_type = FileStorage.fileinfo(
            content, filename, content_type)[1:]
        _, content = utils.file_from_content(content)

        try:
            for processor in self.processors:
                content = processor(self, content, content_type) or content
                # every processor gets to start at the beginning
                content.seek(0)
        except Image.DecompressionBombError:
            # not a real content type - but useful for us to be able to rely
            # on anything uploaded having a content type, even though that
            # content is discarded soon afterwards
            content = b''
            content_type = 'application/malicious'
        except UnidentifiedImageError:
            # also not a real content type
            content = b''
            content_type = 'application/unidentified-image'
        except pdftotext.Error:
            # signed pdfs have shown to be difficult to handle by pdftotext
            # it uses poppler apt package in the background resulting in old
            # versions installed on the host. We still would like to sign
            # the pdfs in which case file.stats has hopefully been set already

            # the failing processor may have left the stream at an
            # arbitrary offset, rewind it so the whole file is stored
            content.seek(0)

        super().process_content(content, filename, content_type)