diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..99ad1d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*pycache* +*.pyc +*.egg-info +*build/ +*.pem diff --git a/README.md b/README.md new file mode 100644 index 0000000..ccd0be5 --- /dev/null +++ b/README.md @@ -0,0 +1,95 @@ +# TeleVuer + +The TeleVuer library is a specialized version of the Vuer library, designed to enable XR device-based teleoperation of Unitree Robotics robots. This library acts as a wrapper for Vuer, providing additional adaptations specifically tailored for Unitree Robotics. By integrating XR device capabilities, such as hand tracking and controller tracking, TeleVuer facilitates seamless interaction and control of robotic systems in immersive environments. + +Currently, this module serves as a core component of the [xr_teleoperate](https://github.com/unitreerobotics/xr_teleoperate) library, offering advanced functionality for teleoperation tasks. It supports various XR devices, including Apple Vision Pro, Meta Quest3, Pico 4 Ultra Enterprise etc., ensuring compatibility and ease of use for robotic teleoperation applications. + +## Install + +```bash +cd televuer +pip install -e . +openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem +``` + +## Test + +```bash +python _test_televuer.py +# or +python _test_tv_wrapper.py + +# First, use Apple Vision Pro or Pico 4 Ultra Enterprise to connect to the same Wi-Fi network as your computer. +# Next, open safari / pico browser, enter https://host machine's ip:8012/?ws=wss://host machine's ip:8012 +# for example, https://192.168.123.2:8012?ws=wss://192.168.123.2:8012 +# Use the appropriate method (hand gesture or controller) to click the "Virtual Reality" button in the bottom-left corner of the screen. + +# Press Enter in the terminal to launch the program. +``` + +## Version History + +`vuer==0.0.32rc7` + +- **Functionality**: + - Hand tracking works fine. + - Controller tracking is not supported. 
+ +--- + +`vuer==0.0.35` + +- **Functionality**: + - AVP hand tracking works fine. + - PICO hand tracking works fine, but the right eye occasionally goes black for a short time at startup. + +--- + +`vuer==0.0.36rc1` to `vuer==0.0.42rc16` + +- **Functionality**: + - Hand tracking only shows a flat RGB image (no stereo view). + - PICO hand and controller tracking behave the same, with occasional right-eye blackouts at startup. + - Hand or controller markers are displayed as either black boxes (`vuer==0.0.36rc1`) or RGB three-axis color coordinates (`vuer==0.0.42rc16`). + +--- + +`vuer==0.0.42` to `vuer==0.0.45` + +- **Functionality**: + - Hand tracking only shows a flat RGB image (no stereo view). + - Unable to retrieve hand tracking data. + - Controller tracking only shows a flat RGB image (no stereo view), but controller data can be retrieved. + +--- + +`vuer==0.0.46` to `vuer==0.0.56` + +- **Functionality**: + - AVP hand tracking works fine. + - In PICO hand tracking mode: + - Using a hand gesture to click the "Virtual Reality" button causes the screen to stay black and stuck loading. + - Using the controller to click the button works fine. + - In PICO controller tracking mode: + - Using the controller to click the "Virtual Reality" button causes the screen to stay black and stuck loading. + - Using a hand gesture to click the button works fine. + - Hand marker visualization is displayed as RGB three-axis color coordinates. + +--- + +`vuer==0.0.60` +- **Recommended Version** + +- **Functionality**: + - Stable functionality with good compatibility. + - Most known issues have been resolved. +- **References**: + - [GitHub Issue #53](https://github.com/unitreerobotics/xr_teleoperate/issues/53) + - [GitHub Issue #45](https://github.com/vuer-ai/vuer/issues/45) + - [GitHub Issue #65](https://github.com/vuer-ai/vuer/issues/65) + +--- + +## Notes +- **Recommended Version**: Use `vuer==0.0.60` for the best functionality and stability. 
+- **Black Screen Issue**: On PICO devices, choose the appropriate interaction method (hand gesture or controller) based on the mode to avoid black screen issues. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f126a7d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "televuer" +version = "0.0.3" +description = "XR vision and hand/controller teleoperate interface for unitree robotics" +authors = [ + { name = "silencht", email = "silencht@qq.com" } +] +requires-python = ">=3.10" +license = { text = "MIT" } + +dependencies = [ + "numpy<2.0.0", + "opencv-python", + "logging-mp", + "vuer[all]==0.0.60" +] + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] \ No newline at end of file diff --git a/src/televuer/__init__.py b/src/televuer/__init__.py new file mode 100644 index 0000000..17b4fd9 --- /dev/null +++ b/src/televuer/__init__.py @@ -0,0 +1,3 @@ +# unitree_televuer/__init__.py +from .televuer import TeleVuer +from .tv_wrapper import TeleVuerWrapper, TeleData, TeleStateData \ No newline at end of file diff --git a/src/televuer/televuer.py b/src/televuer/televuer.py new file mode 100644 index 0000000..2992f67 --- /dev/null +++ b/src/televuer/televuer.py @@ -0,0 +1,515 @@ +from vuer import Vuer +from vuer.schemas import ImageBackground, Hands, MotionControllers, WebRTCVideoPlane, WebRTCStereoVideoPlane +from multiprocessing import Value, Array, Process, shared_memory +import numpy as np +import asyncio +import cv2 +import os +from pathlib import Path + + +class TeleVuer: + def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, cert_file=None, key_file=None, ngrok=False, webrtc=False): + """ + TeleVuer class for OpenXR-based XR teleoperate applications. 
+ This class handles the communication with the Vuer server and manages the shared memory for image and pose data. + + :param binocular: bool, whether the application is binocular (stereoscopic) or monocular. + :param use_hand_tracking: bool, whether to use hand tracking or controller tracking. + :param img_shape: tuple, shape of the image (height, width, channels). + :param img_shm_name: str, name of the shared memory for the image. + :param cert_file: str, path to the SSL certificate file. + :param key_file: str, path to the SSL key file. + :param ngrok: bool, whether to use ngrok for tunneling. + """ + self.binocular = binocular + self.use_hand_tracking = use_hand_tracking + self.img_height = img_shape[0] + if self.binocular: + self.img_width = img_shape[1] // 2 + else: + self.img_width = img_shape[1] + + current_module_dir = Path(__file__).resolve().parent.parent.parent + if cert_file is None: + cert_file = os.path.join(current_module_dir, "cert.pem") + if key_file is None: + key_file = os.path.join(current_module_dir, "key.pem") + + if ngrok: + self.vuer = Vuer(host='0.0.0.0', queries=dict(grid=False), queue_len=3) + else: + self.vuer = Vuer(host='0.0.0.0', cert=cert_file, key=key_file, queries=dict(grid=False), queue_len=3) + + self.vuer.add_handler("CAMERA_MOVE")(self.on_cam_move) + if self.use_hand_tracking: + self.vuer.add_handler("HAND_MOVE")(self.on_hand_move) + else: + self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move) + + existing_shm = shared_memory.SharedMemory(name=img_shm_name) + self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf) + + self.webrtc = webrtc + if self.binocular and not self.webrtc: + self.vuer.spawn(start=False)(self.main_image_binocular) + elif not self.binocular and not self.webrtc: + self.vuer.spawn(start=False)(self.main_image_monocular) + elif self.webrtc: + self.vuer.spawn(start=False)(self.main_image_webrtc) + + self.head_pose_shared = Array('d', 16, lock=True) + 
self.left_arm_pose_shared = Array('d', 16, lock=True) + self.right_arm_pose_shared = Array('d', 16, lock=True) + if self.use_hand_tracking: + self.left_hand_position_shared = Array('d', 75, lock=True) + self.right_hand_position_shared = Array('d', 75, lock=True) + self.left_hand_orientation_shared = Array('d', 25 * 9, lock=True) + self.right_hand_orientation_shared = Array('d', 25 * 9, lock=True) + + self.left_pinch_state_shared = Value('b', False, lock=True) + self.left_pinch_value_shared = Value('d', 0.0, lock=True) + self.left_squeeze_state_shared = Value('b', False, lock=True) + self.left_squeeze_value_shared = Value('d', 0.0, lock=True) + + self.right_pinch_state_shared = Value('b', False, lock=True) + self.right_pinch_value_shared = Value('d', 0.0, lock=True) + self.right_squeeze_state_shared = Value('b', False, lock=True) + self.right_squeeze_value_shared = Value('d', 0.0, lock=True) + else: + self.left_trigger_state_shared = Value('b', False, lock=True) + self.left_trigger_value_shared = Value('d', 0.0, lock=True) + self.left_squeeze_state_shared = Value('b', False, lock=True) + self.left_squeeze_value_shared = Value('d', 0.0, lock=True) + self.left_thumbstick_state_shared = Value('b', False, lock=True) + self.left_thumbstick_value_shared = Array('d', 2, lock=True) + self.left_aButton_shared = Value('b', False, lock=True) + self.left_bButton_shared = Value('b', False, lock=True) + + self.right_trigger_state_shared = Value('b', False, lock=True) + self.right_trigger_value_shared = Value('d', 0.0, lock=True) + self.right_squeeze_state_shared = Value('b', False, lock=True) + self.right_squeeze_value_shared = Value('d', 0.0, lock=True) + self.right_thumbstick_state_shared = Value('b', False, lock=True) + self.right_thumbstick_value_shared = Array('d', 2, lock=True) + self.right_aButton_shared = Value('b', False, lock=True) + self.right_bButton_shared = Value('b', False, lock=True) + + self.process = Process(target=self.vuer_run) + self.process.daemon = True + 
self.process.start() + + def vuer_run(self): + self.vuer.run() + + async def on_cam_move(self, event, session, fps=60): + try: + with self.head_pose_shared.get_lock(): + self.head_pose_shared[:] = event.value["camera"]["matrix"] + except: + pass + + async def on_controller_move(self, event, session, fps=60): + try: + with self.left_arm_pose_shared.get_lock(): + self.left_arm_pose_shared[:] = event.value["left"] + with self.right_arm_pose_shared.get_lock(): + self.right_arm_pose_shared[:] = event.value["right"] + + left_controller_state = event.value["leftState"] + right_controller_state = event.value["rightState"] + + def extract_controller_states(state_dict, prefix): + # trigger + with getattr(self, f"{prefix}_trigger_state_shared").get_lock(): + getattr(self, f"{prefix}_trigger_state_shared").value = bool(state_dict.get("trigger", False)) + with getattr(self, f"{prefix}_trigger_value_shared").get_lock(): + getattr(self, f"{prefix}_trigger_value_shared").value = float(state_dict.get("triggerValue", 0.0)) + # squeeze + with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) + with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) + # thumbstick + with getattr(self, f"{prefix}_thumbstick_state_shared").get_lock(): + getattr(self, f"{prefix}_thumbstick_state_shared").value = bool(state_dict.get("thumbstick", False)) + with getattr(self, f"{prefix}_thumbstick_value_shared").get_lock(): + getattr(self, f"{prefix}_thumbstick_value_shared")[:] = state_dict.get("thumbstickValue", [0.0, 0.0]) + # buttons + with getattr(self, f"{prefix}_aButton_shared").get_lock(): + getattr(self, f"{prefix}_aButton_shared").value = bool(state_dict.get("aButton", False)) + with getattr(self, f"{prefix}_bButton_shared").get_lock(): + getattr(self, 
f"{prefix}_bButton_shared").value = bool(state_dict.get("bButton", False)) + + extract_controller_states(left_controller_state, "left") + extract_controller_states(right_controller_state, "right") + except: + pass + + async def on_hand_move(self, event, session, fps=60): + try: + left_hand_data = event.value["left"] + right_hand_data = event.value["right"] + left_hand_state = event.value["leftState"] + right_hand_state = event.value["rightState"] + + def extract_hand_poses(hand_data, arm_pose_shared, hand_position_shared, hand_orientation_shared): + with arm_pose_shared.get_lock(): + arm_pose_shared[:] = hand_data[0:16] + + with hand_position_shared.get_lock(): + for i in range(25): + base = i * 16 + hand_position_shared[i * 3: i * 3 + 3] = [hand_data[base + 12], hand_data[base + 13], hand_data[base + 14]] + + with hand_orientation_shared.get_lock(): + for i in range(25): + base = i * 16 + hand_orientation_shared[i * 9: i * 9 + 9] = [ + hand_data[base + 0], hand_data[base + 1], hand_data[base + 2], + hand_data[base + 4], hand_data[base + 5], hand_data[base + 6], + hand_data[base + 8], hand_data[base + 9], hand_data[base + 10], + ] + + def extract_hand_states(state_dict, prefix): + # pinch + with getattr(self, f"{prefix}_pinch_state_shared").get_lock(): + getattr(self, f"{prefix}_pinch_state_shared").value = bool(state_dict.get("pinch", False)) + with getattr(self, f"{prefix}_pinch_value_shared").get_lock(): + getattr(self, f"{prefix}_pinch_value_shared").value = float(state_dict.get("pinchValue", 0.0)) + # squeeze + with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) + with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) + + extract_hand_poses(left_hand_data, self.left_arm_pose_shared, self.left_hand_position_shared, 
self.left_hand_orientation_shared) + extract_hand_poses(right_hand_data, self.right_arm_pose_shared, self.right_hand_position_shared, self.right_hand_orientation_shared) + extract_hand_states(left_hand_state, "left") + extract_hand_states(right_hand_state, "right") + + except: + pass + + async def main_image_binocular(self, session, fps=60): + if self.use_hand_tracking: + session.upsert( + Hands( + stream=True, + key="hands", + hideLeft=True, + hideRight=True + ), + to="bgChildren", + ) + else: + session.upsert( + MotionControllers( + stream=True, + key="motionControllers", + left=True, + right=True, + ), + to="bgChildren", + ) + + while True: + display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) + # aspect_ratio = self.img_width / self.img_height + session.upsert( + [ + ImageBackground( + display_image[:, :self.img_width], + aspect=1.778, + height=1, + distanceToCamera=1, + # The underlying rendering engine supported a layer binary bitmask for both objects and the camera. + # Below we set the two image planes, left and right, to layers=1 and layers=2. + # Note that these two masks are associated with left eye’s camera and the right eye’s camera. + layers=1, + format="jpeg", + quality=100, + key="background-left", + interpolate=True, + ), + ImageBackground( + display_image[:, self.img_width:], + aspect=1.778, + height=1, + distanceToCamera=1, + layers=2, + format="jpeg", + quality=100, + key="background-right", + interpolate=True, + ), + ], + to="bgChildren", + ) + # 'jpeg' encoding should give you about 30fps with a 16ms wait in-between. 
+ await asyncio.sleep(0.016 * 2) + + async def main_image_monocular(self, session, fps=60): + if self.use_hand_tracking: + session.upsert( + Hands( + stream=True, + key="hands", + hideLeft=True, + hideRight=True + ), + to="bgChildren", + ) + else: + session.upsert( + MotionControllers( + stream=True, + key="motionControllers", + left=True, + right=True, + ), + to="bgChildren", + ) + + while True: + display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) + # aspect_ratio = self.img_width / self.img_height + session.upsert( + [ + ImageBackground( + display_image, + aspect=1.778, + height=1, + distanceToCamera=1, + format="jpeg", + quality=50, + key="background-mono", + interpolate=True, + ), + ], + to="bgChildren", + ) + await asyncio.sleep(0.016) + + async def main_image_webrtc(self, session, fps=60): + if self.use_hand_tracking: + session.upsert( + Hands( + stream=True, + key="hands", + showLeft=False, + showRight=False + ), + to="bgChildren", + ) + else: + session.upsert( + MotionControllers( + stream=True, + key="motionControllers", + showLeft=False, + showRight=False, + ) + ) + + session.upsert( + WebRTCVideoPlane( + # WebRTCStereoVideoPlane( + src="https://10.0.7.49:8080/offer", + iceServer={}, + key="webrtc", + aspect=1.778, + height = 7, + ), + to="bgChildren", + ) + while True: + await asyncio.sleep(1) + # ==================== common data ==================== + @property + def head_pose(self): + """np.ndarray, shape (4, 4), head SE(3) pose matrix from Vuer (basis OpenXR Convention).""" + with self.head_pose_shared.get_lock(): + return np.array(self.head_pose_shared[:]).reshape(4, 4, order="F") + + @property + def left_arm_pose(self): + """np.ndarray, shape (4, 4), left arm SE(3) pose matrix from Vuer (basis OpenXR Convention).""" + with self.left_arm_pose_shared.get_lock(): + return np.array(self.left_arm_pose_shared[:]).reshape(4, 4, order="F") + + @property + def right_arm_pose(self): + """np.ndarray, shape (4, 4), right arm SE(3) pose matrix from 
Vuer (basis OpenXR Convention).""" + with self.right_arm_pose_shared.get_lock(): + return np.array(self.right_arm_pose_shared[:]).reshape(4, 4, order="F") + + # ==================== Hand Tracking Data ==================== + @property + def left_hand_positions(self): + """np.ndarray, shape (25, 3), left hand 25 landmarks' 3D positions.""" + with self.left_hand_position_shared.get_lock(): + return np.array(self.left_hand_position_shared[:]).reshape(25, 3) + + @property + def right_hand_positions(self): + """np.ndarray, shape (25, 3), right hand 25 landmarks' 3D positions.""" + with self.right_hand_position_shared.get_lock(): + return np.array(self.right_hand_position_shared[:]).reshape(25, 3) + + @property + def left_hand_orientations(self): + """np.ndarray, shape (25, 3, 3), left hand 25 landmarks' orientations (flattened 3x3 matrices, column-major).""" + with self.left_hand_orientation_shared.get_lock(): + return np.array(self.left_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F") + + @property + def right_hand_orientations(self): + """np.ndarray, shape (25, 3, 3), right hand 25 landmarks' orientations (flattened 3x3 matrices, column-major).""" + with self.right_hand_orientation_shared.get_lock(): + return np.array(self.right_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F") + + @property + def left_hand_pinch_state(self): + """bool, whether left hand is pinching.""" + with self.left_pinch_state_shared.get_lock(): + return self.left_pinch_state_shared.value + + @property + def left_hand_pinch_value(self): + """float, pinch strength of left hand.""" + with self.left_pinch_value_shared.get_lock(): + return self.left_pinch_value_shared.value + + @property + def left_hand_squeeze_state(self): + """bool, whether left hand is squeezing.""" + with self.left_squeeze_state_shared.get_lock(): + return self.left_squeeze_state_shared.value + + @property + def left_hand_squeeze_value(self): + """float, squeeze strength of left 
hand.""" + with self.left_squeeze_value_shared.get_lock(): + return self.left_squeeze_value_shared.value + + @property + def right_hand_pinch_state(self): + """bool, whether right hand is pinching.""" + with self.right_pinch_state_shared.get_lock(): + return self.right_pinch_state_shared.value + + @property + def right_hand_pinch_value(self): + """float, pinch strength of right hand.""" + with self.right_pinch_value_shared.get_lock(): + return self.right_pinch_value_shared.value + + @property + def right_hand_squeeze_state(self): + """bool, whether right hand is squeezing.""" + with self.right_squeeze_state_shared.get_lock(): + return self.right_squeeze_state_shared.value + + @property + def right_hand_squeeze_value(self): + """float, squeeze strength of right hand.""" + with self.right_squeeze_value_shared.get_lock(): + return self.right_squeeze_value_shared.value + + # ==================== Controller Data ==================== + @property + def left_controller_trigger_state(self): + """bool, left controller trigger pressed or not.""" + with self.left_trigger_state_shared.get_lock(): + return self.left_trigger_state_shared.value + + @property + def left_controller_trigger_value(self): + """float, left controller trigger analog value (0.0 ~ 1.0).""" + with self.left_trigger_value_shared.get_lock(): + return self.left_trigger_value_shared.value + + @property + def left_controller_squeeze_state(self): + """bool, left controller squeeze pressed or not.""" + with self.left_squeeze_state_shared.get_lock(): + return self.left_squeeze_state_shared.value + + @property + def left_controller_squeeze_value(self): + """float, left controller squeeze analog value (0.0 ~ 1.0).""" + with self.left_squeeze_value_shared.get_lock(): + return self.left_squeeze_value_shared.value + + @property + def left_controller_thumbstick_state(self): + """bool, whether left thumbstick is touched or clicked.""" + with self.left_thumbstick_state_shared.get_lock(): + return 
self.left_thumbstick_state_shared.value + + @property + def left_controller_thumbstick_value(self): + """np.ndarray, shape (2,), left thumbstick 2D axis values (x, y).""" + with self.left_thumbstick_value_shared.get_lock(): + return np.array(self.left_thumbstick_value_shared[:]) + + @property + def left_controller_aButton(self): + """bool, left controller 'A' button pressed.""" + with self.left_aButton_shared.get_lock(): + return self.left_aButton_shared.value + + @property + def left_controller_bButton(self): + """bool, left controller 'B' button pressed.""" + with self.left_bButton_shared.get_lock(): + return self.left_bButton_shared.value + + @property + def right_controller_trigger_state(self): + """bool, right controller trigger pressed or not.""" + with self.right_trigger_state_shared.get_lock(): + return self.right_trigger_state_shared.value + + @property + def right_controller_trigger_value(self): + """float, right controller trigger analog value (0.0 ~ 1.0).""" + with self.right_trigger_value_shared.get_lock(): + return self.right_trigger_value_shared.value + + @property + def right_controller_squeeze_state(self): + """bool, right controller squeeze pressed or not.""" + with self.right_squeeze_state_shared.get_lock(): + return self.right_squeeze_state_shared.value + + @property + def right_controller_squeeze_value(self): + """float, right controller squeeze analog value (0.0 ~ 1.0).""" + with self.right_squeeze_value_shared.get_lock(): + return self.right_squeeze_value_shared.value + + @property + def right_controller_thumbstick_state(self): + """bool, whether right thumbstick is touched or clicked.""" + with self.right_thumbstick_state_shared.get_lock(): + return self.right_thumbstick_state_shared.value + + @property + def right_controller_thumbstick_value(self): + """np.ndarray, shape (2,), right thumbstick 2D axis values (x, y).""" + with self.right_thumbstick_value_shared.get_lock(): + return np.array(self.right_thumbstick_value_shared[:]) + + 
@property + def right_controller_aButton(self): + """bool, right controller 'A' button pressed.""" + with self.right_aButton_shared.get_lock(): + return self.right_aButton_shared.value + + @property + def right_controller_bButton(self): + """bool, right controller 'B' button pressed.""" + with self.right_bButton_shared.get_lock(): + return self.right_bButton_shared.value \ No newline at end of file diff --git a/src/televuer/tv_wrapper.py b/src/televuer/tv_wrapper.py new file mode 100644 index 0000000..b875a93 --- /dev/null +++ b/src/televuer/tv_wrapper.py @@ -0,0 +1,410 @@ +import numpy as np +from .televuer import TeleVuer +from dataclasses import dataclass, field + +""" +(basis) OpenXR Convention : y up, z back, x right. +(basis) Robot Convention : z up, y left, x front. + +under (basis) Robot Convention, humanoid arm's initial pose convention: + + # (initial pose) OpenXR Left Arm Pose Convention (hand tracking): + - the x-axis pointing from wrist toward middle. + - the y-axis pointing from index toward pinky. + - the z-axis pointing from palm toward back of the hand. + + # (initial pose) OpenXR Right Arm Pose Convention (hand tracking): + - the x-axis pointing from wrist toward middle. + - the y-axis pointing from pinky toward index. + - the z-axis pointing from palm toward back of the hand. + + # (initial pose) Unitree Humanoid Left Arm URDF Convention: + - the x-axis pointing from wrist toward middle. + - the y-axis pointing from palm toward back of the hand. + - the z-axis pointing from pinky toward index. + + # (initial pose) Unitree Humanoid Right Arm URDF Convention: + - the x-axis pointing from wrist toward middle. + - the y-axis pointing from back of the hand toward palm. + - the z-axis pointing from pinky toward index. + +under (basis) Robot Convention, humanoid hand's initial pose convention: + + # (initial pose) OpenXR Left Hand Pose Convention (hand tracking): + - the x-axis pointing from wrist toward middle. 
+ - the y-axis pointing from index toward pinky. + - the z-axis pointing from palm toward back of the hand. + + # (initial pose) OpenXR Right Hand Pose Convention (hand tracking): + - the x-axis pointing from wrist toward middle. + - the y-axis pointing from pinky toward index. + - the z-axis pointing from palm toward back of the hand. + + # (initial pose) Unitree Humanoid Left Hand URDF Convention: + - The x-axis pointing from palm toward back of the hand. + - The y-axis pointing from middle toward wrist. + - The z-axis pointing from pinky toward index. + + # (initial pose) Unitree Humanoid Right Hand URDF Convention: + - The x-axis pointing from palm toward back of the hand. + - The y-axis pointing from middle toward wrist. + - The z-axis pointing from index toward pinky. + +p.s. TeleVuer obtains all raw data under the (basis) OpenXR Convention. + In addition, arm pose data (hand tracking) follows the (initial pose) OpenXR Arm Pose Convention, + while arm pose data (controller tracking) directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed). + Meanwhile, all raw data is in the WORLD frame defined by XR device odometry. + +p.s. From website: https://registry.khronos.org/OpenXR/specs/1.1/man/html/openxr.html. + You can find **(initial pose) OpenXR Left/Right Arm Pose Convention** related information like this below: + "The wrist joint is located at the pivot point of the wrist, which is location invariant when twisting the hand without moving the forearm. + The backward (+Z) direction is parallel to the line from wrist joint to middle finger metacarpal joint, and points away from the finger tips. + The up (+Y) direction points out towards back of the hand and perpendicular to the skin at wrist. + The X direction is perpendicular to the Y and Z directions and follows the right hand rule." + Note: The above context is of course under **(basis) OpenXR Convention**. + +p.s. 
# **Unitree Arm/Hand URDF initial pose Convention** information comes from URDF files.
# (end of the module-level convention notes)


def safe_mat_update(prev_mat, mat):
    """Validate a pose matrix before accepting it.

    Returns ``(mat, True)`` when ``mat`` is finite and non-singular, otherwise
    ``(prev_mat, False)`` so the caller keeps the last known-good pose.
    NOTE: the previous matrix is kept when the NEW matrix is singular
    (determinant ~= 0) or non-finite — e.g. before XR tracking has converged.
    """
    det = np.linalg.det(mat)
    if not np.isfinite(det) or np.isclose(det, 0.0, atol=1e-6):
        return prev_mat, False
    return mat, True

def fast_mat_inv(mat):
    """Fast inverse of a rigid-body SE(3) transform: R^-1 = R^T, t^-1 = -R^T @ t."""
    ret = np.eye(4)
    ret[:3, :3] = mat[:3, :3].T
    ret[:3, 3] = -mat[:3, :3].T @ mat[:3, 3]
    return ret

def safe_rot_update(prev_rot_array, rot_array):
    """Batched counterpart of safe_mat_update for an (N, 3, 3) rotation array.

    Returns ``(rot_array, True)`` only when every rotation has a finite,
    non-zero determinant; otherwise ``(prev_rot_array, False)``.
    """
    dets = np.linalg.det(rot_array)
    if not np.all(np.isfinite(dets)) or np.any(np.isclose(dets, 0.0, atol=1e-6)):
        return prev_rot_array, False
    return rot_array, True

# constants variable
T_TO_UNITREE_HUMANOID_LEFT_ARM = np.array([[1, 0, 0, 0],
                                           [0, 0,-1, 0],
                                           [0, 1, 0, 0],
                                           [0, 0, 0, 1]])

T_TO_UNITREE_HUMANOID_RIGHT_ARM = np.array([[1, 0, 0, 0],
                                            [0, 0, 1, 0],
                                            [0,-1, 0, 0],
                                            [0, 0, 0, 1]])

T_TO_UNITREE_HAND = np.array([[0, 0, 1, 0],
                              [-1, 0, 0, 0],
                              [0, -1, 0, 0],
                              [0, 0, 0, 1]])

T_ROBOT_OPENXR = np.array([[ 0, 0,-1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 1, 0, 0],
                           [ 0, 0, 0, 1]])

T_OPENXR_ROBOT = np.array([[ 0,-1, 0, 0],
                           [ 0, 0, 1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 0, 0, 1]])

R_ROBOT_OPENXR = np.array([[ 0, 0,-1],
                           [-1, 0, 0],
                           [ 0, 1, 0]])

R_OPENXR_ROBOT = np.array([[ 0,-1, 0],
                           [ 0, 0, 1],
                           [-1, 0, 0]])

CONST_HEAD_POSE = np.array([[1, 0, 0, 0],
                            [0, 1, 0, 1.5],
                            [0, 0, 1, -0.2],
                            [0, 0, 0, 1]])

# For Robot initial position
CONST_RIGHT_ARM_POSE = np.array([[1, 0, 0, 0.15],
                                 [0, 1, 0, 1.13],
                                 [0, 0, 1, -0.3],
                                 [0, 0, 0, 1]])

CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15],
                                [0, 1, 0, 1.13],
                                [0, 0, 1, -0.3],
                                [0, 0, 0, 1]])

CONST_HAND_ROT = np.tile(np.eye(3)[None, :, :], (25, 1, 1))

@dataclass
class TeleStateData:
    # hand tracking
    left_pinch_state: bool = False          # True if index and thumb are pinching
    left_squeeze_state: bool = False        # True if hand is making a fist
    left_squeeze_value: float = 0.0         # (0.0 ~ 1.0) degree of hand squeeze
    right_pinch_state: bool = False         # True if index and thumb are pinching
    right_squeeze_state: bool = False       # True if hand is making a fist
    right_squeeze_value: float = 0.0        # (0.0 ~ 1.0) degree of hand squeeze

    # controller tracking
    left_trigger_state: bool = False        # True if trigger is actively pressed
    left_squeeze_ctrl_state: bool = False   # True if grip button is pressed
    left_squeeze_ctrl_value: float = 0.0    # (0.0 ~ 1.0) grip pull depth
    left_thumbstick_state: bool = False     # True if thumbstick button is pressed
    left_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2))  # 2D vector (x, y), normalized
    left_aButton: bool = False              # True if A button is pressed
    left_bButton: bool = False              # True if B button is pressed
    right_trigger_state: bool = False       # True if trigger is actively pressed
    right_squeeze_ctrl_state: bool = False  # True if grip button is pressed
    right_squeeze_ctrl_value: float = 0.0   # (0.0 ~ 1.0) grip pull depth
    right_thumbstick_state: bool = False    # True if thumbstick button is pressed
    right_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2))  # 2D vector (x, y), normalized
    right_aButton: bool = False             # True if A button is pressed
    right_bButton: bool = False             # True if B button is pressed

@dataclass
class TeleData:
    head_pose: np.ndarray                   # (4,4) SE(3) pose of head matrix
    left_arm_pose: np.ndarray               # (4,4) SE(3) pose of left arm
    right_arm_pose: np.ndarray              # (4,4) SE(3) pose of right arm
    left_hand_pos: np.ndarray = None        # (25,3) 3D positions of left hand joints
    right_hand_pos: np.ndarray = None       # (25,3) 3D positions of right hand joints
    left_hand_rot: np.ndarray = None        # (25,3,3) rotation matrices of left hand joints
    right_hand_rot: np.ndarray = None       # (25,3,3) rotation matrices of right hand joints
    left_pinch_value: float = None          # (1.0 ~ 0.0) pinch distance between index and thumb
right_pinch_value: float = None # float (1x.0 ~ 0.0) pinch distance between index and thumb + left_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth + right_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth + tele_state: TeleStateData = field(default_factory=TeleStateData) + + +class TeleVuerWrapper: + def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False, + cert_file = None, key_file = None, ngrok = False, webrtc = False): + """ + TeleVuerWrapper is a wrapper for the TeleVuer class, which handles XR device's data suit for robot control. + It initializes the TeleVuer instance with the specified parameters and provides a method to get motion state data. + + :param binocular: A boolean indicating whether the head camera device is binocular or not. + :param use_hand_tracking: A boolean indicating whether to use hand tracking or use controller tracking. + :param img_shape: The shape of the image to be processed. + :param img_shm_name: The name of the shared memory for the image. + :param return_state: A boolean indicating whether to return the state of the hand or controller. + :param return_hand_rot: A boolean indicating whether to return the hand rotation data. + :param cert_file: The path to the certificate file for secure connection. + :param key_file: The path to the key file for secure connection. + """ + self.use_hand_tracking = use_hand_tracking + self.return_state_data = return_state_data + self.return_hand_rot_data = return_hand_rot_data + self.tvuer = TeleVuer(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file, + ngrok=ngrok, webrtc=webrtc) + + def get_motion_state_data(self): + """ + Get processed motion state data from the TeleVuer instance. + + All returned data are transformed from the OpenXR Convention to the (Robot & Unitree) Convention. 
+ """ + # Variable Naming Convention below, + # ┌────────────┬───────────────────────────┬──────────────────────────────────┬────────────────────────────────────┬────────────────────────────────────┐ + # │left / right│ Bxr │ Brobot │ IPxr │ IPunitree │ + # │────────────│───────────────────────────│──────────────────────────────────│────────────────────────────────────│────────────────────────────────────│ + # │ side │ (basis) OpenXR Convention │ (basis) Robot Convention │ (initial pose) OpenXR Convention │ (initial pose) Unitree Convention │ + # └────────────┴───────────────────────────┴──────────────────────────────────┴────────────────────────────────────┴────────────────────────────────────┘ + # ┌───────────────────────────────────┬─────────────────────┐ + # │ world / arm / head / waist │ arm / head / hand │ + # │───────────────────────────────────│─────────────────────│ + # │ source frame │ target frame │ + # └───────────────────────────────────┴─────────────────────┘ + + # TeleVuer (Vuer) obtains all raw data under the (basis) OpenXR Convention. + Bxr_world_head, head_pose_is_valid = safe_mat_update(CONST_HEAD_POSE, self.tvuer.head_pose) + + if self.use_hand_tracking: + # 'Arm' pose data follows (basis) OpenXR Convention and (initial pose) OpenXR Arm Convention. 
+ left_IPxr_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) + right_IPxr_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose) + + # Change basis convention + # From (basis) OpenXR Convention to (basis) Robot Convention: + # Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{robot}_{openxr}^T ==> + # Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{openxr}_{robot} + # Reason for right multiply T_OPENXR_ROBOT = fast_mat_inv(T_ROBOT_OPENXR): + # This is similarity transformation: B = PAP^{-1}, that is B ~ A + # For example: + # - For a pose data T_r under the (basis) Robot Convention, left-multiplying Brobot_Pose means: + # Brobot_Pose * T_r ==> T_{robot}_{openxr} * PoseMatrix_openxr * T_{openxr}_{robot} * T_r + # - First, transform T_r to the (basis) OpenXR Convention (The function of T_{openxr}_{robot}) + # - Then, apply the rotation PoseMatrix_openxr in the OpenXR Convention (The function of PoseMatrix_openxr) + # - Finally, transform back to the Robot Convention (The function of T_{robot}_{openxr}) + # - This results in the same rotation effect under the Robot Convention as in the OpenXR Convention. + Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT + left_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT + right_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT + + # Change initial pose convention + # From (initial pose) OpenXR Arm Convention to (initial pose) Unitree Humanoid Arm URDF Convention + # Reason for right multiply (T_TO_UNITREE_HUMANOID_LEFT_ARM) : Rotate 90 degrees counterclockwise about its own x-axis. + # Reason for right multiply (T_TO_UNITREE_HUMANOID_RIGHT_ARM): Rotate 90 degrees clockwise about its own x-axis. 
+ left_IPunitree_Brobot_world_arm = left_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_LEFT_ARM if left_arm_is_valid else np.eye(4)) + right_IPunitree_Brobot_world_arm = right_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_RIGHT_ARM if right_arm_is_valid else np.eye(4)) + + # Transfer from WORLD to HEAD coordinate (translation adjustment only) + left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy() + right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy() + left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] + right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_world_arm[0:3, 3] - Brobot_world_head[0:3, 3] + + # =====coordinate origin offset===== + # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to visualize it. + # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. + # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. 
+ left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() + right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() + left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x + right_IPunitree_Brobot_waist_arm[0,3] +=0.15 + left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z + right_IPunitree_Brobot_waist_arm[2,3] +=0.45 + + # -----------------------------------hand position---------------------------------------- + if left_arm_is_valid and right_arm_is_valid: + # Homogeneous, [xyz] to [xyz1] + # np.concatenate([25,3]^T,(1,25)) ==> Bxr_world_hand_pos.shape is (4,25) + # Now under (basis) OpenXR Convention, Bxr_world_hand_pos data like this: + # [x0 x1 x2 ··· x23 x24] + # [y0 y1 y1 ··· y23 y24] + # [z0 z1 z2 ··· z23 z24] + # [ 1 1 1 ··· 1 1] + left_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.left_hand_positions.T, np.ones((1, self.tvuer.left_hand_positions.shape[0]))]) + right_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.right_hand_positions.T, np.ones((1, self.tvuer.right_hand_positions.shape[0]))]) + + # Change basis convention + # From (basis) OpenXR Convention to (basis) Robot Convention + # Just a change of basis for 3D points. No rotation, only translation. So, no need to right-multiply fast_mat_inv(T_ROBOT_OPENXR). + left_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_hand_pos + right_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_hand_pos + + # Transfer from WORLD to ARM frame under (basis) Robot Convention: + # Brobot_{world}_{arm}^T * Brobot_{world}_pos ==> Brobot_{arm}_{world} * Brobot_{world}_pos ==> Brobot_arm_hand_pos, Now it's based on the arm frame. 
+ left_IPxr_Brobot_arm_hand_pos = fast_mat_inv(left_IPxr_Brobot_world_arm) @ left_IPxr_Brobot_world_hand_pos + right_IPxr_Brobot_arm_hand_pos = fast_mat_inv(right_IPxr_Brobot_world_arm) @ right_IPxr_Brobot_world_hand_pos + + # Change initial pose convention + # From (initial pose) XR Hand Convention to (initial pose) Unitree Humanoid Hand URDF Convention: + # T_TO_UNITREE_HAND @ IPxr_Brobot_arm_hand_pos ==> IPunitree_Brobot_arm_hand_pos + # ((4,4) @ (4,25))[0:3, :].T ==> (4,25)[0:3, :].T ==> (3,25).T ==> (25,3) + # Now under (initial pose) Unitree Humanoid Hand URDF Convention, matrix shape like this: + # [x0, y0, z0] + # [x1, y1, z1] + # ··· + # [x23,y23,z23] + # [x24,y24,z24] + left_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ left_IPxr_Brobot_arm_hand_pos)[0:3, :].T + right_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ right_IPxr_Brobot_arm_hand_pos)[0:3, :].T + else: + left_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3)) + right_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3)) + + # -----------------------------------hand rotation---------------------------------------- + if self.return_hand_rot_data: + left_Bxr_world_hand_rot, left_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.left_hand_orientations) # [25, 3, 3] + right_Bxr_world_hand_rot, right_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.right_hand_orientations) + + if left_hand_rot_is_valid and right_hand_rot_is_valid: + left_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', left_IPxr_Bxr_world_arm[:3, :3].T, left_Bxr_world_hand_rot) + right_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', right_IPxr_Bxr_world_arm[:3, :3].T, right_Bxr_world_hand_rot) + # Change basis convention + left_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, left_Bxr_arm_hand_rot, R_OPENXR_ROBOT) + right_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, right_Bxr_arm_hand_rot, R_OPENXR_ROBOT) + else: + left_Brobot_arm_hand_rot = left_Bxr_world_hand_rot + 
right_Brobot_arm_hand_rot = right_Bxr_world_hand_rot + else: + left_Brobot_arm_hand_rot = None + right_Brobot_arm_hand_rot = None + + if self.return_state_data: + hand_state = TeleStateData( + left_pinch_state=self.tvuer.left_hand_pinch_state, + left_squeeze_state=self.tvuer.left_hand_squeeze_state, + left_squeeze_value=self.tvuer.left_hand_squeeze_value, + right_pinch_state=self.tvuer.right_hand_pinch_state, + right_squeeze_state=self.tvuer.right_hand_squeeze_state, + right_squeeze_value=self.tvuer.right_hand_squeeze_value, + ) + else: + hand_state = None + + return TeleData( + head_pose=Brobot_world_head, + left_arm_pose=left_IPunitree_Brobot_waist_arm, + right_arm_pose=right_IPunitree_Brobot_waist_arm, + left_hand_pos=left_IPunitree_Brobot_arm_hand_pos, + right_hand_pos=right_IPunitree_Brobot_arm_hand_pos, + left_hand_rot=left_Brobot_arm_hand_rot, + right_hand_rot=right_Brobot_arm_hand_rot, + left_pinch_value=self.tvuer.left_hand_pinch_value * 100.0, + right_pinch_value=self.tvuer.right_hand_pinch_value * 100.0, + tele_state=hand_state + ) + else: + # Controller pose data directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed). 
+ left_IPunitree_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) + right_IPunitree_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose) + + # Change basis convention + Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT + left_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT + right_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT + + # Transfer from WORLD to HEAD coordinate (translation adjustment only) + left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy() + right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy() + left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] + right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] + + # =====coordinate origin offset===== + # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it. + # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. + # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. 
+ left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() + right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() + left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x + right_IPunitree_Brobot_waist_arm[0,3] +=0.15 + left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z + right_IPunitree_Brobot_waist_arm[2,3] +=0.45 + # left_IPunitree_Brobot_waist_arm[1, 3] +=0.02 # y + # right_IPunitree_Brobot_waist_arm[1,3] +=0.02 + + # return data + if self.return_state_data: + controller_state = TeleStateData( + left_trigger_state=self.tvuer.left_controller_trigger_state, + left_squeeze_ctrl_state=self.tvuer.left_controller_squeeze_state, + left_squeeze_ctrl_value=self.tvuer.left_controller_squeeze_value, + left_thumbstick_state=self.tvuer.left_controller_thumbstick_state, + left_thumbstick_value=self.tvuer.left_controller_thumbstick_value, + left_aButton=self.tvuer.left_controller_aButton, + left_bButton=self.tvuer.left_controller_bButton, + right_trigger_state=self.tvuer.right_controller_trigger_state, + right_squeeze_ctrl_state=self.tvuer.right_controller_squeeze_state, + right_squeeze_ctrl_value=self.tvuer.right_controller_squeeze_value, + right_thumbstick_state=self.tvuer.right_controller_thumbstick_state, + right_thumbstick_value=self.tvuer.right_controller_thumbstick_value, + right_aButton=self.tvuer.right_controller_aButton, + right_bButton=self.tvuer.right_controller_bButton, + ) + else: + controller_state = None + + return TeleData( + head_pose=Brobot_world_head, + left_arm_pose=left_IPunitree_Brobot_waist_arm, + right_arm_pose=right_IPunitree_Brobot_waist_arm, + left_trigger_value=10.0 - self.tvuer.left_controller_trigger_value * 10, + right_trigger_value=10.0 - self.tvuer.right_controller_trigger_value * 10, + tele_state=controller_state + ) \ No newline at end of file diff --git a/test/_test_televuer.py b/test/_test_televuer.py new file mode 100644 index 0000000..8e8ccca --- /dev/null +++ b/test/_test_televuer.py @@ -0,0 +1,86 @@ +import 
import os, sys
this_file = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

import time
import numpy as np
from multiprocessing import shared_memory
from televuer import TeleVuer
import logging_mp
logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)

def run_test_TeleVuer():
    """Interactively dump raw TeleVuer tracking data (hand or controller mode) at ~30 Hz."""
    # Shared-memory image buffer that the XR viewer renders from.
    image_shape = (480, 640 * 2, 3)
    image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
    image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)  # kept alive: view over the shared buffer

    # Optional: feed live camera frames into the shared memory.
    # from image_server.image_client import ImageClient
    # import threading
    # image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1")
    # image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True)
    # image_receive_thread.start()

    # xr-mode: True = hand tracking, False = controller tracking
    use_hand_track = True
    tv = TeleVuer(binocular = True, use_hand_tracking = use_hand_track, img_shape = image_shape, img_shm_name = image_shm.name, webrtc=False)

    try:
        input("Press Enter to start TeleVuer test...")
        while True:
            logger_mp.info("=" * 80)
            logger_mp.info("Common Data (always available):")
            logger_mp.info(f"head_pose shape: {tv.head_pose.shape}\n{tv.head_pose}\n")
            logger_mp.info(f"left_arm_pose shape: {tv.left_arm_pose.shape}\n{tv.left_arm_pose}\n")
            logger_mp.info(f"right_arm_pose shape: {tv.right_arm_pose.shape}\n{tv.right_arm_pose}\n")
            logger_mp.info("=" * 80)

            if use_hand_track:
                logger_mp.info("Hand Tracking Data:")
                logger_mp.info(f"left_hand_positions shape: {tv.left_hand_positions.shape}\n{tv.left_hand_positions}\n")
                logger_mp.info(f"right_hand_positions shape: {tv.right_hand_positions.shape}\n{tv.right_hand_positions}\n")
                logger_mp.info(f"left_hand_orientations shape: {tv.left_hand_orientations.shape}\n{tv.left_hand_orientations}\n")
                logger_mp.info(f"right_hand_orientations shape: {tv.right_hand_orientations.shape}\n{tv.right_hand_orientations}\n")
                logger_mp.info(f"left_hand_pinch_state: {tv.left_hand_pinch_state}")
                logger_mp.info(f"left_hand_pinch_value: {tv.left_hand_pinch_value}")
                logger_mp.info(f"left_hand_squeeze_state: {tv.left_hand_squeeze_state}")
                logger_mp.info(f"left_hand_squeeze_value: {tv.left_hand_squeeze_value}")
                logger_mp.info(f"right_hand_pinch_state: {tv.right_hand_pinch_state}")
                logger_mp.info(f"right_hand_pinch_value: {tv.right_hand_pinch_value}")
                logger_mp.info(f"right_hand_squeeze_state: {tv.right_hand_squeeze_state}")
                logger_mp.info(f"right_hand_squeeze_value: {tv.right_hand_squeeze_value}")
            else:
                logger_mp.info("Controller Data:")
                logger_mp.info(f"left_controller_trigger_state: {tv.left_controller_trigger_state}")
                logger_mp.info(f"left_controller_trigger_value: {tv.left_controller_trigger_value}")
                logger_mp.info(f"left_controller_squeeze_state: {tv.left_controller_squeeze_state}")
                logger_mp.info(f"left_controller_squeeze_value: {tv.left_controller_squeeze_value}")
                logger_mp.info(f"left_controller_thumbstick_state: {tv.left_controller_thumbstick_state}")
                logger_mp.info(f"left_controller_thumbstick_value: {tv.left_controller_thumbstick_value}")
                logger_mp.info(f"left_controller_aButton: {tv.left_controller_aButton}")
                logger_mp.info(f"left_controller_bButton: {tv.left_controller_bButton}")
                logger_mp.info(f"right_controller_trigger_state: {tv.right_controller_trigger_state}")
                logger_mp.info(f"right_controller_trigger_value: {tv.right_controller_trigger_value}")
                logger_mp.info(f"right_controller_squeeze_state: {tv.right_controller_squeeze_state}")
                logger_mp.info(f"right_controller_squeeze_value: {tv.right_controller_squeeze_value}")
                logger_mp.info(f"right_controller_thumbstick_state: {tv.right_controller_thumbstick_state}")
                logger_mp.info(f"right_controller_thumbstick_value: {tv.right_controller_thumbstick_value}")
                logger_mp.info(f"right_controller_aButton: {tv.right_controller_aButton}")
                logger_mp.info(f"right_controller_bButton: {tv.right_controller_bButton}")
            logger_mp.info("=" * 80)
            time.sleep(0.03)
    except KeyboardInterrupt:
        logger_mp.warning("KeyboardInterrupt, exiting program...")
    finally:
        # Close this process's mapping first, then unlink the segment (we created it).
        image_shm.close()
        image_shm.unlink()
        logger_mp.warning("Finally, exiting program...")
        sys.exit(0)

if __name__ == '__main__':
    run_test_TeleVuer()
import os, sys
this_file = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

import numpy as np
import time
from multiprocessing import shared_memory
from televuer import TeleVuerWrapper
import logging_mp
logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)


def run_test_tv_wrapper():
    """Interactively dump processed TeleData snapshots from TeleVuerWrapper at ~30 Hz."""
    # Shared-memory image buffer that the XR viewer renders from.
    image_shape = (480, 640 * 2, 3)
    image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
    image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)  # kept alive: view over the shared buffer

    # Optional: feed live camera frames into the shared memory.
    # from image_server.image_client import ImageClient
    # import threading
    # image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1")
    # image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True)
    # image_receive_thread.start()

    # xr-mode: True = hand tracking, False = controller tracking
    use_hand_track = False
    tv_wrapper = TeleVuerWrapper(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape, img_shm_name=image_shm.name,
                                 return_state_data=True, return_hand_rot_data=True)
    try:
        input("Press Enter to start tv_wrapper test...")
        while True:
            start_time = time.time()
            teleData = tv_wrapper.get_motion_state_data()

            logger_mp.info("=== TeleData Snapshot ===")
            logger_mp.info(f"[Head Rotation Matrix]:\n{teleData.head_pose}")
            logger_mp.info(f"[Left EE Pose]:\n{teleData.left_arm_pose}")
            logger_mp.info(f"[Right EE Pose]:\n{teleData.right_arm_pose}")

            if use_hand_track:
                logger_mp.info(f"[Left Hand Position] shape {teleData.left_hand_pos.shape}:\n{teleData.left_hand_pos}")
                logger_mp.info(f"[Right Hand Position] shape {teleData.right_hand_pos.shape}:\n{teleData.right_hand_pos}")

                if teleData.left_hand_rot is not None:
                    logger_mp.info(f"[Left Hand Rotation] shape {teleData.left_hand_rot.shape}:\n{teleData.left_hand_rot}")
                if teleData.right_hand_rot is not None:
                    logger_mp.info(f"[Right Hand Rotation] shape {teleData.right_hand_rot.shape}:\n{teleData.right_hand_rot}")

                if teleData.left_pinch_value is not None:
                    logger_mp.info(f"[Left Pinch Value]: {teleData.left_pinch_value:.2f}")
                if teleData.right_pinch_value is not None:
                    logger_mp.info(f"[Right Pinch Value]: {teleData.right_pinch_value:.2f}")

                if teleData.tele_state:
                    state = teleData.tele_state
                    logger_mp.info("[Hand State]:")
                    logger_mp.info(f"  Left Pinch state: {state.left_pinch_state}")
                    logger_mp.info(f"  Left Squeeze: {state.left_squeeze_state} ({state.left_squeeze_value:.2f})")
                    logger_mp.info(f"  Right Pinch state: {state.right_pinch_state}")
                    logger_mp.info(f"  Right Squeeze: {state.right_squeeze_state} ({state.right_squeeze_value:.2f})")
            else:
                logger_mp.info(f"[Left Trigger Value]: {teleData.left_trigger_value:.2f}")
                logger_mp.info(f"[Right Trigger Value]: {teleData.right_trigger_value:.2f}")

                if teleData.tele_state:
                    state = teleData.tele_state
                    logger_mp.info("[Controller State]:")
                    logger_mp.info(f"  Left Trigger: {state.left_trigger_state}")
                    logger_mp.info(f"  Left Squeeze: {state.left_squeeze_ctrl_state} ({state.left_squeeze_ctrl_value:.2f})")
                    logger_mp.info(f"  Left Thumbstick: {state.left_thumbstick_state} ({state.left_thumbstick_value})")
                    logger_mp.info(f"  Left A/B Buttons: A={state.left_aButton}, B={state.left_bButton}")
                    logger_mp.info(f"  Right Trigger: {state.right_trigger_state}")
                    logger_mp.info(f"  Right Squeeze: {state.right_squeeze_ctrl_state} ({state.right_squeeze_ctrl_value:.2f})")
                    logger_mp.info(f"  Right Thumbstick: {state.right_thumbstick_state} ({state.right_thumbstick_value})")
                    logger_mp.info(f"  Right A/B Buttons: A={state.right_aButton}, B={state.right_bButton}")

            # Pace the loop at roughly 30 Hz.
            time_elapsed = time.time() - start_time
            sleep_time = max(0, 0.033 - time_elapsed)
            time.sleep(sleep_time)
            logger_mp.debug(f"main process sleep: {sleep_time}")

    except KeyboardInterrupt:
        logger_mp.warning("KeyboardInterrupt, exiting program...")
    finally:
        # Close this process's mapping first, then unlink the segment (we created it).
        image_shm.close()
        image_shm.unlink()
        logger_mp.warning("Finally, exiting program...")
        sys.exit(0)

if __name__ == '__main__':
    run_test_tv_wrapper()