From 1955256d06d3ae1e6b81b4a73339a2cbf8a1342b Mon Sep 17 00:00:00 2001 From: Eden Kirin Date: Thu, 9 Oct 2025 17:22:11 +0200 Subject: [PATCH] Update --- ...{PeliHost_09_03_30-III.pdf => Pelican.pdf} | Bin glory.py | 80 +++++--- main.py | 24 ++- pelican.py | 178 ++++++++++++++++-- 4 files changed, 231 insertions(+), 51 deletions(-) rename docs/{PeliHost_09_03_30-III.pdf => Pelican.pdf} (100%) diff --git a/docs/PeliHost_09_03_30-III.pdf b/docs/Pelican.pdf similarity index 100% rename from docs/PeliHost_09_03_30-III.pdf rename to docs/Pelican.pdf diff --git a/glory.py b/glory.py index 6c19e72..d6f8536 100644 --- a/glory.py +++ b/glory.py @@ -10,7 +10,6 @@ import logging import random import time from typing import Optional -from datetime import datetime import serial @@ -29,7 +28,7 @@ STATE_MESSAGE_PENDING = "PRN" class GlorySimulator: - def __init__(self, port: str, baudrate: int = 9600): + def __init__(self, logger: logging.Logger, port: str, baudrate: int = 9600): self.port = port self.baudrate = baudrate self.serial_conn: Optional[serial.Serial] = None @@ -77,10 +76,7 @@ class GlorySimulator: # Message buffer self.pending_message = None - logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" - ) - self.logger = logging.getLogger(__name__) + self.logger = logger def get_status(self) -> str: """Get current machine status""" @@ -116,7 +112,7 @@ class GlorySimulator: def handle_clear_command(self, data: bytes) -> bytes: """Handle clear commands (CC, CB, CS, CG)""" - cmd = data.decode('ascii', errors='ignore').strip() + cmd = data.decode("ascii", errors="ignore").strip() if cmd == "CB": # Clear batch @@ -146,27 +142,32 @@ class GlorySimulator: def handle_get_data_command(self, data: bytes) -> bytes: """Handle get data commands (GD, GT, GS, GG, GI, GV)""" - cmd = data.decode('ascii', errors='ignore').strip() + cmd = data.decode("ascii", errors="ignore").strip() if cmd == "GT": # Get batch total + # If no counts, generate random simulation data + # if not self.batch_counts and self.motor_has_run: + if True: + self._simulate_counting() + total = sum(self.batch_counts.values()) response = f"BT{total:08d}" - self.logger.info(f"Get batch total: {total}") + self.logger.info(f"Get batch total: {total} coins") return self.create_message(response) elif cmd == "GS": # Get subtotal total = sum(self.sub_counts.values()) response = f"BS{total:08d}" - self.logger.info(f"Get subtotal: {total}") + self.logger.info(f"Get subtotal: {total} coins") return self.create_message(response) elif cmd == "GG": # Get grand total total = sum(self.grand_counts.values()) response = f"BG{total:08d}" - self.logger.info(f"Get grand total: {total}") + self.logger.info(f"Get grand total: {total} coins") return self.create_message(response) elif cmd.startswith("GD"): @@ -174,7 +175,7 @@ class GlorySimulator: denom_str = cmd[2:] count = self.partial_counts.get(denom_str, 0) response = f"BD{count:08d}" - self.logger.info(f"Get partial count for {denom_str}: {count}") + self.logger.info(f"Get partial count for {denom_str}: {count} coins") return self.create_message(response) elif cmd.startswith("GT") and len(cmd) > 2: @@ -182,7 +183,7 @@ class GlorySimulator: denom_str = cmd[2:] count = self.batch_counts.get(denom_str, 0) response = f"BT{count:08d}" - self.logger.info(f"Get batch count for {denom_str}: {count}") + self.logger.info(f"Get batch count for {denom_str}: {count} coins") return self.create_message(response) elif cmd == "GI": @@ -205,7 +206,7 @@ class GlorySimulator: def handle_bag_stop_command(self, data: bytes) -> bytes: """Handle SV - Set bag stop command""" - cmd = data.decode('ascii', errors='ignore').strip() + cmd = data.decode("ascii", errors="ignore").strip() if len(cmd) >= 13 and cmd.startswith("SV"): denom_str = cmd[2:5] @@ -224,7 +225,7 @@ class GlorySimulator: def handle_motor_command(self, data: bytes) -> bytes: """Handle MG/MS - Motor control commands""" - cmd = data.decode('ascii', errors='ignore').strip() + cmd = data.decode("ascii", errors="ignore").strip() if cmd == "MG": # Start motor @@ -251,7 +252,7 @@ class GlorySimulator: def handle_accept_command(self, data: bytes) -> bytes: """Handle AB/AS/AG - Accept commands""" - cmd = data.decode('ascii', errors='ignore').strip() + cmd = data.decode("ascii", errors="ignore").strip() if cmd == "AB" or cmd == "Ab": # Accept batch @@ -281,7 +282,7 @@ class GlorySimulator: def handle_partial_count_command(self, data: bytes) -> bytes: """Handle PC - Set partial count command""" - cmd = data.decode('ascii', errors='ignore').strip() + cmd = data.decode("ascii", errors="ignore").strip() if len(cmd) >= 13 and cmd.startswith("PC"): denom_str = cmd[2:5] @@ -300,7 +301,7 @@ class GlorySimulator: def handle_id_command(self, data: bytes) -> bytes: """Handle ID/IS/IG - Set ID number commands""" - cmd = data.decode('ascii', errors='ignore').strip() + cmd = data.decode("ascii", errors="ignore").strip() if cmd.startswith("ID"): self.batch_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:] @@ -319,15 +320,40 @@ class GlorySimulator: def _simulate_counting(self): """Simulate coin counting when motor runs""" # Generate random counts for simulation - denominations = ["005", "010", "025", "050", "100"] - for denom in denominations: - count = random.randint(10, 100) - self.batch_counts[denom] = self.batch_counts.get(denom, 0) + count - self.partial_counts[denom] = self.partial_counts.get(denom, 0) + count + # Standard denominations with realistic count distributions + denominations = { + "001": (20, 150), # Pennies - high count + "005": (10, 100), # Nickels + "010": (10, 80), # Dimes + "025": (5, 60), # Quarters + "050": (0, 20), # Half dollars - rare + "100": (0, 10), # Dollar coins - rare + } + + for denom, (min_count, max_count) in denominations.items(): + count = random.randint(min_count, max_count) + if count > 0: + # Update batch counts + self.batch_counts[denom] = self.batch_counts.get(denom, 0) + count + # Update partial counts + self.partial_counts[denom] = self.partial_counts.get(denom, 0) + count + # Update subtotal + self.sub_counts[denom] = self.sub_counts.get(denom, 0) + count + # Update grand total + self.grand_counts[denom] = self.grand_counts.get(denom, 0) + count + + self.logger.info(f"Counted {count} coins of denomination {denom}") + + # Log total value counted + total_value = sum( + self.batch_counts.get(denom, 0) * int(denom) + for denom in denominations.keys() + ) + self.logger.info(f"Total value counted in this batch: ${total_value/100:.2f}") def create_message(self, data: str) -> bytes: """Create a message with STX and ETX""" - return bytes([STX]) + data.encode('ascii') + bytes([ETX]) + return bytes([STX]) + data.encode("ascii") + bytes([ETX]) def parse_message(self, message: bytes) -> Optional[bytes]: """Parse and validate a received message""" @@ -352,7 +378,7 @@ class GlorySimulator: return None try: - cmd_str = data.decode('ascii', errors='ignore').strip() + cmd_str = data.decode("ascii", errors="ignore").strip() cmd = cmd_str[:2] self.logger.info(f"Received command: {cmd_str}") @@ -435,14 +461,14 @@ class GlorySimulator: message = bytes(buffer[stx_idx : etx_idx + 1]) buffer = buffer[etx_idx + 1 :] - self.logger.debug(f"RX: {message.hex()}") + self.logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}") # Parse and handle message parsed_data = self.parse_message(message) if parsed_data: response = self.handle_command(parsed_data) if response: - self.logger.debug(f"TX: {response.hex()}") + self.logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}") self.serial_conn.write(response) except ValueError: pass # ETX not found yet diff --git a/main.py b/main.py index bb0d12d..652cfce 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ Entry point for running Pelican or Glory device simulators. """ import argparse +import logging import sys from glory import GlorySimulator @@ -12,6 +13,8 @@ from glory import GlorySimulator # Import simulators from pelican import PelicanSimulator +LOG_LEVEL = logging.DEBUG + def main(): parser = argparse.ArgumentParser( @@ -66,25 +69,30 @@ Examples: args = parser.parse_args() + # Setup centralized logger + logging.basicConfig( + level=LOG_LEVEL, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + logger = logging.getLogger(f"{args.simulator}_simulator") + # Run the appropriate simulator try: if args.simulator == "pelican": - print( - f"Starting Pelican simulator on {args.port} at {args.baudrate} baud..." + simulator = PelicanSimulator( + logger=logger, port=args.port, baudrate=args.baudrate ) - simulator = PelicanSimulator(args.port, args.baudrate) simulator.run() elif args.simulator == "glory": - print( - f"Starting Glory MACH6 simulator on {args.port} at {args.baudrate} baud..." + simulator = GlorySimulator( + logger=logger, port=args.port, baudrate=args.baudrate ) - simulator = GlorySimulator(args.port, args.baudrate) simulator.run() except KeyboardInterrupt: - print("\nSimulator stopped by user") + logger.info("Simulator stopped by user") sys.exit(0) except Exception as e: - print(f"Error: {e}", file=sys.stderr) + logger.error(f"Error: {e}", exc_info=True) sys.exit(1) diff --git a/pelican.py b/pelican.py index b13480e..74e780c 100644 --- a/pelican.py +++ b/pelican.py @@ -42,7 +42,7 @@ PASSWORD = "69390274" class PelicanSimulator: - def __init__(self, port: str, baudrate: int = 9600): + def __init__(self, logger: logging.Logger, port: str, baudrate: int = 9600): self.port = port self.baudrate = baudrate self.serial_conn: Optional[serial.Serial] = None @@ -79,10 +79,7 @@ class PelicanSimulator: 200, # 2.00 ] - logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" - ) - self.logger = logging.getLogger(__name__) + self.logger = logger def calc_crc(self, data: bytes) -> int: """Calculate CRC-16 using the algorithm from the spec""" @@ -184,8 +181,12 @@ class PelicanSimulator: random_counts = [] for i in range(20): # Generate random counts, with higher probability for lower denominations - if i < 8: # Standard denominations (0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2) - max_count = max(1, 100 - i * 10) # Lower denominations have higher counts + if ( + i < 8 + ): # Standard denominations (0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2) + max_count = max( + 1, 100 - i * 10 + ) # Lower denominations have higher counts random_counts.append(random.randint(0, max_count)) else: # Random denominations random_counts.append(random.randint(0, 20)) @@ -217,10 +218,21 @@ class PelicanSimulator: total_coins = sum(random_counts) # Log detailed count information - self.logger.info(f"Current count: {total_coins} total coins, {random_rejected} rejected") + self.logger.info( + f"Current count: {total_coins} total coins, {random_rejected} rejected" + ) # Log counts for standard denominations - standard_denoms = ["0.01", "0.02", "0.05", "0.10", "0.20", "0.50", "1.00", "2.00"] + standard_denoms = [ + "0.01", + "0.02", + "0.05", + "0.10", + "0.20", + "0.50", + "1.00", + "2.00", + ] count_details = [] for i in range(8): if random_counts[i] > 0: @@ -279,7 +291,9 @@ class PelicanSimulator: self.host_version & 0xFF, ] ) - self.logger.info(f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}") + self.logger.info( + f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}" + ) return self.create_message(bytes(response)) # Variable 0x33 - Machine status @@ -317,9 +331,11 @@ class PelicanSimulator: 0x02: "Tubing mode", 0x20: "Memory", 0x40: "Programming", - 0x80: "Setup" + 0x80: "Setup", } - state_desc = program_states.get(self.program_state, f"Unknown (0x{self.program_state:02X})") + state_desc = program_states.get( + self.program_state, f"Unknown (0x{self.program_state:02X})" + ) status_desc = [] if self.motor_running: @@ -356,9 +372,139 @@ class PelicanSimulator: else: formatted_denoms.append(f"Coin{i+1}: 0.{denom:02d}") - self.logger.info(f"Denomination values (20 coins): {', '.join(formatted_denoms[:8])}") + self.logger.info( + f"Denomination values (20 coins): {', '.join(formatted_denoms[:8])}" + ) if len(formatted_denoms) > 8: - self.logger.info(f"Additional denominations: {', '.join(formatted_denoms[8:])}") + self.logger.info( + f"Additional denominations: {', '.join(formatted_denoms[8:])}" + ) + return self.create_message(bytes(response)) + + # Variable 0x22 - Coin exit counters (similar to 0x16 but for sorted coins) + elif var_num == 0x22: + self.logger.info("Responding with coin exit counters") + + # Generate random exit counts for simulation + exit_counts = [] + for i in range(20): + if i < 8: # Standard denominations + max_count = max(1, 100 - i * 10) + exit_counts.append(random.randint(0, max_count)) + else: + exit_counts.append(0) # No exits for unused channels + + response = [CMD_VALUE_RETURNED, 0x00, 0x22] + # Add 20 coin exit counters (4 bytes each) + for count in exit_counts: + response.extend( + [ + (count >> 24) & 0xFF, + (count >> 16) & 0xFF, + (count >> 8) & 0xFF, + count & 0xFF, + ] + ) + + total_exits = sum(exit_counts) + self.logger.info(f"Coin exit counters: {total_exits} total coins sorted") + + # Log counts for standard denominations + standard_denoms = [ + "0.01", + "0.02", + "0.05", + "0.10", + "0.20", + "0.50", + "1.00", + "2.00", + ] + exit_details = [] + for i in range(8): + if exit_counts[i] > 0: + exit_details.append(f"{standard_denoms[i]}: {exit_counts[i]}") + + if exit_details: + self.logger.info(f"Exit counts: {', '.join(exit_details)}") + + return self.create_message(bytes(response)) + + # Variable 0x23 - Get transaction data + elif var_num == 0x23: + # Need transaction number (2 bytes) after variable number + if len(data) < 4: + self.logger.error( + "Get transaction data request missing transaction number" + ) + return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01])) + + transaction_num = (data[2] << 8) | data[3] + self.logger.info( + f"Responding with transaction data for transaction #{transaction_num}" + ) + + # Generate simulated transaction data + import time + + current_time = time.localtime() + + response = [CMD_VALUE_RETURNED, 0x00, 0x23] + # Transaction number (2 bytes) + response.extend([(transaction_num >> 8) & 0xFF, transaction_num & 0xFF]) + # Cashier number (2 bytes) + cashier_num = random.randint(1, 99) + response.extend([(cashier_num >> 8) & 0xFF, cashier_num & 0xFF]) + # Not used yet (1 byte) + response.append(0x00) + # Hour, Minute + response.extend([current_time.tm_hour, current_time.tm_min]) + # Year (2-digit), Month, Day + response.extend( + [current_time.tm_year % 100, current_time.tm_mon, current_time.tm_mday] + ) + + # Fee amount - currency 0 (4 bytes) + fee_amount_0 = random.randint(0, 50000) # Up to 500.00 + response.extend( + [ + (fee_amount_0 >> 24) & 0xFF, + (fee_amount_0 >> 16) & 0xFF, + (fee_amount_0 >> 8) & 0xFF, + fee_amount_0 & 0xFF, + ] + ) + + # Fee amount - currency 1 (4 bytes) + response.extend([0x00, 0x00, 0x00, 0x00]) + + # Coin amount - currency 0 (4 bytes) + coin_amount_0 = random.randint(10000, 500000) # 100.00 to 5000.00 + response.extend( + [ + (coin_amount_0 >> 24) & 0xFF, + (coin_amount_0 >> 16) & 0xFF, + (coin_amount_0 >> 8) & 0xFF, + coin_amount_0 & 0xFF, + ] + ) + + # Coin amount - currency 1 (4 bytes) + response.extend([0x00, 0x00, 0x00, 0x00]) + + # Note amount - currency 0 (4 bytes) / Account number high for CDS + response.extend([0x00, 0x00, 0x00, 0x00]) + + # Note amount - currency 1 (4 bytes) / Account number low for CDS + response.extend([0x00, 0x00, 0x00, 0x00]) + + self.logger.info( + f"Transaction #{transaction_num}: Cashier {cashier_num}, " + f"Date {current_time.tm_year}/{current_time.tm_mon}/{current_time.tm_mday} " + f"{current_time.tm_hour:02d}:{current_time.tm_min:02d}, " + f"Coin amount: ${coin_amount_0/100:.2f}, Fee: ${fee_amount_0/100:.2f}" + ) + return self.create_message(bytes(response)) else: @@ -497,14 +643,14 @@ class PelicanSimulator: message = bytes(buffer[stx_idx : etx_idx + 1]) buffer = buffer[etx_idx + 1 :] - self.logger.debug(f"RX: {message.hex()}") + self.logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}") # Parse and handle message parsed_data = self.parse_message(message) if parsed_data: response = self.handle_command(parsed_data) if response: - self.logger.debug(f"TX: {response.hex()}") + self.logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}") self.serial_conn.write(response) except ValueError: pass # ETX not found yet