Files
fairhopper/hopper/engine.py
Eden Kirin fa2aee881d Cleanups
2023-03-26 23:59:15 +02:00

219 lines
6.9 KiB
Python

import asyncio
import logging
import random
from typing import Optional
from hopper.enums import Direction, PlayerMoveResult
from hopper.errors import Collision, PositionOutOfBounds
from hopper.interfaces import SendGameStateInterface
from hopper.models.board import (
BOARD_DUMP_CHARS,
BoardLayout,
Destination,
GameBoard,
Layer,
LayerObject,
ObjectType,
create_random_position,
)
from hopper.models.player import Player, PlayerList, Position
from hopper.watchdog import InactivityWatchdog
from settings import settings
class GameEngine:
def __init__(
self, board: GameBoard, ws_server: Optional[SendGameStateInterface] = None
) -> None:
self.board = board
self.ws_server = ws_server
self.players = PlayerList()
self._inacivity_watchdog = None
self.__debug_print_board()
def dump_board(self) -> list[list[str]]:
dump = self.board.dump()
for player in self.players:
if player.position.y < len(dump) and player.position.x < len(
dump[player.position.y]
):
dump[player.position.y][player.position.x] = BOARD_DUMP_CHARS[
ObjectType.PLAYER
]
return dump
def __debug_print_board(self):
if not (settings.debug and settings.debug.PRINT_BOARD):
return
for line in self.dump_board():
print(" ".join(line))
def _start_inactivity_watchdog(self) -> None:
if not self._inacivity_watchdog:
self._inacivity_watchdog = InactivityWatchdog(
players=self.players,
ws_server=self.ws_server,
daemon=True,
)
self._inacivity_watchdog.start()
async def start_game(self, player_name: str) -> Player:
self._start_inactivity_watchdog()
player = Player(
name=player_name,
position=self._create_player_start_position(),
)
self.players.append(player)
logging.info(f"Starting new game for player: {player}")
self.__debug_print_board()
if self.ws_server:
await self.ws_server.send_game_state()
await asyncio.sleep(settings.game.MOVE_DELAY)
return player
def _create_player_start_position(self) -> Position:
"""Create random position somewhere on the board border"""
border_len = (self.board.width + self.board.height) * 2
rnd_position = random.randint(0, border_len - 1)
if rnd_position < self.board.width * 2:
x = rnd_position % self.board.width
y = 0 if rnd_position < self.board.width else self.board.height - 1
else:
rnd_position -= 2 * self.board.width
x = 0 if rnd_position < self.board.height else self.board.width - 1
y = rnd_position % self.board.height
return Position(x=x, y=y)
def _move_position(self, position: Position, direction: Direction) -> Position:
new_position = Position(position.x, position.y)
if direction == Direction.LEFT:
new_position.x -= 1
elif direction == Direction.RIGHT:
new_position.x += 1
elif direction == Direction.UP:
new_position.y -= 1
elif direction == Direction.DOWN:
new_position.y += 1
else:
raise ValueError(f"Unhandled direction: {direction}")
return new_position
async def move_player(
self, player: Player, direction: Direction
) -> PlayerMoveResult:
player.reset_timeout()
# player will not be able to move once they reach the destination
if player.reached_destination:
return PlayerMoveResult.DESTINATION_REACHED
logging.info(f"Player {player} move to {direction}")
new_position = self._move_position(player.position, direction)
player.move_attempt_count += 1
if not self._position_in_board_bounds(new_position):
raise PositionOutOfBounds()
if self._colided_with_obstacle(new_position):
raise Collision()
player.position = new_position
player.move_count += 1
if self._is_player_on_destination(player):
player.reached_destination = True
logging.info(f"Player {player} reached destination!")
if self.ws_server:
await self.ws_server.send_game_state()
self.__debug_print_board()
if player.reached_destination:
return PlayerMoveResult.DESTINATION_REACHED
await asyncio.sleep(settings.game.MOVE_DELAY)
return PlayerMoveResult.OK
def _is_player_on_destination(self, player: Player) -> bool:
return player.position == self.board.destination.position
def _position_in_board_bounds(self, position: Position) -> bool:
return (
0 <= position.x < self.board.width and 0 <= position.y < self.board.height
)
def _colided_with_obstacle(self, position: Position) -> bool:
return self.board.get_object_at_position(position) is not None
def get_board_layout(self) -> BoardLayout:
return BoardLayout(board=self.board, players=self.players)
class GameEngineFactory:
@staticmethod
def create(
board_width: int,
board_height: int,
obstacle_count: int = 0,
ws_server: Optional[SendGameStateInterface] = None,
) -> GameEngine:
board = GameBoard(
width=board_width,
height=board_height,
destination=Destination(Position(board_width // 2, board_height // 2)),
)
obstacle_layer = Layer(name="obstacles")
for _ in range(obstacle_count):
obstacle_layer.objects.append(
LayerObject(
type_=ObjectType.OBSTACLE,
position=create_random_position(board_width, board_height),
),
)
board.layers.append(obstacle_layer)
game = GameEngine(
board=board,
ws_server=ws_server,
)
GameEngineFactory.__add_test_player(game.players)
return game
@staticmethod
def create_default(
ws_server: Optional[SendGameStateInterface] = None,
) -> GameEngine:
return GameEngineFactory.create(
board_width=settings.board.WIDTH,
board_height=settings.board.HEIGHT,
obstacle_count=settings.board.OBSTACLE_COUNT,
ws_server=ws_server,
)
@staticmethod
def __add_test_player(players: PlayerList) -> None:
if not (settings.debug and settings.debug.CREATE_TEST_PLAYER):
return
player = Player(
name="Pero",
uuid="test-player-id",
position=Position(
settings.debug.TEST_PLAYER_START_X,
settings.debug.TEST_PLAYER_START_Y,
),
can_be_deactivated=False,
)
players.append(player)
logging.info(f"Test player created: {player}")