Source code for server.cli

""" The onegov.server can be run through the 'onegov-server' command after
installation.

Said command runs the onegov server with the given configuration file in the
foreground.

Use this **for debugging/development only**.

Example::

    onegov-server --config-file test.yml

The onegov-server will load 'onegov.yml' by default and it will restart
when any file in the current folder or any file somewhere inside './src'
changes.

Changes to omlette directories require a manual restart.

A onegov.yml file looks like this:

.. code-block:: yaml

    applications:
      - path: /apps/*
        application: my.app.TestApp
        namespace: apps
        configuration:
          allowed_hosts_expression: '^[a-z]+.apps.dev'
          dsn: postgres://username:password@localhost:5432/db
          identity_secure: false
          identity_secret: very-secret-key

    logging:
      formatters:
        simpleFormater:
          format: '%(asctime)s - %(levelname)s: %(message)s'
          datefmt: '%Y-%m-%d %H:%M:%S'

    handlers:
      console:
        class: logging.StreamHandler
        formatter: simpleFormater
        level: DEBUG
        stream: ext://sys.stdout

    loggers:
      onegov.core:
        level: DEBUG
        handlers: [console]
"""

import bjoern  # type:ignore[import-untyped]
import click
import multiprocessing
import os
import sentry_sdk
import shutil
import signal
import sys
import time
import traceback
import tracemalloc

from functools import partial
from importlib import import_module
from onegov.server import Config
from onegov.server import log
from onegov.server import Server
from onegov.server.tracker import ResourceTracker
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.wsgi import SentryWsgiMiddleware
from time import perf_counter
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from _typeshed import OptExcInfo
    from _typeshed.wsgi import WSGIApplication, WSGIEnvironment, StartResponse
    from collections.abc import Callable, Iterable
    from multiprocessing.sharedctypes import Synchronized
    from types import FrameType
    from watchdog.events import FileSystemEvent


[docs] RESOURCE_TRACKER: ResourceTracker = None # type:ignore[assignment]
@click.command() @click.option( '--config-file', '-c', help='Configuration file to use', type=click.Path(exists=True), default='onegov.yml' ) @click.option( '--port', '-p', help='Port to bind to', type=click.IntRange(min=0, max=65535), default=8080 ) @click.option( '--pdb', help='Enable post-mortem debugging (debug mode only)', default=False, is_flag=True ) @click.option( '--tracemalloc', help='Enable tracemalloc (debug mode only)', default=False, is_flag=True ) @click.option( '--mode', help='Defines the mode used to run the server cli (debug|production)', type=click.Choice(('debug', 'production'), case_sensitive=False), default='debug', ) @click.option( '--sentry-dsn', help='Sentry DSN to use (production mode only)', default=None, ) @click.option( '--sentry-environment', help='Sentry environment tag (production mode only)', default='testing', ) @click.option( '--sentry-release', help='Sentry release tag (production mode only)', default=None, ) @click.option( '--send-ppi', help='Allow sentry_sdk to send personally identifiable information', default=False, is_flag=True ) @click.option( '--traces-sample-rate', help='How often should sentry_sdk send traces to the backend', type=click.FloatRange(min=0.0, max=1.0), default=0.1 ) @click.option( '--profiles-sample-rate', help='How often should sentry_sdk also send a profile with the trace', type=click.FloatRange(min=0.0, max=1.0), default=0.25 )
[docs] def run( config_file: str | bytes, port: int, pdb: bool, tracemalloc: bool, mode: Literal['debug', 'production'], sentry_dsn: str | None, sentry_environment: str, sentry_release: str | None, send_ppi: bool, traces_sample_rate: float, profiles_sample_rate: float ) -> None: """ Runs the onegov server with the given configuration file in the foreground. Use this **for debugging/development only**. Example:: onegov-server --config-file test.yml The onegov-server will load 'onegov.yml' by default and it will restart when any file in the current folder or any file somewhere inside './src' changes. Changes to omlette directories require a manual restart. """ # <- the docs are currently duplicated somewhat at the top of the module # because click does not play well with sphinx yet # see https://github.com/mitsuhiko/click/issues/127 # We do not use process forking for Python's multiprocessing here, as some # shared libraries (namely kerberos) do not work well with forks (i.e. # there are hangs). # # It is also cleaner to use 'spawn' as we get a new process spawned each # time, which ensures that there is no residual state around that might # cause the first run of onegov-server to be different than any subsequent # runs through automated reloads. if mode == 'debug': return run_debug(config_file, port, pdb, tracemalloc) if sentry_dsn: with_sentry = True integrations = [ RedisIntegration(), SqlalchemyIntegration(), ] # HACK: We don't want to statically depend on onegov.core # from onegov.server, but our sentry integration does # need to be defined there, so it knows about the internals # of the application, so we use importlib.import_module # to make this dependency optional, if we decide we can # live with less information, we could move this out into # more.sentry and import the integration from there. try: ogc_sentry = import_module('onegov.core.sentry') integrations.insert(0, ogc_sentry.OneGovCloudIntegration( # while inserting the wsgi middleware twice is not # harmful, we do save ourselves some static overhead # if we don't do it. with_wsgi_middleware=False )) except ImportError: pass sentry_sdk.init( dsn=sentry_dsn, release=sentry_release, environment=sentry_environment, send_default_pii=send_ppi, traces_sample_rate=traces_sample_rate, profiles_sample_rate=profiles_sample_rate, integrations=integrations) # somehow sentry attaches itself to the global exception hook, even if # we set 'install_sys_hook' to False -> so we just reset to the # original state before serving our application (otherwise we get each # error report twice) sys.excepthook = sys.__excepthook__ else: with_sentry = False return run_production(config_file, port, with_sentry=with_sentry)
[docs] def run_production( config_file: str | bytes, port: int, with_sentry: bool ) -> None: # required by Bjoern env = {'webob.url_encoding': 'latin-1'} app: WSGIApplication = Server( config=Config.from_yaml_file(config_file), environ_overrides=env) if with_sentry: # NOTE: Most things should be caught at lower scopes with # more detailed information by our integrations, but # the SentryWsgiMiddleware also performs some bookkeeping # necessary for traces and profiles to work, so we # should make this the top level application wrapper # We could wrap each hosted application individually # instead, but then we are not measuring the overhead # of this top-level application router. app = SentryWsgiMiddleware(app) log.debug(f'started onegov server on http://127.0.0.1:{port}') bjoern.run(app, '127.0.0.1', port, reuse_port=True)
[docs] def run_debug( config_file: str | bytes, port: int, pdb: bool, tracemalloc: bool ) -> None: multiprocessing.set_start_method('spawn') factory = partial(debug_wsgi_factory, config_file=config_file, pdb=pdb) server = WsgiServer(factory, port=port, enable_tracemalloc=tracemalloc) server.start() observer = Observer() observer.schedule(server, 'src', recursive=True) observer.schedule(server, '.', recursive=False) observer.start() while True: try: server.join(0.2) except KeyboardInterrupt: observer.stop() server.stop() sys.exit(0)
[docs] def debug_wsgi_factory(config_file: str | bytes, pdb: bool) -> Server: return Server(Config.from_yaml_file(config_file), post_mortem=pdb)
[docs] class WSGIRequestMonitorMiddleware: """ Measures the time it takes to respond to a request and prints it at the end of the request. """ def __init__(self, app: 'WSGIApplication'):
[docs] self.app = app
[docs] def __call__( self, environ: 'WSGIEnvironment', start_response: 'StartResponse' ) -> 'Iterable[bytes]': received = perf_counter() received_status: str = '' def local_start_response( status: str, headers: list[tuple[str, str]], exc_info: 'OptExcInfo | None' = None ) -> 'Callable[[bytes], object]': nonlocal received_status received_status = status return start_response(status, headers, exc_info) response = self.app(environ, local_start_response) self.log(environ, received_status, received) return response
[docs] def log( self, environ: 'WSGIEnvironment', status: str, received: float ) -> None: duration_ms = (perf_counter() - received) * 1000.0 status = status.split(' ', 1)[0] path = f"{environ['SCRIPT_NAME']}{environ['PATH_INFO']}" method = environ['REQUEST_METHOD'] template = ( '{status} - {duration} - {method} {path} - {c:.3f} MiB ({d:+.3f})' ) if status in ('302', '304'): path = click.style(path, fg=(66, 66, 66)) # grey else: pass # white if duration_ms > 500.0: duration = click.style(f'{duration_ms:.0f} ms', fg='red') elif duration_ms > 250.0: duration = click.style(f'{duration_ms:.0f} ms', fg='yellow') else: duration = click.style(f'{duration_ms:.0f} ms', fg='green') if method == 'POST': method = click.style(method, underline=True) RESOURCE_TRACKER.track() usage = RESOURCE_TRACKER.memory_usage delta = RESOURCE_TRACKER.memory_usage_delta print(template.format( status=status, method=method, path=path, duration=duration, c=usage / 1024 / 1024, d=delta / 1024 / 1024 ))
[docs] class WsgiProcess(multiprocessing.Process): """ Runs the WSGI reference server in a separate process. This is a debug process, not used in production. """
[docs] _ready: 'Synchronized[int]'
def __init__( self, app_factory: 'Callable[[], WSGIApplication]', host: str = '127.0.0.1', port: int = 8080, env: dict[str, str] | None = None, enable_tracemalloc: bool = False ):
[docs] env = env or {}
multiprocessing.Process.__init__(self)
[docs] self.app_factory = app_factory
[docs] self.host = host
[docs] self.port = port
[docs] self.enable_tracemalloc = enable_tracemalloc
self._ready = multiprocessing.Value('i', 0) # hook up environment variables for key, value in env.items(): os.environ[key] = value try: self.stdin_fileno = sys.stdin.fileno() except ValueError: pass # in testing, stdin is not always real @property
[docs] def ready(self) -> bool: return self._ready.value == 1
[docs] def print_memory_stats( self, signum: int, frame: 'FrameType | None' ) -> None: print('-' * shutil.get_terminal_size((80, 20)).columns) RESOURCE_TRACKER.show_memory_usage() if tracemalloc.is_tracing(): RESOURCE_TRACKER.show_monotonically_increasing_traces() print('-' * shutil.get_terminal_size((80, 20)).columns)
[docs] def disable_systemwide_darwin_proxies(self): # type:ignore # System-wide proxy settings on darwin need to be disabled, because # it leads to crashes in our forked subprocess: # https://bugs.python.org/issue27126 # https://bugs.python.org/issue13829 import urllib.request urllib.request.proxy_bypass_macosx_sysconf = lambda host: None urllib.request.getproxies_macosx_sysconf = dict
[docs] def run(self) -> None: # use the parent's process stdin to be able to provide pdb correctly if hasattr(self, 'stdin_fileno'): sys.stdin = os.fdopen(self.stdin_fileno) # when pressing ctrl+c exit immediately signal.signal(signal.SIGINT, lambda *args: sys.exit(0)) # when pressing ctrl+t show the memory usage of the process if hasattr(signal, 'SIGINFO'): signal.signal(signal.SIGINFO, self.print_memory_stats) # reset the tty every time, fixing problems that might occur if # the process is restarted during a pdb session os.system('stty sane') try: if sys.platform == 'darwin': self.disable_systemwide_darwin_proxies() global RESOURCE_TRACKER RESOURCE_TRACKER = ResourceTracker( enable_tracemalloc=self.enable_tracemalloc) wsgi_application = WSGIRequestMonitorMiddleware(self.app_factory()) bjoern.listen(wsgi_application, self.host, self.port) except Exception: # if there's an error, print it print(traceback.format_exc()) # and just never start the server (but don't stop the # process either). this makes this work: # 1. save -> import error # 2. save corrected version -> server restarted while True: time.sleep(10.0) self._ready.value = 1 print(f'started onegov server on http://{self.host}:{self.port}') bjoern.run()
[docs] class WsgiServer(FileSystemEventHandler): """ Wraps the WSGI process, providing the ability to restart the process and acting as an event-handler for watchdog. """ def __init__( self, app_factory: 'Callable[[], WSGIApplication]', host: str = '127.0.0.1', port: int = 8080, **kwargs: Any ):
[docs] self.app_factory = app_factory
[docs] self._host = host
[docs] self._port = port
[docs] self.kwargs = kwargs
[docs] def spawn(self) -> WsgiProcess: return WsgiProcess(self.app_factory, self._host, self._port, { 'ONEGOV_DEVELOPMENT': '1', 'CHAMELEON_CACHE': '.chameleon_cache' }, **self.kwargs)
[docs] def join(self, timeout: float | None = None) -> None: try: self.process.join(timeout) except Exception: # ignore errors such as not yet started, process already finished # or already closed process objects - it's used for debug anyway log.warning('Could not join')
[docs] def start(self) -> None: self.process = self.spawn() self.process.start()
[docs] def restart(self) -> None: self.stop(block=True) self.start()
[docs] def stop(self, block: bool = False) -> None: self.process.terminate() if block: self.join()
[docs] def on_any_event(self, event: 'FileSystemEvent') -> None: """ If anything of significance changed, restart the process. """ if getattr(event, 'event_type', None) in ('opened', 'closed_no_write'): return src_path = event.src_path assert isinstance(src_path, str) if 'tests/' in src_path: return if event.is_directory: return if src_path.endswith('pyc'): return if src_path.endswith('scss'): return if src_path.endswith('pt'): return if src_path.endswith('.rdb'): return if '/.testmondata' in src_path: return if '/.git' in src_path: return if '/__pycache__' in src_path: return if '/onegov.server' in src_path: return if '/file-storage' in src_path: return if '/mails' in src_path: return if '/profiles' in src_path: return if '.webassets-cache' in src_path: return if 'assets/bundles' in src_path: return if 'onegov.sublime' in src_path: return if '.cache' in src_path: return if src_path.endswith('~'): return print(f'changed: {src_path}') self.restart()