Source code for api.token

import jwt

from base64 import b64decode
from datetime import timedelta
from onegov.api.models import ApiKey
from sedate import utcnow
from webob.exc import HTTPBadRequest


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from onegov.core.request import CoreRequest


[docs] def jwt_decode(request: 'CoreRequest', token: str | bytes) -> Any: return jwt.decode(token, request.identity_secret, algorithms=['HS512'])
[docs] def jwt_encode(request: 'CoreRequest', payload: dict[str, Any]) -> str: iat = utcnow() # This has to be UTC, # not local exp = iat + timedelta(hours=1) claims = {'iat': iat, 'exp': exp} payload.update(claims) return jwt.encode(payload, request.identity_secret, algorithm='HS512')
[docs] def get_token(request: 'CoreRequest') -> dict[str, str]: key = try_get_encoded_token(request) api_key = request.session.query(ApiKey).filter_by(key=key).one() today = utcnow() api_key.last_used = today payload = { 'id': str(api_key.id), } return {'token': jwt_encode(request, payload)}
[docs] def try_get_encoded_token(request: 'CoreRequest') -> str: assert request.authorization is not None assert isinstance(request.authorization.params, str) if request.authorization.authtype == 'Basic': auth = b64decode( request.authorization.params.strip() ).decode('utf-8') auth, _ = auth.split(':', 1) return auth elif request.authorization.authtype == 'Bearer': return request.authorization.params else: raise HTTPBadRequest('Unsupported authorization scheme')