#!/usr/bin/env python3 """ Joint calibration tool: animates ONE robot joint while recording Quest tracking data. The user copies the robot's motion. By comparing the known robot joint angle with the raw XR tracking deltas, we determine the correct axis mapping and sign. Usage: python joint_calibrate.py l_shoulder_pitch [--period 6] [--amplitude 0.8] """ import sys import os import time import argparse import signal import numpy as np from scipy.spatial.transform import Rotation # Add xr_teleoperate to path sys.path.insert(0, "/home/sparky/git/xr_teleoperate") import logging_mp if not hasattr(logging_mp, 'getLogger'): logging_mp.getLogger = logging_mp.get_logger import dex_retargeting if not hasattr(dex_retargeting, 'RetargetingConfig'): from dex_retargeting.retargeting_config import RetargetingConfig dex_retargeting.RetargetingConfig = RetargetingConfig from teleop_server import TeleopServer from native_tv_wrapper import T_ROBOT_OPENXR, NativeTeleWrapper NUM_JOINTS = 29 R3_ROBOT_OPENXR = T_ROBOT_OPENXR[:3, :3] JOINTS = { 'l_shoulder_pitch': 15, 'l_shoulder_roll': 16, 'l_shoulder_yaw': 17, 'l_elbow': 18, 'l_wrist_roll': 19, 'l_wrist_pitch': 20, 'l_wrist_yaw': 21, 'r_shoulder_pitch': 22, 'r_shoulder_roll': 23, 'r_shoulder_yaw': 24, 'r_elbow': 25, } # Which XR parent-child pair to monitor for each robot joint # Using hips (not chest) for shoulder to avoid head rotation contamination JOINT_PARENTS = { 'l_shoulder_pitch': ('hips', 'l_upper'), 'l_shoulder_roll': ('hips', 'l_upper'), 'l_shoulder_yaw': ('hips', 'l_upper'), 'l_elbow': ('l_upper', 'l_lower'), 'l_wrist_roll': ('l_lower', 'l_wrist'), 'l_wrist_pitch': ('l_lower', 'l_wrist'), 'l_wrist_yaw': ('l_lower', 'l_wrist'), 'r_shoulder_pitch': ('hips', 'r_upper'), 'r_shoulder_roll': ('hips', 'r_upper'), 'r_shoulder_yaw': ('hips', 'r_upper'), 'r_elbow': ('r_upper', 'r_lower'), } KP = [60,60,60,100,40,40, 60,60,60,100,40,40, 60,40,40, 40,40,40,40,40,40,40, 40,40,40,40,40,40,40] KD = [1,1,1,2,1,1, 1,1,1,2,1,1, 1,1,1, 1,1,1,1,1,1,1, 1,1,1,1,1,1,1] # Calibrated extents from joint testing DEFAULT_AMP = { 'l_shoulder_pitch': 1.6, 'l_shoulder_roll': 0.8, 'l_shoulder_yaw': 1.15, 'l_elbow': 1.065, 'l_wrist_roll': 1.35, 'l_wrist_pitch': 0.8, 'l_wrist_yaw': 0.89, 'r_shoulder_pitch': 1.6, 'r_shoulder_roll': 0.8, 'r_shoulder_yaw': 1.15, 'r_elbow': 1.065, } DEFAULT_OFFSET = { 'l_shoulder_pitch': -0.8, 'l_shoulder_roll': 0.785, 'l_shoulder_yaw': 0.0, 'l_elbow': 0.265, 'l_wrist_roll': 0.0, 'l_wrist_pitch': 0.0, 'l_wrist_yaw': -0.39, 'r_shoulder_pitch': -0.8, 'r_shoulder_roll': -0.785, 'r_shoulder_yaw': 0.0, 'r_elbow': 0.265, } def pose7_to_mat4(pose7): mat = np.eye(4) r = Rotation.from_quat(pose7[3:7]) mat[:3, :3] = r.as_matrix() mat[:3, 3] = pose7[:3] return mat def parse_body_joints(flat): names = ['hips', 'chest', 'l_upper', 'l_lower', 'l_wrist', 'r_upper', 'r_lower', 'r_wrist'] joints = {} for i, name in enumerate(names): p7 = flat[i*7:(i+1)*7] if np.all(p7 == 0): joints[name] = None else: joints[name] = pose7_to_mat4(p7) return joints def main(): parser = argparse.ArgumentParser(description="Joint calibration with Quest tracking") parser.add_argument("joint", choices=list(JOINTS.keys())) parser.add_argument("--amplitude", "-a", type=float, default=None) parser.add_argument("--offset", "-o", type=float, default=None) parser.add_argument("--period", "-p", type=float, default=8.0) parser.add_argument("--hz", type=float, default=50.0) parser.add_argument("--port", type=int, default=8765) parser.add_argument("--dds-domain", type=int, default=1) parser.add_argument("--cal-delay", type=float, default=10.0, help="Seconds to wait before calibrating XR (default: 10.0)") args = parser.parse_args() joint_idx = JOINTS[args.joint] amplitude = args.amplitude if args.amplitude is not None else DEFAULT_AMP.get(args.joint, 0.5) offset = args.offset if args.offset is not None else DEFAULT_OFFSET.get(args.joint, 0.0) parent_name, child_name = JOINT_PARENTS[args.joint] print(f"\n{'='*70}") print(f"JOINT CALIBRATION: {args.joint} (index {joint_idx})") print(f"Monitoring XR: {parent_name} -> {child_name}") print(f"Animation: sine amp={amplitude:.2f} offset={offset:.2f} rad, period {args.period:.1f}s") print(f"Range: [{offset-amplitude:.2f}, {offset+amplitude:.2f}] rad") print(f"XR cal delay: {args.cal_delay}s") print(f"{'='*70}") print(f"\nStarting WebSocket server on port {args.port}...") # Start teleop server tv = NativeTeleWrapper(port=args.port, host="0.0.0.0") tv.start() time.sleep(0.5) # Init DDS from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ from unitree_sdk2py.utils.crc import CRC ChannelFactoryInitialize(args.dds_domain) pub = ChannelPublisher("rt/lowcmd", LowCmd_) pub.Init() cmd = unitree_hg_msg_dds__LowCmd_() crc = CRC() running = True def handler(sig, frame): nonlocal running running = False signal.signal(signal.SIGINT, handler) signal.signal(signal.SIGTERM, handler) # Wait for Quest connection print("\nConnect Quest 3 and hold the ROBOT'S REST POSE for calibration...") print("(Arms at sides, elbows bent 90 degrees, forearms forward)") print(f"XR calibration will happen {args.cal_delay}s after first tracking frame.\n") cal_ref = None first_tracking_time = None interval = 1.0 / args.hz t0 = time.time() # Start animation immediately step = 0 while running: t_start = time.perf_counter() now = time.time() step += 1 # 1) Compute robot animation value elapsed = now - t0 robot_val = offset + amplitude * np.sin(2 * np.pi * elapsed / args.period) angles = np.zeros(NUM_JOINTS) angles[joint_idx] = robot_val # 2) Read XR tracking and compute comparison (if available) xr_info = None with tv.server.body_joints_shared.get_lock(): bj_raw = np.array(tv.server.body_joints_shared[:]) has_data = np.any(bj_raw != 0) data_time = tv.last_data_time tracking = data_time > 0 and (now - data_time) < 0.5 if tracking and has_data: if first_tracking_time is None: first_tracking_time = now print(f"\nFirst tracking data! Calibrating in {args.cal_delay}s — hold the rest pose...") bj = parse_body_joints(bj_raw) parent = bj.get(parent_name) child = bj.get(child_name) if parent is not None and child is not None: R_parent = parent[:3, :3] R_child = child[:3, :3] R_rel = R_parent.T @ R_child # Calibrate after delay if cal_ref is None and first_tracking_time is not None and (now - first_tracking_time) >= args.cal_delay: cal_ref = R_rel.copy() print("\nCALIBRATED! Now copy the robot's motion.\n") print(f"{'Robot Value':>12s} | {'XR Delta (robot frame YXZ Euler)':>40s} | {'XR Delta (robot frame XYZ)':>35s}") print("-" * 95) if cal_ref is not None: # Simple basis change: XR frame -> robot frame R_delta = cal_ref.T @ R_rel R_delta_robot = R3_ROBOT_OPENXR @ R_delta @ R3_ROBOT_OPENXR.T yxz = Rotation.from_matrix(R_delta_robot).as_euler('YXZ') xyz = Rotation.from_matrix(R_delta_robot).as_euler('XYZ') xr_info = (yxz, xyz) # 3) Publish DDS (once per frame, always with current angles) cmd.mode_pr = 0 cmd.mode_machine = 0 for i in range(NUM_JOINTS): cmd.motor_cmd[i].mode = 1 cmd.motor_cmd[i].q = float(angles[i]) cmd.motor_cmd[i].dq = 0.0 cmd.motor_cmd[i].tau = 0.0 cmd.motor_cmd[i].kp = float(KP[i]) cmd.motor_cmd[i].kd = float(KD[i]) for i in range(12): cmd.motor_cmd[i].mode = 0 cmd.motor_cmd[i].kp = 0.0 cmd.motor_cmd[i].kd = 0.0 cmd.crc = crc.Crc(cmd) pub.Write(cmd) # 4) Log if xr_info and step % int(args.hz) == 0: yxz, xyz = xr_info print(f" {robot_val:+.3f} rad | " f"Y={yxz[0]:+.3f} X={yxz[1]:+.3f} Z={yxz[2]:+.3f} | " f"X={xyz[0]:+.3f} Y={xyz[1]:+.3f} Z={xyz[2]:+.3f}") elif not xr_info and step % int(args.hz * 2) == 0: print(f" robot={robot_val:+.3f} (Waiting for Quest tracking data...)") sleep_time = interval - (time.perf_counter() - t_start) if sleep_time > 0: time.sleep(sleep_time) # Return to zero print("\nReturning to zero...") angles = np.zeros(NUM_JOINTS) for _ in range(25): cmd.mode_pr = 0 cmd.mode_machine = 0 for i in range(NUM_JOINTS): cmd.motor_cmd[i].mode = 1 cmd.motor_cmd[i].q = 0.0 cmd.motor_cmd[i].dq = 0.0 cmd.motor_cmd[i].tau = 0.0 cmd.motor_cmd[i].kp = float(KP[i]) cmd.motor_cmd[i].kd = float(KD[i]) for i in range(12): cmd.motor_cmd[i].mode = 0 cmd.motor_cmd[i].kp = 0.0 cmd.motor_cmd[i].kd = 0.0 cmd.crc = crc.Crc(cmd) pub.Write(cmd) time.sleep(0.02) print("Done.") if __name__ == "__main__": main()