#!/usr/bin/env python3
"""
Glory MACH6 Device Simulator

Simulates a Glory MACH6 coin/currency counter for development/testing purposes.
Implements the Enhanced Serial Port Mode with full command support.
"""

import random
import time
from typing import Dict, Optional

import serial
from loguru import logger

from source.common import format_comm_debug

# Protocol constants (framing and handshake bytes)
STX = 0x02
ETX = 0x03
ACK = 0x06
NAK = 0x15

# Machine states
STATE_IDLE = "OK "
STATE_ERROR = "E"
STATE_BAG_STOP = "BAG"
STATE_RUNNING = "RUN"
STATE_MESSAGE_PENDING = "PRN"


class GlorySimulator:
    """Serial-port simulator that answers STX/ETX-framed two-letter commands.

    Counts are kept per denomination, keyed by the three-character
    denomination string taken from the command (e.g. ``"025"``); monetary
    totals are computed as ``count * int(denomination)``, i.e. in cents.
    """

    def __init__(self, port: str, baudrate: int = 9600):
        """Store the connection settings and reset all simulated state.

        Args:
            port: Serial device name handed to ``serial.Serial``.
            baudrate: Line speed handed to ``serial.Serial``.
        """
        self.port = port
        self.baudrate = baudrate
        # Opened lazily by run(); stays None until then.
        self.serial_conn: Optional[serial.Serial] = None

        # Device state
        self.motor_running = False
        self.display_locked = False
        self.keyboard_locked = False
        self.motor_has_run = False

        # Count mode: "$0" for dollars, "U0" for units
        self.count_mode = "$0"

        # Coin/currency counters: denomination string -> count
        self.partial_counts: Dict[str, int] = {}
        self.batch_counts: Dict[str, int] = {}
        self.sub_counts: Dict[str, int] = {}
        self.grand_counts: Dict[str, int] = {}

        # ID numbers
        self.batch_id = ""
        self.sub_id = ""
        self.grand_id = ""

        # Bag stops
        self.bag_stops: Dict[str, int] = {}  # denomination -> stop value
        self.bag_stop_counts: Dict[str, int] = {}  # denomination -> number of stops

        # Station/denomination values (coin types); 0.00 marks an inactive
        # station in the GV reply.
        self.station_values: Dict[int, float] = {
            1: 0.11,
            2: 0.10,
            3: 0.01,
            4: 0.05,
            5: 0.25,
            6: 1.00,
            7: 0.50,
            8: 0.08,
            9: 0.00,
        }

        # Error state (None or 0 means "no error")
        self.error_code: Optional[int] = None

        # Payload of the last accept reply that still awaits ACK/NAK
        self.pending_message: Optional[str] = None

    # ------------------------------------------------------------------
    # Status helpers
    # ------------------------------------------------------------------
    def get_status(self) -> str:
        """Return the current machine status code.

        Checked in priority order: pending message, error, motor running,
        bag stop reached, then the keyboard-lock/idle code.
        """
        if self.pending_message:
            return STATE_MESSAGE_PENDING

        if self.error_code:
            return f"{STATE_ERROR}{self.error_code:02d}"

        if self.motor_running:
            return STATE_RUNNING

        # Check for bag stops
        for denom, stop_value in self.bag_stops.items():
            counted = self.batch_counts.get(denom)
            if counted is not None and counted >= stop_value:
                # Denomination keys are strings ("001"); formatting them with
                # ":03d" raises ValueError, so zero-pad textually instead.
                return f"{STATE_BAG_STOP}{str(denom).zfill(3)}"

        # Keys locked/unlocked status, suffixed with the motor-has-run flag
        motor_flag = "1" if self.motor_has_run else "0"
        prefix = "OU" if self.keyboard_locked else "OK"
        return f"{prefix}{motor_flag}"

    def _status_payload(self) -> str:
        """Build the unframed ``ST`` status reply shared by most commands."""
        return f"ST{self.count_mode}{self.get_status()}"

    @staticmethod
    def _total_cents(counts: Dict[str, int]) -> int:
        """Sum ``count * int(denomination)`` over a counter dictionary."""
        return sum(count * int(denom) for denom, count in counts.items())

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------
    def handle_status_command(self, data: bytes) -> bytes:
        """Handle SS - Status command; returns the framed status reply."""
        response = self._status_payload()
        logger.info(f"Status request - Response: {response}")
        return self.create_message(response)

    def handle_clear_command(self, data: bytes) -> bytes:
        """Handle clear commands (CC, CB, CS, CG); returns the status reply."""
        cmd = data.decode("ascii", errors="ignore").strip()

        if cmd == "CB":
            # Clear batch
            self.batch_counts = {}
            self.batch_id = ""
            logger.info("Batch counts cleared")
        elif cmd == "CS":
            # Clear subtotal
            self.sub_counts = {}
            self.sub_id = ""
            logger.info("Subtotal counts cleared")
        elif cmd == "CG":
            # Clear grand total
            self.grand_counts = {}
            self.grand_id = ""
            logger.info("Grand total counts cleared")
        elif cmd.startswith("CC"):
            # Clear partial count for the denomination that follows "CC"
            denom_str = cmd[2:]
            if self.partial_counts.pop(denom_str, None) is not None:
                logger.info(f"Partial count for {denom_str} cleared")

        return self.create_message(self._status_payload())

    def handle_get_data_command(self, data: bytes) -> bytes:
        """Handle get data commands (GD, GT, GS, GG, GI, GV).

        Returns the framed reply; an unrecognised variant yields an empty
        frame. A non-numeric denomination after ``GT`` raises ``ValueError``.
        """
        cmd = data.decode("ascii", errors="ignore").strip()

        if cmd == "GT":
            # Get batch total (monetary value).
            # For the simulator: reset and generate new counts for each GT
            # request, which simulates a fresh batch counting session.
            logger.info("GT received - resetting batch and simulating new count")
            self.batch_counts = {}
            self._simulate_counting()

            total_value = self._total_cents(self.batch_counts)
            logger.info(f"Get batch total: ${total_value/100:.2f}")
            return self.create_message(f"BT{total_value:08d}")

        if cmd == "GS":
            # Get subtotal (monetary value)
            total_value = self._total_cents(self.sub_counts)
            logger.info(f"Get subtotal: ${total_value/100:.2f}")
            return self.create_message(f"BS{total_value:08d}")

        if cmd == "GG":
            # Get grand total (monetary value)
            total_value = self._total_cents(self.grand_counts)
            logger.info(f"Get grand total: ${total_value/100:.2f}")
            return self.create_message(f"BG{total_value:08d}")

        if cmd.startswith("GD"):
            # Get partial count data (a coin count, not a monetary value)
            denom_str = cmd[2:]
            count = self.partial_counts.get(denom_str, 0)
            logger.info(f"Get partial count for {denom_str}: {count} coins")
            return self.create_message(f"BD{count:08d}")

        if cmd.startswith("GT") and len(cmd) > 2:
            # Get batch value of one denomination (monetary value, not count)
            denom_str = cmd[2:]
            count = self.batch_counts.get(denom_str, 0)
            # e.g. "001" = 1 cent, "025" = 25 cents
            denom_value = int(denom_str)
            monetary_value = count * denom_value
            logger.info(
                f"Get batch value for {denom_str}: {count} coins × ${denom_value/100:.2f} = ${monetary_value/100:.2f}"
            )
            return self.create_message(f"BT{monetary_value:08d}")

        if cmd == "GI":
            # Get product totals (ID totals); the simulator keeps none
            logger.info("Get product totals (empty)")
            return self.create_message("")

        if cmd == "GV":
            # Get station value table: one "<station|i> <value> \r" row per
            # station, with "i" in place of the number for a zero value.
            response = "".join(
                f"{'i' if value == 0 else station} {value:.2f} \r"
                for station, value in sorted(self.station_values.items())
            )
            logger.info("Get station values")
            return self.create_message(response)

        return self.create_message("")

    def handle_bag_stop_command(self, data: bytes) -> bytes:
        """Handle SV - Set bag stop command (``SV`` + 3-char denom + 8-char value)."""
        cmd = data.decode("ascii", errors="ignore").strip()

        if len(cmd) >= 13 and cmd.startswith("SV"):
            denom_str = cmd[2:5]
            value_str = cmd[5:13]
            try:
                value = int(value_str)
            except ValueError:
                logger.error(f"Invalid bag stop value: {value_str}")
            else:
                self.bag_stops[denom_str] = value
                logger.info(f"Bag stop set for {denom_str}: {value}")

        return self.create_message(self._status_payload())

    def handle_motor_command(self, data: bytes) -> bytes:
        """Handle MG/MS - Motor control commands; returns the status reply."""
        cmd = data.decode("ascii", errors="ignore").strip()

        if cmd == "MG":
            # Start motor - reset batch counts for a new counting session
            self.motor_running = True
            self.motor_has_run = True
            self.batch_counts = {}
            logger.info("Motor started - batch counts reset")
            # Simulate coin counting
            self._simulate_counting()
        elif cmd == "MS":
            # Stop motor
            self.motor_running = False
            logger.info("Motor stopped")

        return self.create_message(self._status_payload())

    def handle_beep_command(self, data: bytes) -> bytes:
        """Handle BB - Beep command; only logs and returns the status reply."""
        logger.info("Beep!")
        return self.create_message(self._status_payload())

    def handle_accept_command(self, data: bytes) -> bytes:
        """Handle AB/AS/AG - Accept commands (also the Ab/As/Ag spellings).

        The reply payload is stored in ``pending_message`` so that ``run``
        waits for the host's ACK/NAK before processing further commands.
        """
        cmd = data.decode("ascii", errors="ignore").strip()

        if cmd in ("AB", "Ab"):
            # Accept batch (monetary value)
            total_value = self._total_cents(self.batch_counts)
            response = f"BT{total_value:08d}"
            logger.info(f"Accept batch: ${total_value/100:.2f}")
        elif cmd in ("AS", "As"):
            # Accept subtotal (monetary value)
            total_value = self._total_cents(self.sub_counts)
            response = f"BS{total_value:08d}"
            logger.info(f"Accept subtotal: ${total_value/100:.2f}")
        elif cmd in ("AG", "Ag"):
            # Accept grand total (monetary value)
            total_value = self._total_cents(self.grand_counts)
            response = f"BG{total_value:08d}"
            logger.info(f"Accept grand total: ${total_value/100:.2f}")
        else:
            return self.create_message("")

        self.pending_message = response
        return self.create_message(response)

    def handle_partial_count_command(self, data: bytes) -> bytes:
        """Handle PC - Set partial count (``PC`` + 3-char denom + 8-char value)."""
        cmd = data.decode("ascii", errors="ignore").strip()

        if len(cmd) >= 13 and cmd.startswith("PC"):
            denom_str = cmd[2:5]
            value_str = cmd[5:13]
            try:
                value = int(value_str)
            except ValueError:
                logger.error(f"Invalid partial count value: {value_str}")
            else:
                self.partial_counts[denom_str] = value
                logger.info(f"Partial count set for {denom_str}: {value}")

        return self.create_message(self._status_payload())

    def handle_id_command(self, data: bytes) -> bytes:
        """Handle ID/IS/IG - Set ID number commands (at most 12 characters)."""
        cmd = data.decode("ascii", errors="ignore").strip()
        # Slicing clamps to the string length, so short IDs are kept whole.
        id_value = cmd[2:14]

        if cmd.startswith("ID"):
            self.batch_id = id_value
            logger.info(f"Batch ID set: {self.batch_id}")
        elif cmd.startswith("IS"):
            self.sub_id = id_value
            logger.info(f"Sub ID set: {self.sub_id}")
        elif cmd.startswith("IG"):
            self.grand_id = id_value
            logger.info(f"Grand ID set: {self.grand_id}")

        return self.create_message(self._status_payload())

    def _simulate_counting(self):
        """Add random coin counts to the batch, partial, sub and grand totals."""
        # Standard denominations with (min, max) count ranges.
        # All ranges start at 1 or more to ensure non-zero counts.
        denominations = {
            "001": (20, 150),  # Pennies - high count
            "005": (10, 100),  # Nickels
            "010": (10, 80),  # Dimes
            "025": (5, 60),  # Quarters
            "050": (1, 20),  # Half dollars - rare
            "100": (1, 10),  # Dollar coins - rare
        }

        for denom, (min_count, max_count) in denominations.items():
            count = random.randint(min_count, max_count)

            # The same increment is applied to every running total.
            for counters in (
                self.batch_counts,
                self.partial_counts,
                self.sub_counts,
                self.grand_counts,
            ):
                counters[denom] = counters.get(denom, 0) + count

            logger.info(f"Counted {count} coins of denomination {denom}")

        # Log total value counted
        total_value = sum(
            self.batch_counts.get(denom, 0) * int(denom) for denom in denominations
        )
        logger.info(f"Total value counted in this batch: ${total_value/100:.2f}")

    # ------------------------------------------------------------------
    # Framing
    # ------------------------------------------------------------------
    def create_message(self, data: str) -> bytes:
        """Create a message with STX and ETX around the ASCII payload."""
        return bytes([STX]) + data.encode("ascii") + bytes([ETX])

    def parse_message(self, message: bytes) -> Optional[bytes]:
        """Validate a received frame and return its payload, or None if invalid."""
        if len(message) < 3:
            logger.error("Message too short")
            return None

        if message[0] != STX:
            logger.error("Invalid STX")
            return None

        if message[-1] != ETX:
            logger.error("Invalid ETX")
            return None

        return message[1:-1]

    @staticmethod
    def _extract_frame(buffer: bytearray) -> Optional[bytes]:
        """Pop the first complete STX...ETX frame from ``buffer`` in place.

        Bytes ahead of the first STX are discarded as line noise so the
        buffer cannot grow without bound. Returns None when no complete
        frame is available yet.
        """
        stx_idx = buffer.find(STX)
        if stx_idx < 0:
            buffer.clear()
            return None
        if stx_idx > 0:
            del buffer[:stx_idx]

        etx_idx = buffer.find(ETX, 1)
        if etx_idx < 0:
            return None  # ETX not received yet

        frame = bytes(buffer[: etx_idx + 1])
        del buffer[: etx_idx + 1]
        return frame

    def handle_command(self, data: bytes) -> Optional[bytes]:
        """Route a command payload to its handler.

        Returns the framed reply, or None for a too-short payload, an
        unknown command, or a handler that raised (the error is logged).
        """
        if len(data) < 2:
            return None

        handlers = {
            "SS": self.handle_status_command,
            "CB": self.handle_clear_command,
            "CS": self.handle_clear_command,
            "CG": self.handle_clear_command,
            "CC": self.handle_clear_command,
            "GD": self.handle_get_data_command,
            "GT": self.handle_get_data_command,
            "GS": self.handle_get_data_command,
            "GG": self.handle_get_data_command,
            "GI": self.handle_get_data_command,
            "GV": self.handle_get_data_command,
            "SV": self.handle_bag_stop_command,
            "MG": self.handle_motor_command,
            "MS": self.handle_motor_command,
            "BB": self.handle_beep_command,
            "AB": self.handle_accept_command,
            "AS": self.handle_accept_command,
            "AG": self.handle_accept_command,
            "Ab": self.handle_accept_command,
            "As": self.handle_accept_command,
            "Ag": self.handle_accept_command,
            "PC": self.handle_partial_count_command,
            "ID": self.handle_id_command,
            "IS": self.handle_id_command,
            "IG": self.handle_id_command,
        }

        try:
            cmd_str = data.decode("ascii", errors="ignore").strip()
            cmd = cmd_str[:2]

            logger.info(f"Received command: {cmd_str}")

            handler = handlers.get(cmd)
            if handler is None:
                logger.warning(f"Unknown command: {cmd}")
                return None
            return handler(data)

        except Exception as e:
            # loguru has no exc_info keyword; logger.exception() is the call
            # that attaches the active traceback to an ERROR record.
            logger.exception(f"Error handling command: {e}")
            return None

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def _service_pending_message(self) -> None:
        """Consume one byte while an accept reply awaits the host's ACK/NAK."""
        if self.serial_conn.in_waiting <= 0:
            return

        byte_data = self.serial_conn.read(1)
        if not byte_data:
            return  # read timed out despite in_waiting; try again next pass

        reply = byte_data[0]
        if reply == ACK:
            logger.info("Received ACK - clearing pending message")
            # Clear batch after an acknowledged batch total
            if self.pending_message.startswith("BT"):
                self.batch_counts = {}
            self.pending_message = None
        elif reply == NAK:
            logger.info("Received NAK - retransmitting")
            self.serial_conn.write(self.create_message(self.pending_message))
        else:
            logger.debug(
                f"Ignoring unexpected byte 0x{reply:02X} while awaiting ACK/NAK"
            )

    def _process_frames(self, buffer: bytearray) -> None:
        """Handle every complete frame currently sitting in ``buffer``."""
        # Stop as soon as a reply needs ACK/NAK so the handshake is honoured
        # before any further command is executed.
        while not self.pending_message:
            message = self._extract_frame(buffer)
            if message is None:
                return

            logger.debug(format_comm_debug("RX", message, include_ascii=True))

            parsed_data = self.parse_message(message)
            if not parsed_data:
                continue

            response = self.handle_command(parsed_data)
            if response:
                logger.debug(format_comm_debug("TX", response, include_ascii=True))
                self.serial_conn.write(response)

    def run(self):
        """Main simulator loop: open the port and serve commands until stopped.

        Runs until KeyboardInterrupt or an unexpected error; the serial port
        is closed on the way out in either case.
        """
        logger.info(
            f"Starting Glory MACH6 simulator on {self.port} at {self.baudrate} baud"
        )
        logger.info("Enhanced Serial Port Mode enabled")

        try:
            self.serial_conn = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=1,
            )

            logger.info("Serial port opened successfully")

            buffer = bytearray()

            while True:
                # Handle ACK/NAK for pending messages before anything else
                if self.pending_message:
                    self._service_pending_message()
                    time.sleep(0.01)
                    continue

                if self.serial_conn.in_waiting > 0:
                    buffer.extend(self.serial_conn.read(self.serial_conn.in_waiting))

                # Look for complete messages (STX...ETX)
                self._process_frames(buffer)

                time.sleep(0.01)

        except KeyboardInterrupt:
            logger.info("Simulator stopped by user")
        except Exception as e:
            logger.exception(f"Error: {e}")
        finally:
            if self.serial_conn and self.serial_conn.is_open:
                self.serial_conn.close()
                logger.info("Serial port closed")