import asyncio
import json
import logging
from threading import Thread

import websockets
from websockets import WebSocketServerProtocol
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK

from hopper.models.player import Player
from hopper.models.ws_dto import (
    GameDumpDto,
    PlayerReachedDestinationDto,
    WSGameDumpMessage,
    WSMessage,
    WSPlayerReachedDestinationMessage,
    WSProductSelectionDoneMessage,
    WSProductSelectionTimeoutMessage,
)


class WSServer(Thread):
    """Websocket server that runs in its own daemon thread.

    ``run()`` starts a dedicated asyncio event loop (via ``asyncio.run``) that
    serves websocket connections on ``host``:``port``. Every connected client
    is tracked in ``connected_clients`` so messages can be broadcast to all of
    them.

    NOTE(review): the broadcast coroutines read ``connected_clients`` and write
    to sockets owned by this thread's event loop. If they are awaited from a
    different thread/loop, no synchronisation is done here - confirm against
    the callers.
    """

    def __init__(self, host: str, port: int) -> None:
        """Store the bind address and prepare an empty client registry.

        Args:
            host: Interface the websocket server binds to.
            port: TCP port the websocket server listens on.
        """
        super().__init__(daemon=True)
        self.host = host
        self.port = port
        # Created here (not in run()) so broadcasting before the thread has
        # started is a harmless no-op instead of an AttributeError.
        self.connected_clients: set[WebSocketServerProtocol] = set()

    async def handle_rcv_message(
        self, client: WebSocketServerProtocol, raw_message: str
    ) -> None:
        """Decode a raw client frame and dispatch it by its ``message`` field.

        Frames that are not valid JSON, or that decode to something other than
        a JSON object, are logged and ignored. Unknown message types are
        silently ignored.

        Args:
            client: Connection the frame was received from.
            raw_message: Payload exactly as returned by ``websocket.recv()``.
        """
        try:
            ws_message = json.loads(raw_message)
        except Exception as ex:
            # Deliberately broad: this is the boundary for untrusted client
            # input and a malformed frame must never take the handler down.
            logging.error(
                f"Error decoding WS message from {client.id} {raw_message}: {ex}"
            )
            return None

        # Valid JSON may still be a list/str/number, which has no ``.get``.
        if not isinstance(ws_message, dict):
            logging.error(
                f"Ignoring non-object WS message from {client.id}: {raw_message}"
            )
            return None

        data_message = ws_message.get("message")
        if data_message == WSProductSelectionDoneMessage.message_type:
            await self.handle_rcv_product_selection_done(client)

    async def handle_rcv_product_selection_done(
        self, client: WebSocketServerProtocol
    ) -> None:
        """Forward a client's "product selection done" message to the engine.

        Args:
            client: Connection that sent the message (used for logging only).
        """
        logging.info(f"Handle WSProductSelectionDoneMessage: {client.id}")

        # avoid circular imports
        from hopper.api.dependencies import get_game_engine

        engine = get_game_engine()
        await engine.product_selection_done()

    async def handler(self, websocket: WebSocketServerProtocol) -> None:
        """Serve one client; a new handler instance spawns per connection.

        Registers the client, sends it the initial game dump, then keeps
        reading and dispatching incoming frames until the connection closes.
        The client is always unregistered on exit.

        Args:
            websocket: The newly accepted client connection.
        """
        self.connected_clients.add(websocket)
        logging.info(f"Add client: {websocket.id}")

        try:
            # send initial game dump to connected client
            await self.send_game_dump_to_client(websocket)

            # We expect little from the client, but keep reading so that
            # incoming messages are handled and disconnects are detected.
            while True:
                rcv_data = await websocket.recv()
                await self.handle_rcv_message(
                    client=websocket, raw_message=rcv_data
                )
        except ConnectionClosedOK:
            logging.info(f"Connection closed OK for client: {websocket.id}")
        except ConnectionClosedError:
            logging.info(f"Connection closed error for client: {websocket.id}")
        finally:
            # discard() rather than remove(): cleanup must never raise.
            self.connected_clients.discard(websocket)
            logging.info(f"Remove client: {websocket.id}")

    async def send_message_to_client(
        self, client: WebSocketServerProtocol, message: WSMessage
    ) -> None:
        """Serialize ``message`` and send it to a single client.

        Args:
            client: Destination connection.
            message: Message to serialize with ``to_str()`` and send.
        """
        message_str = message.to_str()
        logging.debug(
            f"Sending message {message.message} to clients: {self.connected_clients}: {message_str}"
        )
        await client.send(message_str)

    async def send_message_to_clients(self, message: WSMessage) -> None:
        """Broadcast ``message`` to every currently connected client.

        A client whose connection closes while the broadcast is in progress is
        skipped so the remaining clients still receive the message; its own
        ``handler`` coroutine takes care of unregistering it.

        Args:
            message: Message to send to all clients.
        """
        # Iterate over a snapshot: handlers add/remove clients while we are
        # suspended in ``await``, and mutating a set during iteration raises
        # RuntimeError.
        for client in tuple(self.connected_clients):
            try:
                await self.send_message_to_client(client, message)
            except (ConnectionClosedOK, ConnectionClosedError):
                logging.info(
                    f"Skipping client that disconnected during broadcast: {client.id}"
                )

    def _create_game_dump_message(self) -> WSGameDumpMessage:
        """Build a game dump message from the current game engine state."""
        # avoid circular imports
        from hopper.api.dependencies import get_game_engine

        engine = get_game_engine()
        game_dump = GameDumpDto(
            board=engine.board,
            destination=engine.board.destination,
            players=engine.players,
            layers=engine.get_board_layout().layers,
        )
        return WSGameDumpMessage(data=game_dump)

    async def send_game_dump_to_client(
        self, websocket: WebSocketServerProtocol
    ) -> None:
        """Send game dump to the client"""
        message = self._create_game_dump_message()
        logging.debug(f"Sending game dump to client: {websocket.id}")
        await websocket.send(message.to_str())

    async def send_game_dump(self) -> None:
        """Broadcast game state to all connected clients"""
        message = self._create_game_dump_message()
        await self.send_message_to_clients(message)

    async def send_player_reached_destination_message(self, player: Player) -> None:
        """Broadcast that ``player`` has reached the destination.

        Args:
            player: Player included in the message payload.
        """
        message = WSPlayerReachedDestinationMessage(
            data=PlayerReachedDestinationDto(
                player=player,
            )
        )
        await self.send_message_to_clients(message)

    async def send_product_selection_done_message(self) -> None:
        """Broadcast a ``WSProductSelectionDoneMessage`` to all clients."""
        message = WSProductSelectionDoneMessage()
        await self.send_message_to_clients(message)

    async def send_product_selection_timeout_message(self) -> None:
        """Broadcast a ``WSProductSelectionTimeoutMessage`` to all clients."""
        message = WSProductSelectionTimeoutMessage()
        await self.send_message_to_clients(message)

    async def run_async(self) -> None:
        """Start the websocket server and keep serving until cancelled."""
        logging.info(
            f"Starting FairHopper Websockets Server on {self.host}:{self.port}"
        )
        async with websockets.serve(
            ws_handler=self.handler,
            host=self.host,
            port=self.port,
        ):
            await asyncio.Future()  # run forever

    def run(self) -> None:
        """Thread entry point: run the server on a fresh asyncio event loop."""
        asyncio.run(self.run_async())