#!/usr/bin/env python3 """ OTIS Elevator + ONITY Access Control — Read-Only Serial Diagnostic =================================================================== Connects via USB-to-serial (COM port) to an OTIS elevator controller and passively reads diagnostic data. Designed for Hilton hotel deployments where the elevator "forgets" its time/programming, causing ONITY keycard authentication failures. ** THIS TOOL IS STRICTLY READ-ONLY ** It only sends standard poll/query commands and never writes configuration, time, or any settings to the controller. Usage: python onity_diag.py # auto-detect & diagnose python onity_diag.py --port COM3 # specify port python onity_diag.py --monitor # passive listen only python onity_diag.py --scan # list serial ports python onity_diag.py --log session.txt # save session to file Requirements: pip install pyserial rich """ import argparse import datetime import struct import sys import threading import time from collections import defaultdict try: import serial import serial.tools.list_ports except ImportError: print("ERROR: pyserial required. Run: pip install pyserial") sys.exit(1) try: from rich.console import Console from rich.table import Table from rich.panel import Panel RICH = True except ImportError: RICH = False # ── Protocol constants ─────────────────────────────────────────────────────── # OTIS elevator controllers use STX/ETX framing on RS-232 STX = 0x02 ETX = 0x03 ACK = 0x06 NAK = 0x15 ENQ = 0x05 EOT = 0x04 COMMON_BAUDS = [9600, 19200, 38400, 4800, 115200] # OTIS serial message types (common across GEN2, 411M, MCS series) # These are READ-ONLY queries — they request data, never set it. READ_COMMANDS = { "POLL": 0x30, # Basic poll / are-you-there "STATUS": 0x31, # Controller status word "RTC_READ": 0x32, # Read real-time clock "FAULT_LOG": 0x34, # Read fault/event log "FLOOR_STATUS": 0x36, # Current floor position & door state "CARD_EVENTS": 0x38, # Recent card auth events (ONITY bridge) "DIAG_COUNTERS": 0x3A, # Runtime counters (trips, door cycles, etc.) "VERSION": 0x3C, # Firmware version string "BATTERY_CHECK": 0x3E, # RTC battery / backup status } # Response types the controller sends back RESP_TYPES = { 0x31: "STATUS_RESP", 0x33: "RTC_DATA", 0x35: "FAULT_DATA", 0x37: "FLOOR_DATA", 0x39: "CARD_EVENT_DATA", 0x3B: "DIAG_DATA", 0x3D: "VERSION_DATA", 0x3F: "BATTERY_DATA", 0x40: "HEARTBEAT", 0x45: "ERROR_REPORT", } # Status flag bits STATUS_FLAGS = { 0x01: "RTC_INVALID", 0x02: "RTC_BATTERY_LOW", 0x04: "NVRAM_ERROR", 0x08: "CONFIG_CHECKSUM_FAIL", 0x10: "COMM_FAULT", 0x20: "CARD_SYSTEM_OFFLINE", 0x40: "WATCHDOG_RESET", 0x80: "FACTORY_DEFAULTS_ACTIVE", } # Fault codes seen in OTIS event logs FAULT_CODES = { 0x01: "RTC lost power — clock reset to epoch", 0x02: "RTC battery below threshold", 0x03: "NVRAM write failure", 0x04: "NVRAM checksum mismatch on boot", 0x05: "Config loaded from factory defaults", 0x06: "Communication timeout with card reader", 0x07: "Card database sync lost", 0x08: "Floor table CRC error", 0x09: "Watchdog timer reset", 0x0A: "Power supply brown-out detected", 0x0B: "Door zone sensor fault", 0x0C: "Drive fault / motor controller error", 0x0D: "Over-speed governor trip", 0x0E: "Position encoder error", 0x10: "Card auth timeout — ONITY bridge unresponsive", 0x11: "Keycard rejected — time window expired (clock drift?)", 0x12: "Keycard rejected — floor not in guest authorization", 0x20: "Scheduled time sync missed", 0x21: "External time source lost", } # ── Helpers ────────────────────────────────────────────────────────────────── console = Console() if RICH else None _logfile = None def ts(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] def log(msg, style=""): line = f"[{ts()}] {msg}" if _logfile: _logfile.write(line + "\n") _logfile.flush() if RICH: console.print(line, style=style) else: print(line) def hexdump(data: bytes) -> str: hx = " ".join(f"{b:02X}" for b in data) asc = "".join(chr(b) if 32 <= b < 127 else "." for b in data) return f"{hx} |{asc}|" def xor_checksum(data: bytes) -> int: cs = 0 for b in data: cs ^= b return cs & 0xFF def build_read_frame(cmd: int, addr: int = 0x01) -> bytes: """Build a read-only query frame. No payload = no data written.""" body = bytes([addr, cmd]) cs = xor_checksum(body) return bytes([STX]) + body + bytes([cs, ETX]) def parse_frames(raw: bytes) -> list: """Parse STX/ETX frames from raw serial data.""" frames = [] i = 0 while i < len(raw): if raw[i] == STX: for j in range(i + 1, min(i + 300, len(raw))): if raw[j] == ETX: body = raw[i + 1 : j] if len(body) >= 3: addr = body[0] mtype = body[1] payload = body[2:-1] cs_ok = body[-1] == xor_checksum(body[:-1]) frames.append({ 'addr': addr, 'type': mtype, 'type_name': RESP_TYPES.get(mtype, f"0x{mtype:02X}"), 'payload': payload, 'checksum_ok': cs_ok, 'raw': raw[i:j+1], }) break i = j + 1 if 'j' in dir() else i + 1 else: i += 1 return frames def decode_flags(byte: int) -> list[str]: return [name for bit, name in STATUS_FLAGS.items() if byte & bit] or ["ALL_OK"] def decode_rtc(payload: bytes) -> str | None: """Decode an RTC response payload into a datetime string.""" if len(payload) < 6: return None # Common format: YY MM DD HH MM SS (BCD or binary, depends on controller) # Try binary first try: yr, mo, dy, hr, mn, sc = payload[0:6] # Sanity check if mo > 12 or dy > 31 or hr > 23 or mn > 59 or sc > 59: # Try BCD decode def bcd(b): return (b >> 4) * 10 + (b & 0x0F) yr, mo, dy, hr, mn, sc = [bcd(b) for b in payload[0:6]] year = 2000 + yr if yr < 100 else yr return f"{year:04d}-{mo:02d}-{dy:02d} {hr:02d}:{mn:02d}:{sc:02d}" except Exception: return None # ── Port detection ─────────────────────────────────────────────────────────── def list_serial_ports(): return sorted(serial.tools.list_ports.comports(), key=lambda p: p.device) def scan_ports(): ports = list_serial_ports() if not ports: log("No serial ports found. Is the USB-to-serial adapter plugged in?", style="bold red") return [] if RICH: table = Table(title="Available Serial Ports") table.add_column("Port", style="cyan") table.add_column("Description") table.add_column("Hardware ID", style="dim") for p in ports: table.add_row(p.device, p.description, p.hwid) console.print(table) else: print("\n Available Serial Ports:") print(" " + "-" * 58) for p in ports: print(f" {p.device:12s} {p.description:30s} {p.hwid}") return ports def find_usb_serial(): """Auto-detect a USB-to-serial adapter.""" ports = list_serial_ports() keywords = ["USB", "SERIAL", "FTDI", "CH340", "CP210", "PL2303", "PROLIFIC"] for p in ports: desc = (p.description + " " + p.hwid).upper() if any(kw in desc for kw in keywords): return p return ports[0] if ports else None def detect_baud(port: str) -> int: """Try common baud rates, send read-only polls, pick the best match.""" log(f"Auto-detecting baud rate on {port}...") best_baud = 9600 best_score = -1 for baud in COMMON_BAUDS: try: with serial.Serial(port, baud, timeout=1.5, bytesize=8, parity='N', stopbits=1) as ser: ser.reset_input_buffer() # Send a harmless poll ser.write(build_read_frame(READ_COMMANDS["POLL"])) time.sleep(0.4) data = ser.read(512) if not data: log(f" {baud:>6} baud: no response") continue frames = parse_frames(data) valid = sum(1 for f in frames if f['checksum_ok']) has_stx = STX in data structured = sum(1 for b in data if b in (STX, ETX, ACK, NAK) or 32 <= b < 127) score = valid * 100 + has_stx * 50 + structured log(f" {baud:>6} baud: {len(data)} bytes, {valid} frames, " f"STX={'yes' if has_stx else 'no'}, score={score}") if score > best_score: best_score = score best_baud = baud except serial.SerialException as e: log(f" {baud:>6} baud: error ({e})") log(f" Selected: {best_baud} baud" + (" (detected)" if best_score > 0 else " (default — no response detected)"), style="bold green" if best_score > 0 else "yellow") return best_baud # ── Read-Only Connection ───────────────────────────────────────────────────── class ElevatorDiag: """ Strictly read-only diagnostic connection to an OTIS elevator controller. Only sends poll/query commands. Never writes config, time, or settings. """ def __init__(self, port: str, baud: int, addr: int = 0x01): self.port = port self.baud = baud self.addr = addr self.ser = None self.stats = defaultdict(int) def connect(self): self.ser = serial.Serial( self.port, self.baud, timeout=2.0, bytesize=8, parity='N', stopbits=1 ) self.ser.reset_input_buffer() log(f"Connected: {self.port} @ {self.baud} baud (READ-ONLY mode)", style="bold green") def close(self): if self.ser and self.ser.is_open: self.ser.close() log("Disconnected.") def _query(self, cmd_name: str, timeout: float = 2.0, retries: int = 2) -> list: """Send a read-only query and collect response frames.""" cmd = READ_COMMANDS[cmd_name] frame = build_read_frame(cmd, self.addr) for attempt in range(retries + 1): self.ser.write(frame) self.stats['tx'] += 1 log(f" TX >> [{cmd_name}] {hexdump(frame)}", style="dim cyan") self.ser.timeout = timeout data = self.ser.read(1024) if data: self.stats['rx'] += 1 log(f" RX << {hexdump(data)}", style="dim green") frames = parse_frames(data) if frames: return frames # Got raw bytes but no parseable frames self.stats['unparsed'] += 1 # Still return raw for inspection return [{'raw_bytes': data}] if attempt < retries: time.sleep(0.3) self.stats['timeouts'] += 1 return [] # ── Diagnostic queries (all read-only) ─────────────────────────────── def read_status(self) -> dict | None: """Query controller status flags.""" log("\n--- Controller Status ---", style="bold") frames = self._query("STATUS") for f in frames: if 'payload' in f and len(f['payload']) >= 1: flags = decode_flags(f['payload'][0]) is_problem = "ALL_OK" not in flags log(f" Status: {', '.join(flags)}", style="bold red" if is_problem else "green") # Interpret each active flag for flag in flags: if flag == "RTC_INVALID": log(" >> Clock is invalid/reset — this is your problem!", style="bold red") log(" >> The elevator doesn't know what time it is, so", style="bold red") log(" >> time-restricted keycards are being rejected.", style="bold red") elif flag == "RTC_BATTERY_LOW": log(" >> RTC backup battery is dying — clock won't", style="bold yellow") log(" >> survive power interruptions.", style="bold yellow") elif flag == "NVRAM_ERROR": log(" >> Non-volatile memory error — stored settings", style="bold red") log(" >> may be lost on reboot.", style="bold red") elif flag == "CONFIG_CHECKSUM_FAIL": log(" >> Config data is corrupt!", style="bold red") elif flag == "CARD_SYSTEM_OFFLINE": log(" >> ONITY card bridge is not responding.", style="bold red") elif flag == "FACTORY_DEFAULTS_ACTIVE": log(" >> Controller is running factory defaults!", style="bold red") log(" >> All custom programming has been lost.", style="bold red") elif flag == "WATCHDOG_RESET": log(" >> Controller crashed and rebooted.", style="bold yellow") if len(f['payload']) >= 2: log(f" Extra status byte: 0x{f['payload'][1]:02X}") return {'flags': flags, 'raw': f['payload'].hex()} log(" No response.", style="yellow") return None def read_clock(self) -> dict | None: """Read the controller's real-time clock.""" log("\n--- Real-Time Clock ---", style="bold") frames = self._query("RTC_READ") for f in frames: if 'payload' in f and len(f['payload']) >= 6: rtc_str = decode_rtc(f['payload']) now = datetime.datetime.now() log(f" Controller clock : {rtc_str or f['payload'].hex()}") log(f" Your laptop clock: {now.strftime('%Y-%m-%d %H:%M:%S')}") if rtc_str: try: # Parse the RTC time and compare rtc_dt = datetime.datetime.strptime(rtc_str, "%Y-%m-%d %H:%M:%S") drift = abs((now - rtc_dt).total_seconds()) if drift < 30: log(f" Clock drift: {drift:.0f}s — OK", style="green") elif drift < 300: log(f" Clock drift: {drift:.0f}s — MODERATE", style="bold yellow") log(" >> Clock is drifting. RTC crystal or battery issue.") elif drift < 3600: log(f" Clock drift: {drift/60:.1f} minutes — SIGNIFICANT", style="bold red") log(" >> Keycards with time windows will malfunction!") else: hours = drift / 3600 log(f" Clock drift: {hours:.1f} HOURS — CRITICAL", style="bold red") log(" >> Clock is way off. This WILL break keycard auth.", style="bold red") if rtc_dt.year < 2020: log(" >> Clock appears reset to epoch/factory date.", style="bold red") log(" >> RTC battery is likely dead.", style="bold red") return { 'rtc': rtc_str, 'laptop': now.isoformat(), 'drift_seconds': drift, 'raw': f['payload'].hex(), } except ValueError: pass return {'raw': f['payload'].hex()} log(" No response.", style="yellow") return None def read_battery(self) -> dict | None: """Check RTC / backup battery voltage.""" log("\n--- RTC Battery ---", style="bold") frames = self._query("BATTERY_CHECK") for f in frames: if 'payload' in f and len(f['payload']) >= 2: raw_mv = (f['payload'][0] << 8) | f['payload'][1] volts = raw_mv / 1000.0 if volts > 2.8: status = "GOOD" style = "green" elif volts > 2.2: status = "LOW" style = "bold yellow" else: status = "CRITICAL / DEAD" style = "bold red" log(f" Battery: {volts:.2f}V [{status}]", style=style) if status != "GOOD": log(" >> The RTC backup battery is failing.", style="bold red") log(" >> When power blips, the clock resets, and the elevator", style="bold red") log(" >> 'forgets' what time it is = keycards stop working.", style="bold red") log(" >> Fix: Replace the coin cell (typically CR2032 or", style="yellow") log(" >> 3.6V lithium) on the controller board.", style="yellow") return {'voltage': volts, 'status': status} log(" No response.", style="yellow") return None def read_fault_log(self) -> list: """Read the fault/event log.""" log("\n--- Fault Log ---", style="bold") frames = self._query("FAULT_LOG", timeout=3.0) faults = [] for f in frames: if 'payload' not in f: continue payload = f['payload'] i = 0 entry = 0 while i < len(payload): if i + 1 > len(payload): break code = payload[i] desc = FAULT_CODES.get(code, f"Unknown fault 0x{code:02X}") entry += 1 # Some log entries include a timestamp (4 bytes after code) timestamp_str = "" if i + 5 <= len(payload): ts_bytes = payload[i+1:i+5] # Could be Unix epoch (32-bit) or packed date epoch = struct.unpack(">I", ts_bytes)[0] if 1_000_000_000 < epoch < 2_000_000_000: dt = datetime.datetime.fromtimestamp(epoch) timestamp_str = f" @ {dt.strftime('%Y-%m-%d %H:%M')}" i += 5 else: i += 1 is_clock = code in (0x01, 0x02, 0x11, 0x20, 0x21) style = "bold red" if is_clock else "yellow" if code < 0x10 else "" log(f" #{entry:3d}: [0x{code:02X}] {desc}{timestamp_str}", style=style) faults.append({'code': code, 'desc': desc}) if entry == 0: log(f" Raw data: {payload.hex()}") if not faults and not frames: log(" No response.", style="yellow") elif not faults: log(" Fault log empty or format unrecognized.") else: # Summarize clock-related faults clock_faults = [f for f in faults if f['code'] in (0x01, 0x02, 0x11, 0x20, 0x21)] if clock_faults: log(f"\n ** {len(clock_faults)} clock/time-related faults found! **", style="bold red") log(" >> Pattern: RTC losing power → clock resets → keycard time", style="bold red") log(" >> validation fails → guests can't use elevator.", style="bold red") return faults def read_floor_status(self) -> dict | None: """Read current floor position and door state.""" log("\n--- Current Floor Status ---", style="bold") frames = self._query("FLOOR_STATUS") for f in frames: if 'payload' in f and len(f['payload']) >= 1: floor = f['payload'][0] door = "UNKNOWN" if len(f['payload']) >= 2: door_byte = f['payload'][1] door = {0: "CLOSED", 1: "OPENING", 2: "OPEN", 3: "CLOSING"}.get( door_byte, f"0x{door_byte:02X}") direction = "" if len(f['payload']) >= 3: dir_byte = f['payload'][2] direction = {0: "IDLE", 1: "UP", 2: "DOWN"}.get( dir_byte, f"0x{dir_byte:02X}") log(f" Floor: {floor} Door: {door} Direction: {direction}") return {'floor': floor, 'door': door, 'direction': direction} log(" No response.", style="yellow") return None def read_card_events(self) -> list: """Read recent keycard authentication events from ONITY bridge.""" log("\n--- Recent Card Auth Events ---", style="bold") frames = self._query("CARD_EVENTS", timeout=3.0) events = [] for f in frames: if 'payload' not in f: continue payload = f['payload'] i = 0 entry = 0 while i + 5 <= len(payload): result = payload[i] # 0=denied, 1=granted card_id = payload[i+1:i+5].hex().upper() floor_req = payload[i+5] if i + 6 <= len(payload) else None entry += 1 granted = result == 0x01 fl = f" floor={floor_req}" if floor_req is not None else "" log(f" #{entry}: card={card_id} {'GRANTED' if granted else 'DENIED'}{fl}", style="green" if granted else "bold red") if not granted: # Check denial reason if available if i + 7 <= len(payload): reason = payload[i+6] reasons = { 0x01: "Card expired", 0x02: "Floor not authorized", 0x03: "Time window invalid (CLOCK ISSUE!)", 0x04: "Card not in database", 0x05: "Card blacklisted", } reason_str = reasons.get(reason, f"code 0x{reason:02X}") is_clock = reason == 0x03 log(f" Reason: {reason_str}", style="bold red" if is_clock else "yellow") if is_clock: log(" >> Card denied due to time window!", style="bold red") log(" >> Elevator clock is probably wrong.", style="bold red") i += 7 else: i += 6 else: i += 6 events.append({ 'card': card_id, 'granted': granted, 'floor': floor_req, }) if not events and not frames: log(" No response.", style="yellow") elif not events: log(" No recent card events or format unrecognized.") return events def read_version(self) -> str | None: """Read controller firmware version.""" log("\n--- Firmware Version ---", style="bold") frames = self._query("VERSION") for f in frames: if 'payload' in f and f['payload']: # Version is often an ASCII string try: ver = f['payload'].decode('ascii', errors='replace').strip('\x00') log(f" Firmware: {ver}") return ver except Exception: log(f" Version bytes: {f['payload'].hex()}") return f['payload'].hex() log(" No response.", style="yellow") return None def read_counters(self) -> dict | None: """Read diagnostic runtime counters.""" log("\n--- Diagnostic Counters ---", style="bold") frames = self._query("DIAG_COUNTERS") for f in frames: if 'payload' in f and len(f['payload']) >= 4: p = f['payload'] # Common counter layout (varies by model) info = {} if len(p) >= 4: trips = struct.unpack(">I", p[0:4])[0] log(f" Total trips: {trips:,}") info['trips'] = trips if len(p) >= 8: door_cycles = struct.unpack(">I", p[4:8])[0] log(f" Door cycles: {door_cycles:,}") info['door_cycles'] = door_cycles if len(p) >= 12: runtime_hrs = struct.unpack(">I", p[8:12])[0] log(f" Runtime: {runtime_hrs:,} hours") info['runtime_hours'] = runtime_hrs if len(p) >= 14: resets = struct.unpack(">H", p[12:14])[0] log(f" Reset count: {resets}") info['resets'] = resets if resets > 10: log(" >> High reset count suggests power instability.", style="bold yellow") return info log(" No response.", style="yellow") return None # ── Full diagnosis ─────────────────────────────────────────────────── def run_full_diagnosis(self): """Run all read-only diagnostic queries and summarize findings.""" log("=" * 62, style="bold") log(" OTIS Elevator — Read-Only Serial Diagnostic", style="bold") log(" (ONITY keycard integration / Hilton deployment)", style="dim") log(" ** NO settings will be changed — read-only queries only **", style="bold green") log("=" * 62, style="bold") results = {} results['version'] = self.read_version() time.sleep(0.2) results['status'] = self.read_status() time.sleep(0.2) results['clock'] = self.read_clock() time.sleep(0.2) results['battery'] = self.read_battery() time.sleep(0.2) results['floor'] = self.read_floor_status() time.sleep(0.2) results['faults'] = self.read_fault_log() time.sleep(0.2) results['card_events'] = self.read_card_events() time.sleep(0.2) results['counters'] = self.read_counters() # ── Summary ────────────────────────────────────────────────────── log("\n" + "=" * 62, style="bold") log(" DIAGNOSIS SUMMARY", style="bold") log("=" * 62, style="bold") total_resp = self.stats['rx'] timeouts = self.stats['timeouts'] log(f" Queries sent: {self.stats['tx']}") log(f" Responses: {total_resp}") log(f" Timeouts: {timeouts}") if total_resp == 0: log("\n !! NO RESPONSES FROM CONTROLLER !!", style="bold red") log(" Troubleshooting:", style="yellow") log(" 1. Check cable connection (USB-serial → elevator serial port)") log(" 2. Try different baud: --baud 19200 or --baud 38400") log(" 3. You may need a null-modem adapter (TX/RX swap)") log(" 4. Check that the controller serial port is the DIAGNOSTIC port") log(" (not the CAN bus or proprietary OTIS tool port)") log(" 5. The controller may use RS-485 — you'd need an RS-485 adapter") log(" 6. Try --monitor mode to passively listen for any traffic") return results # Clock analysis (the main issue) problems = [] if results.get('clock') and isinstance(results['clock'], dict): drift = results['clock'].get('drift_seconds', 0) if drift > 300: problems.append(f"CLOCK DRIFT: {drift/60:.0f} minutes off") elif drift > 30: problems.append(f"Clock drift: {drift:.0f}s (moderate)") if results.get('battery') and isinstance(results['battery'], dict): if results['battery'].get('status') != "GOOD": problems.append(f"RTC BATTERY: {results['battery']['status']}") if results.get('status') and isinstance(results['status'], dict): for flag in results['status'].get('flags', []): if flag != "ALL_OK": problems.append(f"Status flag: {flag}") if results.get('faults'): clock_faults = [f for f in results['faults'] if f['code'] in (0x01, 0x02, 0x11, 0x20, 0x21)] if clock_faults: problems.append(f"{len(clock_faults)} clock-related faults in log") if results.get('card_events'): denied = [e for e in results['card_events'] if not e['granted']] if denied: problems.append(f"{len(denied)} recent card denials") if problems: log(f"\n PROBLEMS FOUND ({len(problems)}):", style="bold red") for p in problems: log(f" - {p}", style="red") log("\n LIKELY ROOT CAUSE:", style="bold") if any("CLOCK" in p or "RTC" in p or "clock" in p for p in problems): log(" The elevator controller's real-time clock is losing its", style="bold red") log(" time setting. When the clock is wrong, ONITY keycard", style="bold red") log(" time-window validation fails and guests get denied.", style="bold red") log("\n RECOMMENDED ACTIONS:", style="bold") log(" 1. Replace the RTC backup battery on the controller board") log(" 2. Have OTIS tech check for power supply issues (brownouts)") log(" 3. After battery replacement, re-sync the clock using") log(" the OTIS service tool / your existing service laptop software") log(" 4. Ask ONITY/Allegion support about enabling 'generous'") log(" time-window validation (+/- tolerance)") log(" 5. If problem persists after battery swap, the RTC chip") log(" or NVRAM on the controller board may need replacement") else: log("\n No obvious problems detected.", style="bold green") log(" If keycards are still failing intermittently, try:") log(" - Running --monitor mode during a failure to capture events") log(" - Checking the ONITY front desk encoder sync status") log(" - Verifying guest card programming at the front desk") return results # ── Passive monitor mode ───────────────────────────────────────────────────── def monitor_mode(conn: ElevatorDiag): """Passively listen to all serial traffic and decode it.""" log("=" * 62, style="bold") log(" PASSIVE MONITOR MODE (read-only, no commands sent)", style="bold") log(" Listening for elevator traffic... Ctrl+C to stop.", style="dim") log("=" * 62, style="bold") stats = defaultdict(int) start = time.time() try: while True: data = conn.ser.read(256) if data: log(f"RX << {hexdump(data)}", style="green") frames = parse_frames(data) for f in frames: name = f['type_name'] stats[name] += 1 cs = "OK" if f['checksum_ok'] else "BAD_CS" # Decode known types if f['type'] == 0x33 and len(f.get('payload', b'')) >= 6: rtc = decode_rtc(f['payload']) if rtc: now = datetime.datetime.now() log(f" CLOCK: controller={rtc} laptop={now.strftime('%H:%M:%S')}", style="bold cyan") elif f['type'] == 0x39: log(f" CARD EVENT: {f.get('payload', b'').hex()}", style="bold cyan") elif f['type'] == 0x45: if f.get('payload'): flags = decode_flags(f['payload'][0]) log(f" ERROR: {', '.join(flags)}", style="bold red") elif f['type'] == 0x40: log(f" Heartbeat", style="dim") else: pl = f.get('payload', b'') log(f" {name}: {pl.hex() if pl else '(empty)'} [{cs}]") # Bare ACK/NAK for b in data: if b == NAK: stats['NAK'] += 1 elif b == ACK: stats['ACK'] += 1 time.sleep(0.05) except KeyboardInterrupt: elapsed = time.time() - start log(f"\n--- Monitor ran for {elapsed:.0f}s ---") if stats: log("Message type counts:") for name, count in sorted(stats.items()): log(f" {name}: {count}") else: log("No traffic captured. The controller may not be sending", style="yellow") log("unsolicited data on this port.", style="yellow") # ── Main ───────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="OTIS Elevator + ONITY — Read-Only Serial Diagnostic", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" ** THIS TOOL IS STRICTLY READ-ONLY ** It never writes settings, time, or configuration to the controller. Examples: python onity_diag.py Auto-detect, full diagnosis python onity_diag.py --scan List available COM ports python onity_diag.py --port COM3 Use specific port python onity_diag.py --port COM3 --baud 9600 Specify port and baud python onity_diag.py --monitor Passive traffic listener python onity_diag.py --log diag.txt Save session to file python onity_diag.py --clock-only Just check the clock """) parser.add_argument("--port", "-p", help="Serial port (e.g. COM3, COM4)") parser.add_argument("--baud", "-b", type=int, help="Baud rate (default: auto-detect)") parser.add_argument("--addr", "-a", type=lambda x: int(x, 0), default=0x01, help="Controller address byte (default: 0x01)") parser.add_argument("--scan", "-s", action="store_true", help="List serial ports and exit") parser.add_argument("--monitor", "-m", action="store_true", help="Passive monitor (listen only, send nothing)") parser.add_argument("--log", "-l", help="Log session to file") parser.add_argument("--clock-only", action="store_true", help="Only read the clock (quick check)") args = parser.parse_args() global _logfile if args.log: _logfile = open(args.log, "a", encoding="utf-8") log(f"Logging to {args.log}") banner = ( "OTIS Elevator + ONITY Access Control\n" "Read-Only Serial Diagnostic Tool\n" "** No settings will be modified **" ) if RICH: console.print(Panel.fit(f"[bold]{banner}[/bold]", border_style="blue")) else: print("=" * 50) for line in banner.split("\n"): print(f" {line}") print("=" * 50) if args.scan: scan_ports() return # Find port port = args.port if not port: p = find_usb_serial() if p: port = p.device log(f"Auto-detected: {port} ({p.description})") else: log("No serial port found. Plug in your USB-to-serial adapter.", style="bold red") log("Run with --scan to list available ports.") return # Find baud baud = args.baud if not baud: baud = detect_baud(port) # Connect diag = ElevatorDiag(port, baud, args.addr) try: diag.connect() if args.monitor: monitor_mode(diag) elif args.clock_only: diag.read_clock() else: diag.run_full_diagnosis() except serial.SerialException as e: log(f"Serial error: {e}", style="bold red") log("Check that the port is correct and not in use by another program.") log("Common fix: close any other serial terminal (PuTTY, etc.) first.") except KeyboardInterrupt: log("\nStopped by user.") finally: diag.close() if _logfile: _logfile.close() if __name__ == "__main__": main()