This commit is contained in:
Eden Kirin
2025-10-09 17:22:11 +02:00
parent 0a29898bf4
commit 1955256d06
4 changed files with 231 additions and 51 deletions

View File

@ -10,7 +10,6 @@ import logging
import random import random
import time import time
from typing import Optional from typing import Optional
from datetime import datetime
import serial import serial
@ -29,7 +28,7 @@ STATE_MESSAGE_PENDING = "PRN"
class GlorySimulator: class GlorySimulator:
def __init__(self, port: str, baudrate: int = 9600): def __init__(self, logger: logging.Logger, port: str, baudrate: int = 9600):
self.port = port self.port = port
self.baudrate = baudrate self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None self.serial_conn: Optional[serial.Serial] = None
@ -77,10 +76,7 @@ class GlorySimulator:
# Message buffer # Message buffer
self.pending_message = None self.pending_message = None
logging.basicConfig( self.logger = logger
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
self.logger = logging.getLogger(__name__)
def get_status(self) -> str: def get_status(self) -> str:
"""Get current machine status""" """Get current machine status"""
@ -116,7 +112,7 @@ class GlorySimulator:
def handle_clear_command(self, data: bytes) -> bytes: def handle_clear_command(self, data: bytes) -> bytes:
"""Handle clear commands (CC, CB, CS, CG)""" """Handle clear commands (CC, CB, CS, CG)"""
cmd = data.decode('ascii', errors='ignore').strip() cmd = data.decode("ascii", errors="ignore").strip()
if cmd == "CB": if cmd == "CB":
# Clear batch # Clear batch
@ -146,27 +142,32 @@ class GlorySimulator:
def handle_get_data_command(self, data: bytes) -> bytes: def handle_get_data_command(self, data: bytes) -> bytes:
"""Handle get data commands (GD, GT, GS, GG, GI, GV)""" """Handle get data commands (GD, GT, GS, GG, GI, GV)"""
cmd = data.decode('ascii', errors='ignore').strip() cmd = data.decode("ascii", errors="ignore").strip()
if cmd == "GT": if cmd == "GT":
# Get batch total # Get batch total
# If no counts, generate random simulation data
# if not self.batch_counts and self.motor_has_run:
if True:
self._simulate_counting()
total = sum(self.batch_counts.values()) total = sum(self.batch_counts.values())
response = f"BT{total:08d}" response = f"BT{total:08d}"
self.logger.info(f"Get batch total: {total}") self.logger.info(f"Get batch total: {total} coins")
return self.create_message(response) return self.create_message(response)
elif cmd == "GS": elif cmd == "GS":
# Get subtotal # Get subtotal
total = sum(self.sub_counts.values()) total = sum(self.sub_counts.values())
response = f"BS{total:08d}" response = f"BS{total:08d}"
self.logger.info(f"Get subtotal: {total}") self.logger.info(f"Get subtotal: {total} coins")
return self.create_message(response) return self.create_message(response)
elif cmd == "GG": elif cmd == "GG":
# Get grand total # Get grand total
total = sum(self.grand_counts.values()) total = sum(self.grand_counts.values())
response = f"BG{total:08d}" response = f"BG{total:08d}"
self.logger.info(f"Get grand total: {total}") self.logger.info(f"Get grand total: {total} coins")
return self.create_message(response) return self.create_message(response)
elif cmd.startswith("GD"): elif cmd.startswith("GD"):
@ -174,7 +175,7 @@ class GlorySimulator:
denom_str = cmd[2:] denom_str = cmd[2:]
count = self.partial_counts.get(denom_str, 0) count = self.partial_counts.get(denom_str, 0)
response = f"BD{count:08d}" response = f"BD{count:08d}"
self.logger.info(f"Get partial count for {denom_str}: {count}") self.logger.info(f"Get partial count for {denom_str}: {count} coins")
return self.create_message(response) return self.create_message(response)
elif cmd.startswith("GT") and len(cmd) > 2: elif cmd.startswith("GT") and len(cmd) > 2:
@ -182,7 +183,7 @@ class GlorySimulator:
denom_str = cmd[2:] denom_str = cmd[2:]
count = self.batch_counts.get(denom_str, 0) count = self.batch_counts.get(denom_str, 0)
response = f"BT{count:08d}" response = f"BT{count:08d}"
self.logger.info(f"Get batch count for {denom_str}: {count}") self.logger.info(f"Get batch count for {denom_str}: {count} coins")
return self.create_message(response) return self.create_message(response)
elif cmd == "GI": elif cmd == "GI":
@ -205,7 +206,7 @@ class GlorySimulator:
def handle_bag_stop_command(self, data: bytes) -> bytes: def handle_bag_stop_command(self, data: bytes) -> bytes:
"""Handle SV - Set bag stop command""" """Handle SV - Set bag stop command"""
cmd = data.decode('ascii', errors='ignore').strip() cmd = data.decode("ascii", errors="ignore").strip()
if len(cmd) >= 13 and cmd.startswith("SV"): if len(cmd) >= 13 and cmd.startswith("SV"):
denom_str = cmd[2:5] denom_str = cmd[2:5]
@ -224,7 +225,7 @@ class GlorySimulator:
def handle_motor_command(self, data: bytes) -> bytes: def handle_motor_command(self, data: bytes) -> bytes:
"""Handle MG/MS - Motor control commands""" """Handle MG/MS - Motor control commands"""
cmd = data.decode('ascii', errors='ignore').strip() cmd = data.decode("ascii", errors="ignore").strip()
if cmd == "MG": if cmd == "MG":
# Start motor # Start motor
@ -251,7 +252,7 @@ class GlorySimulator:
def handle_accept_command(self, data: bytes) -> bytes: def handle_accept_command(self, data: bytes) -> bytes:
"""Handle AB/AS/AG - Accept commands""" """Handle AB/AS/AG - Accept commands"""
cmd = data.decode('ascii', errors='ignore').strip() cmd = data.decode("ascii", errors="ignore").strip()
if cmd == "AB" or cmd == "Ab": if cmd == "AB" or cmd == "Ab":
# Accept batch # Accept batch
@ -281,7 +282,7 @@ class GlorySimulator:
def handle_partial_count_command(self, data: bytes) -> bytes: def handle_partial_count_command(self, data: bytes) -> bytes:
"""Handle PC - Set partial count command""" """Handle PC - Set partial count command"""
cmd = data.decode('ascii', errors='ignore').strip() cmd = data.decode("ascii", errors="ignore").strip()
if len(cmd) >= 13 and cmd.startswith("PC"): if len(cmd) >= 13 and cmd.startswith("PC"):
denom_str = cmd[2:5] denom_str = cmd[2:5]
@ -300,7 +301,7 @@ class GlorySimulator:
def handle_id_command(self, data: bytes) -> bytes: def handle_id_command(self, data: bytes) -> bytes:
"""Handle ID/IS/IG - Set ID number commands""" """Handle ID/IS/IG - Set ID number commands"""
cmd = data.decode('ascii', errors='ignore').strip() cmd = data.decode("ascii", errors="ignore").strip()
if cmd.startswith("ID"): if cmd.startswith("ID"):
self.batch_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:] self.batch_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:]
@ -319,15 +320,40 @@ class GlorySimulator:
def _simulate_counting(self): def _simulate_counting(self):
"""Simulate coin counting when motor runs""" """Simulate coin counting when motor runs"""
# Generate random counts for simulation # Generate random counts for simulation
denominations = ["005", "010", "025", "050", "100"] # Standard denominations with realistic count distributions
for denom in denominations: denominations = {
count = random.randint(10, 100) "001": (20, 150), # Pennies - high count
self.batch_counts[denom] = self.batch_counts.get(denom, 0) + count "005": (10, 100), # Nickels
self.partial_counts[denom] = self.partial_counts.get(denom, 0) + count "010": (10, 80), # Dimes
"025": (5, 60), # Quarters
"050": (0, 20), # Half dollars - rare
"100": (0, 10), # Dollar coins - rare
}
for denom, (min_count, max_count) in denominations.items():
count = random.randint(min_count, max_count)
if count > 0:
# Update batch counts
self.batch_counts[denom] = self.batch_counts.get(denom, 0) + count
# Update partial counts
self.partial_counts[denom] = self.partial_counts.get(denom, 0) + count
# Update subtotal
self.sub_counts[denom] = self.sub_counts.get(denom, 0) + count
# Update grand total
self.grand_counts[denom] = self.grand_counts.get(denom, 0) + count
self.logger.info(f"Counted {count} coins of denomination {denom}")
# Log total value counted
total_value = sum(
self.batch_counts.get(denom, 0) * int(denom)
for denom in denominations.keys()
)
self.logger.info(f"Total value counted in this batch: ${total_value/100:.2f}")
def create_message(self, data: str) -> bytes: def create_message(self, data: str) -> bytes:
"""Create a message with STX and ETX""" """Create a message with STX and ETX"""
return bytes([STX]) + data.encode('ascii') + bytes([ETX]) return bytes([STX]) + data.encode("ascii") + bytes([ETX])
def parse_message(self, message: bytes) -> Optional[bytes]: def parse_message(self, message: bytes) -> Optional[bytes]:
"""Parse and validate a received message""" """Parse and validate a received message"""
@ -352,7 +378,7 @@ class GlorySimulator:
return None return None
try: try:
cmd_str = data.decode('ascii', errors='ignore').strip() cmd_str = data.decode("ascii", errors="ignore").strip()
cmd = cmd_str[:2] cmd = cmd_str[:2]
self.logger.info(f"Received command: {cmd_str}") self.logger.info(f"Received command: {cmd_str}")
@ -435,14 +461,14 @@ class GlorySimulator:
message = bytes(buffer[stx_idx : etx_idx + 1]) message = bytes(buffer[stx_idx : etx_idx + 1])
buffer = buffer[etx_idx + 1 :] buffer = buffer[etx_idx + 1 :]
self.logger.debug(f"RX: {message.hex()}") self.logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}")
# Parse and handle message # Parse and handle message
parsed_data = self.parse_message(message) parsed_data = self.parse_message(message)
if parsed_data: if parsed_data:
response = self.handle_command(parsed_data) response = self.handle_command(parsed_data)
if response: if response:
self.logger.debug(f"TX: {response.hex()}") self.logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}")
self.serial_conn.write(response) self.serial_conn.write(response)
except ValueError: except ValueError:
pass # ETX not found yet pass # ETX not found yet

24
main.py
View File

@ -5,6 +5,7 @@ Entry point for running Pelican or Glory device simulators.
""" """
import argparse import argparse
import logging
import sys import sys
from glory import GlorySimulator from glory import GlorySimulator
@ -12,6 +13,8 @@ from glory import GlorySimulator
# Import simulators # Import simulators
from pelican import PelicanSimulator from pelican import PelicanSimulator
LOG_LEVEL = logging.DEBUG
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@ -66,25 +69,30 @@ Examples:
args = parser.parse_args() args = parser.parse_args()
# Setup centralized logger
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(f"{args.simulator}_simulator")
# Run the appropriate simulator # Run the appropriate simulator
try: try:
if args.simulator == "pelican": if args.simulator == "pelican":
print( simulator = PelicanSimulator(
f"Starting Pelican simulator on {args.port} at {args.baudrate} baud..." logger=logger, port=args.port, baudrate=args.baudrate
) )
simulator = PelicanSimulator(args.port, args.baudrate)
simulator.run() simulator.run()
elif args.simulator == "glory": elif args.simulator == "glory":
print( simulator = GlorySimulator(
f"Starting Glory MACH6 simulator on {args.port} at {args.baudrate} baud..." logger=logger, port=args.port, baudrate=args.baudrate
) )
simulator = GlorySimulator(args.port, args.baudrate)
simulator.run() simulator.run()
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nSimulator stopped by user") logger.info("Simulator stopped by user")
sys.exit(0) sys.exit(0)
except Exception as e: except Exception as e:
print(f"Error: {e}", file=sys.stderr) logger.error(f"Error: {e}", exc_info=True)
sys.exit(1) sys.exit(1)

View File

@ -42,7 +42,7 @@ PASSWORD = "69390274"
class PelicanSimulator: class PelicanSimulator:
def __init__(self, port: str, baudrate: int = 9600): def __init__(self, logger: logging.Logger, port: str, baudrate: int = 9600):
self.port = port self.port = port
self.baudrate = baudrate self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None self.serial_conn: Optional[serial.Serial] = None
@ -79,10 +79,7 @@ class PelicanSimulator:
200, # 2.00 200, # 2.00
] ]
logging.basicConfig( self.logger = logger
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
self.logger = logging.getLogger(__name__)
def calc_crc(self, data: bytes) -> int: def calc_crc(self, data: bytes) -> int:
"""Calculate CRC-16 using the algorithm from the spec""" """Calculate CRC-16 using the algorithm from the spec"""
@ -184,8 +181,12 @@ class PelicanSimulator:
random_counts = [] random_counts = []
for i in range(20): for i in range(20):
# Generate random counts, with higher probability for lower denominations # Generate random counts, with higher probability for lower denominations
if i < 8: # Standard denominations (0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2) if (
max_count = max(1, 100 - i * 10) # Lower denominations have higher counts i < 8
): # Standard denominations (0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2)
max_count = max(
1, 100 - i * 10
) # Lower denominations have higher counts
random_counts.append(random.randint(0, max_count)) random_counts.append(random.randint(0, max_count))
else: # Random denominations else: # Random denominations
random_counts.append(random.randint(0, 20)) random_counts.append(random.randint(0, 20))
@ -217,10 +218,21 @@ class PelicanSimulator:
total_coins = sum(random_counts) total_coins = sum(random_counts)
# Log detailed count information # Log detailed count information
self.logger.info(f"Current count: {total_coins} total coins, {random_rejected} rejected") self.logger.info(
f"Current count: {total_coins} total coins, {random_rejected} rejected"
)
# Log counts for standard denominations # Log counts for standard denominations
standard_denoms = ["0.01", "0.02", "0.05", "0.10", "0.20", "0.50", "1.00", "2.00"] standard_denoms = [
"0.01",
"0.02",
"0.05",
"0.10",
"0.20",
"0.50",
"1.00",
"2.00",
]
count_details = [] count_details = []
for i in range(8): for i in range(8):
if random_counts[i] > 0: if random_counts[i] > 0:
@ -279,7 +291,9 @@ class PelicanSimulator:
self.host_version & 0xFF, self.host_version & 0xFF,
] ]
) )
self.logger.info(f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}") self.logger.info(
f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}"
)
return self.create_message(bytes(response)) return self.create_message(bytes(response))
# Variable 0x33 - Machine status # Variable 0x33 - Machine status
@ -317,9 +331,11 @@ class PelicanSimulator:
0x02: "Tubing mode", 0x02: "Tubing mode",
0x20: "Memory", 0x20: "Memory",
0x40: "Programming", 0x40: "Programming",
0x80: "Setup" 0x80: "Setup",
} }
state_desc = program_states.get(self.program_state, f"Unknown (0x{self.program_state:02X})") state_desc = program_states.get(
self.program_state, f"Unknown (0x{self.program_state:02X})"
)
status_desc = [] status_desc = []
if self.motor_running: if self.motor_running:
@ -356,9 +372,139 @@ class PelicanSimulator:
else: else:
formatted_denoms.append(f"Coin{i+1}: 0.{denom:02d}") formatted_denoms.append(f"Coin{i+1}: 0.{denom:02d}")
self.logger.info(f"Denomination values (20 coins): {', '.join(formatted_denoms[:8])}") self.logger.info(
f"Denomination values (20 coins): {', '.join(formatted_denoms[:8])}"
)
if len(formatted_denoms) > 8: if len(formatted_denoms) > 8:
self.logger.info(f"Additional denominations: {', '.join(formatted_denoms[8:])}") self.logger.info(
f"Additional denominations: {', '.join(formatted_denoms[8:])}"
)
return self.create_message(bytes(response))
# Variable 0x22 - Coin exit counters (similar to 0x16 but for sorted coins)
elif var_num == 0x22:
self.logger.info("Responding with coin exit counters")
# Generate random exit counts for simulation
exit_counts = []
for i in range(20):
if i < 8: # Standard denominations
max_count = max(1, 100 - i * 10)
exit_counts.append(random.randint(0, max_count))
else:
exit_counts.append(0) # No exits for unused channels
response = [CMD_VALUE_RETURNED, 0x00, 0x22]
# Add 20 coin exit counters (4 bytes each)
for count in exit_counts:
response.extend(
[
(count >> 24) & 0xFF,
(count >> 16) & 0xFF,
(count >> 8) & 0xFF,
count & 0xFF,
]
)
total_exits = sum(exit_counts)
self.logger.info(f"Coin exit counters: {total_exits} total coins sorted")
# Log counts for standard denominations
standard_denoms = [
"0.01",
"0.02",
"0.05",
"0.10",
"0.20",
"0.50",
"1.00",
"2.00",
]
exit_details = []
for i in range(8):
if exit_counts[i] > 0:
exit_details.append(f"{standard_denoms[i]}: {exit_counts[i]}")
if exit_details:
self.logger.info(f"Exit counts: {', '.join(exit_details)}")
return self.create_message(bytes(response))
# Variable 0x23 - Get transaction data
elif var_num == 0x23:
# Need transaction number (2 bytes) after variable number
if len(data) < 4:
self.logger.error(
"Get transaction data request missing transaction number"
)
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
transaction_num = (data[2] << 8) | data[3]
self.logger.info(
f"Responding with transaction data for transaction #{transaction_num}"
)
# Generate simulated transaction data
import time
current_time = time.localtime()
response = [CMD_VALUE_RETURNED, 0x00, 0x23]
# Transaction number (2 bytes)
response.extend([(transaction_num >> 8) & 0xFF, transaction_num & 0xFF])
# Cashier number (2 bytes)
cashier_num = random.randint(1, 99)
response.extend([(cashier_num >> 8) & 0xFF, cashier_num & 0xFF])
# Not used yet (1 byte)
response.append(0x00)
# Hour, Minute
response.extend([current_time.tm_hour, current_time.tm_min])
# Year (2-digit), Month, Day
response.extend(
[current_time.tm_year % 100, current_time.tm_mon, current_time.tm_mday]
)
# Fee amount - currency 0 (4 bytes)
fee_amount_0 = random.randint(0, 50000) # Up to 500.00
response.extend(
[
(fee_amount_0 >> 24) & 0xFF,
(fee_amount_0 >> 16) & 0xFF,
(fee_amount_0 >> 8) & 0xFF,
fee_amount_0 & 0xFF,
]
)
# Fee amount - currency 1 (4 bytes)
response.extend([0x00, 0x00, 0x00, 0x00])
# Coin amount - currency 0 (4 bytes)
coin_amount_0 = random.randint(10000, 500000) # 100.00 to 5000.00
response.extend(
[
(coin_amount_0 >> 24) & 0xFF,
(coin_amount_0 >> 16) & 0xFF,
(coin_amount_0 >> 8) & 0xFF,
coin_amount_0 & 0xFF,
]
)
# Coin amount - currency 1 (4 bytes)
response.extend([0x00, 0x00, 0x00, 0x00])
# Note amount - currency 0 (4 bytes) / Account number high for CDS
response.extend([0x00, 0x00, 0x00, 0x00])
# Note amount - currency 1 (4 bytes) / Account number low for CDS
response.extend([0x00, 0x00, 0x00, 0x00])
self.logger.info(
f"Transaction #{transaction_num}: Cashier {cashier_num}, "
f"Date {current_time.tm_year}/{current_time.tm_mon}/{current_time.tm_mday} "
f"{current_time.tm_hour:02d}:{current_time.tm_min:02d}, "
f"Coin amount: ${coin_amount_0/100:.2f}, Fee: ${fee_amount_0/100:.2f}"
)
return self.create_message(bytes(response)) return self.create_message(bytes(response))
else: else:
@ -497,14 +643,14 @@ class PelicanSimulator:
message = bytes(buffer[stx_idx : etx_idx + 1]) message = bytes(buffer[stx_idx : etx_idx + 1])
buffer = buffer[etx_idx + 1 :] buffer = buffer[etx_idx + 1 :]
self.logger.debug(f"RX: {message.hex()}") self.logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}")
# Parse and handle message # Parse and handle message
parsed_data = self.parse_message(message) parsed_data = self.parse_message(message)
if parsed_data: if parsed_data:
response = self.handle_command(parsed_data) response = self.handle_command(parsed_data)
if response: if response:
self.logger.debug(f"TX: {response.hex()}") self.logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}")
self.serial_conn.write(response) self.serial_conn.write(response)
except ValueError: except ValueError:
pass # ETX not found yet pass # ETX not found yet