diff --git a/teleop/robot_control/robot_hand_inspire.py b/teleop/robot_control/robot_hand_inspire_dfx.py similarity index 99% rename from teleop/robot_control/robot_hand_inspire.py rename to teleop/robot_control/robot_hand_inspire_dfx.py index 60ad1b9..d223482 100644 --- a/teleop/robot_control/robot_hand_inspire.py +++ b/teleop/robot_control/robot_hand_inspire_dfx.py @@ -16,7 +16,7 @@ Inspire_Num_Motors = 6 kTopicInspireCommand = "rt/inspire/cmd" kTopicInspireState = "rt/inspire/state" -class Inspire_Controller: +class Inspire_Controller_DFX: def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): logger_mp.info("Initialize Inspire_Controller...") diff --git a/teleop/robot_control/robot_hand_inspire_ftp.py b/teleop/robot_control/robot_hand_inspire_ftp.py new file mode 100644 index 0000000..385f942 --- /dev/null +++ b/teleop/robot_control/robot_hand_inspire_ftp.py @@ -0,0 +1,229 @@ +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize +from inspire_sdkpy import inspire_dds, inspire_hand_defaut + +from teleop.robot_control.hand_retargeting import HandRetargeting, HandType +import numpy as np +import threading +import time +from multiprocessing import Process, Array, Lock + +import logging_mp +logger_mp = logging_mp.get_logger(__name__) + +inspire_tip_indices = [4, 9, 14, 19, 24] +Inspire_Num_Motors = 6 + +kTopicInspireCtrlLeft = "rt/inspire_hand/ctrl/l" +kTopicInspireCtrlRight = "rt/inspire_hand/ctrl/r" +kTopicInspireStateLeft = "rt/inspire_hand/state/l" +kTopicInspireStateRight = "rt/inspire_hand/state/r" + +class Inspire_Controller_FTP: + def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, + dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): + logger_mp.info("Initialize Inspire_Controller...") + self.fps = fps + self.Unit_Test = Unit_Test + self.simulation_mode = simulation_mode + if not self.Unit_Test: + self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND) + else: + self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test) + + if self.simulation_mode: + ChannelFactoryInitialize(1) + else: + ChannelFactoryInitialize(0) + + # Initialize hand command publishers + self.LeftHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlLeft, inspire_dds.inspire_hand_ctrl) + self.LeftHandCmd_publisher.Init() + self.RightHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlRight, inspire_dds.inspire_hand_ctrl) + self.RightHandCmd_publisher.Init() + + # Initialize hand state subscribers + self.LeftHandState_subscriber = ChannelSubscriber(kTopicInspireStateLeft, inspire_dds.inspire_hand_state) + self.LeftHandState_subscriber.Init() # Consider using callback if preferred: Init(callback_func, period_ms) + self.RightHandState_subscriber = ChannelSubscriber(kTopicInspireStateRight, inspire_dds.inspire_hand_state) + self.RightHandState_subscriber.Init() + + # Shared Arrays for hand states ([0,1] normalized values) + self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) + self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) + + # Initialize subscribe thread + self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) + self.subscribe_state_thread.daemon = True + self.subscribe_state_thread.start() + + # Wait for initial DDS messages (optional, but good for ensuring connection) + wait_count = 0 + while not (any(self.left_hand_state_array) or any(self.right_hand_state_array)): + if wait_count % 100 == 0: # Print every second + logger_mp.info(f"[Inspire_Controller] Waiting to subscribe to hand states from DDS (L: {any(self.left_hand_state_array)}, R: {any(self.right_hand_state_array)})...") + time.sleep(0.01) + wait_count += 1 + if wait_count > 500: # Timeout after 5 seconds + logger_mp.warning("[Inspire_Controller] Warning: Timeout waiting for initial hand states. Proceeding anyway.") + break + logger_mp.info("[Inspire_Controller] Initial hand states received or timeout.") + + hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, + dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) + hand_control_process.daemon = True + hand_control_process.start() + + logger_mp.info("Initialize Inspire_Controller OK!\n") + + def _subscribe_hand_state(self): + logger_mp.info("[Inspire_Controller] Subscribe thread started.") + while True: + # Left Hand + left_state_msg = self.LeftHandState_subscriber.Read() + if left_state_msg is not None: + if hasattr(left_state_msg, 'angle_act') and len(left_state_msg.angle_act) == Inspire_Num_Motors: + with self.left_hand_state_array.get_lock(): + for i in range(Inspire_Num_Motors): + self.left_hand_state_array[i] = left_state_msg.angle_act[i] / 1000.0 + else: + logger_mp.warning(f"[Inspire_Controller] Received left_state_msg but attributes are missing or incorrect. Type: {type(left_state_msg)}, Content: {str(left_state_msg)[:100]}") + # Right Hand + right_state_msg = self.RightHandState_subscriber.Read() + if right_state_msg is not None: + if hasattr(right_state_msg, 'angle_act') and len(right_state_msg.angle_act) == Inspire_Num_Motors: + with self.right_hand_state_array.get_lock(): + for i in range(Inspire_Num_Motors): + self.right_hand_state_array[i] = right_state_msg.angle_act[i] / 1000.0 + else: + logger_mp.warning(f"[Inspire_Controller] Received right_state_msg but attributes are missing or incorrect. Type: {type(right_state_msg)}, Content: {str(right_state_msg)[:100]}") + + time.sleep(0.002) + + def _send_hand_command(self, left_angle_cmd_scaled, right_angle_cmd_scaled): + """ + Send scaled angle commands [0-1000] to both hands. + """ + # Left Hand Command + left_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() + left_cmd_msg.angle_set = left_angle_cmd_scaled + left_cmd_msg.mode = 0b0001 # Mode 1: Angle control + self.LeftHandCmd_publisher.Write(left_cmd_msg) + + # Right Hand Command + right_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() + right_cmd_msg.angle_set = right_angle_cmd_scaled + right_cmd_msg.mode = 0b0001 # Mode 1: Angle control + self.RightHandCmd_publisher.Write(right_cmd_msg) + + def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, + dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): + logger_mp.info("[Inspire_Controller] Control process started.") + self.running = True + + left_q_target = np.full(Inspire_Num_Motors, 1.0) + right_q_target = np.full(Inspire_Num_Motors, 1.0) + + try: + while self.running: + start_time = time.time() + # get dual hand state + with left_hand_array.get_lock(): + left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() + with right_hand_array.get_lock(): + right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() + + # Read left and right q_state from shared arrays + state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) + + if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. + ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] + ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] + + left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] + right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] + + def normalize(val, min_val, max_val): + return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0) + + for idx in range(Inspire_Num_Motors): + if idx <= 3: + left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7) + right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7) + elif idx == 4: + left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5) + right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5) + elif idx == 5: + left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3) + right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3) + + scaled_left_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in left_q_target] + scaled_right_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in right_q_target] + + # get dual hand action + action_data = np.concatenate((left_q_target, right_q_target)) + if dual_hand_state_array and dual_hand_action_array: + with dual_hand_data_lock: + dual_hand_state_array[:] = state_data + dual_hand_action_array[:] = action_data + + self._send_hand_command(scaled_left_cmd, scaled_right_cmd) + current_time = time.time() + time_elapsed = current_time - start_time + sleep_time = max(0, (1 / self.fps) - time_elapsed) + time.sleep(sleep_time) + finally: + logger_mp.info("Inspire_Controller has been closed.") + +if __name__ == '__main__': + logger_mp.info("Starting Inspire_Controller example...") + mock_left_hand_input = Array('d', 75, lock=True) + mock_right_hand_input = Array('d', 75, lock=True) + + with mock_right_hand_input.get_lock(): + for i in range(len(mock_right_hand_input)): + mock_right_hand_input[i] = (i % 10) * 0.01 + + with mock_left_hand_input.get_lock(): + temp_left_mat = np.zeros((25,3)) + temp_left_mat[4] = np.array([-1.13, 0.3, 0.15]) + mock_left_hand_input[:] = temp_left_mat.flatten() + + shared_lock = Lock() + shared_state = Array('d', Inspire_Num_Motors * 2, lock=False) + shared_action = Array('d', Inspire_Num_Motors * 2, lock=False) + + try: + controller = Inspire_Controller( + left_hand_array=mock_left_hand_input, + right_hand_array=mock_right_hand_input, + dual_hand_data_lock=shared_lock, + dual_hand_state_array=shared_state, + dual_hand_action_array=shared_action, + fps=50.0, + Unit_Test=False, + ) + + count = 0 + main_loop_running = True + while main_loop_running: + try: + time.sleep(1.0) + # Simulate a slight change in human hand input + with mock_right_hand_input.get_lock(): + mock_right_hand_input[inspire_tip_indices[0]*3 + 1] = 0.1 + (count % 10) * 0.02 + + with shared_lock: + print(f"Cycle {count} - Logged State: {[f'{x:.3f}' for x in shared_state[:]]}, Logged Action: {[f'{x:.3f}' for x in shared_action[:]]}") + count +=1 + if count > 3000 : # Increased run time for more observation + print("Example finished after 3000 cycles.") + main_loop_running = False + except KeyboardInterrupt: + print("Main loop interrupted. Finishing example.") + main_loop_running = False + + + except Exception as e: + print(f"An error occurred in the example: {e}") + finally: + print("Exiting main program.") diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index e4eeb51..684daed 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -16,7 +16,8 @@ from televuer import TeleVuerWrapper from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Dex1_1_Gripper_Controller -from teleop.robot_control.robot_hand_inspire import Inspire_Controller +from teleop.robot_control.robot_hand_inspire_dfx import Inspire_Controller_DFX +from teleop.robot_control.robot_hand_inspire_ftp import Inspire_Controller_FTP from teleop.robot_control.robot_hand_brainco import Brainco_Controller from teleimager.image_client import ImageClient from teleop.utils.episode_writer import EpisodeWriter @@ -78,7 +79,7 @@ if __name__ == '__main__': parser.add_argument('--frequency', type = float, default = 60.0, help = 'control and record \'s frequency') parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') - parser.add_argument('--ee', type=str, choices=['dex1', 'dex3', 'inspire1', 'brainco'], help='Select end effector controller') + parser.add_argument('--ee', type=str, choices=['dex1', 'dex3', 'inspire_ftp', 'inspire_dfx', 'brainco'], help='Select end effector controller') # mode flags parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode') parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)') @@ -155,14 +156,20 @@ if __name__ == '__main__': dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) - elif args.ee == "inspire1": + elif args.ee == "inspire_dfx": left_hand_pos_array = Array('d', 75, lock = True) # [input] right_hand_pos_array = Array('d', 75, lock = True) # [input] dual_hand_data_lock = Lock() dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. - hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, - dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + hand_ctrl = Inspire_Controller_DFX(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + elif args.ee == "inspire_ftp": + left_hand_pos_array = Array('d', 75, lock = True) # [input] + right_hand_pos_array = Array('d', 75, lock = True) # [input] + dual_hand_data_lock = Lock() + dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. + dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. + hand_ctrl = Inspire_Controller_FTP(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) elif args.ee == "brainco": left_hand_pos_array = Array('d', 75, lock = True) # [input] right_hand_pos_array = Array('d', 75, lock = True) # [input] @@ -331,10 +338,10 @@ if __name__ == '__main__': left_hand_action = [dual_gripper_action_array[0]] right_hand_action = [dual_gripper_action_array[1]] current_body_state = arm_ctrl.get_current_motor_q().tolist() - current_body_action = [-tele_data.left_ctrl_thumbstickValue[1] * 0.3, - -tele_data.left_ctrl_thumbstickValue[0] * 0.3, - -tele_data.right_ctrl_thumbstickValue[0]* 0.3] - elif (args.ee == "inspire1" or args.ee == "brainco") and args.xr_mode == "hand": + current_body_action = [-tele_data.tele_state.left_thumbstick_value[1] * 0.3, + -tele_data.tele_state.left_thumbstick_value[0] * 0.3, + -tele_data.tele_state.right_thumbstick_value[0] * 0.3] + elif (args.ee == "inspire_dfx" or args.ee == "inspire_ftp" or args.ee == "brainco") and args.xr_mode == "hand": with dual_hand_data_lock: left_ee_state = dual_hand_state_array[:6] right_ee_state = dual_hand_state_array[-6:]