Browse Source

Add native Quest 3 teleop support (--native flag)

- teleop_server.py: lightweight WebSocket server replacing Vuer (~250 lines)
- native_tv_wrapper.py: drop-in TeleVuerWrapper replacement using chest-relative body tracking
- teleop_hand_and_arm.py: --native/--native-port args, skip head pos compensation in native mode

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
main
Joe DiPrima 1 month ago
parent
commit
536cb10591
  1. 330
      teleop/native_tv_wrapper.py
  2. 26
      teleop/teleop_hand_and_arm.py
  3. 315
      teleop/teleop_server.py

330
teleop/native_tv_wrapper.py

@ -0,0 +1,330 @@
"""
Native drop-in replacement for TeleVuerWrapper that uses
the Quest 3 native app via TeleopServer instead of Vuer/browser.
Returns TeleData in the same format as the original TeleVuerWrapper, but:
- Wrist poses arrive chest-relative from body tracking (no head subtraction needed)
- Chest orientation is available for body rotation decoupling
- No SSL, no browser, no Vuer dependency
The coordinate transform pipeline is preserved:
1. Quest 3 sends poses in Godot's coordinate system (Y-up, -Z forward)
2. This wrapper converts to robot convention (X-forward, Y-left, Z-up)
3. Applies Unitree arm URDF convention transforms
4. Applies head-to-waist offset (same as original)
Usage:
# Instead of:
# tv_wrapper = TeleVuerWrapper(use_hand_tracking=True, ...)
# Use:
# tv_wrapper = NativeTeleWrapper(port=8765)
# tv_wrapper.start() # Starts WebSocket server in background
"""
import numpy as np
import time
import threading
from multiprocessing import Array, Value
from dataclasses import dataclass, field
from typing import Optional
from teleop_server import TeleopServer
# --- Coordinate Transform Constants ---
# Godot coordinate system: Y-up, -Z forward, X-right (equivalent to OpenXR).
# Robot convention: X-forward, Y-left, Z-up.
# Basis change: OpenXR (Y-up, -Z fwd) -> Robot (Z-up, X fwd).
T_ROBOT_OPENXR = np.array([[ 0,  0, -1,  0],
                           [-1,  0,  0,  0],
                           [ 0,  1,  0,  0],
                           [ 0,  0,  0,  1]], dtype=np.float64)
# Inverse basis change (transpose of the rotation above).
T_OPENXR_ROBOT = np.array([[ 0, -1,  0,  0],
                           [ 0,  0,  1,  0],
                           [-1,  0,  0,  0],
                           [ 0,  0,  0,  1]], dtype=np.float64)
# Rotation-only (3x3) views of the two basis changes.
R_ROBOT_OPENXR = T_ROBOT_OPENXR[:3, :3]
R_OPENXR_ROBOT = T_OPENXR_ROBOT[:3, :3]

# Arm initial-pose convention transforms (Unitree humanoid URDF frames).
T_TO_UNITREE_HUMANOID_LEFT_ARM = np.array([[1,  0,  0,  0],
                                           [0,  0, -1,  0],
                                           [0,  1,  0,  0],
                                           [0,  0,  0,  1]], dtype=np.float64)
T_TO_UNITREE_HUMANOID_RIGHT_ARM = np.array([[1,  0,  0,  0],
                                            [0,  0,  1,  0],
                                            [0, -1,  0,  0],
                                            [0,  0,  0,  1]], dtype=np.float64)
# Hand convention transform (robot frame -> Unitree hand frame).
T_TO_UNITREE_HAND = np.array([[ 0,  0,  1,  0],
                              [-1,  0,  0,  0],
                              [ 0, -1,  0,  0],
                              [ 0,  0,  0,  1]], dtype=np.float64)

# Default poses used as fallbacks when no (or singular) data is available.
CONST_HEAD_POSE = np.array([[1, 0, 0,  0],
                            [0, 1, 0,  1.5],
                            [0, 0, 1, -0.2],
                            [0, 0, 0,  1]], dtype=np.float64)
CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15],
                                [0, 1, 0,  1.13],
                                [0, 0, 1, -0.3],
                                [0, 0, 0,  1]], dtype=np.float64)
CONST_RIGHT_ARM_POSE = np.array([[1, 0, 0,  0.15],
                                 [0, 1, 0,  1.13],
                                 [0, 0, 1, -0.3],
                                 [0, 0, 0,  1]], dtype=np.float64)
def fast_mat_inv(mat):
    """Invert a rigid SE(3) transform without a general matrix inverse.

    For T = [R | t; 0 | 1] with R orthonormal, the inverse is
    [R^T | -R^T t; 0 | 1] — cheaper and more stable than np.linalg.inv.
    """
    rot_t = mat[:3, :3].T
    inv = np.eye(4)
    inv[:3, :3] = rot_t
    inv[:3, 3] = -(rot_t @ mat[:3, 3])
    return inv
def safe_mat_update(prev_mat, mat):
    """Accept *mat* only if it is a usable transform; else keep *prev_mat*.

    A matrix with a non-finite or near-zero determinant (e.g. an all-zero
    buffer before the first message) is rejected.

    Returns:
        (matrix, is_valid) — the accepted matrix and whether *mat* was used.
    """
    det = np.linalg.det(mat)
    valid = bool(np.isfinite(det)) and not np.isclose(det, 0.0, atol=1e-6)
    return (mat, True) if valid else (prev_mat, False)
@dataclass
class TeleData:
    """Matches the TeleData from tv_wrapper.py exactly.

    Poses are 4x4 SE(3) homogeneous transforms expressed in robot convention
    (X-forward, Y-left, Z-up) after NativeTeleWrapper's basis change.
    """
    head_pose: np.ndarray         # (4,4) SE(3) pose of head
    left_wrist_pose: np.ndarray   # (4,4) SE(3) pose of left wrist (IK frame)
    right_wrist_pose: np.ndarray  # (4,4) SE(3) pose of right wrist (IK frame)
    chest_pose: Optional[np.ndarray] = None  # (4,4) SE(3) NEW: chest orientation
    # Hand tracking (None / zeros when hand data is unavailable)
    left_hand_pos: Optional[np.ndarray] = None   # (25,3) 3D positions of left hand joints
    right_hand_pos: Optional[np.ndarray] = None  # (25,3) 3D positions of right hand joints
    left_hand_rot: Optional[np.ndarray] = None   # (25,3,3) rotation matrices
    right_hand_rot: Optional[np.ndarray] = None  # (25,3,3) rotation matrices
    # Hand state (computed from finger positions)
    left_hand_pinch: bool = False
    left_hand_pinchValue: float = 10.0
    left_hand_squeeze: bool = False
    left_hand_squeezeValue: float = 0.0
    right_hand_pinch: bool = False
    right_hand_pinchValue: float = 10.0
    right_hand_squeeze: bool = False
    right_hand_squeezeValue: float = 0.0
class NativeTeleWrapper:
    """Drop-in replacement for TeleVuerWrapper using the Quest 3 native app.

    The key difference: wrist poses from body tracking arrive chest-relative,
    so the head-position subtraction done by the original TeleVuerWrapper is
    unnecessary — chest-relative data is used directly.
    """

    def __init__(self, port: int = 8765, host: str = "0.0.0.0",
                 chest_to_waist_x: float = 0.15,
                 chest_to_waist_z: float = 0.28):
        """
        Args:
            port: WebSocket server port.
            host: WebSocket bind address.
            chest_to_waist_x: Forward offset from chest to IK solver origin
                (meters). Original head-to-waist was 0.15; chest is slightly
                forward of the head, so this may need tuning.
            chest_to_waist_z: Vertical offset from chest to IK solver origin
                (meters). Original head-to-waist was 0.45; chest-to-waist is
                shorter (~0.25-0.30). Default 0.28 is a starting estimate.
        """
        self.server = TeleopServer(host=host, port=port)
        self._server_thread = None
        self._chest_to_waist_x = chest_to_waist_x
        self._chest_to_waist_z = chest_to_waist_z

    def start(self):
        """Start the WebSocket server in a background (daemon) thread."""
        self._server_thread = self.server.start_in_thread()

    def get_tele_data(self) -> TeleData:
        """Get processed motion data, transformed to robot convention.

        Returns:
            TeleData compatible with teleop_hand_and_arm.py.
        """
        # Read raw poses from shared memory (16 doubles, column-major 4x4).
        with self.server.head_pose_shared.get_lock():
            head_raw = np.array(self.server.head_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.chest_pose_shared.get_lock():
            chest_raw = np.array(self.server.chest_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.left_arm_pose_shared.get_lock():
            left_wrist_raw = np.array(self.server.left_arm_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.right_arm_pose_shared.get_lock():
            right_wrist_raw = np.array(self.server.right_arm_pose_shared[:]).reshape(4, 4, order='F')

        # Validate matrices, falling back to constants on singular/non-finite
        # reads. FIX: the chest pose was previously used unvalidated — guard it
        # like the others (identity matches the server's initial buffer value).
        head_raw, head_valid = safe_mat_update(CONST_HEAD_POSE, head_raw)
        chest_raw, _chest_valid = safe_mat_update(np.eye(4), chest_raw)
        left_wrist_raw, left_valid = safe_mat_update(CONST_LEFT_ARM_POSE, left_wrist_raw)
        right_wrist_raw, right_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, right_wrist_raw)

        # --- Transform from Godot/OpenXR convention to Robot convention ---
        # Similarity transform R M R^-1 re-expresses a pose in the robot basis.
        Brobot_world_head = T_ROBOT_OPENXR @ head_raw @ T_OPENXR_ROBOT
        Brobot_world_chest = T_ROBOT_OPENXR @ chest_raw @ T_OPENXR_ROBOT
        # Wrist poses are CHEST-RELATIVE from body tracking; the same basis
        # change applies to relative poses.
        left_Brobot = T_ROBOT_OPENXR @ left_wrist_raw @ T_OPENXR_ROBOT
        right_Brobot = T_ROBOT_OPENXR @ right_wrist_raw @ T_OPENXR_ROBOT

        # Apply Unitree arm URDF convention transforms (skipped while invalid
        # so the constant fallback poses pass through unchanged).
        left_IPunitree = left_Brobot @ (T_TO_UNITREE_HUMANOID_LEFT_ARM if left_valid else np.eye(4))
        right_IPunitree = right_Brobot @ (T_TO_UNITREE_HUMANOID_RIGHT_ARM if right_valid else np.eye(4))

        # CHEST-TO-WAIST offset: the IK solver origin is near the waist.
        # Body tracking gives chest-relative poses; the chest is lower than the
        # head, so the vertical offset is smaller than the original
        # head-to-waist offset (0.45). Configurable via constructor args.
        left_wrist_final = left_IPunitree.copy()
        right_wrist_final = right_IPunitree.copy()
        left_wrist_final[0, 3] += self._chest_to_waist_x   # x (forward)
        right_wrist_final[0, 3] += self._chest_to_waist_x
        left_wrist_final[2, 3] += self._chest_to_waist_z   # z (up)
        right_wrist_final[2, 3] += self._chest_to_waist_z

        # --- Hand positions ---
        # NOTE(review): hands are only read when BOTH wrists are valid, so a
        # single invalid arm zeroes both hands for that frame — confirm intent.
        left_hand_pos = np.zeros((25, 3))
        right_hand_pos = np.zeros((25, 3))
        if left_valid and right_valid:
            with self.server.left_hand_position_shared.get_lock():
                raw_left = np.array(self.server.left_hand_position_shared[:])
            with self.server.right_hand_position_shared.get_lock():
                raw_right = np.array(self.server.right_hand_position_shared[:])
            if np.any(raw_left != 0):
                # Hand positions are wrist-relative from Quest 3, in OpenXR
                # coords. Apply the OpenXR->Robot basis change before the hand
                # convention transform. Derivation: the original pipeline does
                #   T_TO_UNITREE_HAND @ inv(arm_robot) @ (T_ROBOT_OPENXR @ world_pos)
                # which simplifies to
                #   T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ wrist_relative_openxr
                left_pos_25x3 = raw_left.reshape(25, 3)
                left_hom = np.concatenate([left_pos_25x3.T, np.ones((1, 25))])
                left_hand_pos = (T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ left_hom)[0:3, :].T
            if np.any(raw_right != 0):
                right_pos_25x3 = raw_right.reshape(25, 3)
                right_hom = np.concatenate([right_pos_25x3.T, np.ones((1, 25))])
                right_hand_pos = (T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ right_hom)[0:3, :].T

        # --- Hand rotations ---
        left_hand_rot = None
        right_hand_rot = None
        # TODO: Transform hand rotations if needed for dexterous hand control

        # --- Pinch/squeeze from finger distances ---
        left_pinch_val, left_pinch = _compute_pinch(left_hand_pos)
        right_pinch_val, right_pinch = _compute_pinch(right_hand_pos)
        left_squeeze_val, left_squeeze = _compute_squeeze(left_hand_pos)
        right_squeeze_val, right_squeeze = _compute_squeeze(right_hand_pos)

        return TeleData(
            head_pose=Brobot_world_head,
            left_wrist_pose=left_wrist_final,
            right_wrist_pose=right_wrist_final,
            chest_pose=Brobot_world_chest,
            left_hand_pos=left_hand_pos,
            right_hand_pos=right_hand_pos,
            left_hand_rot=left_hand_rot,
            right_hand_rot=right_hand_rot,
            left_hand_pinch=left_pinch,
            left_hand_pinchValue=left_pinch_val,
            left_hand_squeeze=left_squeeze,
            left_hand_squeezeValue=left_squeeze_val,
            right_hand_pinch=right_pinch,
            right_hand_pinchValue=right_pinch_val,
            right_hand_squeeze=right_squeeze,
            right_hand_squeezeValue=right_squeeze_val,
        )

    def render_to_xr(self, img):
        """Send a webcam frame to the Quest 3 app.

        Args:
            img: BGR numpy array (OpenCV format). JPEG-encoded before sending.
        """
        import cv2
        # Quality 70 keeps frames small enough for the WebSocket link.
        _, jpeg_buf = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 70])
        self.server.set_webcam_frame(jpeg_buf.tobytes())

    def close(self):
        """Shutdown the server.

        The server thread is daemonic, so it stops when the process exits;
        nothing to tear down explicitly here.
        """
        pass

    @property
    def has_client(self) -> bool:
        """True if at least one Quest 3 client is connected."""
        return len(self.server._clients) > 0

    @property
    def last_data_time(self) -> float:
        """Unix timestamp of the last received tracking data."""
        with self.server.last_data_time_shared.get_lock():
            return self.server.last_data_time_shared.value
def _compute_pinch(hand_pos_25x3: np.ndarray) -> tuple:
"""Compute pinch state from thumb tip (4) and index tip (8) distance.
Returns (pinchValue, is_pinching).
pinchValue: ~15.0 (open) 0.0 (pinched), scaled to match Vuer convention * 100.
"""
if np.all(hand_pos_25x3 == 0):
return 10.0, False
# OpenXR hand joint indices: thumb tip = 4, index tip = 8
thumb_tip = hand_pos_25x3[4]
index_tip = hand_pos_25x3[8]
dist = np.linalg.norm(thumb_tip - index_tip)
# Map distance to pinch value (meters → normalized)
# Typical range: 0.0 (touching) to 0.10 (fully open)
pinch_value = min(dist * 150.0, 15.0) # Scale to ~0-15 range
is_pinching = dist < 0.025 # 2.5cm threshold
return pinch_value * 100.0, is_pinching # Match TeleVuerWrapper scaling
def _compute_squeeze(hand_pos_25x3: np.ndarray) -> tuple:
"""Compute squeeze (fist) state from average finger curl.
Returns (squeezeValue, is_squeezing).
squeezeValue: 0.0 (open) 1.0 (fist).
"""
if np.all(hand_pos_25x3 == 0):
return 0.0, False
# Measure distance from fingertips to palm center
# Joint indices: palm=0, index_tip=8, middle_tip=12, ring_tip=16, pinky_tip=20
palm = hand_pos_25x3[0]
tips = hand_pos_25x3[[8, 12, 16, 20]]
dists = np.linalg.norm(tips - palm, axis=1)
avg_dist = np.mean(dists)
# Map: ~0.02m (fist) → 1.0, ~0.10m (open) → 0.0
squeeze_value = np.clip(1.0 - (avg_dist - 0.02) / 0.08, 0.0, 1.0)
is_squeezing = squeeze_value > 0.7
return squeeze_value, is_squeezing

26
teleop/teleop_hand_and_arm.py

@ -18,6 +18,7 @@ sys.path.append(parent_dir)
from unitree_sdk2py.core.channel import ChannelFactoryInitialize # dds
from televuer import TeleVuerWrapper
from native_tv_wrapper import NativeTeleWrapper
from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController
from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK
from teleimager.image_client import ImageClient
@ -211,6 +212,11 @@ if __name__ == '__main__':
parser.add_argument('--diag-log', action='store_true', default=False,
help='Write per-frame diagnostic CSV to /tmp/teleop_diag.csv')
# webcam
# native Quest 3 app mode
parser.add_argument('--native', action='store_true',
help='Use native Quest 3 app (body tracking) instead of Vuer/browser')
parser.add_argument('--native-port', type=int, default=8765,
help='WebSocket port for native Quest 3 app (default: 8765)')
parser.add_argument('--webcam', type=int, default=None,
help='USB webcam device index (e.g. 0 for /dev/video0). Bypasses teleimager.')
parser.add_argument('--webcam-res', type=str, default='480p', choices=['480p', '720p', '1080p'],
@ -288,6 +294,13 @@ if __name__ == '__main__':
logger_mp.debug(f"Camera config: {camera_config}")
# televuer_wrapper: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
if args.native:
# Native Quest 3 app with body tracking — replaces Vuer/browser
tv_wrapper = NativeTeleWrapper(port=args.native_port)
tv_wrapper.start()
logger_mp.info(f"[native] WebSocket server started on port {args.native_port}")
xr_need_local_img = args.webcam is not None # Only need to send webcam if we have one
else:
# Match display_fps to webcam send rate to avoid re-sending stale frames
vuer_display_fps = float(args.webcam_fps) if args.webcam is not None else 30.0
tv_wrapper = TeleVuerWrapper(use_hand_tracking=args.input_mode == "hand",
@ -665,10 +678,15 @@ if __name__ == '__main__':
settle_elapsed = now - calibration_time
settle_alpha = min(1.0, settle_elapsed / max(args.settle_time, 0.01))
# Frozen head position: tv_wrapper computes wrist - head_pos,
# so head movement creates spurious wrist drift. Cancel it by
# adding back the head position change since calibration,
# effectively using the frozen calibration head position.
# Head position compensation:
# - Vuer/browser mode: wrist = world - head_pos, so head movement
# creates spurious drift. Cancel by adding head_pos_delta back.
# - Native mode: wrist is chest-relative from body tracking,
# already correct. Skip head compensation entirely.
if args.native:
left_pos_comp = tele_data.left_wrist_pose[:3, 3]
right_pos_comp = tele_data.right_wrist_pose[:3, 3]
else:
R_head_delta = tele_data.head_pose[:3, :3] @ head_R_at_cal.T if head_R_at_cal is not None else np.eye(3)
head_pos_delta = tele_data.head_pose[:3, 3] - head_pos_at_cal
left_pos_comp = tele_data.left_wrist_pose[:3, 3] + head_pos_delta

315
teleop/teleop_server.py

@ -0,0 +1,315 @@
#!/usr/bin/env python3
"""
Lightweight WebSocket server for Quest 3 native teleop app.
Replaces Vuer (~900 lines) with ~300 lines.
Receives body tracking data (JSON) from the Quest 3 native app,
writes poses to shared memory arrays compatible with teleop_hand_and_arm.py,
and sends webcam JPEG frames back to the app.
Protocol:
Client → Server: JSON text messages with tracking data
Server → Client: Binary messages with JPEG webcam frames
Server → Client: JSON text messages for status/config
Shared Memory Format (matches TeleVuer/TeleVuerWrapper interface):
head_pose_shared: Array('d', 16) 4x4 SE(3), column-major ('F' order)
left_arm_pose_shared: Array('d', 16) 4x4 SE(3), column-major ('F' order)
right_arm_pose_shared: Array('d', 16) 4x4 SE(3), column-major ('F' order)
last_data_time_shared: Value('d') Unix timestamp of last received data
chest_pose_shared: Array('d', 16) 4x4 SE(3), NEW: chest orientation
Hand tracking (if enabled):
left_hand_position_shared: Array('d', 75) 25×3 positions
right_hand_position_shared: Array('d', 75) 25×3 positions
left_hand_orientation_shared: Array('d', 225) 25×3×3 rotations (col-major)
right_hand_orientation_shared: Array('d', 225) 25×3×3 rotations (col-major)
Usage:
# Standalone (for testing):
python teleop_server.py --port 8765
# Integrated with teleop_hand_and_arm.py:
# Import TeleopServer and pass shared memory arrays
"""
import asyncio
import json
import time
import logging
import argparse
import threading
import numpy as np
from multiprocessing import Array, Value
try:
import websockets
import websockets.asyncio.server
except ImportError:
print("ERROR: 'websockets' package required. Install with: pip install websockets")
raise
logger = logging.getLogger("teleop_server")
class TeleopServer:
    """WebSocket server that bridges the Quest 3 native app to teleop shared memory.

    Tracking JSON received from the headset is written into multiprocessing
    arrays (column-major 4x4 poses plus flat hand-joint buffers); webcam JPEG
    frames supplied through set_webcam_frame() are pushed back to every
    connected client at roughly 15 fps.
    """

    def __init__(self, host="0.0.0.0", port=8765,
                 head_pose_shared=None,
                 left_arm_pose_shared=None,
                 right_arm_pose_shared=None,
                 last_data_time_shared=None,
                 chest_pose_shared=None,
                 left_hand_position_shared=None,
                 right_hand_position_shared=None,
                 left_hand_orientation_shared=None,
                 right_hand_orientation_shared=None,
                 left_hand_pinch_shared=None,
                 left_hand_squeeze_shared=None,
                 right_hand_pinch_shared=None,
                 right_hand_squeeze_shared=None):
        """Create the server.

        Args:
            host: Bind address for the WebSocket listener.
            port: WebSocket port.
            *_shared: multiprocessing Array/Value objects shared with the
                teleop process; any omitted are created locally (standalone mode).
        """
        self.host = host
        self.port = port
        # Pose buffers: 16 doubles each, column-major ('F') 4x4 SE(3).
        self.head_pose_shared = head_pose_shared or Array('d', 16, lock=True)
        self.left_arm_pose_shared = left_arm_pose_shared or Array('d', 16, lock=True)
        self.right_arm_pose_shared = right_arm_pose_shared or Array('d', 16, lock=True)
        self.last_data_time_shared = last_data_time_shared or Value('d', 0.0, lock=True)
        self.chest_pose_shared = chest_pose_shared or Array('d', 16, lock=True)
        # Hand tracking arrays: 25 joints x 3 positions / x 9 rotation entries.
        self.left_hand_position_shared = left_hand_position_shared or Array('d', 75, lock=True)
        self.right_hand_position_shared = right_hand_position_shared or Array('d', 75, lock=True)
        self.left_hand_orientation_shared = left_hand_orientation_shared or Array('d', 225, lock=True)
        self.right_hand_orientation_shared = right_hand_orientation_shared or Array('d', 225, lock=True)
        # Pinch/squeeze slots (filled by the consumer, not this server); may be None.
        self.left_hand_pinch_shared = left_hand_pinch_shared
        self.left_hand_squeeze_shared = left_hand_squeeze_shared
        self.right_hand_pinch_shared = right_hand_pinch_shared
        self.right_hand_squeeze_shared = right_hand_squeeze_shared
        # Start all pose buffers as identity so readers never see garbage
        # (and never see a singular all-zero matrix).
        identity_flat = np.eye(4).flatten(order='F').tolist()
        for pose_array in (self.head_pose_shared, self.left_arm_pose_shared,
                           self.right_arm_pose_shared, self.chest_pose_shared):
            with pose_array.get_lock():
                pose_array[:] = identity_flat
        # Webcam frame (set by external code, sent to clients).
        self._webcam_frame = None
        self._webcam_lock = threading.Lock()
        # FIX: removed the asyncio.Event that used to live here. It was never
        # awaited by _send_webcam_loop (which polls on a timer), it was created
        # outside any running event loop, and asyncio primitives must not be
        # set from foreign threads (set_webcam_frame is called from one).
        # Connection tracking.
        self._clients = set()
        self._message_count = 0
        self._last_log_time = 0

    def set_webcam_frame(self, jpeg_bytes: bytes):
        """Provide a new JPEG frame (thread-safe).

        Called by external code (e.g. a webcam capture thread); each client's
        _send_webcam_loop picks the frame up on its own timer.
        """
        with self._webcam_lock:
            self._webcam_frame = jpeg_bytes

    async def _handle_client(self, websocket):
        """Handle a single WebSocket client connection."""
        remote = websocket.remote_address
        logger.info(f"Client connected: {remote}")
        self._clients.add(websocket)
        # Tell the app what this server supports before streaming begins.
        await websocket.send(json.dumps({
            "type": "config",
            "version": "1.0",
            "body_tracking": True,
        }))
        # Per-client webcam sender task.
        webcam_task = asyncio.create_task(self._send_webcam_loop(websocket))
        try:
            async for message in websocket:
                if isinstance(message, str):
                    self._handle_tracking_message(message)
                # Binary messages from the client are ignored.
        except websockets.exceptions.ConnectionClosed:
            logger.info(f"Client disconnected: {remote}")
        finally:
            self._clients.discard(websocket)
            webcam_task.cancel()
            try:
                await webcam_task
            except asyncio.CancelledError:
                pass

    def _store(self, shared, values):
        """Copy a flat list of floats into a locked shared array."""
        with shared.get_lock():
            shared[:] = values

    def _handle_tracking_message(self, message: str):
        """Parse a tracking JSON message and write its fields to shared memory.

        Unknown message types and missing/mis-sized fields are ignored, so a
        buggy client cannot corrupt the shared pose buffers.
        """
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            logger.warning("Invalid JSON received")
            return
        if data.get("type") != "tracking":
            return
        self._message_count += 1
        now = time.time()
        # Rate-limited logging (at most every 5 seconds).
        if now - self._last_log_time > 5.0:
            logger.info(f"Tracking messages received: {self._message_count}")
            self._last_log_time = now
        # Timestamp of the most recent data, read by liveness checks.
        with self.last_data_time_shared.get_lock():
            self.last_data_time_shared.value = now
        # (json key, destination array, expected length, optional converter).
        # head/chest arrive as 7 floats [x, y, z, qx, qy, qz, qw]; wrists as
        # 16 floats (already column-major 4x4); hands as flat joint buffers.
        fields = (
            ("head", self.head_pose_shared, 7, _pose7_to_mat16),
            ("chest", self.chest_pose_shared, 7, _pose7_to_mat16),
            ("left_wrist", self.left_arm_pose_shared, 16, None),
            ("right_wrist", self.right_arm_pose_shared, 16, None),
            ("left_hand_pos", self.left_hand_position_shared, 75, None),
            ("right_hand_pos", self.right_hand_position_shared, 75, None),
            ("left_hand_rot", self.left_hand_orientation_shared, 225, None),
            ("right_hand_rot", self.right_hand_orientation_shared, 225, None),
        )
        for key, shared, expected_len, convert in fields:
            values = data.get(key)
            if values and len(values) == expected_len:
                self._store(shared, convert(values) if convert else values)

    async def _send_webcam_loop(self, websocket):
        """Send the latest webcam JPEG frame to one client at ~15 fps.

        NOTE(review): an unchanged frame is re-sent every tick; deduplicating
        would save bandwidth but changes the wire behavior, so it is kept.
        """
        interval = 1.0 / 15.0
        while True:
            await asyncio.sleep(interval)
            with self._webcam_lock:
                frame = self._webcam_frame
            if frame is not None:
                try:
                    await websocket.send(frame)
                except websockets.exceptions.ConnectionClosed:
                    break

    async def serve(self):
        """Run the WebSocket server forever (until cancelled)."""
        logger.info(f"Starting teleop server on ws://{self.host}:{self.port}")
        async with websockets.asyncio.server.serve(
            self._handle_client,
            self.host,
            self.port,
            max_size=2**20,      # 1 MB max inbound message size
            ping_interval=20,
            ping_timeout=20,
        ):
            logger.info(f"Teleop server running on ws://{self.host}:{self.port}")
            await asyncio.Future()  # Run forever

    def start_in_thread(self):
        """Start the server in a daemon background thread. Returns the thread."""
        def _run():
            asyncio.run(self.serve())
        thread = threading.Thread(target=_run, daemon=True)
        thread.start()
        logger.info("Teleop server started in background thread")
        return thread
def _pose7_to_mat16(pose7):
"""Convert [x, y, z, qx, qy, qz, qw] to 16-float column-major 4x4 matrix.
Compatible with NumPy reshape(4,4, order='F').
"""
x, y, z, qx, qy, qz, qw = pose7
# Quaternion to rotation matrix
xx = qx * qx; yy = qy * qy; zz = qz * qz
xy = qx * qy; xz = qx * qz; yz = qy * qz
wx = qw * qx; wy = qw * qy; wz = qw * qz
r00 = 1 - 2 * (yy + zz)
r01 = 2 * (xy - wz)
r02 = 2 * (xz + wy)
r10 = 2 * (xy + wz)
r11 = 1 - 2 * (xx + zz)
r12 = 2 * (yz - wx)
r20 = 2 * (xz - wy)
r21 = 2 * (yz + wx)
r22 = 1 - 2 * (xx + yy)
# Column-major (Fortran order): column 0, column 1, column 2, column 3
return [
r00, r10, r20, 0.0, # Column 0
r01, r11, r21, 0.0, # Column 1
r02, r12, r22, 0.0, # Column 2
x, y, z, 1.0, # Column 3
]
# --- Standalone mode ---
def main():
    """CLI entry point: run a standalone TeleopServer until interrupted."""
    parser = argparse.ArgumentParser(description="Teleop WebSocket server for Quest 3 native app")
    parser.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
    parser.add_argument("--port", type=int, default=8765, help="WebSocket port (default: 8765)")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(name)s] %(levelname)s: %(message)s"
    )
    asyncio.run(TeleopServer(host=args.host, port=args.port).serve())


if __name__ == "__main__":
    main()
Loading…
Cancel
Save