Files
litestar-machines-test/app/controllers/machine.py
Eden Kirin 03c8aaa312 Cleanup
2023-08-27 11:29:12 +02:00

63 lines
1.6 KiB
Python

from typing import TYPE_CHECKING, Optional
from litestar import Controller, get
from litestar.contrib.repository.filters import LimitOffset, SearchFilter
from litestar.di import Provide
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.machine import (
Machine,
MachineReadDTO,
MachineWriteDTO,
Repository,
Service,
)
from app.lib.responses import ObjectListResponse, ObjectResponse
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
DETAIL_ROUTE = "/{machine_id:int}"
def provides_service(db_session: AsyncSession) -> Service:
"""Constructs repository and service objects for the request."""
return Service(Repository(session=db_session))
class MachineController(Controller):
dto = MachineWriteDTO
return_dto = MachineReadDTO
path = "/machines"
dependencies = {
"service": Provide(provides_service, sync_to_thread=False),
}
tags = ["Machines"]
@get()
async def get_machines(
self, service: Service, search: Optional[str] = None
) -> ObjectListResponse[Machine]:
filters = [
LimitOffset(limit=20, offset=0),
]
if search is not None:
filters.append(
SearchFilter(
field_name="caption",
value=search,
),
)
content = await service.list(*filters)
return ObjectListResponse(content=content)
@get(DETAIL_ROUTE)
async def get_machine(
self, service: Service, machine_id: int
) -> ObjectResponse[Machine]:
content = await service.get(machine_id)
return ObjectResponse(content=content)