#!/usr/bin/env python3 """ JetSort Device Simulator Simulates a JetSort coin/bill counting device (Model 3500, 3600, 6000) for development/testing purposes. Implements the JetSort Communication Package protocol """ import random import select import sys import time from datetime import datetime from typing import Optional import serial from loguru import logger # Protocol constants STX = 0x02 ETX = 0x03 ENQ = 0x05 ACK = 0x06 CR = 0x0D LF = 0x0A # Report types REPORT_SUB_BATCH = "SUB-BATCH" REPORT_BATCH_DAY = "BATCH" REPORT_BAG_LIMIT = "Limit" class JetSortSimulator: def __init__(self, port: str, baudrate: int = 9600): self.port = port self.baudrate = baudrate self.serial_conn: Optional[serial.Serial] = None # Device state self.is_counting = False # 9 coin lines and 9 bill lines self.num_coin_lines = 9 self.num_bill_lines = 9 # Current batch counters (coin values in cents) self.coin_values = [0] * self.num_coin_lines self.bill_values = [0] * self.num_bill_lines # Day totals self.day_coin_values = [0] * self.num_coin_lines self.day_bill_values = [0] * self.num_bill_lines # Grand totals self.grand_coin_total = 0 self.grand_bill_total = 0 # Batch tracking self.sub_batch_count = 0 self.batch_count = 0 self.total_batches = 0 # Audit number (12 digits: XXXXXYYYYZZZ) self.audit_number = "000070004001" # Labels self.label_a = "A:" self.label_b = "B:" self.label_c = "C:" self.label_d = "D:" self.operator_id = "ID:" # Communication mode self.polled_mode = False # Bag limit settings (in cents) self.bag_limit = 50000 # $500.00 def _calculate_checksum(self, data: bytes) -> int: """Calculate checksum for packet""" checksum = 0 for byte in data: checksum += byte checksum += 0x20 return checksum & 0xFF def _create_packet(self, title: str, data: str) -> bytes: """Create a packet with STX, checksum, length, title, data, and ETX""" # Build packet content (without STX and ETX) content = f"{title}\r\n{data}".encode("ascii") # Calculate checksum checksum = self._calculate_checksum(content) # Calculate length (high and low bytes) length = len(content) + 2 # +2 for CR LF after title len_high = (length >> 8) & 0xFF len_low = length & 0xFF # Build full packet packet = bytes([STX, checksum, len_high, len_low]) + content + bytes([ETX]) return packet def _simulate_counting(self): """Simulate coin and bill counting - generate random values""" logger.info("Simulating coin/bill counting...") # Coin denominations in cents and their count ranges coin_denominations = [ (1, 50, 200), # C1 - Pennies (5, 30, 150), # C2 - Nickels (10, 30, 120), # C3 - Dimes (25, 20, 100), # C4 - Quarters (50, 5, 40), # C5 - Half dollars (100, 5, 30), # C6 - Dollar coins (0, 0, 0), # C7 - Unused (0, 0, 0), # C8 - Unused (0, 0, 0), # C9 - Unused ] # Bill denominations in cents and their count ranges bill_denominations = [ (100, 10, 50), # B1 - $1 bills (500, 5, 30), # B2 - $5 bills (1000, 5, 25), # B3 - $10 bills (2000, 3, 20), # B4 - $20 bills (5000, 1, 10), # B5 - $50 bills (10000, 1, 5), # B6 - $100 bills (0, 0, 0), # B7 - Unused (0, 0, 0), # B8 - Unused (0, 0, 0), # B9 - Unused ] # Generate random coin values total_coin_value = 0 for i, (denomination, min_count, max_count) in enumerate(coin_denominations): if denomination > 0: count = random.randint(min_count, max_count) value = count * denomination self.coin_values[i] = value total_coin_value += value logger.info( f" C{i+1}: {count} coins × ${denomination/100:.2f} = ${value/100:.2f}" ) else: self.coin_values[i] = 0 # Generate random bill values total_bill_value = 0 for i, (denomination, min_count, max_count) in enumerate(bill_denominations): if denomination > 0: count = random.randint(min_count, max_count) value = count * denomination self.bill_values[i] = value total_bill_value += value logger.info( f" B{i+1}: {count} bills × ${denomination/100:.2f} = ${value/100:.2f}" ) else: self.bill_values[i] = 0 # Update day totals for i in range(self.num_coin_lines): self.day_coin_values[i] += self.coin_values[i] for i in range(self.num_bill_lines): self.day_bill_values[i] += self.bill_values[i] logger.info(f"Total coin value: ${total_coin_value/100:.2f}") logger.info(f"Total bill value: ${total_bill_value/100:.2f}") logger.info( f"Total batch value: ${(total_coin_value + total_bill_value)/100:.2f}" ) self.sub_batch_count += 1 def _format_value(self, value_cents: int) -> str: """Format value in cents to string format (no decimal point, no leading zeros)""" # Convert cents to string without decimal (e.g., 10050 for $100.50) return str(value_cents) def _generate_sub_batch_report(self) -> str: """Generate SUB-BATCH report""" logger.info("Generating SUB-BATCH report") # Simulate counting if no values yet if sum(self.coin_values) == 0 and sum(self.bill_values) == 0: self._simulate_counting() lines = [] # Date (blank if not enabled) date_str = datetime.now().strftime("%m-%d-%y") lines.append(date_str) # Audit Number lines.append(self.audit_number) # Labels lines.append(self.label_a) lines.append(self.label_b) lines.append(self.label_c) lines.append(self.label_d) lines.append(self.operator_id) # Coin values (C1-C9) for i in range(self.num_coin_lines): lines.append(self._format_value(self.coin_values[i])) # Coin Total Identifier lines.append("CT:") # Total Coin Value total_coin_value = sum(self.coin_values) lines.append(self._format_value(total_coin_value)) # Sub-Batch Currency (0) lines.append("0") # Sub-Batch Checks (0) lines.append("0") # Sub-Batch Misc (0) lines.append("0") # Bill values (B1-B9) for i in range(self.num_bill_lines): lines.append(self._format_value(self.bill_values[i])) # Sub-Batch Declared Balance (0) lines.append("0") # Receipts Identifier lines.append("RT:") # Receipts Total (0) lines.append("0") # Grand Total Identifier lines.append("GT:") # Grand Total grand_total = total_coin_value + sum(self.bill_values) lines.append(self._format_value(grand_total)) return "\r\n".join(lines) def _generate_batch_day_report(self) -> str: """Generate BATCH/DAY report""" logger.info("Generating BATCH/DAY report") lines = [] # Date date_str = datetime.now().strftime("%m-%d-%y") lines.append(date_str) # Audit Number lines.append(self.audit_number) # Labels lines.append(self.label_a) lines.append(self.label_b) lines.append(self.label_c) lines.append(self.label_d) lines.append(self.operator_id) # Batch/Day Coin values (C1-C9) for i in range(self.num_coin_lines): lines.append(self._format_value(self.day_coin_values[i])) # Coin Total Identifier lines.append("CT:") # Total Coin Value total_coin_value = sum(self.day_coin_values) lines.append(self._format_value(total_coin_value)) # Batch Currency (0) lines.append("0") # Batch Checks (0) lines.append("0") # Batch Misc (0) lines.append("0") # Batch Bill values (B1-B9) for i in range(self.num_bill_lines): lines.append(self._format_value(self.day_bill_values[i])) # Batch Declared Balance (0) lines.append("0") # Receipt Identifier lines.append("RT:") # Receipts Total (0) lines.append("0") # Grand Total Identifier lines.append("GT:") # Grand Total grand_total = total_coin_value + sum(self.day_bill_values) lines.append(self._format_value(grand_total)) return "\r\n".join(lines) def _generate_day_report(self) -> str: """Generate DAY report (end of batch/day)""" logger.info("Generating DAY report") lines = [] # Grand Total grand_total = sum(self.day_coin_values) + sum(self.day_bill_values) lines.append(self._format_value(grand_total)) # Title lines.append("DAY") # Date date_str = datetime.now().strftime("%m-%d-%y") lines.append(date_str) # Audit Number lines.append(self.audit_number) # Day Coin values (C1-C9) for i in range(self.num_coin_lines): lines.append(self._format_value(self.day_coin_values[i])) # Coin Total Identifier lines.append("CT:") # Total Coin Value total_coin_value = sum(self.day_coin_values) lines.append(self._format_value(total_coin_value)) # Day Currency (0) lines.append("0") # Day Checks (0) lines.append("0") # Day Misc (0) lines.append("0") # Day Bill values (B1-B9) for i in range(self.num_bill_lines): lines.append(self._format_value(self.day_bill_values[i])) # Receipt Identifier lines.append("RT:") # Receipts Total (0) lines.append("0") # Grand Total Identifier lines.append("GT:") # Grand Total lines.append(self._format_value(grand_total)) return "\r\n".join(lines) def handle_poll(self) -> Optional[bytes]: """Handle ENQ poll from computer""" logger.info("Received poll (ENQ)") # Send SUB-BATCH report report_data = self._generate_sub_batch_report() packet = self._create_packet(REPORT_SUB_BATCH, report_data) logger.info(f"Sending SUB-BATCH report ({len(packet)} bytes)") return packet def send_automatic_report(self): """Send automatic SUB-BATCH report (immediate mode)""" if not self.polled_mode and self.serial_conn: logger.info("Sending automatic SUB-BATCH report") # Simulate new batch self._simulate_counting() # Generate and send report report_data = self._generate_sub_batch_report() packet = self._create_packet(REPORT_SUB_BATCH, report_data) self.serial_conn.write(packet) logger.debug(f"TX: {' '.join(f'{b:02X}' for b in packet[:50])}...") # Reset batch counters after sending self.coin_values = [0] * self.num_coin_lines self.bill_values = [0] * self.num_bill_lines def run(self): """Main simulator loop""" logger.info( f"Starting JetSort simulator on {self.port} at {self.baudrate} baud" ) logger.info("Protocol: JetSort Communication Package") logger.info("Press ENTER to send a cash counting report") try: self.serial_conn = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, ) logger.info("Serial port opened successfully") help_text_shown = False while True: if not help_text_shown: print("\nPress ENTER to send a cash counting report...") help_text_shown = True # Check for keyboard input (non-blocking) if sys.platform != "win32": # Unix/Linux - use select ready, _, _ = select.select([sys.stdin], [], [], 0) if ready: sys.stdin.readline() # Consume the input logger.info("Key pressed - sending cash counting report") self.send_automatic_report() help_text_shown = False else: # Windows - use msvcrt import msvcrt if msvcrt.kbhit(): msvcrt.getch() # Consume the input logger.info("Key pressed - sending cash counting report") self.send_automatic_report() help_text_shown = False # Check for incoming data from serial port if self.serial_conn.in_waiting > 0: data = self.serial_conn.read(self.serial_conn.in_waiting) logger.debug(f"RX: {' '.join(f'{b:02X}' for b in data)}") # Check for ENQ (poll) if ENQ in data: response = self.handle_poll() if response: self.serial_conn.write(response) logger.debug( f"TX: {' '.join(f'{b:02X}' for b in response[:50])}..." ) # Reset batch counters self.coin_values = [0] * self.num_coin_lines self.bill_values = [0] * self.num_bill_lines # Check for ACK elif ACK in data: logger.info("Received ACK from computer") time.sleep(0.01) except KeyboardInterrupt: logger.info("Simulator stopped by user") except Exception as e: logger.error(f"Error: {e}", exc_info=True) finally: if self.serial_conn and self.serial_conn.is_open: self.serial_conn.close() logger.info("Serial port closed")