Better logging

This commit is contained in:
Eden Kirin
2025-10-14 08:20:19 +02:00
parent daa7e53d6d
commit 9d0557f96d
5 changed files with 143 additions and 141 deletions

View File

@ -4,4 +4,7 @@ version = "0.1.0"
description = "Coin counter device simulator for development/testing"
readme = "README.md"
requires-python = ">=3.8"
dependencies = ["pyserial>=3.5"]
dependencies = [
"loguru>=0.7.3",
"pyserial>=3.5",
]

6
source/__init__.py Normal file
View File

@ -0,0 +1,6 @@
"""Device simulators package."""
from .glory import GlorySimulator
from .pelican import PelicanSimulator
__all__ = ["GlorySimulator", "PelicanSimulator"]

View File

@ -5,13 +5,12 @@ Simulates a Glory MACH6 coin/currency counter for development/testing purposes.
Implements the Enhanced Serial Port Mode with full command support.
"""
import argparse
import logging
import random
import time
from typing import Optional
import serial
from loguru import logger
# Protocol constants
STX = 0x02
@ -28,7 +27,7 @@ STATE_MESSAGE_PENDING = "PRN"
class GlorySimulator:
def __init__(self, logger: logging.Logger, port: str, baudrate: int = 9600):
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None
@ -76,8 +75,6 @@ class GlorySimulator:
# Message buffer
self.pending_message = None
self.logger = logger
def get_status(self) -> str:
"""Get current machine status"""
if self.pending_message:
@ -107,7 +104,7 @@ class GlorySimulator:
"""Handle SS - Status command"""
status = self.get_status()
response = f"ST{self.count_mode}{status}"
self.logger.info(f"Status request - Response: {response}")
logger.info(f"Status request - Response: {response}")
return self.create_message(response)
def handle_clear_command(self, data: bytes) -> bytes:
@ -118,23 +115,23 @@ class GlorySimulator:
# Clear batch
self.batch_counts = {}
self.batch_id = ""
self.logger.info("Batch counts cleared")
logger.info("Batch counts cleared")
elif cmd == "CS":
# Clear subtotal
self.sub_counts = {}
self.sub_id = ""
self.logger.info("Subtotal counts cleared")
logger.info("Subtotal counts cleared")
elif cmd == "CG":
# Clear grand total
self.grand_counts = {}
self.grand_id = ""
self.logger.info("Grand total counts cleared")
logger.info("Grand total counts cleared")
elif cmd.startswith("CC"):
# Clear partial count
denom_str = cmd[2:]
if denom_str in self.partial_counts:
del self.partial_counts[denom_str]
self.logger.info(f"Partial count for {denom_str} cleared")
logger.info(f"Partial count for {denom_str} cleared")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
@ -153,21 +150,21 @@ class GlorySimulator:
total = sum(self.batch_counts.values())
response = f"BT{total:08d}"
self.logger.info(f"Get batch total: {total} coins")
logger.info(f"Get batch total: {total} coins")
return self.create_message(response)
elif cmd == "GS":
# Get subtotal
total = sum(self.sub_counts.values())
response = f"BS{total:08d}"
self.logger.info(f"Get subtotal: {total} coins")
logger.info(f"Get subtotal: {total} coins")
return self.create_message(response)
elif cmd == "GG":
# Get grand total
total = sum(self.grand_counts.values())
response = f"BG{total:08d}"
self.logger.info(f"Get grand total: {total} coins")
logger.info(f"Get grand total: {total} coins")
return self.create_message(response)
elif cmd.startswith("GD"):
@ -175,7 +172,7 @@ class GlorySimulator:
denom_str = cmd[2:]
count = self.partial_counts.get(denom_str, 0)
response = f"BD{count:08d}"
self.logger.info(f"Get partial count for {denom_str}: {count} coins")
logger.info(f"Get partial count for {denom_str}: {count} coins")
return self.create_message(response)
elif cmd.startswith("GT") and len(cmd) > 2:
@ -183,14 +180,14 @@ class GlorySimulator:
denom_str = cmd[2:]
count = self.batch_counts.get(denom_str, 0)
response = f"BT{count:08d}"
self.logger.info(f"Get batch count for {denom_str}: {count} coins")
logger.info(f"Get batch count for {denom_str}: {count} coins")
return self.create_message(response)
elif cmd == "GI":
# Get product totals (ID totals)
response = ""
# Return empty if no totals
self.logger.info("Get product totals (empty)")
logger.info("Get product totals (empty)")
return self.create_message(response)
elif cmd == "GV":
@ -199,7 +196,7 @@ class GlorySimulator:
for station, value in sorted(self.station_values.items()):
active = "i" if value == 0 else str(station)
response += f"{active} {value:.2f} \r"
self.logger.info("Get station values")
logger.info("Get station values")
return self.create_message(response)
return self.create_message("")
@ -215,9 +212,9 @@ class GlorySimulator:
try:
value = int(value_str)
self.bag_stops[denom_str] = value
self.logger.info(f"Bag stop set for {denom_str}: {value}")
logger.info(f"Bag stop set for {denom_str}: {value}")
except ValueError:
self.logger.error(f"Invalid bag stop value: {value_str}")
logger.error(f"Invalid bag stop value: {value_str}")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
@ -231,13 +228,13 @@ class GlorySimulator:
# Start motor
self.motor_running = True
self.motor_has_run = True
self.logger.info("Motor started")
logger.info("Motor started")
# Simulate coin counting
self._simulate_counting()
elif cmd == "MS":
# Stop motor
self.motor_running = False
self.logger.info("Motor stopped")
logger.info("Motor stopped")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
@ -245,7 +242,7 @@ class GlorySimulator:
def handle_beep_command(self, data: bytes) -> bytes:
"""Handle BB - Beep command"""
self.logger.info("Beep!")
logger.info("Beep!")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
return self.create_message(response)
@ -258,7 +255,7 @@ class GlorySimulator:
# Accept batch
total = sum(self.batch_counts.values())
response = f"BT{total:08d}"
self.logger.info(f"Accept batch: {total}")
logger.info(f"Accept batch: {total}")
self.pending_message = response
return self.create_message(response)
@ -266,7 +263,7 @@ class GlorySimulator:
# Accept subtotal
total = sum(self.sub_counts.values())
response = f"BS{total:08d}"
self.logger.info(f"Accept subtotal: {total}")
logger.info(f"Accept subtotal: {total}")
self.pending_message = response
return self.create_message(response)
@ -274,7 +271,7 @@ class GlorySimulator:
# Accept grand total
total = sum(self.grand_counts.values())
response = f"BG{total:08d}"
self.logger.info(f"Accept grand total: {total}")
logger.info(f"Accept grand total: {total}")
self.pending_message = response
return self.create_message(response)
@ -291,9 +288,9 @@ class GlorySimulator:
try:
value = int(value_str)
self.partial_counts[denom_str] = value
self.logger.info(f"Partial count set for {denom_str}: {value}")
logger.info(f"Partial count set for {denom_str}: {value}")
except ValueError:
self.logger.error(f"Invalid partial count value: {value_str}")
logger.error(f"Invalid partial count value: {value_str}")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
@ -305,13 +302,13 @@ class GlorySimulator:
if cmd.startswith("ID"):
self.batch_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:]
self.logger.info(f"Batch ID set: {self.batch_id}")
logger.info(f"Batch ID set: {self.batch_id}")
elif cmd.startswith("IS"):
self.sub_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:]
self.logger.info(f"Sub ID set: {self.sub_id}")
logger.info(f"Sub ID set: {self.sub_id}")
elif cmd.startswith("IG"):
self.grand_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:]
self.logger.info(f"Grand ID set: {self.grand_id}")
logger.info(f"Grand ID set: {self.grand_id}")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
@ -342,14 +339,14 @@ class GlorySimulator:
# Update grand total
self.grand_counts[denom] = self.grand_counts.get(denom, 0) + count
self.logger.info(f"Counted {count} coins of denomination {denom}")
logger.info(f"Counted {count} coins of denomination {denom}")
# Log total value counted
total_value = sum(
self.batch_counts.get(denom, 0) * int(denom)
for denom in denominations.keys()
)
self.logger.info(f"Total value counted in this batch: ${total_value/100:.2f}")
logger.info(f"Total value counted in this batch: ${total_value/100:.2f}")
def create_message(self, data: str) -> bytes:
"""Create a message with STX and ETX"""
@ -358,15 +355,15 @@ class GlorySimulator:
def parse_message(self, message: bytes) -> Optional[bytes]:
"""Parse and validate a received message"""
if len(message) < 3:
self.logger.error("Message too short")
logger.error("Message too short")
return None
if message[0] != STX:
self.logger.error("Invalid STX")
logger.error("Invalid STX")
return None
if message[-1] != ETX:
self.logger.error("Invalid ETX")
logger.error("Invalid ETX")
return None
data = message[1:-1]
@ -381,7 +378,7 @@ class GlorySimulator:
cmd_str = data.decode("ascii", errors="ignore").strip()
cmd = cmd_str[:2]
self.logger.info(f"Received command: {cmd_str}")
logger.info(f"Received command: {cmd_str}")
if cmd == "SS":
return self.handle_status_command(data)
@ -402,19 +399,19 @@ class GlorySimulator:
elif cmd in ["ID", "IS", "IG"]:
return self.handle_id_command(data)
else:
self.logger.warning(f"Unknown command: {cmd}")
logger.warning(f"Unknown command: {cmd}")
return None
except Exception as e:
self.logger.error(f"Error handling command: {e}", exc_info=True)
logger.error(f"Error handling command: {e}", exc_info=True)
return None
def run(self):
"""Main simulator loop"""
self.logger.info(
logger.info(
f"Starting Glory MACH6 simulator on {self.port} at {self.baudrate} baud"
)
self.logger.info("Enhanced Serial Port Mode enabled")
logger.info("Enhanced Serial Port Mode enabled")
try:
self.serial_conn = serial.Serial(
@ -426,7 +423,7 @@ class GlorySimulator:
timeout=1,
)
self.logger.info("Serial port opened successfully")
logger.info("Serial port opened successfully")
buffer = bytearray()
@ -437,13 +434,13 @@ class GlorySimulator:
if self.serial_conn.in_waiting > 0:
byte_data = self.serial_conn.read(1)
if byte_data[0] == ACK:
self.logger.info("Received ACK - clearing pending message")
logger.info("Received ACK - clearing pending message")
# Clear batch after ACK
if "BT" in self.pending_message:
self.batch_counts = {}
self.pending_message = None
elif byte_data[0] == NAK:
self.logger.info("Received NAK - retransmitting")
logger.info("Received NAK - retransmitting")
response = self.create_message(self.pending_message)
self.serial_conn.write(response)
time.sleep(0.01)
@ -461,14 +458,18 @@ class GlorySimulator:
message = bytes(buffer[stx_idx : etx_idx + 1])
buffer = buffer[etx_idx + 1 :]
self.logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}")
logger.debug(
f"RX: {' '.join(f'{b:02X}' for b in message)}"
)
# Parse and handle message
parsed_data = self.parse_message(message)
if parsed_data:
response = self.handle_command(parsed_data)
if response:
self.logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}")
logger.debug(
f"TX: {' '.join(f'{b:02X}' for b in response)}"
)
self.serial_conn.write(response)
except ValueError:
pass # ETX not found yet
@ -476,32 +477,10 @@ class GlorySimulator:
time.sleep(0.01)
except KeyboardInterrupt:
self.logger.info("Simulator stopped by user")
logger.info("Simulator stopped by user")
except Exception as e:
self.logger.error(f"Error: {e}", exc_info=True)
logger.error(f"Error: {e}", exc_info=True)
finally:
if self.serial_conn and self.serial_conn.is_open:
self.serial_conn.close()
self.logger.info("Serial port closed")
def main():
parser = argparse.ArgumentParser(description="Glory MACH6 Device Simulator")
parser.add_argument(
"--port",
"-p",
default="/dev/ttyUSB0",
help="Serial port (default: /dev/ttyUSB0)",
)
parser.add_argument(
"--baudrate", "-b", type=int, default=9600, help="Baud rate (default: 9600)"
)
args = parser.parse_args()
simulator = GlorySimulator(args.port, args.baudrate)
simulator.run()
if __name__ == "__main__":
main()
logger.info("Serial port closed")

View File

@ -4,13 +4,12 @@ Pelican Device Simulator
Simulates a Pelican coin counting device for development/testing purposes.
"""
import argparse
import logging
import random
import time
from typing import Optional
import serial
from loguru import logger
# Protocol constants
STX = 0x02
@ -42,7 +41,7 @@ PASSWORD = "69390274"
class PelicanSimulator:
def __init__(self, logger: logging.Logger, port: str, baudrate: int = 9600):
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None
@ -79,8 +78,6 @@ class PelicanSimulator:
200, # 2.00
]
self.logger = logger
def calc_crc(self, data: bytes) -> int:
"""Calculate CRC-16 using the algorithm from the spec"""
crc = 0
@ -107,29 +104,29 @@ class PelicanSimulator:
def parse_message(self, message: bytes) -> Optional[bytes]:
"""Parse and validate a received message"""
if len(message) < 5:
self.logger.error("Message too short")
logger.error("Message too short")
return None
if message[0] != STX:
self.logger.error("Invalid STX")
logger.error("Invalid STX")
return None
if message[-1] != ETX:
self.logger.error("Invalid ETX")
logger.error("Invalid ETX")
return None
length = message[1]
data = message[2 : 2 + length]
if len(data) != length:
self.logger.error("Length mismatch")
logger.error("Length mismatch")
return None
crc_received = (message[2 + length] << 8) | message[2 + length + 1]
crc_calculated = self.calc_crc(data)
if crc_received != crc_calculated:
self.logger.error(
logger.error(
f"CRC mismatch: received {crc_received:04X}, calculated {crc_calculated:04X}"
)
return None
@ -145,17 +142,17 @@ class PelicanSimulator:
if password == PASSWORD:
self.link_established = True
self.logger.info("Link established successfully")
logger.info("Link established successfully")
return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x00]))
else:
self.logger.warning(f"Invalid password: {password}")
logger.warning(f"Invalid password: {password}")
return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x01]))
def handle_destruct_link(self, data: bytes) -> bytes:
"""Handle destruct link command (0x03)"""
if self.link_established:
self.link_established = False
self.logger.info("Link destructed")
logger.info("Link destructed")
return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x00]))
else:
return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x01]))
@ -163,19 +160,19 @@ class PelicanSimulator:
def handle_get_value(self, data: bytes) -> bytes:
"""Handle get value command (0x11)"""
if not self.link_established:
self.logger.warning("Get value request rejected - link not established")
logger.warning("Get value request rejected - link not established")
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
if len(data) < 2:
self.logger.error("Get value request missing variable number")
logger.error("Get value request missing variable number")
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
var_num = data[1]
self.logger.info(f"Get value request for variable 0x{var_num:02X}")
logger.info(f"Get value request for variable 0x{var_num:02X}")
# Variable 0x16 - Current counting result
if var_num == 0x16:
self.logger.info("Responding with current counting result")
logger.info("Responding with current counting result")
# Generate random current counts for simulation
random_counts = []
@ -218,7 +215,7 @@ class PelicanSimulator:
total_coins = sum(random_counts)
# Log detailed count information
self.logger.info(
logger.info(
f"Current count: {total_coins} total coins, {random_rejected} rejected"
)
@ -239,13 +236,13 @@ class PelicanSimulator:
count_details.append(f"{standard_denoms[i]}: {random_counts[i]}")
if count_details:
self.logger.info(f"Coin counts: {', '.join(count_details)}")
logger.info(f"Coin counts: {', '.join(count_details)}")
return self.create_message(bytes(response))
# Variable 0x1C - Total counting result
elif var_num == 0x1C:
self.logger.info("Responding with total counting result")
logger.info("Responding with total counting result")
response = [CMD_VALUE_RETURNED, 0x00, 0x1C]
for count in self.total_count:
response.extend(
@ -257,12 +254,12 @@ class PelicanSimulator:
]
)
total_coins = sum(self.total_count)
self.logger.info(f"Total count: {total_coins} total coins since last reset")
logger.info(f"Total count: {total_coins} total coins since last reset")
return self.create_message(bytes(response))
# Variable 0x31 - Software version
elif var_num == 0x31:
self.logger.info("Responding with software version information")
logger.info("Responding with software version information")
response = [CMD_VALUE_RETURNED, 0x00, 0x31]
# SW version
response.extend(
@ -291,14 +288,14 @@ class PelicanSimulator:
self.host_version & 0xFF,
]
)
self.logger.info(
logger.info(
f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}"
)
return self.create_message(bytes(response))
# Variable 0x33 - Machine status
elif var_num == 0x33:
self.logger.info("Responding with machine status")
logger.info("Responding with machine status")
status_flags_1 = 0x00
if self.motor_running:
status_flags_1 |= 0x01
@ -346,12 +343,12 @@ class PelicanSimulator:
status_desc.append("keyboard locked")
status_str = ", ".join(status_desc) if status_desc else "all unlocked"
self.logger.info(f"Machine status: {state_desc}, {status_str}")
logger.info(f"Machine status: {state_desc}, {status_str}")
return self.create_message(bytes(response))
# Variable 0x1F - Get denomination values
elif var_num == 0x1F:
self.logger.info("Responding with coin denomination values")
logger.info("Responding with coin denomination values")
response = [CMD_VALUE_RETURNED, 0x00, 0x1F]
# Add denomination values for all 20 coin types (4 bytes each)
for denomination in self.denominations:
@ -372,18 +369,18 @@ class PelicanSimulator:
else:
formatted_denoms.append(f"Coin{i+1}: 0.{denom:02d}")
self.logger.info(
logger.info(
f"Denomination values (20 coins): {', '.join(formatted_denoms[:8])}"
)
if len(formatted_denoms) > 8:
self.logger.info(
logger.info(
f"Additional denominations: {', '.join(formatted_denoms[8:])}"
)
return self.create_message(bytes(response))
# Variable 0x22 - Coin exit counters (similar to 0x16 but for sorted coins)
elif var_num == 0x22:
self.logger.info("Responding with coin exit counters")
logger.info("Responding with coin exit counters")
# Generate random exit counts for simulation
exit_counts = []
@ -407,7 +404,7 @@ class PelicanSimulator:
)
total_exits = sum(exit_counts)
self.logger.info(f"Coin exit counters: {total_exits} total coins sorted")
logger.info(f"Coin exit counters: {total_exits} total coins sorted")
# Log counts for standard denominations
standard_denoms = [
@ -426,7 +423,7 @@ class PelicanSimulator:
exit_details.append(f"{standard_denoms[i]}: {exit_counts[i]}")
if exit_details:
self.logger.info(f"Exit counts: {', '.join(exit_details)}")
logger.info(f"Exit counts: {', '.join(exit_details)}")
return self.create_message(bytes(response))
@ -434,13 +431,13 @@ class PelicanSimulator:
elif var_num == 0x23:
# Need transaction number (2 bytes) after variable number
if len(data) < 4:
self.logger.error(
logger.error(
"Get transaction data request missing transaction number"
)
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
transaction_num = (data[2] << 8) | data[3]
self.logger.info(
logger.info(
f"Responding with transaction data for transaction #{transaction_num}"
)
@ -498,7 +495,7 @@ class PelicanSimulator:
# Note amount - currency 1 (4 bytes) / Account number low for CDS
response.extend([0x00, 0x00, 0x00, 0x00])
self.logger.info(
logger.info(
f"Transaction #{transaction_num}: Cashier {cashier_num}, "
f"Date {current_time.tm_year}/{current_time.tm_mon}/{current_time.tm_mday} "
f"{current_time.tm_hour:02d}:{current_time.tm_min:02d}, "
@ -508,7 +505,7 @@ class PelicanSimulator:
return self.create_message(bytes(response))
else:
self.logger.warning(f"Unknown variable number: 0x{var_num:02X}")
logger.warning(f"Unknown variable number: 0x{var_num:02X}")
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
def handle_get_display(self, data: bytes) -> bytes:
@ -554,7 +551,7 @@ class PelicanSimulator:
new_line[position + i] = c
self.display_line2 = "".join(new_line)
self.logger.info(
logger.info(
f"Display updated: L1='{self.display_line1}' L2='{self.display_line2}'"
)
@ -569,7 +566,7 @@ class PelicanSimulator:
return self.create_message(bytes([CMD_RESPONSE_LOCK_DISPLAY, 0x01]))
self.display_locked = data[1] == 0x01
self.logger.info(f"Display {'locked' if self.display_locked else 'unlocked'}")
logger.info(f"Display {'locked' if self.display_locked else 'unlocked'}")
return self.create_message(bytes([CMD_RESPONSE_LOCK_DISPLAY, 0x00]))
@ -579,7 +576,7 @@ class PelicanSimulator:
return self.create_message(bytes([CMD_RESPONSE_LOCK_KEYBOARD, 0x01]))
self.keyboard_locked = data[1] == 0x01
self.logger.info(f"Keyboard {'locked' if self.keyboard_locked else 'unlocked'}")
logger.info(f"Keyboard {'locked' if self.keyboard_locked else 'unlocked'}")
return self.create_message(bytes([CMD_RESPONSE_LOCK_KEYBOARD, 0x00]))
@ -590,7 +587,7 @@ class PelicanSimulator:
cmd = data[0]
self.logger.info(f"Received command: 0x{cmd:02X}")
logger.info(f"Received command: 0x{cmd:02X}")
if cmd == CMD_CONSTRUCT_LINK:
return self.handle_construct_link(data)
@ -607,12 +604,12 @@ class PelicanSimulator:
elif cmd == CMD_LOCK_KEYBOARD:
return self.handle_lock_keyboard(data)
else:
self.logger.warning(f"Unknown command: 0x{cmd:02X}")
logger.warning(f"Unknown command: 0x{cmd:02X}")
return None
def run(self):
"""Main simulator loop"""
self.logger.info(
logger.info(
f"Starting Pelican simulator on {self.port} at {self.baudrate} baud"
)
@ -626,7 +623,7 @@ class PelicanSimulator:
timeout=1,
)
self.logger.info("Serial port opened successfully")
logger.info("Serial port opened successfully")
buffer = bytearray()
@ -643,14 +640,18 @@ class PelicanSimulator:
message = bytes(buffer[stx_idx : etx_idx + 1])
buffer = buffer[etx_idx + 1 :]
self.logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}")
logger.debug(
f"RX: {' '.join(f'{b:02X}' for b in message)}"
)
# Parse and handle message
parsed_data = self.parse_message(message)
if parsed_data:
response = self.handle_command(parsed_data)
if response:
self.logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}")
logger.debug(
f"TX: {' '.join(f'{b:02X}' for b in response)}"
)
self.serial_conn.write(response)
except ValueError:
pass # ETX not found yet
@ -658,32 +659,10 @@ class PelicanSimulator:
time.sleep(0.01)
except KeyboardInterrupt:
self.logger.info("Simulator stopped by user")
logger.info("Simulator stopped by user")
except Exception as e:
self.logger.error(f"Error: {e}", exc_info=True)
logger.error(f"Error: {e}", exc_info=True)
finally:
if self.serial_conn and self.serial_conn.is_open:
self.serial_conn.close()
self.logger.info("Serial port closed")
def main():
parser = argparse.ArgumentParser(description="Pelican Device Simulator")
parser.add_argument(
"--port",
"-p",
default="/dev/ttyUSB0",
help="Serial port (default: /dev/ttyUSB0)",
)
parser.add_argument(
"--baudrate", "-b", type=int, default=9600, help="Baud rate (default: 9600)"
)
args = parser.parse_args()
simulator = PelicanSimulator(args.port, args.baudrate)
simulator.run()
if __name__ == "__main__":
main()
logger.info("Serial port closed")

37
uv.lock generated
View File

@ -7,11 +7,37 @@ name = "coin-counter-simulators"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "loguru" },
{ name = "pyserial" },
]
[package.metadata]
requires-dist = [{ name = "pyserial", specifier = ">=3.5" }]
requires-dist = [
{ name = "loguru", specifier = ">=0.7.3" },
{ name = "pyserial", specifier = ">=3.5" },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "loguru"
version = "0.7.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "win32-setctime", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3a/05/a1dae3dffd1116099471c643b8924f5aa6524411dc6c63fdae648c4f1aca/loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6", size = 63559, upload-time = "2024-12-06T11:20:56.608Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0c/29/0348de65b8cc732daa3e33e67806420b2ae89bdce2b04af740289c5c6c8c/loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c", size = 61595, upload-time = "2024-12-06T11:20:54.538Z" },
]
[[package]]
name = "pyserial"
@ -21,3 +47,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585, upload-time = "2020-11-23T03:59:13.41Z" },
]
[[package]]
name = "win32-setctime"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b3/8f/705086c9d734d3b663af0e9bb3d4de6578d08f46b1b101c2442fd9aecaa2/win32_setctime-1.2.0.tar.gz", hash = "sha256:ae1fdf948f5640aae05c511ade119313fb6a30d7eabe25fef9764dca5873c4c0", size = 4867, upload-time = "2024-12-07T15:28:28.314Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e1/07/c6fe3ad3e685340704d314d765b7912993bcb8dc198f0e7a89382d37974b/win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390", size = 4083, upload-time = "2024-12-07T15:28:26.465Z" },
]