import csv from enum import IntEnum import requests from lib.csv_loader import CSVLoader, CSVRow from lib.csv_loader.csv_loader import BoolValuePair, CSVRowDefaultConfig class ActionEnum(IntEnum): INSERT = 0 UPDATE = 1 DELETE = 2 class AssetRow(CSVRow): asset_id: str asset_action: ActionEnum serial_number: str brand_id: int model_id: int def main(): response = requests.get( "https://www.ekirin.com/tmp/asset-import/importer-italy-data.csv" ) # iterator will return lines as bytes, so help it figure out it's a string response.encoding = "utf-8" csv_response_iterator = response.iter_lines(decode_unicode=True) reader = csv.reader(csv_response_iterator, delimiter=";") csv_loader = CSVLoader[AssetRow]( reader=reader, output_model_cls=AssetRow, has_header=True, aggregate_errors=True, ) result = csv_loader.read_rows() "Results:" for row in result.rows: print(row) if result.has_errors(): print("Errors:") for error in result.errors: print(f"Line: {error.line_number}: {error.original_error}") if __name__ == "__main__": main()