This commit is contained in:
Eden Kirin
2025-10-09 14:00:22 +02:00
commit 724f116ed2
10 changed files with 1072 additions and 0 deletions

11
.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
*.log
# Virtual environments
.venv

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.13

6
Makefile Normal file
View File

@ -0,0 +1,6 @@
run-pelican:
@ uv run python pelican.py --port /dev/ttyUSB0 --baud 115200
run-glory:
@ uv run python glory.py --port /dev/ttyUSB0 --baud 115200

BIN
README.md Normal file

Binary file not shown.

BIN
docs/Glory.pdf Normal file

Binary file not shown.

Binary file not shown.

481
glory.py Normal file
View File

@ -0,0 +1,481 @@
#!/usr/bin/env python3
"""
Glory MACH6 Device Simulator
Simulates a Glory MACH6 coin/currency counter for development/testing purposes.
Implements the Enhanced Serial Port Mode with full command support.
"""
import argparse
import logging
import random
import time
from typing import Optional
from datetime import datetime
import serial
# Protocol constants
STX = 0x02
ETX = 0x03
ACK = 0x06
NAK = 0x15
# Machine states
STATE_IDLE = "OK "
STATE_ERROR = "E"
STATE_BAG_STOP = "BAG"
STATE_RUNNING = "RUN"
STATE_MESSAGE_PENDING = "PRN"
class GlorySimulator:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None
# Device state
self.motor_running = False
self.display_locked = False
self.keyboard_locked = False
self.motor_has_run = False
# Count mode: "$0" for dollars, "U0" for units
self.count_mode = "$0"
# Coin/currency counters
self.partial_counts = {} # denomination -> count
self.batch_counts = {}
self.sub_counts = {}
self.grand_counts = {}
# ID numbers
self.batch_id = ""
self.sub_id = ""
self.grand_id = ""
# Bag stops
self.bag_stops = {} # denomination -> stop value
self.bag_stop_counts = {} # denomination -> number of stops
# Station/denomination values (coin types)
self.station_values = {
1: 0.11,
2: 0.10,
3: 0.01,
4: 0.05,
5: 0.25,
6: 1.00,
7: 0.50,
8: 0.08,
9: 0.00,
}
# Error state
self.error_code = None
# Message buffer
self.pending_message = None
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
self.logger = logging.getLogger(__name__)
def get_status(self) -> str:
"""Get current machine status"""
if self.pending_message:
return STATE_MESSAGE_PENDING
if self.error_code:
return f"{STATE_ERROR}{self.error_code:02d}"
if self.motor_running:
return STATE_RUNNING
# Check for bag stops
for denom, stop_value in self.bag_stops.items():
if denom in self.batch_counts:
if self.batch_counts[denom] >= stop_value:
return f"{STATE_BAG_STOP}{denom:03d}"
# Keys locked/unlocked status
if self.keyboard_locked:
motor_flag = "1" if self.motor_has_run else "0"
return f"OU{motor_flag}"
else:
motor_flag = "1" if self.motor_has_run else "0"
return f"OK{motor_flag}"
def handle_status_command(self, data: bytes) -> bytes:
"""Handle SS - Status command"""
status = self.get_status()
response = f"ST{self.count_mode}{status}"
self.logger.info(f"Status request - Response: {response}")
return self.create_message(response)
def handle_clear_command(self, data: bytes) -> bytes:
"""Handle clear commands (CC, CB, CS, CG)"""
cmd = data.decode('ascii', errors='ignore').strip()
if cmd == "CB":
# Clear batch
self.batch_counts = {}
self.batch_id = ""
self.logger.info("Batch counts cleared")
elif cmd == "CS":
# Clear subtotal
self.sub_counts = {}
self.sub_id = ""
self.logger.info("Subtotal counts cleared")
elif cmd == "CG":
# Clear grand total
self.grand_counts = {}
self.grand_id = ""
self.logger.info("Grand total counts cleared")
elif cmd.startswith("CC"):
# Clear partial count
denom_str = cmd[2:]
if denom_str in self.partial_counts:
del self.partial_counts[denom_str]
self.logger.info(f"Partial count for {denom_str} cleared")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
return self.create_message(response)
def handle_get_data_command(self, data: bytes) -> bytes:
"""Handle get data commands (GD, GT, GS, GG, GI, GV)"""
cmd = data.decode('ascii', errors='ignore').strip()
if cmd == "GT":
# Get batch total
total = sum(self.batch_counts.values())
response = f"BT{total:08d}"
self.logger.info(f"Get batch total: {total}")
return self.create_message(response)
elif cmd == "GS":
# Get subtotal
total = sum(self.sub_counts.values())
response = f"BS{total:08d}"
self.logger.info(f"Get subtotal: {total}")
return self.create_message(response)
elif cmd == "GG":
# Get grand total
total = sum(self.grand_counts.values())
response = f"BG{total:08d}"
self.logger.info(f"Get grand total: {total}")
return self.create_message(response)
elif cmd.startswith("GD"):
# Get partial count data
denom_str = cmd[2:]
count = self.partial_counts.get(denom_str, 0)
response = f"BD{count:08d}"
self.logger.info(f"Get partial count for {denom_str}: {count}")
return self.create_message(response)
elif cmd.startswith("GT") and len(cmd) > 2:
# Get batch counts of denomination
denom_str = cmd[2:]
count = self.batch_counts.get(denom_str, 0)
response = f"BT{count:08d}"
self.logger.info(f"Get batch count for {denom_str}: {count}")
return self.create_message(response)
elif cmd == "GI":
# Get product totals (ID totals)
response = ""
# Return empty if no totals
self.logger.info("Get product totals (empty)")
return self.create_message(response)
elif cmd == "GV":
# Get station value table
response = ""
for station, value in sorted(self.station_values.items()):
active = "i" if value == 0 else str(station)
response += f"{active} {value:.2f} \r"
self.logger.info("Get station values")
return self.create_message(response)
return self.create_message("")
def handle_bag_stop_command(self, data: bytes) -> bytes:
"""Handle SV - Set bag stop command"""
cmd = data.decode('ascii', errors='ignore').strip()
if len(cmd) >= 13 and cmd.startswith("SV"):
denom_str = cmd[2:5]
value_str = cmd[5:13]
try:
value = int(value_str)
self.bag_stops[denom_str] = value
self.logger.info(f"Bag stop set for {denom_str}: {value}")
except ValueError:
self.logger.error(f"Invalid bag stop value: {value_str}")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
return self.create_message(response)
def handle_motor_command(self, data: bytes) -> bytes:
"""Handle MG/MS - Motor control commands"""
cmd = data.decode('ascii', errors='ignore').strip()
if cmd == "MG":
# Start motor
self.motor_running = True
self.motor_has_run = True
self.logger.info("Motor started")
# Simulate coin counting
self._simulate_counting()
elif cmd == "MS":
# Stop motor
self.motor_running = False
self.logger.info("Motor stopped")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
return self.create_message(response)
def handle_beep_command(self, data: bytes) -> bytes:
"""Handle BB - Beep command"""
self.logger.info("Beep!")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
return self.create_message(response)
def handle_accept_command(self, data: bytes) -> bytes:
"""Handle AB/AS/AG - Accept commands"""
cmd = data.decode('ascii', errors='ignore').strip()
if cmd == "AB" or cmd == "Ab":
# Accept batch
total = sum(self.batch_counts.values())
response = f"BT{total:08d}"
self.logger.info(f"Accept batch: {total}")
self.pending_message = response
return self.create_message(response)
elif cmd == "AS" or cmd == "As":
# Accept subtotal
total = sum(self.sub_counts.values())
response = f"BS{total:08d}"
self.logger.info(f"Accept subtotal: {total}")
self.pending_message = response
return self.create_message(response)
elif cmd == "AG" or cmd == "Ag":
# Accept grand total
total = sum(self.grand_counts.values())
response = f"BG{total:08d}"
self.logger.info(f"Accept grand total: {total}")
self.pending_message = response
return self.create_message(response)
return self.create_message("")
def handle_partial_count_command(self, data: bytes) -> bytes:
"""Handle PC - Set partial count command"""
cmd = data.decode('ascii', errors='ignore').strip()
if len(cmd) >= 13 and cmd.startswith("PC"):
denom_str = cmd[2:5]
value_str = cmd[5:13]
try:
value = int(value_str)
self.partial_counts[denom_str] = value
self.logger.info(f"Partial count set for {denom_str}: {value}")
except ValueError:
self.logger.error(f"Invalid partial count value: {value_str}")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
return self.create_message(response)
def handle_id_command(self, data: bytes) -> bytes:
"""Handle ID/IS/IG - Set ID number commands"""
cmd = data.decode('ascii', errors='ignore').strip()
if cmd.startswith("ID"):
self.batch_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:]
self.logger.info(f"Batch ID set: {self.batch_id}")
elif cmd.startswith("IS"):
self.sub_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:]
self.logger.info(f"Sub ID set: {self.sub_id}")
elif cmd.startswith("IG"):
self.grand_id = cmd[2:14] if len(cmd) >= 14 else cmd[2:]
self.logger.info(f"Grand ID set: {self.grand_id}")
status = self.get_status()
response = f"ST{self.count_mode}{status}"
return self.create_message(response)
def _simulate_counting(self):
"""Simulate coin counting when motor runs"""
# Generate random counts for simulation
denominations = ["005", "010", "025", "050", "100"]
for denom in denominations:
count = random.randint(10, 100)
self.batch_counts[denom] = self.batch_counts.get(denom, 0) + count
self.partial_counts[denom] = self.partial_counts.get(denom, 0) + count
def create_message(self, data: str) -> bytes:
"""Create a message with STX and ETX"""
return bytes([STX]) + data.encode('ascii') + bytes([ETX])
def parse_message(self, message: bytes) -> Optional[bytes]:
"""Parse and validate a received message"""
if len(message) < 3:
self.logger.error("Message too short")
return None
if message[0] != STX:
self.logger.error("Invalid STX")
return None
if message[-1] != ETX:
self.logger.error("Invalid ETX")
return None
data = message[1:-1]
return data
def handle_command(self, data: bytes) -> Optional[bytes]:
"""Route command to appropriate handler"""
if len(data) < 2:
return None
try:
cmd_str = data.decode('ascii', errors='ignore').strip()
cmd = cmd_str[:2]
self.logger.info(f"Received command: {cmd_str}")
if cmd == "SS":
return self.handle_status_command(data)
elif cmd in ["CB", "CS", "CG"] or cmd_str.startswith("CC"):
return self.handle_clear_command(data)
elif cmd in ["GD", "GT", "GS", "GG", "GI", "GV"]:
return self.handle_get_data_command(data)
elif cmd == "SV":
return self.handle_bag_stop_command(data)
elif cmd in ["MG", "MS"]:
return self.handle_motor_command(data)
elif cmd == "BB":
return self.handle_beep_command(data)
elif cmd in ["AB", "AS", "AG"] or cmd_str[:2] in ["Ab", "As", "Ag"]:
return self.handle_accept_command(data)
elif cmd == "PC":
return self.handle_partial_count_command(data)
elif cmd in ["ID", "IS", "IG"]:
return self.handle_id_command(data)
else:
self.logger.warning(f"Unknown command: {cmd}")
return None
except Exception as e:
self.logger.error(f"Error handling command: {e}", exc_info=True)
return None
def run(self):
"""Main simulator loop"""
self.logger.info(
f"Starting Glory MACH6 simulator on {self.port} at {self.baudrate} baud"
)
self.logger.info("Enhanced Serial Port Mode enabled")
try:
self.serial_conn = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1,
)
self.logger.info("Serial port opened successfully")
buffer = bytearray()
while True:
# Handle ACK/NAK for pending messages
if self.pending_message:
# Wait for ACK or NAK
if self.serial_conn.in_waiting > 0:
byte_data = self.serial_conn.read(1)
if byte_data[0] == ACK:
self.logger.info("Received ACK - clearing pending message")
# Clear batch after ACK
if "BT" in self.pending_message:
self.batch_counts = {}
self.pending_message = None
elif byte_data[0] == NAK:
self.logger.info("Received NAK - retransmitting")
response = self.create_message(self.pending_message)
self.serial_conn.write(response)
time.sleep(0.01)
continue
if self.serial_conn.in_waiting > 0:
data = self.serial_conn.read(self.serial_conn.in_waiting)
buffer.extend(data)
# Look for complete message (STX...ETX)
if STX in buffer and ETX in buffer:
stx_idx = buffer.index(STX)
try:
etx_idx = buffer.index(ETX, stx_idx)
message = bytes(buffer[stx_idx : etx_idx + 1])
buffer = buffer[etx_idx + 1 :]
self.logger.debug(f"RX: {message.hex()}")
# Parse and handle message
parsed_data = self.parse_message(message)
if parsed_data:
response = self.handle_command(parsed_data)
if response:
self.logger.debug(f"TX: {response.hex()}")
self.serial_conn.write(response)
except ValueError:
pass # ETX not found yet
time.sleep(0.01)
except KeyboardInterrupt:
self.logger.info("Simulator stopped by user")
except Exception as e:
self.logger.error(f"Error: {e}", exc_info=True)
finally:
if self.serial_conn and self.serial_conn.is_open:
self.serial_conn.close()
self.logger.info("Serial port closed")
def main():
parser = argparse.ArgumentParser(description="Glory MACH6 Device Simulator")
parser.add_argument(
"--port",
"-p",
default="/dev/ttyUSB0",
help="Serial port (default: /dev/ttyUSB0)",
)
parser.add_argument(
"--baudrate", "-b", type=int, default=9600, help="Baud rate (default: 9600)"
)
args = parser.parse_args()
simulator = GlorySimulator(args.port, args.baudrate)
simulator.run()
if __name__ == "__main__":
main()

543
pelican.py Normal file
View File

@ -0,0 +1,543 @@
#!/usr/bin/env python3
"""
Pelican Device Simulator
Simulates a Pelican coin counting device for development/testing purposes.
"""
import argparse
import logging
import random
import time
from typing import Optional
import serial
# Protocol constants
STX = 0x02
ETX = 0x03
ACK = 0x06
NACK = 0x15
# Commands
CMD_CONSTRUCT_LINK = 0x01
CMD_RESPONSE_CONSTRUCT_LINK = 0x02
CMD_DESTRUCT_LINK = 0x03
CMD_RESPONSE_DESTRUCT_LINK = 0x04
CMD_GET_VALUE = 0x11
CMD_VALUE_RETURNED = 0x12
CMD_SET_VALUE = 0x21
CMD_RESPONSE_SET_VALUE = 0x22
CMD_GET_DISPLAY = 0x31
CMD_RESPONSE_GET_DISPLAY = 0x32
CMD_SET_DISPLAY = 0x33
CMD_RESPONSE_SET_DISPLAY = 0x34
CMD_LOCK_DISPLAY = 0x37
CMD_RESPONSE_LOCK_DISPLAY = 0x38
CMD_LOCK_KEYBOARD = 0x39
CMD_RESPONSE_LOCK_KEYBOARD = 0x3A
CMD_GET_STATUS = 0x33
# Password for construct link
PASSWORD = "69390274"
class PelicanSimulator:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None
self.link_established = False
# Device state
self.display_line1 = " " * 20
self.display_line2 = " " * 20
self.display_locked = False
self.keyboard_locked = False
self.motor_running = False
self.program_state = 0x00 # No coins expected
# Coin counters (20 coin types)
self.current_count = [0] * 20
self.total_count = [0] * 20
self.rejected_count = 0
# Software version info
self.sw_version = 0x01020304 # 1.2.3.4
self.sw_code = 0x12345678
self.host_version = 0x09033003 # From document name
# Coin denominations (in cents/hundredths) for 20 coin types
# First 8 are the requested denominations: 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2
self.denominations = [
1, # 0.01
2, # 0.02
5, # 0.05
10, # 0.10
20, # 0.20
50, # 0.50
100, # 1.00
200, # 2.00
]
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
self.logger = logging.getLogger(__name__)
def calc_crc(self, data: bytes) -> int:
"""Calculate CRC-16 using the algorithm from the spec"""
crc = 0
for byte in data:
crc = crc ^ (byte << 8)
for _ in range(8):
if (crc & 0x8000) != 0:
crc = (crc << 1) ^ 0x1021
else:
crc = crc << 1
crc = crc & 0xFFFF
return crc
def create_message(self, data: bytes) -> bytes:
"""Create a message with STX, length, data, CRC, and ETX"""
length = len(data)
crc = self.calc_crc(data)
crc_hi = (crc >> 8) & 0xFF
crc_lo = crc & 0xFF
message = bytes([STX, length]) + data + bytes([crc_hi, crc_lo, ETX])
return message
def parse_message(self, message: bytes) -> Optional[bytes]:
"""Parse and validate a received message"""
if len(message) < 5:
self.logger.error("Message too short")
return None
if message[0] != STX:
self.logger.error("Invalid STX")
return None
if message[-1] != ETX:
self.logger.error("Invalid ETX")
return None
length = message[1]
data = message[2 : 2 + length]
if len(data) != length:
self.logger.error("Length mismatch")
return None
crc_received = (message[2 + length] << 8) | message[2 + length + 1]
crc_calculated = self.calc_crc(data)
if crc_received != crc_calculated:
self.logger.error(
f"CRC mismatch: received {crc_received:04X}, calculated {crc_calculated:04X}"
)
return None
return data
def handle_construct_link(self, data: bytes) -> bytes:
"""Handle construct link command (0x01)"""
if len(data) < 9:
return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x01]))
password = data[1:9].decode("ascii", errors="ignore")
if password == PASSWORD:
self.link_established = True
self.logger.info("Link established successfully")
return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x00]))
else:
self.logger.warning(f"Invalid password: {password}")
return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x01]))
def handle_destruct_link(self, data: bytes) -> bytes:
"""Handle destruct link command (0x03)"""
if self.link_established:
self.link_established = False
self.logger.info("Link destructed")
return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x00]))
else:
return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x01]))
def handle_get_value(self, data: bytes) -> bytes:
"""Handle get value command (0x11)"""
if not self.link_established:
self.logger.warning("Get value request rejected - link not established")
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
if len(data) < 2:
self.logger.error("Get value request missing variable number")
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
var_num = data[1]
self.logger.info(f"Get value request for variable 0x{var_num:02X}")
# Variable 0x16 - Current counting result
if var_num == 0x16:
self.logger.info("Responding with current counting result")
# Generate random current counts for simulation
random_counts = []
for i in range(20):
# Generate random counts, with higher probability for lower denominations
if i < 8: # Standard denominations (0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2)
max_count = max(1, 100 - i * 10) # Lower denominations have higher counts
random_counts.append(random.randint(0, max_count))
else: # Random denominations
random_counts.append(random.randint(0, 20))
# Random rejected count
random_rejected = random.randint(0, 10)
response = [CMD_VALUE_RETURNED, 0x00, 0x16]
# Add 20 coins (4 bytes each)
for count in random_counts:
response.extend(
[
(count >> 24) & 0xFF,
(count >> 16) & 0xFF,
(count >> 8) & 0xFF,
count & 0xFF,
]
)
# Add rejected count
response.extend(
[
(random_rejected >> 24) & 0xFF,
(random_rejected >> 16) & 0xFF,
(random_rejected >> 8) & 0xFF,
random_rejected & 0xFF,
]
)
total_coins = sum(random_counts)
# Log detailed count information
self.logger.info(f"Current count: {total_coins} total coins, {random_rejected} rejected")
# Log counts for standard denominations
standard_denoms = ["0.01", "0.02", "0.05", "0.10", "0.20", "0.50", "1.00", "2.00"]
count_details = []
for i in range(8):
if random_counts[i] > 0:
count_details.append(f"{standard_denoms[i]}: {random_counts[i]}")
if count_details:
self.logger.info(f"Coin counts: {', '.join(count_details)}")
return self.create_message(bytes(response))
# Variable 0x1C - Total counting result
elif var_num == 0x1C:
self.logger.info("Responding with total counting result")
response = [CMD_VALUE_RETURNED, 0x00, 0x1C]
for count in self.total_count:
response.extend(
[
(count >> 24) & 0xFF,
(count >> 16) & 0xFF,
(count >> 8) & 0xFF,
count & 0xFF,
]
)
total_coins = sum(self.total_count)
self.logger.info(f"Total count: {total_coins} total coins since last reset")
return self.create_message(bytes(response))
# Variable 0x31 - Software version
elif var_num == 0x31:
self.logger.info("Responding with software version information")
response = [CMD_VALUE_RETURNED, 0x00, 0x31]
# SW version
response.extend(
[
(self.sw_version >> 24) & 0xFF,
(self.sw_version >> 16) & 0xFF,
(self.sw_version >> 8) & 0xFF,
self.sw_version & 0xFF,
]
)
# SW code
response.extend(
[
(self.sw_code >> 24) & 0xFF,
(self.sw_code >> 16) & 0xFF,
(self.sw_code >> 8) & 0xFF,
self.sw_code & 0xFF,
]
)
# HOST version
response.extend(
[
(self.host_version >> 24) & 0xFF,
(self.host_version >> 16) & 0xFF,
(self.host_version >> 8) & 0xFF,
self.host_version & 0xFF,
]
)
self.logger.info(f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}")
return self.create_message(bytes(response))
# Variable 0x33 - Machine status
elif var_num == 0x33:
self.logger.info("Responding with machine status")
status_flags_1 = 0x00
if self.motor_running:
status_flags_1 |= 0x01
status_flags_2 = 0x00
if self.display_locked:
status_flags_2 |= 0x01
if self.keyboard_locked:
status_flags_2 |= 0x04
response = [
CMD_VALUE_RETURNED,
0x00,
0x33,
self.program_state,
status_flags_1,
status_flags_2,
0x00, # flags 3
0x00, # flags 4
0x00, # flags 5
0x00, # flags 6
0x00, # flags 7
0x00, # flags 8
]
# Human readable status
program_states = {
0x00: "No coins expected",
0x01: "Counting mode",
0x02: "Tubing mode",
0x20: "Memory",
0x40: "Programming",
0x80: "Setup"
}
state_desc = program_states.get(self.program_state, f"Unknown (0x{self.program_state:02X})")
status_desc = []
if self.motor_running:
status_desc.append("motor running")
if self.display_locked:
status_desc.append("display locked")
if self.keyboard_locked:
status_desc.append("keyboard locked")
status_str = ", ".join(status_desc) if status_desc else "all unlocked"
self.logger.info(f"Machine status: {state_desc}, {status_str}")
return self.create_message(bytes(response))
# Variable 0x1F - Get denomination values
elif var_num == 0x1F:
self.logger.info("Responding with coin denomination values")
response = [CMD_VALUE_RETURNED, 0x00, 0x1F]
# Add denomination values for all 20 coin types (4 bytes each)
for denomination in self.denominations:
response.extend(
[
(denomination >> 24) & 0xFF,
(denomination >> 16) & 0xFF,
(denomination >> 8) & 0xFF,
denomination & 0xFF,
]
)
# Format denominations for human-readable logging
formatted_denoms = []
for i, denom in enumerate(self.denominations):
if denom >= 100:
formatted_denoms.append(f"Coin{i+1}: {denom/100:.2f}")
else:
formatted_denoms.append(f"Coin{i+1}: 0.{denom:02d}")
self.logger.info(f"Denomination values (20 coins): {', '.join(formatted_denoms[:8])}")
if len(formatted_denoms) > 8:
self.logger.info(f"Additional denominations: {', '.join(formatted_denoms[8:])}")
return self.create_message(bytes(response))
else:
self.logger.warning(f"Unknown variable number: 0x{var_num:02X}")
return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01]))
def handle_get_display(self, data: bytes) -> bytes:
"""Handle get display command (0x31)"""
if not self.link_established:
return self.create_message(bytes([CMD_RESPONSE_GET_DISPLAY, 0x01]))
response = [CMD_RESPONSE_GET_DISPLAY, 0x00]
response.extend([ord(c) for c in self.display_line1])
response.extend([ord(c) for c in self.display_line2])
return self.create_message(bytes(response))
def handle_set_display(self, data: bytes) -> bytes:
"""Handle set display command (0x33)"""
if not self.link_established:
return self.create_message(bytes([CMD_RESPONSE_SET_DISPLAY, 0x01, 0x00]))
if len(data) < 2:
return self.create_message(bytes([CMD_RESPONSE_SET_DISPLAY, 0x01, 0x00]))
control = data[1]
clear_first = (control & 0x80) != 0
line = (control >> 5) & 0x03
position = control & 0x1F
if clear_first:
self.display_line1 = " " * 20
self.display_line2 = " " * 20
text = data[2:].decode("ascii", errors="ignore")
if line == 0:
new_line = list(self.display_line1)
for i, c in enumerate(text):
if position + i < 20:
new_line[position + i] = c
self.display_line1 = "".join(new_line)
elif line == 1:
new_line = list(self.display_line2)
for i, c in enumerate(text):
if position + i < 20:
new_line[position + i] = c
self.display_line2 = "".join(new_line)
self.logger.info(
f"Display updated: L1='{self.display_line1}' L2='{self.display_line2}'"
)
control_byte = 0x01 if self.display_locked else 0x00
return self.create_message(
bytes([CMD_RESPONSE_SET_DISPLAY, 0x00, control_byte])
)
def handle_lock_display(self, data: bytes) -> bytes:
"""Handle lock display command (0x37)"""
if len(data) < 2:
return self.create_message(bytes([CMD_RESPONSE_LOCK_DISPLAY, 0x01]))
self.display_locked = data[1] == 0x01
self.logger.info(f"Display {'locked' if self.display_locked else 'unlocked'}")
return self.create_message(bytes([CMD_RESPONSE_LOCK_DISPLAY, 0x00]))
def handle_lock_keyboard(self, data: bytes) -> bytes:
"""Handle lock keyboard command (0x39)"""
if len(data) < 2:
return self.create_message(bytes([CMD_RESPONSE_LOCK_KEYBOARD, 0x01]))
self.keyboard_locked = data[1] == 0x01
self.logger.info(f"Keyboard {'locked' if self.keyboard_locked else 'unlocked'}")
return self.create_message(bytes([CMD_RESPONSE_LOCK_KEYBOARD, 0x00]))
def handle_command(self, data: bytes) -> Optional[bytes]:
"""Route command to appropriate handler"""
if len(data) == 0:
return None
cmd = data[0]
self.logger.info(f"Received command: 0x{cmd:02X}")
if cmd == CMD_CONSTRUCT_LINK:
return self.handle_construct_link(data)
elif cmd == CMD_DESTRUCT_LINK:
return self.handle_destruct_link(data)
elif cmd == CMD_GET_VALUE:
return self.handle_get_value(data)
elif cmd == CMD_GET_DISPLAY:
return self.handle_get_display(data)
elif cmd == CMD_SET_DISPLAY:
return self.handle_set_display(data)
elif cmd == CMD_LOCK_DISPLAY:
return self.handle_lock_display(data)
elif cmd == CMD_LOCK_KEYBOARD:
return self.handle_lock_keyboard(data)
else:
self.logger.warning(f"Unknown command: 0x{cmd:02X}")
return None
def run(self):
"""Main simulator loop"""
self.logger.info(
f"Starting Pelican simulator on {self.port} at {self.baudrate} baud"
)
try:
self.serial_conn = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1,
)
self.logger.info("Serial port opened successfully")
buffer = bytearray()
while True:
if self.serial_conn.in_waiting > 0:
data = self.serial_conn.read(self.serial_conn.in_waiting)
buffer.extend(data)
# Look for complete message (STX...ETX)
if STX in buffer and ETX in buffer:
stx_idx = buffer.index(STX)
try:
etx_idx = buffer.index(ETX, stx_idx)
message = bytes(buffer[stx_idx : etx_idx + 1])
buffer = buffer[etx_idx + 1 :]
self.logger.debug(f"RX: {message.hex()}")
# Parse and handle message
parsed_data = self.parse_message(message)
if parsed_data:
response = self.handle_command(parsed_data)
if response:
self.logger.debug(f"TX: {response.hex()}")
self.serial_conn.write(response)
except ValueError:
pass # ETX not found yet
time.sleep(0.01)
except KeyboardInterrupt:
self.logger.info("Simulator stopped by user")
except Exception as e:
self.logger.error(f"Error: {e}", exc_info=True)
finally:
if self.serial_conn and self.serial_conn.is_open:
self.serial_conn.close()
self.logger.info("Serial port closed")
def main():
parser = argparse.ArgumentParser(description="Pelican Device Simulator")
parser.add_argument(
"--port",
"-p",
default="/dev/ttyUSB0",
help="Serial port (default: /dev/ttyUSB0)",
)
parser.add_argument(
"--baudrate", "-b", type=int, default=9600, help="Baud rate (default: 9600)"
)
args = parser.parse_args()
simulator = PelicanSimulator(args.port, args.baudrate)
simulator.run()
if __name__ == "__main__":
main()

7
pyproject.toml Normal file
View File

@ -0,0 +1,7 @@
[project]
name = "coin-counter-simulators"
version = "0.1.0"
description = "Coin counter device simulator for development/testing"
readme = "README.md"
requires-python = ">=3.8"
dependencies = ["pyserial>=3.5"]

23
uv.lock generated Normal file
View File

@ -0,0 +1,23 @@
version = 1
revision = 3
requires-python = ">=3.8"
[[package]]
name = "pelican-simulator"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "pyserial" },
]
[package.metadata]
requires-dist = [{ name = "pyserial", specifier = ">=3.5" }]
[[package]]
name = "pyserial"
version = "3.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb", size = 159125, upload-time = "2020-11-23T03:59:15.045Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585, upload-time = "2020-11-23T03:59:13.41Z" },
]