244 lines
8.1 KiB
Python
244 lines
8.1 KiB
Python
import asyncio
|
|
import logging
|
|
import random
|
|
from typing import Optional
|
|
|
|
from hopper.countdown_timer import CountdownTimer
|
|
from hopper.enums import Direction, GameState, PlayerMoveResult, PlayerState
|
|
from hopper.errors import Collision, GameLockForMovement, PositionOutOfBounds
|
|
from hopper.interfaces import SendGameDumpInterface
|
|
from hopper.models.board import (
|
|
BOARD_DUMP_CHARS,
|
|
BoardLayout,
|
|
Destination,
|
|
GameBoard,
|
|
Layer,
|
|
LayerObject,
|
|
ObjectType,
|
|
create_random_position,
|
|
)
|
|
from hopper.models.player import Player, PlayerList, Position
|
|
from hopper.watchdog import InactivityWatchdog
|
|
from settings import settings
|
|
|
|
|
|
class GameEngine:
|
|
def __init__(
|
|
self, board: GameBoard, ws_server: Optional[SendGameDumpInterface] = None
|
|
) -> None:
|
|
self.board = board
|
|
self.ws_server = ws_server
|
|
self.players = PlayerList()
|
|
self._inacivity_watchdog = None
|
|
self._purchase_countdown_timer: Optional[CountdownTimer] = None
|
|
self.reset_game()
|
|
|
|
def dump_board(self) -> list[list[str]]:
|
|
dump = self.board.dump()
|
|
|
|
for player in self.players:
|
|
show_player = (
|
|
player.active
|
|
and player.position.y < len(dump)
|
|
and player.position.x < len(dump[player.position.y])
|
|
)
|
|
if show_player:
|
|
dump[player.position.y][player.position.x] = BOARD_DUMP_CHARS[
|
|
ObjectType.PLAYER
|
|
]
|
|
|
|
return dump
|
|
|
|
def __debug_print_board(self):
|
|
if not (settings.debug and settings.debug.PRINT_BOARD):
|
|
return
|
|
for line in self.dump_board():
|
|
print(" ".join(line))
|
|
|
|
def _start_inactivity_watchdog(self) -> None:
|
|
if not self._inacivity_watchdog:
|
|
self._inacivity_watchdog = InactivityWatchdog(
|
|
players=self.players,
|
|
ws_server=self.ws_server,
|
|
)
|
|
self._inacivity_watchdog.start()
|
|
|
|
def reset_game(self) -> None:
|
|
self.__debug_print_board()
|
|
self.game_state = GameState.RUNNING
|
|
|
|
async def start_game_for_player(self, player_name: str) -> Player:
|
|
self._start_inactivity_watchdog()
|
|
player = Player(
|
|
name=player_name,
|
|
position=self._create_player_start_position(),
|
|
state=PlayerState.CREATED,
|
|
)
|
|
self.players.append(player)
|
|
|
|
logging.info(f"Starting new game for player: {player}")
|
|
self.__debug_print_board()
|
|
|
|
if self.ws_server:
|
|
await self.ws_server.send_game_dump()
|
|
|
|
await asyncio.sleep(settings.game.MOVE_DELAY)
|
|
return player
|
|
|
|
def _create_player_start_position(self) -> Position:
|
|
"""Create random position somewhere on the board border"""
|
|
border_len = (self.board.width + self.board.height) * 2
|
|
rnd_position = random.randint(0, border_len - 1)
|
|
|
|
if rnd_position < self.board.width * 2:
|
|
x = rnd_position % self.board.width
|
|
y = 0 if rnd_position < self.board.width else self.board.height - 1
|
|
else:
|
|
rnd_position -= 2 * self.board.width
|
|
x = 0 if rnd_position < self.board.height else self.board.width - 1
|
|
y = rnd_position % self.board.height
|
|
|
|
return Position(x=x, y=y)
|
|
|
|
def _move_position(self, position: Position, direction: Direction) -> Position:
|
|
new_position = Position(position.x, position.y)
|
|
if direction == Direction.LEFT:
|
|
new_position.x -= 1
|
|
elif direction == Direction.RIGHT:
|
|
new_position.x += 1
|
|
elif direction == Direction.UP:
|
|
new_position.y -= 1
|
|
elif direction == Direction.DOWN:
|
|
new_position.y += 1
|
|
else:
|
|
raise ValueError(f"Unhandled direction: {direction}")
|
|
return new_position
|
|
|
|
async def move_player(
|
|
self, player: Player, direction: Direction
|
|
) -> PlayerMoveResult:
|
|
player.reset_timeout()
|
|
|
|
if self.game_state == GameState.LOCK_FOR_MOVEMENT:
|
|
raise GameLockForMovement("Player reached destination. Can't move anymore.")
|
|
|
|
# player will not be able to move once they reach the destination
|
|
if player.state == PlayerState.ON_DESTINATION:
|
|
return PlayerMoveResult.DESTINATION_REACHED
|
|
|
|
logging.info(f"Player {player} move to {direction}")
|
|
new_position = self._move_position(player.position, direction)
|
|
|
|
player.move_attempt_count += 1
|
|
player.state = PlayerState.MOVING
|
|
|
|
if not self._position_in_board_bounds(new_position):
|
|
raise PositionOutOfBounds()
|
|
|
|
if self._colided_with_obstacle(new_position):
|
|
raise Collision()
|
|
|
|
player.position = new_position
|
|
player.move_count += 1
|
|
|
|
if self._is_player_on_destination(player):
|
|
player.state = PlayerState.ON_DESTINATION
|
|
await self._player_on_destination(player)
|
|
return PlayerMoveResult.DESTINATION_REACHED
|
|
|
|
if self.ws_server:
|
|
await self.ws_server.send_game_dump()
|
|
|
|
self.__debug_print_board()
|
|
await asyncio.sleep(settings.game.MOVE_DELAY)
|
|
return PlayerMoveResult.OK
|
|
|
|
def _is_player_on_destination(self, player: Player) -> bool:
|
|
return player.position == self.board.destination.position
|
|
|
|
def _position_in_board_bounds(self, position: Position) -> bool:
|
|
return (
|
|
0 <= position.x < self.board.width and 0 <= position.y < self.board.height
|
|
)
|
|
|
|
def _colided_with_obstacle(self, position: Position) -> bool:
|
|
return self.board.get_object_at_position(position) is not None
|
|
|
|
async def _player_on_destination(self, player: Player) -> None:
|
|
logging.info(f"Player {player} reached destination!")
|
|
|
|
self.game_state = GameState.LOCK_FOR_MOVEMENT
|
|
await self.ws_server.send_game_dump()
|
|
self.__debug_print_board()
|
|
|
|
await self.ws_server.send_product_purchase_message(products=settings.products)
|
|
|
|
logging.info(f"Starting purchase countdown timer for {settings.purchase_timeout} seconds")
|
|
self._purchase_countdown_timer = CountdownTimer(
|
|
seconds=settings.purchase_timeout,
|
|
callback=self._on_purchase_timeout,
|
|
)
|
|
self._purchase_countdown_timer.start()
|
|
|
|
def _on_purchase_timeout(self) -> None:
|
|
logging.info("Ding ding! Purchase countdown timer timeout")
|
|
self._purchase_countdown_timer = None
|
|
|
|
asyncio.run(self.ws_server.send_product_purchase_done_message(product=None))
|
|
|
|
self.game_state = GameState.RUNNING
|
|
|
|
def get_board_layout(self) -> BoardLayout:
|
|
return BoardLayout(board=self.board, players=self.players)
|
|
|
|
|
|
class GameEngineFactory:
|
|
@staticmethod
|
|
def create(
|
|
board_width: int,
|
|
board_height: int,
|
|
obstacle_count: int = 0,
|
|
ws_server: Optional[SendGameDumpInterface] = None,
|
|
) -> GameEngine:
|
|
board = GameBoard(
|
|
width=board_width,
|
|
height=board_height,
|
|
destination=Destination(Position(board_width // 2, board_height // 2)),
|
|
)
|
|
obstacle_layer = Layer(name="obstacles")
|
|
for _ in range(obstacle_count):
|
|
obstacle_layer.objects.append(
|
|
LayerObject(
|
|
type_=ObjectType.OBSTACLE,
|
|
position=create_random_position(board_width, board_height),
|
|
),
|
|
)
|
|
board.layers.append(obstacle_layer)
|
|
|
|
game = GameEngine(
|
|
board=board,
|
|
ws_server=ws_server,
|
|
)
|
|
GameEngineFactory.__add_test_players(game.players)
|
|
return game
|
|
|
|
@staticmethod
|
|
def create_default(
|
|
ws_server: Optional[SendGameDumpInterface] = None,
|
|
) -> GameEngine:
|
|
return GameEngineFactory.create(
|
|
board_width=settings.board.WIDTH,
|
|
board_height=settings.board.HEIGHT,
|
|
obstacle_count=settings.board.OBSTACLE_COUNT,
|
|
ws_server=ws_server,
|
|
)
|
|
|
|
@staticmethod
|
|
def __add_test_players(players: PlayerList) -> None:
|
|
if not settings.debug:
|
|
return
|
|
|
|
for player in settings.debug.PLAYERS:
|
|
players.append(player)
|
|
logging.info(f"Test player created: {player}")
|