diff --git a/.gitignore b/.gitignore index b5a18cc..60afa6f 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,5 @@ __MACOSX/ !real_left_hand.jpg !real_right_hand.jpg !wrist_and_ring_mount.png -teleop/data/ \ No newline at end of file +teleop/data/ +teleop/utils/episode_0* \ No newline at end of file diff --git a/teleop/image_server/image_client.py b/teleop/image_server/image_client.py index 7b5aea8..3806f4b 100644 --- a/teleop/image_server/image_client.py +++ b/teleop/image_server/image_client.py @@ -160,7 +160,7 @@ class ImageClient: np.copyto(self.tv_img_array, np.array(current_image[:, :self.tv_img_shape[1]])) if self.wrist_enable_shm: - np.copyto(self.wrist_img_array, np.array(current_image[:, self.wrist_img_shape[1]:])) + np.copyto(self.wrist_img_array, np.array(current_image[:, -self.wrist_img_shape[1]:])) if self._image_show: height, width = current_image.shape[:2] diff --git a/teleop/image_server/image_server.py b/teleop/image_server/image_server.py index 02b1642..8328210 100644 --- a/teleop/image_server/image_server.py +++ b/teleop/image_server/image_server.py @@ -8,102 +8,179 @@ import pyrealsense2 as rs class RealSenseCamera(object): - def __init__(self, img_shape, fps, serial_number = None, enable_depth = False) -> None: - self.img_shape = img_shape - self.fps = fps - self.serial_number = serial_number - self.enable_depth = enable_depth + def __init__(self, img_shape, fps, serial_number=None, enable_depth=False) -> None: + """ + img_shape: [height, width] + serial_number: serial number + """ + self.img_shape = img_shape + self.fps = fps + self.serial_number = serial_number + self.enable_depth = enable_depth - align_to = rs.stream.color - self.align = rs.align(align_to) - self.init_realsense() + align_to = rs.stream.color + self.align = rs.align(align_to) + self.init_realsense() - def init_realsense(self): + def init_realsense(self): - self.pipeline = rs.pipeline() - config = rs.config() - if self.serial_number is not None: - 
config.enable_device(self.serial_number) + self.pipeline = rs.pipeline() + config = rs.config() + if self.serial_number is not None: + config.enable_device(self.serial_number) - config.enable_stream(rs.stream.color, self.img_shape[1] // 2, self.img_shape[0], rs.format.bgr8, self.fps) + config.enable_stream(rs.stream.color, self.img_shape[1], self.img_shape[0], rs.format.bgr8, self.fps) - if self.enable_depth: - config.enable_stream(rs.stream.depth, self.img_shape[1] // 2, self.img_shape[0], rs.format.z16, self.fps) + if self.enable_depth: + config.enable_stream(rs.stream.depth, self.img_shape[1], self.img_shape[0], rs.format.z16, self.fps) - profile = self.pipeline.start(config) - self._device = profile.get_device() - if self._device is None: - print('pipe_profile.get_device() is None .') - if self.enable_depth: - assert self._device is not None - depth_sensor = self._device.first_depth_sensor() - self.g_depth_scale = depth_sensor.get_depth_scale() + profile = self.pipeline.start(config) + self._device = profile.get_device() + if self._device is None: + print('[Image Server] pipe_profile.get_device() is None .') + if self.enable_depth: + assert self._device is not None + depth_sensor = self._device.first_depth_sensor() + self.g_depth_scale = depth_sensor.get_depth_scale() - self.intrinsics = profile.get_stream(rs.stream.color).as_video_stream_profile().get_intrinsics() + self.intrinsics = profile.get_stream(rs.stream.color).as_video_stream_profile().get_intrinsics() + def get_frame(self): + frames = self.pipeline.wait_for_frames() + aligned_frames = self.align.process(frames) + color_frame = aligned_frames.get_color_frame() - def get_frame(self): - frames = self.pipeline.wait_for_frames() - aligned_frames = self.align.process(frames) - color_frame = aligned_frames.get_color_frame() + if self.enable_depth: + depth_frame = aligned_frames.get_depth_frame() - if self.enable_depth: - depth_frame = aligned_frames.get_depth_frame() + if not color_frame: + return None - if 
not color_frame: - return None, None - - color_image = np.asanyarray(color_frame.get_data()) - # color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) - depth_image = np.asanyarray(depth_frame.get_data()) if self.enable_depth else None - return color_image, depth_image + color_image = np.asanyarray(color_frame.get_data()) + # color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) + depth_image = np.asanyarray(depth_frame.get_data()) if self.enable_depth else None + return color_image, depth_image + def release(self): + self.pipeline.stop() -class ImageServer: - def __init__(self, img_shape = (480, 640 * 2, 3), fps = 30, enable_wrist = False, port = 5555, Unit_Test = False): + +class OpenCVCamera(): + def __init__(self, device_id, img_shape, fps): """ - img_shape: User's expected camera resolution shape (H, W, C). - - p.s.1: 'W' of binocular camera according to binocular width (instead of monocular). - - p.s.2: User expectations are not necessarily the end result. The final img_shape value needs to be determined from \ - the terminal output (i.e. "Image Resolution: width is ยทยทยท") - - fps: user's expected camera frames per second. - - port: The port number to bind to, where you can receive messages from subscribers. - - Unit_Test: When both server and client are True, it can be used to test the image transfer latency, \ - network jitter, frame loss rate and other information. 
+ decive_id: /dev/video* or * + img_shape: [height, width] """ - self.img_shape = img_shape + self.id = device_id self.fps = fps - self.enable_wrist = enable_wrist - self.port = port - self.enable_performance_eval = Unit_Test - - # initiate camera - self.cap = cv2.VideoCapture(0, cv2.CAP_V4L2) + self.img_shape = img_shape + self.cap = cv2.VideoCapture(self.id, cv2.CAP_V4L2) self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc('M', 'J', 'P', 'G')) - self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.img_shape[1]) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.img_shape[0]) + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.img_shape[1]) self.cap.set(cv2.CAP_PROP_FPS, self.fps) - if self.enable_wrist: - # initiate realsense camera - self.left_cam = RealSenseCamera(img_shape = self.img_shape, fps = self.fps, serial_number = "218622271789") # left wrist camera - self.right_cam = RealSenseCamera(img_shape = self.img_shape, fps = self.fps, serial_number = "218622278527") # right wrist camera + # Test if the camera can read frames + if not self._can_read_frame(): + print(f"[Image Server] Camera {self.id} Error: Failed to initialize the camera or read frames. 
Exiting...") + self.release() + + def _can_read_frame(self): + success, _ = self.cap.read() + return success + + def release(self): + self.cap.release() + + def get_frame(self): + ret, color_image = self.cap.read() + if not ret: + return None + return color_image + + +class ImageServer: + def __init__(self, config, port = 5555, Unit_Test = False): + """ + config: + { + 'fps':30 # frame per second + 'head_camera_type': 'opencv', # opencv or realsense + 'head_camera_image_shape': [480, 1280], # Head camera resolution [height, width] + 'head_camera_id_numbers': [0], # '/dev/video0' (opencv) + 'wrist_camera_type': 'realsense', + 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution [height, width] + 'wrist_camera_id_numbers': ["218622271789", "241222076627"], # serial number (realsense) + } + """ + print(config) + self.fps = config.get('fps', 30) + self.head_camera_type = config.get('head_camera_type', 'opencv') + self.head_image_shape = config.get('head_camera_image_shape', [480, 640]) # (height, width) + self.head_camera_id_numbers = config.get('head_camera_id_numbers', [0]) + + self.wrist_camera_type = config.get('wrist_camera_type', None) + self.wrist_image_shape = config.get('wrist_camera_image_shape', [480, 640]) # (height, width) + self.wrist_camera_id_numbers = config.get('wrist_camera_id_numbers', None) - # set ZeroMQ context and socket + self.port = port + self.Unit_Test = Unit_Test + + + # Initialize head cameras + self.head_cameras = [] + if self.head_camera_type == 'opencv': + for device_id in self.head_camera_id_numbers: + camera = OpenCVCamera(device_id=device_id, img_shape=self.head_image_shape, fps=self.fps) + self.head_cameras.append(camera) + elif self.head_camera_type == 'realsense': + for serial_number in self.head_camera_id_numbers: + camera = RealSenseCamera(img_shape=self.head_image_shape, fps=self.fps, serial_number=serial_number) + self.head_cameras.append(camera) + else: + print(f"[Image Server] Unsupported head_camera_type: 
{self.head_camera_type}") + + # Initialize wrist cameras if provided + self.wrist_cameras = [] + if self.wrist_camera_type and self.wrist_camera_id_numbers: + if self.wrist_camera_type == 'opencv': + for device_id in self.wrist_camera_id_numbers: + camera = OpenCVCamera(device_id=device_id, img_shape=self.wrist_image_shape, fps=self.fps) + self.wrist_cameras.append(camera) + elif self.wrist_camera_type == 'realsense': + for serial_number in self.wrist_camera_id_numbers: + camera = RealSenseCamera(img_shape=self.wrist_image_shape, fps=self.fps, serial_number=serial_number) + self.wrist_cameras.append(camera) + else: + print(f"[Image Server] Unsupported wrist_camera_type: {self.wrist_camera_type}") + + # Set ZeroMQ context and socket self.context = zmq.Context() self.socket = self.context.socket(zmq.PUB) self.socket.bind(f"tcp://*:{self.port}") - if self.enable_performance_eval: + if self.Unit_Test: self._init_performance_metrics() - print("Image server has started, waiting for client connections...") - print(f"Image Resolution: width is {self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}, height is {self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)}\n") + for cam in self.head_cameras: + if isinstance(cam, OpenCVCamera): + print(f"[Image Server] Head camera {cam.id} resolution: {cam.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} x {cam.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}") + elif isinstance(cam, RealSenseCamera): + print(f"[Image Server] Head camera {cam.serial_number} resolution: {cam.img_shape[0]} x {cam.img_shape[1]}") + else: + print("[Image Server] Unknown camera type in head_cameras.") + + for cam in self.wrist_cameras: + if isinstance(cam, OpenCVCamera): + print(f"[Image Server] Wrist camera {cam.id} resolution: {cam.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} x {cam.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}") + elif isinstance(cam, RealSenseCamera): + print(f"[Image Server] Wrist camera {cam.serial_number} resolution: {cam.img_shape[0]} x {cam.img_shape[1]}") + else: + print("[Image Server] Unknown camera 
type in wrist_cameras.") + + print("[Image Server] Image server has started, waiting for client connections...") + + def _init_performance_metrics(self): self.frame_count = 0 # Total frames sent @@ -125,9 +202,12 @@ class ImageServer: elapsed_time = current_time - self.start_time real_time_fps = len(self.frame_times) / self.time_window print(f"[Image Server] Real-time FPS: {real_time_fps:.2f}, Total frames sent: {self.frame_count}, Elapsed time: {elapsed_time:.2f} sec") - + def _close(self): - self.cap.release() + for cam in self.head_cameras: + cam.release() + for cam in self.wrist_cameras: + cam.release() self.socket.close() self.context.term() print("[Image Server] The server has been closed.") @@ -135,35 +215,62 @@ class ImageServer: def send_process(self): try: while True: - ret, head_color = self.cap.read() - if not ret: - print("[Image Server] Frame read is error.") + head_frames = [] + for cam in self.head_cameras: + if self.head_camera_type == 'opencv': + color_image = cam.get_frame() + if color_image is None: + print("[Image Server] Head camera frame read is error.") + break + elif self.head_camera_type == 'realsense': + color_image, depth_iamge = cam.get_frame() + if color_image is None: + print("[Image Server] Head camera frame read is error.") + break + head_frames.append(color_image) + if len(head_frames) != len(self.head_cameras): break + head_color = cv2.hconcat(head_frames) + + if self.wrist_cameras: + wrist_frames = [] + for cam in self.wrist_cameras: + if self.wrist_camera_type == 'opencv': + color_image = cam.get_frame() + if color_image is None: + print("[Image Server] Wrist camera frame read is error.") + break + elif self.wrist_camera_type == 'realsense': + color_image, depth_iamge = cam.get_frame() + if color_image is None: + print("[Image Server] Wrist camera frame read is error.") + break + wrist_frames.append(color_image) + wrist_color = cv2.hconcat(wrist_frames) + + # Concatenate head and wrist frames + full_color = 
cv2.hconcat([head_color, wrist_color]) + else: + full_color = head_color - if self.enable_wrist: - left_wrist_color, left_wrist_depth = self.left_cam.get_frame() - right_wrist_color, right_wrist_depth = self.right_cam.get_frame() - # Concatenate images along the width - head_color = cv2.hconcat([head_color, left_wrist_color, right_wrist_color]) - - ret, buffer = cv2.imencode('.jpg', head_color) + ret, buffer = cv2.imencode('.jpg', full_color) if not ret: print("[Image Server] Frame imencode is failed.") continue jpg_bytes = buffer.tobytes() - if self.enable_performance_eval: + if self.Unit_Test: timestamp = time.time() frame_id = self.frame_count header = struct.pack('dI', timestamp, frame_id) # 8-byte double, 4-byte unsigned int message = header + jpg_bytes else: message = jpg_bytes - + self.socket.send(message) - if self.enable_performance_eval: + if self.Unit_Test: current_time = time.time() self._update_performance_metrics(current_time) self._print_performance_metrics(current_time) @@ -173,7 +280,17 @@ class ImageServer: finally: self._close() + if __name__ == "__main__": - # server = ImageServer(img_shape = (720, 640 * 2, 3), Unit_Test = True) # test - server = ImageServer(img_shape = (640, 480 * 2, 3), enable_wrist = True, Unit_Test = False) # deployment + config = { + 'fps': 30, + 'head_camera_type': 'opencv', + 'head_camera_image_shape': [480, 1280], # Head camera resolution + 'head_camera_id_numbers': [0], + 'wrist_camera_type': 'opencv', + 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution + 'wrist_camera_id_numbers': [2, 4], + } + + server = ImageServer(config, Unit_Test=False) server.send_process() \ No newline at end of file diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py index 52e9407..3ced974 100644 --- a/teleop/robot_control/robot_hand_unitree.py +++ b/teleop/robot_control/robot_hand_unitree.py @@ -1,6 +1,11 @@ +# for dex3-1 from unitree_sdk2py.core.channel import ChannelPublisher, 
ChannelSubscriber, ChannelFactoryInitialize # dds from unitree_sdk2py.idl.unitree_hg.msg.dds_ import HandCmd_, HandState_ # idl from unitree_sdk2py.idl.default import unitree_hg_msg_dds__HandCmd_ +# for gripper +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds +from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_, MotorStates_ # idl +from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_ import numpy as np from enum import IntEnum @@ -15,8 +20,7 @@ sys.path.append(parent2_dir) from teleop.robot_control.hand_retargeting import HandRetargeting, HandType -unitree_tip_indices = [4, 9, 14] - +unitree_tip_indices = [4, 9, 14] # [thumb, index, middle] in OpenXR Dex3_Num_Motors = 7 kTopicDex3LeftCommand = "rt/dex3/left/cmd" kTopicDex3RightCommand = "rt/dex3/right/cmd" @@ -171,8 +175,7 @@ class Dex3_1_Controller: # Read left and right q_state from shared arrays state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) - is_hand_data_initialized = not np.all(left_hand_mat == 0.0) - if is_hand_data_initialized: + if not np.all(left_hand_mat == 0.0): # if hand data has been initialized. 
left_q_target = self.hand_retargeting.left_retargeting.retarget(left_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]] right_q_target = self.hand_retargeting.right_retargeting.retarget(right_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]] @@ -210,36 +213,222 @@ class Dex3_1_Right_JointIndex(IntEnum): kRightHandSix = 6 # middle_1 +unitree_gripper_indices = [4, 9] # [thumb, index] +Gripper_Num_Motors = 2 +kTopicGripperCommand = "rt/unitree_actuator/cmd" +kTopicGripperState = "rt/unitree_actuator/state" + +class Gripper_Controller: + def __init__(self, left_hand_array, right_hand_array, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None, + fps = 250.0, Unit_Test = False): + """ + [note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process + + left_hand_array: [input] Left hand skeleton data (required from XR device) to control_thread + + right_hand_array: [input] Right hand skeleton data (required from XR device) to control_thread + + dual_gripper_data_lock: Data synchronization lock for dual_gripper_state_array and dual_gripper_action_array + + dual_gripper_state: [output] Return left(1), right(1) gripper motor state + + dual_gripper_action: [output] Return left(1), right(1) gripper motor action + + fps: Control frequency + + Unit_Test: Whether to enable unit testing + """ + + print("Initialize Gripper_Controller...") + + self.fps = fps + self.Unit_Test = Unit_Test + + if self.Unit_Test: + ChannelFactoryInitialize(0) + + # initialize handcmd publisher and handstate subscriber + self.GripperCmb_publisher = ChannelPublisher(kTopicGripperCommand, MotorCmds_) + self.GripperCmb_publisher.Init() + + self.GripperState_subscriber = ChannelSubscriber(kTopicGripperState, MotorStates_) + self.GripperState_subscriber.Init() + + self.dual_gripper_state = [0.0] * len(Gripper_JointIndex) + + # initialize subscribe thread + self.subscribe_state_thread = 
threading.Thread(target=self._subscribe_gripper_state) + self.subscribe_state_thread.daemon = True + self.subscribe_state_thread.start() + + while True: + if any(state != 0.0 for state in self.dual_gripper_state): + break + time.sleep(0.01) + print("[Gripper_Controller] Waiting to subscribe dds...") + + self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_hand_array, right_hand_array, self.dual_gripper_state, + dual_gripper_data_lock, dual_gripper_state_out, dual_gripper_action_out)) + self.gripper_control_thread.daemon = True + self.gripper_control_thread.start() + + print("Initialize Gripper_Controller OK!\n") + + def _subscribe_gripper_state(self): + while True: + gripper_msg = self.GripperState_subscriber.Read() + if gripper_msg is not None: + for idx, id in enumerate(Gripper_JointIndex): + self.dual_gripper_state[idx] = gripper_msg.states[id].q + time.sleep(0.002) + + def ctrl_dual_gripper(self, gripper_q_target): + """set current left, right gripper motor state target q""" + for idx, id in enumerate(Gripper_JointIndex): + self.gripper_msg.cmds[id].q = gripper_q_target[idx] + + self.GripperCmb_publisher.Write(self.gripper_msg) + # print("gripper ctrl publish ok.") + + def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_in, dual_hand_data_lock = None, + dual_gripper_state_out = None, dual_gripper_action_out = None): + self.running = True + + DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm + THUMB_INDEX_DISTANCE_MIN = 0.030 # Assuming a minimum Euclidean distance is 3 cm between thumb and index. + THUMB_INDEX_DISTANCE_MAX = 0.120 # Assuming a maximum Euclidean distance is 12 cm between thumb and index. + LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. + RIGHT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. 
+ # The maximum initial motor position when the gripper closes before calibration (with the rail stroke calculated as 0.6 cm/rad * 9 rad = 5.4 cm). + LEFT_MAPPED_MAX = LEFT_MAPPED_MIN + 5.40 + RIGHT_MAPPED_MAX = RIGHT_MAPPED_MIN + 5.40 + left_target_action = (LEFT_MAPPED_MAX - LEFT_MAPPED_MIN) / 2.0 + right_target_action = (RIGHT_MAPPED_MAX - RIGHT_MAPPED_MIN) / 2.0 + + dq = 0.0 + tau = 0.0 + kp = 5.0 + kd = 0.05 + # initialize gripper cmd msg + self.gripper_msg = MotorCmds_() + self.gripper_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Gripper_JointIndex))] + for id in Gripper_JointIndex: + self.gripper_msg.cmds[id].dq = dq + self.gripper_msg.cmds[id].tau = tau + self.gripper_msg.cmds[id].kp = kp + self.gripper_msg.cmds[id].kd = kd + + try: + while self.running: + start_time = time.time() + # get dual hand skeletal point state from XR device + left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy() + right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy() + + if not np.all(left_hand_mat == 0.0): # if hand data has been initialized. 
+                        left_euclidean_distance = np.linalg.norm(left_hand_mat[unitree_gripper_indices[1]] - left_hand_mat[unitree_gripper_indices[0]]) +                        right_euclidean_distance = np.linalg.norm(right_hand_mat[unitree_gripper_indices[1]] - right_hand_mat[unitree_gripper_indices[0]]) +                        # Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range +                        left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX]) +                        right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX]) +                    # else: # TEST WITHOUT APPLE VISION PRO +                    #     current_time = time.time() +                    #     period = 2.5  +                    #     import math +                    #     left_euclidean_distance  = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2 +                    #     right_euclidean_distance = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2 +                    #     left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX]) +                    #     right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX]) + +                # get current dual gripper motor state +                dual_gripper_state = np.array(dual_gripper_state_in[:]) + +                # clip dual gripper action to avoid overflow +                left_actual_action = np.clip(left_target_action, dual_gripper_state[1] - DELTA_GRIPPER_CMD, dual_gripper_state[1] + DELTA_GRIPPER_CMD) +                right_actual_action = np.clip(right_target_action, dual_gripper_state[0] - DELTA_GRIPPER_CMD, dual_gripper_state[0] + DELTA_GRIPPER_CMD) + +                dual_gripper_action = np.array([right_actual_action, left_actual_action]) + +                if dual_gripper_state_out and dual_gripper_action_out: +                    with dual_hand_data_lock: +                        dual_gripper_state_out[:] = dual_gripper_state - np.array([RIGHT_MAPPED_MIN, LEFT_MAPPED_MIN]) +                        dual_gripper_action_out[:] = 
dual_gripper_action - np.array([RIGHT_MAPPED_MIN, LEFT_MAPPED_MIN]) + + # print(f"LEFT: euclidean:{left_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[1]:.4f}\ + # \ttarget_action:{right_target_action - RIGHT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[1]:.4f}") + # print(f"RIGHT:euclidean:{right_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[0]:.4f}\ + # \ttarget_action:{left_target_action - LEFT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[0]:.4f}") + + self.ctrl_dual_gripper(dual_gripper_action) + current_time = time.time() + time_elapsed = current_time - start_time + sleep_time = max(0, (1 / self.fps) - time_elapsed) + time.sleep(sleep_time) + finally: + print("Gripper_Controller has been closed.") + +class Gripper_JointIndex(IntEnum): + kLeftGripper = 0 + kRightGripper = 1 + + if __name__ == "__main__": import argparse from teleop.open_television.tv_wrapper import TeleVisionWrapper from teleop.image_server.image_client import ImageClient parser = argparse.ArgumentParser() - parser.add_argument('--binocular', action='store_true', help='Use binocular camera') - parser.add_argument('--monocular', dest='binocular', action='store_false', help='Use monocular camera') - parser.set_defaults(binocular=True) + parser.add_argument('--dex', action='store_true', help='Use dex3-1 hand') + parser.add_argument('--gripper', dest='dex', action='store_false', help='Use gripper') + parser.set_defaults(dex=True) args = parser.parse_args() print(f"args:{args}\n") # image - img_shape = (480, 1280, 3) - img_shm = shared_memory.SharedMemory(create=True, size=np.prod(img_shape) * np.uint8().itemsize) - img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=img_shm.buf) - img_client = ImageClient(tv_img_shape = img_shape, tv_img_shm_name = img_shm.name) - image_receive_thread = threading.Thread(target=img_client.receive_process, daemon=True) + img_config = { + 'fps': 30, + 'head_camera_type': 'opencv', + 'head_camera_image_shape': [480, 
1280], # Head camera resolution + 'head_camera_id_numbers': [0], + 'wrist_camera_type': 'opencv', + 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution + 'wrist_camera_id_numbers': [2, 4], + } + ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular + if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): + BINOCULAR = True + else: + BINOCULAR = False + # image + if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): + tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3) + else: + tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3) + + img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize) + img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = img_shm.buf) + img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = img_shm.name) + image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True) image_receive_thread.daemon = True image_receive_thread.start() # television and arm - tv_wrapper = TeleVisionWrapper(args.binocular, img_shape, img_shm.name) - - left_hand_array = Array('d', 75, lock=True) - right_hand_array = Array('d', 75, lock=True) - dual_hand_data_lock = Lock() - dual_hand_state_array = Array('d', 14, lock=False) # current left, right hand state(14) data. - dual_hand_action_array = Array('d', 14, lock=False) # current left, right hand action(14) data. 
- hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, Unit_Test = True) + tv_wrapper = TeleVisionWrapper(BINOCULAR, tv_img_shape, img_shm.name) + + if args.dex: + left_hand_array = Array('d', 75, lock=True) + right_hand_array = Array('d', 75, lock=True) + dual_hand_data_lock = Lock() + dual_hand_state_array = Array('d', 14, lock=False) # current left, right hand state(14) data. + dual_hand_action_array = Array('d', 14, lock=False) # current left, right hand action(14) data. + hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, Unit_Test = True) + else: + left_hand_array = Array('d', 75, lock=True) + right_hand_array = Array('d', 75, lock=True) + dual_gripper_data_lock = Lock() + dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. + dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. 
+ gripper_ctrl = Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, Unit_Test = True) user_input = input("Please enter the start signal (enter 's' to start the subsequent program):\n") @@ -249,5 +438,7 @@ if __name__ == "__main__": # send hand skeleton data to hand_ctrl.control_process left_hand_array[:] = left_hand.flatten() right_hand_array[:] = right_hand.flatten() - print(f"Dual hand state array: {list(dual_hand_state_array)}") + + # with dual_hand_data_lock: + # print(f"state : {list(dual_hand_state_array)} \naction: {list(dual_hand_action_array)} \n") time.sleep(0.01) diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index c9780fa..cedd083 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -14,7 +14,7 @@ sys.path.append(parent_dir) from teleop.open_television.tv_wrapper import TeleVisionWrapper from teleop.robot_control.robot_arm import G1_29_ArmController from teleop.robot_control.robot_arm_ik import G1_29_ArmIK -from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller +from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_Controller from teleop.image_server.image_client import ImageClient from teleop.utils.episode_writer import EpisodeWriter @@ -26,25 +26,44 @@ if __name__ == '__main__': parser.add_argument('--record', action = 'store_true', help = 'Save data or not') parser.add_argument('--no-record', dest = 'record', action = 'store_false', help = 'Do not save data') - parser.set_defaults(record = True) + parser.set_defaults(record = False) - parser.add_argument('--binocular', action = 'store_true', help = 'Use binocular camera') - parser.add_argument('--monocular', dest = 'binocular', action = 'store_false', help = 'Use monocular camera') - parser.set_defaults(binocular = True) - - parser.add_argument('--wrist', action = 'store_true', help = 'Use wrist camera') - 
parser.add_argument('--no-wrist', dest = 'wrist', action = 'store_false', help = 'Not use wrist camera') - parser.set_defaults(wrist = False) + parser.add_argument('--dex', action='store_true', help='Use dex3-1 hand') + parser.add_argument('--gripper', dest='dex', action='store_false', help='Use gripper') + parser.set_defaults(dex = False) args = parser.parse_args() print(f"args:{args}\n") # image - tv_img_shape = (480, 640 * 2, 3) + img_config = { + 'fps': 30, + 'head_camera_type': 'opencv', + 'head_camera_image_shape': [480, 1280], # Head camera resolution + 'head_camera_id_numbers': [0], + 'wrist_camera_type': 'opencv', + 'wrist_camera_image_shape': [480, 640], # Wrist camera resolution + 'wrist_camera_id_numbers': [2, 4], + } + ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular + if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): + BINOCULAR = True + else: + BINOCULAR = False + if 'wrist_camera_type' in img_config: + WRIST = True + else: + WRIST = False + + if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD): + tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3) + else: + tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3) + tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize) tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf) - if args.wrist: - wrist_img_shape = tv_img_shape + if WRIST: + wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3) wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize) wrist_img_array = 
np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf) img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name, @@ -57,20 +76,28 @@ if __name__ == '__main__': image_receive_thread.start() # television and arm - tv_wrapper = TeleVisionWrapper(args.binocular, tv_img_shape, tv_img_shm.name) + tv_wrapper = TeleVisionWrapper(BINOCULAR, tv_img_shape, tv_img_shm.name) arm_ctrl = G1_29_ArmController() arm_ik = G1_29_ArmIK() # hand - left_hand_array = Array('d', 75, lock = True) # [input] - right_hand_array = Array('d', 75, lock = True) # [input] - dual_hand_data_lock = Lock() - dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. - dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. - hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) + if args.dex: + left_hand_array = Array('d', 75, lock = True) # [input] + right_hand_array = Array('d', 75, lock = True) # [input] + dual_hand_data_lock = Lock() + dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. + dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. + hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) + else: + left_hand_array = Array('d', 75, lock=True) + right_hand_array = Array('d', 75, lock=True) + dual_gripper_data_lock = Lock() + dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. + dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. 
+ gripper_ctrl = Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array) if args.record: - recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency) + recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency, log = True) try: user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n") @@ -120,14 +147,25 @@ if __name__ == '__main__': # record data if args.record: - with dual_hand_data_lock: - left_hand_state = dual_hand_state_array[:7] - right_hand_state = dual_hand_state_array[-7:] - left_hand_action = dual_hand_action_array[:7] - right_hand_action = dual_hand_action_array[-7:] + # dex hand or gripper + if args.dex: + with dual_hand_data_lock: + left_hand_state = dual_hand_state_array[:7] + right_hand_state = dual_hand_state_array[-7:] + left_hand_action = dual_hand_action_array[:7] + right_hand_action = dual_hand_action_array[-7:] + else: + with dual_gripper_data_lock: + left_hand_state = dual_gripper_state_array[1] + right_hand_state = dual_gripper_state_array[0] + left_hand_action = dual_gripper_action_array[1] + right_hand_action = dual_gripper_action_array[0] + # head image current_tv_image = tv_img_array.copy() - if args.wrist: + # wrist image + if WRIST: current_wrist_image = wrist_img_array.copy() + # arm state and action left_arm_state = current_lr_arm_q[:7] right_arm_state = current_lr_arm_q[-7:] left_arm_action = sol_q[:7] @@ -136,15 +174,15 @@ if __name__ == '__main__': if recording: colors = {} depths = {} - if args.binocular: + if BINOCULAR: colors[f"color_{0}"] = current_tv_image[:, :tv_img_shape[1]//2] colors[f"color_{1}"] = current_tv_image[:, tv_img_shape[1]//2:] - if args.wrist: + if WRIST: colors[f"color_{2}"] = current_wrist_image[:, :wrist_img_shape[1]//2] colors[f"color_{3}"] = current_wrist_image[:, wrist_img_shape[1]//2:] else: colors[f"color_{0}"] = current_tv_image - if args.wrist: + 
if WRIST: colors[f"color_{1}"] = current_wrist_image[:, :wrist_img_shape[1]//2] colors[f"color_{2}"] = current_wrist_image[:, wrist_img_shape[1]//2:] states = { @@ -206,7 +244,7 @@ if __name__ == '__main__': finally: tv_img_shm.unlink() tv_img_shm.close() - if args.wrist: + if WRIST: wrist_img_shm.unlink() wrist_img_shm.close() print("Finally, exiting program...") diff --git a/teleop/utils/episode_writer.py b/teleop/utils/episode_writer.py index 7d3e50f..80216d5 100755 --- a/teleop/utils/episode_writer.py +++ b/teleop/utils/episode_writer.py @@ -4,10 +4,11 @@ import json import datetime import numpy as np import time +from .rerun_visualizer import RerunLogger -class EpisodeWriter(object): - def __init__(self, task_dir, frequency=30, image_size=[640, 480]): +class EpisodeWriter(): + def __init__(self, task_dir, frequency=30, image_size=[640, 480], log = False): """ image_size: [width, height] """ @@ -20,6 +21,7 @@ class EpisodeWriter(object): self.episode_data = [] self.item_id = -1 self.episode_id = -1 + self.log = log if os.path.exists(self.task_dir): episode_dirs = [episode_dir for episode_dir in os.listdir(self.task_dir) if 'episode_' in episode_dir] episode_last = sorted(episode_dirs)[-1] if len(episode_dirs) > 0 else None @@ -83,8 +85,10 @@ class EpisodeWriter(object): os.makedirs(self.color_dir, exist_ok=True) os.makedirs(self.depth_dir, exist_ok=True) os.makedirs(self.audio_dir, exist_ok=True) + if self.log: + self.online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60) - def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None, log = True): + def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None): self.item_id += 1 item_data = { 'idx': self.item_id, @@ -125,9 +129,10 @@ class EpisodeWriter(object): self.episode_data.append(item_data) - if log: + if self.log: curent_record_time = time.time() print(f"==> episode_id:{self.episode_id} item_id:{self.item_id} 
current_time:{curent_record_time}") + self.online_logger.log_item_data(item_data) def save_episode(self): diff --git a/teleop/utils/rerun_visualizer.py b/teleop/utils/rerun_visualizer.py new file mode 100644 index 0000000..3ff884c --- /dev/null +++ b/teleop/utils/rerun_visualizer.py @@ -0,0 +1,231 @@ +import os +import json +import cv2 +import time +import rerun as rr +import rerun.blueprint as rrb +from datetime import datetime + +class RerunEpisodeReader: + def __init__(self, task_dir = ".", json_file="data.json"): + self.task_dir = task_dir + self.json_file = json_file + + def return_episode_data(self, episode_idx): + # Load episode data on-demand + episode_dir = os.path.join(self.task_dir, f"episode_{episode_idx:04d}") + json_path = os.path.join(episode_dir, self.json_file) + + if not os.path.exists(json_path): + raise FileNotFoundError(f"Episode {episode_idx} data.json not found.") + + with open(json_path, 'r', encoding='utf-8') as jsonf: + json_file = json.load(jsonf) + + episode_data = [] + + # Loop over the data entries and process each one + for item_data in json_file['data']: + # Process images and other data + colors = self._process_images(item_data, 'colors', episode_dir) + depths = self._process_images(item_data, 'depths', episode_dir) + audios = self._process_audio(item_data, 'audios', episode_dir) + + # Append the data in the item_data list + episode_data.append( + { + 'idx': item_data.get('idx', 0), + 'colors': colors, + 'depths': depths, + 'states': item_data.get('states', {}), + 'actions': item_data.get('actions', {}), + 'tactiles': item_data.get('tactiles', {}), + 'audios': audios, + } + ) + + return episode_data + + def _process_images(self, item_data, data_type, dir_path): + images = {} + + for key, file_name in item_data.get(data_type, {}).items(): + if file_name: + file_path = os.path.join(dir_path, file_name) + if os.path.exists(file_path): + image = cv2.imread(file_path) + image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + images[key] = image 
+ return images + + def _process_audio(self, item_data, data_type, episode_dir): + audio_data = {} + dir_path = os.path.join(episode_dir, data_type) + + for key, file_name in item_data.get(data_type, {}).items(): + if file_name: + file_path = os.path.join(dir_path, file_name) + if os.path.exists(file_path): + pass # Handle audio data if needed + return audio_data + +class RerunLogger: + def __init__(self, prefix = "", IdxRangeBoundary = 30, memory_limit = "200MB"): + rr.init(datetime.now().strftime("Runtime_%Y%m%d_%H%M%S")) + rr.spawn(memory_limit = memory_limit) + + self.prefix = prefix + self.IdxRangeBoundary = IdxRangeBoundary + # Set up blueprint for live visualization + if self.IdxRangeBoundary: + self.setup_blueprint() + + def setup_blueprint(self): + data_plot_paths = [ + f"{self.prefix}left_arm", + f"{self.prefix}right_arm", + f"{self.prefix}left_hand", + f"{self.prefix}right_hand" + ] + image_plot_paths = [ + f"{self.prefix}colors/color_0", + f"{self.prefix}colors/color_1", + f"{self.prefix}colors/color_2", + f"{self.prefix}colors/color_3" + ] + + views = [] + for plot_path in data_plot_paths: + view = rrb.TimeSeriesView( + origin = plot_path, + time_ranges=[ + rrb.VisibleTimeRange( + "idx", + start = rrb.TimeRangeBoundary.cursor_relative(seq = -self.IdxRangeBoundary), + end = rrb.TimeRangeBoundary.cursor_relative(), + ) + ], + plot_legend = rrb.PlotLegend(visible = True), + ) + views.append(view) + + for plot_path in image_plot_paths: + view = rrb.Spatial2DView( + origin = plot_path, + time_ranges=[ + rrb.VisibleTimeRange( + "idx", + start = rrb.TimeRangeBoundary.cursor_relative(seq = -self.IdxRangeBoundary), + end = rrb.TimeRangeBoundary.cursor_relative(), + ) + ], + ) + views.append(view) + + grid = rrb.Grid(contents = views, + grid_columns=2, + column_shares=[1, 1], + row_shares=[1, 1, 0.5], + ) + rr.send_blueprint(grid) + + + def log_item_data(self, item_data: dict): + rr.set_time_sequence("idx", item_data.get('idx', 0)) + + # Log states + states = 
item_data.get('states', {}) or {} + for part, state_info in states.items(): + if state_info: + values = state_info.get('qpos', []) + for idx, val in enumerate(values): + rr.log(f"{self.prefix}{part}/states/qpos/{idx}", rr.Scalar(val)) + + # Log actions + actions = item_data.get('actions', {}) or {} + for part, action_info in actions.items(): + if action_info: + values = action_info.get('qpos', []) + for idx, val in enumerate(values): + rr.log(f"{self.prefix}{part}/actions/qpos/{idx}", rr.Scalar(val)) + + # Log colors (images) + colors = item_data.get('colors', {}) or {} + for color_key, color_val in colors.items(): + if color_val is not None: + rr.log(f"{self.prefix}colors/{color_key}", rr.Image(color_val)) + + # Log depths (images) + depths = item_data.get('depths', {}) or {} + for depth_key, depth_val in depths.items(): + if depth_val is not None: + # rr.log(f"{self.prefix}depths/{depth_key}", rr.Image(depth_val)) + pass # Handle depth if needed + + # Log tactile if needed + tactiles = item_data.get('tactiles', {}) or {} + for hand, tactile_vals in tactiles.items(): + if tactile_vals is not None: + pass # Handle tactile if needed + + # Log audios if needed + audios = item_data.get('audios', {}) or {} + for audio_key, audio_val in audios.items(): + if audio_val is not None: + pass # Handle audios if needed + + def log_episode_data(self, episode_data: list): + for item_data in episode_data: + self.log_item_data(item_data) + + +if __name__ == "__main__": + import gdown + import zipfile + import os + url = "https://drive.google.com/file/d/1f5UuFl1z_gaByg_7jDRj1_NxfJZh2evD/view?usp=sharing" + zip_file = "rerun_testdata.zip" + if not os.path.exists("episode_0006"): + if not os.path.exists(zip_file): + file_id = url.split('/')[5] + gdown.download(id=file_id, output=zip_file, quiet=False) + print("download ok.") + with zipfile.ZipFile(zip_file, 'r') as zip_ref: + zip_ref.extractall(".") + print("uncompress ok.") + os.remove(zip_file) + print("clean file ok.") + else: + 
print("rerun_testdata exits.") + + episode_reader = RerunEpisodeReader() + episode_data6 = episode_reader.return_episode_data(6) + episode_data8 = episode_reader.return_episode_data(8) + + # Example 1: Offline Visualization + user_input = input("Please enter the start signal (enter 'off' to start the subsequent program):\n") + if user_input.lower() == 'off': + print("Starting offline visualization...") + offline_logger = RerunLogger(prefix="offline/") + offline_logger.log_episode_data(episode_data6) + print("Offline visualization completed.") + + # Example 2: Online Visualization with Fixed Time Window + user_input = input("Please enter the start signal (enter 'on' to start the subsequent program):\n") + if user_input.lower() == 'on': + print("Starting online visualization with fixed idx size...") + online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60) + for item_data in episode_data6: + online_logger.log_item_data(item_data) + time.sleep(0.033) # 30hz + print("Online visualization completed.") + + # Example 3: Online Visualization with Fixed Time Window + user_input = input("Please enter the start signal (enter 'on' to start the subsequent program):\n") + if user_input.lower() == 'on': + print("Starting online visualization with fixed idx size...") + online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60) + for item_data in episode_data8: + online_logger.log_item_data(item_data) + time.sleep(0.033) # 30hz + print("Online visualization completed.") \ No newline at end of file