diff --git a/Makefile b/Makefile index 3571257..2452087 100644 --- a/Makefile +++ b/Makefile @@ -8,3 +8,7 @@ run-glory: run-selex: @ uv run python main.py selex --port /dev/ttyUSB0 --baud 115200 + + +run-jetsort: + @ uv run python main.py jetsort --port /dev/ttyUSB0 --baud 115200 diff --git a/main.py b/main.py index 36c48c1..a9b8509 100644 --- a/main.py +++ b/main.py @@ -11,13 +11,14 @@ from loguru import logger # Import simulators from source.glory import GlorySimulator +from source.jetsort import JetSortSimulator from source.pelican import PelicanSimulator from source.selex import SelexSimulator def main(): parser = argparse.ArgumentParser( - description="Device Simulator - Run Pelican, Glory MACH6, or Selex simulator", + description="Device Simulator - Run Pelican, Glory MACH6, Selex, or JetSort simulator", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: @@ -30,10 +31,14 @@ Examples: # Run Selex simulator python main.py selex --port /dev/ttyUSB2 --baudrate 9600 + # Run JetSort simulator + python main.py jetsort --port /dev/ttyUSB3 --baudrate 9600 + # Get help for specific simulator python main.py pelican --help python main.py glory --help python main.py selex --help + python main.py jetsort --help """, ) @@ -81,7 +86,21 @@ Examples: help="Serial port (default: /dev/ttyUSB0)", ) selex_parser.add_argument( - "--baudrate", "-b", type=int, default=9600, help="Baud rate (default: 9600)" + "--baudrate", "-b", type=int, default=115200, help="Baud rate (default: 115200)" + ) + + # JetSort simulator subcommand + jetsort_parser = subparsers.add_parser( + "jetsort", help="Run JetSort coin/bill counter simulator" + ) + jetsort_parser.add_argument( + "--port", + "-p", + default="/dev/ttyUSB0", + help="Serial port (default: /dev/ttyUSB0)", + ) + jetsort_parser.add_argument( + "--baudrate", "-b", type=int, default=115200, help="Baud rate (default: 115200)" ) args = parser.parse_args() @@ -110,6 +129,9 @@ Examples: elif args.simulator == "selex": simulator = SelexSimulator(port=args.port, baudrate=args.baudrate) simulator.run() + elif args.simulator == "jetsort": + simulator = JetSortSimulator(port=args.port, baudrate=args.baudrate) + simulator.run() except KeyboardInterrupt: logger.info("Simulator stopped by user") sys.exit(0) diff --git a/source/__init__.py b/source/__init__.py index eb354c1..7fa1638 100644 --- a/source/__init__.py +++ b/source/__init__.py @@ -1,7 +1,8 @@ """Device simulators package.""" from .glory import GlorySimulator +from .jetsort import JetSortSimulator from .pelican import PelicanSimulator from .selex import SelexSimulator -__all__ = ["GlorySimulator", "PelicanSimulator", "SelexSimulator"] +__all__ = ["GlorySimulator", "JetSortSimulator", "PelicanSimulator", "SelexSimulator"] diff --git a/source/jetsort.py b/source/jetsort.py new file mode 100644 index 0000000..05eae46 --- /dev/null +++ b/source/jetsort.py @@ -0,0 +1,475 @@ +#!/usr/bin/env python3 +""" +JetSort Device Simulator +Simulates a JetSort coin/bill counting device (Model 3500, 3600, 6000) +for development/testing purposes. +Implements the JetSort Communication Package protocol +""" + +import random +import select +import sys +import time +from datetime import datetime +from typing import Optional + +import serial +from loguru import logger + +# Protocol constants +STX = 0x02 +ETX = 0x03 +ENQ = 0x05 +ACK = 0x06 +CR = 0x0D +LF = 0x0A + +# Report types +REPORT_SUB_BATCH = "SUB-BATCH" +REPORT_BATCH_DAY = "BATCH" +REPORT_BAG_LIMIT = "Limit" + + +class JetSortSimulator: + def __init__(self, port: str, baudrate: int = 9600): + self.port = port + self.baudrate = baudrate + self.serial_conn: Optional[serial.Serial] = None + + # Device state + self.is_counting = False + + # 9 coin lines and 9 bill lines + self.num_coin_lines = 9 + self.num_bill_lines = 9 + + # Current batch counters (coin values in cents) + self.coin_values = [0] * self.num_coin_lines + self.bill_values = [0] * self.num_bill_lines + + # Day totals + self.day_coin_values = [0] * self.num_coin_lines + self.day_bill_values = [0] * self.num_bill_lines + + # Grand totals + self.grand_coin_total = 0 + self.grand_bill_total = 0 + + # Batch tracking + self.sub_batch_count = 0 + self.batch_count = 0 + self.total_batches = 0 + + # Audit number (12 digits: XXXXXYYYYZZZ) + self.audit_number = "000070004001" + + # Labels + self.label_a = "A:" + self.label_b = "B:" + self.label_c = "C:" + self.label_d = "D:" + self.operator_id = "ID:" + + # Communication mode + self.polled_mode = False + + # Bag limit settings (in cents) + self.bag_limit = 50000 # $500.00 + + def _calculate_checksum(self, data: bytes) -> int: + """Calculate checksum for packet""" + checksum = 0 + for byte in data: + checksum += byte + checksum += 0x20 + return checksum & 0xFF + + def _create_packet(self, title: str, data: str) -> bytes: + """Create a packet with STX, checksum, length, title, data, and ETX""" + # Build packet content (without STX and ETX) + content = f"{title}\r\n{data}".encode("ascii") + + # Calculate checksum + checksum = self._calculate_checksum(content) + + # Calculate length (high and low bytes) + length = len(content) + 2 # +2 for CR LF after title + len_high = (length >> 8) & 0xFF + len_low = length & 0xFF + + # Build full packet + packet = bytes([STX, checksum, len_high, len_low]) + content + bytes([ETX]) + + return packet + + def _simulate_counting(self): + """Simulate coin and bill counting - generate random values""" + logger.info("Simulating coin/bill counting...") + + # Coin denominations in cents and their count ranges + coin_denominations = [ + (1, 50, 200), # C1 - Pennies + (5, 30, 150), # C2 - Nickels + (10, 30, 120), # C3 - Dimes + (25, 20, 100), # C4 - Quarters + (50, 5, 40), # C5 - Half dollars + (100, 5, 30), # C6 - Dollar coins + (0, 0, 0), # C7 - Unused + (0, 0, 0), # C8 - Unused + (0, 0, 0), # C9 - Unused + ] + + # Bill denominations in cents and their count ranges + bill_denominations = [ + (100, 10, 50), # B1 - $1 bills + (500, 5, 30), # B2 - $5 bills + (1000, 5, 25), # B3 - $10 bills + (2000, 3, 20), # B4 - $20 bills + (5000, 1, 10), # B5 - $50 bills + (10000, 1, 5), # B6 - $100 bills + (0, 0, 0), # B7 - Unused + (0, 0, 0), # B8 - Unused + (0, 0, 0), # B9 - Unused + ] + + # Generate random coin values + total_coin_value = 0 + for i, (denomination, min_count, max_count) in enumerate(coin_denominations): + if denomination > 0: + count = random.randint(min_count, max_count) + value = count * denomination + self.coin_values[i] = value + total_coin_value += value + logger.info( + f" C{i+1}: {count} coins × ${denomination/100:.2f} = ${value/100:.2f}" + ) + else: + self.coin_values[i] = 0 + + # Generate random bill values + total_bill_value = 0 + for i, (denomination, min_count, max_count) in enumerate(bill_denominations): + if denomination > 0: + count = random.randint(min_count, max_count) + value = count * denomination + self.bill_values[i] = value + total_bill_value += value + logger.info( + f" B{i+1}: {count} bills × ${denomination/100:.2f} = ${value/100:.2f}" + ) + else: + self.bill_values[i] = 0 + + # Update day totals + for i in range(self.num_coin_lines): + self.day_coin_values[i] += self.coin_values[i] + for i in range(self.num_bill_lines): + self.day_bill_values[i] += self.bill_values[i] + + logger.info(f"Total coin value: ${total_coin_value/100:.2f}") + logger.info(f"Total bill value: ${total_bill_value/100:.2f}") + logger.info( + f"Total batch value: ${(total_coin_value + total_bill_value)/100:.2f}" + ) + + self.sub_batch_count += 1 + + def _format_value(self, value_cents: int) -> str: + """Format value in cents to string format (no decimal point, no leading zeros)""" + # Convert cents to string without decimal (e.g., 10050 for $100.50) + return str(value_cents) + + def _generate_sub_batch_report(self) -> str: + """Generate SUB-BATCH report""" + logger.info("Generating SUB-BATCH report") + + # Simulate counting if no values yet + if sum(self.coin_values) == 0 and sum(self.bill_values) == 0: + self._simulate_counting() + + lines = [] + + # Date (blank if not enabled) + date_str = datetime.now().strftime("%m-%d-%y") + lines.append(date_str) + + # Audit Number + lines.append(self.audit_number) + + # Labels + lines.append(self.label_a) + lines.append(self.label_b) + lines.append(self.label_c) + lines.append(self.label_d) + lines.append(self.operator_id) + + # Coin values (C1-C9) + for i in range(self.num_coin_lines): + lines.append(self._format_value(self.coin_values[i])) + + # Coin Total Identifier + lines.append("CT:") + + # Total Coin Value + total_coin_value = sum(self.coin_values) + lines.append(self._format_value(total_coin_value)) + + # Sub-Batch Currency (0) + lines.append("0") + + # Sub-Batch Checks (0) + lines.append("0") + + # Sub-Batch Misc (0) + lines.append("0") + + # Bill values (B1-B9) + for i in range(self.num_bill_lines): + lines.append(self._format_value(self.bill_values[i])) + + # Sub-Batch Declared Balance (0) + lines.append("0") + + # Receipts Identifier + lines.append("RT:") + + # Receipts Total (0) + lines.append("0") + + # Grand Total Identifier + lines.append("GT:") + + # Grand Total + grand_total = total_coin_value + sum(self.bill_values) + lines.append(self._format_value(grand_total)) + + return "\r\n".join(lines) + + def _generate_batch_day_report(self) -> str: + """Generate BATCH/DAY report""" + logger.info("Generating BATCH/DAY report") + + lines = [] + + # Date + date_str = datetime.now().strftime("%m-%d-%y") + lines.append(date_str) + + # Audit Number + lines.append(self.audit_number) + + # Labels + lines.append(self.label_a) + lines.append(self.label_b) + lines.append(self.label_c) + lines.append(self.label_d) + lines.append(self.operator_id) + + # Batch/Day Coin values (C1-C9) + for i in range(self.num_coin_lines): + lines.append(self._format_value(self.day_coin_values[i])) + + # Coin Total Identifier + lines.append("CT:") + + # Total Coin Value + total_coin_value = sum(self.day_coin_values) + lines.append(self._format_value(total_coin_value)) + + # Batch Currency (0) + lines.append("0") + + # Batch Checks (0) + lines.append("0") + + # Batch Misc (0) + lines.append("0") + + # Batch Bill values (B1-B9) + for i in range(self.num_bill_lines): + lines.append(self._format_value(self.day_bill_values[i])) + + # Batch Declared Balance (0) + lines.append("0") + + # Receipt Identifier + lines.append("RT:") + + # Receipts Total (0) + lines.append("0") + + # Grand Total Identifier + lines.append("GT:") + + # Grand Total + grand_total = total_coin_value + sum(self.day_bill_values) + lines.append(self._format_value(grand_total)) + + return "\r\n".join(lines) + + def _generate_day_report(self) -> str: + """Generate DAY report (end of batch/day)""" + logger.info("Generating DAY report") + + lines = [] + + # Grand Total + grand_total = sum(self.day_coin_values) + sum(self.day_bill_values) + lines.append(self._format_value(grand_total)) + + # Title + lines.append("DAY") + + # Date + date_str = datetime.now().strftime("%m-%d-%y") + lines.append(date_str) + + # Audit Number + lines.append(self.audit_number) + + # Day Coin values (C1-C9) + for i in range(self.num_coin_lines): + lines.append(self._format_value(self.day_coin_values[i])) + + # Coin Total Identifier + lines.append("CT:") + + # Total Coin Value + total_coin_value = sum(self.day_coin_values) + lines.append(self._format_value(total_coin_value)) + + # Day Currency (0) + lines.append("0") + + # Day Checks (0) + lines.append("0") + + # Day Misc (0) + lines.append("0") + + # Day Bill values (B1-B9) + for i in range(self.num_bill_lines): + lines.append(self._format_value(self.day_bill_values[i])) + + # Receipt Identifier + lines.append("RT:") + + # Receipts Total (0) + lines.append("0") + + # Grand Total Identifier + lines.append("GT:") + + # Grand Total + lines.append(self._format_value(grand_total)) + + return "\r\n".join(lines) + + def handle_poll(self) -> Optional[bytes]: + """Handle ENQ poll from computer""" + logger.info("Received poll (ENQ)") + + # Send SUB-BATCH report + report_data = self._generate_sub_batch_report() + packet = self._create_packet(REPORT_SUB_BATCH, report_data) + + logger.info(f"Sending SUB-BATCH report ({len(packet)} bytes)") + return packet + + def send_automatic_report(self): + """Send automatic SUB-BATCH report (immediate mode)""" + if not self.polled_mode and self.serial_conn: + logger.info("Sending automatic SUB-BATCH report") + + # Simulate new batch + self._simulate_counting() + + # Generate and send report + report_data = self._generate_sub_batch_report() + packet = self._create_packet(REPORT_SUB_BATCH, report_data) + + self.serial_conn.write(packet) + logger.debug(f"TX: {' '.join(f'{b:02X}' for b in packet[:50])}...") + + # Reset batch counters after sending + self.coin_values = [0] * self.num_coin_lines + self.bill_values = [0] * self.num_bill_lines + + def run(self): + """Main simulator loop""" + logger.info( + f"Starting JetSort simulator on {self.port} at {self.baudrate} baud" + ) + logger.info("Protocol: JetSort Communication Package") + logger.info("Press ENTER to send a cash counting report") + + try: + self.serial_conn = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=1, + ) + + logger.info("Serial port opened successfully") + help_text_shown = False + + while True: + if not help_text_shown: + print("\nPress ENTER to send a cash counting report...") + help_text_shown = True + + # Check for keyboard input (non-blocking) + if sys.platform != "win32": + # Unix/Linux - use select + ready, _, _ = select.select([sys.stdin], [], [], 0) + if ready: + sys.stdin.readline() # Consume the input + logger.info("Key pressed - sending cash counting report") + self.send_automatic_report() + help_text_shown = False + else: + # Windows - use msvcrt + import msvcrt + + if msvcrt.kbhit(): + msvcrt.getch() # Consume the input + logger.info("Key pressed - sending cash counting report") + self.send_automatic_report() + help_text_shown = False + + # Check for incoming data from serial port + if self.serial_conn.in_waiting > 0: + data = self.serial_conn.read(self.serial_conn.in_waiting) + + logger.debug(f"RX: {' '.join(f'{b:02X}' for b in data)}") + + # Check for ENQ (poll) + if ENQ in data: + response = self.handle_poll() + if response: + self.serial_conn.write(response) + logger.debug( + f"TX: {' '.join(f'{b:02X}' for b in response[:50])}..." + ) + + # Reset batch counters + self.coin_values = [0] * self.num_coin_lines + self.bill_values = [0] * self.num_bill_lines + + # Check for ACK + elif ACK in data: + logger.info("Received ACK from computer") + + time.sleep(0.01) + + except KeyboardInterrupt: + logger.info("Simulator stopped by user") + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + finally: + if self.serial_conn and self.serial_conn.is_open: + self.serial_conn.close() + logger.info("Serial port closed")