"""Small FastAPI app that receives import status reports.

Exposes a ``/ping`` endpoint and a ``/status-report`` endpoint that validates
the posted payload against :class:`StatusReport`, prints it, and answers with
a fixed JSON acknowledgement.
"""

from enum import Enum
import json
from typing import Optional, List

from fastapi import FastAPI, Request, Response, status
from pydantic import BaseModel, Field

app = FastAPI()


class ImportType(str, Enum):
    """Accepted values for ``StatusReport.import_type``."""

    MACHINES = "MACHINES"
    LOCATIONS = "LOCATIONS"
    REGIONS = "REGIONS"
    PRODUCTS = "PRODUCTS"
    PLANOGRAMS = "PLANOGRAMS"
    CLIENTS = "CLIENTS"
    USERS = "USERS"
    MACHINE_TYPES = "MACHINE_TYPES"
    PACKINGS = "PACKINGS"
    ASSETS = "ASSETS"


class ImportStatus(str, Enum):
    """Accepted values for ``StatusReport.status``."""

    STARTED = "STARTED"
    ERROR = "ERROR"
    SUCCESS = "SUCCESS"
    FAIL = "FAIL"
    IN_PROGRESS = "IN_PROGRESS"
    WARNING = "WARNING"


class ImportState(str, Enum):
    """Accepted values for ``StatusReport.state``."""

    CSV_VALIDATION = "CSV_VALIDATION"
    DATABASE_VALIDATION = "DATABASE_VALIDATION"
    DATABASE_IMPORT = "DATABASE_IMPORT"
    DATA_VALIDATION = "DATA_VALIDATION"
    IMPORT_STARTED = "IMPORT_STARTED"
    IMPORT_FINISHED = "IMPORT_FINISHED"


class StatusReportDetailTotals(BaseModel):
    """Per-operation counters of a status report; each defaults to 0."""

    inserted_count: Optional[int] = 0
    updated_count: Optional[int] = 0
    deleted_count: Optional[int] = 0
    reused_count: Optional[int] = 0


class StatusReportError(BaseModel):
    """A single error entry of a status report; only ``detail`` is required."""

    external_id: Optional[str] = None
    # Explicit default so the field is optional under both pydantic v1 and v2,
    # matching the other optional fields of this model.
    line_number: Optional[int] = None
    field_name: Optional[str] = None
    detail: str


class StatusReportDetails(BaseModel):
    """Totals plus a (possibly empty) list of errors for a status report."""

    totals: StatusReportDetailTotals
    errors: List[StatusReportError] = Field(default_factory=list)


class StatusReport(BaseModel):
    """Request body accepted by the ``/status-report`` endpoint."""

    import_type: ImportType
    request_id: str
    details: Optional[StatusReportDetails] = None
    state: ImportState
    status: ImportStatus


@app.get("/ping")
async def ping():
    """Answer with the literal body ``PONG!`` and HTTP 200."""
    return Response(
        content="PONG!",
        status_code=status.HTTP_200_OK,
    )


@app.post("/status-report")
async def test_status_report(request: Request, status_report: StatusReport):
    """Print the incoming status report and acknowledge it.

    Args:
        request: The raw request; only its headers are read (and printed).
        status_report: The validated request body.

    Returns:
        An HTTP 200 response whose body is the JSON object
        ``{"message": "OkieDokie"}``.
    """
    print("HEADERS", request.headers)
    print(status_report.json())

    response_data = {
        "message": "OkieDokie",
    }
    return Response(
        content=json.dumps(response_data),
        # The body is JSON, so declare it; a bare Response sets no JSON
        # Content-Type on its own.
        media_type="application/json",
        status_code=status.HTTP_200_OK,
        # NOTE(review): the original kept ``status_code=404`` commented out
        # here, presumably as a manual toggle to simulate a failing
        # receiver -- confirm before removing.
        # status_code=404,
    )