Pydantic benchmark

This commit is contained in:
Eden Kirin
2023-10-11 20:33:21 +02:00
commit 3e02a67d94
13 changed files with 553 additions and 0 deletions

0
benchmark/__init__.py Normal file
View File

32
benchmark/base.py Normal file
View File

@ -0,0 +1,32 @@
from abc import ABC, abstractmethod
from pathlib import Path
import time
class BenchmarkBase(ABC):
    """Base class for one timed benchmark run over a test data file.

    Subclasses implement :meth:`_benchmark`; :meth:`execute` wraps that call
    with a ``time.perf_counter`` based timer, prints the elapsed time and
    returns it.
    """

    # Start timestamp of the timer, ``None`` until ``start_timer`` has run.
    # Declared at class level so the guard in ``stop_timer`` also works for
    # subclasses that override ``__init__`` without calling ``super()``.
    __timer = None

    def __init__(self, test_file_name: Path) -> None:
        """Store the path of the file that holds the benchmark input."""
        self.test_file_name = test_file_name

    def _read_test_file(self) -> str:
        """Return the full text content of ``self.test_file_name``."""
        with open(self.test_file_name, "r") as f:
            return f.read()

    def start_timer(self) -> None:
        """Record the current ``time.perf_counter`` value as the start time."""
        self.__timer = time.perf_counter()

    def stop_timer(self) -> float:
        """Return the seconds elapsed since the last ``start_timer`` call.

        Raises:
            RuntimeError: if ``start_timer`` has not been called yet.
        """
        if self.__timer is None:
            # Fail with an explicit message instead of an AttributeError on
            # the name-mangled attribute.
            raise RuntimeError("stop_timer() called before start_timer()")
        return time.perf_counter() - self.__timer

    def execute(self) -> float:
        """Run ``_benchmark`` once, print its duration and return it in seconds."""
        print(f"*** Running {self.__class__.__name__}")
        self.start_timer()
        self._benchmark()
        benchmark_time = self.stop_timer()
        print(f"Finished in {benchmark_time:0.3f}s")
        return benchmark_time

    @abstractmethod
    def _benchmark(self) -> None:
        """Perform the work to be timed; implemented by subclasses."""
        ...

39
benchmark/factories.py Normal file
View File

@ -0,0 +1,39 @@
from pathlib import Path
from polyfactory.factories.pydantic_factory import ModelFactory
from polyfactory import Use
from benchmark.pydantic_benchmark.models import (
ColumnsInput,
PlanogramInput,
PlanogramsBulkInputPayload,
)
# Number of columns generated per planogram (passed as ``size`` to
# ``ColumnsInputFactory.batch``).
COLUMNS_COUNT = 100
# Number of planograms generated for the bulk payload (passed as ``size`` to
# ``PlanogramInputFactory.batch``).
PLANOGRAMS_COUNT = 1000
class ColumnsInputFactory(ModelFactory):
    """Polyfactory factory that builds ``ColumnsInput`` instances."""

    __model__ = ColumnsInput
class PlanogramInputFactory(ModelFactory):
    """Polyfactory factory that builds ``PlanogramInput`` instances.

    The ``columns`` field is filled by calling ``ColumnsInputFactory.batch``
    with ``size=COLUMNS_COUNT``.
    """

    __model__ = PlanogramInput

    columns = Use(
        ColumnsInputFactory.batch,
        size=COLUMNS_COUNT,
    )
class PlanogramsBulkInputPayloadFactory(ModelFactory):
    """Polyfactory factory that builds ``PlanogramsBulkInputPayload`` instances.

    The ``planograms`` field is filled by calling
    ``PlanogramInputFactory.batch`` with ``size=PLANOGRAMS_COUNT``.
    """

    __model__ = PlanogramsBulkInputPayload

    planograms = Use(
        PlanogramInputFactory.batch,
        size=PLANOGRAMS_COUNT,
    )
def create_test_file(filename: Path):
    """Generate one bulk payload and write it to *filename* as JSON.

    The payload is built with ``PlanogramsBulkInputPayloadFactory`` and
    serialised with field aliases (``by_alias=True``). A summary line with
    the length of the written text is printed afterwards.
    """
    payload = PlanogramsBulkInputPayloadFactory.build()
    serialized = payload.model_dump_json(by_alias=True)

    with open(filename, "w") as out_file:
        out_file.write(serialized)

    written = len(serialized)
    print(f"{written} bytes of test data written to {filename}.")

View File

View File

View File

View File

View File

@ -0,0 +1,8 @@
from benchmark.base import BenchmarkBase
from benchmark.pydantic_benchmark.models import PlanogramsBulkInputPayload
class PydanticBenchmark(BenchmarkBase):
    """Benchmark that validates the test file as a ``PlanogramsBulkInputPayload``."""

    def _benchmark(self) -> None:
        """Read the test file and validate its JSON content with Pydantic."""
        test_data = self._read_test_file()
        # ``parse_raw`` is deprecated in Pydantic V2; ``model_validate_json``
        # is the documented replacement for JSON input.
        PlanogramsBulkInputPayload.model_validate_json(test_data)

View File

@ -0,0 +1,138 @@
from enum import Enum
from typing import Annotated, Any, Dict, List, Optional
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field, field_validator, validator
from pydantic.deprecated.class_validators import root_validator
def to_camel_case(snake_str: str) -> str:
    """Convert a ``snake_case`` identifier to ``camelCase``.

    The first underscore-separated component is kept as is; every following
    component gets its first character upper-cased and the rest lower-cased.

    ``str.capitalize`` is used rather than ``str.title`` because ``title``
    also upper-cases letters that follow a non-letter inside a component
    (``"2nd".title() == "2Nd"``), which would corrupt names containing digits.

    Args:
        snake_str: The identifier to convert, e.g. ``"column_number"``.

    Returns:
        The camelCase form, e.g. ``"columnNumber"``.
    """
    head, *tail = snake_str.split("_")
    return head + "".join(part.capitalize() for part in tail)
class ColumnItemType(str, Enum):
    """Kind of item a column refers to; members are also ``str`` values."""

    PRODUCT = "PRODUCT"
    COMPONENT = "COMPONENT"
# Integer constrained to 0..2147483647 inclusive (upper bound is 2**31 - 1).
QuantityInt = Annotated[int, Field(ge=0, le=2147483647)]
# Integer constrained to 0..32767 inclusive (upper bound is 2**15 - 1).
StrictSmallInt = Annotated[int, Field(ge=0, le=32767)]
def _new_correlation_id() -> str:
    """Return a fresh random UUID4 rendered as a 32-character hex string."""
    return uuid4().hex


class CorrelationId(BaseModel):
    """Model mixin carrying a ``correlation_id`` that defaults to a new UUID4 hex."""

    correlation_id: str = Field(default_factory=_new_correlation_id)
class ColumnsInput(BaseModel):
    """Input model for a single planogram column.

    Every field declares an explicit camelCase alias; ``populate_by_name``
    additionally allows the snake_case field names on input. String values
    are stripped of surrounding whitespace.
    """

    # Pydantic V2 configuration; replaces the deprecated nested ``class Config``.
    model_config = ConfigDict(
        populate_by_name=True,
        alias_generator=to_camel_case,
        str_strip_whitespace=True,
    )

    column_number: StrictSmallInt = Field(
        alias="columnNumber", description="View index in Televend"
    )
    external_product_id: Optional[str] = Field(
        default=None,
        alias="externalProductId",
        description="Product or Component external ID used for product/component identification in external partner's ERP system",
        min_length=1,
        max_length=32,
    )
    old_qty: Optional[QuantityInt] = Field(
        default_factory=lambda: None,
        alias="oldQty",
    )
    new_qty: Optional[QuantityInt] = Field(
        default_factory=lambda: None,
        alias="newQty",
    )
    old_price: Optional[float] = Field(
        default_factory=lambda: None,
        alias="oldPrc",
        ge=0,
        le=99999999.99,
    )
    new_price: Optional[float] = Field(
        default_factory=lambda: None,
        alias="newPrc",
        ge=0,
        le=99999999.99,
    )
    select_map: Optional[List[StrictSmallInt]] = Field(
        default_factory=lambda: None, alias="selectMap"
    )
    item_type: Optional[ColumnItemType] = Field(
        default_factory=lambda: ColumnItemType.PRODUCT,
        alias="itemType",
        description="MUST be set if item is COMPONENT",
    )

    # NOTE(review): the cross-field check below is disabled in the original
    # source and is kept for reference only. ``root_validator`` is deprecated
    # in Pydantic V2 (``model_validator`` is the replacement) - confirm before
    # re-enabling.
    # @root_validator
    # def check_required_fields(cls, values):
    #     if values.get("external_product_id"):
    #         if values.get("old_qty") is None:
    #             raise ValueError(
    #                 f"provide oldQty for product {values['external_product_id']}"
    #             )
    #         if values.get("new_qty") is None:
    #             raise ValueError(
    #                 f"provide newQty for product {values['external_product_id']}"
    #             )
    #         if values.get("old_price") is None:
    #             raise ValueError(
    #                 f"provide oldPrc for product {values['external_product_id']}"
    #             )
    #         if values.get("new_price") is None:
    #             raise ValueError(
    #                 f"provide newPrc for product {values['external_product_id']}"
    #             )
    #     return values

    # ``field_validator`` is the Pydantic V2 replacement for the deprecated
    # ``validator``; the default "after" mode matches the previous behaviour.
    @field_validator("item_type")
    @classmethod
    def set_item_type(cls, value):
        """Return ``ColumnItemType.PRODUCT`` when *value* is falsy, else *value*."""
        return value or ColumnItemType.PRODUCT
class PlanogramInput(CorrelationId, BaseModel):
    """Input model for one planogram: a machine external ID plus its columns.

    Inherits the ``correlation_id`` field from ``CorrelationId``.
    """

    # Pydantic V2 configuration; replaces the deprecated nested ``class Config``.
    model_config = ConfigDict(
        title="Planogram",
        alias_generator=to_camel_case,
        populate_by_name=True,
        str_strip_whitespace=True,
    )

    machine_external_id: str = Field(
        alias="machineExternalId",
        description="Machine external ID",
        min_length=1,
        max_length=32,
    )
    columns: List[ColumnsInput] = Field(
        title="Columns", description="A list of columns this planogram specifies"
    )
class PlanogramsBulkInputPayload(BaseModel):
    """Top-level bulk payload carrying a list of ``PlanogramInput`` items."""

    # Pydantic V2 configuration; replaces the deprecated nested ``class Config``.
    model_config = ConfigDict(
        populate_by_name=True,
        alias_generator=to_camel_case,
    )

    planograms: List[PlanogramInput] = Field(
        title="Planograms",
        description="A list of Planograms this bulk carries",
    )

    # NOTE(review): the uniqueness check below is disabled in the original
    # source and is kept for reference only. ``root_validator`` is deprecated
    # in Pydantic V2 (``model_validator`` is the replacement) - confirm before
    # re-enabling.
    # @root_validator
    # def check_machine_unique(cls, values):
    #     planograms = values.get("planograms", [])
    #     external_ids = set()
    #     for planogram in planograms:
    #         if planogram.machine_external_id in external_ids:
    #             raise ValueError(
    #                 f"Machine externalId must be unique! Duplicate {planogram.machine_external_id}"
    #             )
    #         external_ids.add(planogram.machine_external_id)
    #     return values