committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 1681 additions and 5086 deletions
-
6.gitmodules
-
7LICENSE
-
456README.md
-
559README_ja-JP.md
-
432README_zh-CN.md
-
63assets/inspire_hand/inspire_hand.yml
-
40assets/unitree_hand/unitree_dex3.yml
-
27requirements.txt
-
20teleop/image_server/image_client.py
-
42teleop/image_server/image_server.py
-
53teleop/open_television/constants.py
-
178teleop/open_television/television.py
-
147teleop/open_television/tv_wrapper.py
-
1teleop/robot_control/dex-retargeting
-
22teleop/robot_control/dex_retargeting/CITATION.cff
-
21teleop/robot_control/dex_retargeting/LICENSE
-
1teleop/robot_control/dex_retargeting/__init__.py
-
85teleop/robot_control/dex_retargeting/constants.py
-
102teleop/robot_control/dex_retargeting/kinematics_adaptor.py
-
511teleop/robot_control/dex_retargeting/optimizer.py
-
17teleop/robot_control/dex_retargeting/optimizer_utils.py
-
251teleop/robot_control/dex_retargeting/retargeting_config.py
-
93teleop/robot_control/dex_retargeting/robot_wrapper.py
-
151teleop/robot_control/dex_retargeting/seq_retarget.py
-
2237teleop/robot_control/dex_retargeting/yourdfpy.py
-
16teleop/robot_control/hand_retargeting.py
-
178teleop/robot_control/robot_arm.py
-
32teleop/robot_control/robot_arm_ik.py
-
29teleop/robot_control/robot_hand_inspire.py
-
124teleop/robot_control/robot_hand_unitree.py
-
485teleop/teleop_hand_and_arm.py
-
1teleop/televuer
-
44teleop/utils/episode_writer.py
-
14teleop/utils/mat_tool.py
-
32teleop/utils/rerun_visualizer.py
-
290teleop/utils/sim_state_topic.py
@ -0,0 +1,6 @@ |
|||
[submodule "teleop/televuer"] |
|||
path = teleop/televuer |
|||
url = https://github.com/silencht/televuer |
|||
[submodule "teleop/robot_control/dex-retargeting"] |
|||
path = teleop/robot_control/dex-retargeting |
|||
url = https://github.com/silencht/dex-retargeting |
|||
@ -1,39 +1,62 @@ |
|||
left: |
|||
type: vector |
|||
type: DexPilot # or vector |
|||
urdf_path: inspire_hand/inspire_hand_left.urdf |
|||
wrist_link_name: "L_hand_base_link" |
|||
|
|||
# Target refers to the retargeting target, which is the robot hand |
|||
target_joint_names: ['L_thumb_proximal_yaw_joint', 'L_thumb_proximal_pitch_joint', |
|||
'L_index_proximal_joint', 'L_middle_proximal_joint', |
|||
'L_ring_proximal_joint', 'L_pinky_proximal_joint' ] |
|||
target_joint_names: |
|||
[ |
|||
'L_thumb_proximal_yaw_joint', |
|||
'L_thumb_proximal_pitch_joint', |
|||
'L_index_proximal_joint', |
|||
'L_middle_proximal_joint', |
|||
'L_ring_proximal_joint', |
|||
'L_pinky_proximal_joint' |
|||
] |
|||
|
|||
# for DexPilot type |
|||
wrist_link_name: "L_hand_base_link" |
|||
finger_tip_link_names: [ "L_thumb_tip", "L_index_tip", "L_middle_tip", "L_ring_tip", "L_pinky_tip" ] |
|||
# If you do not know exactly how it is used, please leave it to None for default. |
|||
target_link_human_indices_dexpilot: [[ 9, 14, 19, 24, 14, 19, 24, 19, 24, 24, 0, 0, 0, 0, 0], [ 4, 4, 4, 4, 9, 9, 9, 14, 14, 19, 4, 9, 14, 19, 24]] |
|||
|
|||
# for vector type |
|||
target_origin_link_names: [ "L_hand_base_link", "L_hand_base_link", "L_hand_base_link", "L_hand_base_link", "L_hand_base_link"] |
|||
target_task_link_names: [ "L_thumb_tip", "L_index_tip", "L_middle_tip", "L_ring_tip", "L_pinky_tip" ] |
|||
scaling_factor: 1.20 |
|||
|
|||
# Source refers to the retargeting input, which usually corresponds to the human hand |
|||
# The joint indices of human hand joint which corresponds to each link in the target_link_names |
|||
target_link_human_indices: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] |
|||
target_link_human_indices_vector: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] |
|||
|
|||
# Scaling factor for vector retargeting only |
|||
# For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 |
|||
scaling_factor: 1.20 |
|||
# A smaller alpha means stronger filtering, i.e. more smooth but also larger latency |
|||
low_pass_alpha: 0.2 |
|||
|
|||
right: |
|||
type: vector |
|||
type: DexPilot # or vector |
|||
urdf_path: inspire_hand/inspire_hand_right.urdf |
|||
wrist_link_name: "R_hand_base_link" |
|||
|
|||
# Target refers to the retargeting target, which is the robot hand |
|||
target_joint_names: ['R_thumb_proximal_yaw_joint', 'R_thumb_proximal_pitch_joint', |
|||
'R_index_proximal_joint', 'R_middle_proximal_joint', |
|||
'R_ring_proximal_joint', 'R_pinky_proximal_joint' ] |
|||
target_joint_names: |
|||
[ |
|||
'R_thumb_proximal_yaw_joint', |
|||
'R_thumb_proximal_pitch_joint', |
|||
'R_index_proximal_joint', |
|||
'R_middle_proximal_joint', |
|||
'R_ring_proximal_joint', |
|||
'R_pinky_proximal_joint' |
|||
] |
|||
|
|||
# for DexPilot type |
|||
wrist_link_name: "R_hand_base_link" |
|||
finger_tip_link_names: [ "R_thumb_tip", "R_index_tip", "R_middle_tip", "R_ring_tip", "R_pinky_tip" ] |
|||
# If you do not know exactly how it is used, please leave it to None for |
|||
target_link_human_indices_dexpilot: [[ 9, 14, 19, 24, 14, 19, 24, 19, 24, 24, 0, 0, 0, 0, 0], [ 4, 4, 4, 4, 9, 9, 9, 14, 14, 19, 4, 9, 14, 19, 24]] |
|||
|
|||
target_origin_link_names: [ "R_hand_base_link", "R_hand_base_link", "R_hand_base_link", "R_hand_base_link", "R_hand_base_link"] |
|||
target_task_link_names: [ "R_thumb_tip", "R_index_tip", "R_middle_tip", "R_ring_tip", "R_pinky_tip" ] |
|||
target_link_human_indices_vector: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] |
|||
|
|||
# Scaling factor for vector retargeting only |
|||
# For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 |
|||
scaling_factor: 1.20 |
|||
|
|||
# Source refers to the retargeting input, which usually corresponds to the human hand |
|||
# The joint indices of human hand joint which corresponds to each link in the target_link_names |
|||
target_link_human_indices: [ [ 0, 0, 0, 0, 0 ], [ 4, 9, 14, 19, 24 ] ] |
|||
|
|||
# A smaller alpha means stronger filtering, i.e. more smooth but also larger latency |
|||
low_pass_alpha: 0.2 |
|||
@ -1,27 +1,4 @@ |
|||
aiohttp==3.9.5 |
|||
aiohttp_cors==0.7.0 |
|||
aiortc==1.8.0 |
|||
av==11.0.0 |
|||
einops==0.8.0 |
|||
h5py==3.11.0 |
|||
ipython==8.12.3 |
|||
matplotlib==3.7.5 |
|||
numpy==1.23.0 |
|||
opencv_contrib_python==4.10.0.82 |
|||
opencv_python==4.9.0.80 |
|||
packaging==24.1 |
|||
pandas==2.0.3 |
|||
params_proto==2.12.1 |
|||
pytransform3d==3.5.0 |
|||
PyYAML==6.0.1 |
|||
scikit_learn==1.3.2 |
|||
scipy==1.10.1 |
|||
seaborn==0.13.2 |
|||
setuptools==69.5.1 |
|||
torch==2.3.0 |
|||
torchvision==0.18.0 |
|||
vuer==0.0.32rc7 |
|||
rerun-sdk==0.20.1 |
|||
nlopt>=2.6.1,<2.8.0 |
|||
trimesh>=4.4.0 |
|||
anytree>=2.12.0 |
|||
meshcat==0.3.2 |
|||
sshkeyboard==2.3.1 |
|||
@ -1,53 +0,0 @@ |
|||
import numpy as np |
|||
|
|||
|
|||
T_to_unitree_left_wrist = np.array([[1, 0, 0, 0], |
|||
[0, 0, -1, 0], |
|||
[0, 1, 0, 0], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
T_to_unitree_right_wrist = np.array([[1, 0, 0, 0], |
|||
[0, 0, 1, 0], |
|||
[0, -1, 0, 0], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
T_to_unitree_hand = np.array([[0, 0, 1, 0], |
|||
[-1,0, 0, 0], |
|||
[0, -1,0, 0], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
T_robot_openxr = np.array([[0, 0, -1, 0], |
|||
[-1, 0, 0, 0], |
|||
[0, 1, 0, 0], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
|
|||
|
|||
const_head_vuer_mat = np.array([[1, 0, 0, 0], |
|||
[0, 1, 0, 1.5], |
|||
[0, 0, 1, -0.2], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
|
|||
# For G1 initial position |
|||
const_right_wrist_vuer_mat = np.array([[1, 0, 0, 0.15], |
|||
[0, 1, 0, 1.13], |
|||
[0, 0, 1, -0.3], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
# For G1 initial position |
|||
const_left_wrist_vuer_mat = np.array([[1, 0, 0, -0.15], |
|||
[0, 1, 0, 1.13], |
|||
[0, 0, 1, -0.3], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
# legacy |
|||
# const_right_wrist_vuer_mat = np.array([[1, 0, 0, 0.5], |
|||
# [0, 1, 0, 1], |
|||
# [0, 0, 1, -0.5], |
|||
# [0, 0, 0, 1]]) |
|||
|
|||
# const_left_wrist_vuer_mat = np.array([[1, 0, 0, -0.5], |
|||
# [0, 1, 0, 1], |
|||
# [0, 0, 1, -0.5], |
|||
# [0, 0, 0, 1]]) |
|||
@ -1,178 +0,0 @@ |
|||
import time |
|||
from vuer import Vuer |
|||
from vuer.schemas import ImageBackground, Hands |
|||
from multiprocessing import Array, Process, shared_memory |
|||
import numpy as np |
|||
import asyncio |
|||
import cv2 |
|||
|
|||
from multiprocessing import context |
|||
Value = context._default_context.Value |
|||
|
|||
|
|||
class TeleVision: |
|||
def __init__(self, binocular, img_shape, img_shm_name, cert_file="./cert.pem", key_file="./key.pem", ngrok=False): |
|||
self.binocular = binocular |
|||
self.img_height = img_shape[0] |
|||
if binocular: |
|||
self.img_width = img_shape[1] // 2 |
|||
else: |
|||
self.img_width = img_shape[1] |
|||
|
|||
if ngrok: |
|||
self.vuer = Vuer(host='0.0.0.0', queries=dict(grid=False), queue_len=3) |
|||
else: |
|||
self.vuer = Vuer(host='0.0.0.0', cert=cert_file, key=key_file, queries=dict(grid=False), queue_len=3) |
|||
|
|||
self.vuer.add_handler("HAND_MOVE")(self.on_hand_move) |
|||
self.vuer.add_handler("CAMERA_MOVE")(self.on_cam_move) |
|||
|
|||
existing_shm = shared_memory.SharedMemory(name=img_shm_name) |
|||
self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf) |
|||
|
|||
if binocular: |
|||
self.vuer.spawn(start=False)(self.main_image_binocular) |
|||
else: |
|||
self.vuer.spawn(start=False)(self.main_image_monocular) |
|||
|
|||
self.left_hand_shared = Array('d', 16, lock=True) |
|||
self.right_hand_shared = Array('d', 16, lock=True) |
|||
self.left_landmarks_shared = Array('d', 75, lock=True) |
|||
self.right_landmarks_shared = Array('d', 75, lock=True) |
|||
|
|||
self.head_matrix_shared = Array('d', 16, lock=True) |
|||
self.aspect_shared = Value('d', 1.0, lock=True) |
|||
|
|||
self.process = Process(target=self.vuer_run) |
|||
self.process.daemon = True |
|||
self.process.start() |
|||
|
|||
|
|||
def vuer_run(self): |
|||
self.vuer.run() |
|||
|
|||
async def on_cam_move(self, event, session, fps=60): |
|||
try: |
|||
self.head_matrix_shared[:] = event.value["camera"]["matrix"] |
|||
self.aspect_shared.value = event.value['camera']['aspect'] |
|||
except: |
|||
pass |
|||
|
|||
async def on_hand_move(self, event, session, fps=60): |
|||
try: |
|||
self.left_hand_shared[:] = event.value["leftHand"] |
|||
self.right_hand_shared[:] = event.value["rightHand"] |
|||
self.left_landmarks_shared[:] = np.array(event.value["leftLandmarks"]).flatten() |
|||
self.right_landmarks_shared[:] = np.array(event.value["rightLandmarks"]).flatten() |
|||
except: |
|||
pass |
|||
|
|||
async def main_image_binocular(self, session, fps=60): |
|||
session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False) |
|||
while True: |
|||
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) |
|||
# aspect_ratio = self.img_width / self.img_height |
|||
session.upsert( |
|||
[ |
|||
ImageBackground( |
|||
display_image[:, :self.img_width], |
|||
aspect=1.778, |
|||
height=1, |
|||
distanceToCamera=1, |
|||
# The underlying rendering engine supported a layer binary bitmask for both objects and the camera. |
|||
# Below we set the two image planes, left and right, to layers=1 and layers=2. |
|||
# Note that these two masks are associated with left eye’s camera and the right eye’s camera. |
|||
layers=1, |
|||
format="jpeg", |
|||
quality=50, |
|||
key="background-left", |
|||
interpolate=True, |
|||
), |
|||
ImageBackground( |
|||
display_image[:, self.img_width:], |
|||
aspect=1.778, |
|||
height=1, |
|||
distanceToCamera=1, |
|||
layers=2, |
|||
format="jpeg", |
|||
quality=50, |
|||
key="background-right", |
|||
interpolate=True, |
|||
), |
|||
], |
|||
to="bgChildren", |
|||
) |
|||
# 'jpeg' encoding should give you about 30fps with a 16ms wait in-between. |
|||
await asyncio.sleep(0.016 * 2) |
|||
|
|||
async def main_image_monocular(self, session, fps=60): |
|||
session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False) |
|||
while True: |
|||
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) |
|||
# aspect_ratio = self.img_width / self.img_height |
|||
session.upsert( |
|||
[ |
|||
ImageBackground( |
|||
display_image, |
|||
aspect=1.778, |
|||
height=1, |
|||
distanceToCamera=1, |
|||
format="jpeg", |
|||
quality=50, |
|||
key="background-mono", |
|||
interpolate=True, |
|||
), |
|||
], |
|||
to="bgChildren", |
|||
) |
|||
await asyncio.sleep(0.016) |
|||
|
|||
@property |
|||
def left_hand(self): |
|||
return np.array(self.left_hand_shared[:]).reshape(4, 4, order="F") |
|||
|
|||
|
|||
@property |
|||
def right_hand(self): |
|||
return np.array(self.right_hand_shared[:]).reshape(4, 4, order="F") |
|||
|
|||
|
|||
@property |
|||
def left_landmarks(self): |
|||
return np.array(self.left_landmarks_shared[:]).reshape(25, 3) |
|||
|
|||
@property |
|||
def right_landmarks(self): |
|||
return np.array(self.right_landmarks_shared[:]).reshape(25, 3) |
|||
|
|||
@property |
|||
def head_matrix(self): |
|||
return np.array(self.head_matrix_shared[:]).reshape(4, 4, order="F") |
|||
|
|||
@property |
|||
def aspect(self): |
|||
return float(self.aspect_shared.value) |
|||
|
|||
if __name__ == '__main__': |
|||
import os |
|||
import sys |
|||
current_dir = os.path.dirname(os.path.abspath(__file__)) |
|||
parent_dir = os.path.dirname(current_dir) |
|||
sys.path.append(parent_dir) |
|||
import threading |
|||
from image_server.image_client import ImageClient |
|||
|
|||
# image |
|||
img_shape = (480, 640 * 2, 3) |
|||
img_shm = shared_memory.SharedMemory(create=True, size=np.prod(img_shape) * np.uint8().itemsize) |
|||
img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=img_shm.buf) |
|||
img_client = ImageClient(tv_img_shape = img_shape, tv_img_shm_name = img_shm.name) |
|||
image_receive_thread = threading.Thread(target=img_client.receive_process, daemon=True) |
|||
image_receive_thread.start() |
|||
|
|||
# television |
|||
tv = TeleVision(True, img_shape, img_shm.name) |
|||
print("vuer unit test program running...") |
|||
print("you can press ^C to interrupt program.") |
|||
while True: |
|||
time.sleep(0.03) |
|||
@ -1,147 +0,0 @@ |
|||
import numpy as np |
|||
from teleop.open_television.television import TeleVision |
|||
from teleop.open_television.constants import * |
|||
from teleop.utils.mat_tool import mat_update, fast_mat_inv |
|||
|
|||
""" |
|||
(basis) OpenXR Convention : y up, z back, x right. |
|||
(basis) Robot Convention : z up, y left, x front. |
|||
p.s. Vuer's all raw data follows OpenXR Convention, WORLD coordinate. |
|||
|
|||
under (basis) Robot Convention, wrist's initial pose convention: |
|||
|
|||
# (Left Wrist) XR/AppleVisionPro Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from index toward pinky. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (Right Wrist) XR/AppleVisionPro Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from pinky toward index. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (Left Wrist URDF) Unitree Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from palm toward back of the hand. |
|||
- the z-axis pointing from pinky toward index. |
|||
|
|||
# (Right Wrist URDF) Unitree Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from back of the hand toward palm. |
|||
- the z-axis pointing from pinky toward index. |
|||
|
|||
under (basis) Robot Convention, hand's initial pose convention: |
|||
|
|||
# (Left Hand) XR/AppleVisionPro Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from index toward pinky. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (Right Hand) XR/AppleVisionPro Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from pinky toward index. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (Left Hand URDF) Unitree Convention: |
|||
- The x-axis pointing from palm toward back of the hand. |
|||
- The y-axis pointing from middle toward wrist. |
|||
- The z-axis pointing from pinky toward index. |
|||
|
|||
# (Right Hand URDF) Unitree Convention: |
|||
- The x-axis pointing from palm toward back of the hand. |
|||
- The y-axis pointing from middle toward wrist. |
|||
- The z-axis pointing from index toward pinky. |
|||
|
|||
p.s. From website: https://registry.khronos.org/OpenXR/specs/1.1/man/html/openxr.html. |
|||
You can find **(Left/Right Wrist) XR/AppleVisionPro Convention** related information like this below: |
|||
"The wrist joint is located at the pivot point of the wrist, which is location invariant when twisting the hand without moving the forearm. |
|||
The backward (+Z) direction is parallel to the line from wrist joint to middle finger metacarpal joint, and points away from the finger tips. |
|||
The up (+Y) direction points out towards back of the hand and perpendicular to the skin at wrist. |
|||
The X direction is perpendicular to the Y and Z directions and follows the right hand rule." |
|||
Note: The above context is of course under **(basis) OpenXR Convention**. |
|||
|
|||
p.s. **(Wrist/Hand URDF) Unitree Convention** information come from URDF files. |
|||
""" |
|||
|
|||
class TeleVisionWrapper: |
|||
def __init__(self, binocular, img_shape, img_shm_name): |
|||
self.tv = TeleVision(binocular, img_shape, img_shm_name) |
|||
|
|||
def get_data(self): |
|||
|
|||
# --------------------------------wrist------------------------------------- |
|||
|
|||
# TeleVision obtains a basis coordinate that is OpenXR Convention |
|||
head_vuer_mat, head_flag = mat_update(const_head_vuer_mat, self.tv.head_matrix.copy()) |
|||
left_wrist_vuer_mat, left_wrist_flag = mat_update(const_left_wrist_vuer_mat, self.tv.left_hand.copy()) |
|||
right_wrist_vuer_mat, right_wrist_flag = mat_update(const_right_wrist_vuer_mat, self.tv.right_hand.copy()) |
|||
|
|||
# Change basis convention: VuerMat ((basis) OpenXR Convention) to WristMat ((basis) Robot Convention) |
|||
# p.s. WristMat = T_{robot}_{openxr} * VuerMat * T_{robot}_{openxr}^T |
|||
# Reason for right multiply fast_mat_inv(T_robot_openxr): |
|||
# This is similarity transformation: B = PAP^{-1}, that is B ~ A |
|||
# For example: |
|||
# - For a pose data T_r under the Robot Convention, left-multiplying WristMat means: |
|||
# - WristMat * T_r ==> T_{robot}_{openxr} * VuerMat * T_{openxr}_{robot} * T_r |
|||
# - First, transform to the OpenXR Convention (The function of T_{openxr}_{robot}) |
|||
# - then, apply the rotation VuerMat in the OpenXR Convention (The function of VuerMat) |
|||
# - finally, transform back to the Robot Convention (The function of T_{robot}_{openxr}) |
|||
# This results in the same rotation effect under the Robot Convention as in the OpenXR Convention. |
|||
head_mat = T_robot_openxr @ head_vuer_mat @ fast_mat_inv(T_robot_openxr) |
|||
left_wrist_mat = T_robot_openxr @ left_wrist_vuer_mat @ fast_mat_inv(T_robot_openxr) |
|||
right_wrist_mat = T_robot_openxr @ right_wrist_vuer_mat @ fast_mat_inv(T_robot_openxr) |
|||
|
|||
# Change wrist convention: WristMat ((Left Wrist) XR/AppleVisionPro Convention) to UnitreeWristMat((Left Wrist URDF) Unitree Convention) |
|||
# Reason for right multiply (T_to_unitree_left_wrist) : Rotate 90 degrees counterclockwise about its own x-axis. |
|||
# Reason for right multiply (T_to_unitree_right_wrist): Rotate 90 degrees clockwise about its own x-axis. |
|||
unitree_left_wrist = left_wrist_mat @ (T_to_unitree_left_wrist if left_wrist_flag else np.eye(4)) |
|||
unitree_right_wrist = right_wrist_mat @ (T_to_unitree_right_wrist if right_wrist_flag else np.eye(4)) |
|||
|
|||
# Transfer from WORLD to HEAD coordinate (translation only). |
|||
unitree_left_wrist[0:3, 3] = unitree_left_wrist[0:3, 3] - head_mat[0:3, 3] |
|||
unitree_right_wrist[0:3, 3] = unitree_right_wrist[0:3, 3] - head_mat[0:3, 3] |
|||
|
|||
# --------------------------------hand------------------------------------- |
|||
|
|||
# Homogeneous, [xyz] to [xyz1] |
|||
# p.s. np.concatenate([25,3]^T,(1,25)) ==> hand_vuer_mat.shape is (4,25) |
|||
# Now under (basis) OpenXR Convention, mat shape like this: |
|||
# x0 x1 x2 ··· x23 x24 |
|||
# y0 y1 y1 ··· y23 y24 |
|||
# z0 z1 z2 ··· z23 z24 |
|||
# 1 1 1 ··· 1 1 |
|||
left_hand_vuer_mat = np.concatenate([self.tv.left_landmarks.copy().T, np.ones((1, self.tv.left_landmarks.shape[0]))]) |
|||
right_hand_vuer_mat = np.concatenate([self.tv.right_landmarks.copy().T, np.ones((1, self.tv.right_landmarks.shape[0]))]) |
|||
|
|||
# Change basis convention: from (basis) OpenXR Convention to (basis) Robot Convention |
|||
# Just a change of basis for 3D points. No rotation, only translation. No need to right-multiply fast_mat_inv(T_robot_openxr). |
|||
left_hand_mat = T_robot_openxr @ left_hand_vuer_mat |
|||
right_hand_mat = T_robot_openxr @ right_hand_vuer_mat |
|||
|
|||
# Transfer from WORLD to WRIST coordinate. (this process under (basis) Robot Convention) |
|||
# p.s. HandMat_WristBased = WristMat_{wrold}_{wrist}^T * HandMat_{wrold} |
|||
# HandMat_WristBased = WristMat_{wrist}_{wrold} * HandMat_{wrold}, that is HandMat_{wrist} |
|||
left_hand_mat_wb = fast_mat_inv(left_wrist_mat) @ left_hand_mat |
|||
right_hand_mat_wb = fast_mat_inv(right_wrist_mat) @ right_hand_mat |
|||
# Change hand convention: HandMat ((Left Hand) XR/AppleVisionPro Convention) to UnitreeHandMat((Left Hand URDF) Unitree Convention) |
|||
# Reason for left multiply : T_to_unitree_hand @ left_hand_mat_wb ==> (4,4) @ (4,25) ==> (4,25), (4,25)[0:3, :] ==> (3,25), (3,25).T ==> (25,3) |
|||
# Now under (Left Hand URDF) Unitree Convention, mat shape like this: |
|||
# [x0, y0, z0] |
|||
# [x1, y1, z1] |
|||
# ··· |
|||
# [x23,y23,z23] |
|||
# [x24,y24,z24] |
|||
unitree_left_hand = (T_to_unitree_hand @ left_hand_mat_wb)[0:3, :].T |
|||
unitree_right_hand = (T_to_unitree_hand @ right_hand_mat_wb)[0:3, :].T |
|||
|
|||
# --------------------------------offset------------------------------------- |
|||
|
|||
head_rmat = head_mat[:3, :3] |
|||
# The origin of the coordinate for IK Solve is the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it. |
|||
# The origin of the coordinate of unitree_left_wrist is HEAD. So it is necessary to translate the origin of unitree_left_wrist from HEAD to WAIST. |
|||
unitree_left_wrist[0, 3] +=0.15 |
|||
unitree_right_wrist[0,3] +=0.15 |
|||
unitree_left_wrist[2, 3] +=0.45 |
|||
unitree_right_wrist[2,3] +=0.45 |
|||
|
|||
return head_rmat, unitree_left_wrist, unitree_right_wrist, unitree_left_hand, unitree_right_hand |
|||
@ -0,0 +1 @@ |
|||
Subproject commit d7753d38c9ff11f80bafea6cd168351fd3db9b0e |
|||
@ -1,22 +0,0 @@ |
|||
cff-version: 1.2.0 |
|||
authors: |
|||
- family-names: "Qin" |
|||
given-names: "Yuzhe" |
|||
- family-names: "Yang" |
|||
given-names: "Wei" |
|||
- family-names: "Huang" |
|||
given-names: "Binghao" |
|||
- family-names: "Van Wyk" |
|||
given-names: "Karl" |
|||
- family-names: "Su" |
|||
given-names: "Hao" |
|||
- family-names: "Wang" |
|||
given-names: "Xiaolong" |
|||
- family-names: "Chao" |
|||
given-names: "Yu-Wei" |
|||
- family-names: "Fox" |
|||
given-names: "Dieter" |
|||
date-released: 2023-10-25 |
|||
title: "AnyTeleop" |
|||
message: "Thanks for using dex-retargeting. If you use this software, please cite it as below." |
|||
url: "https://github.com/dexsuite/dex-retargeting" |
|||
@ -1,21 +0,0 @@ |
|||
The MIT License (MIT) |
|||
|
|||
Copyright (c) 2023 Yuzhe Qin |
|||
|
|||
Permission is hereby granted, free of charge, to any person obtaining a copy |
|||
of this software and associated documentation files (the "Software"), to deal |
|||
in the Software without restriction, including without limitation the rights |
|||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|||
copies of the Software, and to permit persons to whom the Software is |
|||
furnished to do so, subject to the following conditions: |
|||
|
|||
The above copyright notice and this permission notice shall be included in all |
|||
copies or substantial portions of the Software. |
|||
|
|||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|||
SOFTWARE. |
|||
@ -1 +0,0 @@ |
|||
__version__ = "0.4.6" |
|||
@ -1,85 +0,0 @@ |
|||
import enum |
|||
from pathlib import Path |
|||
from typing import Optional |
|||
|
|||
import numpy as np |
|||
|
|||
OPERATOR2MANO_RIGHT = np.array( |
|||
[ |
|||
[0, 0, -1], |
|||
[-1, 0, 0], |
|||
[0, 1, 0], |
|||
] |
|||
) |
|||
|
|||
OPERATOR2MANO_LEFT = np.array( |
|||
[ |
|||
[0, 0, -1], |
|||
[1, 0, 0], |
|||
[0, -1, 0], |
|||
] |
|||
) |
|||
|
|||
|
|||
class RobotName(enum.Enum): |
|||
allegro = enum.auto() |
|||
shadow = enum.auto() |
|||
svh = enum.auto() |
|||
leap = enum.auto() |
|||
ability = enum.auto() |
|||
inspire = enum.auto() |
|||
panda = enum.auto() |
|||
|
|||
|
|||
class RetargetingType(enum.Enum): |
|||
vector = enum.auto() # For teleoperation, no finger closing prior |
|||
position = enum.auto() # For offline data processing, especially hand-object interaction data |
|||
dexpilot = enum.auto() # For teleoperation, with finger closing prior |
|||
|
|||
|
|||
class HandType(enum.Enum): |
|||
right = enum.auto() |
|||
left = enum.auto() |
|||
|
|||
|
|||
ROBOT_NAME_MAP = { |
|||
RobotName.allegro: "allegro_hand", |
|||
RobotName.shadow: "shadow_hand", |
|||
RobotName.svh: "schunk_svh_hand", |
|||
RobotName.leap: "leap_hand", |
|||
RobotName.ability: "ability_hand", |
|||
RobotName.inspire: "inspire_hand", |
|||
RobotName.panda: "panda_gripper", |
|||
} |
|||
|
|||
ROBOT_NAMES = list(ROBOT_NAME_MAP.keys()) |
|||
|
|||
|
|||
def get_default_config_path( |
|||
robot_name: RobotName, retargeting_type: RetargetingType, hand_type: HandType |
|||
) -> Optional[Path]: |
|||
config_path = Path(__file__).parent / "configs" |
|||
if retargeting_type is RetargetingType.position: |
|||
config_path = config_path / "offline" |
|||
else: |
|||
config_path = config_path / "teleop" |
|||
|
|||
robot_name_str = ROBOT_NAME_MAP[robot_name] |
|||
hand_type_str = hand_type.name |
|||
if "gripper" in robot_name_str: # For gripper robots, only use gripper config file. |
|||
if retargeting_type == RetargetingType.dexpilot: |
|||
config_name = f"{robot_name_str}_dexpilot.yml" |
|||
else: |
|||
config_name = f"{robot_name_str}.yml" |
|||
else: |
|||
if retargeting_type == RetargetingType.dexpilot: |
|||
config_name = f"{robot_name_str}_{hand_type_str}_dexpilot.yml" |
|||
else: |
|||
config_name = f"{robot_name_str}_{hand_type_str}.yml" |
|||
return config_path / config_name |
|||
|
|||
|
|||
OPERATOR2MANO = { |
|||
HandType.right: OPERATOR2MANO_RIGHT, |
|||
HandType.left: OPERATOR2MANO_LEFT, |
|||
} |
|||
@ -1,102 +0,0 @@ |
|||
from abc import abstractmethod |
|||
from typing import List |
|||
|
|||
import numpy as np |
|||
|
|||
from .robot_wrapper import RobotWrapper |
|||
|
|||
|
|||
class KinematicAdaptor: |
|||
def __init__(self, robot: RobotWrapper, target_joint_names: List[str]): |
|||
self.robot = robot |
|||
self.target_joint_names = target_joint_names |
|||
|
|||
# Index mapping |
|||
self.idx_pin2target = np.array([robot.get_joint_index(n) for n in target_joint_names]) |
|||
|
|||
@abstractmethod |
|||
def forward_qpos(self, qpos: np.ndarray) -> np.ndarray: |
|||
""" |
|||
Adapt the joint position for different kinematics constraints. |
|||
Note that the joint order of this qpos is consistent with pinocchio |
|||
Args: |
|||
qpos: the pinocchio qpos |
|||
|
|||
Returns: the adapted qpos with the same shape as input |
|||
|
|||
""" |
|||
pass |
|||
|
|||
@abstractmethod |
|||
def backward_jacobian(self, jacobian: np.ndarray) -> np.ndarray: |
|||
""" |
|||
Adapt the jacobian for different kinematics applications. |
|||
Note that the joint order of this Jacobian is consistent with pinocchio |
|||
Args: |
|||
jacobian: the original jacobian |
|||
|
|||
Returns: the adapted jacobian with the same shape as input |
|||
|
|||
""" |
|||
pass |
|||
|
|||
|
|||
class MimicJointKinematicAdaptor(KinematicAdaptor): |
|||
def __init__( |
|||
self, |
|||
robot: RobotWrapper, |
|||
target_joint_names: List[str], |
|||
source_joint_names: List[str], |
|||
mimic_joint_names: List[str], |
|||
multipliers: List[float], |
|||
offsets: List[float], |
|||
): |
|||
super().__init__(robot, target_joint_names) |
|||
|
|||
self.multipliers = np.array(multipliers) |
|||
self.offsets = np.array(offsets) |
|||
|
|||
# Joint name check |
|||
union_set = set(mimic_joint_names).intersection(set(target_joint_names)) |
|||
if len(union_set) > 0: |
|||
raise ValueError( |
|||
f"Mimic joint should not be one of the target joints.\n" |
|||
f"Mimic joints: {mimic_joint_names}.\n" |
|||
f"Target joints: {target_joint_names}\n" |
|||
f"You need to specify the target joint names explicitly in your retargeting config" |
|||
f" for robot with mimic joint constraints: {target_joint_names}" |
|||
) |
|||
|
|||
# Indices in the pinocchio |
|||
self.idx_pin2source = np.array([robot.get_joint_index(name) for name in source_joint_names]) |
|||
self.idx_pin2mimic = np.array([robot.get_joint_index(name) for name in mimic_joint_names]) |
|||
|
|||
# Indices in the output results |
|||
self.idx_target2source = np.array([self.target_joint_names.index(n) for n in source_joint_names]) |
|||
|
|||
# Dimension check |
|||
len_source, len_mimic = self.idx_target2source.shape[0], self.idx_pin2mimic.shape[0] |
|||
len_mul, len_offset = self.multipliers.shape[0], self.offsets.shape[0] |
|||
if not (len_mimic == len_source == len_mul == len_offset): |
|||
raise ValueError( |
|||
f"Mimic joints setting dimension mismatch.\n" |
|||
f"Source joints: {len_source}, mimic joints: {len_mimic}, multiplier: {len_mul}, offset: {len_offset}" |
|||
) |
|||
self.num_active_joints = len(robot.dof_joint_names) - len_mimic |
|||
|
|||
# Uniqueness check |
|||
if len(mimic_joint_names) != len(np.unique(mimic_joint_names)): |
|||
raise ValueError(f"Redundant mimic joint names: {mimic_joint_names}") |
|||
|
|||
def forward_qpos(self, pin_qpos: np.ndarray) -> np.ndarray: |
|||
mimic_qpos = pin_qpos[self.idx_pin2source] * self.multipliers + self.offsets |
|||
pin_qpos[self.idx_pin2mimic] = mimic_qpos |
|||
return pin_qpos |
|||
|
|||
def backward_jacobian(self, jacobian: np.ndarray) -> np.ndarray: |
|||
target_jacobian = jacobian[..., self.idx_pin2target] |
|||
mimic_joint_jacobian = jacobian[..., self.idx_pin2mimic] * self.multipliers |
|||
|
|||
for i, index in enumerate(self.idx_target2source): |
|||
target_jacobian[..., index] += mimic_joint_jacobian[..., i] |
|||
return target_jacobian |
|||
@ -1,511 +0,0 @@ |
|||
from abc import abstractmethod |
|||
from typing import List, Optional |
|||
|
|||
import nlopt |
|||
import numpy as np |
|||
import torch |
|||
|
|||
from .kinematics_adaptor import KinematicAdaptor, MimicJointKinematicAdaptor |
|||
from .robot_wrapper import RobotWrapper |
|||
|
|||
|
|||
class Optimizer:
    """Base class for NLopt-based hand-retargeting optimizers.

    Holds the robot model, the mapping between the optimized (target) joints
    and the full pinocchio joint vector, and the shared SLSQP optimizer
    instance. Subclasses implement :meth:`get_objective_function`.
    """

    retargeting_type = "BASE"

    def __init__(
        self,
        robot: RobotWrapper,
        target_joint_names: List[str],
        target_link_human_indices: np.ndarray,
    ):
        """
        Args:
            robot: wrapped pinocchio model of the robot hand.
            target_joint_names: names of joints to optimize; each must exist
                in the robot model, otherwise ValueError is raised.
            target_link_human_indices: indices of human-hand keypoints
                associated with robot links (interpretation is
                subclass-specific).
        """
        self.robot = robot
        self.num_joints = robot.dof

        joint_names = robot.dof_joint_names
        idx_pin2target = []
        for target_joint_name in target_joint_names:
            if target_joint_name not in joint_names:
                raise ValueError(f"Joint {target_joint_name} given does not appear to be in robot XML.")
            idx_pin2target.append(joint_names.index(target_joint_name))
        self.target_joint_names = target_joint_names
        # Position of each target joint inside the pinocchio joint vector
        self.idx_pin2target = np.array(idx_pin2target)

        # Joints that are NOT optimized keep fixed values supplied by the caller
        self.idx_pin2fixed = np.array([i for i in range(robot.dof) if i not in idx_pin2target], dtype=int)
        self.opt = nlopt.opt(nlopt.LD_SLSQP, len(idx_pin2target))
        self.opt_dof = len(idx_pin2target)  # This dof includes the mimic joints

        # Target
        self.target_link_human_indices = target_link_human_indices

        # Free joint: six or more "dummy" links indicate a floating base was added
        link_names = robot.link_names
        self.has_free_joint = len([name for name in link_names if "dummy" in name]) >= 6

        # Kinematics adaptor (e.g. mimic-joint handling); installed later via
        # set_kinematic_adaptor()
        self.adaptor: Optional[KinematicAdaptor] = None

    def set_joint_limit(self, joint_limits: np.ndarray, epsilon=1e-3):
        """Set optimizer bounds from a (opt_dof, 2) array of [lower, upper] limits.

        Bounds are relaxed by *epsilon* on both sides so values exactly at a
        limit remain feasible.
        """
        if joint_limits.shape != (self.opt_dof, 2):
            raise ValueError(f"Expect joint limits have shape: {(self.opt_dof, 2)}, but get {joint_limits.shape}")
        self.opt.set_lower_bounds((joint_limits[:, 0] - epsilon).tolist())
        self.opt.set_upper_bounds((joint_limits[:, 1] + epsilon).tolist())

    def get_link_indices(self, target_link_names):
        """Map link names to frame indices in the robot model (raises on unknown names)."""
        return [self.robot.get_link_index(link_name) for link_name in target_link_names]

    def set_kinematic_adaptor(self, adaptor: KinematicAdaptor):
        """Install a kinematic adaptor and update the fixed-joint index set."""
        self.adaptor = adaptor

        # Remove mimic joints from fixed joint list: their values are derived by
        # the adaptor, not supplied by the caller
        if isinstance(adaptor, MimicJointKinematicAdaptor):
            fixed_idx = self.idx_pin2fixed
            mimic_idx = adaptor.idx_pin2mimic
            new_fixed_id = np.array([x for x in fixed_idx if x not in mimic_idx], dtype=int)
            self.idx_pin2fixed = new_fixed_id

    def retarget(self, ref_value, fixed_qpos, last_qpos):
        """
        Compute the retargeting results using non-linear optimization
        Args:
            ref_value: the reference value in cartesian space as input, different optimizer has different reference
            fixed_qpos: the fixed value (not optimized) in retargeting, consistent with self.fixed_joint_names
            last_qpos: the last retargeting results or initial value, consistent with function return

        Returns: joint position of robot, the joint order and dim is consistent with self.target_joint_names

        """
        if len(fixed_qpos) != len(self.idx_pin2fixed):
            raise ValueError(
                f"Optimizer has {len(self.idx_pin2fixed)} joints but non_target_qpos {fixed_qpos} is given"
            )
        objective_fn = self.get_objective_function(ref_value, fixed_qpos, np.array(last_qpos).astype(np.float32))

        self.opt.set_min_objective(objective_fn)
        try:
            qpos = self.opt.optimize(last_qpos)
            return np.array(qpos, dtype=np.float32)
        except RuntimeError as e:
            # Best effort: if NLopt fails to converge/solve, fall back to the
            # previous solution instead of propagating the error
            print(e)
            return np.array(last_qpos, dtype=np.float32)

    @abstractmethod
    def get_objective_function(self, ref_value: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray):
        """Build the NLopt objective closure for this retargeting type."""
        pass

    @property
    def fixed_joint_names(self):
        """Names of the non-optimized joints, in pinocchio order."""
        joint_names = self.robot.dof_joint_names
        return [joint_names[i] for i in self.idx_pin2fixed]
|||
|
|||
|
|||
class PositionOptimizer(Optimizer):
    """Retargeting by matching robot link positions to target 3D positions."""

    retargeting_type = "POSITION"

    def __init__(
        self,
        robot: RobotWrapper,
        target_joint_names: List[str],
        target_link_names: List[str],
        target_link_human_indices: np.ndarray,
        huber_delta=0.02,
        norm_delta=4e-3,
    ):
        """
        Args:
            robot: wrapped pinocchio model.
            target_joint_names: joints to optimize.
            target_link_names: robot links whose positions are matched.
            target_link_human_indices: human keypoint index for each target link.
            huber_delta: transition point (beta) of the SmoothL1/Huber loss.
            norm_delta: weight of the regularization toward the previous qpos.
        """
        super().__init__(robot, target_joint_names, target_link_human_indices)
        self.body_names = target_link_names
        self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta)
        self.norm_delta = norm_delta

        # Sanity check and cache link indices
        self.target_link_indices = self.get_link_indices(target_link_names)

        self.opt.set_ftol_abs(1e-5)

    def get_objective_function(self, target_pos: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray):
        """Return an NLopt objective: Huber distance between robot link
        positions and *target_pos*, with analytic gradients via torch autograd
        and the pinocchio link jacobians."""
        qpos = np.zeros(self.num_joints)
        qpos[self.idx_pin2fixed] = fixed_qpos
        torch_target_pos = torch.as_tensor(target_pos)
        torch_target_pos.requires_grad_(False)

        def objective(x: np.ndarray, grad: np.ndarray) -> float:
            qpos[self.idx_pin2target] = x

            # Kinematics forwarding for qpos
            if self.adaptor is not None:
                qpos[:] = self.adaptor.forward_qpos(qpos)[:]

            self.robot.compute_forward_kinematics(qpos)
            target_link_poses = [self.robot.get_link_pose(index) for index in self.target_link_indices]
            body_pos = np.stack([pose[:3, 3] for pose in target_link_poses], axis=0)  # (n ,3)

            # Torch computation for accurate loss and grad
            torch_body_pos = torch.as_tensor(body_pos)
            torch_body_pos.requires_grad_()

            # Loss term for kinematics retargeting based on 3D position error
            huber_distance = self.huber_loss(torch_body_pos, torch_target_pos)
            result = huber_distance.cpu().detach().item()

            # NLopt passes a non-empty grad array when it wants the gradient
            if grad.size > 0:
                jacobians = []
                for i, index in enumerate(self.target_link_indices):
                    link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...]
                    link_pose = target_link_poses[i]
                    link_rot = link_pose[:3, :3]
                    # Rotate the local (body-frame) jacobian into the world frame
                    link_kinematics_jacobian = link_rot @ link_body_jacobian
                    jacobians.append(link_kinematics_jacobian)

                # Note: the joint order in this jacobian is consistent pinocchio
                jacobians = np.stack(jacobians, axis=0)
                huber_distance.backward()
                grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :]

                # Convert the jacobian from pinocchio order to target order
                if self.adaptor is not None:
                    jacobians = self.adaptor.backward_jacobian(jacobians)
                else:
                    jacobians = jacobians[..., self.idx_pin2target]

                # Compute the gradient to the qpos (chain rule: dLoss/dpos @ dpos/dq)
                grad_qpos = np.matmul(grad_pos, jacobians)
                grad_qpos = grad_qpos.mean(1).sum(0)
                # Regularization gradient pulling the solution toward last_qpos
                grad_qpos += 2 * self.norm_delta * (x - last_qpos)

                grad[:] = grad_qpos[:]

            return result

        return objective
|||
|
|||
|
|||
class VectorOptimizer(Optimizer):
    """Retargeting by matching origin→task link vectors on the robot to
    (scaled) reference vectors, e.g. wrist-to-fingertip directions."""

    retargeting_type = "VECTOR"

    def __init__(
        self,
        robot: RobotWrapper,
        target_joint_names: List[str],
        target_origin_link_names: List[str],
        target_task_link_names: List[str],
        target_link_human_indices: np.ndarray,
        huber_delta=0.02,
        norm_delta=4e-3,
        scaling=1.0,
    ):
        """
        Args:
            robot: wrapped pinocchio model.
            target_joint_names: joints to optimize.
            target_origin_link_names: start link of each matched vector.
            target_task_link_names: end link of each matched vector
                (same length as the origin list).
            target_link_human_indices: human keypoint pairs for each vector.
            huber_delta: transition point (beta) of the SmoothL1/Huber loss.
            norm_delta: weight of the regularization toward the previous qpos.
            scaling: scale applied to the reference vectors (robot hand size /
                human hand size).
        """
        super().__init__(robot, target_joint_names, target_link_human_indices)
        self.origin_link_names = target_origin_link_names
        self.task_link_names = target_task_link_names
        self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta, reduction="mean")
        self.norm_delta = norm_delta
        self.scaling = scaling

        # Computation cache for better performance
        # For one link used in multiple vectors, e.g. hand palm, we do not want to compute it multiple times
        self.computed_link_names = list(set(target_origin_link_names).union(set(target_task_link_names)))
        self.origin_link_indices = torch.tensor(
            [self.computed_link_names.index(name) for name in target_origin_link_names]
        )
        self.task_link_indices = torch.tensor([self.computed_link_names.index(name) for name in target_task_link_names])

        # Cache link indices that will involve in kinematics computation
        self.computed_link_indices = self.get_link_indices(self.computed_link_names)

        self.opt.set_ftol_abs(1e-6)

    def get_objective_function(self, target_vector: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray):
        """Return an NLopt objective: Huber loss on the norm of the difference
        between robot origin→task vectors and the scaled *target_vector*."""
        qpos = np.zeros(self.num_joints)
        qpos[self.idx_pin2fixed] = fixed_qpos
        torch_target_vec = torch.as_tensor(target_vector) * self.scaling
        torch_target_vec.requires_grad_(False)

        def objective(x: np.ndarray, grad: np.ndarray) -> float:
            qpos[self.idx_pin2target] = x

            # Kinematics forwarding for qpos
            if self.adaptor is not None:
                qpos[:] = self.adaptor.forward_qpos(qpos)[:]

            self.robot.compute_forward_kinematics(qpos)
            target_link_poses = [self.robot.get_link_pose(index) for index in self.computed_link_indices]
            body_pos = np.array([pose[:3, 3] for pose in target_link_poses])

            # Torch computation for accurate loss and grad
            torch_body_pos = torch.as_tensor(body_pos)
            torch_body_pos.requires_grad_()

            # Index link for computation
            origin_link_pos = torch_body_pos[self.origin_link_indices, :]
            task_link_pos = torch_body_pos[self.task_link_indices, :]
            robot_vec = task_link_pos - origin_link_pos

            # Loss term for kinematics retargeting based on 3D position error
            vec_dist = torch.norm(robot_vec - torch_target_vec, dim=1, keepdim=False)
            huber_distance = self.huber_loss(vec_dist, torch.zeros_like(vec_dist))
            result = huber_distance.cpu().detach().item()

            # NLopt passes a non-empty grad array when it wants the gradient
            if grad.size > 0:
                jacobians = []
                for i, index in enumerate(self.computed_link_indices):
                    link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...]
                    link_pose = target_link_poses[i]
                    link_rot = link_pose[:3, :3]
                    # Rotate the local (body-frame) jacobian into the world frame
                    link_kinematics_jacobian = link_rot @ link_body_jacobian
                    jacobians.append(link_kinematics_jacobian)

                # Note: the joint order in this jacobian is consistent pinocchio
                jacobians = np.stack(jacobians, axis=0)
                huber_distance.backward()
                grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :]

                # Convert the jacobian from pinocchio order to target order
                if self.adaptor is not None:
                    jacobians = self.adaptor.backward_jacobian(jacobians)
                else:
                    jacobians = jacobians[..., self.idx_pin2target]

                grad_qpos = np.matmul(grad_pos, np.array(jacobians))
                grad_qpos = grad_qpos.mean(1).sum(0)
                # Regularization gradient pulling the solution toward last_qpos
                grad_qpos += 2 * self.norm_delta * (x - last_qpos)

                grad[:] = grad_qpos[:]

            return result

        return objective
|||
|
|||
|
|||
class DexPilotOptimizer(Optimizer):
    """Retargeting optimizer using the method proposed in DexPilot

    This is a broader adaptation of the original optimizer delineated in the DexPilot paper.
    While the initial DexPilot study focused solely on the four-fingered Allegro Hand, this version of the optimizer
    embraces the same principles for both four-fingered and five-fingered hands. It projects the distance between the
    thumb and the other fingers to facilitate more stable grasping.
    Reference: https://arxiv.org/abs/1910.03135

    Args:
        robot: wrapped pinocchio model of the robot hand.
        target_joint_names: joints to optimize.
        finger_tip_link_names: fingertip links, 2 to 5 entries.
        wrist_link_name: robot link corresponding to the human wrist.
        gamma: (unused here; see the commented-out parameter below).
        project_dist: fingertip pair distance below which the pair is
            considered "in contact" and its reference is projected.
        escape_dist: distance above which a projected pair is released.
        eta1: projected distance used for thumb-to-finger pairs.
        eta2: projected distance used for finger-to-finger pairs.
        scaling: scale applied to the reference vectors.
    """

    retargeting_type = "DEXPILOT"

    def __init__(
        self,
        robot: RobotWrapper,
        target_joint_names: List[str],
        finger_tip_link_names: List[str],
        wrist_link_name: str,
        target_link_human_indices: Optional[np.ndarray] = None,
        huber_delta=0.03,
        norm_delta=4e-3,
        # DexPilot parameters
        # gamma=2.5e-3,
        project_dist=0.03,
        escape_dist=0.05,
        eta1=1e-4,
        eta2=3e-2,
        scaling=1.0,
    ):
        if len(finger_tip_link_names) < 2 or len(finger_tip_link_names) > 5:
            raise ValueError(
                f"DexPilot optimizer can only be applied to hands with 2 to 5 fingers, but got "
                f"{len(finger_tip_link_names)} fingers."
            )
        self.num_fingers = len(finger_tip_link_names)

        # Pairs of (origin, task) link indices: finger-finger pairs first,
        # then wrist-to-fingertip pairs
        origin_link_index, task_link_index = self.generate_link_indices(self.num_fingers)

        if target_link_human_indices is None:
            # Default mapping assumes human keypoints are spaced 4 apart per finger
            target_link_human_indices = (np.stack([origin_link_index, task_link_index], axis=0) * 4).astype(int)
        link_names = [wrist_link_name] + finger_tip_link_names
        target_origin_link_names = [link_names[index] for index in origin_link_index]
        target_task_link_names = [link_names[index] for index in task_link_index]

        super().__init__(robot, target_joint_names, target_link_human_indices)
        self.origin_link_names = target_origin_link_names
        self.task_link_names = target_task_link_names
        self.scaling = scaling
        self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta, reduction="none")
        self.norm_delta = norm_delta

        # DexPilot parameters
        self.project_dist = project_dist
        self.escape_dist = escape_dist
        self.eta1 = eta1
        self.eta2 = eta2

        # Computation cache for better performance
        # For one link used in multiple vectors, e.g. hand palm, we do not want to compute it multiple times
        self.computed_link_names = list(set(target_origin_link_names).union(set(target_task_link_names)))
        self.origin_link_indices = torch.tensor(
            [self.computed_link_names.index(name) for name in target_origin_link_names]
        )
        self.task_link_indices = torch.tensor([self.computed_link_names.index(name) for name in target_task_link_names])

        # Sanity check and cache link indices
        self.computed_link_indices = self.get_link_indices(self.computed_link_names)

        self.opt.set_ftol_abs(1e-6)

        # DexPilot cache: projection state per fingertip pair, plus the
        # second-stage pair index tables and projected reference distances
        self.projected, self.s2_project_index_origin, self.s2_project_index_task, self.projected_dist = (
            self.set_dexpilot_cache(self.num_fingers, eta1, eta2)
        )

    @staticmethod
    def generate_link_indices(num_fingers):
        """Build (origin, task) index pairs into [wrist] + fingertip list.

        Example:
            >>> generate_link_indices(4)
            ([2, 3, 4, 3, 4, 4, 0, 0, 0, 0], [1, 1, 1, 2, 2, 3, 1, 2, 3, 4])
        """
        origin_link_index = []
        task_link_index = []

        # Add indices for connections between fingers
        for i in range(1, num_fingers):
            for j in range(i + 1, num_fingers + 1):
                origin_link_index.append(j)
                task_link_index.append(i)

        # Add indices for connections to the base (0)
        for i in range(1, num_fingers + 1):
            origin_link_index.append(0)
            task_link_index.append(i)

        return origin_link_index, task_link_index

    @staticmethod
    def set_dexpilot_cache(num_fingers, eta1, eta2):
        """Initialize the projection state and the projected reference distances.

        Example:
            >>> set_dexpilot_cache(4, 0.1, 0.2)
            (array([False, False, False, False, False, False]),
            [1, 2, 2],
            [0, 0, 1],
            array([0.1, 0.1, 0.1, 0.2, 0.2, 0.2]))
        """
        # One flag per fingertip pair; True while the pair is "in contact"
        projected = np.zeros(num_fingers * (num_fingers - 1) // 2, dtype=bool)

        # Second-stage pairs: finger-finger pairs whose projection depends on
        # both members' thumb-pair flags
        s2_project_index_origin = []
        s2_project_index_task = []
        for i in range(0, num_fingers - 2):
            for j in range(i + 1, num_fingers - 1):
                s2_project_index_origin.append(j)
                s2_project_index_task.append(i)

        # eta1 for thumb-to-finger pairs, eta2 for finger-to-finger pairs
        projected_dist = np.array([eta1] * (num_fingers - 1) + [eta2] * ((num_fingers - 1) * (num_fingers - 2) // 2))

        return projected, s2_project_index_origin, s2_project_index_task, projected_dist

    def get_objective_function(self, target_vector: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray):
        """Return an NLopt objective implementing the DexPilot weighted,
        projection-based vector matching with analytic gradients."""
        qpos = np.zeros(self.num_joints)
        qpos[self.idx_pin2fixed] = fixed_qpos

        len_proj = len(self.projected)
        len_s2 = len(self.s2_project_index_task)
        len_s1 = len_proj - len_s2

        # Update projection indicator with hysteresis: engage below
        # project_dist, release above escape_dist
        target_vec_dist = np.linalg.norm(target_vector[:len_proj], axis=1)
        self.projected[:len_s1][target_vec_dist[0:len_s1] < self.project_dist] = True
        self.projected[:len_s1][target_vec_dist[0:len_s1] > self.escape_dist] = False
        # Second-stage pairs project only when both related first-stage pairs do
        self.projected[len_s1:len_proj] = np.logical_and(
            self.projected[:len_s1][self.s2_project_index_origin], self.projected[:len_s1][self.s2_project_index_task]
        )
        self.projected[len_s1:len_proj] = np.logical_and(
            self.projected[len_s1:len_proj], target_vec_dist[len_s1:len_proj] <= 0.03
        )

        # Update weight vector: projected pairs get a much higher weight
        normal_weight = np.ones(len_proj, dtype=np.float32) * 1
        high_weight = np.array([200] * len_s1 + [400] * len_s2, dtype=np.float32)
        weight = np.where(self.projected, high_weight, normal_weight)

        # We change the weight to 10 instead of 1 here, for vector originate from wrist to fingertips
        # This ensures better intuitive mapping due wrong pose detection
        weight = torch.from_numpy(
            np.concatenate([weight, np.ones(self.num_fingers, dtype=np.float32) * len_proj + self.num_fingers])
        )

        # Compute reference distance vector
        normal_vec = target_vector * self.scaling  # (10, 3)
        dir_vec = target_vector[:len_proj] / (target_vec_dist[:, None] + 1e-6)  # (6, 3)
        projected_vec = dir_vec * self.projected_dist[:, None]  # (6, 3)

        # Compute final reference vector: projected pairs use the shortened
        # projected_vec, others use the scaled original vector
        reference_vec = np.where(self.projected[:, None], projected_vec, normal_vec[:len_proj])  # (6, 3)
        reference_vec = np.concatenate([reference_vec, normal_vec[len_proj:]], axis=0)  # (10, 3)
        torch_target_vec = torch.as_tensor(reference_vec, dtype=torch.float32)
        torch_target_vec.requires_grad_(False)

        def objective(x: np.ndarray, grad: np.ndarray) -> float:
            qpos[self.idx_pin2target] = x

            # Kinematics forwarding for qpos
            if self.adaptor is not None:
                qpos[:] = self.adaptor.forward_qpos(qpos)[:]

            self.robot.compute_forward_kinematics(qpos)
            target_link_poses = [self.robot.get_link_pose(index) for index in self.computed_link_indices]
            body_pos = np.array([pose[:3, 3] for pose in target_link_poses])

            # Torch computation for accurate loss and grad
            torch_body_pos = torch.as_tensor(body_pos)
            torch_body_pos.requires_grad_()

            # Index link for computation
            origin_link_pos = torch_body_pos[self.origin_link_indices, :]
            task_link_pos = torch_body_pos[self.task_link_indices, :]
            robot_vec = task_link_pos - origin_link_pos

            # Loss term for kinematics retargeting based on 3D position error
            # Different from the original DexPilot, we use huber loss here instead of the squared dist
            vec_dist = torch.norm(robot_vec - torch_target_vec, dim=1, keepdim=False)
            huber_distance = (
                self.huber_loss(vec_dist, torch.zeros_like(vec_dist)) * weight / (robot_vec.shape[0])
            ).sum()
            # Second .sum() on an already-scalar tensor: redundant but harmless
            huber_distance = huber_distance.sum()
            result = huber_distance.cpu().detach().item()

            # NLopt passes a non-empty grad array when it wants the gradient
            if grad.size > 0:
                jacobians = []
                for i, index in enumerate(self.computed_link_indices):
                    link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...]
                    link_pose = target_link_poses[i]
                    link_rot = link_pose[:3, :3]
                    # Rotate the local (body-frame) jacobian into the world frame
                    link_kinematics_jacobian = link_rot @ link_body_jacobian
                    jacobians.append(link_kinematics_jacobian)

                # Note: the joint order in this jacobian is consistent pinocchio
                jacobians = np.stack(jacobians, axis=0)
                huber_distance.backward()
                grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :]

                # Convert the jacobian from pinocchio order to target order
                if self.adaptor is not None:
                    jacobians = self.adaptor.backward_jacobian(jacobians)
                else:
                    jacobians = jacobians[..., self.idx_pin2target]

                grad_qpos = np.matmul(grad_pos, np.array(jacobians))
                grad_qpos = grad_qpos.mean(1).sum(0)

                # In the original DexPilot, γ = 2.5 × 10−3 is a weight on regularizing the Allegro angles to zero
                # which is equivalent to fully opened the hand
                # In our implementation, we regularize the joint angles to the previous joint angles
                grad_qpos += 2 * self.norm_delta * (x - last_qpos)

                grad[:] = grad_qpos[:]

            return result

        return objective
|||
@ -1,17 +0,0 @@ |
|||
class LPFilter:
    """First-order low-pass (exponential smoothing) filter.

    The first sample passes through unchanged; afterwards each call blends
    the new sample into the running state by a factor of ``alpha``.
    """

    def __init__(self, alpha):
        self.alpha = alpha   # blend factor; larger means less smoothing
        self.y = None        # current filter state (same type as the input)
        self.is_init = False

    def next(self, x):
        """Feed one sample and return a copy of the smoothed state."""
        if self.is_init:
            self.y = self.y + self.alpha * (x - self.y)
        else:
            # Seed the state with the very first sample
            self.y = x
            self.is_init = True
        return self.y.copy()

    def reset(self):
        """Forget the state so the next sample re-initializes the filter."""
        self.y = None
        self.is_init = False
|||
@ -1,251 +0,0 @@ |
|||
from dataclasses import dataclass |
|||
from pathlib import Path |
|||
from typing import List, Optional, Dict, Any, Tuple |
|||
from typing import Union |
|||
|
|||
import numpy as np |
|||
import yaml |
|||
import os |
|||
|
|||
from . import yourdfpy as urdf |
|||
from .kinematics_adaptor import MimicJointKinematicAdaptor |
|||
from .optimizer_utils import LPFilter |
|||
from .robot_wrapper import RobotWrapper |
|||
from .seq_retarget import SeqRetargeting |
|||
from .yourdfpy import DUMMY_JOINT_NAMES |
|||
|
|||
|
|||
@dataclass
class RetargetingConfig:
    """Configuration for building a hand-retargeting pipeline.

    Validated in ``__post_init__``; ``build()`` constructs the optimizer,
    optional low-pass filter and mimic-joint adaptor and returns a
    :class:`SeqRetargeting`.
    """

    type: str
    urdf_path: str

    # Whether to add free joint to the root of the robot. Free joint enable the robot hand move freely in the space
    add_dummy_free_joint: bool = False

    # Source refers to the retargeting input, which usually corresponds to the human hand
    # The joint indices of human hand joint which corresponds to each link in the target_link_names
    target_link_human_indices: Optional[np.ndarray] = None

    # The link on the robot hand which corresponding to the wrist of human hand
    wrist_link_name: Optional[str] = None

    # Position retargeting link names
    target_link_names: Optional[List[str]] = None

    # Vector retargeting link names
    target_joint_names: Optional[List[str]] = None
    target_origin_link_names: Optional[List[str]] = None
    target_task_link_names: Optional[List[str]] = None

    # DexPilot retargeting link names
    finger_tip_link_names: Optional[List[str]] = None

    # Scaling factor for vector retargeting only
    # For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6
    scaling_factor: float = 1.0

    # Optimization parameters
    normal_delta: float = 4e-3
    huber_delta: float = 2e-2

    # DexPilot optimizer parameters
    project_dist: float = 0.03
    escape_dist: float = 0.05

    # Joint limit tag
    has_joint_limits: bool = True

    # Mimic joint tag
    ignore_mimic_joint: bool = False

    # Low pass filter
    low_pass_alpha: float = 0.1

    _TYPE = ["vector", "position", "dexpilot"]
    _DEFAULT_URDF_DIR = "./"

    def __post_init__(self):
        """Validate per-type requirements and resolve the URDF path.

        Raises:
            ValueError: on unknown type, missing required fields, dimension
                mismatches, or a non-existent URDF path.
        """
        # Retargeting type check
        self.type = self.type.lower()
        if self.type not in self._TYPE:
            raise ValueError(f"Retargeting type must be one of {self._TYPE}")

        # Vector retargeting requires: target_origin_link_names + target_task_link_names
        # Position retargeting requires: target_link_names
        if self.type == "vector":
            if self.target_origin_link_names is None or self.target_task_link_names is None:
                raise ValueError("Vector retargeting requires: target_origin_link_names + target_task_link_names")
            if len(self.target_task_link_names) != len(self.target_origin_link_names):
                raise ValueError("Vector retargeting origin and task links dim mismatch")
            # BUGFIX: the None check must run BEFORE any attribute access;
            # previously a missing index array raised AttributeError instead of
            # the intended, descriptive ValueError.
            if self.target_link_human_indices is None:
                raise ValueError("Vector retargeting requires: target_link_human_indices")
            if self.target_link_human_indices.shape != (2, len(self.target_origin_link_names)):
                raise ValueError("Vector retargeting link names and link indices dim mismatch")

        elif self.type == "position":
            if self.target_link_names is None:
                raise ValueError("Position retargeting requires: target_link_names")
            # BUGFIX: None check before .squeeze() (same AttributeError issue)
            if self.target_link_human_indices is None:
                raise ValueError("Position retargeting requires: target_link_human_indices")
            self.target_link_human_indices = self.target_link_human_indices.squeeze()
            if self.target_link_human_indices.shape != (len(self.target_link_names),):
                raise ValueError("Position retargeting link names and link indices dim mismatch")

        elif self.type == "dexpilot":
            if self.finger_tip_link_names is None or self.wrist_link_name is None:
                # BUGFIX: message previously said "Position retargeting"
                raise ValueError("DexPilot retargeting requires: finger_tip_link_names + wrist_link_name")
            if self.target_link_human_indices is not None:
                print(
                    "\033[33m",
                    "Target link human indices is provided in the DexPilot retargeting config, which is uncommon.\n"
                    "If you do not know exactly how it is used, please leave it to None for default.\n"
                    "\033[00m",
                )

        # URDF path check: relative paths are resolved against the default dir
        urdf_path = Path(self.urdf_path)
        if not urdf_path.is_absolute():
            urdf_path = Path(self._DEFAULT_URDF_DIR) / urdf_path
        urdf_path = urdf_path.absolute()
        if not urdf_path.exists():
            raise ValueError(f"URDF path {urdf_path} does not exist")
        self.urdf_path = str(urdf_path)

    @classmethod
    def set_default_urdf_dir(cls, urdf_dir: Union[str, Path]):
        """Set the directory against which relative urdf_path values resolve."""
        path = Path(urdf_dir)
        if not path.exists():
            raise ValueError(f"URDF dir {urdf_dir} not exists.")
        cls._DEFAULT_URDF_DIR = urdf_dir

    @classmethod
    def load_from_file(cls, config_path: Union[str, Path], override: Optional[Dict] = None):
        """Load a config from a YAML file; the config lives under the
        top-level ``retargeting`` key. *override* entries replace file values."""
        path = Path(config_path)
        if not path.is_absolute():
            path = path.absolute()

        with path.open("r") as f:
            yaml_config = yaml.load(f, Loader=yaml.FullLoader)
            cfg = yaml_config["retargeting"]
            return cls.from_dict(cfg, override)

    @classmethod
    def from_dict(cls, cfg: Dict[str, Any], override: Optional[Dict] = None):
        """Build a config from a dict, converting the human-index list to a
        numpy array and applying *override* entries last."""
        # Work on a copy so the caller's dict is not mutated
        cfg = dict(cfg)
        if "target_link_human_indices" in cfg:
            cfg["target_link_human_indices"] = np.array(cfg["target_link_human_indices"])
        if override is not None:
            for key, value in override.items():
                cfg[key] = value
        return RetargetingConfig(**cfg)

    def build(self) -> "SeqRetargeting":
        """Construct the full retargeting pipeline for this configuration.

        Returns:
            SeqRetargeting wrapping the type-specific optimizer, with joint
            limits, optional low-pass filter and mimic-joint adaptor applied.
        """
        from .optimizer import (
            VectorOptimizer,
            PositionOptimizer,
            DexPilotOptimizer,
        )
        import tempfile

        # Process the URDF with yourdfpy to better find file path, then write a
        # normalized copy that pinocchio can load
        robot_urdf = urdf.URDF.load(
            self.urdf_path, add_dummy_free_joints=self.add_dummy_free_joint, build_scene_graph=False
        )
        urdf_name = self.urdf_path.split(os.path.sep)[-1]
        temp_dir = tempfile.mkdtemp(prefix="dex_retargeting-")
        temp_path = f"{temp_dir}/{urdf_name}"
        robot_urdf.write_xml_file(temp_path)

        # Load pinocchio model
        robot = RobotWrapper(temp_path)

        # Add 6D dummy joint to target joint names so that it will also be optimized
        if self.add_dummy_free_joint and self.target_joint_names is not None:
            self.target_joint_names = DUMMY_JOINT_NAMES + self.target_joint_names
        joint_names = self.target_joint_names if self.target_joint_names is not None else robot.dof_joint_names

        if self.type == "position":
            optimizer = PositionOptimizer(
                robot,
                joint_names,
                target_link_names=self.target_link_names,
                target_link_human_indices=self.target_link_human_indices,
                norm_delta=self.normal_delta,
                huber_delta=self.huber_delta,
            )
        elif self.type == "vector":
            optimizer = VectorOptimizer(
                robot,
                joint_names,
                target_origin_link_names=self.target_origin_link_names,
                target_task_link_names=self.target_task_link_names,
                target_link_human_indices=self.target_link_human_indices,
                scaling=self.scaling_factor,
                norm_delta=self.normal_delta,
                huber_delta=self.huber_delta,
            )
        elif self.type == "dexpilot":
            optimizer = DexPilotOptimizer(
                robot,
                joint_names,
                finger_tip_link_names=self.finger_tip_link_names,
                wrist_link_name=self.wrist_link_name,
                target_link_human_indices=self.target_link_human_indices,
                scaling=self.scaling_factor,
                project_dist=self.project_dist,
                escape_dist=self.escape_dist,
            )
        else:
            # Unreachable after __post_init__ validation, but be explicit
            raise RuntimeError(f"Unknown retargeting type: {self.type}")

        # alpha outside [0, 1] disables filtering
        if 0 <= self.low_pass_alpha <= 1:
            lp_filter = LPFilter(self.low_pass_alpha)
        else:
            lp_filter = None

        # Parse mimic joints and set kinematics adaptor for optimizer
        has_mimic_joints, source_names, mimic_names, multipliers, offsets = parse_mimic_joint(robot_urdf)
        if has_mimic_joints and not self.ignore_mimic_joint:
            adaptor = MimicJointKinematicAdaptor(
                robot,
                target_joint_names=joint_names,
                source_joint_names=source_names,
                mimic_joint_names=mimic_names,
                multipliers=multipliers,
                offsets=offsets,
            )
            optimizer.set_kinematic_adaptor(adaptor)
            print(
                "\033[34m",
                "Mimic joint adaptor enabled. The mimic joint tags in the URDF will be considered during retargeting.\n"
                "To disable mimic joint adaptor, consider setting ignore_mimic_joint=True in the configuration.",
                "\033[39m",
            )

        return SeqRetargeting(
            optimizer,
            has_joint_limits=self.has_joint_limits,
            lp_filter=lp_filter,
        )
|||
|
|||
|
|||
def get_retargeting_config(config_path: Union[str, Path]) -> RetargetingConfig:
    """Convenience wrapper: load a :class:`RetargetingConfig` from a YAML file."""
    return RetargetingConfig.load_from_file(config_path)
|||
|
|||
|
|||
def parse_mimic_joint(robot_urdf: urdf.URDF) -> Tuple[bool, List[str], List[str], List[float], List[float]]:
    """Collect mimic-joint metadata from a parsed URDF.

    Returns ``(has_mimic, source_names, mimic_names, multipliers, offsets)``
    where entry *i* of each list describes one <mimic> tag: the joint being
    followed, the mimicking joint's name, and the linear mapping
    ``value = multiplier * source + offset``.
    """
    mimic_entries = [(name, joint.mimic) for name, joint in robot_urdf.joint_map.items() if joint.mimic is not None]
    mimic_joint_names = [name for name, _ in mimic_entries]
    source_joint_names = [mimic.joint for _, mimic in mimic_entries]
    multipliers = [mimic.multiplier for _, mimic in mimic_entries]
    offsets = [mimic.offset for _, mimic in mimic_entries]
    return len(mimic_joint_names) > 0, source_joint_names, mimic_joint_names, multipliers, offsets
|||
@ -1,93 +0,0 @@ |
|||
from typing import List |
|||
|
|||
import numpy as np |
|||
import numpy.typing as npt |
|||
import pinocchio as pin |
|||
|
|||
|
|||
class RobotWrapper:
    """Thin convenience wrapper around a pinocchio ``Model``/``Data`` pair.

    This class does not take mimic joint into consideration.
    """

    def __init__(self, urdf_path: str, use_collision=False, use_visual=False):
        """Load a URDF into a pinocchio model.

        Args:
            urdf_path: path to the robot URDF file.
            use_collision: collision geometry loading (not implemented).
            use_visual: visual geometry loading (not implemented).

        Raises:
            NotImplementedError: if geometry loading is requested, or if the
                model's configuration dimension differs from its velocity
                dimension (i.e. it contains special joints such as
                spherical/free joints).
        """
        # Create robot model and data
        self.model: pin.Model = pin.buildModelFromUrdf(urdf_path)
        self.data: pin.Data = self.model.createData()

        if use_visual or use_collision:
            raise NotImplementedError

        self.q0 = pin.neutral(self.model)
        # nq != nv implies at least one joint with a non-vector configuration
        # space (quaternion-based), which the rest of this class cannot index.
        if self.model.nv != self.model.nq:
            raise NotImplementedError("Can not handle robot with special joint.")

    # -------------------------------------------------------------------------- #
    # Robot property
    # -------------------------------------------------------------------------- #
    @property
    def joint_names(self) -> List[str]:
        """All joint names known to the model (includes pinocchio's 'universe')."""
        return list(self.model.names)

    @property
    def dof_joint_names(self) -> List[str]:
        """Names of joints that contribute at least one configuration variable."""
        nqs = self.model.nqs
        return [name for i, name in enumerate(self.model.names) if nqs[i] > 0]

    @property
    def dof(self) -> int:
        """Dimension of the configuration vector (nq)."""
        return self.model.nq

    @property
    def link_names(self) -> List[str]:
        """Names of every frame in the model, in frame-index order."""
        # Comprehension replaces the original append loop with an unused index.
        return [frame.name for frame in self.model.frames]

    @property
    def joint_limits(self):
        """(nq, 2) array stacking [lower, upper] position limits per variable."""
        lower = self.model.lowerPositionLimit
        upper = self.model.upperPositionLimit
        return np.stack([lower, upper], axis=1)

    # -------------------------------------------------------------------------- #
    # Query function
    # -------------------------------------------------------------------------- #
    def get_joint_index(self, name: str):
        """Return the index of *name* within :attr:`dof_joint_names`.

        Raises:
            ValueError: if *name* is not a dof joint name.
        """
        return self.dof_joint_names.index(name)

    def get_link_index(self, name: str):
        """Return the frame id of the BODY frame named *name*.

        Raises:
            ValueError: if *name* is not a known frame name.
        """
        if name not in self.link_names:
            raise ValueError(f"{name} is not a link name. Valid link names: \n{self.link_names}")
        return self.model.getFrameId(name, pin.BODY)

    def get_joint_parent_child_frames(self, joint_name: str):
        """Return ``(parent_frame_id, child_frame_id)`` around *joint_name*.

        The child is found by scanning all frames for one whose previous frame
        is the joint's own frame.

        Raises:
            ValueError: if no frame lists the joint as its previous frame.
        """
        joint_id = self.model.getFrameId(joint_name)
        parent_id = self.model.frames[joint_id].parent
        child_id = -1
        # NOTE(review): 'previousFrame' is the legacy attribute name; newer
        # pinocchio releases call it 'parentFrame' — confirm the pinned version.
        for idx, frame in enumerate(self.model.frames):
            if frame.previousFrame == joint_id:
                child_id = idx
        if child_id == -1:
            raise ValueError(f"Can not find child link of {joint_name}")

        return parent_id, child_id

    # -------------------------------------------------------------------------- #
    # Kinematics function
    # -------------------------------------------------------------------------- #
    def compute_forward_kinematics(self, qpos: npt.NDArray):
        """Run forward kinematics for *qpos*; results are cached in ``self.data``."""
        pin.forwardKinematics(self.model, self.data, qpos)

    def get_link_pose(self, link_id: int) -> npt.NDArray:
        """Return the 4x4 world pose of frame *link_id* (after FK has run)."""
        pose: pin.SE3 = pin.updateFramePlacement(self.model, self.data, link_id)
        return pose.homogeneous

    def get_link_pose_inv(self, link_id: int) -> npt.NDArray:
        """Return the inverse of the 4x4 world pose of frame *link_id*."""
        pose: pin.SE3 = pin.updateFramePlacement(self.model, self.data, link_id)
        return pose.inverse().homogeneous

    def compute_single_link_local_jacobian(self, qpos, link_id: int) -> npt.NDArray:
        """Return the local-frame Jacobian of frame *link_id* at *qpos*."""
        J = pin.computeFrameJacobian(self.model, self.data, qpos, link_id)
        return J
|||
@ -1,151 +0,0 @@ |
|||
import time |
|||
from typing import Optional |
|||
|
|||
import numpy as np |
|||
from pytransform3d import rotations |
|||
|
|||
from .constants import OPERATOR2MANO, HandType |
|||
from .optimizer import Optimizer |
|||
from .optimizer_utils import LPFilter |
|||
|
|||
|
|||
class SeqRetargeting:
    """Sequential hand-pose retargeting driver.

    Wraps an :class:`Optimizer` and carries state between frames: the previous
    solution (used as the optimizer's initial guess), accumulated timing
    statistics, and an optional low-pass filter on the output joint positions.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        has_joint_limits=True,
        lp_filter: Optional[LPFilter] = None,
    ):
        """
        Args:
            optimizer: configured per-frame retargeting optimizer.
            has_joint_limits: when True, clamp/initialize with the robot's real
                joint limits; otherwise use effectively unbounded limits.
            lp_filter: optional low-pass filter applied to each output qpos.
        """
        self.optimizer = optimizer
        robot = self.optimizer.robot

        # Joint limit
        self.has_joint_limits = has_joint_limits
        joint_limits = np.ones_like(robot.joint_limits)
        joint_limits[:, 0] = -1e4  # a large value is equivalent to no limit
        joint_limits[:, 1] = 1e4
        if has_joint_limits:
            joint_limits[:] = robot.joint_limits[:]
            self.optimizer.set_joint_limit(joint_limits[self.optimizer.idx_pin2target])
        # idx_pin2target selects the optimized (target) joints out of the full
        # pinocchio qpos vector — index semantics defined by Optimizer.
        self.joint_limits = joint_limits[self.optimizer.idx_pin2target]

        # Temporal information: start from the mid-range of each joint.
        self.last_qpos = joint_limits.mean(1)[self.optimizer.idx_pin2target].astype(np.float32)
        self.accumulated_time = 0
        self.num_retargeting = 0

        # Filter
        self.filter = lp_filter

        # Warm started
        self.is_warm_started = False

    def warm_start(
        self,
        wrist_pos: np.ndarray,
        wrist_quat: np.ndarray,
        hand_type: HandType = HandType.right,
        is_mano_convention: bool = False,
    ):
        """
        Initialize the wrist joint pose using analytical computation instead of retargeting optimization.
        This function is specifically for position retargeting with the flying robot hand, i.e. has 6D free joint
        You are not expected to use this function for vector retargeting, e.g. when you are working on teleoperation

        Args:
            wrist_pos: position of the hand wrist, typically from human hand pose
            wrist_quat: quaternion of the hand wrist, the same convention as the operator frame definition if not is_mano_convention
            hand_type: hand type, used to determine the operator2mano matrix
            is_mano_convention: whether the wrist_quat is in mano convention
        """
        # This function can only be used when the first joints of robot are free joints

        if len(wrist_pos) != 3:
            raise ValueError(f"Wrist pos: {wrist_pos} is not a 3-dim vector.")
        if len(wrist_quat) != 4:
            raise ValueError(f"Wrist quat: {wrist_quat} is not a 4-dim vector.")

        operator2mano = OPERATOR2MANO[hand_type] if is_mano_convention else np.eye(3)
        robot = self.optimizer.robot
        # Desired world pose of the wrist link, converted out of MANO
        # convention when requested.
        target_wrist_pose = np.eye(4)
        target_wrist_pose[:3, :3] = rotations.matrix_from_quaternion(wrist_quat) @ operator2mano.T
        target_wrist_pose[:3, 3] = wrist_pos

        # The 6 dummy joints model the hand's free-floating base: XYZ
        # translation followed by XYZ rotation.
        name_list = [
            "dummy_x_translation_joint",
            "dummy_y_translation_joint",
            "dummy_z_translation_joint",
            "dummy_x_rotation_joint",
            "dummy_y_rotation_joint",
            "dummy_z_rotation_joint",
        ]
        # Child frame of the last dummy joint is taken as the wrist link.
        wrist_link_id = robot.get_joint_parent_child_frames(name_list[5])[1]

        # Set the dummy joints angles to zero
        old_qpos = robot.q0
        new_qpos = old_qpos.copy()
        for num, joint_name in enumerate(self.optimizer.target_joint_names):
            if joint_name in name_list:
                new_qpos[num] = 0

        # With dummy joints zeroed, root->wrist is a fixed transform; solve the
        # root pose that places the wrist at the target analytically.
        robot.compute_forward_kinematics(new_qpos)
        root2wrist = robot.get_link_pose_inv(wrist_link_id)
        target_root_pose = target_wrist_pose @ root2wrist

        # Intrinsic XYZ euler angles match the dummy rotation-joint order.
        euler = rotations.euler_from_matrix(target_root_pose[:3, :3], 0, 1, 2, extrinsic=False)
        pose_vec = np.concatenate([target_root_pose[:3, 3], euler])

        # Find the dummy joints
        for num, joint_name in enumerate(self.optimizer.target_joint_names):
            if joint_name in name_list:
                index = name_list.index(joint_name)
                self.last_qpos[num] = pose_vec[index]

        self.is_warm_started = True

    def retarget(self, ref_value, fixed_qpos=np.array([])):
        """Run one retargeting step.

        Args:
            ref_value: reference value for the optimizer (e.g. keypoint
                positions/vectors, format defined by the Optimizer subclass).
            fixed_qpos: values for the joints the optimizer does not optimize.

        Returns:
            Full robot qpos vector, after optional mimic-joint adaptation and
            low-pass filtering.
        """
        tic = time.perf_counter()

        # Previous solution (clipped into limits) seeds the optimizer.
        qpos = self.optimizer.retarget(
            ref_value=ref_value.astype(np.float32),
            fixed_qpos=fixed_qpos.astype(np.float32),
            last_qpos=np.clip(self.last_qpos, self.joint_limits[:, 0], self.joint_limits[:, 1]),
        )
        self.accumulated_time += time.perf_counter() - tic
        self.num_retargeting += 1
        self.last_qpos = qpos
        # Scatter optimized + fixed joints back into the full pinocchio vector.
        robot_qpos = np.zeros(self.optimizer.robot.dof)
        robot_qpos[self.optimizer.idx_pin2fixed] = fixed_qpos
        robot_qpos[self.optimizer.idx_pin2target] = qpos

        if self.optimizer.adaptor is not None:
            robot_qpos = self.optimizer.adaptor.forward_qpos(robot_qpos)

        if self.filter is not None:
            robot_qpos = self.filter.next(robot_qpos)
        return robot_qpos

    def set_qpos(self, robot_qpos: np.ndarray):
        """Override the internal state from a full robot qpos vector."""
        target_qpos = robot_qpos[self.optimizer.idx_pin2target]
        self.last_qpos = target_qpos

    def get_qpos(self, fixed_qpos: Optional[np.ndarray] = None):
        """Return the full robot qpos built from the last solution.

        Args:
            fixed_qpos: optional values for the non-optimized joints; left at
                zero when omitted.
        """
        robot_qpos = np.zeros(self.optimizer.robot.dof)
        robot_qpos[self.optimizer.idx_pin2target] = self.last_qpos
        if fixed_qpos is not None:
            robot_qpos[self.optimizer.idx_pin2fixed] = fixed_qpos
        return robot_qpos

    def verbose(self):
        """Print accumulated timing and the optimizer's last objective value."""
        min_value = self.optimizer.opt.last_optimum_value()
        print(f"Retargeting {self.num_retargeting} times takes: {self.accumulated_time}s")
        print(f"Last distance: {min_value}")

    def reset(self):
        """Reset temporal state to mid-range joints and zeroed statistics."""
        self.last_qpos = self.joint_limits.mean(1).astype(np.float32)
        self.num_retargeting = 0
        self.accumulated_time = 0

    @property
    def joint_names(self):
        """Names of the robot's dof joints, in pinocchio order."""
        return self.optimizer.robot.dof_joint_names
|||
2237
teleop/robot_control/dex_retargeting/yourdfpy.py
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -1,14 +0,0 @@ |
|||
import numpy as np |
|||
|
|||
def mat_update(prev_mat, mat):
    """Accept *mat* only when it is non-singular.

    A zero determinant marks an invalid/degenerate update (e.g. an
    uninitialized tracking pose), in which case the previous matrix is kept.

    Returns:
        ``(matrix, updated)`` — the accepted matrix and a flag that is True
        when *mat* was taken, False when *prev_mat* was kept.
    """
    is_valid = np.linalg.det(mat) != 0
    return (mat, True) if is_valid else (prev_mat, False)
|||
|
|||
|
|||
def fast_mat_inv(mat):
    """Invert a 4x4 rigid-body homogeneous transform.

    Exploits the structure inv(T) = [R^T, -R^T @ t] instead of a general
    matrix inverse; valid only when the upper-left 3x3 block is a pure
    rotation.
    """
    rot_t = mat[:3, :3].T
    inv = np.eye(4)
    inv[:3, :3] = rot_t
    inv[:3, 3] = -rot_t @ mat[:3, 3]
    return inv
|||
@ -0,0 +1,290 @@ |
|||
# Copyright (c) 2025, Unitree Robotics Co., Ltd. All Rights Reserved. |
|||
# License: Apache License, Version 2.0 |
|||
""" |
|||
Simple sim state subscriber class |
|||
Subscribe to the rt/sim_state topic and write to shared memory
|||
""" |
|||
|
|||
import threading |
|||
import time |
|||
import json |
|||
from multiprocessing import shared_memory |
|||
from typing import Any, Dict, Optional |
|||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize |
|||
from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ |
|||
|
|||
import logging_mp |
|||
logger_mp = logging_mp.get_logger(__name__) |
|||
|
|||
|
|||
class SharedMemoryManager:
    """Shared memory manager.

    Layout of the block: bytes [0:4) 32-bit little-endian timestamp,
    bytes [4:8) 32-bit little-endian payload length, bytes [8:8+len) UTF-8
    JSON payload. Only the most recent message is kept.
    """

    def __init__(self, name: str = None, size: int = 512):
        """Initialize shared memory manager.

        Args:
            name: shared memory name; attach to it if it exists, otherwise
                (or when None) create a new block.
            size: shared memory size (bytes)
        """
        self.size = size
        self.lock = threading.RLock()  # reentrant lock, guards header+payload writes

        if name:
            try:
                self.shm = shared_memory.SharedMemory(name=name)
                self.shm_name = name
                self.created = False
            except FileNotFoundError:
                # Named block does not exist yet — create it ourselves.
                self.shm = shared_memory.SharedMemory(create=True, size=size)
                self.shm_name = self.shm.name
                self.created = True
        else:
            self.shm = shared_memory.SharedMemory(create=True, size=size)
            self.shm_name = self.shm.name
            self.created = True

    def write_data(self, data: Dict[str, Any]) -> bool:
        """Write data to shared memory.

        Args:
            data: JSON-serializable data to write

        Returns:
            bool: write success or not
        """
        try:
            with self.lock:
                json_str = json.dumps(data)
                json_bytes = json_str.encode('utf-8')

                if len(json_bytes) > self.size - 8:  # reserve 8 bytes for length and timestamp
                    logger_mp.warning(f"Data too large for shared memory ({len(json_bytes)} > {self.size - 8})")
                    return False

                # write timestamp (4 bytes) and data length (4 bytes)
                timestamp = int(time.time()) & 0xFFFFFFFF  # 32-bit timestamp, use bitmask to ensure in range
                self.shm.buf[0:4] = timestamp.to_bytes(4, 'little')
                self.shm.buf[4:8] = len(json_bytes).to_bytes(4, 'little')

                # write data
                self.shm.buf[8:8+len(json_bytes)] = json_bytes
                return True

        except Exception as e:
            logger_mp.error(f"Error writing to shared memory: {e}")
            return False

    def read_data(self) -> Optional[Dict[str, Any]]:
        """Read data from shared memory.

        Returns:
            Dict[str, Any]: read data dictionary (with an added ``_timestamp``
            key), or None if no data has been written or on error.
        """
        try:
            with self.lock:
                # read timestamp and data length
                timestamp = int.from_bytes(self.shm.buf[0:4], 'little')
                data_len = int.from_bytes(self.shm.buf[4:8], 'little')

                # Reject empty as well as out-of-range lengths: a corrupted or
                # never-written header must not make us slice past the buffer.
                if data_len == 0 or data_len > self.size - 8:
                    return None

                # read data
                json_bytes = bytes(self.shm.buf[8:8+data_len])
                data = json.loads(json_bytes.decode('utf-8'))
                data['_timestamp'] = timestamp  # add timestamp information
                return data

        except Exception as e:
            logger_mp.error(f"Error reading from shared memory: {e}")
            return None

    def get_name(self) -> str:
        """Get shared memory name"""
        return self.shm_name

    def cleanup(self):
        """Clean up shared memory; safe to call more than once."""
        if hasattr(self, 'shm') and self.shm:
            self.shm.close()
            if self.created:
                try:
                    self.shm.unlink()
                except FileNotFoundError:
                    # Already unlinked (e.g. by another cleanup call/process).
                    pass
            # Make subsequent cleanup() calls (e.g. from __del__) no-ops.
            self.shm = None

    def __del__(self):
        """Destructor: best-effort release of the shared memory."""
        self.cleanup()
|||
|
|||
|
|||
class SimStateSubscriber:
    """Subscribes to the DDS topic ``rt/sim_state`` and mirrors each JSON
    message into shared memory so other processes can poll the latest state."""

    def __init__(self, shm_name: str = "sim_state_cmd_data", shm_size: int = 3096):
        """Initialize the subscriber.

        Args:
            shm_name: shared memory name
            shm_size: shared memory size (bytes)
        """
        self.shm_name = shm_name
        self.shm_size = shm_size
        self.running = False
        self.subscriber = None
        self.subscribe_thread = None
        self.shared_memory = None

        # initialize shared memory
        self._setup_shared_memory()

        logger_mp.info(f"[SimStateSubscriber] Initialized with shared memory: {shm_name}")

    def _setup_shared_memory(self):
        """Create (or attach to) the shared memory block used as the mailbox."""
        try:
            self.shared_memory = SharedMemoryManager(self.shm_name, self.shm_size)
            logger_mp.info("[SimStateSubscriber] Shared memory setup successfully")
        except Exception as e:
            logger_mp.error(f"[SimStateSubscriber] Failed to setup shared memory: {e}")

    def _message_handler(self, msg: String_):
        """DDS callback: parse the JSON payload and store it in shared memory."""
        try:
            # parse the received message
            data = json.loads(msg.data)

            # write to shared memory (only the latest message is kept)
            if self.shared_memory:
                self.shared_memory.write_data(data)

        except Exception as e:
            logger_mp.error(f"[SimStateSubscriber] Error processing message: {e}")

    def _subscribe_loop(self):
        """Keep-alive loop: DDS delivers messages via the callback thread, so
        this thread only idles until stop_subscribe() clears the running flag."""
        logger_mp.info("[SimStateSubscriber] Subscribe thread started")

        while self.running:
            try:
                time.sleep(0.001)  # keep thread alive
            except Exception as e:
                logger_mp.error(f"[SimStateSubscriber] Error in subscribe loop: {e}")
                time.sleep(0.01)

        logger_mp.info("[SimStateSubscriber] Subscribe thread stopped")

    def start_subscribe(self):
        """Start subscribing; no-op if already running."""
        if self.running:
            logger_mp.info("[SimStateSubscriber] Already running")
            return

        try:
            # setup subscriber
            self.subscriber = ChannelSubscriber("rt/sim_state", String_)
            self.subscriber.Init(self._message_handler, 10)

            self.running = True
            logger_mp.info("[SimStateSubscriber] Subscriber initialized")
            # start subscribe thread
            self.subscribe_thread = threading.Thread(target=self._subscribe_loop)
            self.subscribe_thread.daemon = True
            self.subscribe_thread.start()

            # Topic name here matches the actual ChannelSubscriber topic above
            # (the previous message incorrectly said rt/sim_state_cmd).
            logger_mp.info("[SimStateSubscriber] Started subscribing to rt/sim_state")

        except Exception as e:
            logger_mp.error(f"[SimStateSubscriber] Failed to start subscribing: {e}")
            self.running = False

    def stop_subscribe(self):
        """Stop subscribing and release the shared memory; no-op if stopped."""
        if not self.running:
            return

        logger_mp.info("[SimStateSubscriber] Stopping subscriber...")
        self.running = False

        # wait for thread to finish
        if self.subscribe_thread:
            self.subscribe_thread.join(timeout=1.0)

        if self.shared_memory:
            # cleanup shared memory
            self.shared_memory.cleanup()
        logger_mp.info("[SimStateSubscriber] Subscriber stopped")

    def read_data(self) -> Optional[Dict[str, Any]]:
        """Read data from shared memory.

        Returns:
            Dict: received data, None if no data or error
        """
        try:
            if self.shared_memory:
                return self.shared_memory.read_data()
            return None
        except Exception as e:
            logger_mp.error(f"[SimStateSubscriber] Error reading data: {e}")
            return None

    def is_running(self) -> bool:
        """Check if subscriber is running"""
        return self.running

    def __del__(self):
        """Destructor: best-effort shutdown."""
        self.stop_subscribe()
|||
|
|||
|
|||
# convenient functions |
|||
def create_sim_state_subscriber(shm_name: str = "sim_state_cmd_data", shm_size: int = 3096) -> SimStateSubscriber:
    """Build a :class:`SimStateSubscriber` bound to the given shared memory.

    Args:
        shm_name: shared memory name
        shm_size: shared memory size

    Returns:
        SimStateSubscriber: subscriber instance (not yet started)
    """
    return SimStateSubscriber(shm_name=shm_name, shm_size=shm_size)
|||
|
|||
|
|||
def start_sim_state_subscribe(shm_name: str = "sim_state_cmd_data", shm_size: int = 3096) -> SimStateSubscriber:
    """Create a sim-state subscriber and immediately begin listening.

    Args:
        shm_name: shared memory name
        shm_size: shared memory size

    Returns:
        SimStateSubscriber: started subscriber instance
    """
    sub = create_sim_state_subscriber(shm_name, shm_size)
    sub.start_subscribe()
    return sub
|||
|
|||
|
|||
# if __name__ == "__main__": |
|||
# # example usage |
|||
# logger_mp.info("Starting sim state subscriber...") |
|||
# ChannelFactoryInitialize(0) |
|||
# # create and start subscriber |
|||
# subscriber = start_sim_state_subscribe() |
|||
|
|||
# try: |
|||
# # keep running and check for data |
|||
# while True: |
|||
# data = subscriber.read_data() |
|||
# if data: |
|||
# logger_mp.info(f"Read data: {data}") |
|||
# time.sleep(1) |
|||
|
|||
# except KeyboardInterrupt: |
|||
# logger_mp.warning("\nInterrupted by user") |
|||
# finally: |
|||
# subscriber.stop_subscribe() |
|||
# logger_mp.info("Subscriber stopped") |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue