You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

112 lines
3.8 KiB

"""Diagnose R3 controller button bitmask positions via wireless_remote in rt/lowstate.
Tries different network interfaces and LowState types to find where
wireless_remote data flows.
"""
import paramiko, sys, time
# Keep printing even when the relayed output contains characters the local
# console encoding cannot represent: they are substituted, not raised on.
sys.stdout.reconfigure(encoding='utf-8', errors='replace')

# Open a password-only SSH session to the robot. Key files and the SSH agent
# are disabled so authentication goes straight to the password.
# NOTE(review): AutoAddPolicy accepts unknown host keys without checking them.
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
    hostname='10.0.0.64',
    username='unitree',
    password='123',
    timeout=15,
    look_for_keys=False,
    allow_agent=False,
)
# Diagnostic script that is uploaded to the robot and executed there.
# It is kept as a raw string so the backslash escapes inside it (e.g. "\n")
# reach the remote interpreter untouched.
script = r'''
import subprocess
import sys
import time

sys.path.insert(0, "/home/unitree/miniforge3/envs/tv/lib/python3.11/site-packages")
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ as hg_LowState

# Entry i of each tuple is the label printed when bit i of that byte is set.
# NOTE(review): the 0x80 bit of btn1 is labelled "F3" -- confirm against the
# controller documentation.
BTN1_NAMES = ("R1", "L1", "Start", "Select", "R2", "L2", "F1", "F3")
BTN2_NAMES = ("A", "B", "X", "Y", "Up", "Right", "Down", "Left")


def decode_bits(value, names):
    """Return the labels from names whose bit is set in value, bit 0 first."""
    return [name for bit, name in enumerate(names) if value & (1 << bit)]


# Show the available interfaces so the operator can cross-check the choice.
result = subprocess.run(['ip', 'link', 'show'], capture_output=True, text=True)
print("=== Network interfaces ===")
for line in result.stdout.split('\n'):
    if ': ' in line and 'state' in line:
        print(f" {line.strip()}")

# Use eth0 which connects to the internal bus
iface = "eth0"
print(f"\nUsing interface: {iface}")
ChannelFactoryInitialize(0, iface)

# One-element lists act as mutable cells the callback can update in place.
count = [0]
last_wr = [None]


def handler(msg):
    """Print the decoded button bytes of one LowState message.

    A line is printed when the wireless_remote payload differs from the last
    printed one, and additionally on every 200th message (1, 201, 401, ...).
    """
    count[0] += 1
    try:
        data = bytes(msg.wireless_remote)
        if data != last_wr[0] or count[0] % 200 == 1:
            # Bytes 2 and 3 carry the button bits; treat short payloads as 0.
            btn1 = data[2] if len(data) > 2 else 0
            btn2 = data[3] if len(data) > 3 else 0
            btn1_names = decode_bits(btn1, BTN1_NAMES)
            btn2_names = decode_bits(btn2, BTN2_NAMES)
            # NOTE(review): this also counts bytes 0-1, which precede the
            # button bytes -- confirm that is intended.
            any_nonzero = any(b != 0 for b in data[:4])
            ts = time.strftime('%H:%M:%S')
            print(f"[{ts}] #{count[0]:4d} len={len(data)} "
                  f"btn1=0x{btn1:02x}[{','.join(btn1_names) or '-'}] "
                  f"btn2=0x{btn2:02x}[{','.join(btn2_names) or '-'}] "
                  f"raw={data[:8].hex()} "
                  f"{'*** BUTTON ***' if any_nonzero else ''}", flush=True)
            last_wr[0] = data
    except Exception as e:
        # Best effort: report only the first few failures so a persistent
        # error does not flood the output at the message rate.
        if count[0] <= 3:
            print(f"Error reading wireless_remote: {e}", flush=True)


sub = ChannelSubscriber("rt/lowstate", hg_LowState)
sub.Init(handler, 10)
print("\n=== R3 Button Diagnostic (hg_LowState on eth0) ===", flush=True)
print("Press buttons on R3 controller. Running 15 seconds...\n", flush=True)
time.sleep(15)
print(f"\nTotal messages received: {count[0]}", flush=True)
'''
# Upload the diagnostic, run it on the robot, and relay what it printed.
# Everything sits inside try/finally so the SSH connection is released even
# when the upload or the remote command raises.
try:
    print("Deploying R3 button diagnostic to robot...")
    sftp = ssh.open_sftp()
    try:
        with sftp.file('/tmp/r3_diag.py', 'w') as f:
            f.write(script)
    finally:
        # Close the SFTP channel whether or not the write succeeded.
        sftp.close()
    print("Deployed.")

    print("\n=== Running diagnostic (15 seconds) ===")
    print("Press each R3 button now!\n")
    _stdin, stdout, stderr = ssh.exec_command(
        '/home/unitree/miniforge3/envs/tv/bin/python /tmp/r3_diag.py',
        timeout=30)
    # read() blocks until the remote side closes the stream, so the output
    # appears in one piece once the remote script has finished.
    output = stdout.read().decode('utf-8', errors='replace')
    print(output)
    err = stderr.read().decode('utf-8', errors='replace')
    if err:
        print(f"Stderr: {err}")
finally:
    ssh.close()