diff --git a/assets/inspire_hand/inspire_hand.yml b/assets/inspire_hand/inspire_hand.yml index a3aaaf0..8466ce0 100644 --- a/assets/inspire_hand/inspire_hand.yml +++ b/assets/inspire_hand/inspire_hand.yml @@ -1,39 +1,62 @@ left: - type: vector + type: DexPilot # or vector urdf_path: inspire_hand/inspire_hand_left.urdf - wrist_link_name: "L_hand_base_link" # Target refers to the retargeting target, which is the robot hand - target_joint_names: ['L_thumb_proximal_yaw_joint', 'L_thumb_proximal_pitch_joint', - 'L_index_proximal_joint', 'L_middle_proximal_joint', - 'L_ring_proximal_joint', 'L_pinky_proximal_joint' ] + target_joint_names: + [ + 'L_thumb_proximal_yaw_joint', + 'L_thumb_proximal_pitch_joint', + 'L_index_proximal_joint', + 'L_middle_proximal_joint', + 'L_ring_proximal_joint', + 'L_pinky_proximal_joint' + ] + + # for DexPilot type + wrist_link_name: "L_hand_base_link" + finger_tip_link_names: [ "L_thumb_tip", "L_index_tip", "L_middle_tip", "L_ring_tip", "L_pinky_tip" ] + # If you do not know exactly how it is used, please leave it to None for default. + target_link_human_indices_dexpilot: [[ 9, 14, 19, 24, 14, 19, 24, 19, 24, 24, 0, 0, 0, 0, 0], [ 4, 4, 4, 4, 9, 9, 9, 14, 14, 19, 4, 9, 14, 19, 24]] + + # for vector type target_origin_link_names: [ "L_hand_base_link", "L_hand_base_link", "L_hand_base_link", "L_hand_base_link", "L_hand_base_link"] target_task_link_names: [ "L_thumb_tip", "L_index_tip", "L_middle_tip", "L_ring_tip", "L_pinky_tip" ] - scaling_factor: 1.20 - - # Source refers to the retargeting input, which usually corresponds to the human hand - # The joint indices of human hand joint which corresponds to each link in the target_link_names - target_link_human_indices: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] + target_link_human_indices_vector: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] + # Scaling factor for vector retargeting only + # For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 + scaling_factor: 1.20 # A smaller alpha means stronger filtering, i.e. more smooth but also larger latency low_pass_alpha: 0.2 right: - type: vector + type: DexPilot # or vector urdf_path: inspire_hand/inspire_hand_right.urdf - wrist_link_name: "R_hand_base_link" # Target refers to the retargeting target, which is the robot hand - target_joint_names: ['R_thumb_proximal_yaw_joint', 'R_thumb_proximal_pitch_joint', - 'R_index_proximal_joint', 'R_middle_proximal_joint', - 'R_ring_proximal_joint', 'R_pinky_proximal_joint' ] + target_joint_names: + [ + 'R_thumb_proximal_yaw_joint', + 'R_thumb_proximal_pitch_joint', + 'R_index_proximal_joint', + 'R_middle_proximal_joint', + 'R_ring_proximal_joint', + 'R_pinky_proximal_joint' + ] + + # for DexPilot type + wrist_link_name: "R_hand_base_link" + finger_tip_link_names: [ "R_thumb_tip", "R_index_tip", "R_middle_tip", "R_ring_tip", "R_pinky_tip" ] + # If you do not know exactly how it is used, please leave it to None for + target_link_human_indices_dexpilot: [[ 9, 14, 19, 24, 14, 19, 24, 19, 24, 24, 0, 0, 0, 0, 0], [ 4, 4, 4, 4, 9, 9, 9, 14, 14, 19, 4, 9, 14, 19, 24]] + target_origin_link_names: [ "R_hand_base_link", "R_hand_base_link", "R_hand_base_link", "R_hand_base_link", "R_hand_base_link"] target_task_link_names: [ "R_thumb_tip", "R_index_tip", "R_middle_tip", "R_ring_tip", "R_pinky_tip" ] + target_link_human_indices_vector: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] + + # Scaling factor for vector retargeting only + # For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 scaling_factor: 1.20 - - # Source refers to the retargeting input, which usually corresponds to the human hand - # The joint indices of human hand joint which corresponds to each link in the target_link_names - target_link_human_indices: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] - # A smaller alpha means stronger filtering, i.e. more smooth but also larger latency low_pass_alpha: 0.2 \ No newline at end of file diff --git a/assets/unitree_hand/unitree_dex3.yml b/assets/unitree_hand/unitree_dex3.yml index d54677a..dc5ec85 100644 --- a/assets/unitree_hand/unitree_dex3.yml +++ b/assets/unitree_hand/unitree_dex3.yml @@ -1,9 +1,8 @@ left: - type: vector + type: DexPilot # or vector urdf_path: unitree_hand/unitree_dex3_left.urdf # Target refers to the retargeting target, which is the robot hand - # target_joint_names: target_joint_names: [ "left_hand_thumb_0_joint", @@ -14,27 +13,26 @@ left: "left_hand_index_0_joint", "left_hand_index_1_joint", ] - wrist_link_name: None + + # for DexPilot type + wrist_link_name: "base_link" + finger_tip_link_names: [ "thumb_tip", "index_tip", "middle_tip"] + # If you do not know exactly how it is used, please leave it to None for default. + target_link_human_indices_dexpilot: [[ 9, 14, 14, 0, 0, 0], [ 4, 4, 9, 4, 9, 14]] + + # for vector type target_origin_link_names: ["base_link_thumb","base_link_index","base_link_middle"] target_task_link_names: ["thumb_tip","index_tip","middle_tip"] - target_link_human_indices: [[0, 0, 0], [4, 9, 14]] + target_link_human_indices_vector: [[0, 0, 0], [4, 9, 14]] - # Currently, the scaling factor for each finger is individually distinguished in the robot_hand_unitree.py file. - # The Unitree Dex3 has three fingers with the same specifications, so the retarget scaling factors need to be adjusted separately. - # The relevant code is as follows: - # ref_left_value[0] = ref_left_value[0] * 1.15 - # ref_left_value[1] = ref_left_value[1] * 1.05 - # ref_left_value[2] = ref_left_value[2] * 0.95 - # ref_right_value[0] = ref_right_value[0] * 1.15 - # ref_right_value[1] = ref_right_value[1] * 1.05 - # ref_right_value[2] = ref_right_value[2] * 0.95 + # Scaling factor for vector retargeting only + # For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 scaling_factor: 1.0 - # A smaller alpha means stronger filtering, i.e. more smooth but also larger latency low_pass_alpha: 0.2 right: - type: vector + type: DexPilot # or vector urdf_path: unitree_hand/unitree_dex3_right.urdf # Target refers to the retargeting target, which is the robot hand @@ -48,12 +46,18 @@ right: "right_hand_middle_0_joint", "right_hand_middle_1_joint", ] - wrist_link_name: None + # for DexPilot type + wrist_link_name: "base_link" + finger_tip_link_names: [ "thumb_tip", "index_tip", "middle_tip"] + target_link_human_indices_dexpilot: [[ 9, 14, 14, 0, 0, 0], [ 4, 4, 9, 4, 9, 14]] + + # for vector type target_origin_link_names: ["base_link_thumb","base_link_index","base_link_middle"] target_task_link_names: ["thumb_tip", "index_tip", "middle_tip"] - target_link_human_indices: [[0, 0, 0], [4, 9, 14]] + target_link_human_indices_vector: [[0, 0, 0], [4, 9, 14]] + # Scaling factor for vector retargeting only + # For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 scaling_factor: 1.0 - # A smaller alpha means stronger filtering, i.e. more smooth but also larger latency low_pass_alpha: 0.2 diff --git a/teleop/open_television/_test_tv_wrapper.py b/teleop/open_television/_test_tv_wrapper.py index 66ef23c..ab8579e 100644 --- a/teleop/open_television/_test_tv_wrapper.py +++ b/teleop/open_television/_test_tv_wrapper.py @@ -35,7 +35,7 @@ def run_test_tv_wrapper(): teleData = tv_wrapper.get_motion_state_data() logger_mp.info("=== TeleData Snapshot ===") - logger_mp.info(f"[Head Rotation Matrix]:\n{teleData.head_rotation}") + logger_mp.info(f"[Head Rotation Matrix]:\n{teleData.head_pose}") logger_mp.info(f"[Left EE Pose]:\n{teleData.left_arm_pose}") logger_mp.info(f"[Right EE Pose]:\n{teleData.right_arm_pose}") diff --git a/teleop/open_television/setup.py b/teleop/open_television/setup.py index 6799080..dc471af 100644 --- a/teleop/open_television/setup.py +++ b/teleop/open_television/setup.py @@ -14,7 +14,7 @@ setup( 'aiohttp_cors==0.7.0', 'aiortc==1.8.0', 'av==11.0.0', - # 'vuer==0.0.32rc7', # avp \ pico, hand tracking all work fine. + # 'vuer==0.0.32rc7', # avp \ pico, hand tracking all work fine. but it not support controller tracking. # 'vuer==0.0.35', # avp hand tracking works fine. pico is fine too, but the right eye occasionally goes black for a short time at start. @@ -32,11 +32,17 @@ setup( # from 'vuer==0.0.46' # avp hand tracking work fine. # to - 'vuer==0.0.56', # In pico hand tracking mode, using a hand gesture to click the "Virtual Reality" button + # 'vuer==0.0.56', # In pico hand tracking mode, using a hand gesture to click the "Virtual Reality" button # causes the screen to stay black and stuck loading. But if the button is clicked with the controller instead, everything works fine. # In pico controller tracking mode, using controller to click the "Virtual Reality" button # causes the screen to stay black and stuck loading. But if the button is clicked with a hand gesture instead, everything works fine. # avp \ pico all have hand marker visualization (RGB three-axis color coordinates). + + 'vuer==0.0.60', # a good version + # https://github.com/unitreerobotics/avp_teleoperate/issues/53 + # https://github.com/vuer-ai/vuer/issues/45 + # https://github.com/vuer-ai/vuer/issues/65 + ], python_requires='>=3.8', ) \ No newline at end of file diff --git a/teleop/open_television/television.py b/teleop/open_television/television.py index d891fb9..aa5c527 100644 --- a/teleop/open_television/television.py +++ b/teleop/open_television/television.py @@ -206,8 +206,8 @@ class TeleVision: MotionControllers( stream=True, key="motionControllers", - showLeft=False, - showRight=False, + left=True, + right=True, ), to="bgChildren", ) @@ -264,8 +264,8 @@ class TeleVision: MotionControllers( stream=True, key="motionControllers", - showLeft=False, - showRight=False, + left=True, + right=True, ), to="bgChildren", ) diff --git a/teleop/open_television/tv_wrapper.py b/teleop/open_television/tv_wrapper.py index cc2e1f4..92ca634 100644 --- a/teleop/open_television/tv_wrapper.py +++ b/teleop/open_television/tv_wrapper.py @@ -166,7 +166,7 @@ class TeleStateData: @dataclass class TeleData: - head_rotation: np.ndarray # (3,3) Head orientation matrix + head_pose: np.ndarray # (4,4) SE(3) pose of head matrix left_arm_pose: np.ndarray # (4,4) SE(3) pose of left arm right_arm_pose: np.ndarray # (4,4) SE(3) pose of right arm left_hand_pos: np.ndarray = None # (25,3) 3D positions of left hand joints @@ -259,7 +259,6 @@ class TeleVisionWrapper: right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_world_arm[0:3, 3] - Brobot_world_head[0:3, 3] # =====coordinate origin offset===== - Brobot_world_head_rot = Brobot_world_head[:3, :3] # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to visualize it. # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. @@ -340,7 +339,7 @@ class TeleVisionWrapper: hand_state = None return TeleData( - head_rotation=Brobot_world_head_rot, + head_pose=Brobot_world_head, left_arm_pose=left_IPunitree_Brobot_waist_arm, right_arm_pose=right_IPunitree_Brobot_waist_arm, left_hand_pos=left_IPunitree_Brobot_arm_hand_pos, @@ -368,7 +367,6 @@ class TeleVisionWrapper: right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] # =====coordinate origin offset===== - Brobot_world_head_rot = Brobot_world_head[:3, :3] # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it. # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. @@ -403,7 +401,7 @@ class TeleVisionWrapper: controller_state = None return TeleData( - head_rotation=Brobot_world_head_rot, + head_pose=Brobot_world_head, left_arm_pose=left_IPunitree_Brobot_waist_arm, right_arm_pose=right_IPunitree_Brobot_waist_arm, left_trigger_value=10.0 - self.tvuer.left_controller_trigger_value * 10, diff --git a/teleop/robot_control/dex_retargeting/optimizer.py b/teleop/robot_control/dex_retargeting/optimizer.py index 9288523..ed891fe 100644 --- a/teleop/robot_control/dex_retargeting/optimizer.py +++ b/teleop/robot_control/dex_retargeting/optimizer.py @@ -324,7 +324,12 @@ class DexPilotOptimizer(Optimizer): origin_link_index, task_link_index = self.generate_link_indices(self.num_fingers) if target_link_human_indices is None: - target_link_human_indices = (np.stack([origin_link_index, task_link_index], axis=0) * 4).astype(int) + logical_indices = np.stack([origin_link_index, task_link_index], axis=0) + target_link_human_indices = np.where( + logical_indices > 0, + logical_indices * 5 - 1, + 0 + ).astype(int) link_names = [wrist_link_name] + finger_tip_link_names target_origin_link_names = [link_names[index] for index in origin_link_index] target_task_link_names = [link_names[index] for index in task_link_index] @@ -370,13 +375,13 @@ class DexPilotOptimizer(Optimizer): origin_link_index = [] task_link_index = [] - # Add indices for connections between fingers + # S1:Add indices for connections between fingers for i in range(1, num_fingers): for j in range(i + 1, num_fingers + 1): origin_link_index.append(j) task_link_index.append(i) - # Add indices for connections to the base (0) + # S2:Add indices for connections to the base (0) for i in range(1, num_fingers + 1): origin_link_index.append(0) task_link_index.append(i) diff --git a/teleop/robot_control/dex_retargeting/retargeting_config.py b/teleop/robot_control/dex_retargeting/retargeting_config.py index d2378c0..f1e8cd8 100644 --- a/teleop/robot_control/dex_retargeting/retargeting_config.py +++ b/teleop/robot_control/dex_retargeting/retargeting_config.py @@ -19,32 +19,34 @@ from .yourdfpy import DUMMY_JOINT_NAMES class RetargetingConfig: type: str urdf_path: str + target_joint_names: Optional[List[str]] = None # Whether to add free joint to the root of the robot. Free joint enable the robot hand move freely in the space add_dummy_free_joint: bool = False - # Source refers to the retargeting input, which usually corresponds to the human hand - # The joint indices of human hand joint which corresponds to each link in the target_link_names - target_link_human_indices: Optional[np.ndarray] = None - + # DexPilot retargeting related # The link on the robot hand which corresponding to the wrist of human hand wrist_link_name: Optional[str] = None + # DexPilot retargeting link names + finger_tip_link_names: Optional[List[str]] = None + target_link_human_indices_dexpilot: Optional[np.ndarray] = None # Position retargeting link names target_link_names: Optional[List[str]] = None + target_link_human_indices_position: Optional[np.ndarray] = None # Vector retargeting link names - target_joint_names: Optional[List[str]] = None target_origin_link_names: Optional[List[str]] = None target_task_link_names: Optional[List[str]] = None - - # DexPilot retargeting link names - finger_tip_link_names: Optional[List[str]] = None + target_link_human_indices_vector: Optional[np.ndarray] = None # Scaling factor for vector retargeting only # For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 scaling_factor: float = 1.0 + # Low pass filter + low_pass_alpha: float = 0.1 + # Optimization parameters normal_delta: float = 4e-3 huber_delta: float = 2e-2 @@ -59,9 +61,6 @@ class RetargetingConfig: # Mimic joint tag ignore_mimic_joint: bool = False - # Low pass filter - low_pass_alpha: float = 0.1 - _TYPE = ["vector", "position", "dexpilot"] _DEFAULT_URDF_DIR = "./" @@ -78,24 +77,24 @@ class RetargetingConfig: raise ValueError(f"Vector retargeting requires: target_origin_link_names + target_task_link_names") if len(self.target_task_link_names) != len(self.target_origin_link_names): raise ValueError(f"Vector retargeting origin and task links dim mismatch") - if self.target_link_human_indices.shape != (2, len(self.target_origin_link_names)): + if self.target_link_human_indices_vector.shape != (2, len(self.target_origin_link_names)): raise ValueError(f"Vector retargeting link names and link indices dim mismatch") - if self.target_link_human_indices is None: - raise ValueError(f"Vector retargeting requires: target_link_human_indices") + if self.target_link_human_indices_vector is None: + raise ValueError(f"Vector retargeting requires: target_link_human_indices_vector") elif self.type == "position": if self.target_link_names is None: raise ValueError(f"Position retargeting requires: target_link_names") - self.target_link_human_indices = self.target_link_human_indices.squeeze() - if self.target_link_human_indices.shape != (len(self.target_link_names),): + self.target_link_human_indices_position = self.target_link_human_indices_position.squeeze() + if self.target_link_human_indices_position.shape != (len(self.target_link_names),): raise ValueError(f"Position retargeting link names and link indices dim mismatch") - if self.target_link_human_indices is None: - raise ValueError(f"Position retargeting requires: target_link_human_indices") + if self.target_link_human_indices_position is None: + raise ValueError(f"Position retargeting requires: target_link_human_indices_position") elif self.type == "dexpilot": if self.finger_tip_link_names is None or self.wrist_link_name is None: raise ValueError(f"Position retargeting requires: finger_tip_link_names + wrist_link_name") - if self.target_link_human_indices is not None: + if self.target_link_human_indices_dexpilot is not None: print( "\033[33m", "Target link human indices is provided in the DexPilot retargeting config, which is uncommon.\n" @@ -132,8 +131,13 @@ class RetargetingConfig: @classmethod def from_dict(cls, cfg: Dict[str, Any], override: Optional[Dict] = None): - if "target_link_human_indices" in cfg: - cfg["target_link_human_indices"] = np.array(cfg["target_link_human_indices"]) + if "target_link_human_indices_position" in cfg: + cfg["target_link_human_indices_position"] = np.array(cfg["target_link_human_indices_position"]) + if "target_link_human_indices_vector" in cfg: + cfg["target_link_human_indices_vector"] = np.array(cfg["target_link_human_indices_vector"]) + if "target_link_human_indices_dexpilot" in cfg: + cfg["target_link_human_indices_dexpilot"] = np.array(cfg["target_link_human_indices_dexpilot"]) + if override is not None: for key, value in override.items(): cfg[key] = value @@ -170,7 +174,7 @@ class RetargetingConfig: robot, joint_names, target_link_names=self.target_link_names, - target_link_human_indices=self.target_link_human_indices, + target_link_human_indices=self.target_link_human_indices_position, norm_delta=self.normal_delta, huber_delta=self.huber_delta, ) @@ -180,7 +184,7 @@ class RetargetingConfig: joint_names, target_origin_link_names=self.target_origin_link_names, target_task_link_names=self.target_task_link_names, - target_link_human_indices=self.target_link_human_indices, + target_link_human_indices=self.target_link_human_indices_vector, scaling=self.scaling_factor, norm_delta=self.normal_delta, huber_delta=self.huber_delta, @@ -191,7 +195,7 @@ class RetargetingConfig: joint_names, finger_tip_link_names=self.finger_tip_link_names, wrist_link_name=self.wrist_link_name, - target_link_human_indices=self.target_link_human_indices, + target_link_human_indices=self.target_link_human_indices_dexpilot, scaling=self.scaling_factor, project_dist=self.project_dist, escape_dist=self.escape_dist, diff --git a/teleop/robot_control/hand_retargeting.py b/teleop/robot_control/hand_retargeting.py index 77c6a18..1c67945 100644 --- a/teleop/robot_control/hand_retargeting.py +++ b/teleop/robot_control/hand_retargeting.py @@ -38,6 +38,8 @@ class HandRetargeting: self.left_retargeting_joint_names = self.left_retargeting.joint_names self.right_retargeting_joint_names = self.right_retargeting.joint_names + self.left_indices = self.left_retargeting.optimizer.target_link_human_indices + self.right_indices = self.right_retargeting.optimizer.target_link_human_indices if hand_type == HandType.UNITREE_DEX3 or hand_type == HandType.UNITREE_DEX3_Unit_Test: # In section "Sort by message structure" of https://support.unitree.com/home/en/G1_developer/dexterous_hand diff --git a/teleop/robot_control/robot_arm.py b/teleop/robot_control/robot_arm.py index 3a1923b..9ee69b0 100644 --- a/teleop/robot_control/robot_arm.py +++ b/teleop/robot_control/robot_arm.py @@ -59,12 +59,12 @@ class DataBuffer: self.data = data class G1_29_ArmController: - def __init__(self, debug_mode = True, is_use_sim = False): + def __init__(self, motion_mode = False, simulation_mode = False): logger_mp.info("Initialize G1_29_ArmController...") self.q_target = np.zeros(14) self.tauff_target = np.zeros(14) - self.debug_mode = debug_mode - self.is_use_sim = is_use_sim + self.motion_mode = motion_mode + self.simulation_mode = simulation_mode self.kp_high = 300.0 self.kd_high = 3.0 self.kp_low = 80.0 @@ -81,11 +81,11 @@ class G1_29_ArmController: self._gradual_time = None # initialize lowcmd publisher and lowstate subscriber - ChannelFactoryInitialize(1) - if self.debug_mode: - self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) - else: + ChannelFactoryInitialize(0) + if self.motion_mode: self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Motion, hg_LowCmd) + else: + self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) self.lowcmd_publisher.Init() self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState) self.lowstate_subscriber.Init() @@ -159,7 +159,7 @@ class G1_29_ArmController: return cliped_arm_q_target def _ctrl_motor_state(self): - if not self.debug_mode: + if self.motion_mode: self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = 1.0; while True: @@ -169,7 +169,7 @@ class G1_29_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) @@ -218,19 +218,22 @@ class G1_29_ArmController: def ctrl_dual_arm_go_home(self): '''Move both the left and right arms of the robot to their home position by setting the target joint angles (q) and torques (tau) to zero.''' logger_mp.info("[G1_29_ArmController] ctrl_dual_arm_go_home start...") + max_attempts = 100 + current_attempts = 0 with self.ctrl_lock: self.q_target = np.zeros(14) # self.tauff_target = np.zeros(14) tolerance = 0.05 # Tolerance threshold for joint angles to determine "close to zero", can be adjusted based on your motor's precision requirements - while True: + while current_attempts < max_attempts: current_q = self.get_current_dual_arm_q() if np.all(np.abs(current_q) < tolerance): - if not self.debug_mode: + if self.motion_mode: for weight in np.arange(1, 0, -0.01): self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = weight; time.sleep(0.02) logger_mp.info("[G1_29_ArmController] both arms have reached the home position.") break + current_attempts += 1 time.sleep(0.05) def speed_gradual_max(self, t = 5.0): @@ -338,8 +341,8 @@ class G1_29_JointIndex(IntEnum): kNotUsedJoint5 = 34 class G1_23_ArmController: - def __init__(self, is_use_sim = False): - self.is_use_sim = is_use_sim + def __init__(self, simulation_mode = False): + self.simulation_mode = simulation_mode logger_mp.info("Initialize G1_23_ArmController...") self.q_target = np.zeros(10) @@ -443,7 +446,7 @@ class G1_23_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) @@ -492,15 +495,18 @@ class G1_23_ArmController: def ctrl_dual_arm_go_home(self): '''Move both the left and right arms of the robot to their home position by setting the target joint angles (q) and torques (tau) to zero.''' logger_mp.info("[G1_23_ArmController] ctrl_dual_arm_go_home start...") + max_attempts = 100 + current_attempts = 0 with self.ctrl_lock: self.q_target = np.zeros(10) # self.tauff_target = np.zeros(10) tolerance = 0.05 # Tolerance threshold for joint angles to determine "close to zero", can be adjusted based on your motor's precision requirements - while True: + while current_attempts < max_attempts: current_q = self.get_current_dual_arm_q() if np.all(np.abs(current_q) < tolerance): logger_mp.info("[G1_23_ArmController] both arms have reached the home position.") break + current_attempts += 1 time.sleep(0.05) def speed_gradual_max(self, t = 5.0): @@ -600,8 +606,8 @@ class G1_23_JointIndex(IntEnum): kNotUsedJoint5 = 34 class H1_2_ArmController: - def __init__(self, is_use_sim = False): - self.is_use_sim = is_use_sim + def __init__(self, simulation_mode = False): + self.simulation_mode = simulation_mode logger_mp.info("Initialize H1_2_ArmController...") self.q_target = np.zeros(14) @@ -623,7 +629,7 @@ class H1_2_ArmController: self._gradual_time = None # initialize lowcmd publisher and lowstate subscriber - ChannelFactoryInitialize(0) + ChannelFactoryInitialize(1) self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) self.lowcmd_publisher.Init() self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState) @@ -705,7 +711,7 @@ class H1_2_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) @@ -754,15 +760,18 @@ class H1_2_ArmController: def ctrl_dual_arm_go_home(self): '''Move both the left and right arms of the robot to their home position by setting the target joint angles (q) and torques (tau) to zero.''' logger_mp.info("[H1_2_ArmController] ctrl_dual_arm_go_home start...") + max_attempts = 100 + current_attempts = 0 with self.ctrl_lock: self.q_target = np.zeros(14) # self.tauff_target = np.zeros(14) tolerance = 0.05 # Tolerance threshold for joint angles to determine "close to zero", can be adjusted based on your motor's precision requirements - while True: + while current_attempts < max_attempts: current_q = self.get_current_dual_arm_q() if np.all(np.abs(current_q) < tolerance): logger_mp.info("[H1_2_ArmController] both arms have reached the home position.") break + current_attempts += 1 time.sleep(0.05) def speed_gradual_max(self, t = 5.0): @@ -869,8 +878,8 @@ class H1_2_JointIndex(IntEnum): kNotUsedJoint7 = 34 class H1_ArmController: - def __init__(self, is_use_sim = False): - self.is_use_sim = is_use_sim + def __init__(self, simulation_mode = False): + self.simulation_mode = simulation_mode logger_mp.info("Initialize H1_ArmController...") self.q_target = np.zeros(8) @@ -966,7 +975,7 @@ class H1_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) @@ -1011,15 +1020,18 @@ class H1_ArmController: def ctrl_dual_arm_go_home(self): '''Move both the left and right arms of the robot to their home position by setting the target joint angles (q) and torques (tau) to zero.''' logger_mp.info("[H1_ArmController] ctrl_dual_arm_go_home start...") + max_attempts = 100 + current_attempts = 0 with self.ctrl_lock: self.q_target = np.zeros(8) # self.tauff_target = np.zeros(8) tolerance = 0.05 # Tolerance threshold for joint angles to determine "close to zero", can be adjusted based on your motor's precision requirements - while True: + while current_attempts < max_attempts: current_q = self.get_current_dual_arm_q() if np.all(np.abs(current_q) < tolerance): logger_mp.info("[H1_ArmController] both arms have reached the home position.") break + current_attempts += 1 time.sleep(0.05) def speed_gradual_max(self, t = 5.0): @@ -1092,7 +1104,7 @@ if __name__ == "__main__": import pinocchio as pin arm_ik = G1_29_ArmIK(Unit_Test = True, Visualization = False) - arm = G1_29_ArmController() + arm = G1_29_ArmController(simulation_mode=True) # arm_ik = G1_23_ArmIK(Unit_Test = True, Visualization = False) # arm = G1_23_ArmController() # arm_ik = H1_2_ArmIK(Unit_Test = True, Visualization = False) diff --git a/teleop/robot_control/robot_hand_inspire.py b/teleop/robot_control/robot_hand_inspire.py index 47a1e55..bec0a63 100644 --- a/teleop/robot_control/robot_hand_inspire.py +++ b/teleop/robot_control/robot_hand_inspire.py @@ -13,7 +13,6 @@ from multiprocessing import Process, Array import logging_mp logger_mp = logging_mp.get_logger(__name__) -inspire_tip_indices = [4, 9, 14, 19, 24] Inspire_Num_Motors = 6 kTopicInspireCommand = "rt/inspire/cmd" kTopicInspireState = "rt/inspire/state" @@ -103,16 +102,16 @@ class Inspire_Controller: start_time = time.time() # get dual hand state with left_hand_array.get_lock(): - left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy() + left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() with right_hand_array.get_lock(): - right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy() + right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() # Read left and right q_state from shared arrays state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) - if not np.all(right_hand_mat == 0.0) and not np.all(left_hand_mat[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. - ref_left_value = left_hand_mat[inspire_tip_indices] - ref_right_value = right_hand_mat[inspire_tip_indices] + if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. + ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] + ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py index 909801d..fb04cf1 100644 --- a/teleop/robot_control/robot_hand_unitree.py +++ b/teleop/robot_control/robot_hand_unitree.py @@ -24,7 +24,6 @@ import logging_mp logger_mp = logging_mp.get_logger(__name__) -unitree_tip_indices = [4, 9, 14] # [thumb, index, middle] in OpenXR Dex3_Num_Motors = 7 kTopicDex3LeftCommand = "rt/dex3/left/cmd" kTopicDex3RightCommand = "rt/dex3/right/cmd" @@ -183,14 +182,8 @@ class Dex3_1_Controller: state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. - ref_left_value = left_hand_data[unitree_tip_indices] - ref_right_value = right_hand_data[unitree_tip_indices] - ref_left_value[0] = ref_left_value[0] * 1.15 - ref_left_value[1] = ref_left_value[1] * 1.05 - ref_left_value[2] = ref_left_value[2] * 0.95 - ref_right_value[0] = ref_right_value[0] * 1.15 - ref_right_value[1] = ref_right_value[1] * 1.05 - ref_right_value[2] = ref_right_value[2] * 0.95 + ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] + ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] @@ -236,7 +229,7 @@ kTopicGripperState = "rt/unitree_actuator/state" class Gripper_Controller: def __init__(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None, - filter = True, fps = 200.0, Unit_Test = False, is_use_sim = False): + filter = True, fps = 200.0, Unit_Test = False, simulation_mode = False): """ [note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process @@ -259,7 +252,7 @@ class Gripper_Controller: self.fps = fps self.Unit_Test = Unit_Test - self.is_use_sim = is_use_sim + self.simulation_mode = simulation_mode if filter: self.smooth_filter = WeightedMovingFilter(np.array([0.5, 0.3, 0.2]), Gripper_Num_Motors) @@ -316,7 +309,7 @@ class Gripper_Controller: def control_thread(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_state_in, dual_hand_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None): self.running = True - if self.is_use_sim: + if self.simulation_mode: DELTA_GRIPPER_CMD = 1.0 else: DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index c08d8e5..5a2c25b 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -21,23 +21,17 @@ from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_C from teleop.robot_control.robot_hand_inspire import Inspire_Controller from teleop.image_server.image_client import ImageClient from teleop.utils.episode_writer import EpisodeWriter - -from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize -from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ - - from sshkeyboard import listen_keyboard, stop_listening -def publish_reset_category(category: int,publisher): - # construct message - msg = String_(data=str(category)) # pass data parameter directly during initialization - - # create publisher - - # publish message +# for simulation +from unitree_sdk2py.core.channel import ChannelPublisher +from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ +def publish_reset_category(category: int,publisher): # Scene Reset signal + msg = String_(data=str(category)) publisher.Write(msg) - print(f"published reset category: {category}") + logger_mp.info(f"published reset category: {category}") +# state transition start_signal = False running = True should_toggle_recording = False @@ -47,12 +41,15 @@ def on_press(key): if key == 'r': start_signal = True logger_mp.info("Program start signal received.") - elif key == 's': + elif key == 'q': stop_listening() running = False - elif key == 'q': + elif key == 's': should_toggle_recording = True -threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press}, daemon=True).start() + else: + logger_mp.info(f"{key} was pressed, but no action is defined for this key.") +listen_keyboard_thread = threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press, "until": None, "sequential": False,}, daemon=True) +listen_keyboard_thread.start() if __name__ == '__main__': parser = argparse.ArgumentParser() @@ -61,20 +58,18 @@ if __name__ == '__main__': parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') - parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'],default='gripper', help='Select end effector controller') + parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller') parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') - parser.add_argument('--debug', action = 'store_false', help = 'Enable debug mode') - parser.add_argument('--headless', action='store_true', help='Run in headless mode (no display)') - - parser.add_argument('--is_use_sim', action = 'store_true', help = 'Use simulation or not') - parser.set_defaults(is_use_sim = True) + parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode') + parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)') + parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode') args = parser.parse_args() logger_mp.info(f"args: {args}") # image client: img_config should be the same as the configuration in image_server.py (of Robot's development computing unit) - if args.is_use_sim: + if args.sim: img_config = { 'fps': 30, 'head_camera_type': 'opencv', @@ -114,14 +109,20 @@ if __name__ == '__main__': tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize) tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf) - if WRIST: + if WRIST and args.sim: + wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3) + wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize) + wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf) + img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, + wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name, server_address="127.0.0.1") + elif WRIST and not args.sim: wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3) wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize) wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf) img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, - wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name,server_address="127.0.0.1") + wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name) else: - img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, server_address="127.0.0.1") + img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name) image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True) image_receive_thread.daemon = True @@ -133,16 +134,16 @@ if __name__ == '__main__': # arm if args.arm == 'G1_29': - arm_ctrl = G1_29_ArmController(debug_mode=args.debug, is_use_sim=args.is_use_sim) + arm_ctrl = G1_29_ArmController(motion_mode=args.motion, simulation_mode=args.sim) arm_ik = G1_29_ArmIK() elif args.arm == 'G1_23': - arm_ctrl = G1_23_ArmController(is_use_sim=args.is_use_sim) + arm_ctrl = G1_23_ArmController(simulation_mode=args.sim) arm_ik = G1_23_ArmIK() elif args.arm == 'H1_2': - arm_ctrl = H1_2_ArmController(is_use_sim=args.is_use_sim) + arm_ctrl = H1_2_ArmController(simulation_mode=args.sim) arm_ik = H1_2_ArmIK() elif args.arm == 'H1': - arm_ctrl = H1_ArmController(is_use_sim=args.is_use_sim) + arm_ctrl = H1_ArmController(simulation_mode=args.sim) arm_ik = H1_ArmIK() # end-effector @@ -159,7 +160,7 @@ if __name__ == '__main__': dual_gripper_data_lock = Lock() dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. - gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, is_use_sim=args.is_use_sim) + gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) elif args.ee == "inspire1": left_hand_pos_array = Array('d', 75, lock = True) # [input] right_hand_pos_array = Array('d', 75, lock = True) # [input] @@ -169,19 +170,15 @@ if __name__ == '__main__': hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) else: pass - if args.is_use_sim: + + # simulation mode + if args.sim: reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) reset_pose_publisher.Init() from teleop.utils.sim_state_topic import start_sim_state_subscribe sim_state_subscriber = start_sim_state_subscribe() - # from teleop.utils.sim_state_client import SimStateClient - # sim_state_client = SimStateClient() - # sim_state_client.Init() - # sim_state_client.SetTimeout(5.0) - # sim_state_client.WaitLeaseApplied() - # c=1 # xr mode - if args.xr_mode == 'controller' and not args.debug: + if args.xr_mode == 'controller' and args.motion: from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient sport_client = LocoClient() sport_client.SetTimeout(0.0001) @@ -207,12 +204,12 @@ if __name__ == '__main__': if key == ord('s'): stop_listening() running = False - if args.is_use_sim: + if args.sim: publish_reset_category(2, reset_pose_publisher) elif key == ord('q'): should_toggle_recording = True elif key == ord('a'): - if args.is_use_sim: + if args.sim: publish_reset_category(2, reset_pose_publisher) if args.record and should_toggle_recording: @@ -225,7 +222,7 @@ if __name__ == '__main__': else: is_recording = False recorder.save_episode() - if args.is_use_sim: + if args.sim: publish_reset_category(1, reset_pose_publisher) # get input data tele_data = tv_wrapper.get_motion_state_data() @@ -248,7 +245,7 @@ if __name__ == '__main__': pass # high level control - if args.xr_mode == 'controller' and not args.debug: + if args.xr_mode == 'controller' and args.motion: # quit teleoperate if tele_data.tele_state.right_aButton: running = False @@ -412,5 +409,6 @@ if __name__ == '__main__': wrist_img_shm.close() if args.record: recorder.close() + listen_keyboard_thread.join() logger_mp.info("Finally, exiting program...") exit(0)