939 lines
38 KiB
Python
939 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
OTIS Elevator + ONITY Access Control — Read-Only Serial Diagnostic
===================================================================
Connects via USB-to-serial (COM port) to an OTIS elevator controller
and passively reads diagnostic data. Designed for Hilton hotel
deployments where the elevator "forgets" its time/programming,
causing ONITY keycard authentication failures.

** THIS TOOL IS STRICTLY READ-ONLY **
It only sends standard poll/query commands and never writes
configuration, time, or any settings to the controller.

Usage:
    python onity_diag.py                    # auto-detect & diagnose
    python onity_diag.py --port COM3        # specify port
    python onity_diag.py --monitor          # passive listen only
    python onity_diag.py --scan             # list serial ports
    python onity_diag.py --log session.txt  # save session to file

Requirements:
    pip install pyserial rich
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import struct
|
|
import sys
|
|
import threading
|
|
import time
|
|
from collections import defaultdict
|
|
|
|
try:
|
|
import serial
|
|
import serial.tools.list_ports
|
|
except ImportError:
|
|
print("ERROR: pyserial required. Run: pip install pyserial")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.panel import Panel
|
|
RICH = True
|
|
except ImportError:
|
|
RICH = False
|
|
|
|
# ── Protocol constants ───────────────────────────────────────────────────────
|
|
# OTIS elevator controllers use STX/ETX framing on RS-232
# ASCII control bytes used for framing and handshaking.
STX, ETX, EOT, ENQ, ACK, NAK = 0x02, 0x03, 0x04, 0x05, 0x06, 0x15

# Baud rates probed by detect_baud(), in the order they are tried.
COMMON_BAUDS = [9600, 19200, 38400, 4800, 115200]

# OTIS serial message types (common across GEN2, 411M, MCS series)
# These are READ-ONLY queries — they request data, never set it.
READ_COMMANDS = dict(
    POLL=0x30,           # Basic poll / are-you-there
    STATUS=0x31,         # Controller status word
    RTC_READ=0x32,       # Read real-time clock
    FAULT_LOG=0x34,      # Read fault/event log
    FLOOR_STATUS=0x36,   # Current floor position & door state
    CARD_EVENTS=0x38,    # Recent card auth events (ONITY bridge)
    DIAG_COUNTERS=0x3A,  # Runtime counters (trips, door cycles, etc.)
    VERSION=0x3C,        # Firmware version string
    BATTERY_CHECK=0x3E,  # RTC battery / backup status
)

# Response types the controller sends back (message type byte -> name)
RESP_TYPES = {
    0x31: "STATUS_RESP",
    0x33: "RTC_DATA",
    0x35: "FAULT_DATA",
    0x37: "FLOOR_DATA",
    0x39: "CARD_EVENT_DATA",
    0x3B: "DIAG_DATA",
    0x3D: "VERSION_DATA",
    0x3F: "BATTERY_DATA",
    0x40: "HEARTBEAT",
    0x45: "ERROR_REPORT",
}

# Status flag bits: bit 0 (0x01) through bit 7 (0x80), one name per bit.
STATUS_FLAGS = {
    1 << bit: name
    for bit, name in enumerate((
        "RTC_INVALID",
        "RTC_BATTERY_LOW",
        "NVRAM_ERROR",
        "CONFIG_CHECKSUM_FAIL",
        "COMM_FAULT",
        "CARD_SYSTEM_OFFLINE",
        "WATCHDOG_RESET",
        "FACTORY_DEFAULTS_ACTIVE",
    ))
}

# Fault codes seen in OTIS event logs (code byte -> description)
FAULT_CODES = {
    # Clock / non-volatile storage
    0x01: "RTC lost power — clock reset to epoch",
    0x02: "RTC battery below threshold",
    0x03: "NVRAM write failure",
    0x04: "NVRAM checksum mismatch on boot",
    0x05: "Config loaded from factory defaults",
    # Communication / data integrity
    0x06: "Communication timeout with card reader",
    0x07: "Card database sync lost",
    0x08: "Floor table CRC error",
    0x09: "Watchdog timer reset",
    0x0A: "Power supply brown-out detected",
    # Mechanical / drive
    0x0B: "Door zone sensor fault",
    0x0C: "Drive fault / motor controller error",
    0x0D: "Over-speed governor trip",
    0x0E: "Position encoder error",
    # Card authorization
    0x10: "Card auth timeout — ONITY bridge unresponsive",
    0x11: "Keycard rejected — time window expired (clock drift?)",
    0x12: "Keycard rejected — floor not in guest authorization",
    # Time synchronization
    0x20: "Scheduled time sync missed",
    0x21: "External time source lost",
}
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────────────
|
|
# Shared Rich console; stays None when the optional `rich` import failed.
if RICH:
    console = Console()
else:
    console = None

# Handle of the session log file; main() assigns it when --log is given.
_logfile = None
|
|
|
|
|
|
def ts():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS.mmm``."""
    # isoformat() truncates (never rounds) to the requested precision, which
    # matches cutting the last three digits off a microsecond field.
    current = datetime.datetime.now()
    return current.isoformat(sep=" ", timespec="milliseconds")
|
|
|
|
|
|
def log(msg, style=""):
    """Emit one timestamped line to the console and the optional session log.

    Args:
        msg: Text to emit; it is prefixed with ``[<timestamp>] ``.
        style: Rich style string for the console output. It is ignored when
            Rich is not installed and is never written to the log file.
    """
    line = f"[{ts()}] {msg}"
    if _logfile:
        _logfile.write(line + "\n")
        # Flush after every line so the file keeps up with the console.
        _logfile.flush()
    if RICH:
        # markup=False: many messages embed raw device bytes (the ASCII column
        # of hexdump()). Without it, a byte sequence that looks like a console
        # markup tag (e.g. "[/x]") would be swallowed or raise MarkupError.
        console.print(line, style=style, markup=False)
    else:
        print(line)
|
|
|
|
|
|
def hexdump(data: bytes) -> str:
    """Format *data* as upper-case hex pairs followed by an ASCII column.

    Bytes outside the printable ASCII range (32..126) are shown as ``.`` in
    the ASCII column, which is wrapped in ``|`` characters.
    """
    hex_column = bytes(data).hex(" ").upper()
    ascii_chars = []
    for byte in data:
        ascii_chars.append(chr(byte) if 32 <= byte < 127 else ".")
    return hex_column + " |" + "".join(ascii_chars) + "|"
|
|
|
|
|
|
def xor_checksum(data: bytes) -> int:
    """Return the XOR of every byte in *data*, reduced to 8 bits.

    An empty input yields 0.
    """
    checksum = 0
    for octet in data:
        # Masking each step is equivalent to masking once at the end,
        # because XOR works on each bit independently.
        checksum = (checksum ^ octet) & 0xFF
    return checksum
|
|
|
|
|
|
def build_read_frame(cmd: int, addr: int = 0x01) -> bytes:
    """Build a read-only query frame. No payload = no data written.

    Layout: STX, address byte, command byte, XOR checksum of the two
    preceding bytes, ETX.
    """
    header = bytes((addr, cmd))
    trailer = bytes((xor_checksum(header), ETX))
    return b"".join((bytes((STX,)), header, trailer))
|
|
|
|
|
|
def parse_frames(raw: bytes) -> list:
    """Parse STX/ETX frames from raw serial data.

    A frame is ``STX <body> ETX`` where the body is
    ``addr, type, payload..., checksum`` and the checksum is the XOR of all
    body bytes before it. The ETX must appear within 300 bytes of the STX.

    Args:
        raw: Bytes as received from the serial port.

    Returns:
        One dict per frame whose body has at least three bytes, in order of
        appearance, with keys ``addr``, ``type``, ``type_name``, ``payload``,
        ``checksum_ok`` and ``raw`` (the frame including STX/ETX). Frames
        with a shorter body are consumed but not reported.

    NOTE(review): the first ETX value (0x03) after an STX ends the frame, so
    a 0x03 inside the payload or checksum truncates it. No escaping scheme is
    applied here — confirm against the controller's actual protocol.
    """
    max_span = 300  # how far past an STX we look for the closing ETX
    total = len(raw)
    frames = []
    i = 0
    while i < total:
        if raw[i] != STX:
            i += 1
            continue

        end = raw.find(ETX, i + 1, min(i + max_span, total))
        if end == -1:
            # No terminator for this STX: treat it as noise and resync on the
            # very next byte. (The previous implementation reused a stale
            # loop index here, which could spin forever on a capture ending
            # in STX, or skip a whole window that contained valid frames.)
            i += 1
            continue

        body = raw[i + 1:end]
        if len(body) >= 3:
            mtype = body[1]
            frames.append({
                'addr': body[0],
                'type': mtype,
                'type_name': RESP_TYPES.get(mtype, f"0x{mtype:02X}"),
                'payload': body[2:-1],
                'checksum_ok': body[-1] == xor_checksum(body[:-1]),
                'raw': raw[i:end + 1],
            })
        i = end + 1
    return frames
|
|
|
|
|
|
def decode_flags(byte: int) -> list[str]:
    """Return the names of all STATUS_FLAGS bits set in *byte*.

    Names follow the table's order; ``["ALL_OK"]`` is returned when no
    known bit is set.
    """
    active = []
    for mask, label in STATUS_FLAGS.items():
        if byte & mask:
            active.append(label)
    if not active:
        return ["ALL_OK"]
    return active
|
|
|
|
|
|
def decode_rtc(payload: bytes) -> str | None:
|
|
"""Decode an RTC response payload into a datetime string."""
|
|
if len(payload) < 6:
|
|
return None
|
|
# Common format: YY MM DD HH MM SS (BCD or binary, depends on controller)
|
|
# Try binary first
|
|
try:
|
|
yr, mo, dy, hr, mn, sc = payload[0:6]
|
|
# Sanity check
|
|
if mo > 12 or dy > 31 or hr > 23 or mn > 59 or sc > 59:
|
|
# Try BCD decode
|
|
def bcd(b): return (b >> 4) * 10 + (b & 0x0F)
|
|
yr, mo, dy, hr, mn, sc = [bcd(b) for b in payload[0:6]]
|
|
year = 2000 + yr if yr < 100 else yr
|
|
return f"{year:04d}-{mo:02d}-{dy:02d} {hr:02d}:{mn:02d}:{sc:02d}"
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ── Port detection ───────────────────────────────────────────────────────────
|
|
def list_serial_ports():
    """Return the serial ports reported by pyserial, ordered by device name."""
    found = list(serial.tools.list_ports.comports())
    found.sort(key=lambda info: info.device)
    return found
|
|
|
|
|
|
def scan_ports():
    """Print a table of the available serial ports and return them.

    Uses a Rich table when Rich is installed, plain text otherwise. Logs a
    hint and returns an empty list when no port is present.
    """
    ports = list_serial_ports()
    if not ports:
        log("No serial ports found. Is the USB-to-serial adapter plugged in?", style="bold red")
        return []

    if not RICH:
        print("\n Available Serial Ports:")
        print(" " + "-" * 58)
        for entry in ports:
            print(f" {entry.device:12s} {entry.description:30s} {entry.hwid}")
        return ports

    table = Table(title="Available Serial Ports")
    column_specs = (
        ("Port", {"style": "cyan"}),
        ("Description", {}),
        ("Hardware ID", {"style": "dim"}),
    )
    for heading, options in column_specs:
        table.add_column(heading, **options)
    for entry in ports:
        table.add_row(entry.device, entry.description, entry.hwid)
    console.print(table)
    return ports
|
|
|
|
|
|
def find_usb_serial():
    """Auto-detect a USB-to-serial adapter.

    Returns the first port whose description or hardware ID mentions a known
    adapter keyword, else the first port of any kind, else None.
    """
    ports = list_serial_ports()
    keywords = ("USB", "SERIAL", "FTDI", "CH340", "CP210", "PL2303", "PROLIFIC")
    for candidate in ports:
        haystack = " ".join((candidate.description, candidate.hwid)).upper()
        for keyword in keywords:
            if keyword in haystack:
                return candidate
    if ports:
        return ports[0]
    return None
|
|
|
|
|
|
def detect_baud(port: str) -> int:
    """Try common baud rates, send read-only polls, pick the best match.

    For each rate in COMMON_BAUDS the port is opened, one POLL frame is sent
    and up to 512 bytes are read back. The reply is scored: 100 per frame
    with a valid checksum, 50 if any STX byte is present, plus 1 per byte
    that is a framing/handshake byte or printable ASCII. The highest score
    wins; ties keep the earlier rate.

    Args:
        port: Serial device name to probe.

    Returns:
        The best-scoring baud rate, or 9600 when no rate produced any data.
    """
    log(f"Auto-detecting baud rate on {port}...")
    best_baud = 9600
    best_score = -1  # -1 means "no bytes received at any rate"
    poll = build_read_frame(READ_COMMANDS["POLL"])

    for baud in COMMON_BAUDS:
        try:
            with serial.Serial(port, baud, timeout=1.5,
                               bytesize=8, parity='N', stopbits=1) as ser:
                ser.reset_input_buffer()
                # Send a harmless poll
                ser.write(poll)
                time.sleep(0.4)
                data = ser.read(512)
        except serial.SerialException as e:
            log(f" {baud:>6} baud: error ({e})")
            continue

        if not data:
            log(f" {baud:>6} baud: no response")
            continue

        frames = parse_frames(data)
        valid = sum(1 for f in frames if f['checksum_ok'])
        has_stx = STX in data
        structured = sum(1 for b in data if b in (STX, ETX, ACK, NAK) or 32 <= b < 127)
        score = valid * 100 + (50 if has_stx else 0) + structured

        log(f" {baud:>6} baud: {len(data)} bytes, {valid} frames, "
            f"STX={'yes' if has_stx else 'no'}, score={score}")

        if score > best_score:
            best_score = score
            best_baud = baud

    # Describe how the choice was made. A score of exactly 0 means bytes did
    # arrive (so the rate is no longer the untouched default) but nothing in
    # them looked like protocol data — say so instead of "no response".
    if best_score > 0:
        note, style = " (detected)", "bold green"
    elif best_score == 0:
        note, style = " (bytes received but none recognizable — unverified)", "yellow"
    else:
        note, style = " (default — no response detected)", "yellow"
    log(f" Selected: {best_baud} baud" + note, style=style)
    return best_baud
|
|
|
|
|
|
# ── Read-Only Connection ─────────────────────────────────────────────────────
|
|
class ElevatorDiag:
    """
    Strictly read-only diagnostic connection to an OTIS elevator controller.

    Only sends poll/query commands. Never writes config, time, or settings.

    Each ``read_*`` method issues one query from READ_COMMANDS, logs an
    interpretation of the reply, and returns the decoded values (``None`` or
    an empty list when nothing usable came back). ``stats`` counts frames
    sent (``tx``), replies received (``rx``), replies without a parseable
    frame (``unparsed``), frames failing the checksum (``bad_checksum``) and
    queries that got no reply at all (``timeouts``).
    """

    # Fault-log codes that are treated as clock/time related.
    _CLOCK_FAULT_CODES = frozenset((0x01, 0x02, 0x11, 0x20, 0x21))

    # Explanation lines logged for each active status flag: (message, style).
    _FLAG_NOTES = {
        "RTC_INVALID": (
            (" >> Clock is invalid/reset — this is your problem!", "bold red"),
            (" >> The elevator doesn't know what time it is, so", "bold red"),
            (" >> time-restricted keycards are being rejected.", "bold red"),
        ),
        "RTC_BATTERY_LOW": (
            (" >> RTC backup battery is dying — clock won't", "bold yellow"),
            (" >> survive power interruptions.", "bold yellow"),
        ),
        "NVRAM_ERROR": (
            (" >> Non-volatile memory error — stored settings", "bold red"),
            (" >> may be lost on reboot.", "bold red"),
        ),
        "CONFIG_CHECKSUM_FAIL": (
            (" >> Config data is corrupt!", "bold red"),
        ),
        "CARD_SYSTEM_OFFLINE": (
            (" >> ONITY card bridge is not responding.", "bold red"),
        ),
        "FACTORY_DEFAULTS_ACTIVE": (
            (" >> Controller is running factory defaults!", "bold red"),
            (" >> All custom programming has been lost.", "bold red"),
        ),
        "WATCHDOG_RESET": (
            (" >> Controller crashed and rebooted.", "bold yellow"),
        ),
    }

    # Card-denial reason byte -> description.
    _DENIAL_REASONS = {
        0x01: "Card expired",
        0x02: "Floor not authorized",
        0x03: "Time window invalid (CLOCK ISSUE!)",
        0x04: "Card not in database",
        0x05: "Card blacklisted",
    }

    def __init__(self, port: str, baud: int, addr: int = 0x01):
        """Store connection settings; the port is not opened until connect()."""
        self.port = port
        self.baud = baud
        self.addr = addr
        self.ser = None
        self.stats = defaultdict(int)

    def connect(self):
        """Open the serial port (8N1, 2 s read timeout) and clear stale input."""
        self.ser = serial.Serial(
            self.port, self.baud, timeout=2.0,
            bytesize=8, parity='N', stopbits=1
        )
        self.ser.reset_input_buffer()
        log(f"Connected: {self.port} @ {self.baud} baud (READ-ONLY mode)", style="bold green")

    def close(self):
        """Close the serial port if it is open; safe to call when never connected."""
        if self.ser and self.ser.is_open:
            self.ser.close()
            log("Disconnected.")

    def _query(self, cmd_name: str, timeout: float = 2.0, retries: int = 2) -> list:
        """Send a read-only query and collect response frames.

        Args:
            cmd_name: Key into READ_COMMANDS.
            timeout: Read timeout in seconds for each attempt.
            retries: Extra attempts made when an attempt receives no bytes.

        Returns:
            The parsed frames of the first attempt that received data; a
            single ``{'raw_bytes': data}`` dict when bytes arrived but no
            frame could be parsed; an empty list when every attempt timed out.
        """
        cmd = READ_COMMANDS[cmd_name]
        frame = build_read_frame(cmd, self.addr)

        # Discard bytes that arrived before this query so leftovers from an
        # earlier exchange are not parsed as the reply. Done once, not per
        # retry, so a late reply to the first attempt is still usable.
        self.ser.reset_input_buffer()

        for attempt in range(retries + 1):
            self.ser.write(frame)
            self.stats['tx'] += 1
            log(f" TX >> [{cmd_name}] {hexdump(frame)}", style="dim cyan")

            self.ser.timeout = timeout
            data = self.ser.read(1024)
            if data:
                self.stats['rx'] += 1
                log(f" RX << {hexdump(data)}", style="dim green")
                frames = parse_frames(data)
                if frames:
                    # Frames are still returned on a checksum mismatch (the
                    # checksum scheme is an assumption), but the operator is
                    # told the decoded values may be wrong.
                    bad = sum(1 for fr in frames if not fr['checksum_ok'])
                    if bad:
                        self.stats['bad_checksum'] += bad
                        log(f" WARNING: {bad} frame(s) failed the checksum — "
                            "decoded values below may be corrupt.", style="bold yellow")
                    return frames
                # Got raw bytes but no parseable frames
                self.stats['unparsed'] += 1
                # Still return raw for inspection
                return [{'raw_bytes': data}]

            if attempt < retries:
                time.sleep(0.3)

        self.stats['timeouts'] += 1
        return []

    # ── Diagnostic queries (all read-only) ───────────────────────────────

    def read_status(self) -> dict | None:
        """Query controller status flags.

        Returns ``{'flags': [...], 'raw': <payload hex>}`` for the first frame
        carrying a payload, or None when there is none.
        """
        log("\n--- Controller Status ---", style="bold")
        for f in self._query("STATUS"):
            if 'payload' not in f or len(f['payload']) < 1:
                continue
            payload = f['payload']
            flags = decode_flags(payload[0])
            is_problem = "ALL_OK" not in flags
            log(f" Status: {', '.join(flags)}",
                style="bold red" if is_problem else "green")

            # Interpret each active flag
            for flag in flags:
                for message, style in self._FLAG_NOTES.get(flag, ()):
                    log(message, style=style)

            if len(payload) >= 2:
                log(f" Extra status byte: 0x{payload[1]:02X}")
            return {'flags': flags, 'raw': payload.hex()}
        log(" No response.", style="yellow")
        return None

    def read_clock(self) -> dict | None:
        """Read the controller's real-time clock and compare it to this PC.

        Returns:
            None when no frame with at least six payload bytes arrived.
            Otherwise a dict that always has ``raw`` (payload hex). When the
            payload decodes it also has ``rtc``, ``laptop`` and ``valid``;
            ``drift_seconds`` is present only when ``valid`` is True.
        """
        log("\n--- Real-Time Clock ---", style="bold")
        for f in self._query("RTC_READ"):
            if 'payload' not in f or len(f['payload']) < 6:
                continue
            payload = f['payload']
            rtc_str = decode_rtc(payload)
            now = datetime.datetime.now()
            log(f" Controller clock : {rtc_str or payload.hex()}")
            log(f" Your laptop clock: {now.strftime('%Y-%m-%d %H:%M:%S')}")

            if not rtc_str:
                return {'raw': payload.hex()}

            try:
                # Parse the RTC time so it can be compared
                rtc_dt = datetime.datetime.strptime(rtc_str, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                # e.g. an all-zero clock decodes to month 0 / day 0. This used
                # to be swallowed silently, hiding exactly the reset-clock
                # condition this tool exists to find.
                log(" >> Controller clock is not a valid date/time — the RTC",
                    style="bold red")
                log(" >> is likely reset or its contents are corrupt.",
                    style="bold red")
                return {
                    'rtc': rtc_str,
                    'laptop': now.isoformat(),
                    'valid': False,
                    'raw': payload.hex(),
                }

            drift = abs((now - rtc_dt).total_seconds())
            if drift < 30:
                log(f" Clock drift: {drift:.0f}s — OK", style="green")
            elif drift < 300:
                log(f" Clock drift: {drift:.0f}s — MODERATE",
                    style="bold yellow")
                log(" >> Clock is drifting. RTC crystal or battery issue.")
            elif drift < 3600:
                log(f" Clock drift: {drift/60:.1f} minutes — SIGNIFICANT",
                    style="bold red")
                log(" >> Keycards with time windows will malfunction!")
            else:
                hours = drift / 3600
                log(f" Clock drift: {hours:.1f} HOURS — CRITICAL",
                    style="bold red")
                log(" >> Clock is way off. This WILL break keycard auth.",
                    style="bold red")
                if rtc_dt.year < 2020:
                    log(" >> Clock appears reset to epoch/factory date.",
                        style="bold red")
                    log(" >> RTC battery is likely dead.", style="bold red")

            return {
                'rtc': rtc_str,
                'laptop': now.isoformat(),
                'valid': True,
                'drift_seconds': drift,
                'raw': payload.hex(),
            }
        log(" No response.", style="yellow")
        return None

    def read_battery(self) -> dict | None:
        """Check RTC / backup battery voltage.

        The first two payload bytes are read as a big-endian millivolt value.
        Returns ``{'voltage': <volts>, 'status': <text>}`` or None.
        """
        log("\n--- RTC Battery ---", style="bold")
        for f in self._query("BATTERY_CHECK"):
            if 'payload' not in f or len(f['payload']) < 2:
                continue
            raw_mv = (f['payload'][0] << 8) | f['payload'][1]
            volts = raw_mv / 1000.0
            if volts > 2.8:
                status, style = "GOOD", "green"
            elif volts > 2.2:
                status, style = "LOW", "bold yellow"
            else:
                status, style = "CRITICAL / DEAD", "bold red"
            log(f" Battery: {volts:.2f}V [{status}]", style=style)
            if status != "GOOD":
                log(" >> The RTC backup battery is failing.", style="bold red")
                log(" >> When power blips, the clock resets, and the elevator",
                    style="bold red")
                log(" >> 'forgets' what time it is = keycards stop working.",
                    style="bold red")
                log(" >> Fix: Replace the coin cell (typically CR2032 or",
                    style="yellow")
                log(" >> 3.6V lithium) on the controller board.", style="yellow")
            return {'voltage': volts, 'status': status}
        log(" No response.", style="yellow")
        return None

    def read_fault_log(self) -> list:
        """Read the fault/event log.

        Entries are read as a code byte followed by a 4-byte big-endian
        timestamp while at least five bytes remain; any shorter tail is read
        one code byte at a time. A timestamp is only shown when it falls
        between 1e9 and 2e9 seconds (interpreted as a Unix epoch).

        Returns a list of ``{'code': int, 'desc': str}`` dicts.
        """
        log("\n--- Fault Log ---", style="bold")
        frames = self._query("FAULT_LOG", timeout=3.0)
        faults = []
        for f in frames:
            if 'payload' not in f:
                continue
            payload = f['payload']
            i = 0
            entry = 0
            while i < len(payload):
                code = payload[i]
                desc = FAULT_CODES.get(code, f"Unknown fault 0x{code:02X}")
                entry += 1

                # Some log entries include a timestamp (4 bytes after code)
                timestamp_str = ""
                if i + 5 <= len(payload):
                    # Could be Unix epoch (32-bit) or packed date
                    epoch = struct.unpack(">I", payload[i + 1:i + 5])[0]
                    if 1_000_000_000 < epoch < 2_000_000_000:
                        dt = datetime.datetime.fromtimestamp(epoch)
                        timestamp_str = f" @ {dt.strftime('%Y-%m-%d %H:%M')}"
                    i += 5
                else:
                    i += 1

                is_clock = code in self._CLOCK_FAULT_CODES
                style = "bold red" if is_clock else "yellow" if code < 0x10 else ""
                log(f" #{entry:3d}: [0x{code:02X}] {desc}{timestamp_str}", style=style)
                faults.append({'code': code, 'desc': desc})

            if entry == 0:
                log(f" Raw data: {payload.hex()}")

        if not faults and not frames:
            log(" No response.", style="yellow")
        elif not faults:
            log(" Fault log empty or format unrecognized.")
        else:
            # Summarize clock-related faults
            clock_faults = [f for f in faults if f['code'] in self._CLOCK_FAULT_CODES]
            if clock_faults:
                log(f"\n ** {len(clock_faults)} clock/time-related faults found! **",
                    style="bold red")
                log(" >> Pattern: RTC losing power → clock resets → keycard time",
                    style="bold red")
                log(" >> validation fails → guests can't use elevator.",
                    style="bold red")
        return faults

    def read_floor_status(self) -> dict | None:
        """Read current floor position and door state.

        Payload byte 0 is the floor; optional byte 1 is the door state and
        optional byte 2 the travel direction. Returns a dict with ``floor``,
        ``door`` and ``direction``, or None.
        """
        log("\n--- Current Floor Status ---", style="bold")
        for f in self._query("FLOOR_STATUS"):
            if 'payload' not in f or len(f['payload']) < 1:
                continue
            payload = f['payload']
            floor = payload[0]
            door = "UNKNOWN"
            if len(payload) >= 2:
                door_byte = payload[1]
                door = {0: "CLOSED", 1: "OPENING", 2: "OPEN", 3: "CLOSING"}.get(
                    door_byte, f"0x{door_byte:02X}")
            direction = ""
            if len(payload) >= 3:
                dir_byte = payload[2]
                direction = {0: "IDLE", 1: "UP", 2: "DOWN"}.get(
                    dir_byte, f"0x{dir_byte:02X}")

            log(f" Floor: {floor} Door: {door} Direction: {direction}")
            return {'floor': floor, 'door': door, 'direction': direction}
        log(" No response.", style="yellow")
        return None

    def read_card_events(self) -> list:
        """Read recent keycard authentication events from ONITY bridge.

        Each record is a result byte (1 = granted, anything else = denied),
        a 4-byte card ID and, when present, a floor byte. A denied record
        additionally consumes one reason byte when one is available.

        Returns a list of ``{'card', 'granted', 'floor'}`` dicts.
        """
        log("\n--- Recent Card Auth Events ---", style="bold")
        frames = self._query("CARD_EVENTS", timeout=3.0)
        events = []
        for f in frames:
            if 'payload' not in f:
                continue
            payload = f['payload']
            i = 0
            entry = 0
            while i + 5 <= len(payload):
                result = payload[i]  # 0=denied, 1=granted
                card_id = payload[i + 1:i + 5].hex().upper()
                floor_req = payload[i + 5] if i + 6 <= len(payload) else None
                entry += 1

                granted = result == 0x01
                fl = f" floor={floor_req}" if floor_req is not None else ""
                log(f" #{entry}: card={card_id} {'GRANTED' if granted else 'DENIED'}{fl}",
                    style="green" if granted else "bold red")

                # Denied records carry a reason byte if available.
                if not granted and i + 7 <= len(payload):
                    reason = payload[i + 6]
                    reason_str = self._DENIAL_REASONS.get(reason, f"code 0x{reason:02X}")
                    is_clock = reason == 0x03
                    log(f" Reason: {reason_str}",
                        style="bold red" if is_clock else "yellow")
                    if is_clock:
                        log(" >> Card denied due to time window!",
                            style="bold red")
                        log(" >> Elevator clock is probably wrong.",
                            style="bold red")
                    i += 7
                else:
                    i += 6

                events.append({
                    'card': card_id,
                    'granted': granted,
                    'floor': floor_req,
                })
        if not events and not frames:
            log(" No response.", style="yellow")
        elif not events:
            log(" No recent card events or format unrecognized.")
        return events

    def read_version(self) -> str | None:
        """Read controller firmware version.

        The payload is decoded as ASCII (undecodable bytes are replaced) with
        leading/trailing NUL bytes stripped. Returns the string or None.
        """
        log("\n--- Firmware Version ---", style="bold")
        for f in self._query("VERSION"):
            if 'payload' in f and f['payload']:
                # Version is often an ASCII string. errors='replace' means the
                # decode cannot fail, so no fallback path is needed.
                ver = f['payload'].decode('ascii', errors='replace').strip('\x00')
                log(f" Firmware: {ver}")
                return ver
        log(" No response.", style="yellow")
        return None

    def read_counters(self) -> dict | None:
        """Read diagnostic runtime counters.

        Big-endian layout, each field optional after the first: 4-byte trip
        count, 4-byte door cycles, 4-byte runtime hours, 2-byte reset count.
        Returns a dict of the fields that were present, or None.
        """
        log("\n--- Diagnostic Counters ---", style="bold")
        for f in self._query("DIAG_COUNTERS"):
            if 'payload' not in f or len(f['payload']) < 4:
                continue
            p = f['payload']
            # Common counter layout (varies by model)
            info = {}
            trips = struct.unpack(">I", p[0:4])[0]
            log(f" Total trips: {trips:,}")
            info['trips'] = trips
            if len(p) >= 8:
                door_cycles = struct.unpack(">I", p[4:8])[0]
                log(f" Door cycles: {door_cycles:,}")
                info['door_cycles'] = door_cycles
            if len(p) >= 12:
                runtime_hrs = struct.unpack(">I", p[8:12])[0]
                log(f" Runtime: {runtime_hrs:,} hours")
                info['runtime_hours'] = runtime_hrs
            if len(p) >= 14:
                resets = struct.unpack(">H", p[12:14])[0]
                log(f" Reset count: {resets}")
                info['resets'] = resets
                if resets > 10:
                    log(" >> High reset count suggests power instability.",
                        style="bold yellow")
            return info
        log(" No response.", style="yellow")
        return None

    # ── Full diagnosis ───────────────────────────────────────────────────

    def run_full_diagnosis(self):
        """Run all read-only diagnostic queries and summarize findings.

        Returns a dict with the result of each query under the keys
        ``version``, ``status``, ``clock``, ``battery``, ``floor``,
        ``faults``, ``card_events`` and ``counters``.
        """
        log("=" * 62, style="bold")
        log(" OTIS Elevator — Read-Only Serial Diagnostic", style="bold")
        log(" (ONITY keycard integration / Hilton deployment)", style="dim")
        log(" ** NO settings will be changed — read-only queries only **",
            style="bold green")
        log("=" * 62, style="bold")

        steps = (
            ('version', self.read_version),
            ('status', self.read_status),
            ('clock', self.read_clock),
            ('battery', self.read_battery),
            ('floor', self.read_floor_status),
            ('faults', self.read_fault_log),
            ('card_events', self.read_card_events),
            ('counters', self.read_counters),
        )
        results = {}
        for index, (key, reader) in enumerate(steps):
            if index:
                # Brief pause between consecutive queries.
                time.sleep(0.2)
            results[key] = reader()

        # ── Summary ──────────────────────────────────────────────────────
        log("\n" + "=" * 62, style="bold")
        log(" DIAGNOSIS SUMMARY", style="bold")
        log("=" * 62, style="bold")

        total_resp = self.stats['rx']
        timeouts = self.stats['timeouts']
        log(f" Queries sent: {self.stats['tx']}")
        log(f" Responses: {total_resp}")
        log(f" Timeouts: {timeouts}")

        if total_resp == 0:
            log("\n !! NO RESPONSES FROM CONTROLLER !!", style="bold red")
            log(" Troubleshooting:", style="yellow")
            log(" 1. Check cable connection (USB-serial → elevator serial port)")
            log(" 2. Try different baud: --baud 19200 or --baud 38400")
            log(" 3. You may need a null-modem adapter (TX/RX swap)")
            log(" 4. Check that the controller serial port is the DIAGNOSTIC port")
            log(" (not the CAN bus or proprietary OTIS tool port)")
            log(" 5. The controller may use RS-485 — you'd need an RS-485 adapter")
            log(" 6. Try --monitor mode to passively listen for any traffic")
            return results

        # Clock analysis (the main issue)
        problems = []
        clock = results.get('clock')
        if clock and isinstance(clock, dict):
            if clock.get('valid') is False:
                problems.append("CLOCK INVALID: controller reported an impossible date/time")
            drift = clock.get('drift_seconds', 0)
            if drift > 300:
                problems.append(f"CLOCK DRIFT: {drift/60:.0f} minutes off")
            elif drift > 30:
                problems.append(f"Clock drift: {drift:.0f}s (moderate)")

        battery = results.get('battery')
        if battery and isinstance(battery, dict):
            if battery.get('status') != "GOOD":
                problems.append(f"RTC BATTERY: {battery['status']}")

        status = results.get('status')
        if status and isinstance(status, dict):
            for flag in status.get('flags', []):
                if flag != "ALL_OK":
                    problems.append(f"Status flag: {flag}")

        if results.get('faults'):
            clock_faults = [f for f in results['faults']
                            if f['code'] in self._CLOCK_FAULT_CODES]
            if clock_faults:
                problems.append(f"{len(clock_faults)} clock-related faults in log")

        if results.get('card_events'):
            denied = [e for e in results['card_events'] if not e['granted']]
            if denied:
                problems.append(f"{len(denied)} recent card denials")

        if problems:
            log(f"\n PROBLEMS FOUND ({len(problems)}):", style="bold red")
            for p in problems:
                log(f" - {p}", style="red")

            log("\n LIKELY ROOT CAUSE:", style="bold")
            if any("CLOCK" in p or "RTC" in p or "clock" in p for p in problems):
                log(" The elevator controller's real-time clock is losing its", style="bold red")
                log(" time setting. When the clock is wrong, ONITY keycard", style="bold red")
                log(" time-window validation fails and guests get denied.", style="bold red")
                log("\n RECOMMENDED ACTIONS:", style="bold")
                log(" 1. Replace the RTC backup battery on the controller board")
                log(" 2. Have OTIS tech check for power supply issues (brownouts)")
                log(" 3. After battery replacement, re-sync the clock using")
                log(" the OTIS service tool / your existing service laptop software")
                log(" 4. Ask ONITY/Allegion support about enabling 'generous'")
                log(" time-window validation (+/- tolerance)")
                log(" 5. If problem persists after battery swap, the RTC chip")
                log(" or NVRAM on the controller board may need replacement")
            else:
                # Previously the header above was printed with nothing under it.
                log(" Not clearly clock-related — review the problems listed above.",
                    style="yellow")
        else:
            log("\n No obvious problems detected.", style="bold green")
            log(" If keycards are still failing intermittently, try:")
            log(" - Running --monitor mode during a failure to capture events")
            log(" - Checking the ONITY front desk encoder sync status")
            log(" - Verifying guest card programming at the front desk")

        return results
|
|
|
|
|
|
# ── Passive monitor mode ─────────────────────────────────────────────────────
|
|
def monitor_mode(conn: ElevatorDiag):
    """Passively listen to all serial traffic and decode it.

    Nothing is written to the port. Runs until interrupted with Ctrl+C, then
    logs how long it ran and how many messages of each type were seen.

    Args:
        conn: A connected ElevatorDiag; only its ``ser`` port is read.
    """
    log("=" * 62, style="bold")
    log(" PASSIVE MONITOR MODE (read-only, no commands sent)", style="bold")
    log(" Listening for elevator traffic... Ctrl+C to stop.", style="dim")
    log("=" * 62, style="bold")

    stats = defaultdict(int)
    start = time.time()
    # Start of a frame whose ETX has not arrived yet. It is prepended to the
    # next read so a frame split across two reads is still decoded.
    pending = b""

    try:
        while True:
            data = conn.ser.read(256)
            if data:
                log(f"RX << {hexdump(data)}", style="green")
                buf = pending + data
                frames = parse_frames(buf)

                # Anything from the first STX after the last ETX onward may
                # be an unfinished frame; keep it for the next read.
                tail_start = buf.find(STX, buf.rfind(ETX) + 1)
                pending = buf[tail_start:] if tail_start != -1 else b""
                if len(pending) >= 300:
                    # parse_frames() only looks 300 bytes past an STX, so an
                    # older prefix can never complete; keep the newest bytes.
                    pending = pending[-299:]

                for f in frames:
                    name = f['type_name']
                    stats[name] += 1
                    cs = "OK" if f['checksum_ok'] else "BAD_CS"
                    payload = f['payload']

                    # Decode known types
                    if f['type'] == 0x33 and len(payload) >= 6:
                        rtc = decode_rtc(payload)
                        if rtc:
                            now = datetime.datetime.now()
                            log(f" CLOCK: controller={rtc} laptop={now.strftime('%H:%M:%S')}",
                                style="bold cyan")
                    elif f['type'] == 0x39:
                        log(f" CARD EVENT: {payload.hex()}", style="bold cyan")
                    elif f['type'] == 0x45:
                        if payload:
                            flags = decode_flags(payload[0])
                            log(f" ERROR: {', '.join(flags)}", style="bold red")
                    elif f['type'] == 0x40:
                        log(" Heartbeat", style="dim")
                    else:
                        log(f" {name}: {payload.hex() if payload else '(empty)'} [{cs}]")

                # Bare ACK/NAK: count only bytes outside decoded frames and
                # outside the held-back tail. Otherwise ordinary payload
                # values such as 0x06 or 0x15 would inflate the counters.
                loose = bytearray()
                cursor = 0
                for f in frames:
                    at = buf.find(f['raw'], cursor)
                    loose += buf[cursor:at]
                    cursor = at + len(f['raw'])
                loose += buf[cursor:len(buf) - len(pending)]
                for label, value in (('NAK', NAK), ('ACK', ACK)):
                    seen = loose.count(value)
                    if seen:
                        stats[label] += seen

            time.sleep(0.05)

    except KeyboardInterrupt:
        elapsed = time.time() - start
        log(f"\n--- Monitor ran for {elapsed:.0f}s ---")
        if stats:
            log("Message type counts:")
            for name, count in sorted(stats.items()):
                log(f" {name}: {count}")
        else:
            log("No traffic captured. The controller may not be sending", style="yellow")
            log("unsolicited data on this port.", style="yellow")
|
|
|
|
|
|
# ── Main ─────────────────────────────────────────────────────────────────────
|
|
def main():
    """Parse the command line, open the optional session log and run the tool.

    The session log file is closed on every exit path, including ``--scan``
    and the "no port found" early exits.
    """
    global _logfile

    parser = argparse.ArgumentParser(
        description="OTIS Elevator + ONITY — Read-Only Serial Diagnostic",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
** THIS TOOL IS STRICTLY READ-ONLY **
It never writes settings, time, or configuration to the controller.

Examples:
  python onity_diag.py                          Auto-detect, full diagnosis
  python onity_diag.py --scan                   List available COM ports
  python onity_diag.py --port COM3              Use specific port
  python onity_diag.py --port COM3 --baud 9600  Specify port and baud
  python onity_diag.py --monitor                Passive traffic listener
  python onity_diag.py --log diag.txt           Save session to file
  python onity_diag.py --clock-only             Just check the clock
""")
    parser.add_argument("--port", "-p", help="Serial port (e.g. COM3, COM4)")
    parser.add_argument("--baud", "-b", type=int, help="Baud rate (default: auto-detect)")
    parser.add_argument("--addr", "-a", type=lambda x: int(x, 0), default=0x01,
                        help="Controller address byte (default: 0x01)")
    parser.add_argument("--scan", "-s", action="store_true", help="List serial ports and exit")
    parser.add_argument("--monitor", "-m", action="store_true",
                        help="Passive monitor (listen only, send nothing)")
    parser.add_argument("--log", "-l", help="Log session to file")
    parser.add_argument("--clock-only", action="store_true",
                        help="Only read the clock (quick check)")

    args = parser.parse_args()

    if args.log:
        _logfile = open(args.log, "a", encoding="utf-8")
        log(f"Logging to {args.log}")

    try:
        _run_cli(args)
    finally:
        if _logfile:
            _logfile.close()
            # Later log() calls must not write to the closed handle.
            _logfile = None


def _run_cli(args):
    """Print the banner, pick port and baud, then run the requested mode."""
    banner = (
        "OTIS Elevator + ONITY Access Control\n"
        "Read-Only Serial Diagnostic Tool\n"
        "** No settings will be modified **"
    )
    if RICH:
        console.print(Panel.fit(f"[bold]{banner}[/bold]", border_style="blue"))
    else:
        print("=" * 50)
        for line in banner.split("\n"):
            print(f" {line}")
        print("=" * 50)

    if args.scan:
        scan_ports()
        return

    # Find port
    port = args.port
    if not port:
        p = find_usb_serial()
        if not p:
            log("No serial port found. Plug in your USB-to-serial adapter.", style="bold red")
            log("Run with --scan to list available ports.")
            return
        port = p.device
        log(f"Auto-detected: {port} ({p.description})")

    diag = None
    try:
        # Find baud
        baud = args.baud
        if not baud:
            if args.monitor:
                # detect_baud() transmits POLL frames, which would break the
                # "listen only, send nothing" promise of --monitor.
                baud = COMMON_BAUDS[0]
                log(f"Monitor mode sends nothing, so baud auto-detection is skipped; "
                    f"using {baud} baud. Pass --baud to override.", style="yellow")
            else:
                baud = detect_baud(port)

        # Connect
        diag = ElevatorDiag(port, baud, args.addr)
        diag.connect()

        if args.monitor:
            monitor_mode(diag)
        elif args.clock_only:
            diag.read_clock()
        else:
            diag.run_full_diagnosis()

    except serial.SerialException as e:
        log(f"Serial error: {e}", style="bold red")
        log("Check that the port is correct and not in use by another program.")
        log("Common fix: close any other serial terminal (PuTTY, etc.) first.")
    except KeyboardInterrupt:
        log("\nStopped by user.")
    finally:
        if diag is not None:
            diag.close()
|
|
|
|
|
|
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|