Compare commits

...

2 Commits

Author SHA1 Message Date
bffa4af810 Fix support for jetsort 2025-11-05 10:34:09 +01:00
af5d2918ec Common comm params 2025-11-05 09:21:59 +01:00
2 changed files with 91 additions and 82 deletions

71
main.py
View File

@ -45,6 +45,21 @@ Examples:
""", """,
) )
def add_common_arguments(subparser: argparse.ArgumentParser):
subparser.add_argument(
"--port",
"-p",
default=DEFAULT_PORT,
help=f"Serial port (default: {DEFAULT_PORT})",
)
subparser.add_argument(
"--baudrate",
"-b",
type=int,
default=DEFAULT_BAUDRATE,
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
)
# Add subparsers for each simulator type # Add subparsers for each simulator type
subparsers = parser.add_subparsers( subparsers = parser.add_subparsers(
dest="simulator", help="Simulator type to run", required=True dest="simulator", help="Simulator type to run", required=True
@ -54,73 +69,25 @@ Examples:
pelican_parser = subparsers.add_parser( pelican_parser = subparsers.add_parser(
"pelican", help="Run Pelican coin counter simulator" "pelican", help="Run Pelican coin counter simulator"
) )
pelican_parser.add_argument( add_common_arguments(pelican_parser)
"--port",
"-p",
default=DEFAULT_PORT,
help=f"Serial port (default: {DEFAULT_PORT})",
)
pelican_parser.add_argument(
"--baudrate",
"-b",
type=int,
default=DEFAULT_BAUDRATE,
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
)
# Glory simulator subcommand # Glory simulator subcommand
glory_parser = subparsers.add_parser( glory_parser = subparsers.add_parser(
"glory", help="Run Glory MACH6 coin counter simulator" "glory", help="Run Glory MACH6 coin counter simulator"
) )
glory_parser.add_argument( add_common_arguments(glory_parser)
"--port",
"-p",
default=DEFAULT_PORT,
help=f"Serial port (default: {DEFAULT_PORT})",
)
glory_parser.add_argument(
"--baudrate",
"-b",
type=int,
default=DEFAULT_BAUDRATE,
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
)
# Selex simulator subcommand # Selex simulator subcommand
selex_parser = subparsers.add_parser( selex_parser = subparsers.add_parser(
"selex", help="Run Selex coin counter simulator" "selex", help="Run Selex coin counter simulator"
) )
selex_parser.add_argument( add_common_arguments(selex_parser)
"--port",
"-p",
default=DEFAULT_PORT,
help=f"Serial port (default: {DEFAULT_PORT})",
)
selex_parser.add_argument(
"--baudrate",
"-b",
type=int,
default=DEFAULT_BAUDRATE,
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
)
# JetSort simulator subcommand # JetSort simulator subcommand
jetsort_parser = subparsers.add_parser( jetsort_parser = subparsers.add_parser(
"jetsort", help="Run JetSort coin/bill counter simulator" "jetsort", help="Run JetSort coin/bill counter simulator"
) )
jetsort_parser.add_argument( add_common_arguments(jetsort_parser)
"--port",
"-p",
default=DEFAULT_PORT,
help=f"Serial port (default: {DEFAULT_PORT})",
)
jetsort_parser.add_argument(
"--baudrate",
"-b",
type=int,
default=DEFAULT_BAUDRATE,
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
)
args = parser.parse_args() args = parser.parse_args()

View File

@ -76,6 +76,10 @@ class JetSortSimulator:
# Bag limit settings (in cents) # Bag limit settings (in cents)
self.bag_limit = 50000 # $500.00 self.bag_limit = 50000 # $500.00
# Workflow state
self.counting_in_progress = False
self.report_ready = False
def _calculate_checksum(self, data: bytes) -> int: def _calculate_checksum(self, data: bytes) -> int:
"""Calculate checksum for packet""" """Calculate checksum for packet"""
checksum = 0 checksum = 0
@ -85,7 +89,7 @@ class JetSortSimulator:
return checksum & 0xFF return checksum & 0xFF
def _create_packet(self, title: str, data: str) -> bytes: def _create_packet(self, title: str, data: str) -> bytes:
"""Create a packet with STX, checksum, length, title, data, and ETX""" """Create a packet with STX, checksum, length, CR, LF, title, data, and ETX"""
# Build packet content (without STX and ETX) # Build packet content (without STX and ETX)
content = f"{title}\r\n{data}".encode("ascii") content = f"{title}\r\n{data}".encode("ascii")
@ -93,12 +97,12 @@ class JetSortSimulator:
checksum = self._calculate_checksum(content) checksum = self._calculate_checksum(content)
# Calculate length (high and low bytes) # Calculate length (high and low bytes)
length = len(content) + 2 # +2 for CR LF after title length = len(content) + 2 # +2 for CR LF after length bytes
len_high = (length >> 8) & 0xFF len_high = (length >> 8) & 0xFF
len_low = length & 0xFF len_low = length & 0xFF
# Build full packet # Build full packet: STX + CHECKSUM + LENGTH + CR + LF + content + ETX
packet = bytes([STX, checksum, len_high, len_low]) + content + bytes([ETX]) packet = bytes([STX, checksum, len_high, len_low, CR, LF]) + content + bytes([ETX])
return packet return packet
@ -376,24 +380,49 @@ class JetSortSimulator:
logger.info(f"Sending SUB-BATCH report ({len(packet)} bytes)") logger.info(f"Sending SUB-BATCH report ({len(packet)} bytes)")
return packet return packet
def send_automatic_report(self): def handle_s_batch_button(self):
"""Send automatic SUB-BATCH report (immediate mode)""" """Handle S-BATCH button press - starts counting"""
if not self.polled_mode and self.serial_conn: logger.info("=" * 60)
logger.info("Sending automatic SUB-BATCH report") logger.info("S-BATCH button pressed - Starting counting...")
logger.info("=" * 60)
# Simulate new batch # Simulate counting
self._simulate_counting() self._simulate_counting()
# Generate and send report # Update state
report_data = self._generate_sub_batch_report() self.counting_in_progress = True
packet = self._create_packet(REPORT_SUB_BATCH, report_data) self.report_ready = True
self.serial_conn.write(packet) logger.info("Counting complete. Press ENTER again (END button) to send report.")
logger.debug(f"TX: {' '.join(f'{b:02X}' for b in packet[:50])}...")
# Reset batch counters after sending def handle_end_button(self):
self.coin_values = [0] * self.num_coin_lines """Handle END button press - sends the report"""
self.bill_values = [0] * self.num_bill_lines if not self.report_ready:
logger.warning("No report ready to send. Press ENTER to start counting first.")
return
logger.info("=" * 60)
logger.info("END button pressed - Sending SUB-BATCH report")
logger.info("=" * 60)
# Generate and send report
report_data = self._generate_sub_batch_report()
packet = self._create_packet(REPORT_SUB_BATCH, report_data)
self.serial_conn.write(packet)
logger.debug(f"TX: {' '.join(f'{b:02X}' for b in packet[:50])}...")
logger.info("Report sent successfully")
# Reset batch counters after sending
self.coin_values = [0] * self.num_coin_lines
self.bill_values = [0] * self.num_bill_lines
# Reset state
self.counting_in_progress = False
self.report_ready = False
logger.info("")
logger.info("Ready for next batch. Press ENTER (S-BATCH button) to start counting.")
def run(self): def run(self):
"""Main simulator loop""" """Main simulator loop"""
@ -401,7 +430,14 @@ class JetSortSimulator:
f"Starting JetSort simulator on {self.port} at {self.baudrate} baud" f"Starting JetSort simulator on {self.port} at {self.baudrate} baud"
) )
logger.info("Protocol: JetSort Communication Package") logger.info("Protocol: JetSort Communication Package")
logger.info("Press ENTER to send a cash counting report") logger.info("")
logger.info("=" * 60)
logger.info("WORKFLOW:")
logger.info("1. Press ENTER → S-BATCH button (starts counting)")
logger.info("2. Press ENTER → END button (sends report)")
logger.info("=" * 60)
logger.info("")
logger.info("Ready for first batch. Press ENTER (S-BATCH button) to start counting.")
try: try:
self.serial_conn = serial.Serial( self.serial_conn = serial.Serial(
@ -414,31 +450,37 @@ class JetSortSimulator:
) )
logger.info("Serial port opened successfully") logger.info("Serial port opened successfully")
help_text_shown = False logger.info("")
while True: while True:
if not help_text_shown:
print("\nPress ENTER to send a cash counting report...")
help_text_shown = True
# Check for keyboard input (non-blocking) # Check for keyboard input (non-blocking)
if sys.platform != "win32": if sys.platform != "win32":
# Unix/Linux - use select # Unix/Linux - use select
ready, _, _ = select.select([sys.stdin], [], [], 0) ready, _, _ = select.select([sys.stdin], [], [], 0)
if ready: if ready:
sys.stdin.readline() # Consume the input sys.stdin.readline() # Consume the input
logger.info("Key pressed - sending cash counting report")
self.send_automatic_report() # Handle button press based on current state
help_text_shown = False if not self.report_ready:
# First press: S-BATCH button
self.handle_s_batch_button()
else:
# Second press: END button
self.handle_end_button()
else: else:
# Windows - use msvcrt # Windows - use msvcrt
import msvcrt import msvcrt
if msvcrt.kbhit(): if msvcrt.kbhit():
msvcrt.getch() # Consume the input msvcrt.getch() # Consume the input
logger.info("Key pressed - sending cash counting report")
self.send_automatic_report() # Handle button press based on current state
help_text_shown = False if not self.report_ready:
# First press: S-BATCH button
self.handle_s_batch_button()
else:
# Second press: END button
self.handle_end_button()
# Check for incoming data from serial port # Check for incoming data from serial port
if self.serial_conn.in_waiting > 0: if self.serial_conn.in_waiting > 0: