"""
|
|
Native TeleVuerWrapper — drop-in replacement for TeleVuerWrapper that uses
|
|
the Quest 3 native app via TeleopServer instead of Vuer/browser.
|
|
|
|
Returns TeleData in the same format as the original TeleVuerWrapper, but:
|
|
- Wrist poses arrive chest-relative from body tracking (no head subtraction needed)
|
|
- Chest orientation is available for body rotation decoupling
|
|
- No SSL, no browser, no Vuer dependency
|
|
|
|
The coordinate transform pipeline is preserved:
|
|
1. Quest 3 sends poses in Godot's coordinate system (Y-up, -Z forward)
|
|
2. This wrapper converts to robot convention (X-forward, Y-left, Z-up)
|
|
3. Applies Unitree arm URDF convention transforms
|
|
4. Applies head-to-waist offset (same as original)
|
|
|
|
Usage:
|
|
# Instead of:
|
|
# tv_wrapper = TeleVuerWrapper(use_hand_tracking=True, ...)
|
|
# Use:
|
|
# tv_wrapper = NativeTeleWrapper(port=8765)
|
|
# tv_wrapper.start() # Starts WebSocket server in background
|
|
"""
|
|
|
|
import threading
import time
from dataclasses import dataclass, field
from multiprocessing import Array, Value
from typing import Optional

import numpy as np

from teleop_server import TeleopServer
|
|
|
# --- Coordinate Transform Constants ---
# Godot coordinate system: Y-up, -Z forward, X-right.
# This is equivalent to the OpenXR convention.
# Robot convention: X-forward, Y-left, Z-up.

# Basis change: OpenXR (Y-up, -Z fwd) -> Robot (Z-up, X fwd).
T_ROBOT_OPENXR = np.array([[ 0, 0, -1, 0],
                           [-1, 0,  0, 0],
                           [ 0, 1,  0, 0],
                           [ 0, 0,  0, 1]], dtype=np.float64)

# Inverse basis change: Robot -> OpenXR (T_ROBOT_OPENXR @ T_OPENXR_ROBOT == I).
T_OPENXR_ROBOT = np.array([[ 0, -1, 0, 0],
                           [ 0,  0, 1, 0],
                           [-1,  0, 0, 0],
                           [ 0,  0, 0, 1]], dtype=np.float64)

# Rotation-only (3x3) views of the basis changes above.
R_ROBOT_OPENXR = T_ROBOT_OPENXR[:3, :3]
R_OPENXR_ROBOT = T_OPENXR_ROBOT[:3, :3]

# Arm initial pose convention transforms (Unitree humanoid arm URDF frames).
T_TO_UNITREE_HUMANOID_LEFT_ARM = np.array([[1, 0,  0, 0],
                                           [0, 0, -1, 0],
                                           [0, 1,  0, 0],
                                           [0, 0,  0, 1]], dtype=np.float64)

T_TO_UNITREE_HUMANOID_RIGHT_ARM = np.array([[1,  0, 0, 0],
                                            [0,  0, 1, 0],
                                            [0, -1, 0, 0],
                                            [0,  0, 0, 1]], dtype=np.float64)

# Hand convention transform (applied to wrist-relative hand joint positions).
T_TO_UNITREE_HAND = np.array([[ 0,  0, 1, 0],
                              [-1,  0, 0, 0],
                              [ 0, -1, 0, 0],
                              [ 0,  0, 0, 1]], dtype=np.float64)

# Default poses used as fallbacks when no valid tracking data is available.
# Translations are in OpenXR convention (Y-up), e.g. head at 1.5 m height.
CONST_HEAD_POSE = np.array([[1, 0, 0, 0],
                            [0, 1, 0, 1.5],
                            [0, 0, 1, -0.2],
                            [0, 0, 0, 1]], dtype=np.float64)

CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15],
                                [0, 1, 0, 1.13],
                                [0, 0, 1, -0.3],
                                [0, 0, 0, 1]], dtype=np.float64)

CONST_RIGHT_ARM_POSE = np.array([[1, 0, 0, 0.15],
                                 [0, 1, 0, 1.13],
                                 [0, 0, 1, -0.3],
                                 [0, 0, 0, 1]], dtype=np.float64)
|
def fast_mat_inv(mat):
    """Invert a rigid SE(3) transform cheaply.

    For a rotation R and translation t, the inverse is (R.T, -R.T @ t),
    avoiding a general 4x4 matrix inversion.
    """
    rot_t = mat[:3, :3].T
    inv = np.eye(4)
    inv[:3, :3] = rot_t
    inv[:3, 3] = -(rot_t @ mat[:3, 3])
    return inv
|
|
|
|
|
def safe_mat_update(prev_mat, mat):
    """Accept *mat* only if it is well-conditioned; otherwise keep *prev_mat*.

    A matrix is rejected when its determinant is non-finite (NaN/inf) or
    numerically zero (singular).

    Returns:
        (matrix, is_valid) — the accepted matrix and whether *mat* was valid.
    """
    determinant = np.linalg.det(mat)
    is_valid = bool(np.isfinite(determinant)) and not np.isclose(determinant, 0.0, atol=1e-6)
    return (mat, True) if is_valid else (prev_mat, False)
|
|
|
|
|
@dataclass
class TeleData:
    """Processed teleoperation frame; matches TeleData from tv_wrapper.py exactly.

    Pose matrices are (4, 4) SE(3) transforms in robot convention
    (X-forward, Y-left, Z-up). Optional fields default to None/False when
    the corresponding tracking data is unavailable.
    """
    # Required poses
    head_pose: np.ndarray                          # (4,4) SE(3) pose of head
    left_wrist_pose: np.ndarray                    # (4,4) SE(3) pose of left wrist (IK frame)
    right_wrist_pose: np.ndarray                   # (4,4) SE(3) pose of right wrist (IK frame)
    chest_pose: Optional[np.ndarray] = None        # (4,4) SE(3) chest orientation (new vs. Vuer version)
    # Hand tracking (25 OpenXR hand joints per hand)
    left_hand_pos: Optional[np.ndarray] = None     # (25,3) 3D positions of left hand joints
    right_hand_pos: Optional[np.ndarray] = None    # (25,3) 3D positions of right hand joints
    left_hand_rot: Optional[np.ndarray] = None     # (25,3,3) rotation matrices
    right_hand_rot: Optional[np.ndarray] = None    # (25,3,3) rotation matrices
    # Hand state (computed from finger positions)
    left_hand_pinch: bool = False
    left_hand_pinchValue: float = 10.0             # default matches _compute_pinch's no-data value
    left_hand_squeeze: bool = False
    left_hand_squeezeValue: float = 0.0
    right_hand_pinch: bool = False
    right_hand_pinchValue: float = 10.0
    right_hand_squeeze: bool = False
    right_hand_squeezeValue: float = 0.0
|
|
|
|
|
class NativeTeleWrapper:
    """Drop-in replacement for TeleVuerWrapper using the Quest 3 native app.

    Matches the Unitree TeleVuerWrapper pipeline exactly:
    1. Wrist poses arrive chest-relative from body tracking (Godot computes chest_inv * wrist)
    2. We reconstruct world-space wrist poses: chest * chest_relative
    3. Apply basis change (OpenXR -> Robot convention)
    4. Apply Unitree arm URDF convention transforms
    5. Subtract HEAD position (translation only — rotation stays world-space)
    6. Add head-to-waist offset (matches Unitree: 0.15 x, 0.45 z)
    """

    def __init__(self, port: int = 8765, host: str = "0.0.0.0",
                 head_to_waist_x: float = 0.15,
                 head_to_waist_z: float = 0.45):
        """
        Args:
            port: WebSocket server port.
            host: WebSocket bind address.
            head_to_waist_x: Forward offset from head to IK solver origin (meters).
                Matches Unitree default: 0.15.
            head_to_waist_z: Vertical offset from head to IK solver origin (meters).
                Matches Unitree default: 0.45.
        """
        self.server = TeleopServer(host=host, port=port)
        self._server_thread = None
        self._head_to_waist_x = head_to_waist_x
        self._head_to_waist_z = head_to_waist_z

    def start(self):
        """Start the WebSocket server in a background thread."""
        self._server_thread = self.server.start_in_thread()

    def get_tele_data(self) -> TeleData:
        """Get processed motion data, transformed to robot convention.

        Pipeline matches Unitree's TeleVuerWrapper (televuer/tv_wrapper.py):
        1. Reconstruct world-space wrist from chest-relative data
        2. Basis change (OpenXR -> Robot)
        3. Arm convention transform
        4. Head subtraction (translation only — rotation stays world-space)
        5. Head-to-waist offset

        Returns:
            TeleData with head/chest/wrist poses, hand joint positions, and
            pinch/squeeze states. Falls back to CONST_* poses when tracking
            data is missing or singular.
        """
        # Read raw poses from shared memory (column-major 4x4, hence order='F').
        with self.server.head_pose_shared.get_lock():
            head_raw = np.array(self.server.head_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.chest_pose_shared.get_lock():
            chest_raw = np.array(self.server.chest_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.left_arm_pose_shared.get_lock():
            left_wrist_chest_rel = np.array(self.server.left_arm_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.right_arm_pose_shared.get_lock():
            right_wrist_chest_rel = np.array(self.server.right_arm_pose_shared[:]).reshape(4, 4, order='F')

        # Validate matrices; singular/non-finite data falls back to defaults.
        head_raw, head_valid = safe_mat_update(CONST_HEAD_POSE, head_raw)
        chest_raw, chest_valid = safe_mat_update(np.eye(4), chest_raw)
        left_wrist_chest_rel, left_valid = safe_mat_update(np.eye(4), left_wrist_chest_rel)
        right_wrist_chest_rel, right_valid = safe_mat_update(np.eye(4), right_wrist_chest_rel)

        # --- Reconstruct world-space wrist poses ---
        # body_tracker.gd sends: left_wrist_rel = chest_inv * left_wrist_world
        # So: left_wrist_world = chest * left_wrist_rel
        left_wrist_world = chest_raw @ left_wrist_chest_rel
        right_wrist_world = chest_raw @ right_wrist_chest_rel

        # Validate reconstructed world-space poses.
        # NOTE(review): this overwrites the left_valid/right_valid flags from the
        # chest-relative validation above — the final flags reflect the
        # reconstructed world poses only.
        left_wrist_world, left_valid = safe_mat_update(CONST_LEFT_ARM_POSE, left_wrist_world)
        right_wrist_world, right_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, right_wrist_world)

        # --- Basis change: OpenXR -> Robot convention (similarity transform) ---
        Brobot_world_head = T_ROBOT_OPENXR @ head_raw @ T_OPENXR_ROBOT
        Brobot_world_chest = T_ROBOT_OPENXR @ chest_raw @ T_OPENXR_ROBOT
        left_Brobot_world = T_ROBOT_OPENXR @ left_wrist_world @ T_OPENXR_ROBOT
        right_Brobot_world = T_ROBOT_OPENXR @ right_wrist_world @ T_OPENXR_ROBOT

        # --- Arm URDF convention transform (skipped for invalid/fallback poses) ---
        left_IPunitree = left_Brobot_world @ (T_TO_UNITREE_HUMANOID_LEFT_ARM if left_valid else np.eye(4))
        right_IPunitree = right_Brobot_world @ (T_TO_UNITREE_HUMANOID_RIGHT_ARM if right_valid else np.eye(4))

        # --- Head subtraction (TRANSLATION ONLY, matching Unitree tv_wrapper.py) ---
        # The rotation stays in world-space, which is what the IK solver expects.
        # Only the position is made head-relative.
        left_wrist_final = left_IPunitree.copy()
        right_wrist_final = right_IPunitree.copy()
        left_wrist_final[0:3, 3] -= Brobot_world_head[0:3, 3]
        right_wrist_final[0:3, 3] -= Brobot_world_head[0:3, 3]

        # --- Head-to-waist offset (matching Unitree: x=0.15, z=0.45) ---
        left_wrist_final[0, 3] += self._head_to_waist_x   # x (forward)
        right_wrist_final[0, 3] += self._head_to_waist_x
        left_wrist_final[2, 3] += self._head_to_waist_z   # z (up)
        right_wrist_final[2, 3] += self._head_to_waist_z

        # --- Hand positions ---
        left_hand_pos = np.zeros((25, 3))
        right_hand_pos = np.zeros((25, 3))

        # NOTE(review): hand data is only processed when BOTH wrists are valid;
        # confirm whether one-handed tracking should still populate its side.
        if left_valid and right_valid:
            with self.server.left_hand_position_shared.get_lock():
                raw_left = np.array(self.server.left_hand_position_shared[:])
            with self.server.right_hand_position_shared.get_lock():
                raw_right = np.array(self.server.right_hand_position_shared[:])

            if np.any(raw_left != 0):
                # Hand positions are wrist-relative from Quest 3, in OpenXR coords.
                # Must apply basis change (OpenXR->Robot) before hand convention transform.
                # Derivation: original code does T_TO_UNITREE_HAND @ inv(arm_robot) @ (T_ROBOT_OPENXR @ world_pos)
                # which simplifies to T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ wrist_relative_openxr
                left_pos_25x3 = raw_left.reshape(25, 3)
                left_hom = np.concatenate([left_pos_25x3.T, np.ones((1, 25))])
                left_hand_pos = (T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ left_hom)[0:3, :].T

            if np.any(raw_right != 0):
                right_pos_25x3 = raw_right.reshape(25, 3)
                right_hom = np.concatenate([right_pos_25x3.T, np.ones((1, 25))])
                right_hand_pos = (T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ right_hom)[0:3, :].T

        # --- Hand rotations ---
        left_hand_rot = None
        right_hand_rot = None
        # TODO: Transform hand rotations if needed for dexterous hand control

        # --- Pinch/squeeze from finger distances ---
        left_pinch_val, left_pinch = _compute_pinch(left_hand_pos)
        right_pinch_val, right_pinch = _compute_pinch(right_hand_pos)
        left_squeeze_val, left_squeeze = _compute_squeeze(left_hand_pos)
        right_squeeze_val, right_squeeze = _compute_squeeze(right_hand_pos)

        return TeleData(
            head_pose=Brobot_world_head,
            left_wrist_pose=left_wrist_final,
            right_wrist_pose=right_wrist_final,
            chest_pose=Brobot_world_chest,
            left_hand_pos=left_hand_pos,
            right_hand_pos=right_hand_pos,
            left_hand_rot=left_hand_rot,
            right_hand_rot=right_hand_rot,
            left_hand_pinch=left_pinch,
            left_hand_pinchValue=left_pinch_val,
            left_hand_squeeze=left_squeeze,
            left_hand_squeezeValue=left_squeeze_val,
            right_hand_pinch=right_pinch,
            right_hand_pinchValue=right_pinch_val,
            right_hand_squeeze=right_squeeze,
            right_hand_squeezeValue=right_squeeze_val,
        )

    def render_to_xr(self, img):
        """Send a webcam frame to the Quest 3 app.

        Args:
            img: BGR numpy array (OpenCV format). Will be JPEG-encoded and sent.
        """
        import cv2
        # Encode as JPEG (quality 70 keeps frames small for the WebSocket link).
        _, jpeg_buf = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 70])
        self.server.set_webcam_frame(jpeg_buf.tobytes())

    def close(self):
        """Shutdown the server.

        Currently a no-op: the server thread is daemonic and stops when the
        process exits. NOTE(review): if TeleopServer grows an explicit stop()
        API, call it here for a clean shutdown.
        """
        pass

    @property
    def has_client(self) -> bool:
        """Check if a Quest 3 client is connected.

        NOTE(review): reaches into the server's private ``_clients`` set.
        """
        return len(self.server._clients) > 0

    @property
    def last_data_time(self) -> float:
        """Timestamp of last received tracking data (shared-memory value)."""
        with self.server.last_data_time_shared.get_lock():
            return self.server.last_data_time_shared.value
|
|
def _compute_pinch(hand_pos_25x3: np.ndarray) -> tuple:
|
|
"""Compute pinch state from thumb tip (4) and index tip (8) distance.
|
|
|
|
Returns (pinchValue, is_pinching).
|
|
pinchValue: ~15.0 (open) → 0.0 (pinched), scaled to match Vuer convention * 100.
|
|
"""
|
|
if np.all(hand_pos_25x3 == 0):
|
|
return 10.0, False
|
|
|
|
# OpenXR hand joint indices: thumb tip = 4, index tip = 8
|
|
thumb_tip = hand_pos_25x3[4]
|
|
index_tip = hand_pos_25x3[8]
|
|
dist = np.linalg.norm(thumb_tip - index_tip)
|
|
|
|
# Map distance to pinch value (meters → normalized)
|
|
# Typical range: 0.0 (touching) to 0.10 (fully open)
|
|
pinch_value = min(dist * 150.0, 15.0) # Scale to ~0-15 range
|
|
is_pinching = dist < 0.025 # 2.5cm threshold
|
|
|
|
return pinch_value * 100.0, is_pinching # Match TeleVuerWrapper scaling
|
|
|
|
|
|
def _compute_squeeze(hand_pos_25x3: np.ndarray) -> tuple:
|
|
"""Compute squeeze (fist) state from average finger curl.
|
|
|
|
Returns (squeezeValue, is_squeezing).
|
|
squeezeValue: 0.0 (open) → 1.0 (fist).
|
|
"""
|
|
if np.all(hand_pos_25x3 == 0):
|
|
return 0.0, False
|
|
|
|
# Measure distance from fingertips to palm center
|
|
# Joint indices: palm=0, index_tip=8, middle_tip=12, ring_tip=16, pinky_tip=20
|
|
palm = hand_pos_25x3[0]
|
|
tips = hand_pos_25x3[[8, 12, 16, 20]]
|
|
dists = np.linalg.norm(tips - palm, axis=1)
|
|
avg_dist = np.mean(dists)
|
|
|
|
# Map: ~0.02m (fist) → 1.0, ~0.10m (open) → 0.0
|
|
squeeze_value = np.clip(1.0 - (avg_dist - 0.02) / 0.08, 0.0, 1.0)
|
|
is_squeezing = squeeze_value > 0.7
|
|
|
|
return squeeze_value, is_squeezing
|