Files
2023-10-27 13:01:46 +02:00

92 lines
2.1 KiB
Python

from enum import Enum
import json
from typing import Optional, List
from fastapi import FastAPI, Request, Response, status
from pydantic import BaseModel, Field
# FastAPI application instance; the @app.get / @app.post decorators further
# down in this file register their handlers on it.
app = FastAPI()
class ImportType(str, Enum):
    """Kind of data set an import (and its status report) refers to.

    Each member's value is the same text as its name.  Because the class
    also derives from ``str``, members compare equal to their plain string
    values.
    """

    MACHINES = "MACHINES"
    LOCATIONS = "LOCATIONS"
    REGIONS = "REGIONS"
    PRODUCTS = "PRODUCTS"
    PLANOGRAMS = "PLANOGRAMS"
    CLIENTS = "CLIENTS"
    USERS = "USERS"
    MACHINE_TYPES = "MACHINE_TYPES"
    PACKINGS = "PACKINGS"
    ASSETS = "ASSETS"
    ASSETS_ASSIGNMENT = "ASSETS_ASSIGNMENT"
class ImportStatus(str, Enum):
    """Status value carried in the ``status`` field of a status report.

    Each member's value is the same text as its name; members are also
    ``str`` instances and compare equal to their plain string values.
    """

    STARTED = "STARTED"
    ERROR = "ERROR"
    SUCCESS = "SUCCESS"
    FAIL = "FAIL"
    IN_PROGRESS = "IN_PROGRESS"
    WARNING = "WARNING"
class ImportState(str, Enum):
    """State value carried in the ``state`` field of a status report.

    Each member's value is the same text as its name; members are also
    ``str`` instances and compare equal to their plain string values.
    """

    CSV_VALIDATION = "CSV_VALIDATION"
    DATABASE_VALIDATION = "DATABASE_VALIDATION"
    DATABASE_IMPORT = "DATABASE_IMPORT"
    DATA_VALIDATION = "DATA_VALIDATION"
    IMPORT_STARTED = "IMPORT_STARTED"
    IMPORT_FINISHED = "IMPORT_FINISHED"
class StatusReportDetailTotals(BaseModel):
    """Per-operation record counters included in a status report's details.

    Every counter defaults to 0 when omitted; the ``Optional`` annotation
    additionally lets a sender pass an explicit null.
    """

    inserted_count: Optional[int] = 0
    updated_count: Optional[int] = 0
    deleted_count: Optional[int] = 0
    reused_count: Optional[int] = 0
class StatusReportError(BaseModel):
    """One error entry inside a status report's details.

    Only ``detail`` is mandatory; the other fields may be omitted or null.
    """

    external_id: Optional[str] = None
    # Explicit default so the field is optional on both pydantic v1 (where a
    # bare Optional[...] already defaults to None) and pydantic v2 (where a
    # bare Optional[...] would be required), matching the sibling fields.
    line_number: Optional[int] = None
    field_name: Optional[str] = None
    detail: str
class StatusReportDetails(BaseModel):
    """Detail section of a status report: counters plus a list of errors."""

    # Required block of counters.
    totals: StatusReportDetailTotals
    # default_factory gives each instance its own fresh empty list when the
    # sender omits the field.
    errors: List[StatusReportError] = Field(default_factory=list)
class StatusReport(BaseModel):
    """Request body accepted by the POST /status-report endpoint.

    ``import_type``, ``request_id``, ``state`` and ``status`` are required;
    ``details`` may be omitted or null.
    """

    import_type: ImportType
    request_id: str
    details: Optional[StatusReportDetails] = None
    state: ImportState
    status: ImportStatus
@app.get("/ping")
async def ping():
    """Handle GET /ping by replying with the body ``PONG!`` and HTTP 200."""
    return Response(
        content="PONG!",
        status_code=status.HTTP_200_OK,
    )
@app.post("/status-report")
async def test_status_report(request: Request, status_report: StatusReport):
    """Handle POST /status-report: print the report and acknowledge it.

    Args:
        request: The incoming request; only its headers are read.
        status_report: The request body parsed into a ``StatusReport``.

    Returns:
        An HTTP 200 response whose body is the JSON object
        ``{"message": "OkieDokie"}``.
    """
    # Diagnostic output to stdout.
    # NOTE(review): request headers can contain credentials -- confirm this
    # is only ever run as a local/test receiver.
    print("HEADERS", request.headers)
    print(status_report.json())

    response_data = {
        "message": "OkieDokie",
    }
    return Response(
        content=json.dumps(response_data),
        status_code=status.HTTP_200_OK,
        # A plain Response emits no Content-Type unless media_type is given;
        # declare the body as JSON so clients can parse it reliably.
        media_type="application/json",
    )