import asyncio import logging import random from typing import Optional from hopper.countdown_timer import CountdownTimer from hopper.enums import Direction, GameState, PlayerMoveResult, PlayerState from hopper.errors import Collision, GameLockForMovement, PositionOutOfBounds from hopper.models.board import ( BOARD_DUMP_CHARS, BoardLayout, Destination, GameBoard, Layer, LayerObject, ObjectType, create_random_position, ) from hopper.models.player import Player, PlayerList, Position from hopper.watchdog import InactivityWatchdog from hopper.ws_server import WSServer from settings import settings def create_player_start_position(board_width: int, board_height: int) -> Position: """Create random position somewhere on the board border""" border_len = (board_width + board_height) * 2 rnd_position = random.randint(0, border_len - 1) if rnd_position < board_width * 2: x = rnd_position % board_width y = 0 if rnd_position < board_width else board_height - 1 else: rnd_position -= 2 * board_width x = 0 if rnd_position < board_height else board_width - 1 y = rnd_position % board_height return Position(x=x, y=y) class GameEngine: def __init__(self, board: GameBoard, ws_server: WSServer = None) -> None: self.board = board self.ws_server = ws_server self.players = PlayerList() self._inacivity_watchdog = None self._purchase_countdown_timer: Optional[CountdownTimer] = None self.game_state = GameState.RUNNING self.__debug_print_board() def dump_board(self) -> list[list[str]]: dump = self.board.dump() for player in self.players: show_player = ( player.active and player.position.y < len(dump) and player.position.x < len(dump[player.position.y]) ) if show_player: dump[player.position.y][player.position.x] = BOARD_DUMP_CHARS[ ObjectType.PLAYER ] return dump def __debug_print_board(self): if not (settings.debug and settings.debug.PRINT_BOARD): return for line in self.dump_board(): print(" ".join(line)) def _start_inactivity_watchdog(self) -> None: if not self._inacivity_watchdog: self._inacivity_watchdog = InactivityWatchdog( players=self.players, ws_server=self.ws_server, ) self._inacivity_watchdog.start() async def send_game_dump(self): self.__debug_print_board() await self.ws_server.send_game_dump() async def reset_game(self) -> None: self.__debug_print_board() self.game_state = GameState.RUNNING self.players.clear() await self.send_game_dump() async def start_game_for_player(self, player_name: str) -> Player: self._start_inactivity_watchdog() player = Player( name=player_name, position=create_player_start_position(self.board.width, self.board.height), state=PlayerState.CREATED, ) self.players.append(player) logging.info(f"Starting new game for player: {player}") self.__debug_print_board() await self.send_game_dump() await asyncio.sleep(settings.game.MOVE_DELAY) return player def _move_position(self, position: Position, direction: Direction) -> Position: new_position = Position(position.x, position.y) if direction == Direction.LEFT: new_position.x -= 1 elif direction == Direction.RIGHT: new_position.x += 1 elif direction == Direction.UP: new_position.y -= 1 elif direction == Direction.DOWN: new_position.y += 1 else: raise ValueError(f"Unhandled direction: {direction}") return new_position async def move_player( self, player: Player, direction: Direction ) -> PlayerMoveResult: player.reset_timeout() if self.game_state == GameState.LOCK_FOR_MOVEMENT: raise GameLockForMovement("Player reached destination. Can't move anymore.") # player will not be able to move once they reach the destination if player.state == PlayerState.ON_DESTINATION: return PlayerMoveResult.DESTINATION_REACHED logging.info(f"Player {player} move to {direction}") new_position = self._move_position(player.position, direction) player.move_attempt_count += 1 player.state = PlayerState.MOVING if not self._position_in_board_bounds(new_position): raise PositionOutOfBounds() if self._colided_with_obstacle(new_position): raise Collision() player.position = new_position player.move_count += 1 if self._is_player_on_destination(player): player.state = PlayerState.ON_DESTINATION await self._player_on_destination(player) return PlayerMoveResult.DESTINATION_REACHED await self.send_game_dump() await asyncio.sleep(settings.game.MOVE_DELAY) return PlayerMoveResult.OK def _is_player_on_destination(self, player: Player) -> bool: return player.position == self.board.destination.position def _position_in_board_bounds(self, position: Position) -> bool: return ( 0 <= position.x < self.board.width and 0 <= position.y < self.board.height ) def _colided_with_obstacle(self, position: Position) -> bool: return self.board.get_object_at_position(position) is not None async def _player_on_destination(self, player: Player) -> None: logging.info(f"Player {player} reached destination!") self.game_state = GameState.LOCK_FOR_MOVEMENT await self.send_game_dump() await self.ws_server.send_player_reached_destination_message(player=player) logging.info( f"Starting product selection countdown timer for {settings.purchase_timeout} seconds" ) def on_purchase_timer_tick(time_left) -> None: logging.info(f"Product selection countdown timer tick, time left: {time_left}") def on_purchase_timer_done() -> None: logging.info("Ding ding! Product selection countdown timer timeout") self._purchase_countdown_timer = None asyncio.run( self.ws_server.send_product_selection_timeout_message() ) self.game_state = GameState.RUNNING asyncio.run(self.send_game_dump()) self._purchase_countdown_timer = CountdownTimer( seconds=settings.purchase_timeout, timer_tick_callback=on_purchase_timer_tick, timer_done_callback=on_purchase_timer_done, ) self._purchase_countdown_timer.start() await asyncio.sleep(settings.game.PURCHASE_START_DELAY) async def product_selection_done(self) -> None: logging.info("Product selection done, unlocking game") if self._purchase_countdown_timer: self._purchase_countdown_timer.stop() await self.ws_server.send_product_selection_done_message() await self.reset_game() def _reset_player(self, player) -> None: # move player to start position player.position = create_player_start_position( self.board.width, self.board.height ) player.state = PlayerState.CREATED player.last_seen = None def get_board_layout(self) -> BoardLayout: return BoardLayout(board=self.board, players=self.players) class GameEngineFactory: @staticmethod def create( board_width: int, board_height: int, obstacle_count: int = 0, ws_server: WSServer = None, ) -> GameEngine: board = GameBoard( width=board_width, height=board_height, destination=Destination(Position(board_width // 2, board_height // 2)), ) obstacle_layer = Layer(name="obstacles") for _ in range(obstacle_count): obstacle_layer.objects.append( LayerObject( type_=ObjectType.OBSTACLE, position=create_random_position(board_width, board_height), ), ) board.layers.append(obstacle_layer) game = GameEngine( board=board, ws_server=ws_server, ) GameEngineFactory.__add_test_players(game.players) return game @staticmethod def create_default( ws_server: WSServer = None, ) -> GameEngine: return GameEngineFactory.create( board_width=settings.board.WIDTH, board_height=settings.board.HEIGHT, obstacle_count=settings.board.OBSTACLE_COUNT, ws_server=ws_server, ) @staticmethod def __add_test_players(players: PlayerList) -> None: if not settings.debug: return for player in settings.debug.PLAYERS: players.append(player) logging.info(f"Test player created: {player}")