from __future__ import annotations
import requests
from base64 import b64encode
from typing import Any
[docs]
class GeverClientCAS:
""" Gever Client that uses CAS for authenticating.
Its purpose is to permit a user to access multiple applications
while providing their credentials (such as user ID and password)
only once.
"""
def __init__(self, username: str, password: str, service_url: str):
[docs]
self.portal_session = requests.Session()
[docs]
self.service_session = requests.Session()
self.portal_session.headers.update({'Accept': 'application/json'})
self.service_session.headers.update({'Accept': 'application/json'})
[docs]
self.username = username
[docs]
self.password = password
[docs]
self.service_url = service_url
last_char = service_url[-1]
[docs]
self.portal_url = (service_url + 'portal' if last_char == '/'
else service_url + '/portal')
[docs]
self.login_url = self.portal_url + '/api/login'
[docs]
self.ticket_url = self.portal_url + '/api/cas/tickets'
[docs]
def request(
self,
method: str,
url: str,
# FIXME: This should probably add annotations for the keyword
# arguments we're using
**kwargs: Any
) -> requests.Response:
# First request will always need to obtain a token first
if 'Authorization' not in self.service_session.headers:
self.obtain_token()
# Optimistically attempt to dispatch request
# url needs a slash at the end
response = self.service_session.request(method, url, **kwargs)
if self.token_has_expired(response):
# We got an 'Access token expired' response => refresh token
self.obtain_token()
# Re-dispatch the request that previously failed
response = self.service_session.request(method, url, **kwargs)
return response
[docs]
def token_has_expired(self, response: requests.Response) -> bool:
status = response.status_code
if status == 401:
return True
return False
[docs]
def obtain_token(self) -> None:
# Login to portal using /api/login endpoint
self.portal_session.post(
self.login_url,
json={'username': self.username, 'password': self.password}
)
# Get CSRF token that was returned by server in a cookie
csrf_token = self.portal_session.cookies['csrftoken']
# Send the CSRF token as a request header in subsequent requests
self.portal_session.headers.update({'X-CSRFToken': csrf_token})
self.portal_session.headers.update({'Referer': self.portal_url})
# Once logged in to the portal, get a CAS ticket
ticket_response = self.portal_session.post(
self.ticket_url,
json={'service': self.service_url + '/'}
)
ticket = ticket_response.json()['ticket']
# Use ticket to authenticate to the @caslogin endpoint on the service
login_response = self.portal_session.post(
self.service_url + '/@caslogin',
json={'ticket': ticket, 'service': self.service_url + '/'}
)
# Get the JWT token from the @caslogin response, and send
# it as a Bearer token in subsequent requests to the service
token = login_response.json()['token']
self.service_session.headers['Authorization'] = f'Bearer {token}'
[docs]
def upload_file(
self,
file: bytes,
filename: str | bytes,
endpoint: str
) -> requests.Response:
def _base64_str(s: str | bytes) -> str:
if not isinstance(s, bytes):
s = s.encode('utf-8')
return b64encode(s).decode('utf-8')
def _prepare_metadata(
filename: str | bytes,
content_type: str | bytes,
_type: str | bytes | None = None
) -> str:
return 'filename {},content-type {}{}'.format(
_base64_str(filename),
_base64_str(content_type),
', @type ' + _base64_str(_type) if _type else '',
)
file_size = len(file)
if not isinstance(file, bytes) or file_size == 0:
raise ValueError(f'Invalid Argument: {type(file)}')
metadata = _prepare_metadata(
filename, 'application/pdf', 'opengever.document.document'
)
headers = {
'Accept': 'application/json',
'Tus-Resumable': '1.0.0',
'Upload-Length': str(file_size),
'Upload-Metadata': metadata,
}
last_char = endpoint[-1]
endpoint += '@tus-upload' if last_char == '/' else '/@tus-upload'
resp = self.request(
'POST', endpoint, headers=headers
)
if not (resp.status_code == 201 and 'Location' in resp.headers.keys()):
return resp # fail early
# The server will return a temporary upload URL 'location' in header
location = resp.headers['Location']
headers = {
'Accept': 'application/json',
'Content-Type': 'application/offset+octet-stream',
'Upload-Offset': '0',
'Tus-Resumable': '1.0.0',
}
resp = self.request(
'PATCH', location, headers=headers, data=file
)
return resp