diff --git a/requirements.txt b/requirements.txt index cce9de0..bf45d8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,6 @@ aiohttp==3.9.5 aiohttp_cors==0.7.0 aiortc==1.8.0 av==11.0.0 -dex_retargeting==0.1.1 einops==0.8.0 h5py==3.11.0 ipython==8.12.3 diff --git a/teleop/robot_control/dex_retargeting/CITATION.cff b/teleop/robot_control/dex_retargeting/CITATION.cff new file mode 100644 index 0000000..770c132 --- /dev/null +++ b/teleop/robot_control/dex_retargeting/CITATION.cff @@ -0,0 +1,22 @@ +cff-version: 1.2.0 +authors: + - family-names: "Qin" + given-names: "Yuzhe" + - family-names: "Yang" + given-names: "Wei" + - family-names: "Huang" + given-names: "Binghao" + - family-names: "Van Wyk" + given-names: "Karl" + - family-names: "Su" + given-names: "Hao" + - family-names: "Wang" + given-names: "Xiaolong" + - family-names: "Chao" + given-names: "Yu-Wei" + - family-names: "Fox" + given-names: "Dieter" +date-released: 2023-10-25 +title: "AnyTeleop" +message: "Thanks for using dex-retargeting. If you use this software, please cite it as below." +url: "https://github.com/dexsuite/dex-retargeting" diff --git a/teleop/robot_control/dex_retargeting/LICENSE b/teleop/robot_control/dex_retargeting/LICENSE new file mode 100644 index 0000000..15904fb --- /dev/null +++ b/teleop/robot_control/dex_retargeting/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2023 Yuzhe Qin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/teleop/robot_control/dex_retargeting/__init__.py b/teleop/robot_control/dex_retargeting/__init__.py new file mode 100644 index 0000000..3dd3d2d --- /dev/null +++ b/teleop/robot_control/dex_retargeting/__init__.py @@ -0,0 +1 @@ +__version__ = "0.4.6" diff --git a/teleop/robot_control/dex_retargeting/constants.py b/teleop/robot_control/dex_retargeting/constants.py new file mode 100644 index 0000000..4b7147a --- /dev/null +++ b/teleop/robot_control/dex_retargeting/constants.py @@ -0,0 +1,85 @@ +import enum +from pathlib import Path +from typing import Optional + +import numpy as np + +OPERATOR2MANO_RIGHT = np.array( + [ + [0, 0, -1], + [-1, 0, 0], + [0, 1, 0], + ] +) + +OPERATOR2MANO_LEFT = np.array( + [ + [0, 0, -1], + [1, 0, 0], + [0, -1, 0], + ] +) + + +class RobotName(enum.Enum): + allegro = enum.auto() + shadow = enum.auto() + svh = enum.auto() + leap = enum.auto() + ability = enum.auto() + inspire = enum.auto() + panda = enum.auto() + + +class RetargetingType(enum.Enum): + vector = enum.auto() # For teleoperation, no finger closing prior + position = enum.auto() # For offline data processing, especially hand-object interaction data + dexpilot = enum.auto() # For teleoperation, with finger closing prior + + +class HandType(enum.Enum): + right = enum.auto() + left = enum.auto() + + +ROBOT_NAME_MAP = { + RobotName.allegro: "allegro_hand", + RobotName.shadow: "shadow_hand", + RobotName.svh: "schunk_svh_hand", + RobotName.leap: "leap_hand", + RobotName.ability: 
"ability_hand", + RobotName.inspire: "inspire_hand", + RobotName.panda: "panda_gripper", +} + +ROBOT_NAMES = list(ROBOT_NAME_MAP.keys()) + + +def get_default_config_path( + robot_name: RobotName, retargeting_type: RetargetingType, hand_type: HandType +) -> Optional[Path]: + config_path = Path(__file__).parent / "configs" + if retargeting_type is RetargetingType.position: + config_path = config_path / "offline" + else: + config_path = config_path / "teleop" + + robot_name_str = ROBOT_NAME_MAP[robot_name] + hand_type_str = hand_type.name + if "gripper" in robot_name_str: # For gripper robots, only use gripper config file. + if retargeting_type == RetargetingType.dexpilot: + config_name = f"{robot_name_str}_dexpilot.yml" + else: + config_name = f"{robot_name_str}.yml" + else: + if retargeting_type == RetargetingType.dexpilot: + config_name = f"{robot_name_str}_{hand_type_str}_dexpilot.yml" + else: + config_name = f"{robot_name_str}_{hand_type_str}.yml" + return config_path / config_name + + +OPERATOR2MANO = { + HandType.right: OPERATOR2MANO_RIGHT, + HandType.left: OPERATOR2MANO_LEFT, +} diff --git a/teleop/robot_control/dex_retargeting/kinematics_adaptor.py b/teleop/robot_control/dex_retargeting/kinematics_adaptor.py new file mode 100644 index 0000000..045a49f --- /dev/null +++ b/teleop/robot_control/dex_retargeting/kinematics_adaptor.py @@ -0,0 +1,102 @@ +from abc import abstractmethod +from typing import List + +import numpy as np + +from .robot_wrapper import RobotWrapper + + +class KinematicAdaptor: + def __init__(self, robot: RobotWrapper, target_joint_names: List[str]): + self.robot = robot + self.target_joint_names = target_joint_names + + # Index mapping + self.idx_pin2target = np.array([robot.get_joint_index(n) for n in target_joint_names]) + + @abstractmethod + def forward_qpos(self, qpos: np.ndarray) -> np.ndarray: + """ + Adapt the joint position for different kinematics constraints. 
+ Note that the joint order of this qpos is consistent with pinocchio + Args: + qpos: the pinocchio qpos + + Returns: the adapted qpos with the same shape as input + + """ + pass + + @abstractmethod + def backward_jacobian(self, jacobian: np.ndarray) -> np.ndarray: + """ + Adapt the jacobian for different kinematics applications. + Note that the joint order of this Jacobian is consistent with pinocchio + Args: + jacobian: the original jacobian + + Returns: the adapted jacobian with the same shape as input + + """ + pass + + +class MimicJointKinematicAdaptor(KinematicAdaptor): + def __init__( + self, + robot: RobotWrapper, + target_joint_names: List[str], + source_joint_names: List[str], + mimic_joint_names: List[str], + multipliers: List[float], + offsets: List[float], + ): + super().__init__(robot, target_joint_names) + + self.multipliers = np.array(multipliers) + self.offsets = np.array(offsets) + + # Joint name check + union_set = set(mimic_joint_names).intersection(set(target_joint_names)) + if len(union_set) > 0: + raise ValueError( + f"Mimic joint should not be one of the target joints.\n" + f"Mimic joints: {mimic_joint_names}.\n" + f"Target joints: {target_joint_names}\n" + f"You need to specify the target joint names explicitly in your retargeting config" + f" for robot with mimic joint constraints: {target_joint_names}" + ) + + # Indices in the pinocchio + self.idx_pin2source = np.array([robot.get_joint_index(name) for name in source_joint_names]) + self.idx_pin2mimic = np.array([robot.get_joint_index(name) for name in mimic_joint_names]) + + # Indices in the output results + self.idx_target2source = np.array([self.target_joint_names.index(n) for n in source_joint_names]) + + # Dimension check + len_source, len_mimic = self.idx_target2source.shape[0], self.idx_pin2mimic.shape[0] + len_mul, len_offset = self.multipliers.shape[0], self.offsets.shape[0] + if not (len_mimic == len_source == len_mul == len_offset): + raise ValueError( + f"Mimic joints setting 
dimension mismatch.\n" + f"Source joints: {len_source}, mimic joints: {len_mimic}, multiplier: {len_mul}, offset: {len_offset}" + ) + self.num_active_joints = len(robot.dof_joint_names) - len_mimic + + # Uniqueness check + if len(mimic_joint_names) != len(np.unique(mimic_joint_names)): + raise ValueError(f"Redundant mimic joint names: {mimic_joint_names}") + + def forward_qpos(self, pin_qpos: np.ndarray) -> np.ndarray: + mimic_qpos = pin_qpos[self.idx_pin2source] * self.multipliers + self.offsets + pin_qpos[self.idx_pin2mimic] = mimic_qpos + return pin_qpos + + def backward_jacobian(self, jacobian: np.ndarray) -> np.ndarray: + target_jacobian = jacobian[..., self.idx_pin2target] + mimic_joint_jacobian = jacobian[..., self.idx_pin2mimic] * self.multipliers + + for i, index in enumerate(self.idx_target2source): + target_jacobian[..., index] += mimic_joint_jacobian[..., i] + return target_jacobian diff --git a/teleop/robot_control/dex_retargeting/optimizer.py b/teleop/robot_control/dex_retargeting/optimizer.py new file mode 100644 index 0000000..9288523 --- /dev/null +++ b/teleop/robot_control/dex_retargeting/optimizer.py @@ -0,0 +1,511 @@ +from abc import abstractmethod +from typing import List, Optional + +import nlopt +import numpy as np +import torch + +from .kinematics_adaptor import KinematicAdaptor, MimicJointKinematicAdaptor +from .robot_wrapper import RobotWrapper + + +class Optimizer: + retargeting_type = "BASE" + + def __init__( + self, + robot: RobotWrapper, + target_joint_names: List[str], + target_link_human_indices: np.ndarray, + ): + self.robot = robot + self.num_joints = robot.dof + + joint_names = robot.dof_joint_names + idx_pin2target = [] + for target_joint_name in target_joint_names: + if target_joint_name not in joint_names: + raise ValueError(f"Joint {target_joint_name} given does not appear to be in robot XML.") + idx_pin2target.append(joint_names.index(target_joint_name)) + self.target_joint_names = target_joint_names + self.idx_pin2target = 
np.array(idx_pin2target) + + self.idx_pin2fixed = np.array([i for i in range(robot.dof) if i not in idx_pin2target], dtype=int) + self.opt = nlopt.opt(nlopt.LD_SLSQP, len(idx_pin2target)) + self.opt_dof = len(idx_pin2target) # This dof includes the mimic joints + + # Target + self.target_link_human_indices = target_link_human_indices + + # Free joint + link_names = robot.link_names + self.has_free_joint = len([name for name in link_names if "dummy" in name]) >= 6 + + # Kinematics adaptor + self.adaptor: Optional[KinematicAdaptor] = None + + def set_joint_limit(self, joint_limits: np.ndarray, epsilon=1e-3): + if joint_limits.shape != (self.opt_dof, 2): + raise ValueError(f"Expect joint limits have shape: {(self.opt_dof, 2)}, but get {joint_limits.shape}") + self.opt.set_lower_bounds((joint_limits[:, 0] - epsilon).tolist()) + self.opt.set_upper_bounds((joint_limits[:, 1] + epsilon).tolist()) + + def get_link_indices(self, target_link_names): + return [self.robot.get_link_index(link_name) for link_name in target_link_names] + + def set_kinematic_adaptor(self, adaptor: KinematicAdaptor): + self.adaptor = adaptor + + # Remove mimic joints from fixed joint list + if isinstance(adaptor, MimicJointKinematicAdaptor): + fixed_idx = self.idx_pin2fixed + mimic_idx = adaptor.idx_pin2mimic + new_fixed_id = np.array([x for x in fixed_idx if x not in mimic_idx], dtype=int) + self.idx_pin2fixed = new_fixed_id + + def retarget(self, ref_value, fixed_qpos, last_qpos): + """ + Compute the retargeting results using non-linear optimization + Args: + ref_value: the reference value in cartesian space as input, different optimizer has different reference + fixed_qpos: the fixed value (not optimized) in retargeting, consistent with self.fixed_joint_names + last_qpos: the last retargeting results or initial value, consistent with function return + + Returns: joint position of robot, the joint order and dim is consistent with self.target_joint_names + + """ + if len(fixed_qpos) != 
len(self.idx_pin2fixed): + raise ValueError( + f"Optimizer has {len(self.idx_pin2fixed)} joints but non_target_qpos {fixed_qpos} is given" + ) + objective_fn = self.get_objective_function(ref_value, fixed_qpos, np.array(last_qpos).astype(np.float32)) + + self.opt.set_min_objective(objective_fn) + try: + qpos = self.opt.optimize(last_qpos) + return np.array(qpos, dtype=np.float32) + except RuntimeError as e: + print(e) + return np.array(last_qpos, dtype=np.float32) + + @abstractmethod + def get_objective_function(self, ref_value: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): + pass + + @property + def fixed_joint_names(self): + joint_names = self.robot.dof_joint_names + return [joint_names[i] for i in self.idx_pin2fixed] + + +class PositionOptimizer(Optimizer): + retargeting_type = "POSITION" + + def __init__( + self, + robot: RobotWrapper, + target_joint_names: List[str], + target_link_names: List[str], + target_link_human_indices: np.ndarray, + huber_delta=0.02, + norm_delta=4e-3, + ): + super().__init__(robot, target_joint_names, target_link_human_indices) + self.body_names = target_link_names + self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta) + self.norm_delta = norm_delta + + # Sanity check and cache link indices + self.target_link_indices = self.get_link_indices(target_link_names) + + self.opt.set_ftol_abs(1e-5) + + def get_objective_function(self, target_pos: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): + qpos = np.zeros(self.num_joints) + qpos[self.idx_pin2fixed] = fixed_qpos + torch_target_pos = torch.as_tensor(target_pos) + torch_target_pos.requires_grad_(False) + + def objective(x: np.ndarray, grad: np.ndarray) -> float: + qpos[self.idx_pin2target] = x + + # Kinematics forwarding for qpos + if self.adaptor is not None: + qpos[:] = self.adaptor.forward_qpos(qpos)[:] + + self.robot.compute_forward_kinematics(qpos) + target_link_poses = [self.robot.get_link_pose(index) for index in self.target_link_indices] + body_pos = 
np.stack([pose[:3, 3] for pose in target_link_poses], axis=0) # (n ,3) + + # Torch computation for accurate loss and grad + torch_body_pos = torch.as_tensor(body_pos) + torch_body_pos.requires_grad_() + + # Loss term for kinematics retargeting based on 3D position error + huber_distance = self.huber_loss(torch_body_pos, torch_target_pos) + result = huber_distance.cpu().detach().item() + + if grad.size > 0: + jacobians = [] + for i, index in enumerate(self.target_link_indices): + link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...] + link_pose = target_link_poses[i] + link_rot = link_pose[:3, :3] + link_kinematics_jacobian = link_rot @ link_body_jacobian + jacobians.append(link_kinematics_jacobian) + + # Note: the joint order in this jacobian is consistent pinocchio + jacobians = np.stack(jacobians, axis=0) + huber_distance.backward() + grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :] + + # Convert the jacobian from pinocchio order to target order + if self.adaptor is not None: + jacobians = self.adaptor.backward_jacobian(jacobians) + else: + jacobians = jacobians[..., self.idx_pin2target] + + # Compute the gradient to the qpos + grad_qpos = np.matmul(grad_pos, jacobians) + grad_qpos = grad_qpos.mean(1).sum(0) + grad_qpos += 2 * self.norm_delta * (x - last_qpos) + + grad[:] = grad_qpos[:] + + return result + + return objective + + +class VectorOptimizer(Optimizer): + retargeting_type = "VECTOR" + + def __init__( + self, + robot: RobotWrapper, + target_joint_names: List[str], + target_origin_link_names: List[str], + target_task_link_names: List[str], + target_link_human_indices: np.ndarray, + huber_delta=0.02, + norm_delta=4e-3, + scaling=1.0, + ): + super().__init__(robot, target_joint_names, target_link_human_indices) + self.origin_link_names = target_origin_link_names + self.task_link_names = target_task_link_names + self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta, reduction="mean") + self.norm_delta = 
norm_delta + self.scaling = scaling + + # Computation cache for better performance + # For one link used in multiple vectors, e.g. hand palm, we do not want to compute it multiple times + self.computed_link_names = list(set(target_origin_link_names).union(set(target_task_link_names))) + self.origin_link_indices = torch.tensor( + [self.computed_link_names.index(name) for name in target_origin_link_names] + ) + self.task_link_indices = torch.tensor([self.computed_link_names.index(name) for name in target_task_link_names]) + + # Cache link indices that will involve in kinematics computation + self.computed_link_indices = self.get_link_indices(self.computed_link_names) + + self.opt.set_ftol_abs(1e-6) + + def get_objective_function(self, target_vector: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): + qpos = np.zeros(self.num_joints) + qpos[self.idx_pin2fixed] = fixed_qpos + torch_target_vec = torch.as_tensor(target_vector) * self.scaling + torch_target_vec.requires_grad_(False) + + def objective(x: np.ndarray, grad: np.ndarray) -> float: + qpos[self.idx_pin2target] = x + + # Kinematics forwarding for qpos + if self.adaptor is not None: + qpos[:] = self.adaptor.forward_qpos(qpos)[:] + + self.robot.compute_forward_kinematics(qpos) + target_link_poses = [self.robot.get_link_pose(index) for index in self.computed_link_indices] + body_pos = np.array([pose[:3, 3] for pose in target_link_poses]) + + # Torch computation for accurate loss and grad + torch_body_pos = torch.as_tensor(body_pos) + torch_body_pos.requires_grad_() + + # Index link for computation + origin_link_pos = torch_body_pos[self.origin_link_indices, :] + task_link_pos = torch_body_pos[self.task_link_indices, :] + robot_vec = task_link_pos - origin_link_pos + + # Loss term for kinematics retargeting based on 3D position error + vec_dist = torch.norm(robot_vec - torch_target_vec, dim=1, keepdim=False) + huber_distance = self.huber_loss(vec_dist, torch.zeros_like(vec_dist)) + result = 
huber_distance.cpu().detach().item() + + if grad.size > 0: + jacobians = [] + for i, index in enumerate(self.computed_link_indices): + link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...] + link_pose = target_link_poses[i] + link_rot = link_pose[:3, :3] + link_kinematics_jacobian = link_rot @ link_body_jacobian + jacobians.append(link_kinematics_jacobian) + + # Note: the joint order in this jacobian is consistent pinocchio + jacobians = np.stack(jacobians, axis=0) + huber_distance.backward() + grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :] + + # Convert the jacobian from pinocchio order to target order + if self.adaptor is not None: + jacobians = self.adaptor.backward_jacobian(jacobians) + else: + jacobians = jacobians[..., self.idx_pin2target] + + grad_qpos = np.matmul(grad_pos, np.array(jacobians)) + grad_qpos = grad_qpos.mean(1).sum(0) + grad_qpos += 2 * self.norm_delta * (x - last_qpos) + + grad[:] = grad_qpos[:] + + return result + + return objective + + +class DexPilotOptimizer(Optimizer): + """Retargeting optimizer using the method proposed in DexPilot + + This is a broader adaptation of the original optimizer delineated in the DexPilot paper. + While the initial DexPilot study focused solely on the four-fingered Allegro Hand, this version of the optimizer + embraces the same principles for both four-fingered and five-fingered hands. It projects the distance between the + thumb and the other fingers to facilitate more stable grasping. 
+ Reference: https://arxiv.org/abs/1910.03135 + + Args: + robot: + target_joint_names: + finger_tip_link_names: + wrist_link_name: + gamma: + project_dist: + escape_dist: + eta1: + eta2: + scaling: + """ + + retargeting_type = "DEXPILOT" + + def __init__( + self, + robot: RobotWrapper, + target_joint_names: List[str], + finger_tip_link_names: List[str], + wrist_link_name: str, + target_link_human_indices: Optional[np.ndarray] = None, + huber_delta=0.03, + norm_delta=4e-3, + # DexPilot parameters + # gamma=2.5e-3, + project_dist=0.03, + escape_dist=0.05, + eta1=1e-4, + eta2=3e-2, + scaling=1.0, + ): + if len(finger_tip_link_names) < 2 or len(finger_tip_link_names) > 5: + raise ValueError( + f"DexPilot optimizer can only be applied to hands with 2 to 5 fingers, but got " + f"{len(finger_tip_link_names)} fingers." + ) + self.num_fingers = len(finger_tip_link_names) + + origin_link_index, task_link_index = self.generate_link_indices(self.num_fingers) + + if target_link_human_indices is None: + target_link_human_indices = (np.stack([origin_link_index, task_link_index], axis=0) * 4).astype(int) + link_names = [wrist_link_name] + finger_tip_link_names + target_origin_link_names = [link_names[index] for index in origin_link_index] + target_task_link_names = [link_names[index] for index in task_link_index] + + super().__init__(robot, target_joint_names, target_link_human_indices) + self.origin_link_names = target_origin_link_names + self.task_link_names = target_task_link_names + self.scaling = scaling + self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta, reduction="none") + self.norm_delta = norm_delta + + # DexPilot parameters + self.project_dist = project_dist + self.escape_dist = escape_dist + self.eta1 = eta1 + self.eta2 = eta2 + + # Computation cache for better performance + # For one link used in multiple vectors, e.g. 
hand palm, we do not want to compute it multiple times + self.computed_link_names = list(set(target_origin_link_names).union(set(target_task_link_names))) + self.origin_link_indices = torch.tensor( + [self.computed_link_names.index(name) for name in target_origin_link_names] + ) + self.task_link_indices = torch.tensor([self.computed_link_names.index(name) for name in target_task_link_names]) + + # Sanity check and cache link indices + self.computed_link_indices = self.get_link_indices(self.computed_link_names) + + self.opt.set_ftol_abs(1e-6) + + # DexPilot cache + self.projected, self.s2_project_index_origin, self.s2_project_index_task, self.projected_dist = ( + self.set_dexpilot_cache(self.num_fingers, eta1, eta2) + ) + + @staticmethod + def generate_link_indices(num_fingers): + """ + Example: + >>> generate_link_indices(4) + ([2, 3, 4, 3, 4, 4, 0, 0, 0, 0], [1, 1, 1, 2, 2, 3, 1, 2, 3, 4]) + """ + origin_link_index = [] + task_link_index = [] + + # Add indices for connections between fingers + for i in range(1, num_fingers): + for j in range(i + 1, num_fingers + 1): + origin_link_index.append(j) + task_link_index.append(i) + + # Add indices for connections to the base (0) + for i in range(1, num_fingers + 1): + origin_link_index.append(0) + task_link_index.append(i) + + return origin_link_index, task_link_index + + @staticmethod + def set_dexpilot_cache(num_fingers, eta1, eta2): + """ + Example: + >>> set_dexpilot_cache(4, 0.1, 0.2) + (array([False, False, False, False, False, False]), + [1, 2, 2], + [0, 0, 1], + array([0.1, 0.1, 0.1, 0.2, 0.2, 0.2])) + """ + projected = np.zeros(num_fingers * (num_fingers - 1) // 2, dtype=bool) + + s2_project_index_origin = [] + s2_project_index_task = [] + for i in range(0, num_fingers - 2): + for j in range(i + 1, num_fingers - 1): + s2_project_index_origin.append(j) + s2_project_index_task.append(i) + + projected_dist = np.array([eta1] * (num_fingers - 1) + [eta2] * ((num_fingers - 1) * (num_fingers - 2) // 2)) + + return 
projected, s2_project_index_origin, s2_project_index_task, projected_dist + + def get_objective_function(self, target_vector: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): + qpos = np.zeros(self.num_joints) + qpos[self.idx_pin2fixed] = fixed_qpos + + len_proj = len(self.projected) + len_s2 = len(self.s2_project_index_task) + len_s1 = len_proj - len_s2 + + # Update projection indicator + target_vec_dist = np.linalg.norm(target_vector[:len_proj], axis=1) + self.projected[:len_s1][target_vec_dist[0:len_s1] < self.project_dist] = True + self.projected[:len_s1][target_vec_dist[0:len_s1] > self.escape_dist] = False + self.projected[len_s1:len_proj] = np.logical_and( + self.projected[:len_s1][self.s2_project_index_origin], self.projected[:len_s1][self.s2_project_index_task] + ) + self.projected[len_s1:len_proj] = np.logical_and( + self.projected[len_s1:len_proj], target_vec_dist[len_s1:len_proj] <= 0.03 + ) + + # Update weight vector + normal_weight = np.ones(len_proj, dtype=np.float32) * 1 + high_weight = np.array([200] * len_s1 + [400] * len_s2, dtype=np.float32) + weight = np.where(self.projected, high_weight, normal_weight) + + # We change the weight to 10 instead of 1 here, for vector originate from wrist to fingertips + # This ensures better intuitive mapping due wrong pose detection + weight = torch.from_numpy( + np.concatenate([weight, np.ones(self.num_fingers, dtype=np.float32) * len_proj + self.num_fingers]) + ) + + # Compute reference distance vector + normal_vec = target_vector * self.scaling # (10, 3) + dir_vec = target_vector[:len_proj] / (target_vec_dist[:, None] + 1e-6) # (6, 3) + projected_vec = dir_vec * self.projected_dist[:, None] # (6, 3) + + # Compute final reference vector + reference_vec = np.where(self.projected[:, None], projected_vec, normal_vec[:len_proj]) # (6, 3) + reference_vec = np.concatenate([reference_vec, normal_vec[len_proj:]], axis=0) # (10, 3) + torch_target_vec = torch.as_tensor(reference_vec, dtype=torch.float32) + 
torch_target_vec.requires_grad_(False) + + def objective(x: np.ndarray, grad: np.ndarray) -> float: + qpos[self.idx_pin2target] = x + + # Kinematics forwarding for qpos + if self.adaptor is not None: + qpos[:] = self.adaptor.forward_qpos(qpos)[:] + + self.robot.compute_forward_kinematics(qpos) + target_link_poses = [self.robot.get_link_pose(index) for index in self.computed_link_indices] + body_pos = np.array([pose[:3, 3] for pose in target_link_poses]) + + # Torch computation for accurate loss and grad + torch_body_pos = torch.as_tensor(body_pos) + torch_body_pos.requires_grad_() + + # Index link for computation + origin_link_pos = torch_body_pos[self.origin_link_indices, :] + task_link_pos = torch_body_pos[self.task_link_indices, :] + robot_vec = task_link_pos - origin_link_pos + + # Loss term for kinematics retargeting based on 3D position error + # Different from the original DexPilot, we use huber loss here instead of the squared dist + vec_dist = torch.norm(robot_vec - torch_target_vec, dim=1, keepdim=False) + huber_distance = ( + self.huber_loss(vec_dist, torch.zeros_like(vec_dist)) * weight / (robot_vec.shape[0]) + ).sum() + huber_distance = huber_distance.sum() + result = huber_distance.cpu().detach().item() + + if grad.size > 0: + jacobians = [] + for i, index in enumerate(self.computed_link_indices): + link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...] 
+ link_pose = target_link_poses[i] + link_rot = link_pose[:3, :3] + link_kinematics_jacobian = link_rot @ link_body_jacobian + jacobians.append(link_kinematics_jacobian) + + # Note: the joint order in this jacobian is consistent pinocchio + jacobians = np.stack(jacobians, axis=0) + huber_distance.backward() + grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :] + + # Convert the jacobian from pinocchio order to target order + if self.adaptor is not None: + jacobians = self.adaptor.backward_jacobian(jacobians) + else: + jacobians = jacobians[..., self.idx_pin2target] + + grad_qpos = np.matmul(grad_pos, np.array(jacobians)) + grad_qpos = grad_qpos.mean(1).sum(0) + + # In the original DexPilot, γ = 2.5 × 10−3 is a weight on regularizing the Allegro angles to zero + # which is equivalent to fully opened the hand + # In our implementation, we regularize the joint angles to the previous joint angles + grad_qpos += 2 * self.norm_delta * (x - last_qpos) + + grad[:] = grad_qpos[:] + + return result + + return objective diff --git a/teleop/robot_control/dex_retargeting/optimizer_utils.py b/teleop/robot_control/dex_retargeting/optimizer_utils.py new file mode 100644 index 0000000..2b54275 --- /dev/null +++ b/teleop/robot_control/dex_retargeting/optimizer_utils.py @@ -0,0 +1,17 @@ +class LPFilter: + def __init__(self, alpha): + self.alpha = alpha + self.y = None + self.is_init = False + + def next(self, x): + if not self.is_init: + self.y = x + self.is_init = True + return self.y.copy() + self.y = self.y + self.alpha * (x - self.y) + return self.y.copy() + + def reset(self): + self.y = None + self.is_init = False diff --git a/teleop/robot_control/dex_retargeting/retargeting_config.py b/teleop/robot_control/dex_retargeting/retargeting_config.py new file mode 100644 index 0000000..d2378c0 --- /dev/null +++ b/teleop/robot_control/dex_retargeting/retargeting_config.py @@ -0,0 +1,251 @@ +from dataclasses import dataclass +from pathlib import Path +from typing import List, 
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np


@dataclass
class RetargetingConfig:
    """Declarative description of a hand-retargeting problem.

    Validated in ``__post_init__`` and turned into a runnable
    ``SeqRetargeting`` pipeline by :meth:`build`.
    """

    # Retargeting algorithm: one of "vector", "position", "dexpilot".
    type: str
    urdf_path: str

    # Whether to add free joint to the root of the robot. Free joint enable the robot hand move freely in the space
    add_dummy_free_joint: bool = False

    # Source refers to the retargeting input, which usually corresponds to the human hand
    # The joint indices of human hand joint which corresponds to each link in the target_link_names
    target_link_human_indices: Optional[np.ndarray] = None

    # The link on the robot hand which corresponding to the wrist of human hand
    wrist_link_name: Optional[str] = None

    # Position retargeting link names
    target_link_names: Optional[List[str]] = None

    # Vector retargeting link names
    target_joint_names: Optional[List[str]] = None
    target_origin_link_names: Optional[List[str]] = None
    target_task_link_names: Optional[List[str]] = None

    # DexPilot retargeting link names
    finger_tip_link_names: Optional[List[str]] = None

    # Scaling factor for vector retargeting only
    # For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6
    scaling_factor: float = 1.0

    # Optimization parameters
    normal_delta: float = 4e-3
    huber_delta: float = 2e-2

    # DexPilot optimizer parameters
    project_dist: float = 0.03
    escape_dist: float = 0.05

    # Joint limit tag
    has_joint_limits: bool = True

    # Mimic joint tag
    ignore_mimic_joint: bool = False

    # Low pass filter
    low_pass_alpha: float = 0.1

    _TYPE = ["vector", "position", "dexpilot"]
    _DEFAULT_URDF_DIR = "./"

    def __post_init__(self):
        """Normalize and validate the configuration.

        Raises:
            ValueError: if required fields for the chosen retargeting type are
                missing, dimensions mismatch, or the URDF path does not exist.
        """
        # Retargeting type check
        self.type = self.type.lower()
        if self.type not in self._TYPE:
            raise ValueError(f"Retargeting type must be one of {self._TYPE}")

        # Vector retargeting requires: target_origin_link_names + target_task_link_names
        # Position retargeting requires: target_link_names
        if self.type == "vector":
            if self.target_origin_link_names is None or self.target_task_link_names is None:
                raise ValueError("Vector retargeting requires: target_origin_link_names + target_task_link_names")
            if len(self.target_task_link_names) != len(self.target_origin_link_names):
                raise ValueError("Vector retargeting origin and task links dim mismatch")
            # Bug fix: check for None BEFORE dereferencing .shape; the previous
            # order raised AttributeError instead of the intended ValueError.
            if self.target_link_human_indices is None:
                raise ValueError("Vector retargeting requires: target_link_human_indices")
            if self.target_link_human_indices.shape != (2, len(self.target_origin_link_names)):
                raise ValueError("Vector retargeting link names and link indices dim mismatch")

        elif self.type == "position":
            if self.target_link_names is None:
                raise ValueError("Position retargeting requires: target_link_names")
            # Bug fix: None check must precede .squeeze() for the same reason.
            if self.target_link_human_indices is None:
                raise ValueError("Position retargeting requires: target_link_human_indices")
            self.target_link_human_indices = self.target_link_human_indices.squeeze()
            if self.target_link_human_indices.shape != (len(self.target_link_names),):
                raise ValueError("Position retargeting link names and link indices dim mismatch")

        elif self.type == "dexpilot":
            if self.finger_tip_link_names is None or self.wrist_link_name is None:
                # Bug fix: error message previously said "Position retargeting".
                raise ValueError("DexPilot retargeting requires: finger_tip_link_names + wrist_link_name")
            if self.target_link_human_indices is not None:
                print(
                    "\033[33m",
                    "Target link human indices is provided in the DexPilot retargeting config, which is uncommon.\n"
                    "If you do not know exactly how it is used, please leave it to None for default.\n"
                    "\033[00m",
                )

        # URDF path check
        urdf_path = Path(self.urdf_path)
        if not urdf_path.is_absolute():
            # Wrap in Path() so a plain-string _DEFAULT_URDF_DIR also works.
            urdf_path = Path(self._DEFAULT_URDF_DIR) / urdf_path
            urdf_path = urdf_path.absolute()
        if not urdf_path.exists():
            raise ValueError(f"URDF path {urdf_path} does not exist")
        self.urdf_path = str(urdf_path)

    @classmethod
    def set_default_urdf_dir(cls, urdf_dir: Union[str, Path]):
        """Set the directory against which relative urdf_path values are resolved."""
        path = Path(urdf_dir)
        if not path.exists():
            raise ValueError(f"URDF dir {urdf_dir} not exists.")
        cls._DEFAULT_URDF_DIR = urdf_dir

    @classmethod
    def load_from_file(cls, config_path: Union[str, Path], override: Optional[Dict] = None) -> "RetargetingConfig":
        """Load a config from a YAML file; the config lives under the "retargeting" key."""
        import yaml  # local import: module stays importable without PyYAML

        path = Path(config_path)
        if not path.is_absolute():
            path = path.absolute()

        with path.open("r") as f:
            yaml_config = yaml.load(f, Loader=yaml.FullLoader)
        cfg = yaml_config["retargeting"]
        return cls.from_dict(cfg, override)

    @classmethod
    def from_dict(cls, cfg: Dict[str, Any], override: Optional[Dict] = None) -> "RetargetingConfig":
        """Build a config from a plain dict, converting indices to an ndarray.

        Args:
            cfg: raw configuration mapping (mutated in place).
            override: optional key/value pairs that take precedence over cfg.
        """
        if "target_link_human_indices" in cfg:
            cfg["target_link_human_indices"] = np.array(cfg["target_link_human_indices"])
        if override is not None:
            for key, value in override.items():
                cfg[key] = value
        # Use cls so subclasses construct themselves.
        config = cls(**cfg)
        return config

    def build(self) -> "SeqRetargeting":
        """Construct the retargeting pipeline described by this config."""
        # Project-local imports are deferred so that importing this module does
        # not pull in the whole retargeting stack and its heavy dependencies.
        import os
        import tempfile

        from . import yourdfpy as urdf
        from .kinematics_adaptor import MimicJointKinematicAdaptor
        from .optimizer import DexPilotOptimizer, PositionOptimizer, VectorOptimizer
        from .optimizer_utils import LPFilter
        from .robot_wrapper import RobotWrapper
        from .seq_retarget import SeqRetargeting
        from .yourdfpy import DUMMY_JOINT_NAMES

        # Process the URDF with yourdfpy to better find file path
        robot_urdf = urdf.URDF.load(
            self.urdf_path, add_dummy_free_joints=self.add_dummy_free_joint, build_scene_graph=False
        )
        urdf_name = self.urdf_path.split(os.path.sep)[-1]
        # NOTE(review): the temp dir is never removed — consider
        # tempfile.TemporaryDirectory if build() is called repeatedly.
        temp_dir = tempfile.mkdtemp(prefix="dex_retargeting-")
        temp_path = f"{temp_dir}/{urdf_name}"
        robot_urdf.write_xml_file(temp_path)

        # Load pinocchio model
        robot = RobotWrapper(temp_path)

        # Add 6D dummy joint to target joint names so that it will also be optimized
        if self.add_dummy_free_joint and self.target_joint_names is not None:
            self.target_joint_names = DUMMY_JOINT_NAMES + self.target_joint_names
        joint_names = self.target_joint_names if self.target_joint_names is not None else robot.dof_joint_names

        if self.type == "position":
            optimizer = PositionOptimizer(
                robot,
                joint_names,
                target_link_names=self.target_link_names,
                target_link_human_indices=self.target_link_human_indices,
                norm_delta=self.normal_delta,
                huber_delta=self.huber_delta,
            )
        elif self.type == "vector":
            optimizer = VectorOptimizer(
                robot,
                joint_names,
                target_origin_link_names=self.target_origin_link_names,
                target_task_link_names=self.target_task_link_names,
                target_link_human_indices=self.target_link_human_indices,
                scaling=self.scaling_factor,
                norm_delta=self.normal_delta,
                huber_delta=self.huber_delta,
            )
        elif self.type == "dexpilot":
            optimizer = DexPilotOptimizer(
                robot,
                joint_names,
                finger_tip_link_names=self.finger_tip_link_names,
                wrist_link_name=self.wrist_link_name,
                target_link_human_indices=self.target_link_human_indices,
                scaling=self.scaling_factor,
                project_dist=self.project_dist,
                escape_dist=self.escape_dist,
            )
        else:
            # Unreachable after __post_init__ validation, but kept defensive.
            raise RuntimeError(f"Unknown retargeting type: {self.type}")

        if 0 <= self.low_pass_alpha <= 1:
            lp_filter = LPFilter(self.low_pass_alpha)
        else:
            lp_filter = None

        # Parse mimic joints and set kinematics adaptor for optimizer
        has_mimic_joints, source_names, mimic_names, multipliers, offsets = parse_mimic_joint(robot_urdf)
        if has_mimic_joints and not self.ignore_mimic_joint:
            adaptor = MimicJointKinematicAdaptor(
                robot,
                target_joint_names=joint_names,
                source_joint_names=source_names,
                mimic_joint_names=mimic_names,
                multipliers=multipliers,
                offsets=offsets,
            )
            optimizer.set_kinematic_adaptor(adaptor)
            print(
                "\033[34m",
                "Mimic joint adaptor enabled. The mimic joint tags in the URDF will be considered during retargeting.\n"
                "To disable mimic joint adaptor, consider setting ignore_mimic_joint=True in the configuration.",
                "\033[39m",
            )

        retargeting = SeqRetargeting(
            optimizer,
            has_joint_limits=self.has_joint_limits,
            lp_filter=lp_filter,
        )
        return retargeting


def get_retargeting_config(config_path: Union[str, Path]) -> RetargetingConfig:
    """Convenience wrapper around RetargetingConfig.load_from_file."""
    config = RetargetingConfig.load_from_file(config_path)
    return config


def parse_mimic_joint(robot_urdf: "urdf.URDF") -> Tuple[bool, List[str], List[str], List[float], List[float]]:
    """Collect mimic-joint relations from a parsed URDF.

    Returns:
        (has_mimic, source_joint_names, mimic_joint_names, multipliers, offsets)
    """
    mimic_joint_names = []
    source_joint_names = []
    multipliers = []
    offsets = []
    for name, joint in robot_urdf.joint_map.items():
        if joint.mimic is not None:
            mimic_joint_names.append(name)
            source_joint_names.append(joint.mimic.joint)
            multipliers.append(joint.mimic.multiplier)
            offsets.append(joint.mimic.offset)

    return len(mimic_joint_names) > 0, source_joint_names, mimic_joint_names, multipliers, offsets
    # -------------------------------------------------------------------------- #
    # Robot property
    # -------------------------------------------------------------------------- #
    @property
    def joint_names(self) -> List[str]:
        """All joint names declared in the pinocchio model, in model order."""
        return list(self.model.names)

    @property
    def dof_joint_names(self) -> List[str]:
        """Names of joints that contribute at least one configuration variable (nq > 0)."""
        nqs = self.model.nqs
        return [name for i, name in enumerate(self.model.names) if nqs[i] > 0]

    @property
    def dof(self) -> int:
        """Total number of configuration variables (model.nq)."""
        return self.model.nq

    @property
    def link_names(self) -> List[str]:
        """Names of all frames in the pinocchio model."""
        # NOTE(review): this lists every frame name, not only body/link frames —
        # presumably fine for name lookup; confirm if joint frames should be excluded.
        link_names = []
        for i, frame in enumerate(self.model.frames):
            link_names.append(frame.name)
        return link_names

    @property
    def joint_limits(self):
        """(nq, 2) array of [lower, upper] position limits from the model."""
        lower = self.model.lowerPositionLimit
        upper = self.model.upperPositionLimit
        return np.stack([lower, upper], axis=1)

    # -------------------------------------------------------------------------- #
    # Query function
    # -------------------------------------------------------------------------- #
    def get_joint_index(self, name: str):
        """Index of *name* within dof_joint_names (not the pinocchio joint id)."""
        return self.dof_joint_names.index(name)

    def get_link_index(self, name: str):
        """Frame id of the BODY frame named *name*.

        Raises:
            ValueError: if *name* is not a known frame name.
        """
        if name not in self.link_names:
            raise ValueError(f"{name} is not a link name. Valid link names: \n{self.link_names}")
        return self.model.getFrameId(name, pin.BODY)

    def get_joint_parent_child_frames(self, joint_name: str):
        """Return (parent_frame_id, child_frame_id) around the joint frame *joint_name*.

        Raises:
            ValueError: if no frame is attached below the joint frame.
        """
        joint_id = self.model.getFrameId(joint_name)
        parent_id = self.model.frames[joint_id].parent
        child_id = -1
        # Scan all frames for one attached directly below this joint frame.
        # NOTE(review): `previousFrame` is the legacy pinocchio attribute name
        # (newer releases call it `parentFrame`) — confirm against the pinned version.
        for idx, frame in enumerate(self.model.frames):
            if frame.previousFrame == joint_id:
                child_id = idx
        if child_id == -1:
            raise ValueError(f"Can not find child link of {joint_name}")

        return parent_id, child_id

    # -------------------------------------------------------------------------- #
    # Kinematics function
    # -------------------------------------------------------------------------- #
    def compute_forward_kinematics(self, qpos: npt.NDArray):
        """Run forward kinematics; results are cached in self.data for the pose queries below."""
        pin.forwardKinematics(self.model, self.data, qpos)

    def get_link_pose(self, link_id: int) -> npt.NDArray:
        """4x4 homogeneous world pose of frame *link_id* (call compute_forward_kinematics first)."""
        pose: pin.SE3 = pin.updateFramePlacement(self.model, self.data, link_id)
        return pose.homogeneous

    def get_link_pose_inv(self, link_id: int) -> npt.NDArray:
        """Inverse of the 4x4 homogeneous world pose of frame *link_id*."""
        pose: pin.SE3 = pin.updateFramePlacement(self.model, self.data, link_id)
        return pose.inverse().homogeneous

    def compute_single_link_local_jacobian(self, qpos, link_id: int) -> npt.NDArray:
        """Frame Jacobian of *link_id* at configuration *qpos*."""
        J = pin.computeFrameJacobian(self.model, self.data, qpos, link_id)
        return J
self.has_joint_limits = has_joint_limits + joint_limits = np.ones_like(robot.joint_limits) + joint_limits[:, 0] = -1e4 # a large value is equivalent to no limit + joint_limits[:, 1] = 1e4 + if has_joint_limits: + joint_limits[:] = robot.joint_limits[:] + self.optimizer.set_joint_limit(joint_limits[self.optimizer.idx_pin2target]) + self.joint_limits = joint_limits[self.optimizer.idx_pin2target] + + # Temporal information + self.last_qpos = joint_limits.mean(1)[self.optimizer.idx_pin2target].astype(np.float32) + self.accumulated_time = 0 + self.num_retargeting = 0 + + # Filter + self.filter = lp_filter + + # Warm started + self.is_warm_started = False + + def warm_start( + self, + wrist_pos: np.ndarray, + wrist_quat: np.ndarray, + hand_type: HandType = HandType.right, + is_mano_convention: bool = False, + ): + """ + Initialize the wrist joint pose using analytical computation instead of retargeting optimization. + This function is specifically for position retargeting with the flying robot hand, i.e. has 6D free joint + You are not expected to use this function for vector retargeting, e.g. 
when you are working on teleoperation + + Args: + wrist_pos: position of the hand wrist, typically from human hand pose + wrist_quat: quaternion of the hand wrist, the same convention as the operator frame definition if not is_mano_convention + hand_type: hand type, used to determine the operator2mano matrix + is_mano_convention: whether the wrist_quat is in mano convention + """ + # This function can only be used when the first joints of robot are free joints + + if len(wrist_pos) != 3: + raise ValueError(f"Wrist pos: {wrist_pos} is not a 3-dim vector.") + if len(wrist_quat) != 4: + raise ValueError(f"Wrist quat: {wrist_quat} is not a 4-dim vector.") + + operator2mano = OPERATOR2MANO[hand_type] if is_mano_convention else np.eye(3) + robot = self.optimizer.robot + target_wrist_pose = np.eye(4) + target_wrist_pose[:3, :3] = rotations.matrix_from_quaternion(wrist_quat) @ operator2mano.T + target_wrist_pose[:3, 3] = wrist_pos + + name_list = [ + "dummy_x_translation_joint", + "dummy_y_translation_joint", + "dummy_z_translation_joint", + "dummy_x_rotation_joint", + "dummy_y_rotation_joint", + "dummy_z_rotation_joint", + ] + wrist_link_id = robot.get_joint_parent_child_frames(name_list[5])[1] + + # Set the dummy joints angles to zero + old_qpos = robot.q0 + new_qpos = old_qpos.copy() + for num, joint_name in enumerate(self.optimizer.target_joint_names): + if joint_name in name_list: + new_qpos[num] = 0 + + robot.compute_forward_kinematics(new_qpos) + root2wrist = robot.get_link_pose_inv(wrist_link_id) + target_root_pose = target_wrist_pose @ root2wrist + + euler = rotations.euler_from_matrix(target_root_pose[:3, :3], 0, 1, 2, extrinsic=False) + pose_vec = np.concatenate([target_root_pose[:3, 3], euler]) + + # Find the dummy joints + for num, joint_name in enumerate(self.optimizer.target_joint_names): + if joint_name in name_list: + index = name_list.index(joint_name) + self.last_qpos[num] = pose_vec[index] + + self.is_warm_started = True + + def retarget(self, 
ref_value, fixed_qpos=np.array([])): + tic = time.perf_counter() + + qpos = self.optimizer.retarget( + ref_value=ref_value.astype(np.float32), + fixed_qpos=fixed_qpos.astype(np.float32), + last_qpos=np.clip(self.last_qpos, self.joint_limits[:, 0], self.joint_limits[:, 1]), + ) + self.accumulated_time += time.perf_counter() - tic + self.num_retargeting += 1 + self.last_qpos = qpos + robot_qpos = np.zeros(self.optimizer.robot.dof) + robot_qpos[self.optimizer.idx_pin2fixed] = fixed_qpos + robot_qpos[self.optimizer.idx_pin2target] = qpos + + if self.optimizer.adaptor is not None: + robot_qpos = self.optimizer.adaptor.forward_qpos(robot_qpos) + + if self.filter is not None: + robot_qpos = self.filter.next(robot_qpos) + return robot_qpos + + def set_qpos(self, robot_qpos: np.ndarray): + target_qpos = robot_qpos[self.optimizer.idx_pin2target] + self.last_qpos = target_qpos + + def get_qpos(self, fixed_qpos: Optional[np.ndarray] = None): + robot_qpos = np.zeros(self.optimizer.robot.dof) + robot_qpos[self.optimizer.idx_pin2target] = self.last_qpos + if fixed_qpos is not None: + robot_qpos[self.optimizer.idx_pin2fixed] = fixed_qpos + return robot_qpos + + def verbose(self): + min_value = self.optimizer.opt.last_optimum_value() + print(f"Retargeting {self.num_retargeting} times takes: {self.accumulated_time}s") + print(f"Last distance: {min_value}") + + def reset(self): + self.last_qpos = self.joint_limits.mean(1).astype(np.float32) + self.num_retargeting = 0 + self.accumulated_time = 0 + + @property + def joint_names(self): + return self.optimizer.robot.dof_joint_names diff --git a/teleop/robot_control/dex_retargeting/yourdfpy.py b/teleop/robot_control/dex_retargeting/yourdfpy.py new file mode 100644 index 0000000..ce8e287 --- /dev/null +++ b/teleop/robot_control/dex_retargeting/yourdfpy.py @@ -0,0 +1,2237 @@ +# Code from yourdfpy with small modification for deprecated warning +# Source: https://github.com/clemense/yourdfpy/blob/main/src/yourdfpy/urdf.py + +import copy 
+import logging +import os +from dataclasses import dataclass, field, is_dataclass +from functools import partial +from typing import Dict, List, Optional, Union + +import anytree +import numpy as np +import six +import trimesh +import trimesh.transformations as tra +from anytree import Node, LevelOrderIter +from lxml import etree + +_logger = logging.getLogger(__name__) + + +def _array_eq(arr1, arr2): + if arr1 is None and arr2 is None: + return True + return ( + isinstance(arr1, np.ndarray) + and isinstance(arr2, np.ndarray) + and arr1.shape == arr2.shape + and (arr1 == arr2).all() + ) + + +@dataclass(eq=False) +class TransmissionJoint: + name: str + hardware_interfaces: List[str] = field(default_factory=list) + + def __eq__(self, other): + if not isinstance(other, TransmissionJoint): + return NotImplemented + return ( + self.name == other.name + and all(self_hi in other.hardware_interfaces for self_hi in self.hardware_interfaces) + and all(other_hi in self.hardware_interfaces for other_hi in other.hardware_interfaces) + ) + + +@dataclass(eq=False) +class Actuator: + name: str + mechanical_reduction: Optional[float] = None + # The follwing is only valid for ROS Indigo and prior versions + hardware_interfaces: List[str] = field(default_factory=list) + + def __eq__(self, other): + if not isinstance(other, Actuator): + return NotImplemented + return ( + self.name == other.name + and self.mechanical_reduction == other.mechanical_reduction + and all(self_hi in other.hardware_interfaces for self_hi in self.hardware_interfaces) + and all(other_hi in self.hardware_interfaces for other_hi in other.hardware_interfaces) + ) + + +@dataclass(eq=False) +class Transmission: + name: str + type: Optional[str] = None + joints: List[TransmissionJoint] = field(default_factory=list) + actuators: List[Actuator] = field(default_factory=list) + + def __eq__(self, other): + if not isinstance(other, Transmission): + return NotImplemented + return ( + self.name == other.name + and 
# --- URDF joint sub-elements ------------------------------------------------ #


@dataclass
class Calibration:
    # URDF <calibration> element.
    rising: Optional[float] = None
    falling: Optional[float] = None


@dataclass
class Mimic:
    # URDF <mimic> element: this joint's value = multiplier * <joint> + offset.
    joint: str
    multiplier: Optional[float] = None
    offset: Optional[float] = None


@dataclass
class SafetyController:
    # URDF <safety_controller> element.
    soft_lower_limit: Optional[float] = None
    soft_upper_limit: Optional[float] = None
    k_position: Optional[float] = None
    k_velocity: Optional[float] = None


# --- Geometry primitives ---------------------------------------------------- #


@dataclass
class Sphere:
    radius: float


@dataclass
class Cylinder:
    radius: float
    length: float


# eq=False with a hand-written __eq__: ndarray fields break the generated
# dataclass equality, so comparisons go through _array_eq instead.
@dataclass(eq=False)
class Box:
    size: np.ndarray

    def __eq__(self, other):
        if not isinstance(other, Box):
            return NotImplemented
        return _array_eq(self.size, other.size)


@dataclass(eq=False)
class Mesh:
    filename: str
    # Scale may be a uniform float or a per-axis array.
    scale: Optional[Union[float, np.ndarray]] = None

    def __eq__(self, other):
        if not isinstance(other, Mesh):
            return NotImplemented

        if self.filename != other.filename:
            return False

        if isinstance(self.scale, float) and isinstance(other.scale, float):
            return self.scale == other.scale

        return _array_eq(self.scale, other.scale)


@dataclass
class Geometry:
    # Exactly one of the following is expected to be set.
    box: Optional[Box] = None
    cylinder: Optional[Cylinder] = None
    sphere: Optional[Sphere] = None
    mesh: Optional[Mesh] = None


@dataclass(eq=False)
class Color:
    rgba: np.ndarray

    def __eq__(self, other):
        if not isinstance(other, Color):
            return NotImplemented
        return _array_eq(self.rgba, other.rgba)


@dataclass
class Texture:
    filename: str


@dataclass
class Material:
    name: Optional[str] = None
    color: Optional[Color] = None
    texture: Optional[Texture] = None


@dataclass(eq=False)
class Visual:
    name: Optional[str] = None
    origin: Optional[np.ndarray] = None
    geometry: Optional[Geometry] = None  # That's not really optional according to ROS
    material: Optional[Material] = None

    def __eq__(self, other):
        if not isinstance(other, Visual):
            return NotImplemented
        return (
            self.name == other.name
            and _array_eq(self.origin, other.origin)
            and self.geometry == other.geometry
            and self.material == other.material
        )


@dataclass(eq=False)
class Collision:
    name: str
    origin: Optional[np.ndarray] = None
    geometry: Optional[Geometry] = None  # annotation widened: default is None

    def __eq__(self, other):
        if not isinstance(other, Collision):
            return NotImplemented
        return self.name == other.name and _array_eq(self.origin, other.origin) and self.geometry == other.geometry


@dataclass(eq=False)
class Inertial:
    origin: Optional[np.ndarray] = None
    mass: Optional[float] = None
    inertia: Optional[np.ndarray] = None

    def __eq__(self, other):
        if not isinstance(other, Inertial):
            return NotImplemented
        return (
            _array_eq(self.origin, other.origin) and self.mass == other.mass and _array_eq(self.inertia, other.inertia)
        )


@dataclass(eq=False)
class Link:
    name: str
    inertial: Optional[Inertial] = None
    visuals: List[Visual] = field(default_factory=list)
    collisions: List[Collision] = field(default_factory=list)

    def __eq__(self, other):
        # Order-insensitive comparison of visuals/collisions.
        if not isinstance(other, Link):
            return NotImplemented
        return (
            self.name == other.name
            and self.inertial == other.inertial
            and all(self_visual in other.visuals for self_visual in self.visuals)
            and all(other_visual in self.visuals for other_visual in other.visuals)
            and all(self_collision in other.collisions for self_collision in self.collisions)
            and all(other_collision in self.collisions for other_collision in other.collisions)
        )


@dataclass
class Dynamics:
    damping: Optional[float] = None
    friction: Optional[float] = None


@dataclass
class Limit:
    effort: Optional[float] = None
    velocity: Optional[float] = None
    lower: Optional[float] = None
    upper: Optional[float] = None


@dataclass(eq=False)
class Joint:
    name: str
    type: Optional[str] = None  # annotations widened to Optional: defaults are None
    parent: Optional[str] = None
    child: Optional[str] = None
    origin: Optional[np.ndarray] = None
    axis: Optional[np.ndarray] = None
    dynamics: Optional[Dynamics] = None
    limit: Optional[Limit] = None
    mimic: Optional[Mimic] = None
    calibration: Optional[Calibration] = None
    safety_controller: Optional[SafetyController] = None

    def __eq__(self, other):
        if not isinstance(other, Joint):
            return NotImplemented
        return (
            self.name == other.name
            and self.type == other.type
            and self.parent == other.parent
            and self.child == other.child
            and _array_eq(self.origin, other.origin)
            and _array_eq(self.axis, other.axis)
            and self.dynamics == other.dynamics
            and self.limit == other.limit
            and self.mimic == other.mimic
            and self.calibration == other.calibration
            and self.safety_controller == other.safety_controller
        )
# --- Exception hierarchy ----------------------------------------------------- #


class URDFError(Exception):
    """General URDF exception."""

    def __init__(self, msg):
        super(URDFError, self).__init__()
        self.msg = msg

    def __str__(self):
        return type(self).__name__ + ": " + self.msg

    def __repr__(self):
        return type(self).__name__ + '("' + self.msg + '")'


class URDFIncompleteError(URDFError):
    """Raised when needed data for an object isn't there."""

    pass


class URDFAttributeValueError(URDFError):
    """Raised when attribute value is not contained in the set of allowed values."""

    pass


class URDFBrokenRefError(URDFError):
    """Raised when a referenced object is not found in the scope."""

    pass


class URDFMalformedError(URDFError):
    """Raised when data is found to be corrupted in some way."""

    pass


class URDFUnsupportedError(URDFError):
    """Raised when some unexpectedly unsupported feature is found."""

    pass


class URDFSaveValidationError(URDFError):
    """Raised when XML validation fails when saving."""

    pass


def _str2float(s):
    """Cast string to float if it is not None. Otherwise return None.

    Args:
        s (str): String to convert or None.

    Returns:
        float or NoneType: The converted value or None.
    """
    return float(s) if s is not None else None
+ """ + if visual.material is None: + return + + if visual.material.color is not None: + color = visual.material.color + elif visual.material.name is not None and visual.material.name in material_map: + color = material_map[visual.material.name].color + else: + return + + if color is None: + return + if isinstance(geom.visual, trimesh.visual.ColorVisuals): + geom.visual.face_colors[:] = [int(255 * channel) for channel in color.rgba] + + +def filename_handler_null(fname): + """A lazy filename handler that simply returns its input. + + Args: + fname (str): A file name. + + Returns: + str: Same file name. + """ + return fname + + +def filename_handler_ignore_directive(fname): + """A filename handler that removes anything before (and including) '://'. + + Args: + fname (str): A file name. + + Returns: + str: The file name without the prefix. + """ + if "://" in fname or ":\\\\" in fname: + return ":".join(fname.split(":")[1:])[2:] + return fname + + +def filename_handler_ignore_directive_package(fname): + """A filename handler that removes the 'package://' directive and the package it refers to. + It subsequently calls filename_handler_ignore_directive, i.e., it removes any other directive. + + Args: + fname (str): A file name. + + Returns: + str: The file name without 'package://' and the package name. + """ + if fname.startswith("package://"): + string_length = len("package://") + return os.path.join(*os.path.normpath(fname[string_length:]).split(os.path.sep)[1:]) + return filename_handler_ignore_directive(fname) + + +def filename_handler_add_prefix(fname, prefix): + """A filename handler that adds a prefix. + + Args: + fname (str): A file name. + prefix (str): A prefix. + + Returns: + str: Prefix plus file name. + """ + return prefix + fname + + +def filename_handler_absolute2relative(fname, dir): + """A filename handler that turns an absolute file name into a relative one. + + Args: + fname (str): A file name. + dir (str): A directory. 
+ + Returns: + str: The file name relative to the directory. + """ + # TODO: that's not right + if fname.startswith(dir): + return fname[len(dir) :] + return fname + + +def filename_handler_relative(fname, dir): + """A filename handler that joins a file name with a directory. + + Args: + fname (str): A file name. + dir (str): A directory. + + Returns: + str: The directory joined with the file name. + """ + return os.path.join(dir, filename_handler_ignore_directive_package(fname)) + + +def filename_handler_relative_to_urdf_file(fname, urdf_fname): + return filename_handler_relative(fname, os.path.dirname(urdf_fname)) + + +def filename_handler_relative_to_urdf_file_recursive(fname, urdf_fname, level=0): + if level == 0: + return filename_handler_relative_to_urdf_file(fname, urdf_fname) + return filename_handler_relative_to_urdf_file_recursive(fname, os.path.split(urdf_fname)[0], level=level - 1) + + +def _create_filename_handlers_to_urdf_file_recursive(urdf_fname): + return [ + partial( + filename_handler_relative_to_urdf_file_recursive, + urdf_fname=urdf_fname, + level=i, + ) + for i in range(len(os.path.normpath(urdf_fname).split(os.path.sep))) + ] + + +def filename_handler_meta(fname, filename_handlers): + """A filename handler that calls other filename handlers until the resulting file name points to an existing file. + + Args: + fname (str): A file name. + filename_handlers (list(fn)): A list of function pointers to filename handlers. + + Returns: + str: The resolved file name that points to an existing file or the input if none of the files exists. + """ + for fn in filename_handlers: + candidate_fname = fn(fname=fname) + _logger.debug(f"Checking filename: {candidate_fname}") + if os.path.isfile(candidate_fname): + return candidate_fname + _logger.warning(f"Unable to resolve filename: {fname}") + return fname + + +def filename_handler_magic(fname, dir): + """A magic filename handler. + + Args: + fname (str): A file name. + dir (str): A directory. 
+ + Returns: + str: The file name that exists or the input if nothing is found. + """ + return filename_handler_meta( + fname=fname, + filename_handlers=[ + partial(filename_handler_relative, dir=dir), + filename_handler_ignore_directive, + ] + + _create_filename_handlers_to_urdf_file_recursive(urdf_fname=dir), + ) + + +def validation_handler_strict(errors): + """A validation handler that does not allow any errors. + + Args: + errors (list[yourdfpy.URDFError]): List of errors. + + Returns: + bool: Whether any errors were found. + """ + return len(errors) == 0 + + +class URDF: + def __init__( + self, + robot: Robot = None, + build_scene_graph: bool = True, + build_collision_scene_graph: bool = False, + load_meshes: bool = True, + load_collision_meshes: bool = False, + filename_handler=None, + mesh_dir: str = "", + force_mesh: bool = False, + force_collision_mesh: bool = True, + build_tree: bool = False, + ): + """A URDF model. + + Args: + robot (Robot): The robot model. Defaults to None. + build_scene_graph (bool, optional): Wheter to build a scene graph to enable transformation queries and forward kinematics. Defaults to True. + build_collision_scene_graph (bool, optional): Wheter to build a scene graph for elements. Defaults to False. + load_meshes (bool, optional): Whether to load the meshes referenced in the elements. Defaults to True. + load_collision_meshes (bool, optional): Whether to load the collision meshes referenced in the elements. Defaults to False. + filename_handler ([type], optional): Any function f(in: str) -> str, that maps filenames in the URDF to actual resources. Can be used to customize treatment of `package://` directives or relative/absolute filenames. Defaults to None. + mesh_dir (str, optional): A root directory used for loading meshes. Defaults to "". + force_mesh (bool, optional): Each loaded geometry will be concatenated into a single one (instead of being turned into a graph; in case the underlying file contains multiple geometries). 
This might loose texture information but the resulting scene graph will be smaller. Defaults to False. + force_collision_mesh (bool, optional): Same as force_mesh, but for collision scene. Defaults to True. + build_tree (bool, optional): Build the tree structure for global kinematics computation + """ + if filename_handler is None: + self._filename_handler = partial(filename_handler_magic, dir=mesh_dir) + else: + self._filename_handler = filename_handler + + self.robot = robot + self._create_maps() + self._update_actuated_joints() + + self._cfg = self.zero_cfg + + if build_scene_graph or build_collision_scene_graph: + self._base_link = self._determine_base_link() + else: + self._base_link = None + + self._errors = [] + + if build_scene_graph: + self._scene = self._create_scene( + use_collision_geometry=False, + load_geometry=load_meshes, + force_mesh=force_mesh, + force_single_geometry_per_link=force_mesh, + ) + else: + self._scene = None + + if build_collision_scene_graph: + self._scene_collision = self._create_scene( + use_collision_geometry=True, + load_geometry=load_collision_meshes, + force_mesh=force_collision_mesh, + force_single_geometry_per_link=force_collision_mesh, + ) + else: + self._scene_collision = None + + if build_tree: + self.tree_root = self.build_tree() + else: + self.tree_root = None + + @property + def scene(self) -> trimesh.Scene: + """A scene object representing the URDF model. + + Returns: + trimesh.Scene: A trimesh scene object. + """ + return self._scene + + @property + def collision_scene(self) -> trimesh.Scene: + """A scene object representing the elements of the URDF model + + Returns: + trimesh.Scene: A trimesh scene object. + """ + return self._scene_collision + + @property + def link_map(self) -> dict: + """A dictionary mapping link names to link objects. + + Returns: + dict: Mapping from link name (str) to Link. 
+ """ + return self._link_map + + @property + def joint_map(self) -> dict: + """A dictionary mapping joint names to joint objects. + + Returns: + dict: Mapping from joint name (str) to Joint. + """ + return self._joint_map + + @property + def joint_names(self): + """List of joint names. + + Returns: + list[str]: List of joint names of the URDF model. + """ + return [j.name for j in self.robot.joints] + + @property + def actuated_joints(self): + """List of actuated joints. This excludes mimic and fixed joints. + + Returns: + list[Joint]: List of actuated joints of the URDF model. + """ + return self._actuated_joints + + @property + def actuated_dof_indices(self): + """List of DOF indices per actuated joint. Can be used to reference configuration. + + Returns: + list[list[int]]: List of DOF indices per actuated joint. + """ + return self._actuated_dof_indices + + @property + def actuated_joint_indices(self): + """List of indices of all joints that are actuated, i.e., not of type mimic or fixed. + + Returns: + list[int]: List of indices of actuated joints. + """ + return self._actuated_joint_indices + + @property + def actuated_joint_names(self): + """List of names of actuated joints. This excludes mimic and fixed joints. + + Returns: + list[str]: List of names of actuated joints of the URDF model. + """ + return [j.name for j in self._actuated_joints] + + @property + def num_actuated_joints(self): + """Number of actuated joints. + + Returns: + int: Number of actuated joints. + """ + return len(self.actuated_joints) + + @property + def num_dofs(self): + """Number of degrees of freedom of actuated joints. Depending on the type of the joint, the number of DOFs might vary. + + Returns: + int: Degrees of freedom. 
+ """ + total_num_dofs = 0 + for j in self._actuated_joints: + if j.type in ["revolute", "prismatic", "continuous"]: + total_num_dofs += 1 + elif j.type == "floating": + total_num_dofs += 6 + elif j.type == "planar": + total_num_dofs += 2 + return total_num_dofs + + @property + def zero_cfg(self): + """Return the zero configuration. + + Returns: + np.ndarray: The zero configuration. + """ + return np.zeros(self.num_dofs) + + @property + def center_cfg(self): + """Return center configuration of URDF model by using the average of each joint's limits if present, otherwise zero. + + Returns: + (n), float: Default configuration of URDF model. + """ + config = [] + config_names = [] + for j in self._actuated_joints: + if j.type == "revolute" or j.type == "prismatic": + if j.limit is not None: + cfg = [j.limit.lower + 0.5 * (j.limit.upper - j.limit.lower)] + else: + cfg = [0.0] + elif j.type == "continuous": + cfg = [0.0] + elif j.type == "floating": + cfg = [0.0] * 6 + elif j.type == "planar": + cfg = [0.0] * 2 + + config.append(cfg) + config_names.append(j.name) + + for i, j in enumerate(self.robot.joints): + if j.mimic is not None: + index = config_names.index(j.mimic.joint) + config[i][0] = config[index][0] * j.mimic.multiplier + j.mimic.offset + + if len(config) == 0: + return np.array([], dtype=np.float64) + return np.concatenate(config) + + @property + def cfg(self): + """Current configuration. + + Returns: + np.ndarray: Current configuration of URDF model. + """ + return self._cfg + + @property + def base_link(self): + """Name of URDF base/root link. + + Returns: + str: Name of base link of URDF model. + """ + return self._base_link + + @property + def errors(self) -> list: + """A list with validation errors. + + Returns: + list: A list of validation errors. 
+ """ + return self._errors + + def clear_errors(self): + """Clear the validation error log.""" + self._errors = [] + + def show(self, collision_geometry=False, callback=None): + """Open a simpler viewer displaying the URDF model. + + Args: + collision_geometry (bool, optional): Whether to display the or elements. Defaults to False. + """ + if collision_geometry: + if self._scene_collision is None: + raise ValueError( + "No collision scene available. Use build_collision_scene_graph=True and load_collision_meshes=True during loading." + ) + else: + self._scene_collision.show(callback=callback) + else: + if self._scene is None: + raise ValueError("No scene available. Use build_scene_graph=True and load_meshes=True during loading.") + elif len(self._scene.bounds_corners) < 1: + raise ValueError( + "Scene is empty, maybe meshes failed to load? Use build_scene_graph=True and load_meshes=True during loading." + ) + else: + self._scene.show(callback=callback) + + def validate(self, validation_fn=None) -> bool: + """Validate URDF model. + + Args: + validation_fn (function, optional): A function f(list[yourdfpy.URDFError]) -> bool. None uses the strict handler (any error leads to False). Defaults to None. + + Returns: + bool: Whether the model is valid. 
+ """ + self._errors = [] + self._validate_robot(self.robot) + + if validation_fn is None: + validation_fn = validation_handler_strict + + return validation_fn(self._errors) + + def _create_maps(self): + self._material_map = {} + for m in self.robot.materials: + self._material_map[m.name] = m + + self._joint_map = {} + for j in self.robot.joints: + self._joint_map[j.name] = j + + self._link_map = {} + for l in self.robot.links: + self._link_map[l.name] = l + + def _update_actuated_joints(self): + self._actuated_joints = [] + self._actuated_joint_indices = [] + self._actuated_dof_indices = [] + + dof_indices_cnt = 0 + for i, j in enumerate(self.robot.joints): + if j.mimic is None and j.type != "fixed": + self._actuated_joints.append(j) + self._actuated_joint_indices.append(i) + + if j.type in ["prismatic", "revolute", "continuous"]: + self._actuated_dof_indices.append([dof_indices_cnt]) + dof_indices_cnt += 1 + elif j.type == "floating": + self._actuated_dof_indices.append([dof_indices_cnt, dof_indices_cnt + 1, dof_indices_cnt + 2]) + dof_indices_cnt += 3 + elif j.type == "planar": + self._actuated_dof_indices.append([dof_indices_cnt, dof_indices_cnt + 1]) + dof_indices_cnt += 2 + + def _validate_required_attribute(self, attribute, error_msg, allowed_values=None): + if attribute is None: + self._errors.append(URDFIncompleteError(error_msg)) + elif isinstance(attribute, str) and len(attribute) == 0: + self._errors.append(URDFIncompleteError(error_msg)) + + if allowed_values is not None and attribute is not None: + if attribute not in allowed_values: + self._errors.append(URDFAttributeValueError(error_msg)) + + @staticmethod + def load(fname_or_file, add_dummy_free_joints=False, **kwargs): + """Load URDF file from filename or file object. + + Args: + fname_or_file (str or file object): A filename or file object, file-like object, stream representing the URDF file. 
            **build_scene_graph (bool, optional): Whether to build a scene graph to enable transformation queries and forward kinematics. Defaults to True.
            **build_collision_scene_graph (bool, optional): Whether to build a scene graph for collision elements. Defaults to False.
            **load_meshes (bool, optional): Whether to load the meshes referenced in the visual elements. Defaults to True.
            **load_collision_meshes (bool, optional): Whether to load the collision meshes referenced in the collision elements. Defaults to False.
            **filename_handler ([type], optional): Any function f(in: str) -> str, that maps filenames in the URDF to actual resources. Can be used to customize treatment of `package://` directives or relative/absolute filenames. Defaults to None.
            **mesh_dir (str, optional): A root directory used for loading meshes. Defaults to "".
            **force_mesh (bool, optional): Each loaded geometry will be concatenated into a single one (instead of being turned into a graph; in case the underlying file contains multiple geometries). This might lose texture information but the resulting scene graph will be smaller. Defaults to False.
            **force_collision_mesh (bool, optional): Same as force_mesh, but for collision scene. Defaults to True.

        Raises:
            ValueError: If filename does not exist.

        Returns:
            yourdfpy.URDF: URDF model.
+ """ + if isinstance(fname_or_file, six.string_types): + if not os.path.isfile(fname_or_file): + raise ValueError("{} is not a file".format(fname_or_file)) + + if not "mesh_dir" in kwargs: + kwargs["mesh_dir"] = os.path.dirname(fname_or_file) + + try: + parser = etree.XMLParser(remove_blank_text=True) + tree = etree.parse(fname_or_file, parser=parser) + xml_root = tree.getroot() + except Exception as e: + _logger.error(e) + _logger.error("Using different parsing approach.") + + events = ("start", "end", "start-ns", "end-ns") + xml = etree.iterparse(fname_or_file, recover=True, events=events) + + # Iterate through all XML elements + for action, elem in xml: + # Skip comments and processing instructions, + # because they do not have names + if not (isinstance(elem, etree._Comment) or isinstance(elem, etree._ProcessingInstruction)): + # Remove a namespace URI in the element's name + # elem.tag = etree.QName(elem).localname + if action == "end" and ":" in elem.tag: + elem.getparent().remove(elem) + + xml_root = xml.root + + # Remove comments + etree.strip_tags(xml_root, etree.Comment) + etree.cleanup_namespaces(xml_root) + + return URDF( + robot=URDF._parse_robot(xml_element=xml_root, add_dummy_free_joints=add_dummy_free_joints), **kwargs + ) + + def contains(self, key, value, element=None) -> bool: + """Checks recursively whether the URDF tree contains the provided key-value pair. + + Args: + key (str): A key. + value (str): A value. + element (etree.Element, optional): The XML element from which to start the recursive search. None means URDF root. Defaults to None. + + Returns: + bool: Whether the key-value pair was found. 
+ """ + if element is None: + element = self.robot + + result = False + for field in element.__dataclass_fields__: + field_value = getattr(element, field) + if is_dataclass(field_value): + result = result or self.contains(key=key, value=value, element=field_value) + elif isinstance(field_value, list) and len(field_value) > 0 and is_dataclass(field_value[0]): + for field_value_element in field_value: + result = result or self.contains(key=key, value=value, element=field_value_element) + else: + if key == field and value == field_value: + result = True + return result + + def _determine_base_link(self): + """Get the base link of the URDF tree by extracting all links without parents. + In case multiple links could be root chose the first. + + Returns: + str: Name of the base link. + """ + link_names = [l.name for l in self.robot.links] + + for j in self.robot.joints: + link_names.remove(j.child) + + if len(link_names) == 0: + # raise Error? + return None + + return link_names[0] + + def _forward_kinematics_joint(self, joint, q=None): + origin = np.eye(4) if joint.origin is None else joint.origin + + if joint.mimic is not None: + if joint.mimic.joint in self.actuated_joint_names: + mimic_joint_index = self.actuated_joint_names.index(joint.mimic.joint) + q = self._cfg[mimic_joint_index] * joint.mimic.multiplier + joint.mimic.offset + else: + _logger.warning( + f"Joint '{joint.name}' is supposed to mimic '{joint.mimic.joint}'. But this joint is not actuated - will assume (0.0 + offset)." 
+ ) + q = 0.0 + joint.mimic.offset + + if joint.type in ["revolute", "prismatic", "continuous"]: + if q is None: + # Use internal cfg vector for forward kinematics + q = float(self.cfg[self.actuated_dof_indices[self.actuated_joint_names.index(joint.name)]]) + + if joint.type == "prismatic": + matrix = origin @ tra.translation_matrix(q * joint.axis) + else: + matrix = origin @ tra.rotation_matrix(q, joint.axis) + else: + # this includes: floating, planar, fixed + matrix = origin + + return matrix, q + + def update_cfg(self, configuration): + """Update joint configuration of URDF; does forward kinematics. + + Args: + configuration (dict, list[float], tuple[float] or np.ndarray): A mapping from joints or joint names to configuration values, or a list containing a value for each actuated joint. + + Raises: + ValueError: Raised if dimensionality of configuration does not match number of actuated joints of URDF model. + TypeError: Raised if configuration is neither a dict, list, tuple or np.ndarray. + """ + joint_cfg = [] + + if isinstance(configuration, dict): + for joint in configuration: + if isinstance(joint, six.string_types): + joint_cfg.append((self._joint_map[joint], configuration[joint])) + elif isinstance(joint, Joint): + # TODO: Joint is not hashable; so this branch will not succeed + joint_cfg.append((joint, configuration[joint])) + elif isinstance(configuration, (list, tuple, np.ndarray)): + if len(configuration) == len(self.robot.joints): + for joint, value in zip(self.robot.joints, configuration): + joint_cfg.append((joint, value)) + elif len(configuration) == self.num_actuated_joints: + for joint, value in zip(self._actuated_joints, configuration): + joint_cfg.append((joint, value)) + else: + raise ValueError( + f"Dimensionality of configuration ({len(configuration)}) doesn't match number of all ({len(self.robot.joints)}) or actuated joints ({self.num_actuated_joints})." 
+ ) + else: + raise TypeError("Invalid type for configuration") + + # append all mimic joints in the update + for j, q in joint_cfg + [(j, 0.0) for j in self.robot.joints if j.mimic is not None]: + matrix, joint_q = self._forward_kinematics_joint(j, q=q) + + # update internal configuration vector - only consider actuated joints + if j.name in self.actuated_joint_names: + self._cfg[self.actuated_dof_indices[self.actuated_joint_names.index(j.name)]] = joint_q + + if self._scene is not None: + self._scene.graph.update(frame_from=j.parent, frame_to=j.child, matrix=matrix) + if self._scene_collision is not None: + self._scene_collision.graph.update(frame_from=j.parent, frame_to=j.child, matrix=matrix) + + def get_transform(self, frame_to, frame_from=None, collision_geometry=False): + """Get the transform from one frame to another. + + Args: + frame_to (str): Node name. + frame_from (str, optional): Node name. If None it will be set to self.base_frame. Defaults to None. + collision_geometry (bool, optional): Whether to use the collision geometry scene graph (instead of the visual geometry). Defaults to False. + + Raises: + ValueError: Raised if scene graph wasn't constructed during intialization. + + Returns: + (4, 4) float: Homogeneous transformation matrix + """ + if collision_geometry: + if self._scene_collision is None: + raise ValueError("No collision scene available. Use build_collision_scene_graph=True during loading.") + else: + return self._scene_collision.graph.get(frame_to=frame_to, frame_from=frame_from)[0] + else: + if self._scene is None: + raise ValueError("No scene available. 
Use build_scene_graph=True during loading.") + else: + return self._scene.graph.get(frame_to=frame_to, frame_from=frame_from)[0] + + def _link_mesh(self, link, collision_geometry=True): + geometries = link.collisions if collision_geometry else link.visuals + + if len(geometries) == 0: + return None + + meshes = [] + for g in geometries: + for m in g.geometry.meshes: + m = m.copy() + pose = g.origin + if g.geometry.mesh is not None: + if g.geometry.mesh.scale is not None: + S = np.eye(4) + S[:3, :3] = np.diag(g.geometry.mesh.scale) + pose = pose.dot(S) + m.apply_transform(pose) + meshes.append(m) + if len(meshes) == 0: + return None + self._collision_mesh = meshes[0] + meshes[1:] + return self._collision_mesh + + def _geometry2trimeshscene(self, geometry, load_file, force_mesh, skip_materials): + new_s = None + if geometry.box is not None: + new_s = trimesh.primitives.Box(extents=geometry.box.size).scene() + elif geometry.sphere is not None: + new_s = trimesh.primitives.Sphere(radius=geometry.sphere.radius).scene() + elif geometry.cylinder is not None: + new_s = trimesh.primitives.Cylinder( + radius=geometry.cylinder.radius, height=geometry.cylinder.length + ).scene() + elif geometry.mesh is not None and load_file: + new_filename = self._filename_handler(fname=geometry.mesh.filename) + + if os.path.isfile(new_filename): + _logger.debug(f"Loading {geometry.mesh.filename} as {new_filename}") + + if force_mesh: + new_g = trimesh.load( + new_filename, + ignore_broken=True, + force="mesh", + skip_materials=skip_materials, + ) + + # add original filename + if "file_path" not in new_g.metadata: + new_g.metadata["file_path"] = os.path.abspath(new_filename) + new_g.metadata["file_name"] = os.path.basename(new_filename) + + new_s = trimesh.Scene() + new_s.add_geometry(new_g) + else: + new_s = trimesh.load( + new_filename, + ignore_broken=True, + force="scene", + skip_materials=skip_materials, + ) + + if "file_path" in new_s.metadata: + for i, (_, geom) in 
enumerate(new_s.geometry.items()): + if "file_path" not in geom.metadata: + geom.metadata["file_path"] = new_s.metadata["file_path"] + geom.metadata["file_name"] = new_s.metadata["file_name"] + geom.metadata["file_element"] = i + + # scale mesh appropriately + if geometry.mesh.scale is not None: + if isinstance(geometry.mesh.scale, float): + new_s = new_s.scaled(geometry.mesh.scale) + elif isinstance(geometry.mesh.scale, np.ndarray): + new_s = new_s.scaled(geometry.mesh.scale) + else: + _logger.warning(f"Warning: Can't interpret scale '{geometry.mesh.scale}'") + else: + _logger.warning(f"Can't find {new_filename}") + return new_s + + def _add_geometries_to_scene( + self, + s, + geometries, + link_name, + load_geometry, + force_mesh, + force_single_geometry, + skip_materials, + ): + if force_single_geometry: + tmp_scene = trimesh.Scene(base_frame=link_name) + + first_geom_name = None + + for v in geometries: + if v.geometry is not None: + if first_geom_name is None: + first_geom_name = v.name + + new_s = self._geometry2trimeshscene( + geometry=v.geometry, + load_file=load_geometry, + force_mesh=force_mesh, + skip_materials=skip_materials, + ) + if new_s is not None: + origin = v.origin if v.origin is not None else np.eye(4) + + if force_single_geometry: + for name, geom in new_s.geometry.items(): + if isinstance(v, Visual): + apply_visual_color(geom, v, self._material_map) + tmp_scene.add_geometry( + geometry=geom, + geom_name=v.name, + parent_node_name=link_name, + transform=origin @ new_s.graph.get(name)[0], + ) + else: + # The following map is used to deal with glb format + # when the graph node and geometry have different names + geom_name_map = {new_s.graph[node_name][1]: node_name for node_name in new_s.graph.nodes} + for name, geom in new_s.geometry.items(): + if isinstance(v, Visual): + apply_visual_color(geom, v, self._material_map) + s.add_geometry( + geometry=geom, + geom_name=v.name, + parent_node_name=link_name, + transform=origin @ 
new_s.graph.get(geom_name_map[name])[0], + ) + + if force_single_geometry and len(tmp_scene.geometry) > 0: + s.add_geometry( + geometry=tmp_scene.dump(concatenate=True), + geom_name=first_geom_name, + parent_node_name=link_name, + transform=np.eye(4), + ) + + def _create_scene( + self, + use_collision_geometry=False, + load_geometry=True, + force_mesh=False, + force_single_geometry_per_link=False, + ): + s = trimesh.scene.Scene(base_frame=self._base_link) + + for j in self.robot.joints: + matrix, _ = self._forward_kinematics_joint(j) + + s.graph.update(frame_from=j.parent, frame_to=j.child, matrix=matrix) + + for l in self.robot.links: + if l.name not in s.graph.nodes and l.name != s.graph.base_frame: + _logger.warning(f"{l.name} not connected via joints. Will add link to base frame.") + s.graph.update(frame_from=s.graph.base_frame, frame_to=l.name) + + meshes = l.collisions if use_collision_geometry else l.visuals + self._add_geometries_to_scene( + s, + geometries=meshes, + link_name=l.name, + load_geometry=load_geometry, + force_mesh=force_mesh, + force_single_geometry=force_single_geometry_per_link, + skip_materials=use_collision_geometry, + ) + + return s + + def _successors(self, node): + """ + Get all nodes of the scene that succeeds a specified node. + + Parameters + ------------ + node : any + Hashable key in `scene.graph` + + Returns + ----------- + subnodes : set[str] + Set of nodes. 
+ """ + # get every node that is a successor to specified node + # this includes `node` + return self._scene.graph.transforms.successors(node) + + def _create_subrobot(self, robot_name, root_link_name): + subrobot = Robot(name=robot_name) + subnodes = self._successors(node=root_link_name) + + if len(subnodes) > 0: + for node in subnodes: + if node in self.link_map: + subrobot.links.append(copy.deepcopy(self.link_map[node])) + for joint_name, joint in self.joint_map.items(): + if joint.parent in subnodes and joint.child in subnodes: + subrobot.joints.append(copy.deepcopy(self.joint_map[joint_name])) + + return subrobot + + def split_along_joints(self, joint_type="floating", **kwargs): + """Split URDF model along a particular joint type. + The result is a set of URDF models which together compose the original URDF. + + Args: + joint_type (str, or list[str], optional): Type of joint to use for splitting. Defaults to "floating". + **kwargs: Arguments delegated to URDF constructor of new URDF models. + + Returns: + list[(np.ndarray, yourdfpy.URDF)]: A list of tuples (np.ndarray, yourdfpy.URDF) whereas each homogeneous 4x4 matrix describes the root transformation of the respective URDF model w.r.t. the original URDF. 
+ """ + root_urdf = URDF(robot=copy.deepcopy(self.robot), build_scene_graph=False, load_meshes=False) + result = [] + + joint_types = joint_type if isinstance(joint_type, list) else [joint_type] + + # find all relevant joints + joint_names = [j.name for j in self.robot.joints if j.type in joint_types] + for joint_name in joint_names: + root_link = self.link_map[self.joint_map[joint_name].child] + new_robot = self._create_subrobot( + robot_name=root_link.name, + root_link_name=root_link.name, + ) + + result.append( + ( + self._scene.graph.get(root_link.name)[0], + URDF(robot=new_robot, **kwargs), + ) + ) + + # remove links and joints from root robot + for j in new_robot.joints: + root_urdf.robot.joints.remove(root_urdf.joint_map[j.name]) + for l in new_robot.links: + root_urdf.robot.links.remove(root_urdf.link_map[l.name]) + + # remove joint that connects root urdf to root_link + if root_link.name in [j.child for j in root_urdf.robot.joints]: + root_urdf.robot.joints.remove( + root_urdf.robot.joints[[j.child for j in root_urdf.robot.joints].index(root_link.name)] + ) + + result.insert(0, (np.eye(4), URDF(robot=root_urdf.robot, **kwargs))) + + return result + + def validate_filenames(self): + for l in self.robot.links: + meshes = [m.geometry.mesh for m in l.collisions + l.visuals if m.geometry.mesh is not None] + for m in meshes: + _logger.debug(m.filename, "-->", self._filename_handler(m.filename)) + if not os.path.isfile(self._filename_handler(m.filename)): + return False + return True + + def write_xml(self): + """Write URDF model to an XML element hierarchy. + + Returns: + etree.ElementTree: XML data. + """ + xml_element = self._write_robot(self.robot) + return etree.ElementTree(xml_element) + + def write_xml_string(self, **kwargs): + """Write URDF model to a string. + + Returns: + str: String of the xml representation of the URDF model. 
+ """ + xml_element = self.write_xml() + return etree.tostring(xml_element, xml_declaration=True, *kwargs) + + def write_xml_file(self, fname): + """Write URDF model to an xml file. + + Args: + fname (str): Filename of the file to be written. Usually ends in `.urdf`. + """ + xml_element = self.write_xml() + xml_element.write(fname, xml_declaration=True, pretty_print=True) + + def _parse_mimic(xml_element): + if xml_element is None: + return None + + return Mimic( + joint=xml_element.get("joint"), + multiplier=_str2float(xml_element.get("multiplier", 1.0)), + offset=_str2float(xml_element.get("offset", 0.0)), + ) + + def _write_mimic(self, xml_parent, mimic): + etree.SubElement( + xml_parent, + "mimic", + attrib={ + "joint": mimic.joint, + "multiplier": str(mimic.multiplier), + "offset": str(mimic.offset), + }, + ) + + def _parse_safety_controller(xml_element): + if xml_element is None: + return None + + return SafetyController( + soft_lower_limit=_str2float(xml_element.get("soft_lower_limit")), + soft_upper_limit=_str2float(xml_element.get("soft_upper_limit")), + k_position=_str2float(xml_element.get("k_position")), + k_velocity=_str2float(xml_element.get("k_velocity")), + ) + + def _write_safety_controller(self, xml_parent, safety_controller): + etree.SubElement( + xml_parent, + "safety_controller", + attrib={ + "soft_lower_limit": str(safety_controller.soft_lower_limit), + "soft_upper_limit": str(safety_controller.soft_upper_limit), + "k_position": str(safety_controller.k_position), + "k_velocity": str(safety_controller.k_velocity), + }, + ) + + def _parse_transmission_joint(xml_element): + if xml_element is None: + return None + + transmission_joint = TransmissionJoint(name=xml_element.get("name")) + + for h in xml_element.findall("hardware_interface"): + transmission_joint.hardware_interfaces.append(h.text) + + return transmission_joint + + def _write_transmission_joint(self, xml_parent, transmission_joint): + xml_element = etree.SubElement( + xml_parent, + 
"joint", + attrib={ + "name": str(transmission_joint.name), + }, + ) + for h in transmission_joint.hardware_interfaces: + tmp = etree.SubElement( + xml_element, + "hardwareInterface", + ) + tmp.text = h + + def _parse_actuator(xml_element): + if xml_element is None: + return None + + actuator = Actuator(name=xml_element.get("name")) + if xml_element.find("mechanicalReduction"): + actuator.mechanical_reduction = float(xml_element.find("mechanicalReduction").text) + + for h in xml_element.findall("hardwareInterface"): + actuator.hardware_interfaces.append(h.text) + + return actuator + + def _write_actuator(self, xml_parent, actuator): + xml_element = etree.SubElement( + xml_parent, + "actuator", + attrib={ + "name": str(actuator.name), + }, + ) + if actuator.mechanical_reduction is not None: + tmp = etree.SubElement("mechanicalReduction") + tmp.text = str(actuator.mechanical_reduction) + + for h in actuator.hardware_interfaces: + tmp = etree.SubElement( + xml_element, + "hardwareInterface", + ) + tmp.text = h + + def _parse_transmission(xml_element): + if xml_element is None: + return None + + transmission = Transmission(name=xml_element.get("name")) + + for j in xml_element.findall("joint"): + transmission.joints.append(URDF._parse_transmission_joint(j)) + for a in xml_element.findall("actuator"): + transmission.actuators.append(URDF._parse_actuator(a)) + + return transmission + + def _write_transmission(self, xml_parent, transmission): + xml_element = etree.SubElement( + xml_parent, + "transmission", + attrib={ + "name": str(transmission.name), + }, + ) + + for j in transmission.joints: + self._write_transmission_joint(xml_element, j) + + for a in transmission.actuators: + self._write_actuator(xml_element, a) + + def _parse_calibration(xml_element): + if xml_element is None: + return None + + return Calibration( + rising=_str2float(xml_element.get("rising")), + falling=_str2float(xml_element.get("falling")), + ) + + def _write_calibration(self, xml_parent, 
calibration): + etree.SubElement( + xml_parent, + "calibration", + attrib={ + "rising": str(calibration.rising), + "falling": str(calibration.falling), + }, + ) + + def _parse_box(xml_element): + return Box(size=np.array(xml_element.attrib["size"].split(), dtype=float)) + + def _write_box(self, xml_parent, box): + etree.SubElement(xml_parent, "box", attrib={"size": " ".join(map(str, box.size))}) + + def _parse_cylinder(xml_element): + return Cylinder( + radius=float(xml_element.attrib["radius"]), + length=float(xml_element.attrib["length"]), + ) + + def _write_cylinder(self, xml_parent, cylinder): + etree.SubElement( + xml_parent, + "cylinder", + attrib={"radius": str(cylinder.radius), "length": str(cylinder.length)}, + ) + + def _parse_sphere(xml_element): + return Sphere(radius=float(xml_element.attrib["radius"])) + + def _write_sphere(self, xml_parent, sphere): + etree.SubElement(xml_parent, "sphere", attrib={"radius": str(sphere.radius)}) + + def _parse_scale(xml_element): + if "scale" in xml_element.attrib: + s = xml_element.get("scale").split() + if len(s) == 0: + return None + elif len(s) == 1: + return float(s[0]) + else: + return np.array(list(map(float, s))) + return None + + def _write_scale(self, xml_parent, scale): + if scale is not None: + if isinstance(scale, float) or isinstance(scale, int): + xml_parent.set("scale", " ".join([str(scale)] * 3)) + else: + xml_parent.set("scale", " ".join(map(str, scale))) + + def _parse_mesh(xml_element): + return Mesh(filename=xml_element.get("filename"), scale=URDF._parse_scale(xml_element)) + + def _write_mesh(self, xml_parent, mesh): + # TODO: turn into different filename handler + xml_element = etree.SubElement( + xml_parent, + "mesh", + attrib={"filename": self._filename_handler(mesh.filename)}, + ) + + self._write_scale(xml_element, mesh.scale) + + def _parse_geometry(xml_element): + geometry = Geometry() + if xml_element[0].tag == "box": + geometry.box = URDF._parse_box(xml_element[0]) + elif 
xml_element[0].tag == "cylinder": + geometry.cylinder = URDF._parse_cylinder(xml_element[0]) + elif xml_element[0].tag == "sphere": + geometry.sphere = URDF._parse_sphere(xml_element[0]) + elif xml_element[0].tag == "mesh": + geometry.mesh = URDF._parse_mesh(xml_element[0]) + else: + raise ValueError(f"Unknown tag: {xml_element[0].tag}") + + return geometry + + def _validate_geometry(self, geometry): + if geometry is None: + self._errors.append(URDFIncompleteError(" is missing.")) + + num_nones = sum( + [ + x is not None + for x in [ + geometry.box, + geometry.cylinder, + geometry.sphere, + geometry.mesh, + ] + ] + ) + if num_nones < 1: + self._errors.append( + URDFIncompleteError( + "One of , , , needs to be defined as a child of ." + ) + ) + elif num_nones > 1: + self._errors.append( + URDFError( + "Too many of , , , defined as a child of . Only one allowed." + ) + ) + + def _write_geometry(self, xml_parent, geometry): + if geometry is None: + return + + xml_element = etree.SubElement(xml_parent, "geometry") + if geometry.box is not None: + self._write_box(xml_element, geometry.box) + elif geometry.cylinder is not None: + self._write_cylinder(xml_element, geometry.cylinder) + elif geometry.sphere is not None: + self._write_sphere(xml_element, geometry.sphere) + elif geometry.mesh is not None: + self._write_mesh(xml_element, geometry.mesh) + + def _parse_origin(xml_element): + if xml_element is None: + return None + + xyz = xml_element.get("xyz", default="0 0 0") + rpy = xml_element.get("rpy", default="0 0 0") + + return tra.compose_matrix( + translate=np.array(list(map(float, xyz.split()))), + angles=np.array(list(map(float, rpy.split()))), + ) + + def _write_origin(self, xml_parent, origin): + if origin is None: + return + + etree.SubElement( + xml_parent, + "origin", + attrib={ + "xyz": " ".join(map(str, tra.translation_from_matrix(origin))), + "rpy": " ".join(map(str, tra.euler_from_matrix(origin))), + }, + ) + + def _parse_color(xml_element): + if xml_element 
is None: + return None + + rgba = xml_element.get("rgba", default="1 1 1 1") + + return Color(rgba=np.array(list(map(float, rgba.split())))) + + def _write_color(self, xml_parent, color): + if color is None: + return + + etree.SubElement(xml_parent, "color", attrib={"rgba": " ".join(map(str, color.rgba))}) + + def _parse_texture(xml_element): + if xml_element is None: + return None + + # TODO: use texture filename handler + return Texture(filename=xml_element.get("filename", default=None)) + + def _write_texture(self, xml_parent, texture): + if texture is None: + return + + # TODO: use texture filename handler + etree.SubElement(xml_parent, "texture", attrib={"filename": texture.filename}) + + def _parse_material(xml_element): + if xml_element is None: + return None + + material = Material(name=xml_element.get("name")) + material.color = URDF._parse_color(xml_element.find("color")) + material.texture = URDF._parse_texture(xml_element.find("texture")) + + return material + + def _write_material(self, xml_parent, material): + if material is None: + return + + attrib = {"name": material.name} if material.name is not None else {} + xml_element = etree.SubElement( + xml_parent, + "material", + attrib=attrib, + ) + + self._write_color(xml_element, material.color) + self._write_texture(xml_element, material.texture) + + def _parse_visual(xml_element): + visual = Visual(name=xml_element.get("name")) + + visual.geometry = URDF._parse_geometry(xml_element.find("geometry")) + visual.origin = URDF._parse_origin(xml_element.find("origin")) + visual.material = URDF._parse_material(xml_element.find("material")) + + return visual + + def _validate_visual(self, visual): + self._validate_geometry(visual.geometry) + + def _write_visual(self, xml_parent, visual): + attrib = {"name": visual.name} if visual.name is not None else {} + xml_element = etree.SubElement( + xml_parent, + "visual", + attrib=attrib, + ) + + self._write_geometry(xml_element, visual.geometry) + 
self._write_origin(xml_element, visual.origin) + self._write_material(xml_element, visual.material) + + def _parse_collision(xml_element): + collision = Collision(name=xml_element.get("name")) + + collision.geometry = URDF._parse_geometry(xml_element.find("geometry")) + collision.origin = URDF._parse_origin(xml_element.find("origin")) + + return collision + + def _validate_collision(self, collision): + self._validate_geometry(collision.geometry) + + def _write_collision(self, xml_parent, collision): + attrib = {"name": collision.name} if collision.name is not None else {} + xml_element = etree.SubElement( + xml_parent, + "collision", + attrib=attrib, + ) + + self._write_geometry(xml_element, collision.geometry) + self._write_origin(xml_element, collision.origin) + + def _parse_inertia(xml_element): + if xml_element is None: + return None + + x = xml_element + + return np.array( + [ + [ + x.get("ixx", default=1.0), + x.get("ixy", default=0.0), + x.get("ixz", default=0.0), + ], + [ + x.get("ixy", default=0.0), + x.get("iyy", default=1.0), + x.get("iyz", default=0.0), + ], + [ + x.get("ixz", default=0.0), + x.get("iyz", default=0.0), + x.get("izz", default=1.0), + ], + ], + dtype=np.float64, + ) + + def _write_inertia(self, xml_parent, inertia): + if inertia is None: + return None + + etree.SubElement( + xml_parent, + "inertia", + attrib={ + "ixx": str(inertia[0, 0]), + "ixy": str(inertia[0, 1]), + "ixz": str(inertia[0, 2]), + "iyy": str(inertia[1, 1]), + "iyz": str(inertia[1, 2]), + "izz": str(inertia[2, 2]), + }, + ) + + def _parse_mass(xml_element): + if xml_element is None: + return None + + return _str2float(xml_element.get("value", default=0.0)) + + def _write_mass(self, xml_parent, mass): + if mass is None: + return + + etree.SubElement( + xml_parent, + "mass", + attrib={ + "value": str(mass), + }, + ) + + def _parse_inertial(xml_element): + if xml_element is None: + return None + + inertial = Inertial() + inertial.origin = 
def _write_inertial(self, xml_parent, inertial):
    """Serialize an Inertial as an <inertial> child (origin, mass, inertia)."""
    if inertial is None:
        return

    xml_element = etree.SubElement(xml_parent, "inertial")

    self._write_origin(xml_element, inertial.origin)
    self._write_mass(xml_element, inertial.mass)
    self._write_inertia(xml_element, inertial.inertia)

def _parse_link(xml_element):
    """Parse a <link> element: name, optional inertial, all visuals/collisions."""
    link = Link(name=xml_element.attrib["name"])

    link.inertial = URDF._parse_inertial(xml_element.find("inertial"))

    for visual_xml in xml_element.findall("visual"):
        link.visuals.append(URDF._parse_visual(visual_xml))

    for collision_xml in xml_element.findall("collision"):
        link.collisions.append(URDF._parse_collision(collision_xml))

    return link

def _validate_link(self, link):
    """Validate a link's name and each of its visual/collision geometries."""
    self._validate_required_attribute(attribute=link.name, error_msg="The tag misses a 'name' attribute.")

    for visual in link.visuals:
        self._validate_visual(visual)

    for collision in link.collisions:
        self._validate_collision(collision)

def _write_link(self, xml_parent, link):
    """Serialize a Link as a <link> child with inertial, visuals and collisions."""
    xml_element = etree.SubElement(
        xml_parent,
        "link",
        attrib={
            "name": link.name,
        },
    )

    self._write_inertial(xml_element, link.inertial)
    for visual in link.visuals:
        self._write_visual(xml_element, visual)
    for collision in link.collisions:
        self._write_collision(xml_element, collision)

def _parse_axis(xml_element):
    """Parse an <axis> element into a 3-vector; defaults to +X.

    Tokens that fail float conversion are coerced to 0 instead of raising,
    so a partially malformed URDF still loads (intentional leniency).
    Removed the dead commented-out one-liner that this loop replaced.
    """
    if xml_element is None:
        return np.array([1.0, 0, 0])

    xyz = xml_element.get("xyz", "1 0 0")
    components = []
    for token in xyz.split():
        try:
            components.append(float(token))
        except ValueError:
            # Keep the lenient behavior: bad token becomes 0.
            components.append(0)
    return np.array(components)

def _write_axis(self, xml_parent, axis):
    """Serialize an axis vector as an <axis xyz="..."/> child; no-op for None."""
    if axis is None:
        return

    etree.SubElement(xml_parent, "axis", attrib={"xyz": " ".join(map(str, axis))})
def _validate_limit(self, limit, type):
    """Validate a Limit against the joint type's requirements.

    upper/lower are mandatory only for revolute and prismatic joints;
    effort/velocity are mandatory whenever a limit tag is present.
    NOTE(review): nesting reconstructed to match upstream yourdfpy — confirm.
    """
    if type in ["revolute", "prismatic"]:
        self._validate_required_attribute(
            limit,
            error_msg="The of a (prismatic, revolute) joint is missing.",
        )

        if limit is not None:
            self._validate_required_attribute(
                limit.upper,
                error_msg="Tag of joint is missing attribute 'upper'.",
            )
            self._validate_required_attribute(
                limit.lower,
                error_msg="Tag of joint is missing attribute 'lower'.",
            )

    if limit is not None:
        self._validate_required_attribute(
            limit.effort,
            error_msg="Tag of joint is missing attribute 'effort'.",
        )
        self._validate_required_attribute(
            limit.velocity,
            error_msg="Tag of joint is missing attribute 'velocity'.",
        )

def _write_limit(self, xml_parent, limit):
    """Serialize a Limit as a <limit> child; only attributes that are set are emitted."""
    if limit is None:
        return

    attrib = {}
    if limit.effort is not None:
        attrib["effort"] = str(limit.effort)
    if limit.velocity is not None:
        attrib["velocity"] = str(limit.velocity)
    if limit.lower is not None:
        attrib["lower"] = str(limit.lower)
    if limit.upper is not None:
        attrib["upper"] = str(limit.upper)

    etree.SubElement(
        xml_parent,
        "limit",
        attrib=attrib,
    )

def _parse_dynamics(xml_element):
    """Parse a <dynamics> element into a Dynamics object (None if tag absent).

    Fix: damping/friction are converted via _str2float — consistent with
    _parse_limit and _parse_mass — instead of being stored as raw XML
    strings, which would break numeric use downstream.
    """
    if xml_element is None:
        return None

    dynamics = Dynamics()
    dynamics.damping = _str2float(xml_element.get("damping", default=None))
    dynamics.friction = _str2float(xml_element.get("friction", default=None))

    return dynamics
+ attrib=attrib, + ) + + def _parse_joint(xml_element): + joint = Joint(name=xml_element.attrib["name"]) + + joint.type = xml_element.get("type", default=None) + joint.parent = xml_element.find("parent").get("link") + joint.child = xml_element.find("child").get("link") + joint.origin = URDF._parse_origin(xml_element.find("origin")) + joint.axis = URDF._parse_axis(xml_element.find("axis")) + joint.limit = URDF._parse_limit(xml_element.find("limit")) + joint.dynamics = URDF._parse_dynamics(xml_element.find("dynamics")) + joint.mimic = URDF._parse_mimic(xml_element.find("mimic")) + joint.calibration = URDF._parse_calibration(xml_element.find("calibration")) + joint.safety_controller = URDF._parse_safety_controller(xml_element.find("safety_controller")) + + return joint + + def _validate_joint(self, joint): + self._validate_required_attribute( + attribute=joint.name, + error_msg="The tag misses a 'name' attribute.", + ) + + allowed_types = [ + "revolute", + "continuous", + "prismatic", + "fixed", + "floating", + "planar", + ] + self._validate_required_attribute( + attribute=joint.type, + error_msg=f"The tag misses a 'type' attribute or value is not part of allowed values [{', '.join(allowed_types)}].", + allowed_values=allowed_types, + ) + + self._validate_required_attribute( + joint.parent, + error_msg=f"The of a is missing.", + ) + + self._validate_required_attribute( + joint.child, + error_msg=f"The of a is missing.", + ) + + self._validate_limit(joint.limit, type=joint.type) + + def _write_joint(self, xml_parent, joint): + xml_element = etree.SubElement( + xml_parent, + "joint", + attrib={ + "name": joint.name, + "type": joint.type, + }, + ) + + etree.SubElement(xml_element, "parent", attrib={"link": joint.parent}) + etree.SubElement(xml_element, "child", attrib={"link": joint.child}) + self._write_origin(xml_element, joint.origin) + self._write_axis(xml_element, joint.axis) + self._write_limit(xml_element, joint.limit) + self._write_dynamics(xml_element, 
@staticmethod
def _parse_robot(xml_element, add_dummy_free_joints=False):
    """Parse a <robot> element into a Robot.

    When add_dummy_free_joints is True, six prismatic/revolute dummy joints
    are prepended (via _add_dummy_joints) so the root link can float freely.
    """
    robot = Robot(name=xml_element.attrib["name"])

    for link_xml in xml_element.findall("link"):
        robot.links.append(URDF._parse_link(link_xml))
    for joint_xml in xml_element.findall("joint"):
        robot.joints.append(URDF._parse_joint(joint_xml))
    for material_xml in xml_element.findall("material"):
        robot.materials.append(URDF._parse_material(material_xml))

    if add_dummy_free_joints:
        # The root link is the only link that never appears as a joint's child.
        link_names = [link.name for link in robot.links]
        for joint in robot.joints:
            link_names.remove(joint.child)

        if len(link_names) == 0:
            # Dropped needless f-prefix: the message has no placeholders.
            raise RuntimeError("No root link found for robot.")

        root_link_name = link_names[0]
        _add_dummy_joints(robot, root_link_name)

    return robot

def _validate_robot(self, robot):
    """Validate the robot's name attribute plus every link and joint."""
    if robot is not None:
        self._validate_required_attribute(
            attribute=robot.name,
            error_msg="The tag misses a 'name' attribute.",
        )

        for link in robot.links:
            self._validate_link(link)

        for joint in robot.joints:
            self._validate_joint(joint)

def _write_robot(self, robot):
    """Serialize a Robot into a fresh <robot> element tree and return it."""
    xml_element = etree.Element("robot", attrib={"name": robot.name})
    for link in robot.links:
        self._write_link(xml_element, link)
    for joint in robot.joints:
        self._write_joint(xml_element, joint)
    for material in robot.materials:
        self._write_material(xml_element, material)

    return xml_element

def __eq__(self, other):
    """Two URDF objects compare equal iff their parsed robots are equal."""
    if not isinstance(other, URDF):
        # BUG FIX: the original did `raise NotImplemented`, which raises
        # TypeError (NotImplemented is not an exception class). The rich-
        # comparison protocol requires RETURNING the NotImplemented sentinel
        # so Python can try the reflected comparison.
        return NotImplemented
    return self.robot == other.robot

@property
def filename_handler(self):
    """Callable used to resolve mesh/texture filenames referenced by the URDF."""
    return self._filename_handler
parent not in parent_child_map: + continue + + children = parent_child_map[parent] + to_be_handle_list.extend(children) + bfs_link_list.extend(children) + bfs_joint_list = [] + for link_name in bfs_link_list[1:]: + joint_index = [i for i in range(len(self.robot.joints)) if self.robot.joints[i].child == link_name][0] + bfs_joint_list.append(self.robot.joints[joint_index]) + + # Build tree + root = Node(self.base_link, matrix=np.eye(4)) + for joint in bfs_joint_list: + matrix, _ = self._forward_kinematics_joint(joint, 0) + parent_node = anytree.search.findall_by_attr(root, value=joint.parent)[0] + node = Node(joint.child, parent=parent_node, matrix=matrix) + return root + + def update_kinematics(self, configuration): + joint_cfg = [] + + if isinstance(configuration, dict): + for joint in configuration: + if isinstance(joint, six.string_types): + joint_cfg.append((self._joint_map[joint], configuration[joint])) + elif isinstance(joint, Joint): + # TODO: Joint is not hashable; so this branch will not succeed + joint_cfg.append((joint, configuration[joint])) + elif isinstance(configuration, (list, tuple, np.ndarray)): + if len(configuration) == len(self.robot.joints): + for joint, value in zip(self.robot.joints, configuration): + joint_cfg.append((joint, value)) + elif len(configuration) == self.num_actuated_joints: + for joint, value in zip(self._actuated_joints, configuration): + joint_cfg.append((joint, value)) + else: + raise ValueError( + f"Dimensionality of configuration ({len(configuration)}) doesn't match number of all ({len(self.robot.joints)}) or actuated joints ({self.num_actuated_joints})." 
+ ) + else: + raise TypeError("Invalid type for configuration") + + # append all mimic joints in the update + for j, q in joint_cfg + [(j, 0.0) for j in self.robot.joints if j.mimic is not None]: + matrix, _ = self._forward_kinematics_joint(j, q=q) + node = anytree.search.findall_by_attr(self.tree_root, j.child)[0] + node.matrix = matrix + + for node in LevelOrderIter(self.tree_root): + if node.name == self.base_link: + node.global_pose = np.eye(4) + else: + node.global_pose = node.parent.global_pose @ node.matrix + + def get_link_global_transform(self, link_name): + node = anytree.search.findall_by_attr(self.tree_root, link_name)[0] + + return node.global_pose + + +def _add_dummy_joints(robot: Robot, root_link_name: str): + # Prepare link and joint properties + translation_range = (-5, 5) + rotation_range = (-2 * np.pi, 2 * np.pi) + joint_types = ["prismatic"] * 3 + ["revolute"] * 3 + joint_limit = [translation_range] * 3 + [rotation_range] * 3 + joint_name = DUMMY_JOINT_NAMES.copy() + link_name = [f"dummy_{name}_translation_link" for name in "xyz"] + [f"dummy_{name}_rotation_link" for name in "xyz"] + + links = [] + joints = [] + + for i in range(6): + inertial = Inertial( + mass=0.01, inertia=np.array([[1e-4, 0, 0], [0, 1e-4, 0], [0, 0, 1e-4]]), origin=np.identity(4) + ) + link = Link(name=link_name[i], inertial=inertial) + links.append(link) + + joint_axis = np.zeros(3, dtype=int) + joint_axis[i % 3] = 1 + limit = Limit(lower=joint_limit[i][0], upper=joint_limit[i][1], velocity=3.14, effort=10) + + child_name = link_name[i + 1] if i < 5 else root_link_name + joint = Joint( + name=joint_name[i], + type=joint_types[i], + parent=link_name[i], + child=child_name, + origin=np.identity(4), + axis=joint_axis, + limit=limit, + ) + joints.append(joint) + + robot.joints = joints + robot.joints + robot.links = links + robot.links + + +DUMMY_JOINT_NAMES = [f"dummy_{name}_translation_joint" for name in "xyz"] + [ + f"dummy_{name}_rotation_joint" for name in "xyz" +] diff 
--git a/teleop/robot_control/hand_retargeting.py b/teleop/robot_control/hand_retargeting.py index 97d8437..ec03702 100644 --- a/teleop/robot_control/hand_retargeting.py +++ b/teleop/robot_control/hand_retargeting.py @@ -1,4 +1,4 @@ -from dex_retargeting.retargeting_config import RetargetingConfig +from .dex_retargeting.retargeting_config import RetargetingConfig from pathlib import Path import yaml from enum import Enum @@ -30,6 +30,28 @@ class HandRetargeting: right_retargeting_config = RetargetingConfig.from_dict(self.cfg['right']) self.left_retargeting = left_retargeting_config.build() self.right_retargeting = right_retargeting_config.build() + + self.left_retargeting_joint_names = self.left_retargeting.joint_names + self.right_retargeting_joint_names = self.right_retargeting.joint_names + # In section "Sort by message structure" of https://support.unitree.com/home/en/G1_developer/dexterous_hand + self.right_dex3_api_joint_names = [ 'right_hand_thumb_0_joint', 'right_hand_thumb_1_joint', 'right_hand_thumb_2_joint', + 'right_hand_middle_0_joint', 'right_hand_middle_1_joint', + 'right_hand_index_0_joint', 'right_hand_index_1_joint' ] + self.left_dex3_api_joint_names = [ 'left_hand_thumb_0_joint', 'left_hand_thumb_1_joint', 'left_hand_thumb_2_joint', + 'left_hand_middle_0_joint', 'left_hand_middle_1_joint', + 'left_hand_index_0_joint', 'left_hand_index_1_joint' ] + self.left_dex_retargeting_to_hardware = [ self.left_retargeting_joint_names.index(name) for name in self.left_dex3_api_joint_names] + self.right_dex_retargeting_to_hardware = [ self.right_retargeting_joint_names.index(name) for name in self.right_dex3_api_joint_names] + + # Archive: This is the joint order of the dex-retargeting library version 0.1.1. 
+        # Archived reference output (dex-retargeting 0.1.1 joint order):
+        # print([joint.get_name() for joint in self.left_retargeting.optimizer.robot.get_active_joints()])
+        # ['left_hand_thumb_0_joint', 'left_hand_thumb_1_joint', 'left_hand_thumb_2_joint',
+        #  'left_hand_middle_0_joint', 'left_hand_middle_1_joint',
+        #  'left_hand_index_0_joint', 'left_hand_index_1_joint']
+        # print([joint.get_name() for joint in self.right_retargeting.optimizer.robot.get_active_joints()])
+        # ['right_hand_thumb_0_joint', 'right_hand_thumb_1_joint', 'right_hand_thumb_2_joint',
+        #  'right_hand_middle_0_joint', 'right_hand_middle_1_joint',
+        #  'right_hand_index_0_joint', 'right_hand_index_1_joint']
         except FileNotFoundError:
             print(f"Configuration file not found: {config_file_path}")
diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py
index 9968db7..7592075 100644
--- a/teleop/robot_control/robot_hand_unitree.py
+++ b/teleop/robot_control/robot_hand_unitree.py
@@ -185,8 +185,9 @@ class Dex3_1_Controller:
                 ref_right_value[0] = ref_right_value[0] * 1.15
                 ref_right_value[1] = ref_right_value[1] * 1.05
                 ref_right_value[2] = ref_right_value[2] * 0.95
-                left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[[0,1,2,3,4,5,6]]
-                right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[[0,1,2,3,4,5,6]]
+
+                # BUG FIX: the left-hand output must be reordered with the LEFT
+                # joint-name mapping. Using right_dex_retargeting_to_hardware here
+                # only works while both configs happen to share joint order.
+                left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware]
+                right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware]
 
                 # get dual hand action
                 action_data = np.concatenate((left_q_target, right_q_target))