Working version

This commit is contained in:
Eden Kirin
2023-08-26 14:38:33 +02:00
parent f4882cfb0c
commit 7f1acec1af
16 changed files with 598 additions and 77 deletions

View File

@ -0,0 +1,15 @@
from litestar import Router
from app.domain.machine import Machine
from . import machines
__all__ = ["create_router"]
def create_router() -> Router:
return Router(
path="/v1",
route_handlers=[machines.MachineController,],
signature_namespace={"Machine": Machine,},
)

113
app/controllers/machines.py Normal file
View File

@ -0,0 +1,113 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, cast
from litestar import Controller, delete, get, post, put
from litestar.di import Provide
from litestar.pagination import (
AbstractAsyncOffsetPaginator,
AbstractSyncClassicPaginator,
)
from litestar.status_codes import HTTP_200_OK
from sqlalchemy import ScalarResult, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.machine import MachineReadDTO, MachineWriteDTO, Repository, Service
from app.domain.machine import Machine
from litestar.contrib.repository.filters import SearchFilter, LimitOffset
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
DETAIL_ROUTE = "/{machine_id:int}"
def provides_service(db_session: AsyncSession) -> Service:
"""Constructs repository and service objects for the request."""
return Service(Repository(session=db_session))
# class MachineOffsetPaginator(AbstractAsyncOffsetPaginator[Machine]):
# def __init__(
# self, async_session: AsyncSession
# ) -> None: # 'async_session' dependency will be injected here.
# self.async_session = async_session
#
# async def get_total(self) -> int:
# return cast(
# "int", await self.async_session.scalar(select(func.count(Machine.id)))
# )
#
# async def get_items(self, limit: int, offset: int) -> list[Machine]:
# people: ScalarResult = await self.async_session.scalars(
# select(Machine).slice(offset, limit)
# )
# return list(people.all())
class MachineController(Controller):
dto = MachineWriteDTO
return_dto = MachineReadDTO
path = "/machines"
dependencies = {
"service": Provide(provides_service, sync_to_thread=False),
}
tags = ["Machines"]
@get()
async def get_machines(
self, service: Service,
search: Optional[str] = None,
) -> list[Machine]:
"""Get a list of authors."""
print("#" * 100)
print(search)
print("#" * 100)
filters = [
LimitOffset(
limit=20,
offset=0
),
]
if search:
filters.append(
SearchFilter(
field_name="caption",
value=search,
),
)
return await service.list(*filters)
# @get()
# async def get_machines(
# self, service: Service, filters: list[FilterTypes]
# ) -> list[Machine]:
# """Get a list of authors."""
# return await service.list(*filters)
# @post()
# async def create_author(self, data: Machine, service: Service) -> Machine:
# return await service.create(data)
#
@get(DETAIL_ROUTE)
async def get_machine(self, service: Service, machine_id: int) -> Machine:
return await service.get(machine_id)
#
# @put(DETAIL_ROUTE)
# async def update_author(
# self, data: Machine, service: Service, author_id: UUID
# ) -> Machine:
# """Update an author."""
# return await service.update(author_id, data)
#
# @delete(DETAIL_ROUTE, status_code=HTTP_200_OK)
# async def delete_author(self, service: Service, author_id: UUID) -> Machine:
# """Delete Author by ID."""
# return await service.delete(author_id)

36
app/database.py Normal file
View File

@ -0,0 +1,36 @@
from typing import AsyncGenerator
from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig
from litestar.exceptions import ClientException
from litestar.status_codes import HTTP_409_CONFLICT
from sqlalchemy import URL
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
async def provide_transaction(
db_session: AsyncSession,
) -> AsyncGenerator[AsyncSession, None]:
try:
async with db_session.begin():
yield db_session
except IntegrityError as exc:
raise ClientException(
status_code=HTTP_409_CONFLICT,
detail=str(exc),
) from exc
sessionmaker = async_sessionmaker(expire_on_commit=False)
db_connection_string = URL.create(
drivername="postgresql+asyncpg",
username="televend",
password="televend",
host="localhost",
port=5433,
database="televend",
)
db_config = SQLAlchemyAsyncConfig(
connection_string=db_connection_string.render_as_string(hide_password=False)
)

27
app/domain/machine.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Annotated
from litestar.contrib.sqlalchemy.base import BigIntBase
from litestar.contrib.sqlalchemy.dto import SQLAlchemyDTO
from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository
from litestar.dto import DTOConfig
from sqlalchemy.orm import Mapped
from app.lib import service
class Machine(BigIntBase):
__tablename__ = "machines"
caption: Mapped[str]
class Repository(SQLAlchemyAsyncRepository[Machine]):
model_type = Machine
class Service(service.Service[Machine]):
repository_type = Repository
# write_config = DTOConfig(exclude={"created_at", "updated_at", "nationality"})
write_config = DTOConfig()
MachineWriteDTO = SQLAlchemyDTO[Annotated[Machine, write_config]]
MachineReadDTO = SQLAlchemyDTO[Machine]

0
app/lib/__init__.py Normal file
View File

150
app/lib/dependencies.py Normal file
View File

@ -0,0 +1,150 @@
from datetime import datetime
from uuid import UUID
from litestar.contrib.repository.filters import BeforeAfter, CollectionFilter, FilterTypes, LimitOffset
from litestar.di import Provide
from litestar.params import Dependency, Parameter
DEFAULT_PAGINATION_LIMIT = 20
__all__ = [
"create_collection_dependencies",
"provide_created_filter",
"provide_filter_dependencies",
"provide_id_filter",
"provide_limit_offset_pagination",
"provide_updated_filter",
]
DTorNone = datetime | None
CREATED_FILTER_DEPENDENCY_KEY = "created_filter"
FILTERS_DEPENDENCY_KEY = "filters"
ID_FILTER_DEPENDENCY_KEY = "id_filter"
LIMIT_OFFSET_DEPENDENCY_KEY = "limit_offset"
UPDATED_FILTER_DEPENDENCY_KEY = "updated_filter"
def provide_id_filter(
ids: list[UUID] | None = Parameter(query="ids", default=None, required=False)
) -> CollectionFilter[UUID]:
"""Return type consumed by ``Repository.filter_in_collection()``.
Parameters
----------
ids : list[UUID] | None
Parsed out of comma separated list of values in query params.
Returns:
-------
CollectionFilter[UUID]
"""
return CollectionFilter(field_name="id", values=ids or [])
def provide_created_filter(
before: DTorNone = Parameter(query="created-before", default=None, required=False),
after: DTorNone = Parameter(query="created-after", default=None, required=False),
) -> BeforeAfter:
"""Return type consumed by `Repository.filter_on_datetime_field()`.
Parameters
----------
before : datetime | None
Filter for records updated before this date/time.
after : datetime | None
Filter for records updated after this date/time.
"""
return BeforeAfter("created_at", before, after)
def provide_updated_filter(
before: DTorNone = Parameter(query="updated-before", default=None, required=False),
after: DTorNone = Parameter(query="updated-after", default=None, required=False),
) -> BeforeAfter:
"""Return type consumed by `Repository.filter_on_datetime_field()`.
Parameters
----------
before : datetime | None
Filter for records updated before this date/time.
after : datetime | None
Filter for records updated after this date/time.
"""
return BeforeAfter("updated_at", before, after)
def provide_limit_offset_pagination(
page: int = Parameter(ge=1, default=1, required=False),
page_size: int = Parameter(
query="page-size",
ge=1,
default=DEFAULT_PAGINATION_LIMIT,
required=False,
),
) -> LimitOffset:
"""Return type consumed by `Repository.apply_limit_offset_pagination()`.
Parameters
----------
page : int
LIMIT to apply to select.
page_size : int
OFFSET to apply to select.
"""
return LimitOffset(page_size, page_size * (page - 1))
def provide_filter_dependencies(
created_filter: BeforeAfter = Dependency(skip_validation=True),
updated_filter: BeforeAfter = Dependency(skip_validation=True),
id_filter: CollectionFilter = Dependency(skip_validation=True),
limit_offset: LimitOffset = Dependency(skip_validation=True),
) -> list[FilterTypes]:
"""Common collection route filtering dependencies. Add all filters to any
route by including this function as a dependency, e.g:
@get
def get_collection_handler(filters: Filters) -> ...:
...
The dependency is provided at the application layer, so only need to inject the dependency where
necessary.
Parameters
----------
id_filter : repository.CollectionFilter
Filter for scoping query to limited set of identities.
created_filter : repository.BeforeAfter
Filter for scoping query to instance creation date/time.
updated_filter : repository.BeforeAfter
Filter for scoping query to instance update date/time.
limit_offset : repository.LimitOffset
Filter for query pagination.
Returns:
-------
list[FilterTypes]
List of filters parsed from connection.
"""
return [
created_filter,
id_filter,
limit_offset,
updated_filter,
]
def create_collection_dependencies() -> dict[str, Provide]:
"""Creates a dictionary of provides for pagination endpoints.
Returns:
-------
dict[str, Provide]
"""
return {
LIMIT_OFFSET_DEPENDENCY_KEY: Provide(provide_limit_offset_pagination, sync_to_thread=False),
UPDATED_FILTER_DEPENDENCY_KEY: Provide(provide_updated_filter, sync_to_thread=False),
CREATED_FILTER_DEPENDENCY_KEY: Provide(provide_created_filter, sync_to_thread=False),
ID_FILTER_DEPENDENCY_KEY: Provide(provide_id_filter, sync_to_thread=False),
FILTERS_DEPENDENCY_KEY: Provide(provide_filter_dependencies, sync_to_thread=False),
}

68
app/lib/exceptions.py Normal file
View File

@ -0,0 +1,68 @@
import logging
from typing import TYPE_CHECKING
from litestar.contrib.repository.exceptions import (
ConflictError as RepositoryConflictException,
)
from litestar.contrib.repository.exceptions import (
NotFoundError as RepositoryNotFoundException,
)
from litestar.contrib.repository.exceptions import (
RepositoryError as RepositoryException,
)
from litestar.exceptions import (
HTTPException,
InternalServerException,
NotFoundException,
)
from litestar.middleware.exceptions.middleware import create_exception_response
from .service import ServiceError
if TYPE_CHECKING:
from litestar.connection import Request
from litestar.response import Response
__all__ = [
"repository_exception_to_http_response",
"service_exception_to_http_response",
]
logger = logging.getLogger(__name__)
class ConflictException(HTTPException):
status_code = 409
def repository_exception_to_http_response(request: "Request", exc: RepositoryException) -> "Response":
"""Transform repository exceptions to HTTP exceptions.
Args:
_: The request that experienced the exception.
exc: Exception raised during handling of the request.
Returns:
Exception response appropriate to the type of original exception.
"""
http_exc: type[HTTPException]
if isinstance(exc, RepositoryNotFoundException):
http_exc = NotFoundException
elif isinstance(exc, RepositoryConflictException):
http_exc = ConflictException
else:
http_exc = InternalServerException
return create_exception_response(request, exc=http_exc())
def service_exception_to_http_response(request: "Request", exc: ServiceError) -> "Response":
"""Transform service exceptions to HTTP exceptions.
Args:
_: The request that experienced the exception.
exc: Exception raised during handling of the request.
Returns:
Exception response appropriate to the type of original exception.
"""
return create_exception_response(request, InternalServerException())

95
app/lib/service.py Normal file
View File

@ -0,0 +1,95 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Generic
from litestar.contrib.sqlalchemy.repository import ModelT
__all__ = ["Service", "ServiceError"]
if TYPE_CHECKING:
from litestar.contrib.repository import AbstractAsyncRepository, FilterTypes
class ServiceError(Exception):
"""Base class for `Service` related exceptions."""
class Service(Generic[ModelT]):
def __init__(self, repository: AbstractAsyncRepository[ModelT]) -> None:
"""Generic Service object.
Args:
repository: Instance conforming to `AbstractRepository` interface.
"""
self.repository = repository
async def create(self, data: ModelT) -> ModelT:
"""Wraps repository instance creation.
Args:
data: Representation to be created.
Returns:
Representation of created instance.
"""
return await self.repository.add(data)
async def list(self, *filters: FilterTypes, **kwargs: Any) -> list[ModelT]:
"""Wraps repository scalars operation.
Args:
*filters: Collection route filters.
**kwargs: Keyword arguments for attribute based filtering.
Returns:
The list of instances retrieved from the repository.
"""
return await self.repository.list(*filters, **kwargs)
async def update(self, id_: Any, data: ModelT) -> ModelT:
"""Wraps repository update operation.
Args:
id_: Identifier of item to be updated.
data: Representation to be updated.
Returns:
Updated representation.
"""
return await self.repository.update(data)
async def upsert(self, id_: Any, data: ModelT) -> ModelT:
"""Wraps repository upsert operation.
Args:
id_: Identifier of the object for upsert.
data: Representation for upsert.
Returns:
-------
Updated or created representation.
"""
return await self.repository.upsert(data)
async def get(self, id_: Any) -> ModelT:
"""Wraps repository scalar operation.
Args:
id_: Identifier of instance to be retrieved.
Returns:
Representation of instance with identifier `id_`.
"""
return await self.repository.get(id_)
async def delete(self, id_: Any) -> ModelT:
"""Wraps repository delete operation.
Args:
id_: Identifier of instance to be deleted.
Returns:
Representation of the deleted instance.
"""
return await self.repository.delete(id_)

99
main.py
View File

@ -1,18 +1,28 @@
from typing import Any, AsyncGenerator from uuid import UUID
from litestar import Litestar, get from litestar import Litestar, get
from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyPlugin from litestar.contrib.repository import FilterTypes
from litestar.exceptions import ClientException from litestar.contrib.repository.exceptions import (
RepositoryError as RepositoryException,
)
from litestar.contrib.repository.filters import (
BeforeAfter,
CollectionFilter,
LimitOffset,
NotInCollectionFilter,
NotInSearchFilter,
OnBeforeAfter,
OrderBy,
SearchFilter,
)
from litestar.contrib.sqlalchemy.plugins import SQLAlchemyPlugin
from litestar.openapi import OpenAPIConfig from litestar.openapi import OpenAPIConfig
from litestar.status_codes import HTTP_409_CONFLICT from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.engine import URL
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from service.orm.machine import MachineORM from app.controllers import create_router
from app.database import db_config, provide_transaction
sessionmaker = async_sessionmaker(expire_on_commit=False) from app.lib import exceptions
from app.lib.service import ServiceError
@get("/") @get("/")
@ -20,56 +30,27 @@ async def hello_world() -> str:
return "Hello, world!" return "Hello, world!"
async def get_machine_list(session: AsyncSession) -> list[MachineORM]:
query = select(MachineORM)
# if done is not None:
# query = query.where(MachineORM.done.is_(done))
result = await session.execute(query)
return result.scalars().all()
def serialize_todo(machine: MachineORM) -> dict[str, Any]:
return {"id": machine.id, "caption": machine.caption}
@get("/machines")
async def get_machines(transaction: AsyncSession) -> list[dict[str, Any]]:
return [serialize_todo(todo) for todo in await get_machine_list(transaction)]
async def provide_transaction(
db_session: AsyncSession,
) -> AsyncGenerator[AsyncSession, None]:
print("AAAAAAAAAAAAAAAAAA")
try:
async with db_session.begin():
yield db_session
except IntegrityError as exc:
raise ClientException(
status_code=HTTP_409_CONFLICT,
detail=str(exc),
) from exc
db_connection_string = URL.create(
drivername="postgresql+asyncpg",
username="televend",
password="televend",
host="localhost",
port=5433,
database="televend",
)
db_config = SQLAlchemyAsyncConfig(
connection_string=db_connection_string.render_as_string(hide_password=False)
)
app = Litestar( app = Litestar(
route_handlers=[hello_world, get_machines], route_handlers=[hello_world, create_router()],
openapi_config=OpenAPIConfig(title="My API", version="1.0.0"), openapi_config=OpenAPIConfig(title="My API", version="1.0.0"),
dependencies={"transaction": provide_transaction}, dependencies={"session": provide_transaction},
plugins=[SQLAlchemyPlugin(db_config)], plugins=[SQLAlchemyPlugin(db_config)],
exception_handlers={
RepositoryException: exceptions.repository_exception_to_http_response, # type: ignore[dict-item]
ServiceError: exceptions.service_exception_to_http_response, # type: ignore[dict-item]
},
signature_namespace={
"AsyncSession": AsyncSession,
"FilterTypes": FilterTypes,
"BeforeAfter": BeforeAfter,
"CollectionFilter": CollectionFilter,
"LimitOffset": LimitOffset,
"UUID": UUID,
"OrderBy": OrderBy,
"SearchFilter": SearchFilter,
"OnBeforeAfter": OnBeforeAfter,
"NotInSearchFilter": NotInSearchFilter,
"NotInCollectionFilter": NotInCollectionFilter,
},
debug=True, debug=True,
) )

54
poetry.lock generated
View File

@ -479,6 +479,58 @@ msgspec = ["msgspec"]
odmantic = ["odmantic", "pydantic[email]"] odmantic = ["odmantic", "pydantic[email]"]
pydantic = ["pydantic[email]"] pydantic = ["pydantic[email]"]
[[package]]
name = "pydantic"
version = "1.10.12"
description = "Data validation and settings management using python type hints"
optional = false
python-versions = ">=3.7"
files = [
{file = "pydantic-1.10.12-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a1fcb59f2f355ec350073af41d927bf83a63b50e640f4dbaa01053a28b7a7718"},
{file = "pydantic-1.10.12-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:b7ccf02d7eb340b216ec33e53a3a629856afe1c6e0ef91d84a4e6f2fb2ca70fe"},
{file = "pydantic-1.10.12-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8fb2aa3ab3728d950bcc885a2e9eff6c8fc40bc0b7bb434e555c215491bcf48b"},
{file = "pydantic-1.10.12-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:771735dc43cf8383959dc9b90aa281f0b6092321ca98677c5fb6125a6f56d58d"},
{file = "pydantic-1.10.12-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:ca48477862372ac3770969b9d75f1bf66131d386dba79506c46d75e6b48c1e09"},
{file = "pydantic-1.10.12-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:a5e7add47a5b5a40c49b3036d464e3c7802f8ae0d1e66035ea16aa5b7a3923ed"},
{file = "pydantic-1.10.12-cp310-cp310-win_amd64.whl", hash = "sha256:e4129b528c6baa99a429f97ce733fff478ec955513630e61b49804b6cf9b224a"},
{file = "pydantic-1.10.12-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:b0d191db0f92dfcb1dec210ca244fdae5cbe918c6050b342d619c09d31eea0cc"},
{file = "pydantic-1.10.12-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:795e34e6cc065f8f498c89b894a3c6da294a936ee71e644e4bd44de048af1405"},
{file = "pydantic-1.10.12-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:69328e15cfda2c392da4e713443c7dbffa1505bc9d566e71e55abe14c97ddc62"},
{file = "pydantic-1.10.12-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2031de0967c279df0d8a1c72b4ffc411ecd06bac607a212892757db7462fc494"},
{file = "pydantic-1.10.12-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:ba5b2e6fe6ca2b7e013398bc7d7b170e21cce322d266ffcd57cca313e54fb246"},
{file = "pydantic-1.10.12-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:2a7bac939fa326db1ab741c9d7f44c565a1d1e80908b3797f7f81a4f86bc8d33"},
{file = "pydantic-1.10.12-cp311-cp311-win_amd64.whl", hash = "sha256:87afda5539d5140cb8ba9e8b8c8865cb5b1463924d38490d73d3ccfd80896b3f"},
{file = "pydantic-1.10.12-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:549a8e3d81df0a85226963611950b12d2d334f214436a19537b2efed61b7639a"},
{file = "pydantic-1.10.12-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:598da88dfa127b666852bef6d0d796573a8cf5009ffd62104094a4fe39599565"},
{file = "pydantic-1.10.12-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ba5c4a8552bff16c61882db58544116d021d0b31ee7c66958d14cf386a5b5350"},
{file = "pydantic-1.10.12-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:c79e6a11a07da7374f46970410b41d5e266f7f38f6a17a9c4823db80dadf4303"},
{file = "pydantic-1.10.12-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:ab26038b8375581dc832a63c948f261ae0aa21f1d34c1293469f135fa92972a5"},
{file = "pydantic-1.10.12-cp37-cp37m-win_amd64.whl", hash = "sha256:e0a16d274b588767602b7646fa05af2782576a6cf1022f4ba74cbb4db66f6ca8"},
{file = "pydantic-1.10.12-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:6a9dfa722316f4acf4460afdf5d41d5246a80e249c7ff475c43a3a1e9d75cf62"},
{file = "pydantic-1.10.12-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:a73f489aebd0c2121ed974054cb2759af8a9f747de120acd2c3394cf84176ccb"},
{file = "pydantic-1.10.12-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6b30bcb8cbfccfcf02acb8f1a261143fab622831d9c0989707e0e659f77a18e0"},
{file = "pydantic-1.10.12-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2fcfb5296d7877af406ba1547dfde9943b1256d8928732267e2653c26938cd9c"},
{file = "pydantic-1.10.12-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:2f9a6fab5f82ada41d56b0602606a5506aab165ca54e52bc4545028382ef1c5d"},
{file = "pydantic-1.10.12-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:dea7adcc33d5d105896401a1f37d56b47d443a2b2605ff8a969a0ed5543f7e33"},
{file = "pydantic-1.10.12-cp38-cp38-win_amd64.whl", hash = "sha256:1eb2085c13bce1612da8537b2d90f549c8cbb05c67e8f22854e201bde5d98a47"},
{file = "pydantic-1.10.12-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:ef6c96b2baa2100ec91a4b428f80d8f28a3c9e53568219b6c298c1125572ebc6"},
{file = "pydantic-1.10.12-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6c076be61cd0177a8433c0adcb03475baf4ee91edf5a4e550161ad57fc90f523"},
{file = "pydantic-1.10.12-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2d5a58feb9a39f481eda4d5ca220aa8b9d4f21a41274760b9bc66bfd72595b86"},
{file = "pydantic-1.10.12-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e5f805d2d5d0a41633651a73fa4ecdd0b3d7a49de4ec3fadf062fe16501ddbf1"},
{file = "pydantic-1.10.12-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:1289c180abd4bd4555bb927c42ee42abc3aee02b0fb2d1223fb7c6e5bef87dbe"},
{file = "pydantic-1.10.12-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5d1197e462e0364906cbc19681605cb7c036f2475c899b6f296104ad42b9f5fb"},
{file = "pydantic-1.10.12-cp39-cp39-win_amd64.whl", hash = "sha256:fdbdd1d630195689f325c9ef1a12900524dceb503b00a987663ff4f58669b93d"},
{file = "pydantic-1.10.12-py3-none-any.whl", hash = "sha256:b749a43aa51e32839c9d71dc67eb1e4221bb04af1033a32e3923d46f9effa942"},
{file = "pydantic-1.10.12.tar.gz", hash = "sha256:0fe8a415cea8f340e7a9af9c54fc71a649b43e8ca3cc732986116b3cb135d303"},
]
[package.dependencies]
typing-extensions = ">=4.2.0"
[package.extras]
dotenv = ["python-dotenv (>=0.10.4)"]
email = ["email-validator (>=1.0.3)"]
[[package]] [[package]]
name = "python-dateutil" name = "python-dateutil"
version = "2.8.2" version = "2.8.2"
@ -708,4 +760,4 @@ anyio = ">=3.0.0"
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.11" python-versions = "^3.11"
content-hash = "be317fb95d88f3a21ee7f3f9824483d6c5cd20fb36759f608703a87131ec2b8a" content-hash = "411f2e851f3c3fdf05d5a8f1c200b8ded6223b8e4bb9b1be00db2f6a28a8dc26"

View File

@ -12,6 +12,7 @@ litestar = "^2.0.1"
uvicorn = "^0.23.2" uvicorn = "^0.23.2"
sqlalchemy = "^2.0.20" sqlalchemy = "^2.0.20"
asyncpg = "^0.28.0" asyncpg = "^0.28.0"
pydantic = "<2.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]

View File

@ -1,7 +0,0 @@
from sqlalchemy.orm import DeclarativeBase
class ORMBase(DeclarativeBase):
...

View File

@ -1,10 +0,0 @@
from sqlalchemy.orm import Mapped, mapped_column
from service.database import ORMBase
class MachineORM(ORMBase):
__tablename__ = "machines"
id: Mapped[int] = mapped_column(primary_key=True)
caption: Mapped[str]