diff --git a/teleop/robot_control/robot_arm.py b/teleop/robot_control/robot_arm.py index a0a4f43..acaa8e2 100644 --- a/teleop/robot_control/robot_arm.py +++ b/teleop/robot_control/robot_arm.py @@ -112,8 +112,8 @@ class G1_29_ArmController: self.msg.mode_machine = self.get_mode_machine() self.all_motor_q = self.get_current_motor_q() - logger_mp.info(f"Current all body motor state q:\n{self.all_motor_q} \n") - logger_mp.info(f"Current two arms motor state q:\n{self.get_current_dual_arm_q()}\n") + logger_mp.debug(f"Current all body motor state q:\n{self.all_motor_q} \n") + logger_mp.debug(f"Current two arms motor state q:\n{self.get_current_dual_arm_q()}\n") logger_mp.info("Lock all joints except two arms...\n") arm_indices = set(member.value for member in G1_29_JointArmIndex) diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py index 10e5811..90fd806 100644 --- a/teleop/robot_control/robot_hand_unitree.py +++ b/teleop/robot_control/robot_hand_unitree.py @@ -328,10 +328,7 @@ class Dex1_1_Gripper_Controller: def control_thread(self, left_gripper_value_in, right_gripper_value_in, left_gripper_state_value, right_gripper_state_value, dual_hand_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None): self.running = True - if self.simulation_mode: - DELTA_GRIPPER_CMD = 0.18 - else: - DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm + DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm THUMB_INDEX_DISTANCE_MIN = 5.0 THUMB_INDEX_DISTANCE_MAX = 7.0 LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index aa2b7fd..4f838cf 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -22,6 +22,7 @@ from teleop.robot_control.robot_hand_inspire import Inspire_Controller from teleop.robot_control.robot_hand_brainco import Brainco_Controller from teleop.image_server.image_client import ImageClient from teleop.utils.episode_writer import EpisodeWriter +from teleop.utils.ipc import IPC_Server from sshkeyboard import listen_keyboard, stop_listening # for simulation @@ -33,29 +34,50 @@ def publish_reset_category(category: int,publisher): # Scene Reset signal logger_mp.info(f"published reset category: {category}") # state transition -start_signal = False -running = True -should_toggle_recording = False -is_recording = False +START = False +STOP = False +RECORD_TOGGLE = False +RECORD_RUNNING = False +RECORD_READY = True +# task info +TASK_NAME = None +TASK_DESC = None +ITEM_ID = None def on_press(key): - global running, start_signal, should_toggle_recording + global STOP, START, RECORD_TOGGLE if key == 'r': - start_signal = True - logger_mp.info("Program start signal received.") - elif key == 'q' and start_signal == True: - stop_listening() - running = False - elif key == 's' and start_signal == True: - should_toggle_recording = True + START = True + elif key == 'q': + STOP = True + elif key == 's' and START == True: + RECORD_TOGGLE = True else: - logger_mp.info(f"{key} was pressed, but no action is defined for this key.") + logger_mp.warning(f"[on_press] {key} was pressed, but no action is defined for this key.") + +def on_info(info): + """Only handle CMD_TOGGLE_RECORD's task info""" + global TASK_NAME, TASK_DESC, ITEM_ID + TASK_NAME = info.get("task_name") + TASK_DESC = info.get("task_desc") + ITEM_ID = info.get("item_id") + logger_mp.debug(f"[on_info] Updated globals: {TASK_NAME}, {TASK_DESC}, {ITEM_ID}") + +def get_state() -> dict: + """Return current heartbeat state""" + global START, STOP, RECORD_RUNNING, RECORD_READY + return { + "START": START, + "STOP": STOP, + "RECORD_RUNNING": RECORD_RUNNING, + "RECORD_READY": RECORD_READY, + } +# sshkeyboard communication listen_keyboard_thread = threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press, "until": None, "sequential": False,}, daemon=True) listen_keyboard_thread.start() if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument('--task_dir', type = str, default = './utils/data/', help = 'path to save data') - parser.add_argument('--frequency', type = float, default = 60.0, help = 'save data\'s frequency') + parser.add_argument('--frequency', type = float, default = 30.0, help = 'save data\'s frequency') # basic control parameters parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') @@ -65,178 +87,204 @@ if __name__ == '__main__': parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode') parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)') parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode') + parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity') + parser.add_argument('--ipc', action = 'store_true', help = 'Enable high priority and set CPU affinity') parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') + parser.add_argument('--task-dir', type = str, default = '../../data/', help = 'path to save data') parser.add_argument('--task-name', type = str, default = 'pick cube', help = 'task name for recording') - parser.add_argument('--task-goal', type = str, default = 'e.g. pick the red cube on the table.', help = 'task goal for recording') + parser.add_argument('--task-desc', type = str, default = 'e.g. pick the red cube on the table.', help = 'task goal for recording') args = parser.parse_args() logger_mp.info(f"args: {args}") - # image client: img_config should be the same as the configuration in image_server.py (of Robot's development computing unit) - if args.sim: - img_config = { - 'fps': 30, - 'head_camera_type': 'opencv', - 'head_camera_image_shape': [480, 640], # Head camera resolution - 'head_camera_id_numbers': [0], - 'wrist_camera_type': 'opencv', - 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution - 'wrist_camera_id_numbers': [2, 4], - } - else: - img_config = { - 'fps': 30, - 'head_camera_type': 'opencv', - 'head_camera_image_shape': [480, 1280], # Head camera resolution - 'head_camera_id_numbers': [0], - 'wrist_camera_type': 'opencv', - 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution - 'wrist_camera_id_numbers': [2, 4], - } + try: + # ipc communication + if args.ipc: + ipc_server = IPC_Server(on_press=on_press, on_info=on_info, get_state=get_state) + ipc_server.start() + # image client: img_config should be the same as the configuration in image_server.py (of Robot's development computing unit) + if args.sim: + img_config = { + 'fps': 30, + 'head_camera_type': 'opencv', + 'head_camera_image_shape': [480, 640], # Head camera resolution + 'head_camera_id_numbers': [0], + 'wrist_camera_type': 'opencv', + 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution + 'wrist_camera_id_numbers': [2, 4], + } + else: + img_config = { + 'fps': 30, + 'head_camera_type': 'opencv', + 'head_camera_image_shape': [480, 1280], # Head camera resolution + 'head_camera_id_numbers': [0], + 'wrist_camera_type': 'opencv', + 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution + 'wrist_camera_id_numbers': [2, 4], + } - ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular - if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): - BINOCULAR = True - else: - BINOCULAR = False - if 'wrist_camera_type' in img_config: - WRIST = True - else: - WRIST = False - - if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): - tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3) - else: - tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3) - tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize) - tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf) + ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular + if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): + BINOCULAR = True + else: + BINOCULAR = False + if 'wrist_camera_type' in img_config: + WRIST = True + else: + WRIST = False + + if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): + tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3) + else: + tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3) - if WRIST and args.sim: - wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3) - wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize) - wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf) - img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, - wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name, server_address="127.0.0.1") - elif WRIST and not args.sim: - wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3) - wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize) - wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf) - img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, - wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name) - else: - img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name) + tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize) + tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf) - image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True) - image_receive_thread.daemon = True - image_receive_thread.start() + if WRIST and args.sim: + wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3) + wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize) + wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf) + img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, + wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name, server_address="127.0.0.1") + elif WRIST and not args.sim: + wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3) + wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize) + wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf) + img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, + wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name) + else: + img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name) - # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. - tv_wrapper = TeleVuerWrapper(binocular=BINOCULAR, use_hand_tracking=args.xr_mode == "hand", img_shape=tv_img_shape, img_shm_name=tv_img_shm.name, - return_state_data=True, return_hand_rot_data = False) + image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True) + image_receive_thread.daemon = True + image_receive_thread.start() - # arm - if args.arm == "G1_29": - arm_ik = G1_29_ArmIK() - arm_ctrl = G1_29_ArmController(motion_mode=args.motion, simulation_mode=args.sim) - elif args.arm == "G1_23": - arm_ik = G1_23_ArmIK() - arm_ctrl = G1_23_ArmController(motion_mode=args.motion, simulation_mode=args.sim) - elif args.arm == "H1_2": - arm_ik = H1_2_ArmIK() - arm_ctrl = H1_2_ArmController(motion_mode=args.motion, simulation_mode=args.sim) - elif args.arm == "H1": - arm_ik = H1_ArmIK() - arm_ctrl = H1_ArmController(simulation_mode=args.sim) + # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. + tv_wrapper = TeleVuerWrapper(binocular=BINOCULAR, use_hand_tracking=args.xr_mode == "hand", img_shape=tv_img_shape, img_shm_name=tv_img_shm.name, + return_state_data=True, return_hand_rot_data = False) - # end-effector - if args.ee == "dex3": - left_hand_pos_array = Array('d', 75, lock = True) # [input] - right_hand_pos_array = Array('d', 75, lock = True) # [input] - dual_hand_data_lock = Lock() - dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. - dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. - hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) - elif args.ee == "dex1": - left_gripper_value = Value('d', 0.0, lock=True) # [input] - right_gripper_value = Value('d', 0.0, lock=True) # [input] - dual_gripper_data_lock = Lock() - dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. - dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. - gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) - elif args.ee == "inspire1": - left_hand_pos_array = Array('d', 75, lock = True) # [input] - right_hand_pos_array = Array('d', 75, lock = True) # [input] - dual_hand_data_lock = Lock() - dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. - dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. - hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) - elif args.ee == "brainco": - left_hand_pos_array = Array('d', 75, lock = True) # [input] - right_hand_pos_array = Array('d', 75, lock = True) # [input] - dual_hand_data_lock = Lock() - dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. - dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. - hand_ctrl = Brainco_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) - else: - pass + # arm + if args.arm == "G1_29": + arm_ik = G1_29_ArmIK() + arm_ctrl = G1_29_ArmController(motion_mode=args.motion, simulation_mode=args.sim) + elif args.arm == "G1_23": + arm_ik = G1_23_ArmIK() + arm_ctrl = G1_23_ArmController(motion_mode=args.motion, simulation_mode=args.sim) + elif args.arm == "H1_2": + arm_ik = H1_2_ArmIK() + arm_ctrl = H1_2_ArmController(motion_mode=args.motion, simulation_mode=args.sim) + elif args.arm == "H1": + arm_ik = H1_ArmIK() + arm_ctrl = H1_ArmController(simulation_mode=args.sim) - # simulation mode - if args.sim: - reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) - reset_pose_publisher.Init() - from teleop.utils.sim_state_topic import start_sim_state_subscribe - sim_state_subscriber = start_sim_state_subscribe() + # end-effector + if args.ee == "dex3": + left_hand_pos_array = Array('d', 75, lock = True) # [input] + right_hand_pos_array = Array('d', 75, lock = True) # [input] + dual_hand_data_lock = Lock() + dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. + dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. + hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + elif args.ee == "dex1": + left_gripper_value = Value('d', 0.0, lock=True) # [input] + right_gripper_value = Value('d', 0.0, lock=True) # [input] + dual_gripper_data_lock = Lock() + dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. + dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. + gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) + elif args.ee == "inspire1": + left_hand_pos_array = Array('d', 75, lock = True) # [input] + right_hand_pos_array = Array('d', 75, lock = True) # [input] + dual_hand_data_lock = Lock() + dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. + dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. + hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + elif args.ee == "brainco": + left_hand_pos_array = Array('d', 75, lock = True) # [input] + right_hand_pos_array = Array('d', 75, lock = True) # [input] + dual_hand_data_lock = Lock() + dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. + dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. + hand_ctrl = Brainco_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + else: + pass + + # affinity mode (if you dont know what it is, then you probably don't need it) + if args.affinity: + import psutil + p = psutil.Process(os.getpid()) + p.cpu_affinity([0,1,2,3]) # Set CPU affinity to cores 0-3 + try: + p.nice(-20) # Set highest priority + logger_mp.info("Set high priority successfully.") + except psutil.AccessDenied: + logger_mp.warning("Failed to set high priority. Please run as root.") + + for child in p.children(recursive=True): + try: + logger_mp.info(f"Child process {child.pid} name: {child.name()}") + child.cpu_affinity([5,6]) + child.nice(-20) + except psutil.AccessDenied: + pass + + # simulation mode + if args.sim: + reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) + reset_pose_publisher.Init() + from teleop.utils.sim_state_topic import start_sim_state_subscribe + sim_state_subscriber = start_sim_state_subscribe() - # controller + motion mode - if args.xr_mode == "controller" and args.motion: - from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient - sport_client = LocoClient() - sport_client.SetTimeout(0.0001) - sport_client.Init() - - # record + headless mode - if args.record and args.headless: - recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_goal, frequency = args.frequency, rerun_log = False) - elif args.record and not args.headless: - recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_goal, frequency = args.frequency, rerun_log = True) + # controller + motion mode + if args.xr_mode == "controller" and args.motion: + from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient + sport_client = LocoClient() + sport_client.SetTimeout(0.0001) + sport_client.Init() - try: + # record + headless mode + if args.record and args.headless: + recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_desc, frequency = args.frequency, rerun_log = False) + elif args.record and not args.headless: + recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_desc, frequency = args.frequency, rerun_log = True) + + logger_mp.info("Please enter the start signal (enter 'r' to start the subsequent program)") - while not start_signal: + while not START and not STOP: time.sleep(0.01) + logger_mp.info("start program.") arm_ctrl.speed_gradual_max() - while running: + while not STOP: start_time = time.time() if not args.headless: tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2)) - if args.sim: - wrist_resized_image = cv2.resize(wrist_img_array, (wrist_img_shape[1] // 2, wrist_img_shape[0] // 2)) - tv_resized_image = np.concatenate([tv_resized_image, wrist_resized_image], axis=1) cv2.imshow("record image", tv_resized_image) + # opencv GUI communication key = cv2.waitKey(1) & 0xFF if key == ord('q'): - stop_listening() - running = False + STOP = True if args.sim: publish_reset_category(2, reset_pose_publisher) elif key == ord('s'): - should_toggle_recording = True + RECORD_TOGGLE = True elif key == ord('a'): if args.sim: publish_reset_category(2, reset_pose_publisher) - if args.record and should_toggle_recording: - should_toggle_recording = False - if not is_recording: + if args.record and RECORD_TOGGLE: + RECORD_TOGGLE = False + if not RECORD_RUNNING: if recorder.create_episode(): - is_recording = True + RECORD_RUNNING = True else: logger_mp.error("Failed to create episode. Recording not started.") else: - is_recording = False + RECORD_RUNNING = False recorder.save_episode() if args.sim: publish_reset_category(1, reset_pose_publisher) @@ -264,8 +312,7 @@ if __name__ == '__main__': if args.xr_mode == "controller" and args.motion: # quit teleoperate if tele_data.tele_state.right_aButton: - stop_listening() - running = False + STOP = True # command robot to enter damping mode. soft emergency stop function if tele_data.tele_state.left_thumbstick_state and tele_data.tele_state.right_thumbstick_state: sport_client.Damp() @@ -287,6 +334,7 @@ if __name__ == '__main__': # record data if args.record: + RECORD_READY = recorder.is_ready() # dex hand or gripper if args.ee == "dex3" and args.xr_mode == "hand": with dual_hand_data_lock: @@ -339,7 +387,7 @@ if __name__ == '__main__': right_arm_state = current_lr_arm_q[-7:] left_arm_action = sol_q[:7] right_arm_action = sol_q[-7:] - if is_recording: + if RECORD_RUNNING: colors = {} depths = {} if BINOCULAR: @@ -418,6 +466,7 @@ if __name__ == '__main__': except KeyboardInterrupt: logger_mp.info("KeyboardInterrupt, exiting program...") finally: + stop_listening() arm_ctrl.ctrl_dual_arm_go_home() if args.sim: sim_state_subscriber.stop_subscribe() @@ -428,6 +477,8 @@ if __name__ == '__main__': wrist_img_shm.unlink() if args.record: recorder.close() + if args.ipc: + ipc_server.stop() listen_keyboard_thread.join() - logger_mp.info("Finally, exiting program...") + logger_mp.info("Finally, exiting program.") exit(0) diff --git a/teleop/utils/episode_writer.py b/teleop/utils/episode_writer.py index 8955d10..acc78e8 100644 --- a/teleop/utils/episode_writer.py +++ b/teleop/utils/episode_writer.py @@ -34,8 +34,6 @@ class EpisodeWriter(): self.rerun_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60, memory_limit = "300MB") logger_mp.info("==> RerunLogger initializing ok.\n") - self.data = {} - self.episode_data = [] self.item_id = -1 self.episode_id = -1 if os.path.exists(self.task_dir): @@ -57,6 +55,9 @@ class EpisodeWriter(): self.worker_thread.start() logger_mp.info("==> EpisodeWriter initialized successfully.\n") + + def is_ready(self): + return self.is_available def data_info(self, version='1.0.0', date=None, author=None): self.info = { @@ -96,7 +97,6 @@ class EpisodeWriter(): # Reset episode-related data and create necessary directories self.item_id = -1 - self.episode_data = [] self.episode_id = self.episode_id + 1 self.episode_dir = os.path.join(self.task_dir, f"episode_{str(self.episode_id).zfill(4)}") @@ -108,6 +108,13 @@ class EpisodeWriter(): os.makedirs(self.color_dir, exist_ok=True) os.makedirs(self.depth_dir, exist_ok=True) os.makedirs(self.audio_dir, exist_ok=True) + with open(self.json_path, "w", encoding="utf-8") as f: + f.write('{\n') + f.write('"info": ' + json.dumps(self.info, ensure_ascii=False, indent=4) + ',\n') + f.write('"text": ' + json.dumps(self.text, ensure_ascii=False, indent=4) + ',\n') + f.write('"data": [\n') + self.first_item = True # Flag to handle commas in JSON array + if self.rerun_log: self.online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60, memory_limit="300MB") @@ -179,7 +186,11 @@ class EpisodeWriter(): item_data['audios'][mic] = os.path.join('audios', audio_name) # Update episode data - self.episode_data.append(item_data) + with open(self.json_path, "a", encoding="utf-8") as f: + if not self.first_item: + f.write(",\n") + f.write(json.dumps(item_data, ensure_ascii=False, indent=4)) + self.first_item = False # Log data if necessary if self.rerun_log: @@ -198,11 +209,9 @@ class EpisodeWriter(): """ Save the episode data to a JSON file. """ - self.data['info'] = self.info - self.data['text'] = self.text - self.data['data'] = self.episode_data - with open(self.json_path, 'w', encoding='utf-8') as jsonf: - jsonf.write(json.dumps(self.data, indent=4, ensure_ascii=False)) + with open(self.json_path, "a", encoding="utf-8") as f: + f.write("\n]\n}") # Close the JSON array and object + self.need_save = False # Reset the save flag self.is_available = True # Mark the class as available after saving logger_mp.info(f"==> Episode saved successfully to {self.json_path}.") diff --git a/teleop/utils/ipc.py b/teleop/utils/ipc.py new file mode 100644 index 0000000..033ef92 --- /dev/null +++ b/teleop/utils/ipc.py @@ -0,0 +1,419 @@ +import os +import zmq +import time +import threading +import logging_mp +logging_mp.basic_config(level=logging_mp.INFO) +logger_mp = logging_mp.get_logger(__name__) + +""" +# Client → Server (Request) +1) launch + { + "reqid": unique id, + "cmd": "CMD_START" + } + +2) exit + { + "reqid": unique id, + "cmd": "CMD_STOP" + } + +3) start or stop (record toggle) + { + "reqid": unique id, + "cmd": "CMD_RECORD_TOGGLE", + "info": { # optional + "task_name": "T001", + "task_desc": "pick and place apple to basket", + "item_id": 1 + } + } + +# Server → Client (Reply) +1) if ok + { + "repid": same as reqid, + "status": "ok", + "msg": "ok" + } +2) if error + { + "repid": same as reqid | 0 | 1, # 0: no reqid provided, 1: internal error + "status": "error", + "msg": "reqid not provided" + | "cmd not provided" + | "cmd not supported: {cmd}" + | "info missing keys: {missing_keys}" + | "internal error msg" + } + +# Heartbeat (PUB) +- Heartbeat Pub format: + { + "START": True | False, # whether robot follow vr + "STOP" : True | False, # whether exit program + "RECORD_RUNNING": True | False, # whether is recording + "RECORD_READY": True | False, # whether ready to record + } +""" + + +class IPC_Server: + """ + Inter - Process Communication Server: + - Handle data via REP + - Publish heartbeat via PUB, Heartbeat state is provided by external callback get_state() + """ + # Mapping table for on_press keys + cmd_map = { + "CMD_START": "r", # launch + "CMD_STOP": "q", # exit + "CMD_RECORD_TOGGLE": "s", # start & stop (toggle record) + } + + def __init__(self, on_press=None, on_info=None, get_state=None, hb_fps=10.0): + """ + Args: + on_press : callback(cmd:str), called for every command + on_info : callback(data:dict), only handle CMD_RECORD_TOGGLE's task info + hb_fps : heartbeat publish frequency + get_state : callback() -> dict, provides current heartbeat state + """ + if callable(on_press): + self.on_press = on_press + else: + raise ValueError("[IPC_Server] on_press callback function must be provided") + self.on_info = on_info + if callable(get_state): + self.get_state = get_state + else: + raise ValueError("[IPC_Server] get_state callback function must be provided") + self._hb_interval = 1.0 / float(hb_fps) + self._running = True + self._data_loop_thread = None + self._hb_loop_thread = None + + rd = os.environ.get("XDG_RUNTIME_DIR") or "/tmp" + self.ctx = zmq.Context.instance() + # data IPC (REQ/REP): required + self.data_ipc = os.path.join(rd, f"xr-teleoperate-data-{os.getuid()}.ipc") + self.rep_socket = self.ctx.socket(zmq.REP) + try: + if os.path.exists(self.data_ipc): + os.unlink(self.data_ipc) # remove stale IPC file + except OSError: + pass + self.rep_socket.bind(f"ipc://{self.data_ipc}") + logger_mp.info(f"[IPC_Server] Listening to Data at ipc://{self.data_ipc}") + + # heartbeat IPC (PUB/SUB) + self.hb_ipc = os.path.join(rd, f"xr-teleoperate-hb-{os.getuid()}.ipc") + self.pub_socket = self.ctx.socket(zmq.PUB) + try: + if os.path.exists(self.hb_ipc): + os.unlink(self.hb_ipc) # remove stale IPC file + except OSError: + pass + self.pub_socket.bind(f"ipc://{self.hb_ipc}") + logger_mp.info(f"[IPC_Server] Publishing HeartBeat at ipc://{self.hb_ipc}") + + def _data_loop(self): + """ + Listen for REQ/REP commands and optional info. + """ + poller = zmq.Poller() + poller.register(self.rep_socket, zmq.POLLIN) + while self._running: + try: + socks = dict(poller.poll(20)) + if self.rep_socket in socks: + msg = self.rep_socket.recv_json() + reply = self._handle_message(msg) + try: + self.rep_socket.send_json(reply) + except Exception as e: + logger_mp.error(f"[IPC_Server] Failed to send reply: {e}") + finally: + logger_mp.debug(f"[IPC_Server] DATA recv: {msg} -> rep: {reply}") + except zmq.error.ContextTerminated: + break + except Exception as e: + logger_mp.error(f"[IPC_Server] Data loop exception: {e}") + + def _hb_loop(self): + """Publish heartbeat periodically""" + while self._running: + start_time = time.monotonic() + try: + state = dict(self.get_state() or {}) + self.pub_socket.send_json(state) + logger_mp.debug(f"[IPC_Server] HB pub: {state}") + except Exception as e: + logger_mp.error(f"[IPC_Server] HeartBeat loop exception: {e}") + elapsed = time.monotonic() - start_time + if elapsed < self._hb_interval: + time.sleep(self._hb_interval - elapsed) + + def _handle_message(self, msg: dict) -> dict: + """Process message and return reply""" + try: + # validate reqid + reqid = msg.get("reqid", None) + if not reqid: + return {"repid": 0, "status": "error", "msg": "reqid not provided"} + + # validate cmd + cmd = msg.get("cmd", None) + if not cmd: + return {"repid": reqid, "status": "error", "msg": "cmd not provided"} + + # unsupported cmd + if cmd not in self.cmd_map: + return {"repid": reqid, "status": "error", "msg": f"cmd not supported: {cmd}"} + + # CMD_RECORD_TOGGLE: optional info + if cmd == "CMD_RECORD_TOGGLE": + info = msg.get("info", None) + if info: + required_keys = ["task_name", "task_desc", "item_id"] + missing_keys = [key for key in required_keys if key not in info] + if missing_keys: + return {"repid": reqid, "status": "error", "msg": f"info missing keys: {missing_keys}"} + else: + if self.on_info: + self.on_info(info) + logger_mp.debug(f"[IPC_Server] on_info called with info: {info}") + else: + logger_mp.warning("[IPC_Server] No on_info provided") + else: + logger_mp.warning("[IPC_Server] No info provided with cmd: CMD_RECORD_TOGGLE") + + # supported cmd path + self.on_press(self.cmd_map[cmd]) + return {"repid": reqid, "status": "ok", "msg": "ok"} + + except Exception as e: + return {"repid": 1, "status": "error", "msg": str(e)} + + # --------------------------- + # Public API + # --------------------------- + def start(self): + """Start both data loop and heartbeat loop""" + self._data_loop_thread = threading.Thread(target=self._data_loop, daemon=True) + self._data_loop_thread.start() + self._hb_loop_thread = threading.Thread(target=self._hb_loop, daemon=True) + self._hb_loop_thread.start() + + def stop(self): + """Stop server""" + self._running = False + if self._data_loop_thread: + self._data_loop_thread.join(timeout=1.0) + if self._hb_loop_thread: + self._hb_loop_thread.join(timeout=1.0) + try: + self.rep_socket.setsockopt(zmq.LINGER, 0) + self.rep_socket.close() + except Exception: + pass + try: + self.pub_socket.setsockopt(zmq.LINGER, 0) + self.pub_socket.close() + except Exception: + pass + try: + self.ctx.term() + except Exception: + pass + + +class IPC_Client: + """ + Inter - Process Communication Client: + - Send command/info via REQ + - Subscribe heartbeat via SUB + """ + def __init__(self, hb_fps=10.0): + """hb_fps: heartbeat subscribe frequency, should match server side.""" + rd = os.environ.get("XDG_RUNTIME_DIR") or "/tmp" + self.ctx = zmq.Context.instance() + + # heartbeat IPC (PUB/SUB) + self._hb_running = True + self._hb_last_time = 0 # timestamp of last heartbeat received + self._hb_latest_state = {} # latest heartbeat state + self._hb_online = False # whether heartbeat is online + self._hb_interval = 1.0 / float(hb_fps) # expected heartbeat interval + self._hb_lock = threading.Lock() # lock for heartbeat state + self._hb_timeout = 5.0 * self._hb_interval # timeout to consider offline + self.hb_ipc = os.path.join(rd, f"xr-teleoperate-hb-{os.getuid()}.ipc") + self.sub_socket = self.ctx.socket(zmq.SUB) + self.sub_socket.setsockopt(zmq.RCVHWM, 1) + self.sub_socket.connect(f"ipc://{self.hb_ipc}") + self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") + logger_mp.info(f"[IPC_Client] Subscribed to HeartBeat at ipc://{self.hb_ipc}") + self._hb_thread = threading.Thread(target=self._hb_loop, daemon=True) + self._hb_thread.start() + + # data IPC (REQ/REP) + self.data_ipc = os.path.join(rd, f"xr-teleoperate-data-{os.getuid()}.ipc") + self.req_socket = self.ctx.socket(zmq.REQ) + self.req_socket.connect(f"ipc://{self.data_ipc}") + logger_mp.info(f"[IPC_Client] Connected to Data at ipc://{self.data_ipc}") + + def _make_reqid(self) -> str: + import uuid + return str(uuid.uuid4()) + + # --------------------------- + # Heartbeat handling + # --------------------------- + def _hb_loop(self): + consecutive = 0 + while self._hb_running: + start_time = time.monotonic() + try: + msg = self.sub_socket.recv_json(flags=zmq.NOBLOCK) + with self._hb_lock: + self._hb_latest_state = msg + self._hb_last_time = time.monotonic() + consecutive += 1 + if consecutive >= 3: # require 3 consecutive heartbeats to be considered online + self._hb_online = True + except zmq.Again: + with self._hb_lock: + if self._hb_last_time > 0: + if self._hb_online and (time.monotonic() - self._hb_last_time > self._hb_timeout): + self._hb_latest_state = {} + self._hb_last_time = 0 + self._hb_online = False + consecutive = 0 + logger_mp.warning("[IPC_Client] HeartBeat timeout -> OFFLINE") + except Exception as e: + logger_mp.error(f"[IPC_Client] HB loop exception: {e}") + elapsed = time.monotonic() - start_time + if elapsed < self._hb_interval: + time.sleep(self._hb_interval - elapsed) + + # --------------------------- + # Public API + # --------------------------- + def send_data(self, cmd: str, info: dict = None) -> dict: + """Send command to server and wait reply""" + reqid = self._make_reqid() + if not self.is_online(): + logger_mp.warning(f"[IPC_Client] Cannot send {cmd}, server offline (no heartbeat)") + return {"repid": reqid, "status": "error", "msg": "server offline (no heartbeat)"} + + msg = {"reqid": reqid, "cmd": cmd} + if cmd == "CMD_RECORD_TOGGLE" and info: + msg["info"] = info + try: + self.req_socket.send_json(msg) + # wait up to 1s for reply + if self.req_socket.poll(1000): + reply = self.req_socket.recv_json() + else: + return {"repid": reqid, "status": "error", "msg": "timeout waiting for server reply"} + except Exception as e: + logger_mp.error(f"[IPC_Client] send_data failed: {e}") + return {"repid": reqid, "status": "error", "msg": str(e)} + + if reply.get("status") != "ok": + return reply + if reply.get("repid") != reqid: + return {"repid": reqid, "status": "error", "msg": f"reply id mismatch: expected {reqid}, got {reply.get('repid')}"} + return reply + + def is_online(self) -> bool: + with self._hb_lock: + return self._hb_online + + def latest_state(self) -> dict: + with self._hb_lock: + return dict(self._hb_latest_state) + + def stop(self): + self._hb_running = False + if self._hb_thread: + self._hb_thread.join(timeout=1.0) + try: + self.req_socket.setsockopt(zmq.LINGER, 0) + self.req_socket.close() + except Exception: + pass + try: + self.sub_socket.setsockopt(zmq.LINGER, 0) + self.sub_socket.close() + except Exception: + pass + try: + self.ctx.term() + except Exception: + pass + + +# --------------------------- +# Client Example usage +# --------------------------- +if __name__ == "__main__": + from sshkeyboard import listen_keyboard, stop_listening + client = None + + def on_press(key: str): + global client + if client is None: + logger_mp.warning("⚠️ Client not initialized, ignoring key press") + return + + if key == "r": + logger_mp.info("▶️ Sending launch command...") + rep = client.send_data("CMD_START") + logger_mp.info("Reply: %s", rep) + + elif key == "s": + info = { + "task_name": "T003", + "task_desc": "pick and place pear.", + "item_id": 1, + } + logger_mp.info("⏺️ Sending record toggle command...") + rep = client.send_data("CMD_RECORD_TOGGLE", info=info) # optional info + logger_mp.info("Reply: %s", rep) + + + elif key == "q": + logger_mp.info("⏹️ Sending exit command...") + rep = client.send_data("CMD_STOP") + logger_mp.info("Reply: %s", rep) + + elif key == "b": + if client.is_online(): + state = client.latest_state() + logger_mp.info(f"[HEARTBEAT] Current heartbeat: {state}") + else: + logger_mp.warning("[HEARTBEAT] No heartbeat received (OFFLINE)") + + else: + logger_mp.warning(f"⚠️ Undefined key: {key}") + + # Initialize client + client = IPC_Client(hb_fps=10.0) + + # Start keyboard listening thread + listen_keyboard_thread = threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press, "until": None, "sequential": False}, daemon=True) + listen_keyboard_thread.start() + + logger_mp.info("✅ Client started, waiting for keyboard input:\n [r] launch, [s] start/stop record, [b] heartbeat, [q] exit") + try: + while True: + time.sleep(1.0) + except KeyboardInterrupt: + logger_mp.info("⏹️ User interrupt, preparing to exit...") + finally: + stop_listening() + client.stop() + logger_mp.info("✅ Client exited")