Browse Source
feat: add the support for inspire_ftp dexterous hands
feat: add the support for inspire_ftp dexterous hands
Signed-off-by: libsxm-dev <2641911366@qq.com>main
3 changed files with 250 additions and 13 deletions
-
4teleop/robot_control/robot_hand_inspire_dfx.py
-
229teleop/robot_control/robot_hand_inspire_ftp.py
-
30teleop/teleop_hand_and_arm.py
@ -0,0 +1,229 @@ |
|||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize |
|||
from inspire_sdkpy import inspire_dds, inspire_hand_defaut |
|||
|
|||
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType |
|||
import numpy as np |
|||
import threading |
|||
import time |
|||
from multiprocessing import Process, Array, Lock |
|||
|
|||
import logging_mp |
|||
logger_mp = logging_mp.get_logger(__name__) |
|||
|
|||
inspire_tip_indices = [4, 9, 14, 19, 24] |
|||
Inspire_Num_Motors = 6 |
|||
|
|||
kTopicInspireCtrlLeft = "rt/inspire_hand/ctrl/l" |
|||
kTopicInspireCtrlRight = "rt/inspire_hand/ctrl/r" |
|||
kTopicInspireStateLeft = "rt/inspire_hand/state/l" |
|||
kTopicInspireStateRight = "rt/inspire_hand/state/r" |
|||
|
|||
class Inspire_Controller_FTP: |
|||
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, |
|||
dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): |
|||
logger_mp.info("Initialize Inspire_Controller...") |
|||
self.fps = fps |
|||
self.Unit_Test = Unit_Test |
|||
self.simulation_mode = simulation_mode |
|||
if not self.Unit_Test: |
|||
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND) |
|||
else: |
|||
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test) |
|||
|
|||
if self.simulation_mode: |
|||
ChannelFactoryInitialize(1) |
|||
else: |
|||
ChannelFactoryInitialize(0) |
|||
|
|||
# Initialize hand command publishers |
|||
self.LeftHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlLeft, inspire_dds.inspire_hand_ctrl) |
|||
self.LeftHandCmd_publisher.Init() |
|||
self.RightHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlRight, inspire_dds.inspire_hand_ctrl) |
|||
self.RightHandCmd_publisher.Init() |
|||
|
|||
# Initialize hand state subscribers |
|||
self.LeftHandState_subscriber = ChannelSubscriber(kTopicInspireStateLeft, inspire_dds.inspire_hand_state) |
|||
self.LeftHandState_subscriber.Init() # Consider using callback if preferred: Init(callback_func, period_ms) |
|||
self.RightHandState_subscriber = ChannelSubscriber(kTopicInspireStateRight, inspire_dds.inspire_hand_state) |
|||
self.RightHandState_subscriber.Init() |
|||
|
|||
# Shared Arrays for hand states ([0,1] normalized values) |
|||
self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) |
|||
self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) |
|||
|
|||
# Initialize subscribe thread |
|||
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) |
|||
self.subscribe_state_thread.daemon = True |
|||
self.subscribe_state_thread.start() |
|||
|
|||
# Wait for initial DDS messages (optional, but good for ensuring connection) |
|||
wait_count = 0 |
|||
while not (any(self.left_hand_state_array) or any(self.right_hand_state_array)): |
|||
if wait_count % 100 == 0: # Print every second |
|||
logger_mp.info(f"[Inspire_Controller] Waiting to subscribe to hand states from DDS (L: {any(self.left_hand_state_array)}, R: {any(self.right_hand_state_array)})...") |
|||
time.sleep(0.01) |
|||
wait_count += 1 |
|||
if wait_count > 500: # Timeout after 5 seconds |
|||
logger_mp.warning("[Inspire_Controller] Warning: Timeout waiting for initial hand states. Proceeding anyway.") |
|||
break |
|||
logger_mp.info("[Inspire_Controller] Initial hand states received or timeout.") |
|||
|
|||
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, |
|||
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) |
|||
hand_control_process.daemon = True |
|||
hand_control_process.start() |
|||
|
|||
logger_mp.info("Initialize Inspire_Controller OK!\n") |
|||
|
|||
def _subscribe_hand_state(self): |
|||
logger_mp.info("[Inspire_Controller] Subscribe thread started.") |
|||
while True: |
|||
# Left Hand |
|||
left_state_msg = self.LeftHandState_subscriber.Read() |
|||
if left_state_msg is not None: |
|||
if hasattr(left_state_msg, 'angle_act') and len(left_state_msg.angle_act) == Inspire_Num_Motors: |
|||
with self.left_hand_state_array.get_lock(): |
|||
for i in range(Inspire_Num_Motors): |
|||
self.left_hand_state_array[i] = left_state_msg.angle_act[i] / 1000.0 |
|||
else: |
|||
logger_mp.warning(f"[Inspire_Controller] Received left_state_msg but attributes are missing or incorrect. Type: {type(left_state_msg)}, Content: {str(left_state_msg)[:100]}") |
|||
# Right Hand |
|||
right_state_msg = self.RightHandState_subscriber.Read() |
|||
if right_state_msg is not None: |
|||
if hasattr(right_state_msg, 'angle_act') and len(right_state_msg.angle_act) == Inspire_Num_Motors: |
|||
with self.right_hand_state_array.get_lock(): |
|||
for i in range(Inspire_Num_Motors): |
|||
self.right_hand_state_array[i] = right_state_msg.angle_act[i] / 1000.0 |
|||
else: |
|||
logger_mp.warning(f"[Inspire_Controller] Received right_state_msg but attributes are missing or incorrect. Type: {type(right_state_msg)}, Content: {str(right_state_msg)[:100]}") |
|||
|
|||
time.sleep(0.002) |
|||
|
|||
def _send_hand_command(self, left_angle_cmd_scaled, right_angle_cmd_scaled): |
|||
""" |
|||
Send scaled angle commands [0-1000] to both hands. |
|||
""" |
|||
# Left Hand Command |
|||
left_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() |
|||
left_cmd_msg.angle_set = left_angle_cmd_scaled |
|||
left_cmd_msg.mode = 0b0001 # Mode 1: Angle control |
|||
self.LeftHandCmd_publisher.Write(left_cmd_msg) |
|||
|
|||
# Right Hand Command |
|||
right_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() |
|||
right_cmd_msg.angle_set = right_angle_cmd_scaled |
|||
right_cmd_msg.mode = 0b0001 # Mode 1: Angle control |
|||
self.RightHandCmd_publisher.Write(right_cmd_msg) |
|||
|
|||
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, |
|||
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): |
|||
logger_mp.info("[Inspire_Controller] Control process started.") |
|||
self.running = True |
|||
|
|||
left_q_target = np.full(Inspire_Num_Motors, 1.0) |
|||
right_q_target = np.full(Inspire_Num_Motors, 1.0) |
|||
|
|||
try: |
|||
while self.running: |
|||
start_time = time.time() |
|||
# get dual hand state |
|||
with left_hand_array.get_lock(): |
|||
left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() |
|||
with right_hand_array.get_lock(): |
|||
right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() |
|||
|
|||
# Read left and right q_state from shared arrays |
|||
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) |
|||
|
|||
if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. |
|||
ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] |
|||
ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] |
|||
|
|||
left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] |
|||
right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] |
|||
|
|||
def normalize(val, min_val, max_val): |
|||
return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0) |
|||
|
|||
for idx in range(Inspire_Num_Motors): |
|||
if idx <= 3: |
|||
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7) |
|||
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7) |
|||
elif idx == 4: |
|||
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5) |
|||
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5) |
|||
elif idx == 5: |
|||
left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3) |
|||
right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3) |
|||
|
|||
scaled_left_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in left_q_target] |
|||
scaled_right_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in right_q_target] |
|||
|
|||
# get dual hand action |
|||
action_data = np.concatenate((left_q_target, right_q_target)) |
|||
if dual_hand_state_array and dual_hand_action_array: |
|||
with dual_hand_data_lock: |
|||
dual_hand_state_array[:] = state_data |
|||
dual_hand_action_array[:] = action_data |
|||
|
|||
self._send_hand_command(scaled_left_cmd, scaled_right_cmd) |
|||
current_time = time.time() |
|||
time_elapsed = current_time - start_time |
|||
sleep_time = max(0, (1 / self.fps) - time_elapsed) |
|||
time.sleep(sleep_time) |
|||
finally: |
|||
logger_mp.info("Inspire_Controller has been closed.") |
|||
|
|||
if __name__ == '__main__': |
|||
logger_mp.info("Starting Inspire_Controller example...") |
|||
mock_left_hand_input = Array('d', 75, lock=True) |
|||
mock_right_hand_input = Array('d', 75, lock=True) |
|||
|
|||
with mock_right_hand_input.get_lock(): |
|||
for i in range(len(mock_right_hand_input)): |
|||
mock_right_hand_input[i] = (i % 10) * 0.01 |
|||
|
|||
with mock_left_hand_input.get_lock(): |
|||
temp_left_mat = np.zeros((25,3)) |
|||
temp_left_mat[4] = np.array([-1.13, 0.3, 0.15]) |
|||
mock_left_hand_input[:] = temp_left_mat.flatten() |
|||
|
|||
shared_lock = Lock() |
|||
shared_state = Array('d', Inspire_Num_Motors * 2, lock=False) |
|||
shared_action = Array('d', Inspire_Num_Motors * 2, lock=False) |
|||
|
|||
try: |
|||
controller = Inspire_Controller( |
|||
left_hand_array=mock_left_hand_input, |
|||
right_hand_array=mock_right_hand_input, |
|||
dual_hand_data_lock=shared_lock, |
|||
dual_hand_state_array=shared_state, |
|||
dual_hand_action_array=shared_action, |
|||
fps=50.0, |
|||
Unit_Test=False, |
|||
) |
|||
|
|||
count = 0 |
|||
main_loop_running = True |
|||
while main_loop_running: |
|||
try: |
|||
time.sleep(1.0) |
|||
# Simulate a slight change in human hand input |
|||
with mock_right_hand_input.get_lock(): |
|||
mock_right_hand_input[inspire_tip_indices[0]*3 + 1] = 0.1 + (count % 10) * 0.02 |
|||
|
|||
with shared_lock: |
|||
print(f"Cycle {count} - Logged State: {[f'{x:.3f}' for x in shared_state[:]]}, Logged Action: {[f'{x:.3f}' for x in shared_action[:]]}") |
|||
count +=1 |
|||
if count > 3000 : # Increased run time for more observation |
|||
print("Example finished after 3000 cycles.") |
|||
main_loop_running = False |
|||
except KeyboardInterrupt: |
|||
print("Main loop interrupted. Finishing example.") |
|||
main_loop_running = False |
|||
|
|||
|
|||
except Exception as e: |
|||
print(f"An error occurred in the example: {e}") |
|||
finally: |
|||
print("Exiting main program.") |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue