#!/usr/bin/env python3 """ Retargeting bridge: Quest 3 body tracking -> G1 robot joint commands via DDS. Reads body tracking data from the teleop server's shared memory, retargets body joint orientations to G1 robot joint angles, and publishes commands over CycloneDDS for Isaac Sim (or real robot). Usage: python retarget_bridge.py [--port 8765] [--dds-domain 1] [--hz 50] Architecture: Quest 3 -> WebSocket -> TeleopServer (shared memory) | retarget_bridge.py | CycloneDDS -> Isaac Sim """ import sys import os import time import argparse import logging import signal import numpy as np from multiprocessing import Array # Local imports from teleop_server import TeleopServer logger = logging.getLogger("retarget_bridge") # --------------------------------------------------------------------------- # Coordinate transform constants # --------------------------------------------------------------------------- # OpenXR (Y-up, -Z forward, X-right) -> Robot (X-forward, Y-left, Z-up) R_ROBOT_OPENXR = np.array([[ 0, 0,-1], [-1, 0, 0], [ 0, 1, 0]], dtype=np.float64) R_OPENXR_ROBOT = R_ROBOT_OPENXR.T # --------------------------------------------------------------------------- # G1 joint indices (29 DOF) # --------------------------------------------------------------------------- NUM_JOINTS = 29 # Waist WAIST_YAW = 12 WAIST_ROLL = 13 WAIST_PITCH = 14 # Left arm L_SHOULDER_PITCH = 15 L_SHOULDER_ROLL = 16 L_SHOULDER_YAW = 17 L_ELBOW = 18 L_WRIST_ROLL = 19 L_WRIST_PITCH = 20 L_WRIST_YAW = 21 # Right arm R_SHOULDER_PITCH = 22 R_SHOULDER_ROLL = 23 R_SHOULDER_YAW = 24 R_ELBOW = 25 R_WRIST_ROLL = 26 R_WRIST_PITCH = 27 R_WRIST_YAW = 28 # --------------------------------------------------------------------------- # Body joints shared memory layout (8 joints x 7 floats = 56) # Each joint: [x, y, z, qx, qy, qz, qw] # --------------------------------------------------------------------------- BJ_HIPS = 0 # offset 0 BJ_CHEST = 1 # offset 7 BJ_L_UPPER_ARM = 2 # offset 14 BJ_L_LOWER_ARM = 3 # offset 21 BJ_L_WRIST = 4 # offset 28 BJ_R_UPPER_ARM = 5 # offset 35 BJ_R_LOWER_ARM = 6 # offset 42 BJ_R_WRIST = 7 # offset 49 NUM_BODY_JOINTS = 8 # --------------------------------------------------------------------------- # URDF joint limits (from g1_custom_collision_29dof.urdf) # --------------------------------------------------------------------------- JOINT_LIMITS = { 12: (-2.618, 2.618), # waist_yaw 13: (-0.52, 0.52), # waist_roll 14: (-0.52, 0.52), # waist_pitch 15: (-3.0892, 2.6704), # left_shoulder_pitch 16: (-1.5882, 2.2515), # left_shoulder_roll 17: (-2.618, 2.618), # left_shoulder_yaw 18: (-1.0472, 2.0944), # left_elbow 19: (-1.972, 1.972), # left_wrist_roll 20: (-1.614, 1.614), # left_wrist_pitch 21: (-1.614, 1.614), # left_wrist_yaw 22: (-3.0892, 2.6704), # right_shoulder_pitch 23: (-2.2515, 1.5882), # right_shoulder_roll 24: (-2.618, 2.618), # right_shoulder_yaw 25: (-1.0472, 2.0944), # right_elbow 26: (-1.972, 1.972), # right_wrist_roll 27: (-1.614, 1.614), # right_wrist_pitch 28: (-1.614, 1.614), # right_wrist_yaw } SOFT_LIMIT_FACTOR = 0.9 # Default Kp/Kd from Unitree SDK example KP = [ 60, 60, 60, 100, 40, 40, # Left leg 60, 60, 60, 100, 40, 40, # Right leg 60, 40, 40, # Waist 40, 40, 40, 40, 40, 40, 40, # Left arm 40, 40, 40, 40, 40, 40, 40, # Right arm ] KD = [ 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ] # Inspire hand joint ordering in DDS MotorCmds_ (12 total) # Indices 0-5: right hand, 6-11: left hand INSPIRE_R_PINKY = 0 INSPIRE_R_RING = 1 INSPIRE_R_MIDDLE = 2 INSPIRE_R_INDEX = 3 INSPIRE_R_THUMB_PITCH = 4 INSPIRE_R_THUMB_YAW = 5 INSPIRE_L_PINKY = 6 INSPIRE_L_RING = 7 INSPIRE_L_MIDDLE = 8 INSPIRE_L_INDEX = 9 INSPIRE_L_THUMB_PITCH = 10 INSPIRE_L_THUMB_YAW = 11 # Quest hand joint indices (XR_EXT_hand_tracking, 25 per hand) QH_PALM = 0 QH_WRIST = 1 QH_THUMB_METACARPAL = 2 QH_THUMB_PROXIMAL = 3 QH_THUMB_TIP = 4 QH_INDEX_METACARPAL = 5 QH_INDEX_PROXIMAL = 6 QH_INDEX_INTERMEDIATE = 7 QH_INDEX_TIP = 8 QH_MIDDLE_METACARPAL = 9 QH_MIDDLE_PROXIMAL = 10 QH_MIDDLE_INTERMEDIATE = 11 QH_MIDDLE_TIP = 12 QH_RING_METACARPAL = 13 QH_RING_PROXIMAL = 14 QH_RING_INTERMEDIATE = 15 QH_RING_TIP = 16 QH_PINKY_METACARPAL = 17 QH_PINKY_PROXIMAL = 18 QH_PINKY_INTERMEDIATE = 19 QH_PINKY_TIP = 20 # --------------------------------------------------------------------------- # Rotation math utilities # --------------------------------------------------------------------------- def _rx(angle): """Rotation matrix about X axis.""" c, s = np.cos(angle), np.sin(angle) return np.array([[1, 0, 0], [0, c, -s], [0, s, c]], dtype=np.float64) def _ry(angle): """Rotation matrix about Y axis.""" c, s = np.cos(angle), np.sin(angle) return np.array([[c, 0, s], [0, 1, 0], [-s, 0, c]], dtype=np.float64) def _rz(angle): """Rotation matrix about Z axis.""" c, s = np.cos(angle), np.sin(angle) return np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]], dtype=np.float64) def quat_to_rotmat(qx, qy, qz, qw): """Quaternion to 3x3 rotation matrix.""" xx = qx * qx; yy = qy * qy; zz = qz * qz xy = qx * qy; xz = qx * qz; yz = qy * qz wx = qw * qx; wy = qw * qy; wz = qw * qz return np.array([ [1 - 2*(yy + zz), 2*(xy - wz), 2*(xz + wy)], [2*(xy + wz), 1 - 2*(xx + zz), 2*(yz - wx)], [2*(xz - wy), 2*(yz + wx), 1 - 2*(xx + yy)], ], dtype=np.float64) def euler_zxy(R): """Decompose R = Rz(yaw) * Rx(roll) * Ry(pitch). Returns (yaw, roll, pitch). Used for waist joints: yaw(Z) -> roll(X) -> pitch(Y). """ # R[2,1] = sin(roll) roll = np.arcsin(np.clip(R[2, 1], -1.0, 1.0)) if np.abs(R[2, 1]) < 0.9999: yaw = np.arctan2(-R[0, 1], R[1, 1]) pitch = np.arctan2(-R[2, 0], R[2, 2]) else: yaw = np.arctan2(R[1, 0], R[0, 0]) pitch = 0.0 return yaw, roll, pitch def euler_yxz(R): """Decompose R = Ry(pitch) * Rx(roll) * Rz(yaw). Returns (pitch, roll, yaw). Used for shoulder joints: pitch(Y) -> roll(X) -> yaw(Z). """ # R[1,2] = -sin(roll) roll = np.arcsin(np.clip(-R[1, 2], -1.0, 1.0)) if np.abs(R[1, 2]) < 0.9999: pitch = np.arctan2(R[0, 2], R[2, 2]) yaw = np.arctan2(R[1, 0], R[1, 1]) else: pitch = np.arctan2(-R[2, 0], R[0, 0]) yaw = 0.0 return pitch, roll, yaw def euler_xyz(R): """Decompose R = Rx(roll) * Ry(pitch) * Rz(yaw). Returns (roll, pitch, yaw). Used for wrist joints: roll(X) -> pitch(Y) -> yaw(Z). """ # R[0,2] = sin(pitch) pitch = np.arcsin(np.clip(R[0, 2], -1.0, 1.0)) if np.abs(R[0, 2]) < 0.9999: roll = np.arctan2(-R[1, 2], R[2, 2]) yaw = np.arctan2(-R[0, 1], R[0, 0]) else: roll = np.arctan2(R[2, 1], R[1, 1]) yaw = 0.0 return roll, pitch, yaw def extract_y_rotation(R): """Extract the Y-axis rotation angle from a rotation matrix. Used for elbow (single-axis pitch). """ return np.arctan2(R[0, 2], R[0, 0]) # Pre-compute shoulder rest pose rotations L_SHOULDER_REST = _ry(0.27931) @ _rx(-0.27925) R_SHOULDER_REST = _ry(-0.27931) @ _rx(0.27925) L_SHOULDER_REST_INV = L_SHOULDER_REST.T R_SHOULDER_REST_INV = R_SHOULDER_REST.T # --------------------------------------------------------------------------- # Joint smoother # --------------------------------------------------------------------------- class JointSmoother: """EMA smoothing with rate limiter for joint angles.""" def __init__(self, n_joints, alpha=0.3, max_delta=0.15): """ Args: n_joints: Number of joints to smooth. alpha: EMA factor (0=no change, 1=instant). Default 0.3. max_delta: Max angle change per step in radians. At 50Hz, 0.15 rad/step = 7.5 rad/s. """ self.prev = np.zeros(n_joints) self.alpha = alpha self.max_delta = max_delta def smooth(self, target): """Apply EMA + rate limiting. Returns smoothed values.""" # EMA smoothed = self.alpha * target + (1.0 - self.alpha) * self.prev # Rate limiter delta = np.clip(smoothed - self.prev, -self.max_delta, self.max_delta) result = self.prev + delta self.prev = result.copy() return result # --------------------------------------------------------------------------- # Body retargeter # --------------------------------------------------------------------------- class BodyRetargeter: """Converts body tracking poses to G1 joint angles via orientation decomposition.""" def __init__(self): # Pre-compute soft joint limits self.soft_lo = np.zeros(NUM_JOINTS) self.soft_hi = np.zeros(NUM_JOINTS) for idx, (lo, hi) in JOINT_LIMITS.items(): mid = (lo + hi) / 2.0 half = (hi - lo) / 2.0 self.soft_lo[idx] = mid - SOFT_LIMIT_FACTOR * half self.soft_hi[idx] = mid + SOFT_LIMIT_FACTOR * half # Calibration: captured at first valid frame self._calibrated = False self._ref_rotations = {} # joint_group -> R_rel at calibration def retarget(self, body_rotmats): """Compute 29 joint angles from body tracking rotation matrices. Args: body_rotmats: dict mapping joint index (BJ_*) to 3x3 rotation matrix in OpenXR world frame. Returns: np.ndarray of shape (29,) with joint angles in radians. """ angles = np.zeros(NUM_JOINTS) # --- Waist: HIPS -> CHEST --- R_hips = body_rotmats[BJ_HIPS] R_chest = body_rotmats[BJ_CHEST] R_waist_rel = self._to_robot_rel(R_hips, R_chest) if not self._calibrated: self._ref_rotations['waist'] = R_waist_rel.copy() R_waist_delta = self._ref_rotations.get('waist', np.eye(3)).T @ R_waist_rel yaw, roll, pitch = euler_zxy(R_waist_delta) angles[WAIST_YAW] = yaw angles[WAIST_ROLL] = roll angles[WAIST_PITCH] = pitch # --- Left shoulder: CHEST -> LEFT_UPPER_ARM --- R_l_upper = body_rotmats[BJ_L_UPPER_ARM] R_l_shoulder_rel = self._to_robot_rel(R_chest, R_l_upper) if not self._calibrated: self._ref_rotations['l_shoulder'] = R_l_shoulder_rel.copy() R_l_shoulder_delta = self._ref_rotations.get('l_shoulder', np.eye(3)).T @ R_l_shoulder_rel l_sp, l_sr, l_sy = euler_yxz(R_l_shoulder_delta) angles[L_SHOULDER_PITCH] = l_sp angles[L_SHOULDER_ROLL] = l_sr angles[L_SHOULDER_YAW] = l_sy # --- Left elbow: LEFT_UPPER_ARM -> LEFT_LOWER_ARM --- R_l_lower = body_rotmats[BJ_L_LOWER_ARM] R_l_elbow_rel = self._to_robot_rel(R_l_upper, R_l_lower) if not self._calibrated: self._ref_rotations['l_elbow'] = R_l_elbow_rel.copy() R_l_elbow_delta = self._ref_rotations.get('l_elbow', np.eye(3)).T @ R_l_elbow_rel angles[L_ELBOW] = extract_y_rotation(R_l_elbow_delta) # --- Left wrist: LEFT_LOWER_ARM -> LEFT_WRIST --- R_l_wrist = body_rotmats[BJ_L_WRIST] R_l_wrist_rel = self._to_robot_rel(R_l_lower, R_l_wrist) if not self._calibrated: self._ref_rotations['l_wrist'] = R_l_wrist_rel.copy() R_l_wrist_delta = self._ref_rotations.get('l_wrist', np.eye(3)).T @ R_l_wrist_rel l_wr, l_wp, l_wy = euler_xyz(R_l_wrist_delta) angles[L_WRIST_ROLL] = l_wr angles[L_WRIST_PITCH] = l_wp angles[L_WRIST_YAW] = l_wy # --- Right shoulder: CHEST -> RIGHT_UPPER_ARM --- R_r_upper = body_rotmats[BJ_R_UPPER_ARM] R_r_shoulder_rel = self._to_robot_rel(R_chest, R_r_upper) if not self._calibrated: self._ref_rotations['r_shoulder'] = R_r_shoulder_rel.copy() R_r_shoulder_delta = self._ref_rotations.get('r_shoulder', np.eye(3)).T @ R_r_shoulder_rel r_sp, r_sr, r_sy = euler_yxz(R_r_shoulder_delta) angles[R_SHOULDER_PITCH] = r_sp angles[R_SHOULDER_ROLL] = r_sr angles[R_SHOULDER_YAW] = r_sy # --- Right elbow: RIGHT_UPPER_ARM -> RIGHT_LOWER_ARM --- R_r_lower = body_rotmats[BJ_R_LOWER_ARM] R_r_elbow_rel = self._to_robot_rel(R_r_upper, R_r_lower) if not self._calibrated: self._ref_rotations['r_elbow'] = R_r_elbow_rel.copy() R_r_elbow_delta = self._ref_rotations.get('r_elbow', np.eye(3)).T @ R_r_elbow_rel angles[R_ELBOW] = extract_y_rotation(R_r_elbow_delta) # --- Right wrist: RIGHT_LOWER_ARM -> RIGHT_WRIST --- R_r_wrist = body_rotmats[BJ_R_WRIST] R_r_wrist_rel = self._to_robot_rel(R_r_lower, R_r_wrist) if not self._calibrated: self._ref_rotations['r_wrist'] = R_r_wrist_rel.copy() R_r_wrist_delta = self._ref_rotations.get('r_wrist', np.eye(3)).T @ R_r_wrist_rel r_wr, r_wp, r_wy = euler_xyz(R_r_wrist_delta) angles[R_WRIST_ROLL] = r_wr angles[R_WRIST_PITCH] = r_wp angles[R_WRIST_YAW] = r_wy # Mark calibration complete if not self._calibrated: self._calibrated = True logger.info("Calibration captured (neutral pose reference)") # Clamp to soft joint limits for idx in JOINT_LIMITS: angles[idx] = np.clip(angles[idx], self.soft_lo[idx], self.soft_hi[idx]) return angles def _to_robot_rel(self, R_parent_openxr, R_child_openxr): """Compute parent-relative rotation in robot frame. R_rel_openxr = R_parent^T @ R_child R_rel_robot = R_ROBOT_OPENXR @ R_rel_openxr @ R_OPENXR_ROBOT """ R_rel = R_parent_openxr.T @ R_child_openxr return R_ROBOT_OPENXR @ R_rel @ R_OPENXR_ROBOT def reset_calibration(self): """Reset calibration so next frame becomes the new reference.""" self._calibrated = False self._ref_rotations.clear() logger.info("Calibration reset -- next valid frame will be captured") # --------------------------------------------------------------------------- # Hand retargeter # --------------------------------------------------------------------------- class HandRetargeter: """Maps Quest hand tracking to Inspire hand joint values.""" def retarget(self, left_hand_pos, right_hand_pos): """Compute 12 Inspire hand joint values from Quest hand positions. Args: left_hand_pos: (25, 3) wrist-relative hand positions (robot frame) right_hand_pos: (25, 3) wrist-relative hand positions (robot frame) Returns: np.ndarray of shape (12,) with normalized hand values. Convention: 1.0 = closed, 0.0 = open (matching Inspire DDS). """ hand_cmd = np.zeros(12) if np.any(left_hand_pos != 0): hand_cmd[INSPIRE_L_INDEX] = self._finger_curl( left_hand_pos, QH_INDEX_METACARPAL, QH_INDEX_PROXIMAL, QH_INDEX_INTERMEDIATE, QH_INDEX_TIP) hand_cmd[INSPIRE_L_MIDDLE] = self._finger_curl( left_hand_pos, QH_MIDDLE_METACARPAL, QH_MIDDLE_PROXIMAL, QH_MIDDLE_INTERMEDIATE, QH_MIDDLE_TIP) hand_cmd[INSPIRE_L_RING] = self._finger_curl( left_hand_pos, QH_RING_METACARPAL, QH_RING_PROXIMAL, QH_RING_INTERMEDIATE, QH_RING_TIP) hand_cmd[INSPIRE_L_PINKY] = self._finger_curl( left_hand_pos, QH_PINKY_METACARPAL, QH_PINKY_PROXIMAL, QH_PINKY_INTERMEDIATE, QH_PINKY_TIP) t_pitch, t_yaw = self._thumb_angles(left_hand_pos) hand_cmd[INSPIRE_L_THUMB_PITCH] = t_pitch hand_cmd[INSPIRE_L_THUMB_YAW] = t_yaw if np.any(right_hand_pos != 0): hand_cmd[INSPIRE_R_INDEX] = self._finger_curl( right_hand_pos, QH_INDEX_METACARPAL, QH_INDEX_PROXIMAL, QH_INDEX_INTERMEDIATE, QH_INDEX_TIP) hand_cmd[INSPIRE_R_MIDDLE] = self._finger_curl( right_hand_pos, QH_MIDDLE_METACARPAL, QH_MIDDLE_PROXIMAL, QH_MIDDLE_INTERMEDIATE, QH_MIDDLE_TIP) hand_cmd[INSPIRE_R_RING] = self._finger_curl( right_hand_pos, QH_RING_METACARPAL, QH_RING_PROXIMAL, QH_RING_INTERMEDIATE, QH_RING_TIP) hand_cmd[INSPIRE_R_PINKY] = self._finger_curl( right_hand_pos, QH_PINKY_METACARPAL, QH_PINKY_PROXIMAL, QH_PINKY_INTERMEDIATE, QH_PINKY_TIP) t_pitch, t_yaw = self._thumb_angles(right_hand_pos) hand_cmd[INSPIRE_R_THUMB_PITCH] = t_pitch hand_cmd[INSPIRE_R_THUMB_YAW] = t_yaw return hand_cmd def _finger_curl(self, positions, meta_idx, prox_idx, inter_idx, tip_idx): """Compute normalized finger curl value. Returns: 0.0 (open) to 1.0 (closed) matching Inspire DDS convention. """ v1 = positions[prox_idx] - positions[meta_idx] v2 = positions[inter_idx] - positions[prox_idx] v3 = positions[tip_idx] - positions[inter_idx] n1 = np.linalg.norm(v1) n2 = np.linalg.norm(v2) n3 = np.linalg.norm(v3) if n1 < 1e-6 or n2 < 1e-6 or n3 < 1e-6: return 0.0 # Angle between consecutive bone segments cos1 = np.clip(np.dot(v1 / n1, v2 / n2), -1.0, 1.0) cos2 = np.clip(np.dot(v2 / n2, v3 / n3), -1.0, 1.0) angle1 = np.arccos(cos1) angle2 = np.arccos(cos2) # Average curl angle: 0 (straight) to ~pi/2 (curled) avg_curl = (angle1 + angle2) / 2.0 # Normalize to [0, 1] where 1 = fully curled # Max curl is approximately 1.5 rad normalized = np.clip(avg_curl / 1.5, 0.0, 1.0) return normalized def _thumb_angles(self, positions): """Compute thumb pitch and yaw from hand positions. Returns: (pitch_normalized, yaw_normalized) both in [0, 1]. """ meta = positions[QH_THUMB_METACARPAL] prox = positions[QH_THUMB_PROXIMAL] tip = positions[QH_THUMB_TIP] v1 = prox - meta v2 = tip - prox n1 = np.linalg.norm(v1) n2 = np.linalg.norm(v2) if n1 < 1e-6 or n2 < 1e-6: return 0.0, 0.5 v1n = v1 / n1 v2n = v2 / n2 # Pitch: flexion angle between thumb bones cos_pitch = np.clip(np.dot(v1n, v2n), -1.0, 1.0) pitch_rad = np.arccos(cos_pitch) # Inspire thumb pitch range: [0.0, 0.5] rad # Normalize: 1.0 = max flexion (0.5 rad), 0.0 = straight (0 rad) pitch_norm = np.clip(pitch_rad / 0.5, 0.0, 1.0) # Yaw: abduction (spread) of thumb # Use the lateral component of the thumb direction relative to palm palm = positions[QH_PALM] index_meta = positions[QH_INDEX_METACARPAL] palm_to_index = index_meta - palm palm_to_index_n = np.linalg.norm(palm_to_index) if palm_to_index_n < 1e-6: return pitch_norm, 0.5 # Project thumb direction onto palm plane palm_dir = palm_to_index / palm_to_index_n thumb_dir = (tip - meta) thumb_dir_n = np.linalg.norm(thumb_dir) if thumb_dir_n < 1e-6: return pitch_norm, 0.5 thumb_dir = thumb_dir / thumb_dir_n # Yaw angle: how far the thumb points away from the index finger cos_yaw = np.clip(np.dot(thumb_dir, palm_dir), -1.0, 1.0) yaw_rad = np.arccos(cos_yaw) # Inspire thumb yaw range: [-0.1, 1.3] rad. Normalize to [0, 1]. yaw_norm = np.clip((yaw_rad - (-0.1)) / (1.3 - (-0.1)), 0.0, 1.0) return pitch_norm, yaw_norm # --------------------------------------------------------------------------- # DDS publisher # --------------------------------------------------------------------------- class DDSPublisher: """Publishes G1 joint commands and Inspire hand commands via CycloneDDS.""" def __init__(self, domain_id=1): from unitree_sdk2py.core.channel import ( ChannelPublisher, ChannelFactoryInitialize) from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ from unitree_sdk2py.utils.crc import CRC from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_ from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_ logger.info(f"Initializing DDS (domain {domain_id})...") ChannelFactoryInitialize(domain_id) # Body joint publisher self._body_pub = ChannelPublisher("rt/lowcmd", LowCmd_) self._body_pub.Init() self._low_cmd = unitree_hg_msg_dds__LowCmd_() self._crc = CRC() self._MotorCmd_ = unitree_go_msg_dds__MotorCmd_ # Inspire hand publisher self._hand_pub = ChannelPublisher("rt/inspire/cmd", MotorCmds_) self._hand_pub.Init() self._MotorCmds_ = MotorCmds_ logger.info("DDS publishers initialized (rt/lowcmd, rt/inspire/cmd)") def publish_body(self, joint_angles): """Publish 29 joint angle commands via LowCmd_. Args: joint_angles: np.ndarray of shape (29,) in radians. """ cmd = self._low_cmd cmd.mode_pr = 0 cmd.mode_machine = 0 for i in range(NUM_JOINTS): cmd.motor_cmd[i].mode = 1 cmd.motor_cmd[i].q = float(joint_angles[i]) cmd.motor_cmd[i].dq = 0.0 cmd.motor_cmd[i].tau = 0.0 cmd.motor_cmd[i].kp = float(KP[i]) cmd.motor_cmd[i].kd = float(KD[i]) # Zero out legs (not retargeted -- leave to standing controller) for i in range(12): cmd.motor_cmd[i].mode = 0 cmd.motor_cmd[i].kp = 0.0 cmd.motor_cmd[i].kd = 0.0 cmd.crc = self._crc.Crc(cmd) self._body_pub.Write(cmd) def publish_hands(self, hand_values): """Publish 12 Inspire hand joint commands via MotorCmds_. Args: hand_values: np.ndarray of shape (12,) with normalized [0,1] values. """ hand_cmd = self._MotorCmds_() hand_cmd.cmds = [] for i in range(12): mc = self._MotorCmd_() mc.q = float(np.clip(hand_values[i], 0.0, 1.0)) mc.dq = 0.0 mc.tau = 0.0 mc.kp = 5.0 mc.kd = 0.5 hand_cmd.cmds.append(mc) self._hand_pub.Write(hand_cmd) # --------------------------------------------------------------------------- # Shared memory reader # --------------------------------------------------------------------------- def read_body_joints(server): """Read body joint rotation matrices from shared memory. Returns: dict mapping BJ_* index to 3x3 rotation matrix, or None if no data. """ with server.body_joints_shared.get_lock(): raw = np.array(server.body_joints_shared[:]) # Check if any data has arrived (all zeros = no data yet) if np.all(raw == 0): return None rotmats = {} for i in range(NUM_BODY_JOINTS): offset = i * 7 qx, qy, qz, qw = raw[offset+3], raw[offset+4], raw[offset+5], raw[offset+6] # Check for valid quaternion qnorm = qx*qx + qy*qy + qz*qz + qw*qw if qnorm < 0.5: return None # Invalid data rotmats[i] = quat_to_rotmat(qx, qy, qz, qw) return rotmats def read_hand_positions(server): """Read hand joint positions from shared memory. Returns: (left_25x3, right_25x3) numpy arrays in robot frame. """ with server.left_hand_position_shared.get_lock(): raw_left = np.array(server.left_hand_position_shared[:]) with server.right_hand_position_shared.get_lock(): raw_right = np.array(server.right_hand_position_shared[:]) left = np.zeros((25, 3)) right = np.zeros((25, 3)) if np.any(raw_left != 0): left_openxr = raw_left.reshape(25, 3) # Transform each position from OpenXR to robot frame left = (R_ROBOT_OPENXR @ left_openxr.T).T if np.any(raw_right != 0): right_openxr = raw_right.reshape(25, 3) right = (R_ROBOT_OPENXR @ right_openxr.T).T return left, right # --------------------------------------------------------------------------- # Main loop # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="Retargeting bridge: Quest 3 body tracking -> G1 DDS commands") parser.add_argument("--port", type=int, default=8765, help="WebSocket server port (default: 8765)") parser.add_argument("--host", default="0.0.0.0", help="WebSocket bind address (default: 0.0.0.0)") parser.add_argument("--dds-domain", type=int, default=1, help="DDS domain ID (default: 1 for sim)") parser.add_argument("--hz", type=float, default=50.0, help="Publishing frequency in Hz (default: 50)") parser.add_argument("--alpha", type=float, default=0.3, help="Smoothing factor, 0=slow 1=instant (default: 0.3)") parser.add_argument("--no-hands", action="store_true", help="Disable hand retargeting") parser.add_argument("--verbose", action="store_true", help="Enable debug logging") args = parser.parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s" ) # --- Start teleop server --- logger.info(f"Starting teleop server on ws://{args.host}:{args.port}") server = TeleopServer(host=args.host, port=args.port) server_thread = server.start_in_thread() # --- Initialize DDS --- dds = DDSPublisher(domain_id=args.dds_domain) # --- Initialize retargeters --- body_retargeter = BodyRetargeter() hand_retargeter = HandRetargeter() body_smoother = JointSmoother(NUM_JOINTS, alpha=args.alpha, max_delta=0.15) hand_smoother = JointSmoother(12, alpha=0.4, max_delta=0.2) # --- Deadman / timeout state --- tracking_timeout = 0.5 # seconds return_to_zero_rate = 0.02 # rad/step last_valid_time = 0.0 startup_ramp = 0.0 # 0.0 -> 1.0 over 2 seconds startup_ramp_duration = 2.0 startup_time = None # --- Control loop --- interval = 1.0 / args.hz running = True def signal_handler(sig, frame): nonlocal running running = False signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) logger.info(f"Bridge running at {args.hz} Hz. Waiting for Quest 3 connection...") step_count = 0 current_angles = np.zeros(NUM_JOINTS) current_hands = np.zeros(12) while running: t_start = time.perf_counter() # Read body tracking data body_rotmats = read_body_joints(server) if body_rotmats is not None: now = time.time() with server.last_data_time_shared.get_lock(): data_time = server.last_data_time_shared.value if data_time > 0: last_valid_time = now # Retarget body raw_angles = body_retargeter.retarget(body_rotmats) smoothed_angles = body_smoother.smooth(raw_angles) # Startup ramp if startup_time is None: startup_time = now logger.info("First tracking data received -- starting ramp-up") elapsed = now - startup_time startup_ramp = min(elapsed / startup_ramp_duration, 1.0) current_angles = smoothed_angles * startup_ramp # Retarget hands if not args.no_hands: left_hand, right_hand = read_hand_positions(server) raw_hands = hand_retargeter.retarget(left_hand, right_hand) current_hands = hand_smoother.smooth(raw_hands) * startup_ramp else: # No data -- check timeout now = time.time() if last_valid_time > 0 and (now - last_valid_time) > tracking_timeout: # Gradually return to zero for i in range(12, NUM_JOINTS): if abs(current_angles[i]) > return_to_zero_rate: current_angles[i] -= np.sign(current_angles[i]) * return_to_zero_rate else: current_angles[i] = 0.0 # Return hands to open current_hands = np.maximum(current_hands - 0.02, 0.0) # Publish dds.publish_body(current_angles) if not args.no_hands: dds.publish_hands(current_hands) # Status logging step_count += 1 if step_count % int(args.hz * 5) == 0: # Every 5 seconds status = "TRACKING" if body_rotmats is not None else "WAITING" ramp_pct = int(startup_ramp * 100) logger.info(f"[{status}] step={step_count} ramp={ramp_pct}% " f"waist=[{current_angles[12]:.2f},{current_angles[13]:.2f},{current_angles[14]:.2f}] " f"L_shoulder=[{current_angles[15]:.2f},{current_angles[16]:.2f},{current_angles[17]:.2f}] " f"R_shoulder=[{current_angles[22]:.2f},{current_angles[23]:.2f},{current_angles[24]:.2f}]") # Sleep to maintain target frequency elapsed = time.perf_counter() - t_start sleep_time = interval - elapsed if sleep_time > 0: time.sleep(sleep_time) logger.info("Bridge shutting down") if __name__ == "__main__": main()