@ -13,7 +13,7 @@ import time
import os
import sys
import threading
from multiprocessing import Process , shared_memory , Array , Lock
from multiprocessing import Process , shared_memory , Array , Value , Lock
parent2_dir = os . path . dirname ( os . path . dirname ( os . path . dirname ( os . path . abspath ( __file__ ) ) ) )
sys . path . append ( parent2_dir )
@ -50,6 +50,8 @@ class Dex3_1_Controller:
fps : Control frequency
Unit_Test : Whether to enable unit testing
simulation_mode : Whether to use simulation mode ( default is False , which means using real robot )
"""
logger_mp . info ( " Initialize Dex3_1_Controller... " )
@ -227,10 +229,10 @@ class Dex3_1_Right_JointIndex(IntEnum):
kRightHandMiddle1 = 6
unitree_gripper_indices = [ 4 , 9 ] # [thumb, index]
Gripper_Num_Motors = 2
kTopicGripperCommand = " rt/unitree_actuator /cmd "
kTopicGripperState = " rt/unitree_actuator /state "
kTopicGripperLeftCommand = " rt/dex1/left/cmd "
kTopicGripperLeftState = " rt/dex1/left/state "
kTopicGripperRight Command = " rt/dex1/right /cmd "
kTopicGripperRight State = " rt/dex1/right /state "
class Dex1_1_Gripper_Controller :
def __init__ ( self , left_gripper_value_in , right_gripper_value_in , dual_gripper_data_lock = None , dual_gripper_state_out = None , dual_gripper_action_out = None ,
@ -251,16 +253,19 @@ class Dex1_1_Gripper_Controller:
fps : Control frequency
Unit_Test : Whether to enable unit testing
simulation_mode : Whether to use simulation mode ( default is False , which means using real robot )
"""
logger_mp . info ( " Initialize Dex1_1_Gripper_Controller... " )
self . fps = fps
self . Unit_Test = Unit_Test
self . gripper_sub_ready = False
self . simulation_mode = simulation_mode
if filter :
self . smooth_filter = WeightedMovingFilter ( np . array ( [ 0.5 , 0.3 , 0.2 ] ) , Gripper_Num_Motors )
self . smooth_filter = WeightedMovingFilter ( np . array ( [ 0.5 , 0.3 , 0.2 ] ) , 2 )
else :
self . smooth_filter = None
@ -270,27 +275,31 @@ class Dex1_1_Gripper_Controller:
ChannelFactoryInitialize ( 0 )
# initialize handcmd publisher and handstate subscriber
self . GripperCmb_publisher = ChannelPublisher ( kTopicGripperCommand , MotorCmds_ )
self . GripperCmb_publisher . Init ( )
self . LeftGripperCmb_publisher = ChannelPublisher ( kTopicGripperLeftCommand , MotorCmds_ )
self . LeftGripperCmb_publisher . Init ( )
self . RightGripperCmb_publisher = ChannelPublisher ( kTopicGripperRightCommand , MotorCmds_ )
self . RightGripperCmb_publisher . Init ( )
self . GripperState_subscriber = ChannelSubscriber ( kTopicGripperState , MotorStates_ )
self . GripperState_subscriber . Init ( )
self . LeftGripperState_subscriber = ChannelSubscriber ( kTopicGripperLeftState , MotorStates_ )
self . LeftGripperState_subscriber . Init ( )
self . RightGripperState_subscriber = ChannelSubscriber ( kTopicGripperRightState , MotorStates_ )
self . RightGripperState_subscriber . Init ( )
self . dual_gripper_state = [ 0.0 ] * len ( Gripper_JointIndex )
# Shared Arrays for gripper states
self . left_gripper_state_value = Value ( ' d ' , 0.0 , lock = True )
self . right_gripper_state_value = Value ( ' d ' , 0.0 , lock = True )
# initialize subscribe thread
self . subscribe_state_thread = threading . Thread ( target = self . _subscribe_gripper_state )
self . subscribe_state_thread . daemon = True
self . subscribe_state_thread . start ( )
while True :
if any ( state != 0.0 for state in self . dual_gripper_state ) :
break
while not self . gripper_sub_ready :
time . sleep ( 0.01 )
logger_mp . warning ( " [Dex1_1_Gripper_Controller] Waiting to subscribe dds... " )
logger_mp . info ( " [Dex1_1_Gripper_Controller] Subscribe dds ok. " )
self . gripper_control_thread = threading . Thread ( target = self . control_thread , args = ( left_gripper_value_in , right_gripper_value_in , self . dual_gripper_stat e,
self . gripper_control_thread = threading . Thread ( target = self . control_thread , args = ( left_gripper_value_in , right_gripper_value_in , self . left_gripper_state_value , self . right_gripper_state_valu e,
dual_gripper_data_lock , dual_gripper_state_out , dual_gripper_action_out ) )
self . gripper_control_thread . daemon = True
self . gripper_control_thread . start ( )
@ -299,27 +308,30 @@ class Dex1_1_Gripper_Controller:
def _subscribe_gripper_state ( self ) :
while True :
gripper_msg = self . GripperState_subscriber . Read ( )
if gripper_msg is not None :
for idx , id in enumerate ( Gripper_JointIndex ) :
self . dual_gripper_state [ idx ] = gripper_msg . states [ id ] . q
left_gripper_msg = self . LeftGripperState_subscriber . Read ( )
right_gripper_msg = self . RightGripperState_subscriber . Read ( )
self . gripper_sub_ready = True
if left_gripper_msg is not None and right_gripper_msg is not None :
self . left_gripper_state_value . value = left_gripper_msg . states [ 0 ] . q
self . right_gripper_state_value . value = right_gripper_msg . states [ 0 ] . q
time . sleep ( 0.002 )
def ctrl_dual_gripper ( self , gripper_q_target ) :
""" set current left, right gripper motor state target q """
for idx , id in enumerate ( Gripper_JointIndex ) :
self . gripper_msg . cmds [ id ] . q = gripper_q_target [ idx ]
def ctrl_dual_gripper ( self , dual_gripper_action ) :
""" set current left, right gripper motor cmd target q """
self . left_gripper_msg . cmds [ 0 ] . q = dual_gripper_action [ 0 ]
self . right_ gripper_msg. cmds [ 0 ] . q = dual_gripper_action [ 1 ]
self . GripperCmb_publisher . Write ( self . gripper_msg )
self . LeftGripperCmb_publisher . Write ( self . left_gripper_msg )
self . RightGripperCmb_publisher . Write ( self . right_gripper_msg )
# logger_mp.debug("gripper ctrl publish ok.")
def control_thread ( self , left_gripper_value_in , right_gripper_value_in , dual_gripper_state_in , dual_hand_data_lock = None ,
def control_thread ( self , left_gripper_value_in , right_gripper_value_in , left_gripper_state_value , right_gripper_state_value , dual_hand_data_lock = None ,
dual_gripper_state_out = None , dual_gripper_action_out = None ) :
self . running = True
if self . simulation_mode :
DELTA_GRIPPER_CMD = 1.0
else :
DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm
DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm
THUMB_INDEX_DISTANCE_MIN = 5.0
THUMB_INDEX_DISTANCE_MAX = 7.0
LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup.
@ -335,14 +347,20 @@ class Dex1_1_Gripper_Controller:
kp = 5.00
kd = 0.05
# initialize gripper cmd msg
self . gripper_msg = MotorCmds_ ( )
self . gripper_msg . cmds = [ unitree_go_msg_dds__MotorCmd_ ( ) for _ in range ( len ( Gripper_JointIndex ) ) ]
for id in Gripper_JointIndex :
self . gripper_msg . cmds [ id ] . dq = dq
self . gripper_msg . cmds [ id ] . tau = tau
self . gripper_msg . cmds [ id ] . kp = kp
self . gripper_msg . cmds [ id ] . kd = kd
self . left_gripper_msg = MotorCmds_ ( )
self . left_gripper_msg . cmds = [ unitree_go_msg_dds__MotorCmd_ ( ) ]
self . right_gripper_msg = MotorCmds_ ( )
self . right_gripper_msg . cmds = [ unitree_go_msg_dds__MotorCmd_ ( ) ]
self . left_gripper_msg . cmds [ 0 ] . dq = dq
self . left_gripper_msg . cmds [ 0 ] . tau = tau
self . left_gripper_msg . cmds [ 0 ] . kp = kp
self . left_gripper_msg . cmds [ 0 ] . kd = kd
self . right_gripper_msg . cmds [ 0 ] . dq = dq
self . right_gripper_msg . cmds [ 0 ] . tau = tau
self . right_gripper_msg . cmds [ 0 ] . kp = kp
self . right_gripper_msg . cmds [ 0 ] . kd = kd
try :
while self . running :
start_time = time . time ( )
@ -352,27 +370,19 @@ class Dex1_1_Gripper_Controller:
with right_gripper_value_in . get_lock ( ) :
right_gripper_value = right_gripper_value_in . value
if left_gripper_value != 0.0 or right_gripper_value != 0.0 : # if hand data has been initialized.
if left_gripper_value != 0.0 or right_gripper_value != 0.0 : # if input data has been initialized.
# Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range
left_target_action = np . interp ( left_gripper_value , [ THUMB_INDEX_DISTANCE_MIN , THUMB_INDEX_DISTANCE_MAX ] , [ LEFT_MAPPED_MIN , LEFT_MAPPED_MAX ] )
right_target_action = np . interp ( right_gripper_value , [ THUMB_INDEX_DISTANCE_MIN , THUMB_INDEX_DISTANCE_MAX ] , [ RIGHT_MAPPED_MIN , RIGHT_MAPPED_MAX ] )
# else: # TEST WITHOUT XR DEVICE
# current_time = time.time()
# period = 2.5
# import math
# left_euclidean_distance = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2
# right_euclidean_distance = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2
# left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX])
# right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX])
# get current dual gripper motor state
dual_gripper_state = np . array ( dual_gripper_state_in [ : ] )
dual_gripper_state = np . array ( [ left_gripper_state_value . value , right_gripper_state_value . value ] )
# clip dual gripper action to avoid overflow
left_actual_action = np . clip ( left_target_action , dual_gripper_state [ 1 ] - DELTA_GRIPPER_CMD , dual_gripper_state [ 1 ] + DELTA_GRIPPER_CMD )
right_actual_action = np . clip ( right_target_action , dual_gripper_state [ 0 ] - DELTA_GRIPPER_CMD , dual_gripper_state [ 0 ] + DELTA_GRIPPER_CMD )
left_actual_action = np . clip ( left_target_action , dual_gripper_state [ 0 ] - DELTA_GRIPPER_CMD , dual_gripper_state [ 0 ] + DELTA_GRIPPER_CMD )
right_actual_action = np . clip ( right_target_action , dual_gripper_state [ 1 ] - DELTA_GRIPPER_CMD , dual_gripper_state [ 1 ] + DELTA_GRIPPER_CMD )
dual_gripper_action = np . array ( [ right_actual_action , lef t_actual_action] )
dual_gripper_action = np . array ( [ left_actual_action , righ t_actual_action] )
if self . smooth_filter :
self . smooth_filter . add_data ( dual_gripper_action )
@ -380,13 +390,8 @@ class Dex1_1_Gripper_Controller:
if dual_gripper_state_out and dual_gripper_action_out :
with dual_hand_data_lock :
dual_gripper_state_out [ : ] = dual_gripper_state - np . array ( [ RIGHT_MAPPED_MIN , LEFT_MAPPED_MIN ] )
dual_gripper_action_out [ : ] = dual_gripper_action - np . array ( [ RIGHT_MAPPED_MIN , LEFT_MAPPED_MIN ] )
# logger_mp.debug(f"LEFT: euclidean:{left_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[1]:.4f}\
# \ttarget_action:{right_target_action - RIGHT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[1]:.4f}")
# logger_mp.debug(f"RIGHT:euclidean:{right_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[0]:.4f}\
# \ttarget_action:{left_target_action - LEFT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[0]:.4f}")
dual_gripper_state_out [ : ] = dual_gripper_state - np . array ( [ LEFT_MAPPED_MIN , RIGHT_MAPPED_MIN ] )
dual_gripper_action_out [ : ] = dual_gripper_action - np . array ( [ LEFT_MAPPED_MIN , RIGHT_MAPPED_MIN ] )
self . ctrl_dual_gripper ( dual_gripper_action )
current_time = time . time ( )
@ -397,8 +402,7 @@ class Dex1_1_Gripper_Controller:
logger_mp . info ( " Dex1_1_Gripper_Controller has been closed. " )
class Gripper_JointIndex ( IntEnum ) :
kLeftGripper = 0
kRightGripper = 1
kGripper = 0
if __name__ == " __main__ " :
@ -407,9 +411,8 @@ if __name__ == "__main__":
from teleop.image_server.image_client import ImageClient
parser = argparse . ArgumentParser ( )
parser . add_argument ( ' --dex ' , action = ' store_true ' , help = ' Use dex3-1 hand ' )
parser . add_argument ( ' --gripper ' , dest = ' dex ' , action = ' store_false ' , help = ' Use gripper ' )
parser . set_defaults ( dex = True )
parser . add_argument ( ' --xr-mode ' , type = str , choices = [ ' hand ' , ' controller ' ] , default = ' hand ' , help = ' Select XR device tracking source ' )
parser . add_argument ( ' --ee ' , type = str , choices = [ ' dex1 ' , ' dex3 ' , ' inspire1 ' , ' brainco ' ] , help = ' Select end effector controller ' )
args = parser . parse_args ( )
logger_mp . info ( f " args:{args} \n " )
@ -431,39 +434,54 @@ if __name__ == "__main__":
else :
tv_img_shape = ( img_config [ ' head_camera_image_shape ' ] [ 0 ] , img_config [ ' head_camera_image_shape ' ] [ 1 ] , 3 )
img_shm = shared_memory . SharedMemory ( create = True , size = np . prod ( tv_img_shape ) * np . uint8 ( ) . itemsize )
img_array = np . ndarray ( tv_img_shape , dtype = np . uint8 , buffer = img_shm . buf )
img_client = ImageClient ( tv_img_shape = tv_img_shape , tv_img_shm_name = img_shm . name )
tv_ img_shm = shared_memory . SharedMemory ( create = True , size = np . prod ( tv_img_shape ) * np . uint8 ( ) . itemsize )
tv_ img_array = np . ndarray ( tv_img_shape , dtype = np . uint8 , buffer = tv_ img_shm. buf )
img_client = ImageClient ( tv_img_shape = tv_img_shape , tv_img_shm_name = tv_ img_shm. name )
image_receive_thread = threading . Thread ( target = img_client . receive_process , daemon = True )
image_receive_thread . daemon = True
image_receive_thread . start ( )
# television and arm
tv_wrapper = TeleVuerWrapper ( BINOCULAR , tv_img_shape , img_shm . name )
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVuerWrapper ( binocular = BINOCULAR , use_hand_tracking = args . xr_mode == " hand " , img_shape = tv_img_shape , img_shm_name = tv_img_shm . name ,
return_state_data = True , return_hand_rot_data = False )
if args . dex :
left_hand_array = Array ( ' d ' , 75 , lock = True )
right_hand_array = Array ( ' d ' , 75 , lock = True )
# end-effector
if args . ee == " dex3 " :
left_hand_pos_array = Array ( ' d ' , 75 , lock = True ) # [input]
right_hand_pos_array = Array ( ' d ' , 75 , lock = True ) # [input]
dual_hand_data_lock = Lock ( )
dual_hand_state_array = Array ( ' d ' , 14 , lock = False ) # current left, right hand state(14) data.
dual_hand_action_array = Array ( ' d ' , 14 , lock = False ) # current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller ( left_hand_array , right_hand_array , dual_hand_data_lock , dual_hand_state_array , dual_hand_action_array , Unit_Test = True )
els e :
left_hand_array = Array ( ' d ' , 75 , lock = True )
right_hand_array = Array ( ' d ' , 75 , lock = True )
dual_hand_state_array = Array ( ' d ' , 14 , lock = False ) # [output] current left, right hand state(14) data.
dual_hand_action_array = Array ( ' d ' , 14 , lock = False ) # [output] current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller ( left_hand_pos_ array , right_hand_pos _array , dual_hand_data_lock , dual_hand_state_array , dual_hand_action_array )
elif args . ee == " dex1 " :
left_gripper_value = Value ( ' d ' , 0.0 , lock = True ) # [input]
right_gripper_value = Value ( ' d ' , 0.0 , lock = True ) # [input]
dual_gripper_data_lock = Lock ( )
dual_gripper_state_array = Array ( ' d ' , 2 , lock = False ) # current left, right gripper state(2) data.
dual_gripper_action_array = Array ( ' d ' , 2 , lock = False ) # current left, right gripper action(2) data.
gripper_ctrl = Dex1_1_Gripper_Controller ( left_hand_array , right_hand_array , dual_gripper_data_lock , dual_gripper_state_array , dual_gripper_action_array , Unit_Test = True )
gripper_ctrl = Dex1_1_Gripper_Controller ( left_gripper_value , right_gripper_value , dual_gripper_data_lock , dual_gripper_state_array , dual_gripper_action_array )
user_input = input ( " Please enter the start signal (enter ' s ' to start the subsequent program): \n " )
if user_input . lower ( ) == ' s ' :
while True :
head_rmat , left_wrist , right_wrist , left_hand , right_hand = tv_wrapper . get_data ( )
# send hand skeleton data to hand_ctrl.control_process
left_hand_array [ : ] = left_hand . flatten ( )
right_hand_array [ : ] = right_hand . flatten ( )
tele_data = tv_wrapper . get_motion_state_data ( )
if args . ee == " dex3 " and args . xr_mode == " hand " :
with left_hand_pos_array . get_lock ( ) :
left_hand_pos_array [ : ] = tele_data . left_hand_pos . flatten ( )
with right_hand_pos_array . get_lock ( ) :
right_hand_pos_array [ : ] = tele_data . right_hand_pos . flatten ( )
elif args . ee == " dex1 " and args . xr_mode == " controller " :
with left_gripper_value . get_lock ( ) :
left_gripper_value . value = tele_data . left_trigger_value
with right_gripper_value . get_lock ( ) :
right_gripper_value . value = tele_data . right_trigger_value
elif args . ee == " dex1 " and args . xr_mode == " hand " :
with left_gripper_value . get_lock ( ) :
left_gripper_value . value = tele_data . left_pinch_value
with right_gripper_value . get_lock ( ) :
right_gripper_value . value = tele_data . right_pinch_value
else :
pass
# with dual_hand_data_lock:
# logger_mp.info(f"state : {list(dual_hand_state_array)} \naction: {list(dual_hand_action_array)} \n")