Source code for being.web.web_socket

"""Web socket proxy."""
import asyncio
import collections
import weakref

import aiohttp
from aiohttp import web
from aiohttp import WSMsgType

from being.serialization import dumps
from being.logging import get_logger


[docs]class WebSocket: """WebSocket connections. Interfaces with aiohttp web socket requests. Can hold multiple open web socket connections simultaneously. Also has a message queue / broker functionality to send messages from non-asyncio world. """ def __init__(self): self.sockets = weakref.WeakSet() """sockets: Active web socket connections.""" self.queue = collections.deque(maxlen=100) """queue: Message queue for synchronous senders.""" self.logger = get_logger('WebSocket') self.brokerTask = None
[docs] async def send_json(self, data): """Send data as JSON to all connected web sockets. Args: data: Data to send as JSON. """ for ws in self.sockets.copy(): if ws.closed: continue try: await ws.send_json(data, dumps=dumps) except ConnectionResetError as err: self.logger.exception(err)
[docs] def send_json_buffered(self, data): """Synchronous send_json(). Data goes into message queue and send at a later stage (if broker task is running). Args: data: Data to send as JSON. """ self.queue.append(data)
[docs] async def handle_new_connection(self, request) -> web.WebSocketResponse: """Aiohttp new web socket connection request handler.""" ws = web.WebSocketResponse(autoclose=True) await ws.prepare(request) self.logger.info('Opened web socket') self.sockets.add(ws) try: async for msg in ws: if msg.type == WSMsgType.ERROR: self.logger.error('Web socket error with exception %s', ws.exception()) break finally: self.logger.info('Discarding web socket') self.sockets.discard(ws) self.logger.debug('Web socket closed') return ws
#pylint: disable=unused-argument
[docs] async def close_all_connections(self, app: web.Application = None): """Close all web sockets. Can be used with app.on_shutdown() / app.on_cleanup(). """ for ws in self.sockets.copy(): await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Closing web socket')
[docs] async def broker_task(self): """Message broker task. Takes messages from queue and sends them over all open web socket connections. """ while True: for data in self.queue.copy(): await self.send_json(data) self.queue.popleft() await asyncio.sleep(.1)
#pylint: disable=unused-argument
[docs] async def start_broker(self, app: web.Application = None): """Start message broker task.""" await self.stop_broker() self.brokerTask = asyncio.create_task(self.broker_task())
#pylint: disable=unused-argument
[docs] async def stop_broker(self, app: web.Application = None): """Stop message broker task.""" if not self.brokerTask: return self.brokerTask.cancel() await self.brokerTask self.brokerTask = None