Source code for user.collections.tan

from hashlib import sha256
from secrets import choice

from onegov.core.collection import GenericCollection
from onegov.user.models import TAN
from onegov.user.models.tan import DEFAULT_EXPIRES_AFTER


from typing import cast, Any, TYPE_CHECKING
if TYPE_CHECKING:
    from datetime import datetime, timedelta
    from sqlalchemy.orm import Query, Session

[docs] class _GeneratedTAN(TAN):
[docs] tan: str
[docs] ALPHABET = 'ABCDEFGHIJKLMNPQRSTUVWXYZ123456789'
# NOTE: The same TAN can be generated multiple times, but this # should occur only rarely and since we only query the # TANs that haven't expired yet, it shouldn't cause any # issues.
[docs] def generate_tan() -> str: return ''.join(choice(ALPHABET) for _ in range(6))
[docs] class TANCollection(GenericCollection[TAN]): def __init__( self, session: 'Session', # NOTE: For disambiguating between e.g. second factor TAN # and temporary privilege escalation TAN scope: str, expires_after: 'timedelta' = DEFAULT_EXPIRES_AFTER, ): super().__init__(session)
[docs] self.expires_after = expires_after
[docs] self.scope = scope
@property
[docs] def model_class(self) -> type[TAN]: return TAN
[docs] def query(self) -> 'Query[TAN]': return self.session.query(TAN).filter( TAN.is_active(self.expires_after), TAN.scope == self.scope )
[docs] def hash_tan(self, tan: str) -> str: return sha256(tan.encode('utf-8')).hexdigest()
[docs] def add( # type:ignore[override] self, *, client: str, created: 'datetime | None' = None, # for testing **meta: Any ) -> '_GeneratedTAN': tan = generate_tan() obj = cast('_GeneratedTAN', TAN( hashed_tan=self.hash_tan(tan), created=created or TAN.timestamp(), client=client, scope=self.scope, meta=meta )) obj.tan = tan self.session.add(obj) self.session.flush() return obj
[docs] def by_client(self, client: str) -> 'Query[TAN]': return self.query().filter(TAN.client == client)
[docs] def by_tan(self, tan: str) -> TAN | None: hashed_tan = self.hash_tan(tan) return self.query().filter(TAN.hashed_tan == hashed_tan).first()