#!/usr/bin/env python3 """ Glory MACH6 Device Simulator Simulates a Glory MACH6 coin/currency counter for development/testing purposes. Implements the Enhanced Serial Port Mode with full command support. """ import argparse import logging import random import time from typing import Optional from datetime import datetime import serial # Protocol constants STX = 0x02 ETX = 0x03 ACK = 0x06 NAK = 0x15 # Machine states STATE_IDLE = "OK " STATE_ERROR = "E" STATE_BAG_STOP = "BAG" STATE_RUNNING = "RUN" STATE_MESSAGE_PENDING = "PRN" class GlorySimulator: def __init__(self, port: str, baudrate: int = 9600): self.port = port self.baudrate = baudrate self.serial_conn: Optional[serial.Serial] = None # Device state self.motor_running = False self.display_locked = False self.keyboard_locked = False self.motor_has_run = False # Count mode: "$0" for dollars, "U0" for units self.count_mode = "$0" # Coin/currency counters self.partial_counts = {} # denomination -> count self.batch_counts = {} self.sub_counts = {} self.grand_counts = {} # ID numbers self.batch_id = "" self.sub_id = "" self.grand_id = "" # Bag stops self.bag_stops = {} # denomination -> stop value self.bag_stop_counts = {} # denomination -> number of stops # Station/denomination values (coin types) self.station_values = { 1: 0.11, 2: 0.10, 3: 0.01, 4: 0.05, 5: 0.25, 6: 1.00, 7: 0.50, 8: 0.08, 9: 0.00, } # Error state self.error_code = None # Message buffer self.pending_message = None logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) self.logger = logging.getLogger(__name__) def get_status(self) -> str: """Get current machine status""" if self.pending_message: return STATE_MESSAGE_PENDING if self.error_code: return f"{STATE_ERROR}{self.error_code:02d}" if self.motor_running: return STATE_RUNNING # Check for bag stops for denom, stop_value in self.bag_stops.items(): if denom in self.batch_counts: if self.batch_counts[denom] >= stop_value: return f"{STATE_BAG_STOP}{denom:03d}" # Keys locked/unlocked status if self.keyboard_locked: motor_flag = "1" if self.motor_has_run else "0" return f"OU{motor_flag}" else: motor_flag = "1" if self.motor_has_run else "0" return f"OK{motor_flag}" def handle_status_command(self, data: bytes) -> bytes: """Handle SS - Status command""" status = self.get_status() response = f"ST{self.count_mode}{status}" self.logger.info(f"Status request - Response: {response}") return self.create_message(response) def handle_clear_command(self, data: bytes) -> bytes: """Handle clear commands (CC, CB, CS, CG)""" cmd = data.decode('ascii', errors='ignore').strip() if cmd == "CB": # Clear batch self.batch_counts = {} self.batch_id = "" self.logger.info("Batch counts cleared") elif cmd == "CS": # Clear subtotal self.sub_counts = {} self.sub_id = "" self.logger.info("Subtotal counts cleared") elif cmd == "CG": # Clear grand total self.grand_counts = {} self.grand_id = "" self.logger.info("Grand total counts cleared") elif cmd.startswith("CC"): # Clear partial count denom_str = cmd[2:] if denom_str in self.partial_counts: del self.partial_counts[denom_str] self.logger.info(f"Partial count for {denom_str} cleared") status = self.get_status() response = f"ST{self.count_mode}{status}" return self.create_message(response) def handle_get_data_command(self, data: bytes) -> bytes: """Handle get data commands (GD, GT, GS, GG, GI, GV)""" cmd = data.decode('ascii', errors='ignore').strip() if cmd == "GT": # Get batch total total = sum(self.batch_counts.values()) response = f"BT{total:08d}" self.logger.info(f"Get batch total: {total}") return self.create_message(response) elif cmd == "GS": # Get subtotal total = sum(self.sub_counts.values()) response = f"BS{total:08d}" self.logger.info(f"Get subtotal: {total}") return self.create_message(response) elif cmd == "GG": # Get grand total total = sum(self.grand_counts.values()) response = f"BG{total:08d}" self.logger.info(f"Get grand total: {total}") return self.create_message(response) elif cmd.startswith("GD"): # Get partial count data denom_str = cmd[2:] count = self.partial_counts.get(denom_str, 0) response = f"BD{count:08d}" self.logger.info(f"Get partial count for {denom_str}: {count}") return self.create_message(response) elif cmd.startswith("GT") and len(cmd) > 2: # Get batch counts of denomination denom_str = cmd[2:] count = self.batch_counts.get(denom_str, 0) response = f"BT{count:08d}" self.logger.info(f"Get batch count for {denom_str}: {count}") return self.create_message(response) elif cmd == "GI": # Get product totals (ID totals) response = "" # Return empty if no totals self.logger.info("Get product totals (empty)") return self.create_message(response) elif cmd == "GV": # Get station value table response = "" for station, value in sorted(self.station_values.items()): active = "i" if value == 0 else str(station) response += f"{active} {value:.2f} \r" self.logger.info("Get station values") return self.create_message(response) return self.create_message("") def handle_bag_stop_command(self, data: bytes) -> bytes: """Handle SV - Set bag stop command""" cmd = data.decode('ascii', errors='ignore').strip() if len(cmd) >= 13 and cmd.startswith("SV"): denom_str = cmd[2:5] value_str = cmd[5:13] try: value = int(value_str) self.bag_stops[denom_str] = value self.logger.info(f"Bag stop set for {denom_str}: {value}") except ValueError: self.logger.error(f"Invalid bag stop value: {value_str}") status = self.get_status() response = f"ST{self.count_mode}{status}" return self.create_message(response) def handle_motor_command(self, data: bytes) -> bytes: """Handle MG/MS - Motor control commands""" cmd = data.decode('ascii', errors='ignore').strip() if cmd == "MG": # Start motor self.motor_running = True self.motor_has_run = True self.logger.info("Motor started") # Simulate coin counting self._simulate_counting() elif cmd == "MS": # Stop motor self.motor_running = False self.logger.info("Motor stopped") status = self.get_status() response = f"ST{self.count_mode}{status}" return self.create_message(response) def handle_beep_command(self, data: bytes) -> bytes: """Handle BB - Beep command""" self.logger.info("Beep!") status = self.get_status() response = f"ST{self.count_mode}{status}" return self.create_message(response) def handle_accept_command(self, data: bytes) -> bytes: """Handle AB/AS/AG - Accept commands""" cmd = data.decode('ascii', errors='ignore').strip() if cmd == "AB" or cmd == "Ab": # Accept batch total = sum(self.batch_counts.values()) response = f"BT{total:08d}" self.logger.info(f"Accept batch: {total}") self.pending_message = response return self.create_message(response) elif cmd == "AS" or cmd == "As": # Accept subtotal total = sum(self.sub_counts.values()) response = f"BS{total:08d}" self.logger.info(f"Accept subtotal: {total}") self.pending_message = response return self.create_message(response) elif cmd == "AG" or cmd == "Ag": # Accept grand total total = sum(self.grand_counts.values()) response = f"BG{total:08d}" self.logger.info(f"Accept grand total: {total}") self.pending_message = response return self.create_message(response) return self.create_message("") def handle_partial_count_command(self, data: bytes) -> bytes: """Handle PC - Set partial count command""" cmd = data.decode('ascii', errors='ignore').strip() if len(cmd) >= 13 and cmd.startswith("PC"): denom_str = cmd[2:5] value_str = cmd[5:13] try: value = int(value_str) self.partial_counts[denom_str] = value self.logger.info(f"Partial count set for {denom_str}: {value}") except ValueError: self.logger.error(f"Invalid partial count value: {value_str}") status = self.get_status() response = f"ST{self.count_mode}{status}" return self.create_message(response) def handle_id_command(self, data: bytes) -> bytes: """Handle ID/IS/IG - Set ID number commands""" cmd = data.decode('ascii', errors='ignore').strip() if cmd.startswith("ID"): self.batch_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:] self.logger.info(f"Batch ID set: {self.batch_id}") elif cmd.startswith("IS"): self.sub_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:] self.logger.info(f"Sub ID set: {self.sub_id}") elif cmd.startswith("IG"): self.grand_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:] self.logger.info(f"Grand ID set: {self.grand_id}") status = self.get_status() response = f"ST{self.count_mode}{status}" return self.create_message(response) def _simulate_counting(self): """Simulate coin counting when motor runs""" # Generate random counts for simulation denominations = ["005", "010", "025", "050", "100"] for denom in denominations: count = random.randint(10, 100) self.batch_counts[denom] = self.batch_counts.get(denom, 0) + count self.partial_counts[denom] = self.partial_counts.get(denom, 0) + count def create_message(self, data: str) -> bytes: """Create a message with STX and ETX""" return bytes([STX]) + data.encode('ascii') + bytes([ETX]) def parse_message(self, message: bytes) -> Optional[bytes]: """Parse and validate a received message""" if len(message) < 3: self.logger.error("Message too short") return None if message[0] != STX: self.logger.error("Invalid STX") return None if message[-1] != ETX: self.logger.error("Invalid ETX") return None data = message[1:-1] return data def handle_command(self, data: bytes) -> Optional[bytes]: """Route command to appropriate handler""" if len(data) < 2: return None try: cmd_str = data.decode('ascii', errors='ignore').strip() cmd = cmd_str[:2] self.logger.info(f"Received command: {cmd_str}") if cmd == "SS": return self.handle_status_command(data) elif cmd in ["CB", "CS", "CG"] or cmd_str.startswith("CC"): return self.handle_clear_command(data) elif cmd in ["GD", "GT", "GS", "GG", "GI", "GV"]: return self.handle_get_data_command(data) elif cmd == "SV": return self.handle_bag_stop_command(data) elif cmd in ["MG", "MS"]: return self.handle_motor_command(data) elif cmd == "BB": return self.handle_beep_command(data) elif cmd in ["AB", "AS", "AG"] or cmd_str[:2] in ["Ab", "As", "Ag"]: return self.handle_accept_command(data) elif cmd == "PC": return self.handle_partial_count_command(data) elif cmd in ["ID", "IS", "IG"]: return self.handle_id_command(data) else: self.logger.warning(f"Unknown command: {cmd}") return None except Exception as e: self.logger.error(f"Error handling command: {e}", exc_info=True) return None def run(self): """Main simulator loop""" self.logger.info( f"Starting Glory MACH6 simulator on {self.port} at {self.baudrate} baud" ) self.logger.info("Enhanced Serial Port Mode enabled") try: self.serial_conn = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, ) self.logger.info("Serial port opened successfully") buffer = bytearray() while True: # Handle ACK/NAK for pending messages if self.pending_message: # Wait for ACK or NAK if self.serial_conn.in_waiting > 0: byte_data = self.serial_conn.read(1) if byte_data[0] == ACK: self.logger.info("Received ACK - clearing pending message") # Clear batch after ACK if "BT" in self.pending_message: self.batch_counts = {} self.pending_message = None elif byte_data[0] == NAK: self.logger.info("Received NAK - retransmitting") response = self.create_message(self.pending_message) self.serial_conn.write(response) time.sleep(0.01) continue if self.serial_conn.in_waiting > 0: data = self.serial_conn.read(self.serial_conn.in_waiting) buffer.extend(data) # Look for complete message (STX...ETX) if STX in buffer and ETX in buffer: stx_idx = buffer.index(STX) try: etx_idx = buffer.index(ETX, stx_idx) message = bytes(buffer[stx_idx : etx_idx + 1]) buffer = buffer[etx_idx + 1 :] self.logger.debug(f"RX: {message.hex()}") # Parse and handle message parsed_data = self.parse_message(message) if parsed_data: response = self.handle_command(parsed_data) if response: self.logger.debug(f"TX: {response.hex()}") self.serial_conn.write(response) except ValueError: pass # ETX not found yet time.sleep(0.01) except KeyboardInterrupt: self.logger.info("Simulator stopped by user") except Exception as e: self.logger.error(f"Error: {e}", exc_info=True) finally: if self.serial_conn and self.serial_conn.is_open: self.serial_conn.close() self.logger.info("Serial port closed") def main(): parser = argparse.ArgumentParser(description="Glory MACH6 Device Simulator") parser.add_argument( "--port", "-p", default="/dev/ttyUSB0", help="Serial port (default: /dev/ttyUSB0)", ) parser.add_argument( "--baudrate", "-b", type=int, default=9600, help="Baud rate (default: 9600)" ) args = parser.parse_args() simulator = GlorySimulator(args.port, args.baudrate) simulator.run() if __name__ == "__main__": main()