import asyncio
import logging
from threading import Thread
from typing import Iterable, Optional

import websockets
from websockets import WebSocketServerProtocol
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK

from hopper.models.player import Player
from hopper.models.product import Product
from hopper.models.ws_dto import (
    GameDumpDto,
    PlayerReachedDestinationDto,
    ProductPurchaseTimerDto,
    WSGameDumpMessage,
    WSMessage,
    WSPlayerReachedDestinationMessage,
    WSProductPurchaseTimerTickMessage,
)


class WSServer(Thread):
    """Websocket server that runs in its own daemon thread.

    ``run()`` starts a dedicated asyncio event loop (via ``asyncio.run``) that
    serves websocket connections on ``host``:``port``. Every connected client
    is tracked in ``connected_clients`` so game events can be broadcast.

    NOTE(review): the ``send_*`` coroutines talk to sockets owned by this
    thread's event loop; awaiting them from a different loop/thread may not be
    safe -- confirm how callers schedule them.
    """

    def __init__(self, host: str, port: int) -> None:
        """Store the bind address and prepare an empty client registry.

        Args:
            host: Interface/hostname the server binds to.
            port: TCP port the server listens on.
        """
        super().__init__(daemon=True)
        self.host = host
        self.port = port
        # Created here (not in run()) so that broadcasting before the thread
        # has started is a harmless no-op instead of an AttributeError.
        self.connected_clients: set[WebSocketServerProtocol] = set()

    async def handler(self, websocket: WebSocketServerProtocol) -> None:
        """Serve one client connection; a new instance spawns per client.

        Registers the client, sends it the initial game dump and then keeps
        reading (and discarding) incoming messages until the connection is
        closed. The client is always unregistered on exit.
        """
        self.connected_clients.add(websocket)
        logging.info(f"Add client: {websocket.id}")

        try:
            # send initial game dump to connected client
            await self.send_game_dump_to_client(websocket)

            # loop and do nothing while client is connected
            while True:
                try:
                    # we're expecting nothing from client, but read if client
                    # sends a message
                    await websocket.recv()
                except ConnectionClosedOK:
                    logging.info(f"Connection closed OK for client: {websocket.id}")
                    break
                except ConnectionClosedError:
                    logging.info(f"Connection closed error for client: {websocket.id}")
                    break
        finally:
            # discard() cannot raise, so cleanup never masks the exception
            # that is already propagating out of the try block.
            self.connected_clients.discard(websocket)
            logging.info(f"Remove client: {websocket.id}")

    async def send_message_to_client(
        self, client: WebSocketServerProtocol, message: WSMessage
    ) -> None:
        """Serialize ``message`` and send it to a single ``client``.

        Exceptions raised by ``client.send`` propagate to the caller.
        """
        message_str = message.to_str()
        logging.debug(
            f"Sending message {message.message} to clients: {self.connected_clients}: {message_str}"
        )
        await client.send(message_str)

    async def send_message_to_clients(self, message: WSMessage) -> None:
        """Broadcast ``message`` to every currently connected client.

        Clients whose connection is already closed are skipped so that one
        dead socket does not prevent the remaining clients from being served.
        """
        # Iterate over a snapshot: handler() adds/removes clients while this
        # coroutine is suspended in send(), and mutating a set during
        # iteration raises RuntimeError.
        for client in tuple(self.connected_clients):
            try:
                await self.send_message_to_client(client, message)
            except (ConnectionClosedOK, ConnectionClosedError):
                # The client's own handler() unregisters it; just move on.
                logging.info(f"Skipping closed client during broadcast: {client.id}")

    def _create_game_dump_message(self) -> WSGameDumpMessage:
        """Build a game dump message from the current game engine state."""
        # avoid circular imports
        from hopper.api.dependencies import get_game_engine

        engine = get_game_engine()
        game_dump = GameDumpDto(
            board=engine.board,
            destination=engine.board.destination,
            players=engine.players,
            layers=engine.get_board_layout().layers,
        )
        return WSGameDumpMessage(data=game_dump)

    async def send_game_dump_to_client(
        self, websocket: WebSocketServerProtocol
    ) -> None:
        """Send game dump to the client"""
        message = self._create_game_dump_message()
        logging.debug(f"Sending game dump to client: {websocket.id}")
        await websocket.send(message.to_str())

    async def send_game_dump(self) -> None:
        """Broadcast game state to all connected clients"""
        message = self._create_game_dump_message()
        await self.send_message_to_clients(message)

    async def send_player_reached_destination_message(self, player: Player) -> None:
        """Broadcast that ``player`` has reached the destination."""
        message = WSPlayerReachedDestinationMessage(
            data=PlayerReachedDestinationDto(
                player=player,
            )
        )
        await self.send_message_to_clients(message)

    async def send_product_purchase_time_left_message(
        self, player: Player, time_left: int
    ) -> None:
        """Broadcast a purchase timer tick carrying ``player`` and ``time_left``."""
        message = WSProductPurchaseTimerTickMessage(
            data=ProductPurchaseTimerDto(
                player=player,
                time_left=time_left,
            )
        )
        await self.send_message_to_clients(message)

    async def send_product_purchase_done_message(
        self, player: Player, product: Optional[Product] = None
    ) -> None:
        """Placeholder: currently sends nothing (implementation is disabled)."""
        # message = WSProductPurchaseDoneMessage(
        #     data=ProductPurchaseDoneDto(player=player, product=product),
        # )
        # await self.send_message_to_clients(message)
        ...

    async def run_async(self) -> None:
        """Start the websocket server and keep serving until cancelled."""
        logging.info(
            f"Starting FairHopper Websockets Server on {self.host}:{self.port}"
        )
        async with websockets.serve(
            ws_handler=self.handler,
            host=self.host,
            port=self.port,
        ):
            await asyncio.Future()  # run forever

    def run(self) -> None:
        """Thread entry point: run the server in a fresh asyncio event loop."""
        asyncio.run(self.run_async())