3 changed files with 246 additions and 10 deletions
-
2teleop/robot_control/robot_hand_inspire_dfx.py
-
229teleop/robot_control/robot_hand_inspire_ftp.py
-
25teleop/teleop_hand_and_arm.py
@ -0,0 +1,229 @@ |
|||||
|
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize |
||||
|
from inspire_sdkpy import inspire_dds, inspire_hand_defaut |
||||
|
|
||||
|
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType |
||||
|
import numpy as np |
||||
|
import threading |
||||
|
import time |
||||
|
from multiprocessing import Process, Array, Lock |
||||
|
|
||||
|
import logging_mp |
||||
|
logger_mp = logging_mp.get_logger(__name__) |
||||
|
|
||||
|
inspire_tip_indices = [4, 9, 14, 19, 24] |
||||
|
Inspire_Num_Motors = 6 |
||||
|
|
||||
|
kTopicInspireCtrlLeft = "rt/inspire_hand/ctrl/l" |
||||
|
kTopicInspireCtrlRight = "rt/inspire_hand/ctrl/r" |
||||
|
kTopicInspireStateLeft = "rt/inspire_hand/state/l" |
||||
|
kTopicInspireStateRight = "rt/inspire_hand/state/r" |
||||
|
|
||||
|
class Inspire_Controller_FTP: |
||||
|
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, |
||||
|
dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): |
||||
|
logger_mp.info("Initialize Inspire_Controller...") |
||||
|
self.fps = fps |
||||
|
self.Unit_Test = Unit_Test |
||||
|
self.simulation_mode = simulation_mode |
||||
|
if not self.Unit_Test: |
||||
|
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND) |
||||
|
else: |
||||
|
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test) |
||||
|
|
||||
|
if self.simulation_mode: |
||||
|
ChannelFactoryInitialize(1) |
||||
|
else: |
||||
|
ChannelFactoryInitialize(0) |
||||
|
|
||||
|
# Initialize hand command publishers |
||||
|
self.LeftHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlLeft, inspire_dds.inspire_hand_ctrl) |
||||
|
self.LeftHandCmd_publisher.Init() |
||||
|
self.RightHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlRight, inspire_dds.inspire_hand_ctrl) |
||||
|
self.RightHandCmd_publisher.Init() |
||||
|
|
||||
|
# Initialize hand state subscribers |
||||
|
self.LeftHandState_subscriber = ChannelSubscriber(kTopicInspireStateLeft, inspire_dds.inspire_hand_state) |
||||
|
self.LeftHandState_subscriber.Init() # Consider using callback if preferred: Init(callback_func, period_ms) |
||||
|
self.RightHandState_subscriber = ChannelSubscriber(kTopicInspireStateRight, inspire_dds.inspire_hand_state) |
||||
|
self.RightHandState_subscriber.Init() |
||||
|
|
||||
|
# Shared Arrays for hand states ([0,1] normalized values) |
||||
|
self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) |
||||
|
self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) |
||||
|
|
||||
|
# Initialize subscribe thread |
||||
|
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) |
||||
|
self.subscribe_state_thread.daemon = True |
||||
|
self.subscribe_state_thread.start() |
||||
|
|
||||
|
# Wait for initial DDS messages (optional, but good for ensuring connection) |
||||
|
wait_count = 0 |
||||
|
while not (any(self.left_hand_state_array) or any(self.right_hand_state_array)): |
||||
|
if wait_count % 100 == 0: # Print every second |
||||
|
logger_mp.info(f"[Inspire_Controller] Waiting to subscribe to hand states from DDS (L: {any(self.left_hand_state_array)}, R: {any(self.right_hand_state_array)})...") |
||||
|
time.sleep(0.01) |
||||
|
wait_count += 1 |
||||
|
if wait_count > 500: # Timeout after 5 seconds |
||||
|
logger_mp.warning("[Inspire_Controller] Warning: Timeout waiting for initial hand states. Proceeding anyway.") |
||||
|
break |
||||
|
logger_mp.info("[Inspire_Controller] Initial hand states received or timeout.") |
||||
|
|
||||
|
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, |
||||
|
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) |
||||
|
hand_control_process.daemon = True |
||||
|
hand_control_process.start() |
||||
|
|
||||
|
logger_mp.info("Initialize Inspire_Controller OK!\n") |
||||
|
|
||||
|
def _subscribe_hand_state(self): |
||||
|
logger_mp.info("[Inspire_Controller] Subscribe thread started.") |
||||
|
while True: |
||||
|
# Left Hand |
||||
|
left_state_msg = self.LeftHandState_subscriber.Read() |
||||
|
if left_state_msg is not None: |
||||
|
if hasattr(left_state_msg, 'angle_act') and len(left_state_msg.angle_act) == Inspire_Num_Motors: |
||||
|
with self.left_hand_state_array.get_lock(): |
||||
|
for i in range(Inspire_Num_Motors): |
||||
|
self.left_hand_state_array[i] = left_state_msg.angle_act[i] / 1000.0 |
||||
|
else: |
||||
|
logger_mp.warning(f"[Inspire_Controller] Received left_state_msg but attributes are missing or incorrect. Type: {type(left_state_msg)}, Content: {str(left_state_msg)[:100]}") |
||||
|
# Right Hand |
||||
|
right_state_msg = self.RightHandState_subscriber.Read() |
||||
|
if right_state_msg is not None: |
||||
|
if hasattr(right_state_msg, 'angle_act') and len(right_state_msg.angle_act) == Inspire_Num_Motors: |
||||
|
with self.right_hand_state_array.get_lock(): |
||||
|
for i in range(Inspire_Num_Motors): |
||||
|
self.right_hand_state_array[i] = right_state_msg.angle_act[i] / 1000.0 |
||||
|
else: |
||||
|
logger_mp.warning(f"[Inspire_Controller] Received right_state_msg but attributes are missing or incorrect. Type: {type(right_state_msg)}, Content: {str(right_state_msg)[:100]}") |
||||
|
|
||||
|
time.sleep(0.002) |
||||
|
|
||||
|
def _send_hand_command(self, left_angle_cmd_scaled, right_angle_cmd_scaled): |
||||
|
""" |
||||
|
Send scaled angle commands [0-1000] to both hands. |
||||
|
""" |
||||
|
# Left Hand Command |
||||
|
left_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() |
||||
|
left_cmd_msg.angle_set = left_angle_cmd_scaled |
||||
|
left_cmd_msg.mode = 0b0001 # Mode 1: Angle control |
||||
|
self.LeftHandCmd_publisher.Write(left_cmd_msg) |
||||
|
|
||||
|
# Right Hand Command |
||||
|
right_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() |
||||
|
right_cmd_msg.angle_set = right_angle_cmd_scaled |
||||
|
right_cmd_msg.mode = 0b0001 # Mode 1: Angle control |
||||
|
self.RightHandCmd_publisher.Write(right_cmd_msg) |
||||
|
|
||||
|
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, |
||||
|
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): |
||||
|
logger_mp.info("[Inspire_Controller] Control process started.") |
||||
|
self.running = True |
||||
|
|
||||
|
left_q_target = np.full(Inspire_Num_Motors, 1.0) |
||||
|
right_q_target = np.full(Inspire_Num_Motors, 1.0) |
||||
|
|
||||
|
try: |
||||
|
while self.running: |
||||
|
start_time = time.time() |
||||
|
# get dual hand state |
||||
|
with left_hand_array.get_lock(): |
||||
|
left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() |
||||
|
with right_hand_array.get_lock(): |
||||
|
right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() |
||||
|
|
||||
|
# Read left and right q_state from shared arrays |
||||
|
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) |
||||
|
|
||||
|
if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. |
||||
|
ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] |
||||
|
ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] |
||||
|
|
||||
|
left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] |
||||
|
right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] |
||||
|
|
||||
|
def normalize(val, min_val, max_val): |
||||
|
return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0) |
||||
|
|
||||
|
for idx in range(Inspire_Num_Motors): |
||||
|
if idx <= 3: |
||||
|
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7) |
||||
|
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7) |
||||
|
elif idx == 4: |
||||
|
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5) |
||||
|
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5) |
||||
|
elif idx == 5: |
||||
|
left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3) |
||||
|
right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3) |
||||
|
|
||||
|
scaled_left_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in left_q_target] |
||||
|
scaled_right_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in right_q_target] |
||||
|
|
||||
|
# get dual hand action |
||||
|
action_data = np.concatenate((left_q_target, right_q_target)) |
||||
|
if dual_hand_state_array and dual_hand_action_array: |
||||
|
with dual_hand_data_lock: |
||||
|
dual_hand_state_array[:] = state_data |
||||
|
dual_hand_action_array[:] = action_data |
||||
|
|
||||
|
self._send_hand_command(scaled_left_cmd, scaled_right_cmd) |
||||
|
current_time = time.time() |
||||
|
time_elapsed = current_time - start_time |
||||
|
sleep_time = max(0, (1 / self.fps) - time_elapsed) |
||||
|
time.sleep(sleep_time) |
||||
|
finally: |
||||
|
logger_mp.info("Inspire_Controller has been closed.") |
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
logger_mp.info("Starting Inspire_Controller example...") |
||||
|
mock_left_hand_input = Array('d', 75, lock=True) |
||||
|
mock_right_hand_input = Array('d', 75, lock=True) |
||||
|
|
||||
|
with mock_right_hand_input.get_lock(): |
||||
|
for i in range(len(mock_right_hand_input)): |
||||
|
mock_right_hand_input[i] = (i % 10) * 0.01 |
||||
|
|
||||
|
with mock_left_hand_input.get_lock(): |
||||
|
temp_left_mat = np.zeros((25,3)) |
||||
|
temp_left_mat[4] = np.array([-1.13, 0.3, 0.15]) |
||||
|
mock_left_hand_input[:] = temp_left_mat.flatten() |
||||
|
|
||||
|
shared_lock = Lock() |
||||
|
shared_state = Array('d', Inspire_Num_Motors * 2, lock=False) |
||||
|
shared_action = Array('d', Inspire_Num_Motors * 2, lock=False) |
||||
|
|
||||
|
try: |
||||
|
controller = Inspire_Controller( |
||||
|
left_hand_array=mock_left_hand_input, |
||||
|
right_hand_array=mock_right_hand_input, |
||||
|
dual_hand_data_lock=shared_lock, |
||||
|
dual_hand_state_array=shared_state, |
||||
|
dual_hand_action_array=shared_action, |
||||
|
fps=50.0, |
||||
|
Unit_Test=False, |
||||
|
) |
||||
|
|
||||
|
count = 0 |
||||
|
main_loop_running = True |
||||
|
while main_loop_running: |
||||
|
try: |
||||
|
time.sleep(1.0) |
||||
|
# Simulate a slight change in human hand input |
||||
|
with mock_right_hand_input.get_lock(): |
||||
|
mock_right_hand_input[inspire_tip_indices[0]*3 + 1] = 0.1 + (count % 10) * 0.02 |
||||
|
|
||||
|
with shared_lock: |
||||
|
print(f"Cycle {count} - Logged State: {[f'{x:.3f}' for x in shared_state[:]]}, Logged Action: {[f'{x:.3f}' for x in shared_action[:]]}") |
||||
|
count +=1 |
||||
|
if count > 3000 : # Increased run time for more observation |
||||
|
print("Example finished after 3000 cycles.") |
||||
|
main_loop_running = False |
||||
|
except KeyboardInterrupt: |
||||
|
print("Main loop interrupted. Finishing example.") |
||||
|
main_loop_running = False |
||||
|
|
||||
|
|
||||
|
except Exception as e: |
||||
|
print(f"An error occurred in the example: {e}") |
||||
|
finally: |
||||
|
print("Exiting main program.") |
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue