From 742460a7d5d4eb37894cb2026e73161e758cee45 Mon Sep 17 00:00:00 2001 From: silencht Date: Thu, 26 Jun 2025 11:41:48 +0800 Subject: [PATCH] [fix] minor issues --- teleop/robot_control/robot_arm.py | 36 +++++++------- teleop/robot_control/robot_hand_unitree.py | 6 +-- teleop/teleop_hand_and_arm.py | 55 ++++++++++------------ 3 files changed, 46 insertions(+), 51 deletions(-) diff --git a/teleop/robot_control/robot_arm.py b/teleop/robot_control/robot_arm.py index cdd24c1..49ac742 100644 --- a/teleop/robot_control/robot_arm.py +++ b/teleop/robot_control/robot_arm.py @@ -59,12 +59,12 @@ class DataBuffer: self.data = data class G1_29_ArmController: - def __init__(self, debug_mode = True, is_use_sim = False): + def __init__(self, motion_mode = False, simulation_mode = False): logger_mp.info("Initialize G1_29_ArmController...") self.q_target = np.zeros(14) self.tauff_target = np.zeros(14) - self.debug_mode = debug_mode - self.is_use_sim = is_use_sim + self.motion_mode = motion_mode + self.simulation_mode = simulation_mode self.kp_high = 300.0 self.kd_high = 3.0 self.kp_low = 80.0 @@ -82,10 +82,10 @@ class G1_29_ArmController: # initialize lowcmd publisher and lowstate subscriber ChannelFactoryInitialize(0) - if self.debug_mode: - self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) - else: + if self.motion_mode: self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Motion, hg_LowCmd) + else: + self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) self.lowcmd_publisher.Init() self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState) self.lowstate_subscriber.Init() @@ -159,7 +159,7 @@ class G1_29_ArmController: return cliped_arm_q_target def _ctrl_motor_state(self): - if not self.debug_mode: + if self.motion_mode: self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = 1.0; while True: @@ -169,7 +169,7 @@ class G1_29_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) @@ -225,7 +225,7 @@ class G1_29_ArmController: while True: current_q = self.get_current_dual_arm_q() if np.all(np.abs(current_q) < tolerance): - if not self.debug_mode: + if self.motion_mode: for weight in np.arange(1, 0, -0.01): self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = weight; time.sleep(0.02) @@ -338,8 +338,8 @@ class G1_29_JointIndex(IntEnum): kNotUsedJoint5 = 34 class G1_23_ArmController: - def __init__(self, is_use_sim = False): - self.is_use_sim = is_use_sim + def __init__(self, simulation_mode = False): + self.simulation_mode = simulation_mode logger_mp.info("Initialize G1_23_ArmController...") self.q_target = np.zeros(10) @@ -443,7 +443,7 @@ class G1_23_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) @@ -600,8 +600,8 @@ class G1_23_JointIndex(IntEnum): kNotUsedJoint5 = 34 class H1_2_ArmController: - def __init__(self, is_use_sim = False): - self.is_use_sim = is_use_sim + def __init__(self, simulation_mode = False): + self.simulation_mode = simulation_mode logger_mp.info("Initialize H1_2_ArmController...") self.q_target = np.zeros(14) @@ -705,7 +705,7 @@ class H1_2_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) @@ -869,8 +869,8 @@ class H1_2_JointIndex(IntEnum): kNotUsedJoint7 = 34 class H1_ArmController: - def __init__(self, is_use_sim = False): - self.is_use_sim = is_use_sim + def __init__(self, simulation_mode = False): + self.simulation_mode = simulation_mode logger_mp.info("Initialize H1_ArmController...") self.q_target = np.zeros(8) @@ -966,7 +966,7 @@ class H1_ArmController: arm_q_target = self.q_target arm_tauff_target = self.tauff_target - if self.is_use_sim: + if self.simulation_mode: cliped_arm_q_target = arm_q_target else: cliped_arm_q_target = self.clip_arm_q_target(arm_q_target, velocity_limit = self.arm_velocity_limit) diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py index 909801d..320dc2f 100644 --- a/teleop/robot_control/robot_hand_unitree.py +++ b/teleop/robot_control/robot_hand_unitree.py @@ -236,7 +236,7 @@ kTopicGripperState = "rt/unitree_actuator/state" class Gripper_Controller: def __init__(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None, - filter = True, fps = 200.0, Unit_Test = False, is_use_sim = False): + filter = True, fps = 200.0, Unit_Test = False, simulation_mode = False): """ [note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process @@ -259,7 +259,7 @@ class Gripper_Controller: self.fps = fps self.Unit_Test = Unit_Test - self.is_use_sim = is_use_sim + self.simulation_mode = simulation_mode if filter: self.smooth_filter = WeightedMovingFilter(np.array([0.5, 0.3, 0.2]), Gripper_Num_Motors) @@ -316,7 +316,7 @@ class Gripper_Controller: def control_thread(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_state_in, dual_hand_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None): self.running = True - if self.is_use_sim: + if self.simulation_mode: DELTA_GRIPPER_CMD = 1.0 else: DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index e6449f0..9f578be 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -21,23 +21,17 @@ from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_C from teleop.robot_control.robot_hand_inspire import Inspire_Controller from teleop.image_server.image_client import ImageClient from teleop.utils.episode_writer import EpisodeWriter +from sshkeyboard import listen_keyboard, stop_listening +# for simulation from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ - - -from sshkeyboard import listen_keyboard, stop_listening - -def publish_reset_category(category: int,publisher): - # construct message - msg = String_(data=str(category)) # pass data parameter directly during initialization - - # create publisher - - # publish message +def publish_reset_category(category: int,publisher): # Scene Reset signal + msg = String_(data=str(category)) publisher.Write(msg) - print(f"published reset category: {category}") + logger_mp.info(f"published reset category: {category}") +# state transition start_signal = False running = True should_toggle_recording = False @@ -61,20 +55,18 @@ if __name__ == '__main__': parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') - parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'],default='gripper', help='Select end effector controller') + parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller') parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') - parser.add_argument('--debug', action = 'store_false', help = 'Enable debug mode') - parser.add_argument('--headless', action='store_true', help='Run in headless mode (no display)') - - parser.add_argument('--is_use_sim', action = 'store_true', help = 'Use simulation or not') - parser.set_defaults(is_use_sim = True) + parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode') + parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)') + parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode') args = parser.parse_args() logger_mp.info(f"args: {args}") # image client: img_config should be the same as the configuration in image_server.py (of Robot's development computing unit) - if args.is_use_sim: + if args.sim: img_config = { 'fps': 30, 'head_camera_type': 'opencv', @@ -133,16 +125,16 @@ if __name__ == '__main__': # arm if args.arm == 'G1_29': - arm_ctrl = G1_29_ArmController(debug_mode=args.debug, is_use_sim=args.is_use_sim) + arm_ctrl = G1_29_ArmController(motion_mode=args.motion, simulation_mode=args.sim) arm_ik = G1_29_ArmIK() elif args.arm == 'G1_23': - arm_ctrl = G1_23_ArmController(is_use_sim=args.is_use_sim) + arm_ctrl = G1_23_ArmController(simulation_mode=args.sim) arm_ik = G1_23_ArmIK() elif args.arm == 'H1_2': - arm_ctrl = H1_2_ArmController(is_use_sim=args.is_use_sim) + arm_ctrl = H1_2_ArmController(simulation_mode=args.sim) arm_ik = H1_2_ArmIK() elif args.arm == 'H1': - arm_ctrl = H1_ArmController(is_use_sim=args.is_use_sim) + arm_ctrl = H1_ArmController(simulation_mode=args.sim) arm_ik = H1_ArmIK() # end-effector @@ -159,7 +151,7 @@ if __name__ == '__main__': dual_gripper_data_lock = Lock() dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. - gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, is_use_sim=args.is_use_sim) + gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) elif args.ee == "inspire1": left_hand_pos_array = Array('d', 75, lock = True) # [input] right_hand_pos_array = Array('d', 75, lock = True) # [input] @@ -169,11 +161,14 @@ if __name__ == '__main__': hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) else: pass - if args.is_use_sim: + + # simulation mode + if args.sim: reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) reset_pose_publisher.Init() + # xr mode - if args.xr_mode == 'controller' and not args.debug: + if args.xr_mode == 'controller' and args.motion: from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient sport_client = LocoClient() sport_client.SetTimeout(0.0001) @@ -199,12 +194,12 @@ if __name__ == '__main__': if key == ord('q'): stop_listening() running = False - if args.is_use_sim: + if args.sim: publish_reset_category(2, reset_pose_publisher) elif key == ord('s'): should_toggle_recording = True elif key == ord('a'): - if args.is_use_sim: + if args.sim: publish_reset_category(2, reset_pose_publisher) if args.record and should_toggle_recording: @@ -217,7 +212,7 @@ if __name__ == '__main__': else: is_recording = False recorder.save_episode() - if args.is_use_sim: + if args.sim: publish_reset_category(1, reset_pose_publisher) # get input data tele_data = tv_wrapper.get_motion_state_data() @@ -240,7 +235,7 @@ if __name__ == '__main__': pass # high level control - if args.xr_mode == 'controller' and not args.debug: + if args.xr_mode == 'controller' and args.motion: # quit teleoperate if tele_data.tele_state.right_aButton: running = False