|
|
|
@ -24,11 +24,15 @@ from teleop.utils.episode_writer import EpisodeWriter |
|
|
|
from sshkeyboard import listen_keyboard |
|
|
|
|
|
|
|
|
|
|
|
start_signal = False |
|
|
|
running = True |
|
|
|
recording = False |
|
|
|
def on_press(key): |
|
|
|
global running, recording |
|
|
|
if key == 'q': |
|
|
|
global running, recording, start_signal |
|
|
|
if key == 'r': |
|
|
|
start_signal = True |
|
|
|
logger_mp.info("[Key] start signal received.") |
|
|
|
elif key == 'q': |
|
|
|
running = False |
|
|
|
elif key == 's': |
|
|
|
recording = not recording |
|
|
|
@ -100,7 +104,6 @@ if __name__ == '__main__': |
|
|
|
if args.arm == 'G1_29': |
|
|
|
arm_ctrl = G1_29_ArmController(debug_mode=args.debug) |
|
|
|
arm_ik = G1_29_ArmIK() |
|
|
|
pass |
|
|
|
elif args.arm == 'G1_23': |
|
|
|
arm_ctrl = G1_23_ArmController() |
|
|
|
arm_ik = G1_23_ArmIK() |
|
|
|
@ -149,198 +152,200 @@ if __name__ == '__main__': |
|
|
|
recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency, rerun_log = True) |
|
|
|
|
|
|
|
try: |
|
|
|
user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n") |
|
|
|
if user_input.lower() == 'r': |
|
|
|
arm_ctrl.speed_gradual_max() |
|
|
|
while running: |
|
|
|
start_time = time.time() |
|
|
|
|
|
|
|
if not args.headless: |
|
|
|
tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2)) |
|
|
|
cv2.imshow("record image", tv_resized_image) |
|
|
|
key = cv2.waitKey(1) & 0xFF |
|
|
|
if key == ord('q'): |
|
|
|
running = False |
|
|
|
elif key == ord('s'): |
|
|
|
recording = not recording # state flipping |
|
|
|
logger_mp.info(f"recording : {recording}") |
|
|
|
logger_mp.info("Please enter the start signal (enter 'r' to start the subsequent program)") |
|
|
|
while not start_signal: |
|
|
|
time.sleep(0.01) |
|
|
|
logger_mp.info("Program start.") |
|
|
|
arm_ctrl.speed_gradual_max() |
|
|
|
while running: |
|
|
|
start_time = time.time() |
|
|
|
|
|
|
|
if args.record: |
|
|
|
if recording: |
|
|
|
if not recorder.create_episode(): |
|
|
|
recording = False |
|
|
|
else: |
|
|
|
recorder.save_episode() |
|
|
|
if not args.headless: |
|
|
|
tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2)) |
|
|
|
cv2.imshow("record image", tv_resized_image) |
|
|
|
key = cv2.waitKey(1) & 0xFF |
|
|
|
if key == ord('q'): |
|
|
|
running = False |
|
|
|
elif key == ord('s'): |
|
|
|
recording = not recording # state flipping |
|
|
|
logger_mp.info(f"recording : {recording}") |
|
|
|
|
|
|
|
# get input data |
|
|
|
tele_data = tv_wrapper.get_motion_state_data() |
|
|
|
if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand': |
|
|
|
with left_hand_pos_array.get_lock(): |
|
|
|
left_hand_pos_array[:] = tele_data.left_hand_pos.flatten() |
|
|
|
with right_hand_pos_array.get_lock(): |
|
|
|
right_hand_pos_array[:] = tele_data.right_hand_pos.flatten() |
|
|
|
elif args.ee == 'gripper' and args.xr_mode == 'controller': |
|
|
|
with left_gripper_value.get_lock(): |
|
|
|
left_gripper_value.value = tele_data.left_trigger_value |
|
|
|
with right_gripper_value.get_lock(): |
|
|
|
right_gripper_value.value = tele_data.right_trigger_value |
|
|
|
elif args.ee == 'gripper' and args.xr_mode == 'hand': |
|
|
|
with left_gripper_value.get_lock(): |
|
|
|
left_gripper_value.value = tele_data.left_pinch_value |
|
|
|
with right_gripper_value.get_lock(): |
|
|
|
right_gripper_value.value = tele_data.right_pinch_value |
|
|
|
if args.record: |
|
|
|
if recording: |
|
|
|
if not recorder.create_episode(): |
|
|
|
recording = False |
|
|
|
else: |
|
|
|
pass |
|
|
|
|
|
|
|
# high level control |
|
|
|
if args.xr_mode == 'controller' and not args.debug: |
|
|
|
# quit teleoperate |
|
|
|
if tele_data.tele_state.right_aButton: |
|
|
|
running = False |
|
|
|
# command robot to enter damping mode. soft emergency stop function |
|
|
|
if tele_data.tele_state.left_thumbstick_state and tele_data.tele_state.right_thumbstick_state: |
|
|
|
sport_client.Damp() |
|
|
|
# control, limit velocity to within 0.3 |
|
|
|
sport_client.Move(-tele_data.tele_state.left_thumbstick_value[1] * 0.3, |
|
|
|
-tele_data.tele_state.left_thumbstick_value[0] * 0.3, |
|
|
|
-tele_data.tele_state.right_thumbstick_value[0] * 0.3) |
|
|
|
recorder.save_episode() |
|
|
|
|
|
|
|
# get current robot state data. |
|
|
|
current_lr_arm_q = arm_ctrl.get_current_dual_arm_q() |
|
|
|
current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq() |
|
|
|
# get input data |
|
|
|
tele_data = tv_wrapper.get_motion_state_data() |
|
|
|
if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand': |
|
|
|
with left_hand_pos_array.get_lock(): |
|
|
|
left_hand_pos_array[:] = tele_data.left_hand_pos.flatten() |
|
|
|
with right_hand_pos_array.get_lock(): |
|
|
|
right_hand_pos_array[:] = tele_data.right_hand_pos.flatten() |
|
|
|
elif args.ee == 'gripper' and args.xr_mode == 'controller': |
|
|
|
with left_gripper_value.get_lock(): |
|
|
|
left_gripper_value.value = tele_data.left_trigger_value |
|
|
|
with right_gripper_value.get_lock(): |
|
|
|
right_gripper_value.value = tele_data.right_trigger_value |
|
|
|
elif args.ee == 'gripper' and args.xr_mode == 'hand': |
|
|
|
with left_gripper_value.get_lock(): |
|
|
|
left_gripper_value.value = tele_data.left_pinch_value |
|
|
|
with right_gripper_value.get_lock(): |
|
|
|
right_gripper_value.value = tele_data.right_pinch_value |
|
|
|
else: |
|
|
|
pass |
|
|
|
|
|
|
|
# high level control |
|
|
|
if args.xr_mode == 'controller' and not args.debug: |
|
|
|
# quit teleoperate |
|
|
|
if tele_data.tele_state.right_aButton: |
|
|
|
running = False |
|
|
|
# command robot to enter damping mode. soft emergency stop function |
|
|
|
if tele_data.tele_state.left_thumbstick_state and tele_data.tele_state.right_thumbstick_state: |
|
|
|
sport_client.Damp() |
|
|
|
# control, limit velocity to within 0.3 |
|
|
|
sport_client.Move(-tele_data.tele_state.left_thumbstick_value[1] * 0.3, |
|
|
|
-tele_data.tele_state.left_thumbstick_value[0] * 0.3, |
|
|
|
-tele_data.tele_state.right_thumbstick_value[0] * 0.3) |
|
|
|
|
|
|
|
# solve ik using motor data and wrist pose, then use ik results to control arms. |
|
|
|
time_ik_start = time.time() |
|
|
|
sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_arm_pose, tele_data.right_arm_pose, current_lr_arm_q, current_lr_arm_dq) |
|
|
|
time_ik_end = time.time() |
|
|
|
logger_mp.debug(f"ik:\t{round(time_ik_end - time_ik_start, 6)}") |
|
|
|
arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff) |
|
|
|
# get current robot state data. |
|
|
|
current_lr_arm_q = arm_ctrl.get_current_dual_arm_q() |
|
|
|
current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq() |
|
|
|
|
|
|
|
# record data |
|
|
|
if args.record: |
|
|
|
# dex hand or gripper |
|
|
|
if args.ee == "dex3" and args.xr_mode == 'hand': |
|
|
|
with dual_hand_data_lock: |
|
|
|
left_ee_state = dual_hand_state_array[:7] |
|
|
|
right_ee_state = dual_hand_state_array[-7:] |
|
|
|
left_hand_action = dual_hand_action_array[:7] |
|
|
|
right_hand_action = dual_hand_action_array[-7:] |
|
|
|
current_body_state = [] |
|
|
|
current_body_action = [] |
|
|
|
elif args.ee == "gripper" and args.xr_mode == 'hand': |
|
|
|
with dual_gripper_data_lock: |
|
|
|
left_ee_state = [dual_gripper_state_array[1]] |
|
|
|
right_ee_state = [dual_gripper_state_array[0]] |
|
|
|
left_hand_action = [dual_gripper_action_array[1]] |
|
|
|
right_hand_action = [dual_gripper_action_array[0]] |
|
|
|
current_body_state = [] |
|
|
|
current_body_action = [] |
|
|
|
elif args.ee == "gripper" and args.xr_mode == 'controller': |
|
|
|
with dual_gripper_data_lock: |
|
|
|
left_ee_state = [dual_gripper_state_array[1]] |
|
|
|
right_ee_state = [dual_gripper_state_array[0]] |
|
|
|
left_hand_action = [dual_gripper_action_array[1]] |
|
|
|
right_hand_action = [dual_gripper_action_array[0]] |
|
|
|
current_body_state = arm_ctrl.get_current_motor_q().tolist() |
|
|
|
current_body_action = [-tele_data.tele_state.left_thumbstick_value[1] * 0.3, |
|
|
|
-tele_data.tele_state.left_thumbstick_value[0] * 0.3, |
|
|
|
-tele_data.tele_state.right_thumbstick_value[0] * 0.3] |
|
|
|
elif args.ee == "inspire1" and args.xr_mode == 'hand': |
|
|
|
with dual_hand_data_lock: |
|
|
|
left_ee_state = dual_hand_state_array[:6] |
|
|
|
right_ee_state = dual_hand_state_array[-6:] |
|
|
|
left_hand_action = dual_hand_action_array[:6] |
|
|
|
right_hand_action = dual_hand_action_array[-6:] |
|
|
|
current_body_state = [] |
|
|
|
current_body_action = [] |
|
|
|
else: |
|
|
|
left_ee_state = [] |
|
|
|
right_ee_state = [] |
|
|
|
left_hand_action = [] |
|
|
|
right_hand_action = [] |
|
|
|
# solve ik using motor data and wrist pose, then use ik results to control arms. |
|
|
|
time_ik_start = time.time() |
|
|
|
sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_arm_pose, tele_data.right_arm_pose, current_lr_arm_q, current_lr_arm_dq) |
|
|
|
time_ik_end = time.time() |
|
|
|
logger_mp.debug(f"ik:\t{round(time_ik_end - time_ik_start, 6)}") |
|
|
|
arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff) |
|
|
|
|
|
|
|
# record data |
|
|
|
if args.record: |
|
|
|
# dex hand or gripper |
|
|
|
if args.ee == "dex3" and args.xr_mode == 'hand': |
|
|
|
with dual_hand_data_lock: |
|
|
|
left_ee_state = dual_hand_state_array[:7] |
|
|
|
right_ee_state = dual_hand_state_array[-7:] |
|
|
|
left_hand_action = dual_hand_action_array[:7] |
|
|
|
right_hand_action = dual_hand_action_array[-7:] |
|
|
|
current_body_state = [] |
|
|
|
current_body_action = [] |
|
|
|
# head image |
|
|
|
current_tv_image = tv_img_array.copy() |
|
|
|
# wrist image |
|
|
|
if WRIST: |
|
|
|
current_wrist_image = wrist_img_array.copy() |
|
|
|
# arm state and action |
|
|
|
left_arm_state = current_lr_arm_q[:7] |
|
|
|
right_arm_state = current_lr_arm_q[-7:] |
|
|
|
left_arm_action = sol_q[:7] |
|
|
|
right_arm_action = sol_q[-7:] |
|
|
|
if recording: |
|
|
|
colors = {} |
|
|
|
depths = {} |
|
|
|
if BINOCULAR: |
|
|
|
colors[f"color_{0}"] = current_tv_image[:, :tv_img_shape[1]//2] |
|
|
|
colors[f"color_{1}"] = current_tv_image[:, tv_img_shape[1]//2:] |
|
|
|
if WRIST: |
|
|
|
colors[f"color_{2}"] = current_wrist_image[:, :wrist_img_shape[1]//2] |
|
|
|
colors[f"color_{3}"] = current_wrist_image[:, wrist_img_shape[1]//2:] |
|
|
|
else: |
|
|
|
colors[f"color_{0}"] = current_tv_image |
|
|
|
if WRIST: |
|
|
|
colors[f"color_{1}"] = current_wrist_image[:, :wrist_img_shape[1]//2] |
|
|
|
colors[f"color_{2}"] = current_wrist_image[:, wrist_img_shape[1]//2:] |
|
|
|
states = { |
|
|
|
"left_arm": { |
|
|
|
"qpos": left_arm_state.tolist(), # numpy.array -> list |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_arm": { |
|
|
|
"qpos": right_arm_state.tolist(), |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"left_ee": { |
|
|
|
"qpos": left_ee_state, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_ee": { |
|
|
|
"qpos": right_ee_state, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"body": { |
|
|
|
"qpos": current_body_state, |
|
|
|
}, |
|
|
|
} |
|
|
|
actions = { |
|
|
|
"left_arm": { |
|
|
|
"qpos": left_arm_action.tolist(), |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_arm": { |
|
|
|
"qpos": right_arm_action.tolist(), |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"left_ee": { |
|
|
|
"qpos": left_hand_action, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_ee": { |
|
|
|
"qpos": right_hand_action, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"body": { |
|
|
|
"qpos": current_body_action, |
|
|
|
}, |
|
|
|
} |
|
|
|
recorder.add_item(colors=colors, depths=depths, states=states, actions=actions) |
|
|
|
elif args.ee == "gripper" and args.xr_mode == 'hand': |
|
|
|
with dual_gripper_data_lock: |
|
|
|
left_ee_state = [dual_gripper_state_array[1]] |
|
|
|
right_ee_state = [dual_gripper_state_array[0]] |
|
|
|
left_hand_action = [dual_gripper_action_array[1]] |
|
|
|
right_hand_action = [dual_gripper_action_array[0]] |
|
|
|
current_body_state = [] |
|
|
|
current_body_action = [] |
|
|
|
elif args.ee == "gripper" and args.xr_mode == 'controller': |
|
|
|
with dual_gripper_data_lock: |
|
|
|
left_ee_state = [dual_gripper_state_array[1]] |
|
|
|
right_ee_state = [dual_gripper_state_array[0]] |
|
|
|
left_hand_action = [dual_gripper_action_array[1]] |
|
|
|
right_hand_action = [dual_gripper_action_array[0]] |
|
|
|
current_body_state = arm_ctrl.get_current_motor_q().tolist() |
|
|
|
current_body_action = [-tele_data.tele_state.left_thumbstick_value[1] * 0.3, |
|
|
|
-tele_data.tele_state.left_thumbstick_value[0] * 0.3, |
|
|
|
-tele_data.tele_state.right_thumbstick_value[0] * 0.3] |
|
|
|
elif args.ee == "inspire1" and args.xr_mode == 'hand': |
|
|
|
with dual_hand_data_lock: |
|
|
|
left_ee_state = dual_hand_state_array[:6] |
|
|
|
right_ee_state = dual_hand_state_array[-6:] |
|
|
|
left_hand_action = dual_hand_action_array[:6] |
|
|
|
right_hand_action = dual_hand_action_array[-6:] |
|
|
|
current_body_state = [] |
|
|
|
current_body_action = [] |
|
|
|
else: |
|
|
|
left_ee_state = [] |
|
|
|
right_ee_state = [] |
|
|
|
left_hand_action = [] |
|
|
|
right_hand_action = [] |
|
|
|
current_body_state = [] |
|
|
|
current_body_action = [] |
|
|
|
# head image |
|
|
|
current_tv_image = tv_img_array.copy() |
|
|
|
# wrist image |
|
|
|
if WRIST: |
|
|
|
current_wrist_image = wrist_img_array.copy() |
|
|
|
# arm state and action |
|
|
|
left_arm_state = current_lr_arm_q[:7] |
|
|
|
right_arm_state = current_lr_arm_q[-7:] |
|
|
|
left_arm_action = sol_q[:7] |
|
|
|
right_arm_action = sol_q[-7:] |
|
|
|
if recording: |
|
|
|
colors = {} |
|
|
|
depths = {} |
|
|
|
if BINOCULAR: |
|
|
|
colors[f"color_{0}"] = current_tv_image[:, :tv_img_shape[1]//2] |
|
|
|
colors[f"color_{1}"] = current_tv_image[:, tv_img_shape[1]//2:] |
|
|
|
if WRIST: |
|
|
|
colors[f"color_{2}"] = current_wrist_image[:, :wrist_img_shape[1]//2] |
|
|
|
colors[f"color_{3}"] = current_wrist_image[:, wrist_img_shape[1]//2:] |
|
|
|
else: |
|
|
|
colors[f"color_{0}"] = current_tv_image |
|
|
|
if WRIST: |
|
|
|
colors[f"color_{1}"] = current_wrist_image[:, :wrist_img_shape[1]//2] |
|
|
|
colors[f"color_{2}"] = current_wrist_image[:, wrist_img_shape[1]//2:] |
|
|
|
states = { |
|
|
|
"left_arm": { |
|
|
|
"qpos": left_arm_state.tolist(), # numpy.array -> list |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_arm": { |
|
|
|
"qpos": right_arm_state.tolist(), |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"left_ee": { |
|
|
|
"qpos": left_ee_state, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_ee": { |
|
|
|
"qpos": right_ee_state, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"body": { |
|
|
|
"qpos": current_body_state, |
|
|
|
}, |
|
|
|
} |
|
|
|
actions = { |
|
|
|
"left_arm": { |
|
|
|
"qpos": left_arm_action.tolist(), |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_arm": { |
|
|
|
"qpos": right_arm_action.tolist(), |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"left_ee": { |
|
|
|
"qpos": left_hand_action, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"right_ee": { |
|
|
|
"qpos": right_hand_action, |
|
|
|
"qvel": [], |
|
|
|
"torque": [], |
|
|
|
}, |
|
|
|
"body": { |
|
|
|
"qpos": current_body_action, |
|
|
|
}, |
|
|
|
} |
|
|
|
recorder.add_item(colors=colors, depths=depths, states=states, actions=actions) |
|
|
|
|
|
|
|
current_time = time.time() |
|
|
|
time_elapsed = current_time - start_time |
|
|
|
sleep_time = max(0, (1 / args.frequency) - time_elapsed) |
|
|
|
time.sleep(sleep_time) |
|
|
|
logger_mp.debug(f"main process sleep: {sleep_time}") |
|
|
|
current_time = time.time() |
|
|
|
time_elapsed = current_time - start_time |
|
|
|
sleep_time = max(0, (1 / args.frequency) - time_elapsed) |
|
|
|
time.sleep(sleep_time) |
|
|
|
logger_mp.debug(f"main process sleep: {sleep_time}") |
|
|
|
|
|
|
|
except KeyboardInterrupt: |
|
|
|
logger_mp.info("KeyboardInterrupt, exiting program...") |
|
|
|
|