from typing import Any, AsyncGenerator from litestar import Litestar, get from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyPlugin from litestar.exceptions import ClientException from litestar.openapi import OpenAPIConfig from litestar.status_codes import HTTP_409_CONFLICT from sqlalchemy import select from sqlalchemy.engine import URL from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from service.orm.machine import MachineORM sessionmaker = async_sessionmaker(expire_on_commit=False) @get("/") async def hello_world() -> str: return "Hello, world!" async def get_machine_list(session: AsyncSession) -> list[MachineORM]: query = select(MachineORM) # if done is not None: # query = query.where(MachineORM.done.is_(done)) result = await session.execute(query) return result.scalars().all() def serialize_todo(machine: MachineORM) -> dict[str, Any]: return {"id": machine.id, "caption": machine.caption} @get("/machines") async def get_machines(transaction: AsyncSession) -> list[dict[str, Any]]: return [serialize_todo(todo) for todo in await get_machine_list(transaction)] async def provide_transaction( db_session: AsyncSession, ) -> AsyncGenerator[AsyncSession, None]: print("AAAAAAAAAAAAAAAAAA") try: async with db_session.begin(): yield db_session except IntegrityError as exc: raise ClientException( status_code=HTTP_409_CONFLICT, detail=str(exc), ) from exc db_connection_string = URL.create( drivername="postgresql+asyncpg", username="televend", password="televend", host="localhost", port=5433, database="televend", ) db_config = SQLAlchemyAsyncConfig( connection_string=db_connection_string.render_as_string(hide_password=False) ) app = Litestar( route_handlers=[hello_world, get_machines], openapi_config=OpenAPIConfig(title="My API", version="1.0.0"), dependencies={"transaction": provide_transaction}, plugins=[SQLAlchemyPlugin(db_config)], debug=True, )