Files
2025-11-10 09:11:56 +01:00

513 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Glory MACH6 Device Simulator
Simulates a Glory MACH6 coin/currency counter for development/testing purposes.
Implements the Enhanced Serial Port Mode with full command support.
"""
import random
import time
from typing import Optional
import serial
from loguru import logger
from source.common import format_comm_debug
# Protocol constants: single-byte control codes used by GlorySimulator for
# message framing (STX/ETX) and for the ACK/NAK handshake handled in run().
STX = 0x02  # first byte of every framed message (see create_message/parse_message)
ETX = 0x03  # last byte of every framed message
ACK = 0x06  # run() clears the pending message when it reads this byte
NAK = 0x15  # run() retransmits the pending message when it reads this byte
# Machine states: status-code text returned by GlorySimulator.get_status()
STATE_IDLE = "OK "  # NOTE(review): not referenced in the visible code; get_status() builds "OK0"/"OK1" itself
STATE_ERROR = "E"  # prefix; get_status() appends the error code as two digits
STATE_BAG_STOP = "BAG"  # prefix; get_status() appends the denomination
STATE_RUNNING = "RUN"  # returned while motor_running is set
STATE_MESSAGE_PENDING = "PRN"  # returned while a message is stored in pending_message
class GlorySimulator:
def __init__(self, port: str, baudrate: int = 9600):
    """Create a simulator with empty counters; the serial port is not opened here.

    Args:
        port: Serial port name, stored for ``run()`` to open.
        baudrate: Line speed, stored for ``run()``.
    """
    # Connection settings; run() creates the serial.Serial object later.
    self.port, self.baudrate = port, baudrate
    self.serial_conn: Optional[serial.Serial] = None

    # Motor / front-panel flags.
    self.motor_running = self.motor_has_run = False
    self.display_locked = self.keyboard_locked = False

    # Count mode reported in status replies: "$0" for dollars, "U0" for units.
    self.count_mode = "$0"

    # Four independent ledgers, each mapping denomination -> count.
    self.partial_counts, self.batch_counts = {}, {}
    self.sub_counts, self.grand_counts = {}, {}

    # ID numbers attached to the batch / subtotal / grand-total levels.
    self.batch_id = self.sub_id = self.grand_id = ""

    # Bag stops: denomination -> stop value, and denomination -> number of stops.
    self.bag_stops, self.bag_stop_counts = {}, {}

    # Station number (1-9) -> value table reported by the GV command.
    self.station_values = dict(
        zip(range(1, 10), (0.11, 0.10, 0.01, 0.05, 0.25, 1.00, 0.50, 0.08, 0.00))
    )

    # No active error, and no message waiting for ACK/NAK.
    self.error_code = None
    self.pending_message = None
def get_status(self) -> str:
    """Return the status code describing the current machine state.

    Conditions are checked in priority order; the first match wins:

    1. a message is stored in ``pending_message`` -> ``STATE_MESSAGE_PENDING``
    2. ``error_code`` is set -> ``STATE_ERROR`` + the code as two digits
    3. the motor is running -> ``STATE_RUNNING``
    4. a batch count reached its bag stop -> ``STATE_BAG_STOP`` + the
       denomination, zero-padded to three characters
    5. otherwise ``"OU"`` (keyboard locked) or ``"OK"`` (unlocked), followed
       by ``"1"`` if the motor has run and ``"0"`` if it has not.
    """
    if self.pending_message:
        return STATE_MESSAGE_PENDING
    if self.error_code:
        return f"{STATE_ERROR}{self.error_code:02d}"
    if self.motor_running:
        return STATE_RUNNING

    # Check for bag stops. Denomination keys are strings such as "025" (they
    # are sliced out of the SV command), so they must not go through an
    # integer format spec: f"{'025':03d}" raises ValueError. zfill pads both
    # string and integer keys to three characters.
    for denom, stop_value in self.bag_stops.items():
        if denom in self.batch_counts and self.batch_counts[denom] >= stop_value:
            return f"{STATE_BAG_STOP}{str(denom).zfill(3)}"

    # Keys locked/unlocked status plus the "motor has run" flag.
    prefix = "OU" if self.keyboard_locked else "OK"
    motor_flag = "1" if self.motor_has_run else "0"
    return f"{prefix}{motor_flag}"
def handle_status_command(self, data: bytes) -> bytes:
    """Answer an SS (status) request.

    ``data`` is accepted so all handlers share one signature; it is not read.
    Returns the framed reply ``ST`` + count mode + current status code.
    """
    response = f"ST{self.count_mode}{self.get_status()}"
    logger.info(f"Status request - Response: {response}")
    return self.create_message(response)
def handle_clear_command(self, data: bytes) -> bytes:
    """Handle the clear commands CB, CS, CG and CC<denomination>.

    CB, CS and CG empty the batch, subtotal or grand-total counts and blank
    the matching ID. CC followed by a denomination removes that denomination
    from the partial counts if it is present. Every command, recognised or
    not, is answered with the framed status reply.
    """
    cmd = data.decode("ascii", errors="ignore").strip()

    # command -> (counts attribute, id attribute, log line)
    resets = {
        "CB": ("batch_counts", "batch_id", "Batch counts cleared"),
        "CS": ("sub_counts", "sub_id", "Subtotal counts cleared"),
        "CG": ("grand_counts", "grand_id", "Grand total counts cleared"),
    }
    target = resets.get(cmd)
    if target is not None:
        counts_attr, id_attr, note = target
        setattr(self, counts_attr, {})
        setattr(self, id_attr, "")
        logger.info(note)
    elif cmd.startswith("CC"):
        # Everything after "CC" names the denomination to clear.
        denom_str = cmd[2:]
        if denom_str in self.partial_counts:
            self.partial_counts.pop(denom_str)
            logger.info(f"Partial count for {denom_str} cleared")

    return self.create_message(f"ST{self.count_mode}{self.get_status()}")
def handle_get_data_command(self, data: bytes) -> bytes:
    """Handle the data queries GT, GS, GG, GD<denom>, GT<denom>, GI and GV.

    * ``GT`` resets the batch, simulates a fresh count, then reports the
      batch total as ``BT`` + 8 digits.
    * ``GS`` / ``GG`` report the subtotal / grand total as ``BS`` / ``BG``
      + 8 digits.
    * ``GD<denom>`` reports the partial count as ``BD`` + 8 digits.
    * ``GT<denom>`` reports count x denomination for the batch as ``BT``
      + 8 digits.
    * ``GI`` replies with an empty payload.
    * ``GV`` replies with one ``\\r``-terminated row per station.

    Totals are the sum of count x ``int(denomination)`` over the ledger.
    Anything else is answered with an empty framed message.
    """
    cmd = data.decode("ascii", errors="ignore").strip()

    # command -> (reply prefix, ledger attribute, log label)
    totals = {
        "GT": ("BT", "batch_counts", "Get batch total"),
        "GS": ("BS", "sub_counts", "Get subtotal"),
        "GG": ("BG", "grand_counts", "Get grand total"),
    }
    if cmd in totals:
        if cmd == "GT":
            # For the simulator every bare GT starts a fresh batch session.
            logger.info("GT received - resetting batch and simulating new count")
            self.batch_counts = {}
            self._simulate_counting()
        prefix, ledger_attr, label = totals[cmd]
        ledger = getattr(self, ledger_attr)
        total_value = sum(count * int(denom) for denom, count in ledger.items())
        response = f"{prefix}{total_value:08d}"
        logger.info(f"{label}: ${total_value/100:.2f}")
        return self.create_message(response)

    if cmd.startswith("GD"):
        # Partial count for the denomination named after "GD".
        denom_str = cmd[2:]
        count = self.partial_counts.get(denom_str, 0)
        logger.info(f"Get partial count for {denom_str}: {count} coins")
        return self.create_message(f"BD{count:08d}")

    if cmd.startswith("GT") and len(cmd) > 2:
        # Batch value (not count) for one denomination, e.g. "025" = 25 cents.
        denom_str = cmd[2:]
        count = self.batch_counts.get(denom_str, 0)
        denom_value = int(denom_str)
        monetary_value = count * denom_value
        logger.info(
            f"Get batch value for {denom_str}: {count} coins × ${denom_value/100:.2f} = ${monetary_value/100:.2f}"
        )
        return self.create_message(f"BT{monetary_value:08d}")

    if cmd == "GI":
        # Product (ID) totals are not simulated; the reply payload is empty.
        logger.info("Get product totals (empty)")
        return self.create_message("")

    if cmd == "GV":
        # A station whose value is 0 is listed as "i" instead of its number.
        response = "".join(
            f"{'i' if value == 0 else str(station)} {value:.2f} \r"
            for station, value in sorted(self.station_values.items())
        )
        logger.info("Get station values")
        return self.create_message(response)

    return self.create_message("")
def handle_bag_stop_command(self, data: bytes) -> bytes:
    """Handle SV: store a bag-stop value for one denomination.

    A usable command is ``SV`` + 3-character denomination + 8-character
    value (at least 13 characters in total). A value that ``int()`` cannot
    parse is logged and ignored; shorter commands are ignored silently.
    The framed status reply is returned in every case.
    """
    cmd = data.decode("ascii", errors="ignore").strip()

    if cmd.startswith("SV") and len(cmd) >= 13:
        denom_str, value_str = cmd[2:5], cmd[5:13]
        try:
            value = int(value_str)
        except ValueError:
            logger.error(f"Invalid bag stop value: {value_str}")
        else:
            self.bag_stops[denom_str] = value
            logger.info(f"Bag stop set for {denom_str}: {value}")

    return self.create_message(f"ST{self.count_mode}{self.get_status()}")
def handle_motor_command(self, data: bytes) -> bytes:
    """Handle MG (motor go) and MS (motor stop).

    MG marks the motor as running and as having run, empties the batch
    counts and immediately simulates a counting pass. MS clears the running
    flag. The framed status reply is returned in every case.
    """
    cmd = data.decode("ascii", errors="ignore").strip()

    if cmd == "MS":
        self.motor_running = False
        logger.info("Motor stopped")
    elif cmd == "MG":
        self.motor_running = self.motor_has_run = True
        # A new counting session always starts from an empty batch.
        self.batch_counts = {}
        logger.info("Motor started - batch counts reset")
        self._simulate_counting()

    return self.create_message(f"ST{self.count_mode}{self.get_status()}")
def handle_beep_command(self, data: bytes) -> bytes:
    """Handle BB: log a beep and return the framed status reply.

    ``data`` is accepted so all handlers share one signature; it is not read.
    """
    logger.info("Beep!")
    return self.create_message(f"ST{self.count_mode}{self.get_status()}")
def handle_accept_command(self, data: bytes) -> bytes:
    """Handle the accept commands AB/Ab, AS/As and AG/Ag.

    The matching ledger (batch, subtotal or grand total) is summed as
    count x ``int(denomination)`` and reported as ``BT`` / ``BS`` / ``BG``
    + 8 digits. The unframed reply is also stored in ``pending_message``.
    Any other command is answered with an empty framed message.
    """
    cmd = data.decode("ascii", errors="ignore").strip()

    # (accepted spellings, reply prefix, ledger attribute, log label)
    variants = (
        (("AB", "Ab"), "BT", "batch_counts", "Accept batch"),
        (("AS", "As"), "BS", "sub_counts", "Accept subtotal"),
        (("AG", "Ag"), "BG", "grand_counts", "Accept grand total"),
    )
    for spellings, prefix, ledger_attr, label in variants:
        if cmd not in spellings:
            continue
        ledger = getattr(self, ledger_attr)
        total_value = sum(count * int(denom) for denom, count in ledger.items())
        response = f"{prefix}{total_value:08d}"
        logger.info(f"{label}: ${total_value/100:.2f}")
        self.pending_message = response
        return self.create_message(response)

    return self.create_message("")
def handle_partial_count_command(self, data: bytes) -> bytes:
    """Handle PC: overwrite the partial count of one denomination.

    A usable command is ``PC`` + 3-character denomination + 8-character
    value (at least 13 characters in total). A value that ``int()`` cannot
    parse is logged and ignored; shorter commands are ignored silently.
    The framed status reply is returned in every case.
    """
    cmd = data.decode("ascii", errors="ignore").strip()

    if cmd.startswith("PC") and len(cmd) >= 13:
        denom_str, value_str = cmd[2:5], cmd[5:13]
        try:
            value = int(value_str)
        except ValueError:
            logger.error(f"Invalid partial count value: {value_str}")
        else:
            self.partial_counts[denom_str] = value
            logger.info(f"Partial count set for {denom_str}: {value}")

    return self.create_message(f"ST{self.count_mode}{self.get_status()}")
def handle_id_command(self, data: bytes) -> bytes:
    """Handle ID / IS / IG: set the batch, subtotal or grand-total ID.

    The ID is whatever follows the two-letter command, cut to at most
    12 characters. The framed status reply is returned in every case.
    """
    cmd = data.decode("ascii", errors="ignore").strip()

    # command prefix -> (attribute to set, log label)
    id_slots = {
        "ID": ("batch_id", "Batch ID set"),
        "IS": ("sub_id", "Sub ID set"),
        "IG": ("grand_id", "Grand ID set"),
    }
    slot = id_slots.get(cmd[:2])
    if slot is not None:
        attr, label = slot
        # Slicing clamps at the end of the string, so short IDs are kept whole.
        new_id = cmd[2:14]
        setattr(self, attr, new_id)
        logger.info(f"{label}: {new_id}")

    return self.create_message(f"ST{self.count_mode}{self.get_status()}")
def _simulate_counting(self):
    """Add one random counting pass to the batch, partial, sub and grand ledgers.

    For each simulated denomination a count is drawn with
    ``random.randint(low, high)`` (both ends inclusive) and added to all four
    ledgers. Every lower bound is at least 1, so each denomination always
    receives a non-zero count. The resulting batch value is logged.
    """
    # (denomination key, lowest count, highest count)
    ranges = (
        ("001", 20, 150),  # pennies - high count
        ("005", 10, 100),  # nickels
        ("010", 10, 80),  # dimes
        ("025", 5, 60),  # quarters
        ("050", 1, 20),  # half dollars - rare
        ("100", 1, 10),  # dollar coins - rare
    )
    ledgers = (
        self.batch_counts,
        self.partial_counts,
        self.sub_counts,
        self.grand_counts,
    )
    for denom, low, high in ranges:
        count = random.randint(low, high)
        for ledger in ledgers:
            ledger[denom] = ledger.get(denom, 0) + count
        logger.info(f"Counted {count} coins of denomination {denom}")

    # Value of the batch ledger over the simulated denominations.
    total_value = sum(
        self.batch_counts.get(denom, 0) * int(denom) for denom, _, _ in ranges
    )
    logger.info(f"Total value counted in this batch: ${total_value/100:.2f}")
def create_message(self, data: str) -> bytes:
    """Frame ``data`` for transmission: STX byte, ASCII payload, ETX byte.

    Raises:
        UnicodeEncodeError: if ``data`` contains non-ASCII characters.
    """
    framed = bytearray((STX,))
    framed += data.encode("ascii")
    framed.append(ETX)
    return bytes(framed)
def parse_message(self, message: bytes) -> Optional[bytes]:
    """Validate a framed message and return its payload.

    A valid message is at least three bytes long, starts with STX and ends
    with ETX. The payload is everything between those two bytes. An invalid
    message is logged and ``None`` is returned.
    """
    if len(message) < 3:
        problem = "Message too short"
    elif message[0] != STX:
        problem = "Invalid STX"
    elif message[-1] != ETX:
        problem = "Invalid ETX"
    else:
        # Strip the framing bytes.
        return message[1:-1]

    logger.error(problem)
    return None
def handle_command(self, data: bytes) -> Optional[bytes]:
    """Route a command payload to its handler and return the framed reply.

    The first two characters of the decoded, stripped payload select the
    handler. Accept commands are recognised in both spellings (``AB`` and
    ``Ab``, and so on).

    Args:
        data: Payload with the STX/ETX framing already removed.

    Returns:
        The handler's framed reply, or ``None`` when the payload is shorter
        than two bytes, the command is unknown, or the handler raised.
    """
    if len(data) < 2:
        return None

    cmd_str = data.decode("ascii", errors="ignore").strip()
    cmd = cmd_str[:2]
    logger.info(f"Received command: {cmd_str}")

    routes = {
        "SS": self.handle_status_command,
        **dict.fromkeys(("CB", "CS", "CG", "CC"), self.handle_clear_command),
        **dict.fromkeys(
            ("GD", "GT", "GS", "GG", "GI", "GV"), self.handle_get_data_command
        ),
        "SV": self.handle_bag_stop_command,
        **dict.fromkeys(("MG", "MS"), self.handle_motor_command),
        "BB": self.handle_beep_command,
        **dict.fromkeys(
            ("AB", "AS", "AG", "Ab", "As", "Ag"), self.handle_accept_command
        ),
        "PC": self.handle_partial_count_command,
        **dict.fromkeys(("ID", "IS", "IG"), self.handle_id_command),
    }
    handler = routes.get(cmd)
    if handler is None:
        logger.warning(f"Unknown command: {cmd}")
        return None

    try:
        return handler(data)
    except Exception as e:
        # Deliberately broad: one bad command must not stop the simulator.
        # logger.exception attaches the traceback. The previous
        # logger.error(..., exc_info=True) did not: loguru treats extra
        # keyword arguments as str.format() inputs, so braces in the
        # exception text could make the logging call itself raise.
        logger.exception(f"Error handling command: {e}")
        return None
def run(self):
    """Open the serial port and serve commands until interrupted.

    Each loop iteration does one of two things:

    * While ``pending_message`` is set, one byte at a time is read and only
      ACK/NAK are acted on. ACK clears the pending message (and the batch
      counts when the message contains ``"BT"``); NAK retransmits it. No
      command frames are processed in this state.
    * Otherwise incoming bytes are appended to a buffer and at most one
      complete STX...ETX frame is extracted, parsed, dispatched through
      ``handle_command`` and answered.

    The loop ends on ``KeyboardInterrupt`` or on any unexpected exception
    (which is logged); the port is closed in either case.
    """
    logger.info(
        f"Starting Glory MACH6 simulator on {self.port} at {self.baudrate} baud"
    )
    logger.info("Enhanced Serial Port Mode enabled")
    try:
        self.serial_conn = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=1,
        )
        logger.info("Serial port opened successfully")
        buffer = bytearray()
        while True:
            # Handle ACK/NAK for pending messages
            if self.pending_message:
                if self.serial_conn.in_waiting > 0:
                    byte_data = self.serial_conn.read(1)
                    if byte_data[0] == ACK:
                        logger.info("Received ACK - clearing pending message")
                        # Clear batch after ACK
                        if "BT" in self.pending_message:
                            self.batch_counts = {}
                        self.pending_message = None
                    elif byte_data[0] == NAK:
                        logger.info("Received NAK - retransmitting")
                        response = self.create_message(self.pending_message)
                        self.serial_conn.write(response)
                    else:
                        # The byte is discarded, as before, but no longer
                        # silently: it may be the start of a host command.
                        logger.warning(
                            f"Ignoring unexpected byte 0x{byte_data[0]:02X} "
                            "while waiting for ACK/NAK"
                        )
                time.sleep(0.01)
                continue

            if self.serial_conn.in_waiting > 0:
                buffer.extend(self.serial_conn.read(self.serial_conn.in_waiting))

            # Look for a complete message (STX...ETX). This runs on every
            # iteration, not only when new bytes arrived, so a second frame
            # received in the same read is not left waiting for more input.
            stx_idx = buffer.find(STX)
            if stx_idx == -1:
                # Bytes with no STX can never become part of a frame.
                buffer.clear()
            else:
                etx_idx = buffer.find(ETX, stx_idx)
                if etx_idx != -1:  # -1: ETX not found yet, keep buffering
                    message = bytes(buffer[stx_idx : etx_idx + 1])
                    del buffer[: etx_idx + 1]
                    logger.debug(
                        format_comm_debug("RX", message, include_ascii=True)
                    )
                    # Parse and handle message
                    parsed_data = self.parse_message(message)
                    if parsed_data:
                        response = self.handle_command(parsed_data)
                        if response:
                            logger.debug(
                                format_comm_debug(
                                    "TX", response, include_ascii=True
                                )
                            )
                            self.serial_conn.write(response)
            time.sleep(0.01)
    except KeyboardInterrupt:
        logger.info("Simulator stopped by user")
    except Exception as e:
        # logger.exception records the traceback; with loguru,
        # logger.error(..., exc_info=True) does not.
        logger.exception(f"Error: {e}")
    finally:
        if self.serial_conn and self.serial_conn.is_open:
            self.serial_conn.close()
            logger.info("Serial port closed")