Files
coin-counter-simulators/source/jetsort.py
2025-11-10 09:11:56 +01:00

520 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
JetSort Device Simulator
Simulates a JetSort coin/bill counting device (Model 3500, 3600, 6000)
for development/testing purposes.
Implements the JetSort Communication Package protocol.
"""
import random
import select
import sys
import time
from datetime import datetime
from typing import Optional
import serial
from loguru import logger
from .common import format_comm_debug
# Protocol control bytes (ASCII control codes).
# STX/ETX open and close every packet the simulator builds; CR/LF follow the
# packet's length bytes; ENQ is the host's poll byte and ACK is the host's
# acknowledgement byte.
STX, ETX = 0x02, 0x03
ENQ, ACK = 0x05, 0x06
CR, LF = 0x0D, 0x0A

# Report title strings. REPORT_SUB_BATCH is the title sent with sub-batch
# reports; the other two are presumably titles for the remaining report
# kinds -- confirm against the protocol specification.
REPORT_SUB_BATCH = "SUB-BATCH"
REPORT_BATCH_DAY = "BATCH"
REPORT_BAG_LIMIT = "Limit"
class JetSortSimulator:
    """Interactive simulator of a JetSort counting device on a serial port.

    While ``run()`` is active, each ENTER key press alternates between the
    S-BATCH button (simulate a count) and the END button (send the SUB-BATCH
    report over the serial port). An ENQ byte received from the host is
    answered immediately with a SUB-BATCH report.
    """

    # (denomination in cents, minimum count, maximum count) for coin lines
    # C1-C9. A zero denomination marks an unused line.
    _COIN_DENOMINATIONS = (
        (1, 50, 200),  # C1 - Pennies
        (5, 30, 150),  # C2 - Nickels
        (10, 30, 120),  # C3 - Dimes
        (25, 20, 100),  # C4 - Quarters
        (50, 5, 40),  # C5 - Half dollars
        (100, 5, 30),  # C6 - Dollar coins
        (0, 0, 0),  # C7 - Unused
        (0, 0, 0),  # C8 - Unused
        (0, 0, 0),  # C9 - Unused
    )

    # Same layout for bill lines B1-B9.
    _BILL_DENOMINATIONS = (
        (100, 10, 50),  # B1 - $1 bills
        (500, 5, 30),  # B2 - $5 bills
        (1000, 5, 25),  # B3 - $10 bills
        (2000, 3, 20),  # B4 - $20 bills
        (5000, 1, 10),  # B5 - $50 bills
        (10000, 1, 5),  # B6 - $100 bills
        (0, 0, 0),  # B7 - Unused
        (0, 0, 0),  # B8 - Unused
        (0, 0, 0),  # B9 - Unused
    )

    def __init__(self, port: str, baudrate: int = 9600):
        """Store the serial settings and initialise all counters to zero.

        The serial port is not opened here; ``run()`` opens it.

        Args:
            port: Serial device name handed to ``serial.Serial``.
            baudrate: Serial speed in baud.
        """
        self.port = port
        self.baudrate = baudrate
        self.serial_conn: Optional[serial.Serial] = None

        # Device state
        self.is_counting = False

        # 9 coin lines and 9 bill lines
        self.num_coin_lines = 9
        self.num_bill_lines = 9

        # Current batch counters (values in cents)
        self.coin_values = [0] * self.num_coin_lines
        self.bill_values = [0] * self.num_bill_lines

        # Day totals
        self.day_coin_values = [0] * self.num_coin_lines
        self.day_bill_values = [0] * self.num_bill_lines

        # Grand totals
        self.grand_coin_total = 0
        self.grand_bill_total = 0

        # Batch tracking
        self.sub_batch_count = 0
        self.batch_count = 0
        self.total_batches = 0

        # Audit number (12 digits: XXXXXYYYYZZZ)
        self.audit_number = "000070004001"

        # Labels
        self.label_a = "A:"
        self.label_b = "B:"
        self.label_c = "C:"
        self.label_d = "D:"
        self.operator_id = "ID:"

        # Communication mode
        self.polled_mode = False

        # Bag limit settings (in cents)
        self.bag_limit = 50000  # $500.00

        # Workflow state: report_ready decides whether the next key press is
        # treated as S-BATCH (False) or END (True).
        self.counting_in_progress = False
        self.report_ready = False

    def _calculate_checksum(self, data: bytes) -> int:
        """Return the packet checksum: (sum of all bytes + 0x20) modulo 256."""
        return (sum(data) + 0x20) & 0xFF

    def _create_packet(self, title: str, data: str) -> bytes:
        """Frame a report as STX, checksum, length, CR, LF, content, ETX.

        Args:
            title: Report title placed on the first content line.
            data: Report body; must be ASCII-encodable.

        Returns:
            The complete packet as bytes.

        Raises:
            UnicodeEncodeError: If title or data contains non-ASCII text.
        """
        # Content is everything between the header and ETX.
        content = f"{title}\r\n{data}".encode("ascii")
        checksum = self._calculate_checksum(content)

        # Length is sent as two bytes (high, low) and includes the CR LF that
        # follows the length bytes.
        length = len(content) + 2
        header = bytes([STX, checksum, (length >> 8) & 0xFF, length & 0xFF, CR, LF])
        return header + content + bytes([ETX])

    def _count_denominations(self, prefix: str, noun: str, denominations, values: list) -> int:
        """Fill ``values`` in place with random totals and return their sum.

        Args:
            prefix: Line prefix used in the log output ("C" or "B").
            noun: Item name used in the log output ("coins" or "bills").
            denominations: Sequence of (denomination, min_count, max_count).
            values: Per-line value list (in cents) overwritten in place.
        """
        total = 0
        for i, (denomination, min_count, max_count) in enumerate(denominations):
            if denomination <= 0:
                # Unused line: always reports zero.
                values[i] = 0
                continue
            count = random.randint(min_count, max_count)
            value = count * denomination
            values[i] = value
            total += value
            logger.info(
                f" {prefix}{i+1}: {count} {noun} × ${denomination/100:.2f} = ${value/100:.2f}"
            )
        return total

    def _simulate_counting(self):
        """Simulate coin and bill counting by generating random values.

        Overwrites the current batch counters, adds them to the day totals
        and increments ``sub_batch_count``.
        """
        logger.info("Simulating coin/bill counting...")

        total_coin_value = self._count_denominations(
            "C", "coins", self._COIN_DENOMINATIONS, self.coin_values
        )
        total_bill_value = self._count_denominations(
            "B", "bills", self._BILL_DENOMINATIONS, self.bill_values
        )

        # Update day totals
        for i in range(self.num_coin_lines):
            self.day_coin_values[i] += self.coin_values[i]
        for i in range(self.num_bill_lines):
            self.day_bill_values[i] += self.bill_values[i]

        logger.info(f"Total coin value: ${total_coin_value/100:.2f}")
        logger.info(f"Total bill value: ${total_bill_value/100:.2f}")
        logger.info(
            f"Total batch value: ${(total_coin_value + total_bill_value)/100:.2f}"
        )
        self.sub_batch_count += 1

    def _format_value(self, value_cents: int) -> str:
        """Format a value in cents as a plain integer string (10050 for $100.50)."""
        return str(value_cents)

    def _reset_sub_batch(self):
        """Clear the current batch counters and the button workflow state.

        Called after every transmitted SUB-BATCH report (END button or host
        poll) so the next key press always starts a new count.
        """
        self.coin_values = [0] * self.num_coin_lines
        self.bill_values = [0] * self.num_bill_lines
        self.counting_in_progress = False
        self.report_ready = False

    def _batch_report_lines(self, coin_values: list, bill_values: list) -> list:
        """Build the field lines shared by the SUB-BATCH and BATCH/DAY reports.

        Args:
            coin_values: Per-line coin values in cents (C1-C9).
            bill_values: Per-line bill values in cents (B1-B9).

        Returns:
            The report fields in transmission order, one string per line.
        """
        coins = coin_values[: self.num_coin_lines]
        bills = bill_values[: self.num_bill_lines]
        coin_total = sum(coins)

        lines = [
            datetime.now().strftime("%m-%d-%y"),  # Date
            self.audit_number,
            self.label_a,
            self.label_b,
            self.label_c,
            self.label_d,
            self.operator_id,
        ]
        lines.extend(self._format_value(value) for value in coins)
        # Coin total identifier and value, then currency, checks and misc
        # (the simulator always reports those three as 0).
        lines += ["CT:", self._format_value(coin_total), "0", "0", "0"]
        lines.extend(self._format_value(value) for value in bills)
        # Declared balance (0), receipts identifier and total (0), then the
        # grand total identifier and value.
        lines += ["0", "RT:", "0", "GT:", self._format_value(coin_total + sum(bills))]
        return lines

    def _generate_sub_batch_report(self) -> str:
        """Generate the SUB-BATCH report body (lines joined with CR LF).

        If the current batch is empty, a count is simulated first.
        """
        logger.info("Generating SUB-BATCH report")
        # Simulate counting if no values yet
        if sum(self.coin_values) == 0 and sum(self.bill_values) == 0:
            self._simulate_counting()
        return "\r\n".join(self._batch_report_lines(self.coin_values, self.bill_values))

    def _generate_batch_day_report(self) -> str:
        """Generate the BATCH/DAY report body from the day totals."""
        logger.info("Generating BATCH/DAY report")
        return "\r\n".join(
            self._batch_report_lines(self.day_coin_values, self.day_bill_values)
        )

    def _generate_day_report(self) -> str:
        """Generate the DAY report body (end of batch/day) from the day totals.

        Unlike the batch-style reports, this one starts with the grand total
        and a "DAY" title and carries no label or declared-balance lines.
        """
        logger.info("Generating DAY report")
        coins = self.day_coin_values[: self.num_coin_lines]
        bills = self.day_bill_values[: self.num_bill_lines]
        grand_total = self._format_value(
            sum(self.day_coin_values) + sum(self.day_bill_values)
        )

        lines = [
            grand_total,
            "DAY",
            datetime.now().strftime("%m-%d-%y"),
            self.audit_number,
        ]
        lines.extend(self._format_value(value) for value in coins)
        lines += ["CT:", self._format_value(sum(self.day_coin_values)), "0", "0", "0"]
        lines.extend(self._format_value(value) for value in bills)
        lines += ["RT:", "0", "GT:", grand_total]
        return "\r\n".join(lines)

    def handle_poll(self) -> Optional[bytes]:
        """Handle an ENQ poll from the computer.

        Returns:
            The SUB-BATCH report packet to transmit.
        """
        logger.info("Received poll (ENQ)")
        report_data = self._generate_sub_batch_report()
        packet = self._create_packet(REPORT_SUB_BATCH, report_data)
        logger.info(f"Sending SUB-BATCH report ({len(packet)} bytes)")
        return packet

    def handle_s_batch_button(self):
        """Handle an S-BATCH button press: simulate a count and arm the report."""
        logger.info("=" * 60)
        logger.info("S-BATCH button pressed - Starting counting...")
        logger.info("=" * 60)

        self._simulate_counting()

        self.counting_in_progress = True
        self.report_ready = True
        logger.info("Counting complete. Press ENTER again (END button) to send report.")

    def handle_end_button(self):
        """Handle an END button press: send the SUB-BATCH report and reset.

        Does nothing except log when no report is ready or when the serial
        port has not been opened yet.
        """
        if not self.report_ready:
            logger.warning("No report ready to send. Press ENTER to start counting first.")
            return
        if self.serial_conn is None:
            # run() has not opened the port; keep the report armed.
            logger.error("Serial port is not open; cannot send report.")
            return

        logger.info("=" * 60)
        logger.info("END button pressed - Sending SUB-BATCH report")
        logger.info("=" * 60)

        report_data = self._generate_sub_batch_report()
        packet = self._create_packet(REPORT_SUB_BATCH, report_data)
        self.serial_conn.write(packet)
        logger.debug(format_comm_debug("TX", packet[:50]) + "...")
        logger.info("Report sent successfully")

        self._reset_sub_batch()
        logger.info("")
        logger.info("Ready for next batch. Press ENTER (S-BATCH button) to start counting.")

    def _key_pressed(self) -> bool:
        """Return True if a key press is pending on the console, consuming it.

        Never blocks. End-of-file on stdin is not treated as a key press;
        otherwise a closed stdin would trigger a button on every loop pass.
        """
        if sys.platform != "win32":
            # Unix/Linux - use select with a zero timeout
            ready, _, _ = select.select([sys.stdin], [], [], 0)
            if not ready:
                return False
            return sys.stdin.readline() != ""

        # Windows - use msvcrt
        import msvcrt

        if msvcrt.kbhit():
            msvcrt.getch()  # Consume the input
            return True
        return False

    def _handle_button_press(self):
        """Dispatch a key press to S-BATCH (first press) or END (second press)."""
        if not self.report_ready:
            self.handle_s_batch_button()
        else:
            self.handle_end_button()

    def _service_serial(self):
        """Read pending serial bytes and answer an ENQ poll or note an ACK."""
        pending = self.serial_conn.in_waiting
        if pending <= 0:
            return
        data = self.serial_conn.read(pending)
        logger.debug(format_comm_debug("RX", data))

        if ENQ in data:
            response = self.handle_poll()
            if response:
                self.serial_conn.write(response)
                logger.debug(format_comm_debug("TX", response[:50]) + "...")
                # The batch has been delivered: clear the counters and the
                # button state so the next ENTER starts a new count.
                self._reset_sub_batch()
        elif ACK in data:
            logger.info("Received ACK from computer")

    def run(self):
        """Main simulator loop.

        Opens the serial port, then polls the keyboard and the serial port
        about every 10 ms until interrupted with Ctrl+C. Any other exception
        is logged with its traceback. The port is closed on exit.
        """
        logger.info(
            f"Starting JetSort simulator on {self.port} at {self.baudrate} baud"
        )
        logger.info("Protocol: JetSort Communication Package")
        logger.info("")
        logger.info("=" * 60)
        logger.info("WORKFLOW:")
        logger.info("1. Press ENTER → S-BATCH button (starts counting)")
        logger.info("2. Press ENTER → END button (sends report)")
        logger.info("=" * 60)
        logger.info("")
        logger.info("Ready for first batch. Press ENTER (S-BATCH button) to start counting.")
        try:
            self.serial_conn = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=1,
            )
            logger.info("Serial port opened successfully")
            logger.info("")
            while True:
                if self._key_pressed():
                    self._handle_button_press()
                self._service_serial()
                time.sleep(0.01)
        except KeyboardInterrupt:
            logger.info("Simulator stopped by user")
        except Exception as e:
            # loguru has no exc_info keyword; logger.exception attaches the
            # traceback and does not re-format the message.
            logger.exception(f"Error: {e}")
        finally:
            if self.serial_conn and self.serial_conn.is_open:
                self.serial_conn.close()
                logger.info("Serial port closed")