Browse Source

[feat] support webrtc

main
silencht 5 months ago
parent
commit
3bc0cad0ce
  1. 1
      .gitignore
  2. 85
      src/televuer/televuer.py
  3. 8
      src/televuer/tv_wrapper.py
  4. 5
      test/_test_televuer.py
  5. 7
      test/_test_tv_wrapper.py

1
.gitignore

@ -3,3 +3,4 @@
*.egg-info *.egg-info
*build/ *build/
*.pem *.pem
.vscode/

85
src/televuer/televuer.py

@ -10,7 +10,8 @@ from pathlib import Path
class TeleVuer: class TeleVuer:
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, cert_file=None, key_file=None, ngrok=False, webrtc=False):
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, cert_file=None, key_file=None, ngrok=False,
use_image = True, webrtc=False):
""" """
TeleVuer class for OpenXR-based XR teleoperate applications. TeleVuer class for OpenXR-based XR teleoperate applications.
This class handles the communication with the Vuer server and manages the shared memory for image and pose data. This class handles the communication with the Vuer server and manages the shared memory for image and pose data.
@ -21,7 +22,9 @@ class TeleVuer:
:param cert_file: str, path to the SSL certificate file. :param cert_file: str, path to the SSL certificate file.
:param key_file: str, path to the SSL key file. :param key_file: str, path to the SSL key file.
:param ngrok: bool, whether to use ngrok for tunneling. :param ngrok: bool, whether to use ngrok for tunneling.
:param webrtc: bool, whether to use WebRTC for real-time communication.
:param use_image: bool, whether to use image streaming: ImageBackground or WebRTCVideoPlane.
if False, no image stream is used.
:param webrtc: bool, whether to use WebRTC for real-time communication. if False, use ImageBackground.
""" """
self.binocular = binocular self.binocular = binocular
self.use_hand_tracking = use_hand_tracking self.use_hand_tracking = use_hand_tracking
@ -56,13 +59,17 @@ class TeleVuer:
self.vuer.add_handler("HAND_MOVE")(self.on_hand_move) self.vuer.add_handler("HAND_MOVE")(self.on_hand_move)
else: else:
self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move) self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move)
self.use_image = use_image
self.webrtc = webrtc self.webrtc = webrtc
if self.binocular and not self.webrtc:
self.vuer.spawn(start=False)(self.main_image_binocular)
elif not self.binocular and not self.webrtc:
self.vuer.spawn(start=False)(self.main_image_monocular)
elif self.webrtc:
self.vuer.spawn(start=False)(self.main_image_webrtc)
if self.use_image:
if self.binocular and not self.webrtc:
self.vuer.spawn(start=False)(self.main_image_binocular)
elif not self.binocular and not self.webrtc:
self.vuer.spawn(start=False)(self.main_image_monocular)
elif self.binocular and self.webrtc:
self.vuer.spawn(start=False)(self.main_image_binocular_webrtc)
elif not self.binocular and self.webrtc:
self.vuer.spawn(start=False)(self.main_image_monocular_webrtc)
self.head_pose_shared = Array('d', 16, lock=True) self.head_pose_shared = Array('d', 16, lock=True)
self.left_arm_pose_shared = Array('d', 16, lock=True) self.left_arm_pose_shared = Array('d', 16, lock=True)
@ -322,7 +329,7 @@ class TeleVuer:
) )
await asyncio.sleep(0.016) await asyncio.sleep(0.016)
async def main_image_webrtc(self, session, fps=60):
async def main_image_binocular_webrtc(self, session):
aspect_ratio = self.img_width / self.img_height aspect_ratio = self.img_width / self.img_height
if self.use_hand_tracking: if self.use_hand_tracking:
session.upsert( session.upsert(
@ -343,20 +350,54 @@ class TeleVuer:
showRight=False, showRight=False,
) )
) )
session.upsert(
WebRTCVideoPlane(
# WebRTCStereoVideoPlane(
src="https://10.0.7.49:8080/offer",
iceServer={},
key="webrtc",
aspect=aspect_ratio,
height = 7,
),
to="bgChildren",
)
while True: while True:
await asyncio.sleep(1)
session.upsert(
WebRTCStereoVideoPlane(
src="https://127.0.0.1:60001/offer",
iceServers=[],
key="video-quad",
aspect=aspect_ratio,
height = 7,
),
to="bgChildren",
)
await asyncio.sleep(0.016)
async def main_image_monocular_webrtc(self, session, fps=60):
aspect_ratio = self.img_width / self.img_height
if self.use_hand_tracking:
session.upsert(
Hands(
stream=True,
key="hands",
showLeft=False,
showRight=False
),
to="bgChildren",
)
else:
session.upsert(
MotionControllers(
stream=True,
key="motionControllers",
showLeft=False,
showRight=False,
)
)
while True:
session.upsert(
WebRTCVideoPlane(
src="https://127.0.0.1:60001/offer",
iceServers=[],
key="video-quad",
aspect=aspect_ratio,
height = 7,
),
to="bgChildren",
)
await asyncio.sleep(0.016)
# ==================== common data ==================== # ==================== common data ====================
@property @property
def head_pose(self): def head_pose(self):

8
src/televuer/tv_wrapper.py

@ -194,7 +194,7 @@ class TeleData:
class TeleVuerWrapper: class TeleVuerWrapper:
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, return_hand_rot_data: bool = False, def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, return_hand_rot_data: bool = False,
cert_file = None, key_file = None, ngrok = False, webrtc = False):
cert_file = None, key_file = None, ngrok = False, use_image = True, webrtc = False):
""" """
TeleVuerWrapper is a wrapper for the TeleVuer class, which handles XR device's data suit for robot control. TeleVuerWrapper is a wrapper for the TeleVuer class, which handles XR device's data suit for robot control.
It initializes the TeleVuer instance with the specified parameters and provides a method to get motion state data. It initializes the TeleVuer instance with the specified parameters and provides a method to get motion state data.
@ -206,12 +206,14 @@ class TeleVuerWrapper:
:param cert_file: The path to the certificate file for secure connection. :param cert_file: The path to the certificate file for secure connection.
:param key_file: The path to the key file for secure connection. :param key_file: The path to the key file for secure connection.
:param ngrok: A boolean indicating whether to use ngrok for remote access. :param ngrok: A boolean indicating whether to use ngrok for remote access.
:param webrtc: A boolean indicating whether to use WebRTC for real-time communication.
:param use_image: A boolean indicating whether to use image streaming: ImageBackground or WebRTCVideoPlane.
if False, no image stream is used.
:param webrtc: A boolean indicating whether to use WebRTC for real-time communication. if False, use ImageBackground.
""" """
self.use_hand_tracking = use_hand_tracking self.use_hand_tracking = use_hand_tracking
self.return_hand_rot_data = return_hand_rot_data self.return_hand_rot_data = return_hand_rot_data
self.tvuer = TeleVuer(binocular, use_hand_tracking, img_shape, cert_file=cert_file, key_file=key_file, self.tvuer = TeleVuer(binocular, use_hand_tracking, img_shape, cert_file=cert_file, key_file=key_file,
ngrok=ngrok, webrtc=webrtc)
ngrok=ngrok, use_image=use_image, webrtc=webrtc)
def get_tele_data(self): def get_tele_data(self):
""" """

5
test/_test_televuer.py

@ -24,7 +24,10 @@ def run_test_TeleVuer():
# xr-mode # xr-mode
use_hand_track = True use_hand_track = True
tv = TeleVuer(binocular = head_binocular, use_hand_tracking = use_hand_track, img_shape = head_img_shape, webrtc=False)
use_image = True
webrtc = True
tv = TeleVuer(binocular = head_binocular, use_hand_tracking = use_hand_track, img_shape = head_img_shape,
use_image=use_image, webrtc=webrtc)
try: try:
input("Press Enter to start TeleVuer test...") input("Press Enter to start TeleVuer test...")

7
test/_test_tv_wrapper.py

@ -25,7 +25,10 @@ def run_test_tv_wrapper():
# xr-mode # xr-mode
use_hand_track=False use_hand_track=False
tv_wrapper = TeleVuerWrapper(binocular=head_binocular, use_hand_tracking=use_hand_track, img_shape=head_img_shape, return_hand_rot_data = True)
use_image=True
webrtc=True
tv_wrapper = TeleVuerWrapper(binocular=head_binocular, use_hand_tracking=use_hand_track, img_shape=head_img_shape, return_hand_rot_data = True,
use_image=use_image, webrtc=webrtc)
try: try:
input("Press Enter to start tv_wrapper test...") input("Press Enter to start tv_wrapper test...")
running = True running = True
@ -70,7 +73,7 @@ def run_test_tv_wrapper():
current_time = time.time() current_time = time.time()
time_elapsed = current_time - start_time time_elapsed = current_time - start_time
sleep_time = max(0, 0.3 - time_elapsed)
sleep_time = max(0, 0.16 - time_elapsed)
time.sleep(sleep_time) time.sleep(sleep_time)
logger_mp.debug(f"main process sleep: {sleep_time}") logger_mp.debug(f"main process sleep: {sleep_time}")

Loading…
Cancel
Save