105 lines
3.2 KiB
Python
105 lines
3.2 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import TYPE_CHECKING, Optional, cast, TypeVar, Type, Generic
|
|
|
|
from litestar import Controller, delete, get, post, put
|
|
from litestar.contrib.repository.filters import LimitOffset, SearchFilter, FilterTypes
|
|
from litestar.di import Provide
|
|
from litestar.pagination import (
|
|
AbstractAsyncOffsetPaginator,
|
|
AbstractSyncClassicPaginator,
|
|
)
|
|
from litestar.status_codes import HTTP_200_OK
|
|
from sqlalchemy import ScalarResult, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.domain.machine import (
|
|
Machine,
|
|
MachineReadDTO,
|
|
MachineWriteDTO,
|
|
Repository,
|
|
Service,
|
|
)
|
|
from app.lib.responses import ObjectListResponse, ObjectResponse
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
# Route template for single-machine endpoints; it declares the ``machine_id``
# path parameter with an ``int`` type and is passed to the detail handlers'
# route decorators.
DETAIL_ROUTE = "/{machine_id:int}"
|
|
|
|
|
|
def provides_service(db_session: AsyncSession) -> Service:
    """Build the machine service used while handling a request.

    Args:
        db_session: Session handed to the repository as its ``session``.

    Returns:
        A ``Service`` wrapping a ``Repository`` created from ``db_session``.
    """
    repository = Repository(session=db_session)
    return Service(repository)
|
|
|
|
|
|
# class MachineOffsetPaginator(AbstractAsyncOffsetPaginator[Machine]):
|
|
# def __init__(
|
|
# self, async_session: AsyncSession
|
|
# ) -> None: # 'async_session' dependency will be injected here.
|
|
# self.async_session = async_session
|
|
#
|
|
# async def get_total(self) -> int:
|
|
# return cast(
|
|
# "int", await self.async_session.scalar(select(func.count(Machine.id)))
|
|
# )
|
|
#
|
|
# async def get_items(self, limit: int, offset: int) -> list[Machine]:
|
|
# people: ScalarResult = await self.async_session.scalars(
|
|
# select(Machine).slice(offset, limit)
|
|
# )
|
|
# return list(people.all())
|
|
|
|
|
|
class MachineController(Controller):
    """Read-only HTTP endpoints for machines, registered under ``/machines``.

    Every handler receives a ``service`` argument built by
    ``provides_service``. ``MachineWriteDTO`` / ``MachineReadDTO`` are set as
    the controller-level ``dto`` / ``return_dto``.
    """

    dto = MachineWriteDTO
    return_dto = MachineReadDTO
    path = "/machines"
    dependencies = {
        "service": Provide(provides_service, sync_to_thread=False),
    }
    tags = ["Machines"]

    @get()
    async def get_machines(
        self, service: Service, search: Optional[str] = None
    ) -> ObjectListResponse[Machine]:
        """List machines, optionally filtered by caption.

        Args:
            service: Machine service supplied through the ``service`` dependency.
            search: Text to search for in the ``caption`` field. When ``None``
                no search filter is added.

        Returns:
            An ``ObjectListResponse`` whose content is the result of
            ``service.list`` called with the assembled filters.
        """
        # The list is annotated with the ``FilterTypes`` union: without it a
        # type checker infers ``list[LimitOffset]`` and rejects the
        # ``SearchFilter`` appended below.
        # Pagination is currently fixed: limit 20, offset 0.
        filters: list[FilterTypes] = [LimitOffset(limit=20, offset=0)]

        if search is not None:
            filters.append(SearchFilter(field_name="caption", value=search))

        content = await service.list(*filters)
        return ObjectListResponse(content=content)

    # @post()
    # async def create_author(self, data: Machine, service: Service) -> Machine:
    #     return await service.create(data)
    #
    @get(DETAIL_ROUTE)
    async def get_machine(self, service: Service, machine_id: int) -> ObjectResponse[Machine]:
        """Fetch a single machine by id.

        Args:
            service: Machine service supplied through the ``service`` dependency.
            machine_id: Integer id taken from the ``DETAIL_ROUTE`` path parameter.

        Returns:
            An ``ObjectResponse`` whose content is the result of
            ``service.get(machine_id)``.
        """
        content = await service.get(machine_id)
        return ObjectResponse(content=content)
|
|
|
|
#
|
|
# @put(DETAIL_ROUTE)
|
|
# async def update_author(
|
|
# self, data: Machine, service: Service, author_id: UUID
|
|
# ) -> Machine:
|
|
# """Update an author."""
|
|
# return await service.update(author_id, data)
|
|
#
|
|
# @delete(DETAIL_ROUTE, status_code=HTTP_200_OK)
|
|
# async def delete_author(self, service: Service, author_id: UUID) -> Machine:
|
|
# """Delete Author by ID."""
|
|
# return await service.delete(author_id)
|