@ -55,6 +55,7 @@ class TeleVuer:
"""
self . use_hand_tracking = use_hand_tracking
self . binocular = binocular
self . _current_session = None # Updated by event handlers on each new connection
if img_shape is None :
raise ValueError ( " [TeleVuer] img_shape must be provided. " )
self . img_shape = ( img_shape [ 0 ] , img_shape [ 1 ] , 3 )
@ -225,6 +226,7 @@ class TeleVuer:
pass
async def on_cam_move ( self , event , session , fps = 60 ) :
self . _current_session = session
try :
with self . head_pose_shared . get_lock ( ) :
self . head_pose_shared [ : ] = event . value [ " camera " ] [ " matrix " ]
@ -233,8 +235,16 @@ class TeleVuer:
except :
pass
# Send image frames from here (session is always current after reconnect)
if hasattr ( self , ' img2display ' ) and self . display_mode in ( ' immersive ' , ' ego ' ) :
now = _time . time ( )
if now - getattr ( self , ' _last_img_send ' , 0 ) > = 1.0 / self . display_fps :
self . _last_img_send = now
self . _send_image_frame ( session )
async def on_controller_move ( self , event , session , fps = 60 ) :
""" https://docs.vuer.ai/en/latest/examples/20_motion_controllers.html """
self . _current_session = session
try :
# ControllerData
with self . left_arm_pose_shared . get_lock ( ) :
@ -276,6 +286,7 @@ class TeleVuer:
async def on_hand_move ( self , event , session , fps = 60 ) :
""" https://docs.vuer.ai/en/latest/examples/19_hand_tracking.html """
self . _current_session = session
try :
# HandsData
left_hand_data = event . value [ " left " ]
@ -323,128 +334,98 @@ class TeleVuer:
except :
pass
## immersive MODE
async def main_image_binocular_zmq ( self , session ) :
def _setup_tracking ( self , session ) :
""" Upsert hand/controller tracking components on a (possibly new) session. """
try :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
Hands ( stream = True , key = " hands " , hideLeft = True , hideRight = True ) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
MotionControllers ( stream = True , key = " motionControllers " , left = True , right = True ) ,
to = " bgChildren " ,
)
while True :
except Exception :
pass
def _send_image_frame ( self , session ) :
""" Send current image frame to the given session (rate-limit externally). """
try :
if self . display_mode == " immersive " :
if self . binocular :
session . upsert (
[
ImageBackground (
self . img2display [ : , : self . img_width ] ,
aspect = self . aspect_ratio ,
height = 1 ,
distanceToCamera = 1 ,
layers = 1 ,
format = " jpeg " ,
quality = 50 ,
key = " background-left " ,
interpolate = True ,
) ,
ImageBackground (
self . img2display [ : , self . img_width : ] ,
aspect = self . aspect_ratio ,
height = 1 ,
distanceToCamera = 1 ,
layers = 2 ,
format = " jpeg " ,
quality = 50 ,
key = " background-right " ,
interpolate = True ,
) ,
ImageBackground ( self . img2display [ : , : self . img_width ] , aspect = self . aspect_ratio ,
height = 1 , distanceToCamera = 1 , layers = 1 , format = " jpeg " ,
quality = 50 , key = " background-left " , interpolate = True ) ,
ImageBackground ( self . img2display [ : , self . img_width : ] , aspect = self . aspect_ratio ,
height = 1 , distanceToCamera = 1 , layers = 2 , format = " jpeg " ,
quality = 50 , key = " background-right " , interpolate = True ) ,
] ,
to = " bgChildren " ,
)
except Exception :
pass # Session dropped; coroutine stays alive for reconnect
await asyncio . sleep ( 1.0 / self . display_fps )
async def main_image_monocular_zmq ( self , session ) :
if self . use_hand_tracking :
else :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
[
ImageBackground ( self . img2display , aspect = self . aspect_ratio ,
height = 1 , distanceToCamera = 1 , format = " jpeg " ,
quality = 50 , key = " background-mono " , interpolate = True ) ,
] ,
to = " bgChildren " ,
)
else :
elif self . display_mode == " ego " :
if self . binocular :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
[
ImageBackground ( self . img2display [ : , : self . img_width ] , aspect = self . aspect_ratio ,
height = 0.75 , distanceToCamera = 2 , layers = 1 , format = " jpeg " ,
quality = 50 , key = " background-left " , interpolate = True ) ,
ImageBackground ( self . img2display [ : , self . img_width : ] , aspect = self . aspect_ratio ,
height = 0.75 , distanceToCamera = 2 , layers = 2 , format = " jpeg " ,
quality = 50 , key = " background-right " , interpolate = True ) ,
] ,
to = " bgChildren " ,
)
while True :
try :
else :
session . upsert (
[
ImageBackground (
self . img2display ,
aspect = self . aspect_ratio ,
height = 1 ,
distanceToCamera = 1 ,
format = " jpeg " ,
quality = 50 ,
key = " background-mono " ,
interpolate = True ,
) ,
ImageBackground ( self . img2display , aspect = self . aspect_ratio ,
height = 0.75 , distanceToCamera = 2 , format = " jpeg " ,
quality = 50 , key = " background-mono " , interpolate = True ) ,
] ,
to = " bgChildren " ,
)
except Exception :
pass # Session dropped; coroutine stays alive for reconnect
await asyncio . sleep ( 1.0 / self . display_fps )
pass
## immersive MODE
async def main_image_binocular_zmq ( self , session ) :
self . _setup_tracking ( session )
# Image sending handled by on_cam_move; keep coroutine alive for Vuer
while True :
await asyncio . sleep ( 1.0 )
async def main_image_monocular_zmq ( self , session ) :
self . _setup_tracking ( session )
# Image sending handled by on_cam_move; keep coroutine alive for Vuer
while True :
await asyncio . sleep ( 1.0 )
async def main_image_binocular_webrtc ( self , session ) :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
to = " bgChildren " ,
)
self . _current_session = session
self . _setup_tracking ( session )
last_session = session
while True :
session . upsert (
s = self . _current_session
if s is not last_session :
self . _setup_tracking ( s )
last_session = s
if s is not None :
try :
s . upsert (
WebRTCStereoVideoPlane (
src = self . webrtc_url ,
iceServer = None ,
@ -456,32 +437,23 @@ class TeleVuer:
) ,
to = " bgChildren " ,
)
except Exception :
pass
await asyncio . sleep ( 1.0 / self . display_fps )
async def main_image_monocular_webrtc ( self , session ) :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
to = " bgChildren " ,
)
self . _current_session = session
self . _setup_tracking ( session )
last_session = session
while True :
session . upsert (
s = self . _current_session
if s is not last_session :
self . _setup_tracking ( s )
last_session = s
if s is not None :
try :
s . upsert (
WebRTCVideoPlane (
src = self . webrtc_url ,
iceServer = None ,
@ -492,130 +464,36 @@ class TeleVuer:
) ,
to = " bgChildren " ,
)
except Exception :
pass
await asyncio . sleep ( 1.0 / self . display_fps )
## ego MODE
async def main_image_binocular_zmq_ego ( self , session ) :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
to = " bgChildren " ,
)
self . _setup_tracking ( session )
# Image sending handled by on_cam_move; keep coroutine alive for Vuer
while True :
try :
session . upsert (
[
ImageBackground (
self . img2display [ : , : self . img_width ] ,
aspect = self . aspect_ratio ,
height = 0.75 ,
distanceToCamera = 2 ,
layers = 1 ,
format = " jpeg " ,
quality = 50 ,
key = " background-left " ,
interpolate = True ,
) ,
ImageBackground (
self . img2display [ : , self . img_width : ] ,
aspect = self . aspect_ratio ,
height = 0.75 ,
distanceToCamera = 2 ,
layers = 2 ,
format = " jpeg " ,
quality = 50 ,
key = " background-right " ,
interpolate = True ,
) ,
] ,
to = " bgChildren " ,
)
except Exception :
pass
await asyncio . sleep ( 1.0 / self . display_fps )
await asyncio . sleep ( 1.0 )
async def main_image_monocular_zmq_ego ( self , session ) :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
to = " bgChildren " ,
)
self . _setup_tracking ( session )
# Image sending handled by on_cam_move; keep coroutine alive for Vuer
while True :
try :
session . upsert (
[
ImageBackground (
self . img2display ,
aspect = self . aspect_ratio ,
height = 0.75 ,
distanceToCamera = 2 ,
format = " jpeg " ,
quality = 50 ,
key = " background-mono " ,
interpolate = True ,
) ,
] ,
to = " bgChildren " ,
)
except Exception :
pass
await asyncio . sleep ( 1.0 / self . display_fps )
await asyncio . sleep ( 1.0 )
async def main_image_binocular_webrtc_ego ( self , session ) :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
to = " bgChildren " ,
)
self . _current_session = session
self . _setup_tracking ( session )
last_session = session
while True :
session . upsert (
s = self . _current_session
if s is not last_session :
self . _setup_tracking ( s )
last_session = s
if s is not None :
try :
s . upsert (
WebRTCStereoVideoPlane (
src = self . webrtc_url ,
iceServer = None ,
@ -627,32 +505,23 @@ class TeleVuer:
) ,
to = " bgChildren " ,
)
except Exception :
pass
await asyncio . sleep ( 1.0 / self . display_fps )
async def main_image_monocular_webrtc_ego ( self , session ) :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
to = " bgChildren " ,
)
self . _current_session = session
self . _setup_tracking ( session )
last_session = session
while True :
session . upsert (
s = self . _current_session
if s is not last_session :
self . _setup_tracking ( s )
last_session = s
if s is not None :
try :
s . upsert (
WebRTCVideoPlane (
src = self . webrtc_url ,
iceServer = None ,
@ -663,33 +532,16 @@ class TeleVuer:
) ,
to = " bgChildren " ,
)
except Exception :
pass
await asyncio . sleep ( 1.0 / self . display_fps )
## pass-through MODE
async def main_pass_through ( self , session ) :
if self . use_hand_tracking :
session . upsert (
Hands (
stream = True ,
key = " hands " ,
hideLeft = True ,
hideRight = True
) ,
to = " bgChildren " ,
)
else :
session . upsert (
MotionControllers (
stream = True ,
key = " motionControllers " ,
left = True ,
right = True ,
) ,
to = " bgChildren " ,
)
self . _setup_tracking ( session )
# No image sending in pass-through; keep coroutine alive for Vuer
while True :
await asyncio . sleep ( 1.0 / self . display_fps )
await asyncio . sleep ( 1.0 )
# ==================== common data ====================
@property