Files
litestar-machines-test/app/domain/machine.py
2023-08-27 11:08:14 +02:00

48 lines
1.4 KiB
Python

from __future__ import annotations
from typing import Annotated
from litestar.contrib.repository import FilterTypes
from litestar.contrib.sqlalchemy.base import BigIntBase
from litestar.contrib.sqlalchemy.dto import SQLAlchemyDTO
from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository
from litestar.contrib.sqlalchemy.repository.types import SelectT
from litestar.dto import DTOConfig
from sqlalchemy import true
from sqlalchemy.orm import Mapped
from app.lib import service
class Machine(BigIntBase):
__tablename__ = "machines"
caption: Mapped[str]
enabled: Mapped[bool]
alive: Mapped[bool]
deleted: Mapped[bool]
external_id: Mapped[str]
class Repository(SQLAlchemyAsyncRepository[Machine]):
model_type = Machine
def _apply_filters(
self, *filters: FilterTypes, apply_pagination: bool = True, statement: SelectT
) -> SelectT:
statement = super()._apply_filters(
*filters, apply_pagination=apply_pagination, statement=statement
)
statement = statement.where(self.model_type.alive == true())
return statement
class Service(service.Service[Machine]):
repository_type = Repository
# write_config = DTOConfig(exclude={"created_at", "updated_at", "nationality"})
write_config = DTOConfig()
MachineWriteDTO = SQLAlchemyDTO[Annotated[Machine, write_config]]
MachineReadDTO = SQLAlchemyDTO[Machine]