Files
fairhopper/hopper/engine.py
Eden Kirin 806a379253 Demo SDK
2023-03-26 14:37:39 +02:00

215 lines
6.9 KiB
Python

import asyncio
import logging
import random
from typing import Optional
from hopper.enums import Direction, PlayerMoveResult
from hopper.errors import Collision, PositionOutOfBounds
from hopper.models.board import (
BOARD_DUMP_CHARS,
BoardLayout,
Destination,
GameBoard,
Layer,
LayerObject,
ObjectType,
create_random_position,
)
from hopper.models.player import Player, PlayerList, Position
from hopper.watchdog import InactivityWatchdog
from hopper.ws_server import WSServer
from settings import settings
class GameEngine:
    """Holds one game board and its players, and applies the movement rules.

    When a ``WSServer`` is supplied, the game state is sent through it after a
    player joins and after every successful move.
    """

    # Upper bound on re-rolls while looking for an obstacle-free start cell.
    # Bounded so that a border completely covered by obstacles cannot hang
    # the engine; after the last attempt the candidate is used regardless.
    _START_POSITION_ATTEMPTS = 100

    def __init__(self, board: GameBoard, ws_server: Optional[WSServer] = None) -> None:
        """Store the board and optional websocket server; start with no players.

        Args:
            board: The board this engine plays on.
            ws_server: Optional server used to send game state updates.
        """
        self.board = board
        self.ws_server = ws_server
        self.players = PlayerList()
        # NOTE: attribute name is misspelled ("inacivity") but is kept as-is,
        # since code outside this class may reference it.
        self._inacivity_watchdog = None
        self.__debug_print_board()

    def dump_board(self) -> list[list[str]]:
        """Return the board dump with every player drawn on top of it.

        The dump produced by ``self.board.dump()`` is indexed as
        ``dump[y][x]``. Players whose position falls outside the dump are
        skipped.
        """
        dump = self.board.dump()
        for player in self.players:
            x, y = player.position.x, player.position.y
            # Check the lower bounds too: a negative coordinate would
            # otherwise wrap around via negative indexing and mark the
            # wrong cell.
            if 0 <= y < len(dump) and 0 <= x < len(dump[y]):
                dump[y][x] = BOARD_DUMP_CHARS[ObjectType.PLAYER]
        return dump

    def __debug_print_board(self):
        """Print the board to stdout when ``settings.debug.PRINT_BOARD`` is set."""
        if not (settings.debug and settings.debug.PRINT_BOARD):
            return
        for line in self.dump_board():
            print(" ".join(line))

    def _start_inactivity_watchdog(self) -> None:
        """Create and start the inactivity watchdog, once per engine."""
        if self._inacivity_watchdog:
            return
        self._inacivity_watchdog = InactivityWatchdog(
            players=self.players,
            ws_server=self.ws_server,
            daemon=True,
        )
        self._inacivity_watchdog.start()

    async def start_game(self, player_name: str) -> Player:
        """Create a new player on the board border and add it to the game.

        Sends the game state through the websocket server (if any) and then
        waits ``settings.game.MOVE_DELAY`` before returning.

        Args:
            player_name: Name given to the new player.

        Returns:
            The newly created player.
        """
        self._start_inactivity_watchdog()
        player = Player(
            name=player_name,
            position=self._create_player_start_position(),
        )
        self.players.append(player)
        logging.info(f"Starting new game for player: {player}")
        self.__debug_print_board()
        if self.ws_server:
            await self.ws_server.send_game_state()
        await asyncio.sleep(settings.game.MOVE_DELAY)
        return player

    def _random_border_position(self) -> Position:
        """Pick a random cell on the board border.

        A single random index walks the border: first the top row, then the
        bottom row, then the left column, then the right column.
        """
        width, height = self.board.width, self.board.height
        border_len = (width + height) * 2
        rnd_position = random.randint(0, border_len - 1)
        if rnd_position < width * 2:
            # top row for the first `width` indexes, bottom row for the next
            x = rnd_position % width
            y = 0 if rnd_position < width else height - 1
        else:
            # left column for the first `height` indexes, right column after
            rnd_position -= 2 * width
            x = 0 if rnd_position < height else width - 1
            y = rnd_position % height
        return Position(x=x, y=y)

    def _create_player_start_position(self) -> Position:
        """Create random position somewhere on the board border.

        Border cells occupied by a board object are re-rolled so that a
        player does not start on top of an obstacle. The number of re-rolls
        is bounded; if no free cell was found, the last candidate is returned
        anyway.
        """
        position = self._random_border_position()
        for _ in range(self._START_POSITION_ATTEMPTS - 1):
            if not self._colided_with_obstacle(position):
                break
            position = self._random_border_position()
        return position

    def _move_position(self, position: Position, direction: Direction) -> Position:
        """Return a new position one step from ``position`` in ``direction``.

        The given position is not modified. UP decreases ``y`` and DOWN
        increases it; LEFT decreases ``x`` and RIGHT increases it.

        Raises:
            ValueError: If ``direction`` is not one of the four handled values.
        """
        new_position = Position(position.x, position.y)
        if direction == Direction.LEFT:
            new_position.x -= 1
        elif direction == Direction.RIGHT:
            new_position.x += 1
        elif direction == Direction.UP:
            new_position.y -= 1
        elif direction == Direction.DOWN:
            new_position.y += 1
        else:
            raise ValueError(f"Unhandled direction: {direction}")
        return new_position

    async def move_player(
        self, player: Player, direction: Direction
    ) -> PlayerMoveResult:
        """Try to move ``player`` one step in ``direction``.

        ``move_attempt_count`` is incremented for every attempt, including
        rejected ones; ``move_count`` only for moves that were applied.
        After a successful move the game state is sent through the websocket
        server (if any). The ``settings.game.MOVE_DELAY`` wait is applied
        only when the move did not reach the destination.

        Returns:
            ``PlayerMoveResult.DESTINATION_REACHED`` if the player already was
            on, or has just reached, the destination;
            ``PlayerMoveResult.OK`` otherwise.

        Raises:
            PositionOutOfBounds: If the target cell is outside the board.
            Collision: If the target cell holds a board object.
        """
        player.reset_timeout()

        # player will not be able to move once they reach the destination
        if player.reached_destination:
            return PlayerMoveResult.DESTINATION_REACHED

        logging.info(f"Player {player} move to {direction}")
        new_position = self._move_position(player.position, direction)
        player.move_attempt_count += 1

        if not self._position_in_board_bounds(new_position):
            raise PositionOutOfBounds()
        if self._colided_with_obstacle(new_position):
            raise Collision()

        player.position = new_position
        player.move_count += 1

        if self._is_player_on_destination(player):
            player.reached_destination = True
            logging.info(f"Player {player} reached destination!")

        if self.ws_server:
            await self.ws_server.send_game_state()
        self.__debug_print_board()

        if player.reached_destination:
            return PlayerMoveResult.DESTINATION_REACHED

        await asyncio.sleep(settings.game.MOVE_DELAY)
        return PlayerMoveResult.OK

    def _is_player_on_destination(self, player: Player) -> bool:
        """Tell whether the player stands on the board's destination cell."""
        return player.position == self.board.destination.position

    def _position_in_board_bounds(self, position: Position) -> bool:
        """Tell whether ``position`` lies inside the board rectangle."""
        return (
            0 <= position.x < self.board.width and 0 <= position.y < self.board.height
        )

    def _colided_with_obstacle(self, position: Position) -> bool:
        """Tell whether the board reports any object at ``position``."""
        return self.board.get_object_at_position(position) is not None

    def get_board_layout(self) -> BoardLayout:
        """Return a ``BoardLayout`` built from the board and current players."""
        return BoardLayout(board=self.board, players=self.players)
class GameEngineFactory:
    """Factory helpers that build a ``GameEngine`` together with its board."""

    # Upper bound on re-rolls per obstacle when the random cell lands on the
    # destination. Bounded so that a board whose only cell is the destination
    # cannot loop forever.
    _OBSTACLE_POSITION_ATTEMPTS = 100

    @staticmethod
    def create(
        board_width: int,
        board_height: int,
        obstacle_count: int = 0,
        ws_server: Optional[WSServer] = None,
    ) -> GameEngine:
        """Build an engine with a new board and randomly placed obstacles.

        The destination is placed at the centre cell
        (``board_width // 2``, ``board_height // 2``). Obstacles are never
        placed on the destination cell: the engine rejects a move onto an
        occupied cell before it checks for the destination, so an obstacle
        there would make the destination unreachable.

        Args:
            board_width: Board width in cells.
            board_height: Board height in cells.
            obstacle_count: Number of obstacles to place. Fewer are placed if
                no cell other than the destination could be found.
            ws_server: Optional websocket server handed to the engine.

        Returns:
            The new engine; a test player is added when enabled in settings.
        """
        destination_position = Position(board_width // 2, board_height // 2)
        board = GameBoard(
            width=board_width,
            height=board_height,
            destination=Destination(destination_position),
        )

        obstacle_layer = Layer(name="obstacles")
        for _ in range(obstacle_count):
            position = GameEngineFactory._create_obstacle_position(
                board_width, board_height, destination_position
            )
            if position is None:
                logging.warning(
                    "Could not find a free cell for an obstacle, "
                    "placing fewer obstacles than requested"
                )
                break
            obstacle_layer.objects.append(
                LayerObject(
                    type_=ObjectType.OBSTACLE,
                    position=position,
                ),
            )
        board.layers.append(obstacle_layer)

        game = GameEngine(
            board=board,
            ws_server=ws_server,
        )
        GameEngineFactory.__add_test_player(game.players)
        return game

    @staticmethod
    def _create_obstacle_position(
        board_width: int, board_height: int, forbidden: Position
    ) -> Optional[Position]:
        """Return a random board position different from ``forbidden``.

        Returns ``None`` when every attempt produced the forbidden position.
        """
        for _ in range(GameEngineFactory._OBSTACLE_POSITION_ATTEMPTS):
            candidate = create_random_position(board_width, board_height)
            if not (candidate == forbidden):
                return candidate
        return None

    @staticmethod
    def create_default(ws_server: Optional[WSServer] = None) -> GameEngine:
        """Build an engine using the board size and obstacle count from settings."""
        return GameEngineFactory.create(
            board_width=settings.board.WIDTH,
            board_height=settings.board.HEIGHT,
            obstacle_count=settings.board.OBSTACLE_COUNT,
            ws_server=ws_server,
        )

    @staticmethod
    def __add_test_player(players: PlayerList) -> None:
        """Append a fixed test player when ``settings.debug.CREATE_TEST_PLAYER`` is set.

        The player gets a fixed name and uuid, starts at the coordinates
        given in the debug settings, and is created with
        ``can_be_deactivated=False``.
        """
        if not (settings.debug and settings.debug.CREATE_TEST_PLAYER):
            return
        player = Player(
            name="Pero",
            uuid="test-player-id",
            position=Position(
                settings.debug.TEST_PLAYER_START_X,
                settings.debug.TEST_PLAYER_START_Y,
            ),
            can_be_deactivated=False,
        )
        players.append(player)
        logging.info(f"Test player created: {player}")