From 17958f837a5bf7b253462d8efc925b4ab77757ff Mon Sep 17 00:00:00 2001 From: Eden Kirin Date: Mon, 10 Nov 2025 09:11:56 +0100 Subject: [PATCH] Update drivers --- source/common.py | 20 +++++++ source/glory.py | 31 +++++----- source/jetsort.py | 6 +- source/pelican.py | 10 ++-- source/selex.py | 146 ++++++++++++++++++++++++++-------------------- 5 files changed, 126 insertions(+), 87 deletions(-) create mode 100644 source/common.py diff --git a/source/common.py b/source/common.py new file mode 100644 index 0000000..0131b33 --- /dev/null +++ b/source/common.py @@ -0,0 +1,20 @@ +from typing import Literal + + +def format_comm_debug( + prefix: Literal["RX", "TX"], data: bytes, include_ascii: bool = False +) -> str: + hex_representation = " ".join(f"{byte:02X}" for byte in data) + + output = [ + f"{prefix}: ", + hex_representation, + ] + + if include_ascii: + ascii_representation = "".join( + (chr(byte) if 32 <= byte <= 126 else ".") for byte in data + ) + output.append(" // ASCII: " + ascii_representation) + + return "".join(output) diff --git a/source/glory.py b/source/glory.py index ce61901..f321653 100644 --- a/source/glory.py +++ b/source/glory.py @@ -12,6 +12,8 @@ from typing import Optional import serial from loguru import logger +from source.common import format_comm_debug + # Protocol constants STX = 0x02 ETX = 0x03 @@ -151,8 +153,7 @@ class GlorySimulator: # Calculate total monetary value (sum of count × denomination for all coins) total_value = sum( - count * int(denom) - for denom, count in self.batch_counts.items() + count * int(denom) for denom, count in self.batch_counts.items() ) response = f"BT{total_value:08d}" logger.info(f"Get batch total: ${total_value/100:.2f}") @@ -162,8 +163,7 @@ class GlorySimulator: elif cmd == "GS": # Get subtotal (monetary value) total_value = sum( - count * int(denom) - for denom, count in self.sub_counts.items() + count * int(denom) for denom, count in self.sub_counts.items() ) response = f"BS{total_value:08d}" logger.info(f"Get subtotal: ${total_value/100:.2f}") @@ -172,8 +172,7 @@ class GlorySimulator: elif cmd == "GG": # Get grand total (monetary value) total_value = sum( - count * int(denom) - for denom, count in self.grand_counts.items() + count * int(denom) for denom, count in self.grand_counts.items() ) response = f"BG{total_value:08d}" logger.info(f"Get grand total: ${total_value/100:.2f}") @@ -195,7 +194,9 @@ class GlorySimulator: denom_value = int(denom_str) # e.g., "001" = 1 cent, "025" = 25 cents monetary_value = count * denom_value response = f"BT{monetary_value:08d}" - logger.info(f"Get batch value for {denom_str}: {count} coins × ${denom_value/100:.2f} = ${monetary_value/100:.2f}") + logger.info( + f"Get batch value for {denom_str}: {count} coins × ${denom_value/100:.2f} = ${monetary_value/100:.2f}" + ) return self.create_message(response) elif cmd == "GI": @@ -270,8 +271,7 @@ class GlorySimulator: if cmd == "AB" or cmd == "Ab": # Accept batch (monetary value) total_value = sum( - count * int(denom) - for denom, count in self.batch_counts.items() + count * int(denom) for denom, count in self.batch_counts.items() ) response = f"BT{total_value:08d}" logger.info(f"Accept batch: ${total_value/100:.2f}") @@ -281,8 +281,7 @@ class GlorySimulator: elif cmd == "AS" or cmd == "As": # Accept subtotal (monetary value) total_value = sum( - count * int(denom) - for denom, count in self.sub_counts.items() + count * int(denom) for denom, count in self.sub_counts.items() ) response = f"BS{total_value:08d}" logger.info(f"Accept subtotal: ${total_value/100:.2f}") @@ -292,8 +291,7 @@ class GlorySimulator: elif cmd == "AG" or cmd == "Ag": # Accept grand total (monetary value) total_value = sum( - count * int(denom) - for denom, count in self.grand_counts.items() + count * int(denom) for denom, count in self.grand_counts.items() ) response = f"BG{total_value:08d}" logger.info(f"Accept grand total: ${total_value/100:.2f}") @@ -483,8 +481,9 @@ class GlorySimulator: message = bytes(buffer[stx_idx : etx_idx + 1]) buffer = buffer[etx_idx + 1 :] + # logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}") logger.debug( - f"RX: {' '.join(f'{b:02X}' for b in message)}" + format_comm_debug("RX", message, include_ascii=True) ) # Parse and handle message @@ -493,7 +492,9 @@ class GlorySimulator: response = self.handle_command(parsed_data) if response: logger.debug( - f"TX: {' '.join(f'{b:02X}' for b in response)}" + format_comm_debug( + "TX", response, include_ascii=True + ) ) self.serial_conn.write(response) except ValueError: diff --git a/source/jetsort.py b/source/jetsort.py index 40c74df..ab9e68b 100644 --- a/source/jetsort.py +++ b/source/jetsort.py @@ -16,6 +16,8 @@ from typing import Optional import serial from loguru import logger +from .common import format_comm_debug + # Protocol constants STX = 0x02 ETX = 0x03 @@ -410,7 +412,7 @@ class JetSortSimulator: packet = self._create_packet(REPORT_SUB_BATCH, report_data) self.serial_conn.write(packet) - logger.debug(f"TX: {' '.join(f'{b:02X}' for b in packet[:50])}...") + logger.debug(format_comm_debug("TX", packet[:50]) + "...") logger.info("Report sent successfully") # Reset batch counters after sending @@ -486,7 +488,7 @@ class JetSortSimulator: if self.serial_conn.in_waiting > 0: data = self.serial_conn.read(self.serial_conn.in_waiting) - logger.debug(f"RX: {' '.join(f'{b:02X}' for b in data)}") + logger.debug(format_comm_debug("RX", data)) # Check for ENQ (poll) if ENQ in data: diff --git a/source/pelican.py b/source/pelican.py index 756cd4e..10ba1f4 100644 --- a/source/pelican.py +++ b/source/pelican.py @@ -11,6 +11,8 @@ from typing import Optional import serial from loguru import logger +from .common import format_comm_debug + # Protocol constants STX = 0x02 ETX = 0x03 @@ -640,18 +642,14 @@ class PelicanSimulator: message = bytes(buffer[stx_idx : etx_idx + 1]) buffer = buffer[etx_idx + 1 :] - logger.debug( - f"RX: {' '.join(f'{b:02X}' for b in message)}" - ) + logger.debug(format_comm_debug("RX", message)) # Parse and handle message parsed_data = self.parse_message(message) if parsed_data: response = self.handle_command(parsed_data) if response: - logger.debug( - f"TX: {' '.join(f'{b:02X}' for b in response)}" - ) + logger.debug(format_comm_debug("TX", response)) self.serial_conn.write(response) except ValueError: pass # ETX not found yet diff --git a/source/selex.py b/source/selex.py index 7f83604..44ec42a 100644 --- a/source/selex.py +++ b/source/selex.py @@ -13,13 +13,15 @@ from typing import Optional import serial from loguru import logger +from .common import format_comm_debug + # Protocol constants -STX = ord('s') # 0x73 -ETX = ord('e') # 0x65 -CRC = ord('v') # 0x76 -CAN = ord('c') # 0x63 -ACK = ord('a') # 0x61 -EOT = ord('t') # 0x74 +STX = ord("s") # 0x73 +ETX = ord("e") # 0x65 +CRC = ord("v") # 0x76 +CAN = ord("c") # 0x63 +ACK = ord("a") # 0x61 +EOT = ord("t") # 0x74 # Machine states STATE_IDLE = "01" # Machine stops with finished counting @@ -106,8 +108,10 @@ class SelexSimulator: # Add 10 to first digit to indicate rejected coins status_str = str(int(status_str[0]) + 1) + status_str[1] - response = b'101' + status_str.encode() + self.error_code.encode() - logger.info(f"Status request: state={self.machine_state}, error={self.error_code}, rejected={self.has_rejected_coins}") + response = b"101" + status_str.encode() + self.error_code.encode() + logger.info( + f"Status request: state={self.machine_state}, error={self.error_code}, rejected={self.has_rejected_coins}" + ) return self.create_message(response) def handle_start_counting(self, data: bytes) -> bytes: @@ -140,10 +144,14 @@ class SelexSimulator: logger.info("Resetting main total (not used in this simulator)") return bytes([ACK]) + def write_to_serial(self, data: bytes) -> None: + logger.debug(format_comm_debug("TX", data, include_ascii=True)) + self.serial_conn.write(data) + def handle_actual_counters_request(self, data: bytes) -> bytes: """Handle actual counting counters request (s400ve or sc00ve for checksum)""" - cmd_char = chr(data[0]) if len(data) > 0 else '4' - with_checksum = (cmd_char == 'c') + cmd_char = chr(data[0]) if len(data) > 0 else "4" + with_checksum = cmd_char == "c" logger.info(f"Actual counters request (checksum={with_checksum})") @@ -153,15 +161,17 @@ class SelexSimulator: self._simulate_counting() # First ACK - self.serial_conn.write(bytes([ACK])) + ack_msg = bytes([ACK]) + self.write_to_serial(ack_msg) time.sleep(0.01) # Send each line's count for line_num in range(1, self.num_lines + 1): count = self.actual_counts[line_num - 1] - cmd = b'c' if with_checksum else b'4' - response = cmd + b'0' + f"{line_num:02d}{count:06d}".encode() - self.serial_conn.write(self.create_message(response)) + cmd = b"c" if with_checksum else b"4" + response = cmd + b"0" + f"{line_num:02d}{count:06d}".encode() + response_msg = self.create_message(response) + self.write_to_serial(response_msg) logger.info(f" Line {line_num}: {count} coins") time.sleep(0.01) # Wait for ACK @@ -170,8 +180,9 @@ class SelexSimulator: # Send checksum if requested if with_checksum: total = sum(self.actual_counts) - response = b'c099' + f"{total:06d}".encode() - self.serial_conn.write(self.create_message(response)) + response = b"c099" + f"{total:06d}".encode() + response_msg = self.create_message(response) + self.write_to_serial(response_msg) logger.info(f" Total checksum: {total} coins") time.sleep(0.01) # Wait for ACK @@ -190,21 +201,23 @@ class SelexSimulator: def handle_partial_totals_request(self, data: bytes) -> bytes: """Handle partial total counters request (s600ve or sd00ve for checksum)""" - cmd_char = chr(data[0]) if len(data) > 0 else '6' - with_checksum = (cmd_char == 'd') + cmd_char = chr(data[0]) if len(data) > 0 else "6" + with_checksum = cmd_char == "d" logger.info(f"Partial totals request (checksum={with_checksum})") # First ACK - self.serial_conn.write(bytes([ACK])) + ack_msg = bytes([ACK]) + self.write_to_serial(ack_msg) time.sleep(0.01) # Send each line's partial total for line_num in range(1, self.num_lines + 1): count = self.partial_totals[line_num - 1] - cmd = b'd' if with_checksum else b'6' - response = cmd + b'0' + f"{line_num:02d}{count:06d}".encode() - self.serial_conn.write(self.create_message(response)) + cmd = b"d" if with_checksum else b"6" + response = cmd + b"0" + f"{line_num:02d}{count:06d}".encode() + response_msg = self.create_message(response) + self.write_to_serial(response_msg) logger.info(f" Line {line_num}: {count} coins (partial total)") time.sleep(0.01) # Wait for ACK @@ -213,8 +226,9 @@ class SelexSimulator: # Send checksum if requested if with_checksum: total = sum(self.partial_totals) - response = b'd099' + f"{total:06d}".encode() - self.serial_conn.write(self.create_message(response)) + response = b"d099" + f"{total:06d}".encode() + response_msg = self.create_message(response) + self.write_to_serial(response_msg) logger.info(f" Total checksum: {total} coins") time.sleep(0.01) # Wait for ACK @@ -231,21 +245,23 @@ class SelexSimulator: def handle_batch_values_request(self, data: bytes) -> bytes: """Handle batching value request (s900ve or sf00ve for checksum)""" - cmd_char = chr(data[0]) if len(data) > 0 else '9' - with_checksum = (cmd_char == 'f') + cmd_char = chr(data[0]) if len(data) > 0 else "9" + with_checksum = cmd_char == "f" logger.info(f"Batch values request (checksum={with_checksum})") # First ACK - self.serial_conn.write(bytes([ACK])) + ack_msg = bytes([ACK]) + self.write_to_serial(ack_msg) time.sleep(0.01) # Send each line's batch value for line_num in range(1, self.num_lines + 1): batch_val = self.batch_values[line_num - 1] - cmd = b'f' if with_checksum else b'9' - response = cmd + b'0' + f"{line_num:02d}{batch_val:06d}".encode() - self.serial_conn.write(self.create_message(response)) + cmd = b"f" if with_checksum else b"9" + response = cmd + b"0" + f"{line_num:02d}{batch_val:06d}".encode() + response_msg = self.create_message(response) + self.write_to_serial(response_msg) logger.info(f" Line {line_num}: {batch_val} coins (batch threshold)") time.sleep(0.01) # Wait for ACK @@ -254,8 +270,9 @@ class SelexSimulator: # Send checksum if requested if with_checksum: total = sum(self.batch_values) - response = b'f099' + f"{total:06d}".encode() - self.serial_conn.write(self.create_message(response)) + response = b"f099" + f"{total:06d}".encode() + response_msg = self.create_message(response) + self.write_to_serial(response_msg) logger.info(f" Total checksum: {total} coins") time.sleep(0.01) # Wait for ACK @@ -270,7 +287,7 @@ class SelexSimulator: return bytes([CAN]) try: - line_str = data[1:3].decode('ascii') + line_str = data[1:3].decode("ascii") line_num = int(line_str) if 1 <= line_num <= self.num_lines: self.lines_enabled[line_num - 1] = False @@ -287,7 +304,7 @@ class SelexSimulator: return bytes([CAN]) try: - line_str = data[1:3].decode('ascii') + line_str = data[1:3].decode("ascii") line_num = int(line_str) if 1 <= line_num <= self.num_lines: self.lines_enabled[line_num - 1] = True @@ -301,13 +318,13 @@ class SelexSimulator: def handle_software_version(self, data: bytes) -> bytes: """Handle software version request (sA00ve)""" logger.info(f"Software version request: {self.sw_version}") - response = self.sw_version.encode() + b'\r\n' + response = self.sw_version.encode() + b"\r\n" return response def handle_serial_number(self, data: bytes) -> bytes: """Handle serial number request (su00ve)""" logger.info(f"Serial number request: {self.serial_number}") - response = self.serial_number.encode() + b'\r\n' + response = self.serial_number.encode() + b"\r\n" return response def _simulate_counting(self): @@ -319,11 +336,11 @@ class SelexSimulator: (20, 150), # Line 1 - high count (15, 120), # Line 2 (10, 100), # Line 3 - (10, 80), # Line 4 - (5, 60), # Line 5 - (5, 50), # Line 6 - (1, 30), # Line 7 - lower count - (1, 20), # Line 8 - lowest count + (10, 80), # Line 4 + (5, 60), # Line 5 + (5, 50), # Line 6 + (1, 30), # Line 7 - lower count + (1, 20), # Line 8 - lowest count ] total_coins = 0 @@ -353,77 +370,77 @@ class SelexSimulator: return bytes([CAN]) # If machine is counting, only accept status requests - if self.is_counting and data[0] != ord('1'): + if self.is_counting and data[0] != ord("1"): logger.warning("Machine is counting - only status requests accepted") return bytes([CAN]) cmd = chr(data[0]) - sub_cmd = data[1:3].decode('ascii', errors='ignore') + sub_cmd = data[1:3].decode("ascii", errors="ignore") logger.info(f"Received command: {cmd}{sub_cmd}") # Status requests - if cmd == '1' and sub_cmd == '00': + if cmd == "1" and sub_cmd == "00": return self.handle_status_request(data) # Start counting - elif cmd == '2' and sub_cmd == '00': + elif cmd == "2" and sub_cmd == "00": return self.handle_start_counting(data) # Main total reset - elif cmd == '3' and sub_cmd == '00': + elif cmd == "3" and sub_cmd == "00": return self.handle_main_total_reset(data) # Actual counters request - elif cmd == '4' and sub_cmd == '00': + elif cmd == "4" and sub_cmd == "00": return self.handle_actual_counters_request(data) # Actual counters with checksum - elif cmd == 'c' and sub_cmd == '00': + elif cmd == "c" and sub_cmd == "00": return self.handle_actual_counters_request(data) # Uncompleted batch reset - elif cmd == '5' and sub_cmd == '00': + elif cmd == "5" and sub_cmd == "00": return self.handle_batch_reset(data) # Partial totals request - elif cmd == '6' and sub_cmd == '00': + elif cmd == "6" and sub_cmd == "00": return self.handle_partial_totals_request(data) # Partial totals with checksum - elif cmd == 'd' and sub_cmd == '00': + elif cmd == "d" and sub_cmd == "00": return self.handle_partial_totals_request(data) # Partial total reset - elif cmd == '7' and sub_cmd == '00': + elif cmd == "7" and sub_cmd == "00": return self.handle_partial_total_reset(data) # Stop counting - elif cmd == '8' and sub_cmd == '00': + elif cmd == "8" and sub_cmd == "00": return self.handle_stop_counting(data) # Batch values request - elif cmd == '9' and sub_cmd == '00': + elif cmd == "9" and sub_cmd == "00": return self.handle_batch_values_request(data) # Batch values with checksum - elif cmd == 'f' and sub_cmd == '00': + elif cmd == "f" and sub_cmd == "00": return self.handle_batch_values_request(data) # Line disable - elif cmd == 'C': + elif cmd == "C": return self.handle_line_disable(data) # Line enable - elif cmd == 'D': + elif cmd == "D": return self.handle_line_enable(data) # Software version - elif cmd == 'A' and sub_cmd == '00': + elif cmd == "A" and sub_cmd == "00": return self.handle_software_version(data) # Serial number - elif cmd == 'u' and sub_cmd == '00': + elif cmd == "u" and sub_cmd == "00": return self.handle_serial_number(data) else: @@ -459,18 +476,19 @@ class SelexSimulator: stx_idx = buffer.index(STX) try: etx_idx = buffer.index(ETX, stx_idx) - message = bytes(buffer[stx_idx:etx_idx + 1]) - buffer = buffer[etx_idx + 1:] + message = bytes(buffer[stx_idx : etx_idx + 1]) + buffer = buffer[etx_idx + 1 :] - logger.debug(f"RX: {' '.join(f'{b:02X}' for b in message)}") + logger.debug( + format_comm_debug("RX", message, include_ascii=True) + ) # Parse and handle message parsed_data = self.parse_message(message) if parsed_data: response = self.handle_command(parsed_data) if response: - logger.debug(f"TX: {' '.join(f'{b:02X}' for b in response)}") - self.serial_conn.write(response) + self.write_to_serial(response) # Handle EOT - close communication if response == bytes([EOT]):