Source code for websockets.client

from __future__ import annotations

from json import dumps
from json import loads
from onegov.websockets import log


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from onegov.core.types import JSON_ro
    from websockets.legacy.client import WebSocketClientProtocol


[docs] async def acknowledged(websocket: WebSocketClientProtocol) -> None: """ Wait for an OK from the server. """ message = await websocket.recv() try: assert loads(message)['type'] == 'acknowledged' except Exception as exception: # FIXME: technically message can be bytes log.error(f'Unexpected response: {message}') # type:ignore await websocket.close() raise OSError(message) from exception
[docs] async def register( websocket: WebSocketClientProtocol, schema: str, channel: str | None ) -> None: """ Registers for broadcast messages. """ await websocket.send( dumps({ 'type': 'register', 'schema': schema, 'channel': channel }) ) await acknowledged(websocket)
[docs] async def authenticate( websocket: WebSocketClientProtocol, token: str ) -> None: """ Authenticates with the given token. """ await websocket.send( dumps({ 'type': 'authenticate', 'token': token }) ) await acknowledged(websocket)
[docs] async def broadcast( websocket: WebSocketClientProtocol, schema: str, channel: str | None, message: JSON_ro ) -> None: """ Broadcasts the given message to all connected clients. Assumes prior authentication. """ await websocket.send( dumps({ 'type': 'broadcast', 'schema': schema, 'channel': channel, 'message': message }) ) await acknowledged(websocket)
[docs] async def status(websocket: WebSocketClientProtocol) -> str | None: """ Receives the status of the server. Assumes prior authentication. """ await websocket.send( dumps({ 'type': 'status', }) ) await acknowledged(websocket) message = await websocket.recv() try: payload = loads(message) assert payload['type'] == 'status' assert payload['message'] except Exception: # FIXME: technically message can be bytes log.error(f'Unexpected response: {message}') # type:ignore await websocket.close() return None else: return payload['message']