Files
coin-counter-simulators/source/pelican.py
2025-10-14 08:20:19 +02:00

669 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Pelican Device Simulator
Simulates a Pelican coin counting device for development/testing purposes.
"""
import random
import time
from typing import Optional
import serial
from loguru import logger
# Protocol constants
# Frame delimiters. create_message() lays a frame out as:
#   STX, length, payload, CRC high byte, CRC low byte, ETX
STX = 0x02
ETX = 0x03
# NOTE(review): ACK and NACK are not referenced by any handler visible in this
# file - confirm whether the protocol actually uses them.
ACK = 0x06
NACK = 0x15
# Commands
# The command code is the first payload byte of a frame (see handle_command).
# Each request code is paired with the code used as the first byte of the reply.
CMD_CONSTRUCT_LINK = 0x01
CMD_RESPONSE_CONSTRUCT_LINK = 0x02
CMD_DESTRUCT_LINK = 0x03
CMD_RESPONSE_DESTRUCT_LINK = 0x04
CMD_GET_VALUE = 0x11
CMD_VALUE_RETURNED = 0x12
# NOTE(review): no handler for CMD_SET_VALUE / CMD_RESPONSE_SET_VALUE is
# visible in this file.
CMD_SET_VALUE = 0x21
CMD_RESPONSE_SET_VALUE = 0x22
CMD_GET_DISPLAY = 0x31
CMD_RESPONSE_GET_DISPLAY = 0x32
CMD_SET_DISPLAY = 0x33
CMD_RESPONSE_SET_DISPLAY = 0x34
CMD_LOCK_DISPLAY = 0x37
CMD_RESPONSE_LOCK_DISPLAY = 0x38
CMD_LOCK_KEYBOARD = 0x39
CMD_RESPONSE_LOCK_KEYBOARD = 0x3A
# NOTE(review): CMD_GET_STATUS has the same value as CMD_SET_DISPLAY (0x33) and
# is not routed by handle_command. 0x33 is also the "machine status" variable
# number in handle_get_value, so this is presumably a variable number rather
# than a command code - confirm against the protocol document.
CMD_GET_STATUS = 0x33
# Password for construct link
# Compared against payload bytes 1..8 of a construct-link request.
PASSWORD = "69390274"
class PelicanSimulator:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None
self.link_established = False
# Device state
self.display_line1 = " " * 20
self.display_line2 = " " * 20
self.display_locked = False
self.keyboard_locked = False
self.motor_running = False
self.program_state = 0x00 # No coins expected
# Coin counters (20 coin types)
self.current_count = [0] * 20
self.total_count = [0] * 20
self.rejected_count = 0
# Software version info
self.sw_version = 0x01020304 # 1.2.3.4
self.sw_code = 0x12345678
self.host_version = 0x09033003 # From document name
# Coin denominations (in cents/hundredths) for 20 coin types
# First 8 are the requested denominations: 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2
self.denominations = [
1, # 0.01
2, # 0.02
5, # 0.05
10, # 0.10
20, # 0.20
50, # 0.50
100, # 1.00
200, # 2.00
]
def calc_crc(self, data: bytes) -> int:
    """Return the 16-bit CRC of *data* (algorithm taken from the spec).

    The register starts at zero; each byte is XORed into the high half and
    the register is then shifted left eight times, XORing in 0x1021 whenever
    the bit shifted out of position 15 was set.
    """
    polynomial = 0x1021
    register = 0
    for octet in data:
        register ^= octet << 8
        for _bit in range(8):
            shifted = register << 1
            if register & 0x8000:
                shifted ^= polynomial
            # Keep the register at 16 bits after every shift.
            register = shifted & 0xFFFF
    return register
def create_message(self, data: bytes) -> bytes:
    """Wrap *data* in a frame: STX, length, data, CRC high, CRC low, ETX.

    The length byte is ``len(data)`` and the CRC is computed over *data*
    only, using calc_crc().
    """
    checksum = self.calc_crc(data)
    header = bytes([STX, len(data)])
    trailer = bytes([(checksum >> 8) & 0xFF, checksum & 0xFF, ETX])
    return header + data + trailer
def parse_message(self, message: bytes) -> Optional[bytes]:
    """Validate a received frame and return its payload.

    Expected layout: STX, length, ``length`` payload bytes, CRC high byte,
    CRC low byte, ETX.

    Args:
        message: One complete raw frame, including STX and ETX.

    Returns:
        The payload bytes, or ``None`` (after logging the reason) when the
        frame is too short, has a wrong delimiter, has a length byte that
        does not match the frame size, or fails the CRC check.
    """
    if len(message) < 5:
        logger.error("Message too short")
        return None
    if message[0] != STX:
        logger.error("Invalid STX")
        return None
    if message[-1] != ETX:
        logger.error("Invalid ETX")
        return None

    length = message[1]
    # The frame must be exactly header (2) + payload + CRC (2) + ETX (1).
    # Checking only the payload slice let a frame with no room for the CRC
    # through, and the CRC indexing below then raised IndexError.
    if len(message) != length + 5:
        logger.error("Length mismatch")
        return None

    data = message[2 : 2 + length]
    crc_received = (message[2 + length] << 8) | message[2 + length + 1]
    crc_calculated = self.calc_crc(data)
    if crc_received != crc_calculated:
        logger.error(
            f"CRC mismatch: received {crc_received:04X}, calculated {crc_calculated:04X}"
        )
        return None
    return data
def handle_construct_link(self, data: bytes) -> bytes:
    """Handle construct link command (0x01).

    Payload bytes 1..8 carry the password. On a match the link is marked as
    established and status 0x00 is returned; a short request or a wrong
    password yields status 0x01.
    """
    refusal = bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x01])
    if len(data) < 9:
        return self.create_message(refusal)

    password = data[1:9].decode("ascii", errors="ignore")
    if password != PASSWORD:
        logger.warning(f"Invalid password: {password}")
        return self.create_message(refusal)

    self.link_established = True
    logger.info("Link established successfully")
    return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x00]))
def handle_destruct_link(self, data: bytes) -> bytes:
    """Handle destruct link command (0x03).

    Tears down an established link and answers with status 0x00; answers
    with status 0x01 when no link was established.
    """
    if not self.link_established:
        return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x01]))

    self.link_established = False
    logger.info("Link destructed")
    return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x00]))
def handle_get_value(self, data: bytes) -> bytes:
    """Handle get value command (0x11).

    ``data[1]`` selects the variable to read. Supported variables:
    0x16 current counting result, 0x1C total counting result,
    0x1F denomination values, 0x22 coin exit counters,
    0x23 transaction data, 0x31 software version, 0x33 machine status.

    Args:
        data: Request payload; byte 0 is the command, byte 1 the variable
            number, followed by any variable-specific arguments.

    Returns:
        A framed reply. On success the payload is CMD_VALUE_RETURNED, 0x00,
        the variable number and the variable's data. The payload is
        CMD_VALUE_RETURNED, 0x01 when the link is not established, the
        request is too short, or the variable is unknown.
    """
    failure = bytes([CMD_VALUE_RETURNED, 0x01])

    if not self.link_established:
        logger.warning("Get value request rejected - link not established")
        return self.create_message(failure)
    if len(data) < 2:
        logger.error("Get value request missing variable number")
        return self.create_message(failure)

    var_num = data[1]
    logger.info(f"Get value request for variable 0x{var_num:02X}")

    builders = {
        0x16: self._value_current_counts,
        0x1C: self._value_total_counts,
        0x1F: self._value_denominations,
        0x22: self._value_exit_counters,
        0x23: self._value_transaction,
        0x31: self._value_software_version,
        0x33: self._value_machine_status,
    }
    builder = builders.get(var_num)
    if builder is None:
        logger.warning(f"Unknown variable number: 0x{var_num:02X}")
        return self.create_message(failure)

    body = builder(data)
    if body is None:
        # The builder has already logged why the request was rejected.
        return self.create_message(failure)
    return self.create_message(bytes([CMD_VALUE_RETURNED, 0x00, var_num]) + body)


def _be(self, value: int, width: int = 4) -> bytes:
    """Encode *value* as *width* big-endian bytes, keeping only the low bits."""
    mask = (1 << (8 * width)) - 1
    return (value & mask).to_bytes(width, "big")


def _log_standard_counts(self, label: str, counts) -> None:
    """Log the non-zero entries of *counts* for the eight standard coins."""
    names = ("0.01", "0.02", "0.05", "0.10", "0.20", "0.50", "1.00", "2.00")
    details = [
        f"{name}: {count}" for name, count in zip(names, counts) if count > 0
    ]
    if details:
        logger.info(f"{label}: {', '.join(details)}")


def _value_current_counts(self, data: bytes) -> bytes:
    """Build variable 0x16: 20 simulated coin counts plus a rejected count."""
    logger.info("Responding with current counting result")
    # Counts are randomised on every request; lower denominations get a
    # higher upper bound, channels 8..19 are capped at 20.
    counts = [
        random.randint(0, max(1, 100 - i * 10)) if i < 8 else random.randint(0, 20)
        for i in range(20)
    ]
    rejected = random.randint(0, 10)

    logger.info(f"Current count: {sum(counts)} total coins, {rejected} rejected")
    self._log_standard_counts("Coin counts", counts)
    return b"".join(self._be(count) for count in counts) + self._be(rejected)


def _value_total_counts(self, data: bytes) -> bytes:
    """Build variable 0x1C: the stored total count of every coin type."""
    logger.info("Responding with total counting result")
    total_coins = sum(self.total_count)
    logger.info(f"Total count: {total_coins} total coins since last reset")
    return b"".join(self._be(count) for count in self.total_count)


def _value_software_version(self, data: bytes) -> bytes:
    """Build variable 0x31: SW version, SW code and host version (4 bytes each)."""
    logger.info("Responding with software version information")
    logger.info(
        f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}"
    )
    return (
        self._be(self.sw_version)
        + self._be(self.sw_code)
        + self._be(self.host_version)
    )


def _value_machine_status(self, data: bytes) -> bytes:
    """Build variable 0x33: program state followed by eight flag bytes."""
    logger.info("Responding with machine status")
    status_flags_1 = 0x01 if self.motor_running else 0x00
    status_flags_2 = 0x00
    if self.display_locked:
        status_flags_2 |= 0x01
    if self.keyboard_locked:
        status_flags_2 |= 0x04

    # Human readable status
    program_states = {
        0x00: "No coins expected",
        0x01: "Counting mode",
        0x02: "Tubing mode",
        0x20: "Memory",
        0x40: "Programming",
        0x80: "Setup",
    }
    state_desc = program_states.get(
        self.program_state, f"Unknown (0x{self.program_state:02X})"
    )
    indicators = (
        (self.motor_running, "motor running"),
        (self.display_locked, "display locked"),
        (self.keyboard_locked, "keyboard locked"),
    )
    status_desc = [text for active, text in indicators if active]
    status_str = ", ".join(status_desc) if status_desc else "all unlocked"
    logger.info(f"Machine status: {state_desc}, {status_str}")

    # Flag bytes 3..8 are always reported as zero.
    return bytes([self.program_state, status_flags_1, status_flags_2]) + bytes(6)


def _value_denominations(self, data: bytes) -> bytes:
    """Build variable 0x1F: one 4-byte value per entry of self.denominations."""
    logger.info("Responding with coin denomination values")
    # Values are stored in hundredths, so dividing by 100 gives the amount.
    formatted_denoms = [
        f"Coin{index}: {denom / 100:.2f}"
        for index, denom in enumerate(self.denominations, start=1)
    ]
    # Report the real table size instead of a hard-coded "20".
    logger.info(
        f"Denomination values ({len(formatted_denoms)} coins): {', '.join(formatted_denoms[:8])}"
    )
    if len(formatted_denoms) > 8:
        logger.info(f"Additional denominations: {', '.join(formatted_denoms[8:])}")
    return b"".join(self._be(denom) for denom in self.denominations)


def _value_exit_counters(self, data: bytes) -> bytes:
    """Build variable 0x22: 20 simulated exit counters (channels 8..19 are 0)."""
    logger.info("Responding with coin exit counters")
    exit_counts = [
        random.randint(0, max(1, 100 - i * 10)) if i < 8 else 0 for i in range(20)
    ]
    logger.info(f"Coin exit counters: {sum(exit_counts)} total coins sorted")
    self._log_standard_counts("Exit counts", exit_counts)
    return b"".join(self._be(count) for count in exit_counts)


def _value_transaction(self, data: bytes) -> Optional[bytes]:
    """Build variable 0x23: simulated data for the requested transaction.

    The 2-byte transaction number follows the variable number in *data*.
    Returns ``None`` when it is missing.
    """
    if len(data) < 4:
        logger.error("Get transaction data request missing transaction number")
        return None

    transaction_num = int.from_bytes(data[2:4], "big")
    logger.info(
        f"Responding with transaction data for transaction #{transaction_num}"
    )

    # Generate simulated transaction data stamped with the current local time.
    current_time = time.localtime()
    cashier_num = random.randint(1, 99)
    fee_amount_0 = random.randint(0, 50000)  # Up to 500.00
    coin_amount_0 = random.randint(10000, 500000)  # 100.00 to 5000.00

    body = b"".join(
        [
            self._be(transaction_num, 2),
            self._be(cashier_num, 2),
            b"\x00",  # Not used yet
            bytes([current_time.tm_hour, current_time.tm_min]),
            bytes(
                [current_time.tm_year % 100, current_time.tm_mon, current_time.tm_mday]
            ),
            self._be(fee_amount_0),  # Fee amount - currency 0
            bytes(4),  # Fee amount - currency 1
            self._be(coin_amount_0),  # Coin amount - currency 0
            bytes(4),  # Coin amount - currency 1
            bytes(4),  # Note amount - currency 0 / Account number high for CDS
            bytes(4),  # Note amount - currency 1 / Account number low for CDS
        ]
    )
    logger.info(
        f"Transaction #{transaction_num}: Cashier {cashier_num}, "
        f"Date {current_time.tm_year}/{current_time.tm_mon}/{current_time.tm_mday} "
        f"{current_time.tm_hour:02d}:{current_time.tm_min:02d}, "
        f"Coin amount: ${coin_amount_0/100:.2f}, Fee: ${fee_amount_0/100:.2f}"
    )
    return body
def handle_get_display(self, data: bytes) -> bytes:
    """Handle get display command (0x31).

    Replies with status 0x00 followed by the characters of display line 1
    and then display line 2, or with status 0x01 when no link is established.
    """
    if not self.link_established:
        return self.create_message(bytes([CMD_RESPONSE_GET_DISPLAY, 0x01]))

    payload = [CMD_RESPONSE_GET_DISPLAY, 0x00]
    for line in (self.display_line1, self.display_line2):
        payload += map(ord, line)
    return self.create_message(bytes(payload))
def handle_set_display(self, data: bytes) -> bytes:
    """Handle set display command (0x33).

    ``data[1]`` is a control byte: bit 7 clears both lines first, bits 5-6
    select the line (0 or 1) and bits 0-4 give the start column. The
    remaining bytes are ASCII text written from that column; anything past
    column 19 is dropped. The reply carries status 0x00 plus a byte that is
    0x01 while the display is locked. Without a link, or with no control
    byte, the reply is status 0x01 followed by 0x00.
    """
    refusal = bytes([CMD_RESPONSE_SET_DISPLAY, 0x01, 0x00])
    if not self.link_established or len(data) < 2:
        return self.create_message(refusal)

    control = data[1]
    line = (control >> 5) & 0x03
    position = control & 0x1F

    if control & 0x80:
        blank = " " * 20
        self.display_line1 = blank
        self.display_line2 = blank

    text = data[2:].decode("ascii", errors="ignore")

    # Line selectors other than 0 and 1 leave the display untouched.
    target = {0: "display_line1", 1: "display_line2"}.get(line)
    if target is not None:
        cells = list(getattr(self, target))
        for column, char in zip(range(position, 20), text):
            cells[column] = char
        setattr(self, target, "".join(cells))

    logger.info(
        f"Display updated: L1='{self.display_line1}' L2='{self.display_line2}'"
    )
    lock_flag = 0x01 if self.display_locked else 0x00
    return self.create_message(bytes([CMD_RESPONSE_SET_DISPLAY, 0x00, lock_flag]))
def handle_lock_display(self, data: bytes) -> bytes:
    """Handle lock display command (0x37).

    ``data[1] == 0x01`` locks the display, any other value unlocks it.
    Replies with status 0x00, or 0x01 when the lock byte is missing.
    """
    status = 0x01
    if len(data) >= 2:
        self.display_locked = data[1] == 0x01
        logger.info(f"Display {'locked' if self.display_locked else 'unlocked'}")
        status = 0x00
    return self.create_message(bytes([CMD_RESPONSE_LOCK_DISPLAY, status]))
def handle_lock_keyboard(self, data: bytes) -> bytes:
    """Handle lock keyboard command (0x39).

    ``data[1] == 0x01`` locks the keyboard, any other value unlocks it.
    Replies with status 0x00, or 0x01 when the lock byte is missing.
    """
    status = 0x01
    if len(data) >= 2:
        self.keyboard_locked = data[1] == 0x01
        logger.info(f"Keyboard {'locked' if self.keyboard_locked else 'unlocked'}")
        status = 0x00
    return self.create_message(bytes([CMD_RESPONSE_LOCK_KEYBOARD, status]))
def handle_command(self, data: bytes) -> Optional[bytes]:
    """Dispatch a parsed payload to the handler for its command byte.

    Returns the handler's framed reply, or ``None`` when the payload is
    empty or the command byte (``data[0]``) is not recognised.
    """
    if not data:
        return None

    cmd = data[0]
    logger.info(f"Received command: 0x{cmd:02X}")

    routes = {
        CMD_CONSTRUCT_LINK: self.handle_construct_link,
        CMD_DESTRUCT_LINK: self.handle_destruct_link,
        CMD_GET_VALUE: self.handle_get_value,
        CMD_GET_DISPLAY: self.handle_get_display,
        CMD_SET_DISPLAY: self.handle_set_display,
        CMD_LOCK_DISPLAY: self.handle_lock_display,
        CMD_LOCK_KEYBOARD: self.handle_lock_keyboard,
    }
    handler = routes.get(cmd)
    if handler is None:
        logger.warning(f"Unknown command: 0x{cmd:02X}")
        return None
    return handler(data)
def run(self):
    """Main simulator loop.

    Opens the serial port, then polls it forever: received bytes are
    buffered, complete frames are cut out of the buffer, validated with
    parse_message(), dispatched through handle_command(), and any reply is
    written back. Ctrl-C stops the loop; any other exception is logged with
    its traceback. The port is closed on the way out in every case.
    """
    logger.info(
        f"Starting Pelican simulator on {self.port} at {self.baudrate} baud"
    )
    # Seconds of silence after which an unfinished frame is treated as noise.
    stale_after = 1.0
    try:
        self.serial_conn = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=1,
        )
        logger.info("Serial port opened successfully")
        buffer = bytearray()
        last_rx = time.monotonic()
        while True:
            pending = self.serial_conn.in_waiting
            if pending > 0:
                buffer.extend(self.serial_conn.read(pending))
                last_rx = time.monotonic()
            elif buffer and time.monotonic() - last_rx > stale_after:
                # The buffered STX never grew into a full frame: drop it so
                # that any frame queued behind it can still be found.
                del buffer[0]
            else:
                time.sleep(0.01)
                continue

            for message in self._extract_frames(buffer):
                logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}")
                # Parse and handle message
                parsed_data = self.parse_message(message)
                if not parsed_data:
                    continue
                response = self.handle_command(parsed_data)
                if response:
                    logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}")
                    self.serial_conn.write(response)
    except KeyboardInterrupt:
        logger.info("Simulator stopped by user")
    except Exception as e:
        # loguru has no exc_info keyword; logger.exception() attaches the
        # traceback, and passing no extra arguments keeps braces in the
        # message from being treated as format fields.
        logger.exception(f"Error: {e}")
    finally:
        if self.serial_conn and self.serial_conn.is_open:
            self.serial_conn.close()
            logger.info("Serial port closed")


def _extract_frames(self, buffer: bytearray) -> list:
    """Remove every complete frame from the front of *buffer* and return them.

    Frames are delimited with the length byte rather than by searching for
    the next ETX: 0x03 is also a legal command, length or CRC byte, so an
    ETX search truncates such frames (a destruct-link request, whose command
    byte is 0x03, could never be received). *buffer* is modified in place;
    bytes of an unfinished frame are left in it for the next call.
    """
    frames = []
    while True:
        start = buffer.find(STX)
        if start < 0:
            # No frame start anywhere: everything buffered is noise.
            buffer.clear()
            break
        del buffer[:start]
        if len(buffer) < 2:
            break  # Length byte not received yet
        frame_len = buffer[1] + 5  # STX + length + payload + 2 CRC bytes + ETX
        if len(buffer) < frame_len:
            break  # Wait for the rest of the frame
        if buffer[frame_len - 1] != ETX:
            # This STX was not a real frame start; skip it and resynchronise.
            del buffer[0]
            continue
        frames.append(bytes(buffer[:frame_len]))
        del buffer[:frame_len]
    return frames