|
|
@ -21,8 +21,22 @@ from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_C |
|
|
from teleop.robot_control.robot_hand_inspire import Inspire_Controller |
|
|
from teleop.robot_control.robot_hand_inspire import Inspire_Controller |
|
|
from teleop.image_server.image_client import ImageClient |
|
|
from teleop.image_server.image_client import ImageClient |
|
|
from teleop.utils.episode_writer import EpisodeWriter |
|
|
from teleop.utils.episode_writer import EpisodeWriter |
|
|
|
|
|
|
|
|
|
|
|
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize |
|
|
|
|
|
from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from sshkeyboard import listen_keyboard, stop_listening |
|
|
from sshkeyboard import listen_keyboard, stop_listening |
|
|
|
|
|
|
|
|
|
|
|
def publish_reset_category(category: int,publisher): |
|
|
|
|
|
# construct message |
|
|
|
|
|
msg = String_(data=str(category)) # pass data parameter directly during initialization |
|
|
|
|
|
|
|
|
|
|
|
# create publisher |
|
|
|
|
|
|
|
|
|
|
|
# publish message |
|
|
|
|
|
publisher.Write(msg) |
|
|
|
|
|
print(f"published reset category: {category}") |
|
|
|
|
|
|
|
|
start_signal = False |
|
|
start_signal = False |
|
|
running = True |
|
|
running = True |
|
|
@ -47,14 +61,14 @@ if __name__ == '__main__': |
|
|
|
|
|
|
|
|
parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') |
|
|
parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') |
|
|
parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') |
|
|
parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') |
|
|
parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller') |
|
|
|
|
|
|
|
|
parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'],default='gripper', help='Select end effector controller') |
|
|
|
|
|
|
|
|
parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') |
|
|
parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') |
|
|
parser.add_argument('--debug', action = 'store_false', help = 'Enable debug mode') |
|
|
parser.add_argument('--debug', action = 'store_false', help = 'Enable debug mode') |
|
|
parser.add_argument('--headless', action='store_true', help='Run in headless mode (no display)') |
|
|
parser.add_argument('--headless', action='store_true', help='Run in headless mode (no display)') |
|
|
|
|
|
|
|
|
parser.add_argument('--is_use_sim', action = 'store_true', help = 'Use simulation or not') |
|
|
parser.add_argument('--is_use_sim', action = 'store_true', help = 'Use simulation or not') |
|
|
parser.set_defaults(is_use_sim = False) |
|
|
|
|
|
|
|
|
parser.set_defaults(is_use_sim = True) |
|
|
|
|
|
|
|
|
args = parser.parse_args() |
|
|
args = parser.parse_args() |
|
|
logger_mp.info(f"args: {args}") |
|
|
logger_mp.info(f"args: {args}") |
|
|
@ -155,7 +169,9 @@ if __name__ == '__main__': |
|
|
hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) |
|
|
hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) |
|
|
else: |
|
|
else: |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
if args.is_use_sim: |
|
|
|
|
|
reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) |
|
|
|
|
|
reset_pose_publisher.Init() |
|
|
# xr mode |
|
|
# xr mode |
|
|
if args.xr_mode == 'controller' and not args.debug: |
|
|
if args.xr_mode == 'controller' and not args.debug: |
|
|
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient |
|
|
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient |
|
|
@ -183,8 +199,13 @@ if __name__ == '__main__': |
|
|
if key == ord('q'): |
|
|
if key == ord('q'): |
|
|
stop_listening() |
|
|
stop_listening() |
|
|
running = False |
|
|
running = False |
|
|
|
|
|
if args.is_use_sim: |
|
|
|
|
|
publish_reset_category(2, reset_pose_publisher) |
|
|
elif key == ord('s'): |
|
|
elif key == ord('s'): |
|
|
should_toggle_recording = True |
|
|
should_toggle_recording = True |
|
|
|
|
|
elif key == ord('a'): |
|
|
|
|
|
if args.is_use_sim: |
|
|
|
|
|
publish_reset_category(2, reset_pose_publisher) |
|
|
|
|
|
|
|
|
if args.record and should_toggle_recording: |
|
|
if args.record and should_toggle_recording: |
|
|
should_toggle_recording = False |
|
|
should_toggle_recording = False |
|
|
@ -196,7 +217,8 @@ if __name__ == '__main__': |
|
|
else: |
|
|
else: |
|
|
is_recording = False |
|
|
is_recording = False |
|
|
recorder.save_episode() |
|
|
recorder.save_episode() |
|
|
|
|
|
|
|
|
|
|
|
if args.is_use_sim: |
|
|
|
|
|
publish_reset_category(1, reset_pose_publisher) |
|
|
# get input data |
|
|
# get input data |
|
|
tele_data = tv_wrapper.get_motion_state_data() |
|
|
tele_data = tv_wrapper.get_motion_state_data() |
|
|
if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand': |
|
|
if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand': |
|
|
|