You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

357 lines
14 KiB

#!/usr/bin/env python3
"""
Lightweight WebSocket server for Quest 3 native teleop app.
Replaces Vuer (~900 lines) with ~200 lines.
Receives body tracking data (JSON) from the Quest 3 native app,
writes poses to shared memory arrays compatible with teleop_hand_and_arm.py,
and sends webcam JPEG frames back to the app.
Protocol:
Client → Server: JSON text messages with tracking data
Server → Client: Binary messages with JPEG webcam frames
Server → Client: JSON text messages for status/config
Shared Memory Format (matches TeleVuer/TeleVuerWrapper interface):
head_pose_shared: Array('d', 16) — 4x4 SE(3), column-major ('F' order)
left_arm_pose_shared: Array('d', 16) — 4x4 SE(3), column-major ('F' order)
right_arm_pose_shared: Array('d', 16) — 4x4 SE(3), column-major ('F' order)
last_data_time_shared: Value('d') — Unix timestamp of last received data
chest_pose_shared: Array('d', 16) — 4x4 SE(3), NEW: chest orientation
Hand tracking (if enabled):
left_hand_position_shared: Array('d', 75) — 25×3 positions
right_hand_position_shared: Array('d', 75) — 25×3 positions
left_hand_orientation_shared: Array('d', 225) — 25×3×3 rotations (col-major)
right_hand_orientation_shared: Array('d', 225) — 25×3×3 rotations (col-major)
Usage:
# Standalone (for testing):
python teleop_server.py --port 8765
# Integrated with teleop_hand_and_arm.py:
# Import TeleopServer and pass shared memory arrays
"""
import asyncio
import json
import time
import logging
import argparse
import threading
import numpy as np
from multiprocessing import Array, Value
try:
import websockets
import websockets.asyncio.server
except ImportError:
print("ERROR: 'websockets' package required. Install with: pip install websockets")
raise
logger = logging.getLogger("teleop_server")
class TeleopServer:
"""WebSocket server that bridges Quest 3 native app to teleop shared memory."""
def __init__(self, host="0.0.0.0", port=8765,
head_pose_shared=None,
left_arm_pose_shared=None,
right_arm_pose_shared=None,
last_data_time_shared=None,
chest_pose_shared=None,
left_hand_position_shared=None,
right_hand_position_shared=None,
left_hand_orientation_shared=None,
right_hand_orientation_shared=None,
left_hand_pinch_shared=None,
left_hand_squeeze_shared=None,
right_hand_pinch_shared=None,
right_hand_squeeze_shared=None,
body_joints_shared=None):
self.host = host
self.port = port
# Create shared memory if not provided (standalone mode)
self.head_pose_shared = head_pose_shared or Array('d', 16, lock=True)
self.left_arm_pose_shared = left_arm_pose_shared or Array('d', 16, lock=True)
self.right_arm_pose_shared = right_arm_pose_shared or Array('d', 16, lock=True)
self.last_data_time_shared = last_data_time_shared or Value('d', 0.0, lock=True)
self.chest_pose_shared = chest_pose_shared or Array('d', 16, lock=True)
# Hand tracking arrays
self.left_hand_position_shared = left_hand_position_shared or Array('d', 75, lock=True)
self.right_hand_position_shared = right_hand_position_shared or Array('d', 75, lock=True)
self.left_hand_orientation_shared = left_hand_orientation_shared or Array('d', 225, lock=True)
self.right_hand_orientation_shared = right_hand_orientation_shared or Array('d', 225, lock=True)
# Pinch/squeeze (computed from finger positions)
self.left_hand_pinch_shared = left_hand_pinch_shared
self.left_hand_squeeze_shared = left_hand_squeeze_shared
self.right_hand_pinch_shared = right_hand_pinch_shared
self.right_hand_squeeze_shared = right_hand_squeeze_shared
# Body joint transforms for retargeting (8 joints x 7 floats = 56)
# Order: hips, chest, L_upper_arm, L_lower_arm, L_wrist, R_upper_arm, R_lower_arm, R_wrist
self.body_joints_shared = body_joints_shared or Array('d', 56, lock=True)
# Initialize with identity matrices
identity_flat = np.eye(4).flatten(order='F').tolist()
with self.head_pose_shared.get_lock():
self.head_pose_shared[:] = identity_flat
with self.left_arm_pose_shared.get_lock():
self.left_arm_pose_shared[:] = identity_flat
with self.right_arm_pose_shared.get_lock():
self.right_arm_pose_shared[:] = identity_flat
with self.chest_pose_shared.get_lock():
self.chest_pose_shared[:] = identity_flat
# Camera frames: dict of camera_id -> jpeg_bytes (set by external code)
# Camera IDs: 0=head, 1=left_wrist, 2=right_wrist
self._camera_frames = {}
self._webcam_lock = threading.Lock()
self._webcam_event = asyncio.Event()
# Connection tracking
self._clients = set()
self._message_count = 0
self._last_log_time = 0
# Command flags (set by client, read by bridge)
self.recalibrate_requested = Value('i', 0)
def set_webcam_frame(self, jpeg_bytes: bytes, camera_id: int = 0):
"""Called by external code to provide a new JPEG frame for a camera.
Args:
jpeg_bytes: JPEG-encoded image data
camera_id: 0=head, 1=left_wrist, 2=right_wrist
"""
with self._webcam_lock:
self._camera_frames[camera_id] = jpeg_bytes
try:
self._webcam_event.set()
except RuntimeError:
pass # Event loop not running yet
async def _handle_client(self, websocket):
"""Handle a single WebSocket client connection."""
remote = websocket.remote_address
logger.info(f"Client connected: {remote}")
self._clients.add(websocket)
# Send initial config
await websocket.send(json.dumps({
"type": "config",
"version": "1.0",
"body_tracking": True,
}))
# Start webcam sender task for this client
webcam_task = asyncio.create_task(self._send_webcam_loop(websocket))
try:
async for message in websocket:
if isinstance(message, str):
self._handle_tracking_message(message)
# Binary messages from client are ignored
except websockets.exceptions.ConnectionClosed:
logger.info(f"Client disconnected: {remote}")
finally:
self._clients.discard(websocket)
webcam_task.cancel()
try:
await webcam_task
except asyncio.CancelledError:
pass
def _handle_tracking_message(self, message: str):
"""Parse tracking JSON and write to shared memory."""
try:
data = json.loads(message)
except json.JSONDecodeError:
logger.warning("Invalid JSON received")
return
msg_type = data.get("type")
if msg_type == "recalibrate":
with self.recalibrate_requested.get_lock():
self.recalibrate_requested.value = 1
logger.info("Recalibrate requested by client")
return
if msg_type != "tracking":
return
self._message_count += 1
now = time.time()
# Rate-limited logging (every 5 seconds)
if now - self._last_log_time > 5.0:
logger.info(f"Tracking messages received: {self._message_count}")
self._last_log_time = now
# Update timestamp
with self.last_data_time_shared.get_lock():
self.last_data_time_shared.value = now
# Head pose: 7 floats [x, y, z, qx, qy, qz, qw] → 4x4 column-major
head = data.get("head")
if head and len(head) == 7:
mat = _pose7_to_mat16(head)
with self.head_pose_shared.get_lock():
self.head_pose_shared[:] = mat
# Chest pose: 7 floats → 4x4 column-major
chest = data.get("chest")
if chest and len(chest) == 7:
mat = _pose7_to_mat16(chest)
with self.chest_pose_shared.get_lock():
self.chest_pose_shared[:] = mat
# Left wrist: 16 floats (already column-major 4x4)
left_wrist = data.get("left_wrist")
if left_wrist and len(left_wrist) == 16:
with self.left_arm_pose_shared.get_lock():
self.left_arm_pose_shared[:] = left_wrist
# Right wrist: 16 floats (already column-major 4x4)
right_wrist = data.get("right_wrist")
if right_wrist and len(right_wrist) == 16:
with self.right_arm_pose_shared.get_lock():
self.right_arm_pose_shared[:] = right_wrist
# Hand positions: 75 floats each (25 joints × 3)
left_hand_pos = data.get("left_hand_pos")
if left_hand_pos and len(left_hand_pos) == 75:
with self.left_hand_position_shared.get_lock():
self.left_hand_position_shared[:] = left_hand_pos
right_hand_pos = data.get("right_hand_pos")
if right_hand_pos and len(right_hand_pos) == 75:
with self.right_hand_position_shared.get_lock():
self.right_hand_position_shared[:] = right_hand_pos
# Hand rotations: 225 floats each (25 joints × 9)
left_hand_rot = data.get("left_hand_rot")
if left_hand_rot and len(left_hand_rot) == 225:
with self.left_hand_orientation_shared.get_lock():
self.left_hand_orientation_shared[:] = left_hand_rot
right_hand_rot = data.get("right_hand_rot")
if right_hand_rot and len(right_hand_rot) == 225:
with self.right_hand_orientation_shared.get_lock():
self.right_hand_orientation_shared[:] = right_hand_rot
# Body joint transforms: 56 floats (8 joints × 7)
body_joints = data.get("body_joints")
if body_joints and len(body_joints) == 56:
with self.body_joints_shared.get_lock():
self.body_joints_shared[:] = body_joints
async def _send_webcam_loop(self, websocket):
"""Send camera JPEG frames to a client.
Head camera (~10fps), wrist cameras (~5fps).
Each binary message: 1 byte camera_id + JPEG data.
"""
interval = 1.0 / 10.0
cycle = 0
while True:
await asyncio.sleep(interval)
with self._webcam_lock:
frames = dict(self._camera_frames)
# Head every cycle, wrist cameras every other cycle
cam_ids = [0] # always send head
if cycle % 2 == 0:
cam_ids.extend([1, 2])
cycle += 1
for cam_id in cam_ids:
frame = frames.get(cam_id)
if frame is not None:
try:
await websocket.send(bytes([cam_id]) + frame)
except websockets.exceptions.ConnectionClosed:
return
except Exception as e:
logger.warning(f"Webcam send error (cam {cam_id}): {e}")
return
async def serve(self):
"""Start the WebSocket server."""
logger.info(f"Starting teleop server on ws://{self.host}:{self.port}")
async with websockets.asyncio.server.serve(
self._handle_client,
self.host,
self.port,
max_size=2**20, # 1 MB max message size
write_limit=2**18, # 256 KB write buffer (for camera frames)
ping_interval=30,
ping_timeout=60,
) as server:
logger.info(f"Teleop server running on ws://{self.host}:{self.port}")
await asyncio.Future() # Run forever
def start_in_thread(self):
"""Start the server in a background thread. Returns the thread."""
def _run():
asyncio.run(self.serve())
thread = threading.Thread(target=_run, daemon=True)
thread.start()
logger.info("Teleop server started in background thread")
return thread
def _pose7_to_mat16(pose7):
"""Convert [x, y, z, qx, qy, qz, qw] to 16-float column-major 4x4 matrix.
Compatible with NumPy reshape(4,4, order='F').
"""
x, y, z, qx, qy, qz, qw = pose7
# Quaternion to rotation matrix
xx = qx * qx; yy = qy * qy; zz = qz * qz
xy = qx * qy; xz = qx * qz; yz = qy * qz
wx = qw * qx; wy = qw * qy; wz = qw * qz
r00 = 1 - 2 * (yy + zz)
r01 = 2 * (xy - wz)
r02 = 2 * (xz + wy)
r10 = 2 * (xy + wz)
r11 = 1 - 2 * (xx + zz)
r12 = 2 * (yz - wx)
r20 = 2 * (xz - wy)
r21 = 2 * (yz + wx)
r22 = 1 - 2 * (xx + yy)
# Column-major (Fortran order): column 0, column 1, column 2, column 3
return [
r00, r10, r20, 0.0, # Column 0
r01, r11, r21, 0.0, # Column 1
r02, r12, r22, 0.0, # Column 2
x, y, z, 1.0, # Column 3
]
# --- Standalone mode ---
def main():
parser = argparse.ArgumentParser(description="Teleop WebSocket server for Quest 3 native app")
parser.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
parser.add_argument("--port", type=int, default=8765, help="WebSocket port (default: 8765)")
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s"
)
server = TeleopServer(host=args.host, port=args.port)
asyncio.run(server.serve())
if __name__ == "__main__":
main()