#!/usr/bin/env python3 """ Lightweight WebSocket server for Quest 3 native teleop app. Replaces Vuer (~900 lines) with ~200 lines. Receives body tracking data (JSON) from the Quest 3 native app, writes poses to shared memory arrays compatible with teleop_hand_and_arm.py, and sends webcam JPEG frames back to the app. Protocol: Client → Server: JSON text messages with tracking data Server → Client: Binary messages with JPEG webcam frames Server → Client: JSON text messages for status/config Shared Memory Format (matches TeleVuer/TeleVuerWrapper interface): head_pose_shared: Array('d', 16) — 4x4 SE(3), column-major ('F' order) left_arm_pose_shared: Array('d', 16) — 4x4 SE(3), column-major ('F' order) right_arm_pose_shared: Array('d', 16) — 4x4 SE(3), column-major ('F' order) last_data_time_shared: Value('d') — Unix timestamp of last received data chest_pose_shared: Array('d', 16) — 4x4 SE(3), NEW: chest orientation Hand tracking (if enabled): left_hand_position_shared: Array('d', 75) — 25×3 positions right_hand_position_shared: Array('d', 75) — 25×3 positions left_hand_orientation_shared: Array('d', 225) — 25×3×3 rotations (col-major) right_hand_orientation_shared: Array('d', 225) — 25×3×3 rotations (col-major) Usage: # Standalone (for testing): python teleop_server.py --port 8765 # Integrated with teleop_hand_and_arm.py: # Import TeleopServer and pass shared memory arrays """ import asyncio import json import time import logging import argparse import threading import numpy as np from multiprocessing import Array, Value try: import websockets import websockets.asyncio.server except ImportError: print("ERROR: 'websockets' package required. Install with: pip install websockets") raise logger = logging.getLogger("teleop_server") class TeleopServer: """WebSocket server that bridges Quest 3 native app to teleop shared memory.""" def __init__(self, host="0.0.0.0", port=8765, head_pose_shared=None, left_arm_pose_shared=None, right_arm_pose_shared=None, last_data_time_shared=None, chest_pose_shared=None, left_hand_position_shared=None, right_hand_position_shared=None, left_hand_orientation_shared=None, right_hand_orientation_shared=None, left_hand_pinch_shared=None, left_hand_squeeze_shared=None, right_hand_pinch_shared=None, right_hand_squeeze_shared=None, body_joints_shared=None): self.host = host self.port = port # Create shared memory if not provided (standalone mode) self.head_pose_shared = head_pose_shared or Array('d', 16, lock=True) self.left_arm_pose_shared = left_arm_pose_shared or Array('d', 16, lock=True) self.right_arm_pose_shared = right_arm_pose_shared or Array('d', 16, lock=True) self.last_data_time_shared = last_data_time_shared or Value('d', 0.0, lock=True) self.chest_pose_shared = chest_pose_shared or Array('d', 16, lock=True) # Hand tracking arrays self.left_hand_position_shared = left_hand_position_shared or Array('d', 75, lock=True) self.right_hand_position_shared = right_hand_position_shared or Array('d', 75, lock=True) self.left_hand_orientation_shared = left_hand_orientation_shared or Array('d', 225, lock=True) self.right_hand_orientation_shared = right_hand_orientation_shared or Array('d', 225, lock=True) # Pinch/squeeze (computed from finger positions) self.left_hand_pinch_shared = left_hand_pinch_shared self.left_hand_squeeze_shared = left_hand_squeeze_shared self.right_hand_pinch_shared = right_hand_pinch_shared self.right_hand_squeeze_shared = right_hand_squeeze_shared # Body joint transforms for retargeting (8 joints x 7 floats = 56) # Order: hips, chest, L_upper_arm, L_lower_arm, L_wrist, R_upper_arm, R_lower_arm, R_wrist self.body_joints_shared = body_joints_shared or Array('d', 56, lock=True) # Initialize with identity matrices identity_flat = np.eye(4).flatten(order='F').tolist() with self.head_pose_shared.get_lock(): self.head_pose_shared[:] = identity_flat with self.left_arm_pose_shared.get_lock(): self.left_arm_pose_shared[:] = identity_flat with self.right_arm_pose_shared.get_lock(): self.right_arm_pose_shared[:] = identity_flat with self.chest_pose_shared.get_lock(): self.chest_pose_shared[:] = identity_flat # Camera frames: dict of camera_id -> jpeg_bytes (set by external code) # Camera IDs: 0=head, 1=left_wrist, 2=right_wrist self._camera_frames = {} self._webcam_lock = threading.Lock() self._webcam_event = asyncio.Event() # Connection tracking self._clients = set() self._message_count = 0 self._last_log_time = 0 def set_webcam_frame(self, jpeg_bytes: bytes, camera_id: int = 0): """Called by external code to provide a new JPEG frame for a camera. Args: jpeg_bytes: JPEG-encoded image data camera_id: 0=head, 1=left_wrist, 2=right_wrist """ with self._webcam_lock: self._camera_frames[camera_id] = jpeg_bytes try: self._webcam_event.set() except RuntimeError: pass # Event loop not running yet async def _handle_client(self, websocket): """Handle a single WebSocket client connection.""" remote = websocket.remote_address logger.info(f"Client connected: {remote}") self._clients.add(websocket) # Send initial config await websocket.send(json.dumps({ "type": "config", "version": "1.0", "body_tracking": True, })) # Start webcam sender task for this client webcam_task = asyncio.create_task(self._send_webcam_loop(websocket)) try: async for message in websocket: if isinstance(message, str): self._handle_tracking_message(message) # Binary messages from client are ignored except websockets.exceptions.ConnectionClosed: logger.info(f"Client disconnected: {remote}") finally: self._clients.discard(websocket) webcam_task.cancel() try: await webcam_task except asyncio.CancelledError: pass def _handle_tracking_message(self, message: str): """Parse tracking JSON and write to shared memory.""" try: data = json.loads(message) except json.JSONDecodeError: logger.warning("Invalid JSON received") return msg_type = data.get("type") if msg_type != "tracking": return self._message_count += 1 now = time.time() # Rate-limited logging (every 5 seconds) if now - self._last_log_time > 5.0: logger.info(f"Tracking messages received: {self._message_count}") self._last_log_time = now # Update timestamp with self.last_data_time_shared.get_lock(): self.last_data_time_shared.value = now # Head pose: 7 floats [x, y, z, qx, qy, qz, qw] → 4x4 column-major head = data.get("head") if head and len(head) == 7: mat = _pose7_to_mat16(head) with self.head_pose_shared.get_lock(): self.head_pose_shared[:] = mat # Chest pose: 7 floats → 4x4 column-major chest = data.get("chest") if chest and len(chest) == 7: mat = _pose7_to_mat16(chest) with self.chest_pose_shared.get_lock(): self.chest_pose_shared[:] = mat # Left wrist: 16 floats (already column-major 4x4) left_wrist = data.get("left_wrist") if left_wrist and len(left_wrist) == 16: with self.left_arm_pose_shared.get_lock(): self.left_arm_pose_shared[:] = left_wrist # Right wrist: 16 floats (already column-major 4x4) right_wrist = data.get("right_wrist") if right_wrist and len(right_wrist) == 16: with self.right_arm_pose_shared.get_lock(): self.right_arm_pose_shared[:] = right_wrist # Hand positions: 75 floats each (25 joints × 3) left_hand_pos = data.get("left_hand_pos") if left_hand_pos and len(left_hand_pos) == 75: with self.left_hand_position_shared.get_lock(): self.left_hand_position_shared[:] = left_hand_pos right_hand_pos = data.get("right_hand_pos") if right_hand_pos and len(right_hand_pos) == 75: with self.right_hand_position_shared.get_lock(): self.right_hand_position_shared[:] = right_hand_pos # Hand rotations: 225 floats each (25 joints × 9) left_hand_rot = data.get("left_hand_rot") if left_hand_rot and len(left_hand_rot) == 225: with self.left_hand_orientation_shared.get_lock(): self.left_hand_orientation_shared[:] = left_hand_rot right_hand_rot = data.get("right_hand_rot") if right_hand_rot and len(right_hand_rot) == 225: with self.right_hand_orientation_shared.get_lock(): self.right_hand_orientation_shared[:] = right_hand_rot # Body joint transforms: 56 floats (8 joints × 7) body_joints = data.get("body_joints") if body_joints and len(body_joints) == 56: with self.body_joints_shared.get_lock(): self.body_joints_shared[:] = body_joints async def _send_webcam_loop(self, websocket): """Send camera JPEG frames to a client. Head camera (~10fps), wrist cameras (~5fps). Each binary message: 1 byte camera_id + JPEG data. """ interval = 1.0 / 10.0 cycle = 0 while True: await asyncio.sleep(interval) with self._webcam_lock: frames = dict(self._camera_frames) # Head every cycle, wrist cameras every other cycle cam_ids = [0] # always send head if cycle % 2 == 0: cam_ids.extend([1, 2]) cycle += 1 for cam_id in cam_ids: frame = frames.get(cam_id) if frame is not None: try: await websocket.send(bytes([cam_id]) + frame) except websockets.exceptions.ConnectionClosed: return except Exception as e: logger.warning(f"Webcam send error (cam {cam_id}): {e}") return async def serve(self): """Start the WebSocket server.""" logger.info(f"Starting teleop server on ws://{self.host}:{self.port}") async with websockets.asyncio.server.serve( self._handle_client, self.host, self.port, max_size=2**20, # 1 MB max message size write_limit=2**18, # 256 KB write buffer (for camera frames) ping_interval=30, ping_timeout=60, ) as server: logger.info(f"Teleop server running on ws://{self.host}:{self.port}") await asyncio.Future() # Run forever def start_in_thread(self): """Start the server in a background thread. Returns the thread.""" def _run(): asyncio.run(self.serve()) thread = threading.Thread(target=_run, daemon=True) thread.start() logger.info("Teleop server started in background thread") return thread def _pose7_to_mat16(pose7): """Convert [x, y, z, qx, qy, qz, qw] to 16-float column-major 4x4 matrix. Compatible with NumPy reshape(4,4, order='F'). """ x, y, z, qx, qy, qz, qw = pose7 # Quaternion to rotation matrix xx = qx * qx; yy = qy * qy; zz = qz * qz xy = qx * qy; xz = qx * qz; yz = qy * qz wx = qw * qx; wy = qw * qy; wz = qw * qz r00 = 1 - 2 * (yy + zz) r01 = 2 * (xy - wz) r02 = 2 * (xz + wy) r10 = 2 * (xy + wz) r11 = 1 - 2 * (xx + zz) r12 = 2 * (yz - wx) r20 = 2 * (xz - wy) r21 = 2 * (yz + wx) r22 = 1 - 2 * (xx + yy) # Column-major (Fortran order): column 0, column 1, column 2, column 3 return [ r00, r10, r20, 0.0, # Column 0 r01, r11, r21, 0.0, # Column 1 r02, r12, r22, 0.0, # Column 2 x, y, z, 1.0, # Column 3 ] # --- Standalone mode --- def main(): parser = argparse.ArgumentParser(description="Teleop WebSocket server for Quest 3 native app") parser.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)") parser.add_argument("--port", type=int, default=8765, help="WebSocket port (default: 8765)") parser.add_argument("--verbose", action="store_true", help="Enable debug logging") args = parser.parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s" ) server = TeleopServer(host=args.host, port=args.port) asyncio.run(server.serve()) if __name__ == "__main__": main()