From 569866d4042bee9851d4f98952b5b8eed2743f34 Mon Sep 17 00:00:00 2001 From: Eden Kirin Date: Fri, 24 Oct 2025 13:47:35 +0200 Subject: [PATCH] Selex simulator --- Makefile | 4 + main.py | 24 ++- source/__init__.py | 3 +- source/selex.py | 491 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 520 insertions(+), 2 deletions(-) create mode 100644 source/selex.py diff --git a/Makefile b/Makefile index 38633f2..3571257 100644 --- a/Makefile +++ b/Makefile @@ -4,3 +4,7 @@ run-pelican: run-glory: @ uv run python main.py glory --port /dev/ttyUSB0 --baud 115200 + + +run-selex: + @ uv run python main.py selex --port /dev/ttyUSB0 --baud 115200 diff --git a/main.py b/main.py index e1c1442..36c48c1 100644 --- a/main.py +++ b/main.py @@ -12,11 +12,12 @@ from loguru import logger # Import simulators from source.glory import GlorySimulator from source.pelican import PelicanSimulator +from source.selex import SelexSimulator def main(): parser = argparse.ArgumentParser( - description="Device Simulator - Run Pelican or Glory MACH6 simulator", + description="Device Simulator - Run Pelican, Glory MACH6, or Selex simulator", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: @@ -26,9 +27,13 @@ Examples: # Run Glory simulator python main.py glory --port /dev/ttyUSB1 --baudrate 9600 + # Run Selex simulator + python main.py selex --port /dev/ttyUSB2 --baudrate 9600 + # Get help for specific simulator python main.py pelican --help python main.py glory --help + python main.py selex --help """, ) @@ -65,6 +70,20 @@ Examples: "--baudrate", "-b", type=int, default=115200, help="Baud rate (default: 115200)" ) + # Selex simulator subcommand + selex_parser = subparsers.add_parser( + "selex", help="Run Selex coin counter simulator" + ) + selex_parser.add_argument( + "--port", + "-p", + default="/dev/ttyUSB0", + help="Serial port (default: /dev/ttyUSB0)", + ) + selex_parser.add_argument( + "--baudrate", "-b", type=int, default=9600, help="Baud rate (default: 9600)" + ) + args = parser.parse_args() # Configure loguru logger @@ -88,6 +107,9 @@ Examples: elif args.simulator == "glory": simulator = GlorySimulator(port=args.port, baudrate=args.baudrate) simulator.run() + elif args.simulator == "selex": + simulator = SelexSimulator(port=args.port, baudrate=args.baudrate) + simulator.run() except KeyboardInterrupt: logger.info("Simulator stopped by user") sys.exit(0) diff --git a/source/__init__.py b/source/__init__.py index 6c8f3a0..eb354c1 100644 --- a/source/__init__.py +++ b/source/__init__.py @@ -2,5 +2,6 @@ from .glory import GlorySimulator from .pelican import PelicanSimulator +from .selex import SelexSimulator -__all__ = ["GlorySimulator", "PelicanSimulator"] +__all__ = ["GlorySimulator", "PelicanSimulator", "SelexSimulator"] diff --git a/source/selex.py b/source/selex.py new file mode 100644 index 0000000..7f83604 --- /dev/null +++ b/source/selex.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python3 +""" +Selex Device Simulator +Simulates a Selex coin counting device (MIX-RED, MOON, MOON2000, CARRY & COUNT, ORION) +for development/testing purposes. +Implements the RS232 Communication Protocol MC-SELEX v2.2 +""" + +import random +import time +from typing import Optional + +import serial +from loguru import logger + +# Protocol constants +STX = ord('s') # 0x73 +ETX = ord('e') # 0x65 +CRC = ord('v') # 0x76 +CAN = ord('c') # 0x63 +ACK = ord('a') # 0x61 +EOT = ord('t') # 0x74 + +# Machine states +STATE_IDLE = "01" # Machine stops with finished counting +STATE_NO_MONEY = "02" # Machine stops for lack of money +STATE_COUNTING = "03" # Machine counting process +STATE_ERROR = "04" # Machine stops for error +STATE_BATCHING = "07" # Machine in batching mode +STATE_STOPPED_HOST = "08" # Machine stopped by host +STATE_STOPPED_KEYBOARD = "09" # Machine stopped by keyboard + +# Error codes +ERROR_NONE = "10" +ERROR_DISK_LOCKED_OVERLAP = "11" +ERROR_DISK_LOCKED_MOTOR = "12" +ERROR_LED_SETTING = "13" +ERROR_REJECTED_EXCESS = "14" +ERROR_DEJAM_FAILED = "15" +ERROR_COINS_UNDER_CCD = "17" +ERROR_DOORS_OPEN = "18" +ERROR_CCD_COMM = "19" + + +class SelexSimulator: + def __init__(self, port: str, baudrate: int = 9600): + self.port = port + self.baudrate = baudrate + self.serial_conn: Optional[serial.Serial] = None + + # Device state + self.machine_state = STATE_IDLE + self.error_code = ERROR_NONE + self.has_rejected_coins = False + self.is_counting = False + + # 8 counting lines + self.num_lines = 8 + + # Coin counters for each line (8 lines) + self.actual_counts = [0] * self.num_lines # Current batch + self.partial_totals = [0] * self.num_lines # Accumulated totals + self.batch_values = [0] * self.num_lines # Batch thresholds + self.extra_coins = [0] * self.num_lines # Extra coins + + # Rejected coins counter + self.rejected_coins = 0 + + # Line enable/disable status + self.lines_enabled = [True] * self.num_lines + + # Batching control + self.batching_enabled = True + + # Software version + self.sw_version = "2.2.0" + self.serial_number = "0001234567" + + def create_message(self, data: bytes) -> bytes: + """Create a message with STX, data, CRC placeholder, and ETX""" + return bytes([STX]) + data + bytes([CRC, ETX]) + + def parse_message(self, message: bytes) -> Optional[bytes]: + """Parse and validate a received message""" + if len(message) < 4: + logger.error("Message too short") + return None + + if message[0] != STX: + logger.error("Invalid STX") + return None + + if message[-1] != ETX: + logger.error("Invalid ETX") + return None + + # Extract data (skip STX, CRC, ETX) + data = message[1:-2] + return data + + def handle_status_request(self, data: bytes) -> bytes: + """Handle machine status request (s100ve)""" + # Format: s101ve + status_str = self.machine_state + if self.has_rejected_coins: + # Add 10 to first digit to indicate rejected coins + status_str = str(int(status_str[0]) + 1) + status_str[1] + + response = b'101' + status_str.encode() + self.error_code.encode() + logger.info(f"Status request: state={self.machine_state}, error={self.error_code}, rejected={self.has_rejected_coins}") + return self.create_message(response) + + def handle_start_counting(self, data: bytes) -> bytes: + """Handle counting start command (s200ve)""" + logger.info("Starting new counting session - resetting actual counts") + self.actual_counts = [0] * self.num_lines + self.rejected_coins = 0 + self.has_rejected_coins = False + self.machine_state = STATE_COUNTING + self.is_counting = True + + # Simulate coin counting + self._simulate_counting() + + # After counting simulation, machine stops + self.machine_state = STATE_IDLE + self.is_counting = False + + return bytes([ACK]) + + def handle_stop_counting(self, data: bytes) -> bytes: + """Handle counting stop command (s800ve)""" + logger.info("Stopping counting") + self.machine_state = STATE_STOPPED_HOST + self.is_counting = False + return bytes([ACK]) + + def handle_main_total_reset(self, data: bytes) -> bytes: + """Handle main total reset (s300ve)""" + logger.info("Resetting main total (not used in this simulator)") + return bytes([ACK]) + + def handle_actual_counters_request(self, data: bytes) -> bytes: + """Handle actual counting counters request (s400ve or sc00ve for checksum)""" + cmd_char = chr(data[0]) if len(data) > 0 else '4' + with_checksum = (cmd_char == 'c') + + logger.info(f"Actual counters request (checksum={with_checksum})") + + # For simulator: if no counts yet, simulate counting + if sum(self.actual_counts) == 0: + logger.info("No counts yet - simulating counting for testing") + self._simulate_counting() + + # First ACK + self.serial_conn.write(bytes([ACK])) + time.sleep(0.01) + + # Send each line's count + for line_num in range(1, self.num_lines + 1): + count = self.actual_counts[line_num - 1] + cmd = b'c' if with_checksum else b'4' + response = cmd + b'0' + f"{line_num:02d}{count:06d}".encode() + self.serial_conn.write(self.create_message(response)) + logger.info(f" Line {line_num}: {count} coins") + time.sleep(0.01) + # Wait for ACK + ack = self.serial_conn.read(1) + + # Send checksum if requested + if with_checksum: + total = sum(self.actual_counts) + response = b'c099' + f"{total:06d}".encode() + self.serial_conn.write(self.create_message(response)) + logger.info(f" Total checksum: {total} coins") + time.sleep(0.01) + # Wait for ACK + ack = self.serial_conn.read(1) + + # Send EOT + return bytes([EOT]) + + def handle_batch_reset(self, data: bytes) -> bytes: + """Handle uncompleted batch counters reset (s500ve)""" + logger.info("Resetting batch counters and adding to partial totals") + for i in range(self.num_lines): + self.partial_totals[i] += self.actual_counts[i] + self.actual_counts[i] = 0 + return bytes([ACK]) + + def handle_partial_totals_request(self, data: bytes) -> bytes: + """Handle partial total counters request (s600ve or sd00ve for checksum)""" + cmd_char = chr(data[0]) if len(data) > 0 else '6' + with_checksum = (cmd_char == 'd') + + logger.info(f"Partial totals request (checksum={with_checksum})") + + # First ACK + self.serial_conn.write(bytes([ACK])) + time.sleep(0.01) + + # Send each line's partial total + for line_num in range(1, self.num_lines + 1): + count = self.partial_totals[line_num - 1] + cmd = b'd' if with_checksum else b'6' + response = cmd + b'0' + f"{line_num:02d}{count:06d}".encode() + self.serial_conn.write(self.create_message(response)) + logger.info(f" Line {line_num}: {count} coins (partial total)") + time.sleep(0.01) + # Wait for ACK + ack = self.serial_conn.read(1) + + # Send checksum if requested + if with_checksum: + total = sum(self.partial_totals) + response = b'd099' + f"{total:06d}".encode() + self.serial_conn.write(self.create_message(response)) + logger.info(f" Total checksum: {total} coins") + time.sleep(0.01) + # Wait for ACK + ack = self.serial_conn.read(1) + + # Send EOT + return bytes([EOT]) + + def handle_partial_total_reset(self, data: bytes) -> bytes: + """Handle partial total counters reset (s700ve)""" + logger.info("Resetting partial totals") + self.partial_totals = [0] * self.num_lines + return bytes([ACK]) + + def handle_batch_values_request(self, data: bytes) -> bytes: + """Handle batching value request (s900ve or sf00ve for checksum)""" + cmd_char = chr(data[0]) if len(data) > 0 else '9' + with_checksum = (cmd_char == 'f') + + logger.info(f"Batch values request (checksum={with_checksum})") + + # First ACK + self.serial_conn.write(bytes([ACK])) + time.sleep(0.01) + + # Send each line's batch value + for line_num in range(1, self.num_lines + 1): + batch_val = self.batch_values[line_num - 1] + cmd = b'f' if with_checksum else b'9' + response = cmd + b'0' + f"{line_num:02d}{batch_val:06d}".encode() + self.serial_conn.write(self.create_message(response)) + logger.info(f" Line {line_num}: {batch_val} coins (batch threshold)") + time.sleep(0.01) + # Wait for ACK + ack = self.serial_conn.read(1) + + # Send checksum if requested + if with_checksum: + total = sum(self.batch_values) + response = b'f099' + f"{total:06d}".encode() + self.serial_conn.write(self.create_message(response)) + logger.info(f" Total checksum: {total} coins") + time.sleep(0.01) + # Wait for ACK + ack = self.serial_conn.read(1) + + # Send EOT + return bytes([EOT]) + + def handle_line_disable(self, data: bytes) -> bytes: + """Handle single counting line disable (sCve)""" + if len(data) < 3: + return bytes([CAN]) + + try: + line_str = data[1:3].decode('ascii') + line_num = int(line_str) + if 1 <= line_num <= self.num_lines: + self.lines_enabled[line_num - 1] = False + logger.info(f"Disabled counting line {line_num}") + return bytes([ACK]) + except: + pass + + return bytes([CAN]) + + def handle_line_enable(self, data: bytes) -> bytes: + """Handle single counting line enable (sDve)""" + if len(data) < 3: + return bytes([CAN]) + + try: + line_str = data[1:3].decode('ascii') + line_num = int(line_str) + if 1 <= line_num <= self.num_lines: + self.lines_enabled[line_num - 1] = True + logger.info(f"Enabled counting line {line_num}") + return bytes([ACK]) + except: + pass + + return bytes([CAN]) + + def handle_software_version(self, data: bytes) -> bytes: + """Handle software version request (sA00ve)""" + logger.info(f"Software version request: {self.sw_version}") + response = self.sw_version.encode() + b'\r\n' + return response + + def handle_serial_number(self, data: bytes) -> bytes: + """Handle serial number request (su00ve)""" + logger.info(f"Serial number request: {self.serial_number}") + response = self.serial_number.encode() + b'\r\n' + return response + + def _simulate_counting(self): + """Simulate coin counting - generate random counts for all enabled lines""" + logger.info("Simulating coin counting...") + + # Standard denominations ranges + count_ranges = [ + (20, 150), # Line 1 - high count + (15, 120), # Line 2 + (10, 100), # Line 3 + (10, 80), # Line 4 + (5, 60), # Line 5 + (5, 50), # Line 6 + (1, 30), # Line 7 - lower count + (1, 20), # Line 8 - lowest count + ] + + total_coins = 0 + for i in range(self.num_lines): + if self.lines_enabled[i]: + min_count, max_count = count_ranges[i] + count = random.randint(min_count, max_count) + self.actual_counts[i] = count + total_coins += count + logger.info(f" Line {i+1}: {count} coins") + else: + self.actual_counts[i] = 0 + logger.info(f" Line {i+1}: disabled") + + # Simulate some rejected coins (5-10% of total) + if total_coins > 0: + self.rejected_coins = random.randint(1, max(1, total_coins // 10)) + self.has_rejected_coins = True + logger.info(f" Rejected: {self.rejected_coins} coins") + + logger.info(f"Total coins counted: {total_coins}") + + def handle_command(self, data: bytes) -> Optional[bytes]: + """Route command to appropriate handler""" + if len(data) < 3: + logger.error("Command too short") + return bytes([CAN]) + + # If machine is counting, only accept status requests + if self.is_counting and data[0] != ord('1'): + logger.warning("Machine is counting - only status requests accepted") + return bytes([CAN]) + + cmd = chr(data[0]) + sub_cmd = data[1:3].decode('ascii', errors='ignore') + + logger.info(f"Received command: {cmd}{sub_cmd}") + + # Status requests + if cmd == '1' and sub_cmd == '00': + return self.handle_status_request(data) + + # Start counting + elif cmd == '2' and sub_cmd == '00': + return self.handle_start_counting(data) + + # Main total reset + elif cmd == '3' and sub_cmd == '00': + return self.handle_main_total_reset(data) + + # Actual counters request + elif cmd == '4' and sub_cmd == '00': + return self.handle_actual_counters_request(data) + + # Actual counters with checksum + elif cmd == 'c' and sub_cmd == '00': + return self.handle_actual_counters_request(data) + + # Uncompleted batch reset + elif cmd == '5' and sub_cmd == '00': + return self.handle_batch_reset(data) + + # Partial totals request + elif cmd == '6' and sub_cmd == '00': + return self.handle_partial_totals_request(data) + + # Partial totals with checksum + elif cmd == 'd' and sub_cmd == '00': + return self.handle_partial_totals_request(data) + + # Partial total reset + elif cmd == '7' and sub_cmd == '00': + return self.handle_partial_total_reset(data) + + # Stop counting + elif cmd == '8' and sub_cmd == '00': + return self.handle_stop_counting(data) + + # Batch values request + elif cmd == '9' and sub_cmd == '00': + return self.handle_batch_values_request(data) + + # Batch values with checksum + elif cmd == 'f' and sub_cmd == '00': + return self.handle_batch_values_request(data) + + # Line disable + elif cmd == 'C': + return self.handle_line_disable(data) + + # Line enable + elif cmd == 'D': + return self.handle_line_enable(data) + + # Software version + elif cmd == 'A' and sub_cmd == '00': + return self.handle_software_version(data) + + # Serial number + elif cmd == 'u' and sub_cmd == '00': + return self.handle_serial_number(data) + + else: + logger.warning(f"Unknown command: {cmd}{sub_cmd}") + return bytes([CAN]) + + def run(self): + """Main simulator loop""" + logger.info(f"Starting Selex simulator on {self.port} at {self.baudrate} baud") + logger.info("Protocol: MC-SELEX v2.2") + + try: + self.serial_conn = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=1, + ) + + logger.info("Serial port opened successfully") + + buffer = bytearray() + + while True: + if self.serial_conn.in_waiting > 0: + data = self.serial_conn.read(self.serial_conn.in_waiting) + buffer.extend(data) + + # Look for complete message (STX...ETX) + if STX in buffer and ETX in buffer: + stx_idx = buffer.index(STX) + try: + etx_idx = buffer.index(ETX, stx_idx) + message = bytes(buffer[stx_idx:etx_idx + 1]) + buffer = buffer[etx_idx + 1:] + + logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}") + + # Parse and handle message + parsed_data = self.parse_message(message) + if parsed_data: + response = self.handle_command(parsed_data) + if response: + logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}") + self.serial_conn.write(response) + + # Handle EOT - close communication + if response == bytes([EOT]): + logger.debug("Communication closed with EOT") + time.sleep(0.5) # Wait 500ms after EOT + except ValueError: + pass # ETX not found yet + + time.sleep(0.01) + + except KeyboardInterrupt: + logger.info("Simulator stopped by user") + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + finally: + if self.serial_conn and self.serial_conn.is_open: + self.serial_conn.close() + logger.info("Serial port closed")