You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

98 lines
3.8 KiB

#!/usr/bin/env python3
"""Quick DDS connectivity test for G1 robot.
Run this on the GB10 after connecting the robot to the network.
Usage: python3 dds_test.py [interface_name]
"""
import sys
import time
import os
import subprocess
import socket
import struct
# Prefix shared by every address this script treats as "robot network".  The
# trailing dot keeps look-alikes such as 10.192.168.123 from matching.
_ROBOT_SUBNET_PREFIX = '192.168.123.'
# Hosts pinged in the ping step.
_PING_TARGETS = ('192.168.123.161', '192.168.123.164')
# Multicast group and UDP port joined in the discovery step.
_DDS_DISCOVERY_GROUP = '239.255.0.1'
_DDS_DISCOVERY_PORT = 7400


def _find_robot_interface():
    """Return the first interface holding a 192.168.123.x address, or None."""
    listing = subprocess.run(
        ['ip', '-o', '-4', 'addr', 'show'], capture_output=True, text=True
    )
    for line in listing.stdout.splitlines():
        fields = line.split()
        # In `ip -o` output the second field is the interface name.
        if len(fields) > 1 and any(
            field.startswith(_ROBOT_SUBNET_PREFIX) for field in fields
        ):
            return fields[1]
    return None


def _robot_subnet_ip(iface):
    """Return the 192.168.123.x address configured on *iface*, or None."""
    listing = subprocess.run(
        ['ip', '-o', '-4', 'addr', 'show', iface], capture_output=True, text=True
    )
    for field in listing.stdout.split():
        if field.startswith(_ROBOT_SUBNET_PREFIX):
            # Addresses are printed as ADDR/PREFIXLEN; keep only ADDR.
            return field.split('/')[0]
    return None


def _ping_targets():
    """Ping each target once (1 s timeout) and print OK or FAIL per host."""
    for ip in _PING_TARGETS:
        outcome = subprocess.run(
            ['ping', '-c', '1', '-W', '1', ip], capture_output=True, text=True
        )
        status = 'OK' if outcome.returncode == 0 else 'FAIL'
        print(f' {ip}: {status}', flush=True)


def _listen_for_discovery(our_ip, duration=5.0):
    """Collect UDP datagrams arriving on the discovery port for *duration* s.

    Joins the discovery multicast group on the interface that owns *our_ip*.

    Returns:
        A ``(count, sources)`` tuple: the number of datagrams received and
        the set of sender IP addresses.

    Raises:
        OSError: if the socket cannot be bound or the group cannot be joined.
    """
    count = 0
    sources = set()
    # The context manager closes the socket even when bind/setsockopt fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', _DDS_DISCOVERY_PORT))
        mreq = struct.pack(
            '4s4s',
            socket.inet_aton(_DDS_DISCOVERY_GROUP),
            socket.inet_aton(our_ip),
        )
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        # Short receive timeout so the deadline is re-checked at least once a second.
        sock.settimeout(1.0)
        # Monotonic clock: unaffected by wall-clock adjustments during the wait.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            try:
                _, addr = sock.recvfrom(4096)
            except socket.timeout:
                continue
            count += 1
            sources.add(addr[0])
    return count, sources


def _read_lowstate(iface):
    """Poll ``rt/lowstate`` through the Unitree SDK and report the first message.

    Polls up to 16 times, 0.5 s apart.  Returns True as soon as a message is
    read (after printing its gyro values and motor-state count); returns
    False when nothing arrives or when the SDK import/setup raises.
    """
    try:
        # NOTE(review): machine-specific virtualenv path; consider making it
        # configurable.
        sys.path.insert(0, '/home/mitchaiet/GR00T-WholeBodyControl/.venv/lib/python3.12/site-packages')
        from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelSubscriber
        from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_
        ChannelFactoryInitialize(0, iface)
        sub = ChannelSubscriber('rt/lowstate', LowState_)
        sub.Init()
        for _ in range(16):
            msg = sub.Read()
            if msg is not None:
                gyro = msg.imu_state.gyroscope
                print(f' GOT rt/lowstate! IMU gyro: [{gyro[0]:.4f}, {gyro[1]:.4f}, {gyro[2]:.4f}]', flush=True)
                print(f' Motor states: {len(msg.motor_state)} joints', flush=True)
                return True
            time.sleep(0.5)
        print(' FAIL: No data on rt/lowstate after 8s', flush=True)
        print(' DDS discovery works but topic data not received', flush=True)
    except Exception as e:
        # Top-level boundary for anything the SDK raises: report it and let
        # the caller exit with a failure status.
        print(f' SDK error: {e}', flush=True)
    return False


def main():
    """Run the four-step connectivity check and exit with its result.

    Steps: pick the network interface (``sys.argv[1]`` or auto-detected),
    ping the robot hosts, listen for discovery traffic, then read one
    ``rt/lowstate`` message through the Unitree SDK.

    Never returns normally.  The process exits with status 0 when a lowstate
    message is received and 1 on any failure (no usable interface, no address
    on the robot subnet, no discovery traffic from the robot subnet, no
    lowstate data, or an SDK error).
    """
    # Auto-detect or use provided interface
    iface = sys.argv[1] if len(sys.argv) > 1 else None
    if iface is None:
        iface = _find_robot_interface()
    if iface is None:
        print('ERROR: No interface with 192.168.123.x found. Add one with:')
        print(' sudo ip addr add 192.168.123.100/24 dev <interface>')
        sys.exit(1)

    # Get our IP on that interface.  An explicitly named interface may have no
    # robot-subnet address; stop here with a clear message instead of failing
    # later inside inet_aton(None).
    our_ip = _robot_subnet_ip(iface)
    if our_ip is None:
        print(f'ERROR: Interface {iface} has no 192.168.123.x address. Add one with:')
        print(f' sudo ip addr add 192.168.123.100/24 dev {iface}')
        sys.exit(1)
    print(f'[1/4] Interface: {iface} ({our_ip})', flush=True)

    # Step 1: Ping test (informational only; failures do not abort the run)
    print('[2/4] Ping test...', flush=True)
    _ping_targets()

    # Step 2: Multicast receive test
    print('[3/4] DDS multicast discovery (5s)...', flush=True)
    count, sources = _listen_for_discovery(our_ip, duration=5.0)
    robot_sources = {s for s in sources if s.startswith(_ROBOT_SUBNET_PREFIX)}
    print(f' {count} packets from {sources}', flush=True)
    if robot_sources:
        print(f' Robot DDS discovered: {robot_sources}', flush=True)
    else:
        print(' WARNING: No DDS multicast from robot!', flush=True)
        print(' Check: sudo ufw disable OR sudo ufw allow from 192.168.123.0/24', flush=True)
        sys.exit(1)

    # Step 3: SDK subscriber test
    print('[4/4] Unitree SDK rt/lowstate test (8s)...', flush=True)
    if _read_lowstate(iface):
        print('\n=== ALL TESTS PASSED ===', flush=True)
        # os._exit skips interpreter shutdown; presumably used so lingering
        # SDK threads cannot block the exit - TODO confirm.
        os._exit(0)
    os._exit(1)
# Script entry point: run the connectivity check only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    main()