Compare commits
4 Commits
bd8072dccf
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 79af285f58 | |||
| 17958f837a | |||
| bffa4af810 | |||
| af5d2918ec |
71
main.py
71
main.py
@ -45,6 +45,21 @@ Examples:
|
|||||||
""",
|
""",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def add_common_arguments(subparser: argparse.ArgumentParser):
|
||||||
|
subparser.add_argument(
|
||||||
|
"--port",
|
||||||
|
"-p",
|
||||||
|
default=DEFAULT_PORT,
|
||||||
|
help=f"Serial port (default: {DEFAULT_PORT})",
|
||||||
|
)
|
||||||
|
subparser.add_argument(
|
||||||
|
"--baudrate",
|
||||||
|
"-b",
|
||||||
|
type=int,
|
||||||
|
default=DEFAULT_BAUDRATE,
|
||||||
|
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
|
||||||
|
)
|
||||||
|
|
||||||
# Add subparsers for each simulator type
|
# Add subparsers for each simulator type
|
||||||
subparsers = parser.add_subparsers(
|
subparsers = parser.add_subparsers(
|
||||||
dest="simulator", help="Simulator type to run", required=True
|
dest="simulator", help="Simulator type to run", required=True
|
||||||
@ -54,73 +69,25 @@ Examples:
|
|||||||
pelican_parser = subparsers.add_parser(
|
pelican_parser = subparsers.add_parser(
|
||||||
"pelican", help="Run Pelican coin counter simulator"
|
"pelican", help="Run Pelican coin counter simulator"
|
||||||
)
|
)
|
||||||
pelican_parser.add_argument(
|
add_common_arguments(pelican_parser)
|
||||||
"--port",
|
|
||||||
"-p",
|
|
||||||
default=DEFAULT_PORT,
|
|
||||||
help=f"Serial port (default: {DEFAULT_PORT})",
|
|
||||||
)
|
|
||||||
pelican_parser.add_argument(
|
|
||||||
"--baudrate",
|
|
||||||
"-b",
|
|
||||||
type=int,
|
|
||||||
default=DEFAULT_BAUDRATE,
|
|
||||||
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Glory simulator subcommand
|
# Glory simulator subcommand
|
||||||
glory_parser = subparsers.add_parser(
|
glory_parser = subparsers.add_parser(
|
||||||
"glory", help="Run Glory MACH6 coin counter simulator"
|
"glory", help="Run Glory MACH6 coin counter simulator"
|
||||||
)
|
)
|
||||||
glory_parser.add_argument(
|
add_common_arguments(glory_parser)
|
||||||
"--port",
|
|
||||||
"-p",
|
|
||||||
default=DEFAULT_PORT,
|
|
||||||
help=f"Serial port (default: {DEFAULT_PORT})",
|
|
||||||
)
|
|
||||||
glory_parser.add_argument(
|
|
||||||
"--baudrate",
|
|
||||||
"-b",
|
|
||||||
type=int,
|
|
||||||
default=DEFAULT_BAUDRATE,
|
|
||||||
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Selex simulator subcommand
|
# Selex simulator subcommand
|
||||||
selex_parser = subparsers.add_parser(
|
selex_parser = subparsers.add_parser(
|
||||||
"selex", help="Run Selex coin counter simulator"
|
"selex", help="Run Selex coin counter simulator"
|
||||||
)
|
)
|
||||||
selex_parser.add_argument(
|
add_common_arguments(selex_parser)
|
||||||
"--port",
|
|
||||||
"-p",
|
|
||||||
default=DEFAULT_PORT,
|
|
||||||
help=f"Serial port (default: {DEFAULT_PORT})",
|
|
||||||
)
|
|
||||||
selex_parser.add_argument(
|
|
||||||
"--baudrate",
|
|
||||||
"-b",
|
|
||||||
type=int,
|
|
||||||
default=DEFAULT_BAUDRATE,
|
|
||||||
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
|
|
||||||
)
|
|
||||||
|
|
||||||
# JetSort simulator subcommand
|
# JetSort simulator subcommand
|
||||||
jetsort_parser = subparsers.add_parser(
|
jetsort_parser = subparsers.add_parser(
|
||||||
"jetsort", help="Run JetSort coin/bill counter simulator"
|
"jetsort", help="Run JetSort coin/bill counter simulator"
|
||||||
)
|
)
|
||||||
jetsort_parser.add_argument(
|
add_common_arguments(jetsort_parser)
|
||||||
"--port",
|
|
||||||
"-p",
|
|
||||||
default=DEFAULT_PORT,
|
|
||||||
help=f"Serial port (default: {DEFAULT_PORT})",
|
|
||||||
)
|
|
||||||
jetsort_parser.add_argument(
|
|
||||||
"--baudrate",
|
|
||||||
"-b",
|
|
||||||
type=int,
|
|
||||||
default=DEFAULT_BAUDRATE,
|
|
||||||
help=f"Baud rate (default: {DEFAULT_BAUDRATE})",
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
|||||||
20
source/common.py
Normal file
20
source/common.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from typing import Literal
|
||||||
|
|
||||||
|
|
||||||
|
def format_comm_debug(
|
||||||
|
prefix: Literal["RX", "TX"], data: bytes, include_ascii: bool = False
|
||||||
|
) -> str:
|
||||||
|
hex_representation = " ".join(f"{byte:02X}" for byte in data)
|
||||||
|
|
||||||
|
output = [
|
||||||
|
f"{prefix}: ",
|
||||||
|
hex_representation,
|
||||||
|
]
|
||||||
|
|
||||||
|
if include_ascii:
|
||||||
|
ascii_representation = "".join(
|
||||||
|
(chr(byte) if 32 <= byte <= 126 else ".") for byte in data
|
||||||
|
)
|
||||||
|
output.append(" // ASCII: " + ascii_representation)
|
||||||
|
|
||||||
|
return "".join(output)
|
||||||
@ -12,6 +12,8 @@ from typing import Optional
|
|||||||
import serial
|
import serial
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
from source.common import format_comm_debug
|
||||||
|
|
||||||
# Protocol constants
|
# Protocol constants
|
||||||
STX = 0x02
|
STX = 0x02
|
||||||
ETX = 0x03
|
ETX = 0x03
|
||||||
@ -151,8 +153,7 @@ class GlorySimulator:
|
|||||||
|
|
||||||
# Calculate total monetary value (sum of count × denomination for all coins)
|
# Calculate total monetary value (sum of count × denomination for all coins)
|
||||||
total_value = sum(
|
total_value = sum(
|
||||||
count * int(denom)
|
count * int(denom) for denom, count in self.batch_counts.items()
|
||||||
for denom, count in self.batch_counts.items()
|
|
||||||
)
|
)
|
||||||
response = f"BT{total_value:08d}"
|
response = f"BT{total_value:08d}"
|
||||||
logger.info(f"Get batch total: ${total_value/100:.2f}")
|
logger.info(f"Get batch total: ${total_value/100:.2f}")
|
||||||
@ -162,8 +163,7 @@ class GlorySimulator:
|
|||||||
elif cmd == "GS":
|
elif cmd == "GS":
|
||||||
# Get subtotal (monetary value)
|
# Get subtotal (monetary value)
|
||||||
total_value = sum(
|
total_value = sum(
|
||||||
count * int(denom)
|
count * int(denom) for denom, count in self.sub_counts.items()
|
||||||
for denom, count in self.sub_counts.items()
|
|
||||||
)
|
)
|
||||||
response = f"BS{total_value:08d}"
|
response = f"BS{total_value:08d}"
|
||||||
logger.info(f"Get subtotal: ${total_value/100:.2f}")
|
logger.info(f"Get subtotal: ${total_value/100:.2f}")
|
||||||
@ -172,8 +172,7 @@ class GlorySimulator:
|
|||||||
elif cmd == "GG":
|
elif cmd == "GG":
|
||||||
# Get grand total (monetary value)
|
# Get grand total (monetary value)
|
||||||
total_value = sum(
|
total_value = sum(
|
||||||
count * int(denom)
|
count * int(denom) for denom, count in self.grand_counts.items()
|
||||||
for denom, count in self.grand_counts.items()
|
|
||||||
)
|
)
|
||||||
response = f"BG{total_value:08d}"
|
response = f"BG{total_value:08d}"
|
||||||
logger.info(f"Get grand total: ${total_value/100:.2f}")
|
logger.info(f"Get grand total: ${total_value/100:.2f}")
|
||||||
@ -195,7 +194,9 @@ class GlorySimulator:
|
|||||||
denom_value = int(denom_str) # e.g., "001" = 1 cent, "025" = 25 cents
|
denom_value = int(denom_str) # e.g., "001" = 1 cent, "025" = 25 cents
|
||||||
monetary_value = count * denom_value
|
monetary_value = count * denom_value
|
||||||
response = f"BT{monetary_value:08d}"
|
response = f"BT{monetary_value:08d}"
|
||||||
logger.info(f"Get batch value for {denom_str}: {count} coins × ${denom_value/100:.2f} = ${monetary_value/100:.2f}")
|
logger.info(
|
||||||
|
f"Get batch value for {denom_str}: {count} coins × ${denom_value/100:.2f} = ${monetary_value/100:.2f}"
|
||||||
|
)
|
||||||
return self.create_message(response)
|
return self.create_message(response)
|
||||||
|
|
||||||
elif cmd == "GI":
|
elif cmd == "GI":
|
||||||
@ -270,8 +271,7 @@ class GlorySimulator:
|
|||||||
if cmd == "AB" or cmd == "Ab":
|
if cmd == "AB" or cmd == "Ab":
|
||||||
# Accept batch (monetary value)
|
# Accept batch (monetary value)
|
||||||
total_value = sum(
|
total_value = sum(
|
||||||
count * int(denom)
|
count * int(denom) for denom, count in self.batch_counts.items()
|
||||||
for denom, count in self.batch_counts.items()
|
|
||||||
)
|
)
|
||||||
response = f"BT{total_value:08d}"
|
response = f"BT{total_value:08d}"
|
||||||
logger.info(f"Accept batch: ${total_value/100:.2f}")
|
logger.info(f"Accept batch: ${total_value/100:.2f}")
|
||||||
@ -281,8 +281,7 @@ class GlorySimulator:
|
|||||||
elif cmd == "AS" or cmd == "As":
|
elif cmd == "AS" or cmd == "As":
|
||||||
# Accept subtotal (monetary value)
|
# Accept subtotal (monetary value)
|
||||||
total_value = sum(
|
total_value = sum(
|
||||||
count * int(denom)
|
count * int(denom) for denom, count in self.sub_counts.items()
|
||||||
for denom, count in self.sub_counts.items()
|
|
||||||
)
|
)
|
||||||
response = f"BS{total_value:08d}"
|
response = f"BS{total_value:08d}"
|
||||||
logger.info(f"Accept subtotal: ${total_value/100:.2f}")
|
logger.info(f"Accept subtotal: ${total_value/100:.2f}")
|
||||||
@ -292,8 +291,7 @@ class GlorySimulator:
|
|||||||
elif cmd == "AG" or cmd == "Ag":
|
elif cmd == "AG" or cmd == "Ag":
|
||||||
# Accept grand total (monetary value)
|
# Accept grand total (monetary value)
|
||||||
total_value = sum(
|
total_value = sum(
|
||||||
count * int(denom)
|
count * int(denom) for denom, count in self.grand_counts.items()
|
||||||
for denom, count in self.grand_counts.items()
|
|
||||||
)
|
)
|
||||||
response = f"BG{total_value:08d}"
|
response = f"BG{total_value:08d}"
|
||||||
logger.info(f"Accept grand total: ${total_value/100:.2f}")
|
logger.info(f"Accept grand total: ${total_value/100:.2f}")
|
||||||
@ -483,8 +481,9 @@ class GlorySimulator:
|
|||||||
message = bytes(buffer[stx_idx : etx_idx + 1])
|
message = bytes(buffer[stx_idx : etx_idx + 1])
|
||||||
buffer = buffer[etx_idx + 1 :]
|
buffer = buffer[etx_idx + 1 :]
|
||||||
|
|
||||||
|
# logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}")
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"RX: {' '.join(f'{b:02X}' for b in message)}"
|
format_comm_debug("RX", message, include_ascii=True)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Parse and handle message
|
# Parse and handle message
|
||||||
@ -493,7 +492,9 @@ class GlorySimulator:
|
|||||||
response = self.handle_command(parsed_data)
|
response = self.handle_command(parsed_data)
|
||||||
if response:
|
if response:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"TX: {' '.join(f'{b:02X}' for b in response)}"
|
format_comm_debug(
|
||||||
|
"TX", response, include_ascii=True
|
||||||
|
)
|
||||||
)
|
)
|
||||||
self.serial_conn.write(response)
|
self.serial_conn.write(response)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
|||||||
@ -16,6 +16,8 @@ from typing import Optional
|
|||||||
import serial
|
import serial
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
from .common import format_comm_debug
|
||||||
|
|
||||||
# Protocol constants
|
# Protocol constants
|
||||||
STX = 0x02
|
STX = 0x02
|
||||||
ETX = 0x03
|
ETX = 0x03
|
||||||
@ -76,6 +78,10 @@ class JetSortSimulator:
|
|||||||
# Bag limit settings (in cents)
|
# Bag limit settings (in cents)
|
||||||
self.bag_limit = 50000 # $500.00
|
self.bag_limit = 50000 # $500.00
|
||||||
|
|
||||||
|
# Workflow state
|
||||||
|
self.counting_in_progress = False
|
||||||
|
self.report_ready = False
|
||||||
|
|
||||||
def _calculate_checksum(self, data: bytes) -> int:
|
def _calculate_checksum(self, data: bytes) -> int:
|
||||||
"""Calculate checksum for packet"""
|
"""Calculate checksum for packet"""
|
||||||
checksum = 0
|
checksum = 0
|
||||||
@ -85,7 +91,7 @@ class JetSortSimulator:
|
|||||||
return checksum & 0xFF
|
return checksum & 0xFF
|
||||||
|
|
||||||
def _create_packet(self, title: str, data: str) -> bytes:
|
def _create_packet(self, title: str, data: str) -> bytes:
|
||||||
"""Create a packet with STX, checksum, length, title, data, and ETX"""
|
"""Create a packet with STX, checksum, length, CR, LF, title, data, and ETX"""
|
||||||
# Build packet content (without STX and ETX)
|
# Build packet content (without STX and ETX)
|
||||||
content = f"{title}\r\n{data}".encode("ascii")
|
content = f"{title}\r\n{data}".encode("ascii")
|
||||||
|
|
||||||
@ -93,12 +99,12 @@ class JetSortSimulator:
|
|||||||
checksum = self._calculate_checksum(content)
|
checksum = self._calculate_checksum(content)
|
||||||
|
|
||||||
# Calculate length (high and low bytes)
|
# Calculate length (high and low bytes)
|
||||||
length = len(content) + 2 # +2 for CR LF after title
|
length = len(content) + 2 # +2 for CR LF after length bytes
|
||||||
len_high = (length >> 8) & 0xFF
|
len_high = (length >> 8) & 0xFF
|
||||||
len_low = length & 0xFF
|
len_low = length & 0xFF
|
||||||
|
|
||||||
# Build full packet
|
# Build full packet: STX + CHECKSUM + LENGTH + CR + LF + content + ETX
|
||||||
packet = bytes([STX, checksum, len_high, len_low]) + content + bytes([ETX])
|
packet = bytes([STX, checksum, len_high, len_low, CR, LF]) + content + bytes([ETX])
|
||||||
|
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
@ -376,32 +382,64 @@ class JetSortSimulator:
|
|||||||
logger.info(f"Sending SUB-BATCH report ({len(packet)} bytes)")
|
logger.info(f"Sending SUB-BATCH report ({len(packet)} bytes)")
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
def send_automatic_report(self):
|
def handle_s_batch_button(self):
|
||||||
"""Send automatic SUB-BATCH report (immediate mode)"""
|
"""Handle S-BATCH button press - starts counting"""
|
||||||
if not self.polled_mode and self.serial_conn:
|
logger.info("=" * 60)
|
||||||
logger.info("Sending automatic SUB-BATCH report")
|
logger.info("S-BATCH button pressed - Starting counting...")
|
||||||
|
logger.info("=" * 60)
|
||||||
|
|
||||||
# Simulate new batch
|
# Simulate counting
|
||||||
self._simulate_counting()
|
self._simulate_counting()
|
||||||
|
|
||||||
|
# Update state
|
||||||
|
self.counting_in_progress = True
|
||||||
|
self.report_ready = True
|
||||||
|
|
||||||
|
logger.info("Counting complete. Press ENTER again (END button) to send report.")
|
||||||
|
|
||||||
|
def handle_end_button(self):
|
||||||
|
"""Handle END button press - sends the report"""
|
||||||
|
if not self.report_ready:
|
||||||
|
logger.warning("No report ready to send. Press ENTER to start counting first.")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("=" * 60)
|
||||||
|
logger.info("END button pressed - Sending SUB-BATCH report")
|
||||||
|
logger.info("=" * 60)
|
||||||
|
|
||||||
# Generate and send report
|
# Generate and send report
|
||||||
report_data = self._generate_sub_batch_report()
|
report_data = self._generate_sub_batch_report()
|
||||||
packet = self._create_packet(REPORT_SUB_BATCH, report_data)
|
packet = self._create_packet(REPORT_SUB_BATCH, report_data)
|
||||||
|
|
||||||
self.serial_conn.write(packet)
|
self.serial_conn.write(packet)
|
||||||
logger.debug(f"TX: {' '.join(f'{b:02X}' for b in packet[:50])}...")
|
logger.debug(format_comm_debug("TX", packet) + "...")
|
||||||
|
logger.info("Report sent successfully")
|
||||||
|
|
||||||
# Reset batch counters after sending
|
# Reset batch counters after sending
|
||||||
self.coin_values = [0] * self.num_coin_lines
|
self.coin_values = [0] * self.num_coin_lines
|
||||||
self.bill_values = [0] * self.num_bill_lines
|
self.bill_values = [0] * self.num_bill_lines
|
||||||
|
|
||||||
|
# Reset state
|
||||||
|
self.counting_in_progress = False
|
||||||
|
self.report_ready = False
|
||||||
|
|
||||||
|
logger.info("")
|
||||||
|
logger.info("Ready for next batch. Press ENTER (S-BATCH button) to start counting.")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Main simulator loop"""
|
"""Main simulator loop"""
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Starting JetSort simulator on {self.port} at {self.baudrate} baud"
|
f"Starting JetSort simulator on {self.port} at {self.baudrate} baud"
|
||||||
)
|
)
|
||||||
logger.info("Protocol: JetSort Communication Package")
|
logger.info("Protocol: JetSort Communication Package")
|
||||||
logger.info("Press ENTER to send a cash counting report")
|
logger.info("")
|
||||||
|
logger.info("=" * 60)
|
||||||
|
logger.info("WORKFLOW:")
|
||||||
|
logger.info("1. Press ENTER → S-BATCH button (starts counting)")
|
||||||
|
logger.info("2. Press ENTER → END button (sends report)")
|
||||||
|
logger.info("=" * 60)
|
||||||
|
logger.info("")
|
||||||
|
logger.info("Ready for first batch. Press ENTER (S-BATCH button) to start counting.")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.serial_conn = serial.Serial(
|
self.serial_conn = serial.Serial(
|
||||||
@ -414,37 +452,43 @@ class JetSortSimulator:
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.info("Serial port opened successfully")
|
logger.info("Serial port opened successfully")
|
||||||
help_text_shown = False
|
logger.info("")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if not help_text_shown:
|
|
||||||
print("\nPress ENTER to send a cash counting report...")
|
|
||||||
help_text_shown = True
|
|
||||||
|
|
||||||
# Check for keyboard input (non-blocking)
|
# Check for keyboard input (non-blocking)
|
||||||
if sys.platform != "win32":
|
if sys.platform != "win32":
|
||||||
# Unix/Linux - use select
|
# Unix/Linux - use select
|
||||||
ready, _, _ = select.select([sys.stdin], [], [], 0)
|
ready, _, _ = select.select([sys.stdin], [], [], 0)
|
||||||
if ready:
|
if ready:
|
||||||
sys.stdin.readline() # Consume the input
|
sys.stdin.readline() # Consume the input
|
||||||
logger.info("Key pressed - sending cash counting report")
|
|
||||||
self.send_automatic_report()
|
# Handle button press based on current state
|
||||||
help_text_shown = False
|
if not self.report_ready:
|
||||||
|
# First press: S-BATCH button
|
||||||
|
self.handle_s_batch_button()
|
||||||
|
else:
|
||||||
|
# Second press: END button
|
||||||
|
self.handle_end_button()
|
||||||
else:
|
else:
|
||||||
# Windows - use msvcrt
|
# Windows - use msvcrt
|
||||||
import msvcrt
|
import msvcrt
|
||||||
|
|
||||||
if msvcrt.kbhit():
|
if msvcrt.kbhit():
|
||||||
msvcrt.getch() # Consume the input
|
msvcrt.getch() # Consume the input
|
||||||
logger.info("Key pressed - sending cash counting report")
|
|
||||||
self.send_automatic_report()
|
# Handle button press based on current state
|
||||||
help_text_shown = False
|
if not self.report_ready:
|
||||||
|
# First press: S-BATCH button
|
||||||
|
self.handle_s_batch_button()
|
||||||
|
else:
|
||||||
|
# Second press: END button
|
||||||
|
self.handle_end_button()
|
||||||
|
|
||||||
# Check for incoming data from serial port
|
# Check for incoming data from serial port
|
||||||
if self.serial_conn.in_waiting > 0:
|
if self.serial_conn.in_waiting > 0:
|
||||||
data = self.serial_conn.read(self.serial_conn.in_waiting)
|
data = self.serial_conn.read(self.serial_conn.in_waiting)
|
||||||
|
|
||||||
logger.debug(f"RX: {' '.join(f'{b:02X}' for b in data)}")
|
logger.debug(format_comm_debug("RX", data))
|
||||||
|
|
||||||
# Check for ENQ (poll)
|
# Check for ENQ (poll)
|
||||||
if ENQ in data:
|
if ENQ in data:
|
||||||
@ -452,7 +496,7 @@ class JetSortSimulator:
|
|||||||
if response:
|
if response:
|
||||||
self.serial_conn.write(response)
|
self.serial_conn.write(response)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"TX: {' '.join(f'{b:02X}' for b in response[:50])}..."
|
f"TX: {' '.join(f'{b:02X}' for b in response)}..."
|
||||||
)
|
)
|
||||||
|
|
||||||
# Reset batch counters
|
# Reset batch counters
|
||||||
|
|||||||
@ -11,6 +11,8 @@ from typing import Optional
|
|||||||
import serial
|
import serial
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
from .common import format_comm_debug
|
||||||
|
|
||||||
# Protocol constants
|
# Protocol constants
|
||||||
STX = 0x02
|
STX = 0x02
|
||||||
ETX = 0x03
|
ETX = 0x03
|
||||||
@ -640,18 +642,14 @@ class PelicanSimulator:
|
|||||||
message = bytes(buffer[stx_idx : etx_idx + 1])
|
message = bytes(buffer[stx_idx : etx_idx + 1])
|
||||||
buffer = buffer[etx_idx + 1 :]
|
buffer = buffer[etx_idx + 1 :]
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(format_comm_debug("RX", message))
|
||||||
f"RX: {' '.join(f'{b:02X}' for b in message)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Parse and handle message
|
# Parse and handle message
|
||||||
parsed_data = self.parse_message(message)
|
parsed_data = self.parse_message(message)
|
||||||
if parsed_data:
|
if parsed_data:
|
||||||
response = self.handle_command(parsed_data)
|
response = self.handle_command(parsed_data)
|
||||||
if response:
|
if response:
|
||||||
logger.debug(
|
logger.debug(format_comm_debug("TX", response))
|
||||||
f"TX: {' '.join(f'{b:02X}' for b in response)}"
|
|
||||||
)
|
|
||||||
self.serial_conn.write(response)
|
self.serial_conn.write(response)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass # ETX not found yet
|
pass # ETX not found yet
|
||||||
|
|||||||
132
source/selex.py
132
source/selex.py
@ -13,13 +13,15 @@ from typing import Optional
|
|||||||
import serial
|
import serial
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
from .common import format_comm_debug
|
||||||
|
|
||||||
# Protocol constants
|
# Protocol constants
|
||||||
STX = ord('s') # 0x73
|
STX = ord("s") # 0x73
|
||||||
ETX = ord('e') # 0x65
|
ETX = ord("e") # 0x65
|
||||||
CRC = ord('v') # 0x76
|
CRC = ord("v") # 0x76
|
||||||
CAN = ord('c') # 0x63
|
CAN = ord("c") # 0x63
|
||||||
ACK = ord('a') # 0x61
|
ACK = ord("a") # 0x61
|
||||||
EOT = ord('t') # 0x74
|
EOT = ord("t") # 0x74
|
||||||
|
|
||||||
# Machine states
|
# Machine states
|
||||||
STATE_IDLE = "01" # Machine stops with finished counting
|
STATE_IDLE = "01" # Machine stops with finished counting
|
||||||
@ -106,8 +108,10 @@ class SelexSimulator:
|
|||||||
# Add 10 to first digit to indicate rejected coins
|
# Add 10 to first digit to indicate rejected coins
|
||||||
status_str = str(int(status_str[0]) + 1) + status_str[1]
|
status_str = str(int(status_str[0]) + 1) + status_str[1]
|
||||||
|
|
||||||
response = b'101' + status_str.encode() + self.error_code.encode()
|
response = b"101" + status_str.encode() + self.error_code.encode()
|
||||||
logger.info(f"Status request: state={self.machine_state}, error={self.error_code}, rejected={self.has_rejected_coins}")
|
logger.info(
|
||||||
|
f"Status request: state={self.machine_state}, error={self.error_code}, rejected={self.has_rejected_coins}"
|
||||||
|
)
|
||||||
return self.create_message(response)
|
return self.create_message(response)
|
||||||
|
|
||||||
def handle_start_counting(self, data: bytes) -> bytes:
|
def handle_start_counting(self, data: bytes) -> bytes:
|
||||||
@ -140,10 +144,14 @@ class SelexSimulator:
|
|||||||
logger.info("Resetting main total (not used in this simulator)")
|
logger.info("Resetting main total (not used in this simulator)")
|
||||||
return bytes([ACK])
|
return bytes([ACK])
|
||||||
|
|
||||||
|
def write_to_serial(self, data: bytes) -> None:
|
||||||
|
logger.debug(format_comm_debug("TX", data, include_ascii=True))
|
||||||
|
self.serial_conn.write(data)
|
||||||
|
|
||||||
def handle_actual_counters_request(self, data: bytes) -> bytes:
|
def handle_actual_counters_request(self, data: bytes) -> bytes:
|
||||||
"""Handle actual counting counters request (s400ve or sc00ve for checksum)"""
|
"""Handle actual counting counters request (s400ve or sc00ve for checksum)"""
|
||||||
cmd_char = chr(data[0]) if len(data) > 0 else '4'
|
cmd_char = chr(data[0]) if len(data) > 0 else "4"
|
||||||
with_checksum = (cmd_char == 'c')
|
with_checksum = cmd_char == "c"
|
||||||
|
|
||||||
logger.info(f"Actual counters request (checksum={with_checksum})")
|
logger.info(f"Actual counters request (checksum={with_checksum})")
|
||||||
|
|
||||||
@ -153,15 +161,17 @@ class SelexSimulator:
|
|||||||
self._simulate_counting()
|
self._simulate_counting()
|
||||||
|
|
||||||
# First ACK
|
# First ACK
|
||||||
self.serial_conn.write(bytes([ACK]))
|
ack_msg = bytes([ACK])
|
||||||
|
self.write_to_serial(ack_msg)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
# Send each line's count
|
# Send each line's count
|
||||||
for line_num in range(1, self.num_lines + 1):
|
for line_num in range(1, self.num_lines + 1):
|
||||||
count = self.actual_counts[line_num - 1]
|
count = self.actual_counts[line_num - 1]
|
||||||
cmd = b'c' if with_checksum else b'4'
|
cmd = b"c" if with_checksum else b"4"
|
||||||
response = cmd + b'0' + f"{line_num:02d}{count:06d}".encode()
|
response = cmd + b"0" + f"{line_num:02d}{count:06d}".encode()
|
||||||
self.serial_conn.write(self.create_message(response))
|
response_msg = self.create_message(response)
|
||||||
|
self.write_to_serial(response_msg)
|
||||||
logger.info(f" Line {line_num}: {count} coins")
|
logger.info(f" Line {line_num}: {count} coins")
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
# Wait for ACK
|
# Wait for ACK
|
||||||
@ -170,8 +180,9 @@ class SelexSimulator:
|
|||||||
# Send checksum if requested
|
# Send checksum if requested
|
||||||
if with_checksum:
|
if with_checksum:
|
||||||
total = sum(self.actual_counts)
|
total = sum(self.actual_counts)
|
||||||
response = b'c099' + f"{total:06d}".encode()
|
response = b"c099" + f"{total:06d}".encode()
|
||||||
self.serial_conn.write(self.create_message(response))
|
response_msg = self.create_message(response)
|
||||||
|
self.write_to_serial(response_msg)
|
||||||
logger.info(f" Total checksum: {total} coins")
|
logger.info(f" Total checksum: {total} coins")
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
# Wait for ACK
|
# Wait for ACK
|
||||||
@ -190,21 +201,23 @@ class SelexSimulator:
|
|||||||
|
|
||||||
def handle_partial_totals_request(self, data: bytes) -> bytes:
|
def handle_partial_totals_request(self, data: bytes) -> bytes:
|
||||||
"""Handle partial total counters request (s600ve or sd00ve for checksum)"""
|
"""Handle partial total counters request (s600ve or sd00ve for checksum)"""
|
||||||
cmd_char = chr(data[0]) if len(data) > 0 else '6'
|
cmd_char = chr(data[0]) if len(data) > 0 else "6"
|
||||||
with_checksum = (cmd_char == 'd')
|
with_checksum = cmd_char == "d"
|
||||||
|
|
||||||
logger.info(f"Partial totals request (checksum={with_checksum})")
|
logger.info(f"Partial totals request (checksum={with_checksum})")
|
||||||
|
|
||||||
# First ACK
|
# First ACK
|
||||||
self.serial_conn.write(bytes([ACK]))
|
ack_msg = bytes([ACK])
|
||||||
|
self.write_to_serial(ack_msg)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
# Send each line's partial total
|
# Send each line's partial total
|
||||||
for line_num in range(1, self.num_lines + 1):
|
for line_num in range(1, self.num_lines + 1):
|
||||||
count = self.partial_totals[line_num - 1]
|
count = self.partial_totals[line_num - 1]
|
||||||
cmd = b'd' if with_checksum else b'6'
|
cmd = b"d" if with_checksum else b"6"
|
||||||
response = cmd + b'0' + f"{line_num:02d}{count:06d}".encode()
|
response = cmd + b"0" + f"{line_num:02d}{count:06d}".encode()
|
||||||
self.serial_conn.write(self.create_message(response))
|
response_msg = self.create_message(response)
|
||||||
|
self.write_to_serial(response_msg)
|
||||||
logger.info(f" Line {line_num}: {count} coins (partial total)")
|
logger.info(f" Line {line_num}: {count} coins (partial total)")
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
# Wait for ACK
|
# Wait for ACK
|
||||||
@ -213,8 +226,9 @@ class SelexSimulator:
|
|||||||
# Send checksum if requested
|
# Send checksum if requested
|
||||||
if with_checksum:
|
if with_checksum:
|
||||||
total = sum(self.partial_totals)
|
total = sum(self.partial_totals)
|
||||||
response = b'd099' + f"{total:06d}".encode()
|
response = b"d099" + f"{total:06d}".encode()
|
||||||
self.serial_conn.write(self.create_message(response))
|
response_msg = self.create_message(response)
|
||||||
|
self.write_to_serial(response_msg)
|
||||||
logger.info(f" Total checksum: {total} coins")
|
logger.info(f" Total checksum: {total} coins")
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
# Wait for ACK
|
# Wait for ACK
|
||||||
@ -231,21 +245,23 @@ class SelexSimulator:
|
|||||||
|
|
||||||
def handle_batch_values_request(self, data: bytes) -> bytes:
|
def handle_batch_values_request(self, data: bytes) -> bytes:
|
||||||
"""Handle batching value request (s900ve or sf00ve for checksum)"""
|
"""Handle batching value request (s900ve or sf00ve for checksum)"""
|
||||||
cmd_char = chr(data[0]) if len(data) > 0 else '9'
|
cmd_char = chr(data[0]) if len(data) > 0 else "9"
|
||||||
with_checksum = (cmd_char == 'f')
|
with_checksum = cmd_char == "f"
|
||||||
|
|
||||||
logger.info(f"Batch values request (checksum={with_checksum})")
|
logger.info(f"Batch values request (checksum={with_checksum})")
|
||||||
|
|
||||||
# First ACK
|
# First ACK
|
||||||
self.serial_conn.write(bytes([ACK]))
|
ack_msg = bytes([ACK])
|
||||||
|
self.write_to_serial(ack_msg)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
# Send each line's batch value
|
# Send each line's batch value
|
||||||
for line_num in range(1, self.num_lines + 1):
|
for line_num in range(1, self.num_lines + 1):
|
||||||
batch_val = self.batch_values[line_num - 1]
|
batch_val = self.batch_values[line_num - 1]
|
||||||
cmd = b'f' if with_checksum else b'9'
|
cmd = b"f" if with_checksum else b"9"
|
||||||
response = cmd + b'0' + f"{line_num:02d}{batch_val:06d}".encode()
|
response = cmd + b"0" + f"{line_num:02d}{batch_val:06d}".encode()
|
||||||
self.serial_conn.write(self.create_message(response))
|
response_msg = self.create_message(response)
|
||||||
|
self.write_to_serial(response_msg)
|
||||||
logger.info(f" Line {line_num}: {batch_val} coins (batch threshold)")
|
logger.info(f" Line {line_num}: {batch_val} coins (batch threshold)")
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
# Wait for ACK
|
# Wait for ACK
|
||||||
@ -254,8 +270,9 @@ class SelexSimulator:
|
|||||||
# Send checksum if requested
|
# Send checksum if requested
|
||||||
if with_checksum:
|
if with_checksum:
|
||||||
total = sum(self.batch_values)
|
total = sum(self.batch_values)
|
||||||
response = b'f099' + f"{total:06d}".encode()
|
response = b"f099" + f"{total:06d}".encode()
|
||||||
self.serial_conn.write(self.create_message(response))
|
response_msg = self.create_message(response)
|
||||||
|
self.write_to_serial(response_msg)
|
||||||
logger.info(f" Total checksum: {total} coins")
|
logger.info(f" Total checksum: {total} coins")
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
# Wait for ACK
|
# Wait for ACK
|
||||||
@ -270,7 +287,7 @@ class SelexSimulator:
|
|||||||
return bytes([CAN])
|
return bytes([CAN])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
line_str = data[1:3].decode('ascii')
|
line_str = data[1:3].decode("ascii")
|
||||||
line_num = int(line_str)
|
line_num = int(line_str)
|
||||||
if 1 <= line_num <= self.num_lines:
|
if 1 <= line_num <= self.num_lines:
|
||||||
self.lines_enabled[line_num - 1] = False
|
self.lines_enabled[line_num - 1] = False
|
||||||
@ -287,7 +304,7 @@ class SelexSimulator:
|
|||||||
return bytes([CAN])
|
return bytes([CAN])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
line_str = data[1:3].decode('ascii')
|
line_str = data[1:3].decode("ascii")
|
||||||
line_num = int(line_str)
|
line_num = int(line_str)
|
||||||
if 1 <= line_num <= self.num_lines:
|
if 1 <= line_num <= self.num_lines:
|
||||||
self.lines_enabled[line_num - 1] = True
|
self.lines_enabled[line_num - 1] = True
|
||||||
@ -301,13 +318,13 @@ class SelexSimulator:
|
|||||||
def handle_software_version(self, data: bytes) -> bytes:
|
def handle_software_version(self, data: bytes) -> bytes:
|
||||||
"""Handle software version request (sA00ve)"""
|
"""Handle software version request (sA00ve)"""
|
||||||
logger.info(f"Software version request: {self.sw_version}")
|
logger.info(f"Software version request: {self.sw_version}")
|
||||||
response = self.sw_version.encode() + b'\r\n'
|
response = self.sw_version.encode() + b"\r\n"
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def handle_serial_number(self, data: bytes) -> bytes:
|
def handle_serial_number(self, data: bytes) -> bytes:
|
||||||
"""Handle serial number request (su00ve)"""
|
"""Handle serial number request (su00ve)"""
|
||||||
logger.info(f"Serial number request: {self.serial_number}")
|
logger.info(f"Serial number request: {self.serial_number}")
|
||||||
response = self.serial_number.encode() + b'\r\n'
|
response = self.serial_number.encode() + b"\r\n"
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def _simulate_counting(self):
|
def _simulate_counting(self):
|
||||||
@ -353,77 +370,77 @@ class SelexSimulator:
|
|||||||
return bytes([CAN])
|
return bytes([CAN])
|
||||||
|
|
||||||
# If machine is counting, only accept status requests
|
# If machine is counting, only accept status requests
|
||||||
if self.is_counting and data[0] != ord('1'):
|
if self.is_counting and data[0] != ord("1"):
|
||||||
logger.warning("Machine is counting - only status requests accepted")
|
logger.warning("Machine is counting - only status requests accepted")
|
||||||
return bytes([CAN])
|
return bytes([CAN])
|
||||||
|
|
||||||
cmd = chr(data[0])
|
cmd = chr(data[0])
|
||||||
sub_cmd = data[1:3].decode('ascii', errors='ignore')
|
sub_cmd = data[1:3].decode("ascii", errors="ignore")
|
||||||
|
|
||||||
logger.info(f"Received command: {cmd}{sub_cmd}")
|
logger.info(f"Received command: {cmd}{sub_cmd}")
|
||||||
|
|
||||||
# Status requests
|
# Status requests
|
||||||
if cmd == '1' and sub_cmd == '00':
|
if cmd == "1" and sub_cmd == "00":
|
||||||
return self.handle_status_request(data)
|
return self.handle_status_request(data)
|
||||||
|
|
||||||
# Start counting
|
# Start counting
|
||||||
elif cmd == '2' and sub_cmd == '00':
|
elif cmd == "2" and sub_cmd == "00":
|
||||||
return self.handle_start_counting(data)
|
return self.handle_start_counting(data)
|
||||||
|
|
||||||
# Main total reset
|
# Main total reset
|
||||||
elif cmd == '3' and sub_cmd == '00':
|
elif cmd == "3" and sub_cmd == "00":
|
||||||
return self.handle_main_total_reset(data)
|
return self.handle_main_total_reset(data)
|
||||||
|
|
||||||
# Actual counters request
|
# Actual counters request
|
||||||
elif cmd == '4' and sub_cmd == '00':
|
elif cmd == "4" and sub_cmd == "00":
|
||||||
return self.handle_actual_counters_request(data)
|
return self.handle_actual_counters_request(data)
|
||||||
|
|
||||||
# Actual counters with checksum
|
# Actual counters with checksum
|
||||||
elif cmd == 'c' and sub_cmd == '00':
|
elif cmd == "c" and sub_cmd == "00":
|
||||||
return self.handle_actual_counters_request(data)
|
return self.handle_actual_counters_request(data)
|
||||||
|
|
||||||
# Uncompleted batch reset
|
# Uncompleted batch reset
|
||||||
elif cmd == '5' and sub_cmd == '00':
|
elif cmd == "5" and sub_cmd == "00":
|
||||||
return self.handle_batch_reset(data)
|
return self.handle_batch_reset(data)
|
||||||
|
|
||||||
# Partial totals request
|
# Partial totals request
|
||||||
elif cmd == '6' and sub_cmd == '00':
|
elif cmd == "6" and sub_cmd == "00":
|
||||||
return self.handle_partial_totals_request(data)
|
return self.handle_partial_totals_request(data)
|
||||||
|
|
||||||
# Partial totals with checksum
|
# Partial totals with checksum
|
||||||
elif cmd == 'd' and sub_cmd == '00':
|
elif cmd == "d" and sub_cmd == "00":
|
||||||
return self.handle_partial_totals_request(data)
|
return self.handle_partial_totals_request(data)
|
||||||
|
|
||||||
# Partial total reset
|
# Partial total reset
|
||||||
elif cmd == '7' and sub_cmd == '00':
|
elif cmd == "7" and sub_cmd == "00":
|
||||||
return self.handle_partial_total_reset(data)
|
return self.handle_partial_total_reset(data)
|
||||||
|
|
||||||
# Stop counting
|
# Stop counting
|
||||||
elif cmd == '8' and sub_cmd == '00':
|
elif cmd == "8" and sub_cmd == "00":
|
||||||
return self.handle_stop_counting(data)
|
return self.handle_stop_counting(data)
|
||||||
|
|
||||||
# Batch values request
|
# Batch values request
|
||||||
elif cmd == '9' and sub_cmd == '00':
|
elif cmd == "9" and sub_cmd == "00":
|
||||||
return self.handle_batch_values_request(data)
|
return self.handle_batch_values_request(data)
|
||||||
|
|
||||||
# Batch values with checksum
|
# Batch values with checksum
|
||||||
elif cmd == 'f' and sub_cmd == '00':
|
elif cmd == "f" and sub_cmd == "00":
|
||||||
return self.handle_batch_values_request(data)
|
return self.handle_batch_values_request(data)
|
||||||
|
|
||||||
# Line disable
|
# Line disable
|
||||||
elif cmd == 'C':
|
elif cmd == "C":
|
||||||
return self.handle_line_disable(data)
|
return self.handle_line_disable(data)
|
||||||
|
|
||||||
# Line enable
|
# Line enable
|
||||||
elif cmd == 'D':
|
elif cmd == "D":
|
||||||
return self.handle_line_enable(data)
|
return self.handle_line_enable(data)
|
||||||
|
|
||||||
# Software version
|
# Software version
|
||||||
elif cmd == 'A' and sub_cmd == '00':
|
elif cmd == "A" and sub_cmd == "00":
|
||||||
return self.handle_software_version(data)
|
return self.handle_software_version(data)
|
||||||
|
|
||||||
# Serial number
|
# Serial number
|
||||||
elif cmd == 'u' and sub_cmd == '00':
|
elif cmd == "u" and sub_cmd == "00":
|
||||||
return self.handle_serial_number(data)
|
return self.handle_serial_number(data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@ -462,15 +479,16 @@ class SelexSimulator:
|
|||||||
message = bytes(buffer[stx_idx : etx_idx + 1])
|
message = bytes(buffer[stx_idx : etx_idx + 1])
|
||||||
buffer = buffer[etx_idx + 1 :]
|
buffer = buffer[etx_idx + 1 :]
|
||||||
|
|
||||||
logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}")
|
logger.debug(
|
||||||
|
format_comm_debug("RX", message, include_ascii=True)
|
||||||
|
)
|
||||||
|
|
||||||
# Parse and handle message
|
# Parse and handle message
|
||||||
parsed_data = self.parse_message(message)
|
parsed_data = self.parse_message(message)
|
||||||
if parsed_data:
|
if parsed_data:
|
||||||
response = self.handle_command(parsed_data)
|
response = self.handle_command(parsed_data)
|
||||||
if response:
|
if response:
|
||||||
logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}")
|
self.write_to_serial(response)
|
||||||
self.serial_conn.write(response)
|
|
||||||
|
|
||||||
# Handle EOT - close communication
|
# Handle EOT - close communication
|
||||||
if response == bytes([EOT]):
|
if response == bytes([EOT]):
|
||||||
|
|||||||
Reference in New Issue
Block a user