You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

335 lines
18 KiB

import numpy as np
import time
import argparse
import cv2
from multiprocessing import shared_memory, Value, Array, Lock
import threading
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from teleop.open_television import TeleVisionWrapper
from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController
from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK
from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_Controller
from teleop.robot_control.robot_hand_inspire import Inspire_Controller
from teleop.image_server.image_client import ImageClient
from teleop.utils.episode_writer import EpisodeWriter
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--task_dir', type = str, default = './utils/data', help = 'path to save data')
parser.add_argument('--frequency', type = float, default = 90.0, help = 'save data\'s frequency')
parser.add_argument('--record', action = 'store_true', help = 'Save data or not')
parser.add_argument('--no-record', dest = 'record', action = 'store_false', help = 'Do not save data')
parser.set_defaults(record = False)
parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source')
parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller')
parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller')
args = parser.parse_args()
print(f"args:{args}\n")
# image client: img_config should be the same as the configuration in image_server.py (of Robot's development computing unit)
img_config = {
'fps': 30,
'head_camera_type': 'opencv',
'head_camera_image_shape': [480, 1280], # Head camera resolution
'head_camera_id_numbers': [0],
'wrist_camera_type': 'opencv',
'wrist_camera_image_shape': [480, 640], # Wrist camera resolution
'wrist_camera_id_numbers': [2, 4],
}
ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular
if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
BINOCULAR = True
else:
BINOCULAR = False
if 'wrist_camera_type' in img_config:
WRIST = True
else:
WRIST = False
if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3)
else:
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3)
tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize)
tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf)
if WRIST:
wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3)
wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize)
wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf)
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name,
wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name)
else:
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name)
image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True)
image_receive_thread.daemon = True
image_receive_thread.start()
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVisionWrapper(binocular=BINOCULAR, use_hand_tracking=args.xr_mode == 'hand', img_shape=tv_img_shape, img_shm_name=tv_img_shm.name,
return_state_data=True, return_hand_rot_data = False)
# arm
if args.arm == 'G1_29':
arm_ctrl = G1_29_ArmController(debug_mode=True)
arm_ik = G1_29_ArmIK()
elif args.arm == 'G1_23':
arm_ctrl = G1_23_ArmController()
arm_ik = G1_23_ArmIK()
elif args.arm == 'H1_2':
arm_ctrl = H1_2_ArmController()
arm_ik = H1_2_ArmIK()
elif args.arm == 'H1':
arm_ctrl = H1_ArmController()
arm_ik = H1_ArmIK()
# end-effector
if args.ee == "dex3":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data.
dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)
elif args.ee == "gripper":
left_gripper_value = Value('d', 0.0, lock=True) # [input]
right_gripper_value = Value('d', 0.0, lock=True) # [input]
dual_gripper_data_lock = Lock()
dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data.
dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data.
gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array)
elif args.ee == "inspire1":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data.
dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data.
hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)
else:
pass
# xr mode
if args.xr_mode == 'controller':
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient
sport_client = LocoClient()
sport_client.SetTimeout(0.0001)
sport_client.Init()
if args.record:
recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency, rerun_log = True)
recording = False
try:
user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n")
if user_input.lower() == 'r':
arm_ctrl.speed_gradual_max()
running = True
while running:
start_time = time.time()
# opencv image
tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2))
cv2.imshow("record image", tv_resized_image)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
running = False
elif key == ord('s') and args.record:
recording = not recording # state flipping
if recording:
if not recorder.create_episode():
recording = False
else:
recorder.save_episode()
# get input data
tele_data = tv_wrapper.get_motion_state_data()
if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand':
with left_hand_pos_array.get_lock():
left_hand_pos_array[:] = tele_data.left_hand_pos.flatten()
with right_hand_pos_array.get_lock():
right_hand_pos_array[:] = tele_data.right_hand_pos.flatten()
elif args.ee == 'gripper' and args.xr_mode == 'controller':
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_trigger_value
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_trigger_value
# quit teleoperate
if tele_data.tele_state.right_aButton:
running = False
# command robot to enter damping mode. soft emergency stop function
if tele_data.tele_state.left_thumbstick_state and tele_data.tele_state.right_thumbstick_state:
sport_client.Damp()
# high level control, limit velocity to within 0.3
sport_client.Move(-tele_data.tele_state.left_thumbstick_value[1] * 0.3,
-tele_data.tele_state.left_thumbstick_value[0] * 0.3,
-tele_data.tele_state.right_thumbstick_value[0] * 0.3)
elif args.ee == 'gripper' and args.xr_mode == 'hand':
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_pinch_value
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_pinch_value
else:
pass
# get current robot state data.
current_lr_arm_q = arm_ctrl.get_current_dual_arm_q()
current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq()
# solve ik using motor data and wrist pose, then use ik results to control arms.
time_ik_start = time.time()
sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_arm_pose, tele_data.right_arm_pose, current_lr_arm_q, current_lr_arm_dq)
time_ik_end = time.time()
# print(f"ik:\t{round(time_ik_end - time_ik_start, 6)}")
arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff)
# record data
if args.record:
# dex hand or gripper
if args.ee == "dex3" and args.xr_mode == 'hand':
with dual_hand_data_lock:
left_hand_state = dual_hand_state_array[:7]
right_hand_state = dual_hand_state_array[-7:]
left_hand_action = dual_hand_action_array[:7]
right_hand_action = dual_hand_action_array[-7:]
current_body_state = []
current_body_action = []
elif args.ee == "gripper" and args.xr_mode == 'hand':
with dual_gripper_data_lock:
left_hand_state = [dual_gripper_state_array[1]]
right_hand_state = [dual_gripper_state_array[0]]
left_hand_action = [dual_gripper_action_array[1]]
right_hand_action = [dual_gripper_action_array[0]]
current_body_state = []
current_body_action = []
elif args.ee == "gripper" and args.xr_mode == 'controller':
with dual_gripper_data_lock:
left_hand_state = [dual_gripper_state_array[1]]
right_hand_state = [dual_gripper_state_array[0]]
left_hand_action = [dual_gripper_action_array[1]]
right_hand_action = [dual_gripper_action_array[0]]
current_body_state = arm_ctrl.get_current_motor_q().tolist()
current_body_action = [-tele_data.tele_state.left_thumbstick_value[1] * 0.3,
-tele_data.tele_state.left_thumbstick_value[0] * 0.3,
-tele_data.tele_state.right_thumbstick_value[0] * 0.3]
elif args.ee == "inspire1" and args.xr_mode == 'hand':
with dual_hand_data_lock:
left_hand_state = dual_hand_state_array[:6]
right_hand_state = dual_hand_state_array[-6:]
left_hand_action = dual_hand_action_array[:6]
right_hand_action = dual_hand_action_array[-6:]
current_body_state = []
current_body_action = []
else:
left_hand_state = []
right_hand_state = []
left_hand_action = []
right_hand_action = []
current_body_state = []
current_body_action = []
# head image
current_tv_image = tv_img_array.copy()
# wrist image
if WRIST:
current_wrist_image = wrist_img_array.copy()
# arm state and action
left_arm_state = current_lr_arm_q[:7]
right_arm_state = current_lr_arm_q[-7:]
left_arm_action = sol_q[:7]
right_arm_action = sol_q[-7:]
if recording:
colors = {}
depths = {}
if BINOCULAR:
colors[f"color_{0}"] = current_tv_image[:, :tv_img_shape[1]//2]
colors[f"color_{1}"] = current_tv_image[:, tv_img_shape[1]//2:]
if WRIST:
colors[f"color_{2}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
colors[f"color_{3}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
else:
colors[f"color_{0}"] = current_tv_image
if WRIST:
colors[f"color_{1}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
colors[f"color_{2}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
states = {
"left_arm": {
"qpos": left_arm_state.tolist(), # numpy.array -> list
"qvel": [],
"torque": [],
},
"right_arm": {
"qpos": right_arm_state.tolist(),
"qvel": [],
"torque": [],
},
"left_hand": {
"qpos": left_hand_state,
"qvel": [],
"torque": [],
},
"right_hand": {
"qpos": right_hand_state,
"qvel": [],
"torque": [],
},
"body": {
"qpos": current_body_state,
},
}
actions = {
"left_arm": {
"qpos": left_arm_action.tolist(),
"qvel": [],
"torque": [],
},
"right_arm": {
"qpos": right_arm_action.tolist(),
"qvel": [],
"torque": [],
},
"left_hand": {
"qpos": left_hand_action,
"qvel": [],
"torque": [],
},
"right_hand": {
"qpos": right_hand_action,
"qvel": [],
"torque": [],
},
"body": {
"qpos": current_body_action,
},
}
recorder.add_item(colors=colors, depths=depths, states=states, actions=actions)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, (1 / args.frequency) - time_elapsed)
time.sleep(sleep_time)
# print(f"main process sleep: {sleep_time}")
except KeyboardInterrupt:
print("KeyboardInterrupt, exiting program...")
finally:
arm_ctrl.ctrl_dual_arm_go_home()
tv_img_shm.unlink()
tv_img_shm.close()
if WRIST:
wrist_img_shm.unlink()
wrist_img_shm.close()
if args.record:
recorder.close()
print("Finally, exiting program...")
exit(0)