|
|
|
@ -21,23 +21,17 @@ from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_C |
|
|
|
from teleop.robot_control.robot_hand_inspire import Inspire_Controller |
|
|
|
from teleop.image_server.image_client import ImageClient |
|
|
|
from teleop.utils.episode_writer import EpisodeWriter |
|
|
|
from sshkeyboard import listen_keyboard, stop_listening |
|
|
|
|
|
|
|
# for simulation |
|
|
|
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize |
|
|
|
from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ |
|
|
|
|
|
|
|
|
|
|
|
from sshkeyboard import listen_keyboard, stop_listening |
|
|
|
|
|
|
|
def publish_reset_category(category: int,publisher): |
|
|
|
# construct message |
|
|
|
msg = String_(data=str(category)) # pass data parameter directly during initialization |
|
|
|
|
|
|
|
# create publisher |
|
|
|
|
|
|
|
# publish message |
|
|
|
def publish_reset_category(category: int,publisher): # Scene Reset signal |
|
|
|
msg = String_(data=str(category)) |
|
|
|
publisher.Write(msg) |
|
|
|
print(f"published reset category: {category}") |
|
|
|
logger_mp.info(f"published reset category: {category}") |
|
|
|
|
|
|
|
# state transition |
|
|
|
start_signal = False |
|
|
|
running = True |
|
|
|
should_toggle_recording = False |
|
|
|
@ -61,20 +55,18 @@ if __name__ == '__main__': |
|
|
|
|
|
|
|
parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') |
|
|
|
parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') |
|
|
|
parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'],default='gripper', help='Select end effector controller') |
|
|
|
parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller') |
|
|
|
|
|
|
|
parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') |
|
|
|
parser.add_argument('--debug', action = 'store_false', help = 'Enable debug mode') |
|
|
|
parser.add_argument('--headless', action='store_true', help='Run in headless mode (no display)') |
|
|
|
|
|
|
|
parser.add_argument('--is_use_sim', action = 'store_true', help = 'Use simulation or not') |
|
|
|
parser.set_defaults(is_use_sim = True) |
|
|
|
parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode') |
|
|
|
parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)') |
|
|
|
parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode') |
|
|
|
|
|
|
|
args = parser.parse_args() |
|
|
|
logger_mp.info(f"args: {args}") |
|
|
|
|
|
|
|
# image client: img_config should be the same as the configuration in image_server.py (of Robot's development computing unit) |
|
|
|
if args.is_use_sim: |
|
|
|
if args.sim: |
|
|
|
img_config = { |
|
|
|
'fps': 30, |
|
|
|
'head_camera_type': 'opencv', |
|
|
|
@ -133,16 +125,16 @@ if __name__ == '__main__': |
|
|
|
|
|
|
|
# arm |
|
|
|
if args.arm == 'G1_29': |
|
|
|
arm_ctrl = G1_29_ArmController(debug_mode=args.debug, is_use_sim=args.is_use_sim) |
|
|
|
arm_ctrl = G1_29_ArmController(motion_mode=args.motion, simulation_mode=args.sim) |
|
|
|
arm_ik = G1_29_ArmIK() |
|
|
|
elif args.arm == 'G1_23': |
|
|
|
arm_ctrl = G1_23_ArmController(is_use_sim=args.is_use_sim) |
|
|
|
arm_ctrl = G1_23_ArmController(simulation_mode=args.sim) |
|
|
|
arm_ik = G1_23_ArmIK() |
|
|
|
elif args.arm == 'H1_2': |
|
|
|
arm_ctrl = H1_2_ArmController(is_use_sim=args.is_use_sim) |
|
|
|
arm_ctrl = H1_2_ArmController(simulation_mode=args.sim) |
|
|
|
arm_ik = H1_2_ArmIK() |
|
|
|
elif args.arm == 'H1': |
|
|
|
arm_ctrl = H1_ArmController(is_use_sim=args.is_use_sim) |
|
|
|
arm_ctrl = H1_ArmController(simulation_mode=args.sim) |
|
|
|
arm_ik = H1_ArmIK() |
|
|
|
|
|
|
|
# end-effector |
|
|
|
@ -159,7 +151,7 @@ if __name__ == '__main__': |
|
|
|
dual_gripper_data_lock = Lock() |
|
|
|
dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. |
|
|
|
dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. |
|
|
|
gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, is_use_sim=args.is_use_sim) |
|
|
|
gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) |
|
|
|
elif args.ee == "inspire1": |
|
|
|
left_hand_pos_array = Array('d', 75, lock = True) # [input] |
|
|
|
right_hand_pos_array = Array('d', 75, lock = True) # [input] |
|
|
|
@ -169,11 +161,14 @@ if __name__ == '__main__': |
|
|
|
hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) |
|
|
|
else: |
|
|
|
pass |
|
|
|
if args.is_use_sim: |
|
|
|
|
|
|
|
# simulation mode |
|
|
|
if args.sim: |
|
|
|
reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) |
|
|
|
reset_pose_publisher.Init() |
|
|
|
|
|
|
|
# xr mode |
|
|
|
if args.xr_mode == 'controller' and not args.debug: |
|
|
|
if args.xr_mode == 'controller' and args.motion: |
|
|
|
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient |
|
|
|
sport_client = LocoClient() |
|
|
|
sport_client.SetTimeout(0.0001) |
|
|
|
@ -199,12 +194,12 @@ if __name__ == '__main__': |
|
|
|
if key == ord('q'): |
|
|
|
stop_listening() |
|
|
|
running = False |
|
|
|
if args.is_use_sim: |
|
|
|
if args.sim: |
|
|
|
publish_reset_category(2, reset_pose_publisher) |
|
|
|
elif key == ord('s'): |
|
|
|
should_toggle_recording = True |
|
|
|
elif key == ord('a'): |
|
|
|
if args.is_use_sim: |
|
|
|
if args.sim: |
|
|
|
publish_reset_category(2, reset_pose_publisher) |
|
|
|
|
|
|
|
if args.record and should_toggle_recording: |
|
|
|
@ -217,7 +212,7 @@ if __name__ == '__main__': |
|
|
|
else: |
|
|
|
is_recording = False |
|
|
|
recorder.save_episode() |
|
|
|
if args.is_use_sim: |
|
|
|
if args.sim: |
|
|
|
publish_reset_category(1, reset_pose_publisher) |
|
|
|
# get input data |
|
|
|
tele_data = tv_wrapper.get_motion_state_data() |
|
|
|
@ -240,7 +235,7 @@ if __name__ == '__main__': |
|
|
|
pass |
|
|
|
|
|
|
|
# high level control |
|
|
|
if args.xr_mode == 'controller' and not args.debug: |
|
|
|
if args.xr_mode == 'controller' and args.motion: |
|
|
|
# quit teleoperate |
|
|
|
if tele_data.tele_state.right_aButton: |
|
|
|
running = False |
|
|
|
|