#!/usr/bin/env python3 """ Pelican Device Simulator Simulates a Pelican coin counting device for development/testing purposes. """ import random import time from typing import Optional import serial from loguru import logger # Protocol constants STX = 0x02 ETX = 0x03 ACK = 0x06 NACK = 0x15 # Commands CMD_CONSTRUCT_LINK = 0x01 CMD_RESPONSE_CONSTRUCT_LINK = 0x02 CMD_DESTRUCT_LINK = 0x03 CMD_RESPONSE_DESTRUCT_LINK = 0x04 CMD_GET_VALUE = 0x11 CMD_VALUE_RETURNED = 0x12 CMD_SET_VALUE = 0x21 CMD_RESPONSE_SET_VALUE = 0x22 CMD_GET_DISPLAY = 0x31 CMD_RESPONSE_GET_DISPLAY = 0x32 CMD_SET_DISPLAY = 0x33 CMD_RESPONSE_SET_DISPLAY = 0x34 CMD_LOCK_DISPLAY = 0x37 CMD_RESPONSE_LOCK_DISPLAY = 0x38 CMD_LOCK_KEYBOARD = 0x39 CMD_RESPONSE_LOCK_KEYBOARD = 0x3A CMD_GET_STATUS = 0x33 # Password for construct link PASSWORD = "69390274" class PelicanSimulator: def __init__(self, port: str, baudrate: int = 9600): self.port = port self.baudrate = baudrate self.serial_conn: Optional[serial.Serial] = None self.link_established = False # Device state self.display_line1 = " " * 20 self.display_line2 = " " * 20 self.display_locked = False self.keyboard_locked = False self.motor_running = False self.program_state = 0x00 # No coins expected # Coin counters (20 coin types) self.current_count = [0] * 20 self.total_count = [0] * 20 self.rejected_count = 0 # Software version info self.sw_version = 0x01020304 # 1.2.3.4 self.sw_code = 0x12345678 self.host_version = 0x09033003 # From document name # Coin denominations (in cents/hundredths) for 20 coin types # First 8 are the requested denominations: 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2 self.denominations = [ 1, # 0.01 2, # 0.02 5, # 0.05 10, # 0.10 20, # 0.20 50, # 0.50 100, # 1.00 200, # 2.00 ] def calc_crc(self, data: bytes) -> int: """Calculate CRC-16 using the algorithm from the spec""" crc = 0 for byte in data: crc = crc ^ (byte << 8) for _ in range(8): if (crc & 0x8000) != 0: crc = (crc << 1) ^ 0x1021 else: crc = crc << 1 crc = crc & 0xFFFF return crc def create_message(self, data: bytes) -> bytes: """Create a message with STX, length, data, CRC, and ETX""" length = len(data) crc = self.calc_crc(data) crc_hi = (crc >> 8) & 0xFF crc_lo = crc & 0xFF message = bytes([STX, length]) + data + bytes([crc_hi, crc_lo, ETX]) return message def parse_message(self, message: bytes) -> Optional[bytes]: """Parse and validate a received message""" if len(message) < 5: logger.error("Message too short") return None if message[0] != STX: logger.error("Invalid STX") return None if message[-1] != ETX: logger.error("Invalid ETX") return None length = message[1] data = message[2 : 2 + length] if len(data) != length: logger.error("Length mismatch") return None crc_received = (message[2 + length] << 8) | message[2 + length + 1] crc_calculated = self.calc_crc(data) if crc_received != crc_calculated: logger.error( f"CRC mismatch: received {crc_received:04X}, calculated {crc_calculated:04X}" ) return None return data def handle_construct_link(self, data: bytes) -> bytes: """Handle construct link command (0x01)""" if len(data) < 9: return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x01])) password = data[1:9].decode("ascii", errors="ignore") if password == PASSWORD: self.link_established = True logger.info("Link established successfully") return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x00])) else: logger.warning(f"Invalid password: {password}") return self.create_message(bytes([CMD_RESPONSE_CONSTRUCT_LINK, 0x01])) def handle_destruct_link(self, data: bytes) -> bytes: """Handle destruct link command (0x03)""" if self.link_established: self.link_established = False logger.info("Link destructed") return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x00])) else: return self.create_message(bytes([CMD_RESPONSE_DESTRUCT_LINK, 0x01])) def handle_get_value(self, data: bytes) -> bytes: """Handle get value command (0x11)""" if not self.link_established: logger.warning("Get value request rejected - link not established") return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01])) if len(data) < 2: logger.error("Get value request missing variable number") return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01])) var_num = data[1] logger.info(f"Get value request for variable 0x{var_num:02X}") # Variable 0x16 - Current counting result if var_num == 0x16: logger.info("Responding with current counting result") # Generate random current counts for simulation random_counts = [] for i in range(20): # Generate random counts, with higher probability for lower denominations if ( i < 8 ): # Standard denominations (0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2) max_count = max( 1, 100 - i * 10 ) # Lower denominations have higher counts random_counts.append(random.randint(0, max_count)) else: # Random denominations random_counts.append(random.randint(0, 20)) # Random rejected count random_rejected = random.randint(0, 10) response = [CMD_VALUE_RETURNED, 0x00, 0x16] # Add 20 coins (4 bytes each) for count in random_counts: response.extend( [ (count >> 24) & 0xFF, (count >> 16) & 0xFF, (count >> 8) & 0xFF, count & 0xFF, ] ) # Add rejected count response.extend( [ (random_rejected >> 24) & 0xFF, (random_rejected >> 16) & 0xFF, (random_rejected >> 8) & 0xFF, random_rejected & 0xFF, ] ) total_coins = sum(random_counts) # Log detailed count information logger.info( f"Current count: {total_coins} total coins, {random_rejected} rejected" ) # Log counts for standard denominations standard_denoms = [ "0.01", "0.02", "0.05", "0.10", "0.20", "0.50", "1.00", "2.00", ] count_details = [] for i in range(8): if random_counts[i] > 0: count_details.append(f"{standard_denoms[i]}: {random_counts[i]}") if count_details: logger.info(f"Coin counts: {', '.join(count_details)}") return self.create_message(bytes(response)) # Variable 0x1C - Total counting result elif var_num == 0x1C: logger.info("Responding with total counting result") response = [CMD_VALUE_RETURNED, 0x00, 0x1C] for count in self.total_count: response.extend( [ (count >> 24) & 0xFF, (count >> 16) & 0xFF, (count >> 8) & 0xFF, count & 0xFF, ] ) total_coins = sum(self.total_count) logger.info(f"Total count: {total_coins} total coins since last reset") return self.create_message(bytes(response)) # Variable 0x31 - Software version elif var_num == 0x31: logger.info("Responding with software version information") response = [CMD_VALUE_RETURNED, 0x00, 0x31] # SW version response.extend( [ (self.sw_version >> 24) & 0xFF, (self.sw_version >> 16) & 0xFF, (self.sw_version >> 8) & 0xFF, self.sw_version & 0xFF, ] ) # SW code response.extend( [ (self.sw_code >> 24) & 0xFF, (self.sw_code >> 16) & 0xFF, (self.sw_code >> 8) & 0xFF, self.sw_code & 0xFF, ] ) # HOST version response.extend( [ (self.host_version >> 24) & 0xFF, (self.host_version >> 16) & 0xFF, (self.host_version >> 8) & 0xFF, self.host_version & 0xFF, ] ) logger.info( f"SW Version: {self.sw_version:08X}, SW Code: {self.sw_code:08X}, Host Version: {self.host_version:08X}" ) return self.create_message(bytes(response)) # Variable 0x33 - Machine status elif var_num == 0x33: logger.info("Responding with machine status") status_flags_1 = 0x00 if self.motor_running: status_flags_1 |= 0x01 status_flags_2 = 0x00 if self.display_locked: status_flags_2 |= 0x01 if self.keyboard_locked: status_flags_2 |= 0x04 response = [ CMD_VALUE_RETURNED, 0x00, 0x33, self.program_state, status_flags_1, status_flags_2, 0x00, # flags 3 0x00, # flags 4 0x00, # flags 5 0x00, # flags 6 0x00, # flags 7 0x00, # flags 8 ] # Human readable status program_states = { 0x00: "No coins expected", 0x01: "Counting mode", 0x02: "Tubing mode", 0x20: "Memory", 0x40: "Programming", 0x80: "Setup", } state_desc = program_states.get( self.program_state, f"Unknown (0x{self.program_state:02X})" ) status_desc = [] if self.motor_running: status_desc.append("motor running") if self.display_locked: status_desc.append("display locked") if self.keyboard_locked: status_desc.append("keyboard locked") status_str = ", ".join(status_desc) if status_desc else "all unlocked" logger.info(f"Machine status: {state_desc}, {status_str}") return self.create_message(bytes(response)) # Variable 0x1F - Get denomination values elif var_num == 0x1F: logger.info("Responding with coin denomination values") response = [CMD_VALUE_RETURNED, 0x00, 0x1F] # Add denomination values for all 20 coin types (4 bytes each) for denomination in self.denominations: response.extend( [ (denomination >> 24) & 0xFF, (denomination >> 16) & 0xFF, (denomination >> 8) & 0xFF, denomination & 0xFF, ] ) # Format denominations for human-readable logging formatted_denoms = [] for i, denom in enumerate(self.denominations): if denom >= 100: formatted_denoms.append(f"Coin{i+1}: {denom/100:.2f}") else: formatted_denoms.append(f"Coin{i+1}: 0.{denom:02d}") logger.info( f"Denomination values (20 coins): {', '.join(formatted_denoms[:8])}" ) if len(formatted_denoms) > 8: logger.info( f"Additional denominations: {', '.join(formatted_denoms[8:])}" ) return self.create_message(bytes(response)) # Variable 0x22 - Coin exit counters (similar to 0x16 but for sorted coins) elif var_num == 0x22: logger.info("Responding with coin exit counters") # Generate random exit counts for simulation exit_counts = [] for i in range(20): if i < 8: # Standard denominations max_count = max(1, 100 - i * 10) exit_counts.append(random.randint(0, max_count)) else: exit_counts.append(0) # No exits for unused channels response = [CMD_VALUE_RETURNED, 0x00, 0x22] # Add 20 coin exit counters (4 bytes each) for count in exit_counts: response.extend( [ (count >> 24) & 0xFF, (count >> 16) & 0xFF, (count >> 8) & 0xFF, count & 0xFF, ] ) total_exits = sum(exit_counts) logger.info(f"Coin exit counters: {total_exits} total coins sorted") # Log counts for standard denominations standard_denoms = [ "0.01", "0.02", "0.05", "0.10", "0.20", "0.50", "1.00", "2.00", ] exit_details = [] for i in range(8): if exit_counts[i] > 0: exit_details.append(f"{standard_denoms[i]}: {exit_counts[i]}") if exit_details: logger.info(f"Exit counts: {', '.join(exit_details)}") return self.create_message(bytes(response)) # Variable 0x23 - Get transaction data elif var_num == 0x23: # Need transaction number (2 bytes) after variable number if len(data) < 4: logger.error( "Get transaction data request missing transaction number" ) return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01])) transaction_num = (data[2] << 8) | data[3] logger.info( f"Responding with transaction data for transaction #{transaction_num}" ) # Generate simulated transaction data import time current_time = time.localtime() response = [CMD_VALUE_RETURNED, 0x00, 0x23] # Transaction number (2 bytes) response.extend([(transaction_num >> 8) & 0xFF, transaction_num & 0xFF]) # Cashier number (2 bytes) cashier_num = random.randint(1, 99) response.extend([(cashier_num >> 8) & 0xFF, cashier_num & 0xFF]) # Not used yet (1 byte) response.append(0x00) # Hour, Minute response.extend([current_time.tm_hour, current_time.tm_min]) # Year (2-digit), Month, Day response.extend( [current_time.tm_year % 100, current_time.tm_mon, current_time.tm_mday] ) # Fee amount - currency 0 (4 bytes) fee_amount_0 = random.randint(0, 50000) # Up to 500.00 response.extend( [ (fee_amount_0 >> 24) & 0xFF, (fee_amount_0 >> 16) & 0xFF, (fee_amount_0 >> 8) & 0xFF, fee_amount_0 & 0xFF, ] ) # Fee amount - currency 1 (4 bytes) response.extend([0x00, 0x00, 0x00, 0x00]) # Coin amount - currency 0 (4 bytes) coin_amount_0 = random.randint(10000, 500000) # 100.00 to 5000.00 response.extend( [ (coin_amount_0 >> 24) & 0xFF, (coin_amount_0 >> 16) & 0xFF, (coin_amount_0 >> 8) & 0xFF, coin_amount_0 & 0xFF, ] ) # Coin amount - currency 1 (4 bytes) response.extend([0x00, 0x00, 0x00, 0x00]) # Note amount - currency 0 (4 bytes) / Account number high for CDS response.extend([0x00, 0x00, 0x00, 0x00]) # Note amount - currency 1 (4 bytes) / Account number low for CDS response.extend([0x00, 0x00, 0x00, 0x00]) logger.info( f"Transaction #{transaction_num}: Cashier {cashier_num}, " f"Date {current_time.tm_year}/{current_time.tm_mon}/{current_time.tm_mday} " f"{current_time.tm_hour:02d}:{current_time.tm_min:02d}, " f"Coin amount: ${coin_amount_0/100:.2f}, Fee: ${fee_amount_0/100:.2f}" ) return self.create_message(bytes(response)) else: logger.warning(f"Unknown variable number: 0x{var_num:02X}") return self.create_message(bytes([CMD_VALUE_RETURNED, 0x01])) def handle_get_display(self, data: bytes) -> bytes: """Handle get display command (0x31)""" if not self.link_established: return self.create_message(bytes([CMD_RESPONSE_GET_DISPLAY, 0x01])) response = [CMD_RESPONSE_GET_DISPLAY, 0x00] response.extend([ord(c) for c in self.display_line1]) response.extend([ord(c) for c in self.display_line2]) return self.create_message(bytes(response)) def handle_set_display(self, data: bytes) -> bytes: """Handle set display command (0x33)""" if not self.link_established: return self.create_message(bytes([CMD_RESPONSE_SET_DISPLAY, 0x01, 0x00])) if len(data) < 2: return self.create_message(bytes([CMD_RESPONSE_SET_DISPLAY, 0x01, 0x00])) control = data[1] clear_first = (control & 0x80) != 0 line = (control >> 5) & 0x03 position = control & 0x1F if clear_first: self.display_line1 = " " * 20 self.display_line2 = " " * 20 text = data[2:].decode("ascii", errors="ignore") if line == 0: new_line = list(self.display_line1) for i, c in enumerate(text): if position + i < 20: new_line[position + i] = c self.display_line1 = "".join(new_line) elif line == 1: new_line = list(self.display_line2) for i, c in enumerate(text): if position + i < 20: new_line[position + i] = c self.display_line2 = "".join(new_line) logger.info( f"Display updated: L1='{self.display_line1}' L2='{self.display_line2}'" ) control_byte = 0x01 if self.display_locked else 0x00 return self.create_message( bytes([CMD_RESPONSE_SET_DISPLAY, 0x00, control_byte]) ) def handle_lock_display(self, data: bytes) -> bytes: """Handle lock display command (0x37)""" if len(data) < 2: return self.create_message(bytes([CMD_RESPONSE_LOCK_DISPLAY, 0x01])) self.display_locked = data[1] == 0x01 logger.info(f"Display {'locked' if self.display_locked else 'unlocked'}") return self.create_message(bytes([CMD_RESPONSE_LOCK_DISPLAY, 0x00])) def handle_lock_keyboard(self, data: bytes) -> bytes: """Handle lock keyboard command (0x39)""" if len(data) < 2: return self.create_message(bytes([CMD_RESPONSE_LOCK_KEYBOARD, 0x01])) self.keyboard_locked = data[1] == 0x01 logger.info(f"Keyboard {'locked' if self.keyboard_locked else 'unlocked'}") return self.create_message(bytes([CMD_RESPONSE_LOCK_KEYBOARD, 0x00])) def handle_command(self, data: bytes) -> Optional[bytes]: """Route command to appropriate handler""" if len(data) == 0: return None cmd = data[0] logger.info(f"Received command: 0x{cmd:02X}") if cmd == CMD_CONSTRUCT_LINK: return self.handle_construct_link(data) elif cmd == CMD_DESTRUCT_LINK: return self.handle_destruct_link(data) elif cmd == CMD_GET_VALUE: return self.handle_get_value(data) elif cmd == CMD_GET_DISPLAY: return self.handle_get_display(data) elif cmd == CMD_SET_DISPLAY: return self.handle_set_display(data) elif cmd == CMD_LOCK_DISPLAY: return self.handle_lock_display(data) elif cmd == CMD_LOCK_KEYBOARD: return self.handle_lock_keyboard(data) else: logger.warning(f"Unknown command: 0x{cmd:02X}") return None def run(self): """Main simulator loop""" logger.info( f"Starting Pelican simulator on {self.port} at {self.baudrate} baud" ) try: self.serial_conn = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, ) logger.info("Serial port opened successfully") buffer = bytearray() while True: if self.serial_conn.in_waiting > 0: data = self.serial_conn.read(self.serial_conn.in_waiting) buffer.extend(data) # Look for complete message (STX...ETX) if STX in buffer and ETX in buffer: stx_idx = buffer.index(STX) try: etx_idx = buffer.index(ETX, stx_idx) message = bytes(buffer[stx_idx : etx_idx + 1]) buffer = buffer[etx_idx + 1 :] logger.debug( f"RX: {' '.join(f'{b:02X}' for b in message)}" ) # Parse and handle message parsed_data = self.parse_message(message) if parsed_data: response = self.handle_command(parsed_data) if response: logger.debug( f"TX: {' '.join(f'{b:02X}' for b in response)}" ) self.serial_conn.write(response) except ValueError: pass # ETX not found yet time.sleep(0.01) except KeyboardInterrupt: logger.info("Simulator stopped by user") except Exception as e: logger.error(f"Error: {e}", exc_info=True) finally: if self.serial_conn and self.serial_conn.is_open: self.serial_conn.close() logger.info("Serial port closed")