From 66c2c7c6ebef4df52fb4167f41ff8f590ad31e2a Mon Sep 17 00:00:00 2001 From: silencht Date: Mon, 27 Oct 2025 21:09:22 +0800 Subject: [PATCH] [update] pass-through mode, webrtc mode, fix bugs, fix ipc mode, etc. --- teleop/robot_control/robot_hand_unitree.py | 2 +- teleop/teleimager | 2 +- teleop/teleop_hand_and_arm.py | 164 +++++++++++---------- teleop/televuer | 2 +- teleop/utils/episode_writer.py | 8 +- teleop/utils/ipc.py | 45 +----- 6 files changed, 99 insertions(+), 124 deletions(-) diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py index 2f18b25..8cb9e57 100644 --- a/teleop/robot_control/robot_hand_unitree.py +++ b/teleop/robot_control/robot_hand_unitree.py @@ -419,7 +419,7 @@ if __name__ == "__main__": if not img_client.has_head_cam(): logger_mp.error("Head camera is required. Please enable head camera on the image server side.") head_img_shape = img_client.get_head_shape() - tv_binocular = img_client.is_binocular() + tv_binocular = img_client.head_is_binocular() # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. tv_wrapper = TeleVuerWrapper(binocular=tv_binocular, use_hand_tracking=args.xr_mode == "hand", img_shape=head_img_shape, return_hand_rot_data = False) diff --git a/teleop/teleimager b/teleop/teleimager index 8e31bbb..f81e0c6 160000 --- a/teleop/teleimager +++ b/teleop/teleimager @@ -1 +1 @@ -Subproject commit 8e31bbbceb45ed0eeeae274eac0aa928959307e4 +Subproject commit f81e0c6b234698661831134c6b8aba4f1a3ee709 diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index f83fb44..1a78353 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -27,21 +27,29 @@ from sshkeyboard import listen_keyboard, stop_listening # for simulation from unitree_sdk2py.core.channel import ChannelPublisher from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ -def publish_reset_category(category: int,publisher): # Scene Reset signal +def publish_reset_category(category: int, publisher): # Scene Reset signal msg = String_(data=str(category)) publisher.Write(msg) logger_mp.info(f"published reset category: {category}") # state transition -START = False # Enable to start robot following VR user motion +START = False # Enable to start robot following VR user motion STOP = False # Enable to begin system exit procedure -RECORD_TOGGLE = False # [Ready] ⇄ [Recording] ⟶ [AutoSave] ⟶ [Ready] (⇄ manual) (⟶ auto) +READY = False # Ready to (1) enter START state, (2) enter RECORD_RUNNING state RECORD_RUNNING = False # True if [Recording] -RECORD_READY = False # True if [Ready], False if [Recording] or [AutoSave] -# task info -TASK_NAME = None -TASK_DESC = None -ITEM_ID = None +RECORD_TOGGLE = False # Toggle recording state +# ------- --------- ----------- ----------- --------- +# state [Ready] ==> [Recording] ==> [AutoSave] --> [Ready] +# ------- --------- | ----------- | ----------- | --------- +# START True |manual True |manual True | True +# READY True |set False |set False |auto True +# RECORD_RUNNING False |to True |to False | False +# ∨ ∨ ∨ +# RECORD_TOGGLE False True False True False False +# ------- --------- ----------- ----------- --------- +# ==> manual: when READY is True, set RECORD_TOGGLE=True to transition. +# --> auto : Auto-transition after saving data. + def on_press(key): global STOP, START, RECORD_TOGGLE if key == 'r': @@ -54,27 +62,19 @@ def on_press(key): else: logger_mp.warning(f"[on_press] {key} was pressed, but no action is defined for this key.") -def on_info(info): - """Only handle CMD_TOGGLE_RECORD's task info""" - global TASK_NAME, TASK_DESC, ITEM_ID - TASK_NAME = info.get("task_name") - TASK_DESC = info.get("task_desc") - ITEM_ID = info.get("item_id") - logger_mp.debug(f"[on_info] Updated globals: {TASK_NAME}, {TASK_DESC}, {ITEM_ID}") - def get_state() -> dict: """Return current heartbeat state""" - global START, STOP, RECORD_RUNNING, RECORD_READY + global START, STOP, RECORD_RUNNING, READY return { "START": START, "STOP": STOP, + "READY": READY, "RECORD_RUNNING": RECORD_RUNNING, - "RECORD_READY": RECORD_READY, } if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument('--frequency', type = float, default = 30.0, help = 'save data\'s frequency') + parser.add_argument('--frequency', type = float, default = 30.0, help = 'save and control \'s frequency') # basic control parameters parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') @@ -84,12 +84,17 @@ if __name__ == '__main__': parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode') parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)') parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode') - parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity') + parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity mode') parser.add_argument('--ipc', action = 'store_true', help = 'Enable IPC server to handle input; otherwise enable sshkeyboard') - parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') + parser.add_argument('--pass-through', action='store_true', help='Enable passthrough mode (use real-world view in XR device)') + parser.add_argument('--img-server-ip', type=str, default='127.0.0.1', help='IP address of image server') + # record mode and task info + parser.add_argument('--record', action = 'store_true', help = 'Enable data recording mode') parser.add_argument('--task-dir', type = str, default = './utils/data/', help = 'path to save data') - parser.add_argument('--task-name', type = str, default = 'pick cube', help = 'task name for recording') - parser.add_argument('--task-desc', type = str, default = 'e.g. pick the red cube on the table.', help = 'task goal for recording') + parser.add_argument('--task-name', type = str, default = 'pick cube', help = 'task file name for recording') + parser.add_argument('--task-goal', type = str, default = 'pick up cube.', help = 'task goal for recording at json file') + parser.add_argument('--task-desc', type = str, default = 'task description', help = 'task description for recording at json file') + parser.add_argument('--task-steps', type = str, default = 'step1: do this; step2: do that;', help = 'task steps for recording at json file') args = parser.parse_args() logger_mp.info(f"args: {args}") @@ -97,23 +102,26 @@ if __name__ == '__main__': try: # ipc communication mode. client usage: see utils/ipc.py if args.ipc: - ipc_server = IPC_Server(on_press=on_press, on_info=on_info, get_state=get_state) + ipc_server = IPC_Server(on_press=on_press,get_state=get_state) ipc_server.start() # sshkeyboard communication mode else: - listen_keyboard_thread = threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press, "until": None, "sequential": False,}, daemon=True) + listen_keyboard_thread = threading.Thread(target=listen_keyboard, + kwargs={"on_press": on_press, "until": None, "sequential": False,}, + daemon=True) listen_keyboard_thread.start() # image client - img_client = ImageClient(host='127.0.0.1')#host='192.168.123.164' - if not img_client.has_head_cam(): - logger_mp.error("Head camera is required. Please enable head camera on the image server side.") - - head_img_shape = img_client.get_head_shape() - tv_binocular = img_client.is_binocular() + img_client = ImageClient(host=args.img_server_ip) + xr_need_local_img = not (args.pass_through or img_client.enable_head_webrtc()) - # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. - tv_wrapper = TeleVuerWrapper(binocular=tv_binocular, use_hand_tracking=args.xr_mode == "hand", img_shape=head_img_shape, return_hand_rot_data = False) + # televuer_wrapper: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. + tv_wrapper = TeleVuerWrapper(use_hand_tracking=args.xr_mode == "hand", + pass_through=args.pass_through, + binocular=img_client.head_is_binocular(), + img_shape=img_client.get_head_shape(), + webrtc=img_client.enable_head_webrtc(), + webrtc_url=img_client.head_webrtc_url()) # arm if args.arm == "G1_29": @@ -136,28 +144,32 @@ if __name__ == '__main__': dual_hand_data_lock = Lock() dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. - hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, + dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) elif args.ee == "dex1": left_gripper_value = Value('d', 0.0, lock=True) # [input] right_gripper_value = Value('d', 0.0, lock=True) # [input] dual_gripper_data_lock = Lock() dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. - gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) + gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, + dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim) elif args.ee == "inspire1": left_hand_pos_array = Array('d', 75, lock = True) # [input] right_hand_pos_array = Array('d', 75, lock = True) # [input] dual_hand_data_lock = Lock() dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. - hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, + dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) elif args.ee == "brainco": left_hand_pos_array = Array('d', 75, lock = True) # [input] right_hand_pos_array = Array('d', 75, lock = True) # [input] dual_hand_data_lock = Lock() dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. - hand_ctrl = Brainco_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) + hand_ctrl = Brainco_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, + dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim) else: pass @@ -167,7 +179,7 @@ if __name__ == '__main__': p = psutil.Process(os.getpid()) p.cpu_affinity([0,1,2,3]) # Set CPU affinity to cores 0-3 try: - p.nice(-20) # Set highest priority + p.nice(-20) # Set highest priority logger_mp.info("Set high priority successfully.") except psutil.AccessDenied: logger_mp.warning("Failed to set high priority. Please run as root.") @@ -195,49 +207,39 @@ if __name__ == '__main__': sport_client.Init() # record + headless / non-headless mode - if args.record and args.headless: - recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_desc, frequency = args.frequency, rerun_log = False) - elif args.record and not args.headless: - recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_desc, frequency = args.frequency, rerun_log = True) - + if args.record: + recorder = EpisodeWriter(task_dir = os.path.join(args.task_dir, args.task_name), + task_goal = args.task_goal, + task_desc = args.task_desc, + task_steps = args.task_steps, + frequency = args.frequency, + rerun_log = not args.headless) logger_mp.info("Please enter the start signal (enter 'r' to start the subsequent program)") - while not START and not STOP: - # wait for start signal. by the way, get image and send it to xr - head_img, head_img_fps = img_client.get_head_frame() - tv_wrapper.set_display_image(head_img) + READY = True # now ready to (1) enter START state + while not START and not STOP: # wait for start or stop signal. time.sleep(0.033) + if img_client.enable_head_zmq() and xr_need_local_img: + head_img, _ = img_client.get_head_frame() + tv_wrapper.render_to_xr(head_img) + logger_mp.info("start program.") arm_ctrl.speed_gradual_max() - # main loop. robot start to follow VR user's motion while not STOP: start_time = time.time() # get image - head_img, head_img_fps = img_client.get_head_frame() - if img_client.has_left_wrist_cam(): - left_wrist_img, _ = img_client.get_left_wrist_frame() - if img_client.has_right_wrist_cam(): - right_wrist_img, _ = img_client.get_right_wrist_frame() - # send head rimage to xr - tv_wrapper.set_display_image(head_img) - - # non-headless mode - if not args.headless: - resized_image = cv2.resize(head_img, (head_img_shape[1] // 2, head_img_shape[0] // 2)) - cv2.imshow("record image", resized_image) - # opencv GUI communication - key = cv2.waitKey(1) & 0xFF - if key == ord('q'): - START = False - STOP = True - if args.sim: - publish_reset_category(2, reset_pose_publisher) - elif key == ord('s'): - RECORD_TOGGLE = True - elif key == ord('a'): - if args.sim: - publish_reset_category(2, reset_pose_publisher) + if img_client.enable_head_zmq(): + if args.record or xr_need_local_img: + head_img, head_img_fps = img_client.get_head_frame() + if xr_need_local_img: + tv_wrapper.render_to_xr(head_img) + if img_client.enable_left_wrist_zmq(): + if args.record: + left_wrist_img, _ = img_client.get_left_wrist_frame() + if img_client.enable_right_wrist_zmq(): + if args.record: + right_wrist_img, _ = img_client.get_right_wrist_frame() # record mode if args.record and RECORD_TOGGLE: @@ -271,7 +273,7 @@ if __name__ == '__main__': with right_gripper_value.get_lock(): right_gripper_value.value = tele_data.right_hand_pinchValue else: - pass + pass # high level control if args.xr_mode == "controller" and args.motion: @@ -300,7 +302,7 @@ if __name__ == '__main__': # record data if args.record: - RECORD_READY = recorder.is_ready() + READY = recorder.is_ready() # now ready to (2) enter RECORD_RUNNING state # dex hand or gripper if args.ee == "dex3" and args.xr_mode == "hand": with dual_hand_data_lock: @@ -352,18 +354,18 @@ if __name__ == '__main__': if RECORD_RUNNING: colors = {} depths = {} - if tv_binocular: - colors[f"color_{0}"] = head_img[:, :head_img_shape[1]//2] - colors[f"color_{1}"] = head_img[:, head_img_shape[1]//2:] - if img_client.has_left_wrist_cam(): + if img_client.head_is_binocular(): + colors[f"color_{0}"] = head_img[:, :img_client.get_head_shape()[1]//2] + colors[f"color_{1}"] = head_img[:, img_client.get_head_shape()[1]//2:] + if img_client.enable_left_wrist_zmq(): colors[f"color_{2}"] = left_wrist_img - if img_client.has_right_wrist_cam(): + if img_client.enable_right_wrist_zmq(): colors[f"color_{3}"] = right_wrist_img else: colors[f"color_{0}"] = head_img - if img_client.has_left_wrist_cam(): + if img_client.enable_left_wrist_zmq(): colors[f"color_{1}"] = left_wrist_img - if img_client.has_right_wrist_cam(): + if img_client.enable_right_wrist_zmq(): colors[f"color_{2}"] = right_wrist_img states = { "left_arm": { diff --git a/teleop/televuer b/teleop/televuer index cb38fa1..3bc0cad 160000 --- a/teleop/televuer +++ b/teleop/televuer @@ -1 +1 @@ -Subproject commit cb38fa12763ab989d7c1705b9d9a4318f5391170 +Subproject commit 3bc0cad0ce4407c75d0550b5cc61cea9254e0390 diff --git a/teleop/utils/episode_writer.py b/teleop/utils/episode_writer.py index acc78e8..89ec06f 100644 --- a/teleop/utils/episode_writer.py +++ b/teleop/utils/episode_writer.py @@ -11,7 +11,7 @@ import logging_mp logger_mp = logging_mp.get_logger(__name__) class EpisodeWriter(): - def __init__(self, task_dir, task_goal=None, frequency=30, image_size=[640, 480], rerun_log = True): + def __init__(self, task_dir, task_goal=None, task_desc = None, task_steps = None, frequency=30, image_size=[640, 480], rerun_log = True): """ image_size: [width, height] """ @@ -24,6 +24,10 @@ class EpisodeWriter(): } if task_goal is not None: self.text['goal'] = task_goal + if task_desc is not None: + self.text['desc'] = task_desc + if task_steps is not None: + self.text['steps'] = task_steps self.frequency = frequency self.image_size = image_size @@ -37,7 +41,7 @@ class EpisodeWriter(): self.item_id = -1 self.episode_id = -1 if os.path.exists(self.task_dir): - episode_dirs = [episode_dir for episode_dir in os.listdir(self.task_dir) if 'episode_' in episode_dir] + episode_dirs = [episode_dir for episode_dir in os.listdir(self.task_dir) if 'episode_' in episode_dir and not episode_dir.endswith('.zip')] episode_last = sorted(episode_dirs)[-1] if len(episode_dirs) > 0 else None self.episode_id = 0 if episode_last is None else int(episode_last.split('_')[-1]) logger_mp.info(f"==> task_dir directory already exist, now self.episode_id is:{self.episode_id}\n") diff --git a/teleop/utils/ipc.py b/teleop/utils/ipc.py index 5db218d..a2bbaab 100644 --- a/teleop/utils/ipc.py +++ b/teleop/utils/ipc.py @@ -22,12 +22,7 @@ logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO) 3) start or stop (record toggle) { "reqid": unique id, - "cmd": "CMD_RECORD_TOGGLE", - "info": { # optional - "task_name": "T001", - "task_desc": "pick and place apple to basket", - "item_id": 1 - } + "cmd": "CMD_RECORD_TOGGLE" } # Server → Client (Reply) @@ -44,7 +39,6 @@ logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO) "msg": "reqid not provided" | "cmd not provided" | "cmd not supported: {cmd}" - | "info missing keys: {missing_keys}" | "internal error msg" } @@ -72,19 +66,18 @@ class IPC_Server: "CMD_RECORD_TOGGLE": "s", # start & stop (toggle record) } - def __init__(self, on_press=None, on_info=None, get_state=None, hb_fps=10.0): + def __init__(self, on_press=None, get_state=None, hb_fps=10.0): """ Args: on_press : callback(cmd:str), called for every command - on_info : callback(data:dict), only handle CMD_RECORD_TOGGLE's task info - hb_fps : heartbeat publish frequency get_state : callback() -> dict, provides current heartbeat state + hb_fps : heartbeat publish frequency """ if callable(on_press): self.on_press = on_press else: raise ValueError("[IPC_Server] on_press callback function must be provided") - self.on_info = on_info + if callable(get_state): self.get_state = get_state else: @@ -171,23 +164,6 @@ class IPC_Server: # unsupported cmd if cmd not in self.cmd_map: return {"repid": reqid, "status": "error", "msg": f"cmd not supported: {cmd}"} - - # CMD_RECORD_TOGGLE: optional info - if cmd == "CMD_RECORD_TOGGLE": - info = msg.get("info", None) - if info: - required_keys = ["task_name", "task_desc", "item_id"] - missing_keys = [key for key in required_keys if key not in info] - if missing_keys: - return {"repid": reqid, "status": "error", "msg": f"info missing keys: {missing_keys}"} - else: - if self.on_info: - self.on_info(info) - logger_mp.debug(f"[IPC_Server] on_info called with info: {info}") - else: - logger_mp.warning("[IPC_Server] No on_info provided") - else: - logger_mp.warning("[IPC_Server] No info provided with cmd: CMD_RECORD_TOGGLE") # supported cmd path self.on_press(self.cmd_map[cmd]) @@ -232,7 +208,7 @@ class IPC_Server: class IPC_Client: """ Inter - Process Communication Client: - - Send command/info via REQ + - Send command via REQ - Subscribe heartbeat via SUB """ def __init__(self, hb_fps=10.0): @@ -300,7 +276,7 @@ class IPC_Client: # --------------------------- # Public API # --------------------------- - def send_data(self, cmd: str, info: dict = None) -> dict: + def send_data(self, cmd: str) -> dict: """Send command to server and wait reply""" reqid = self._make_reqid() if not self.is_online(): @@ -308,8 +284,6 @@ class IPC_Client: return {"repid": reqid, "status": "error", "msg": "server offline (no heartbeat)"} msg = {"reqid": reqid, "cmd": cmd} - if cmd == "CMD_RECORD_TOGGLE" and info: - msg["info"] = info try: self.req_socket.send_json(msg) # wait up to 1s for reply @@ -374,13 +348,8 @@ if __name__ == "__main__": logger_mp.info("Reply: %s", rep) elif key == "s": - info = { - "task_name": "T003", - "task_desc": "pick and place pear.", - "item_id": 1, - } logger_mp.info("⏺️ Sending record toggle command...") - rep = client.send_data("CMD_RECORD_TOGGLE", info=info) # optional info + rep = client.send_data("CMD_RECORD_TOGGLE") logger_mp.info("Reply: %s", rep)