diff --git a/teleop/robot_control/robot_arm.py b/teleop/robot_control/robot_arm.py index cdd24c1..3a1923b 100644 --- a/teleop/robot_control/robot_arm.py +++ b/teleop/robot_control/robot_arm.py @@ -81,7 +81,7 @@ class G1_29_ArmController: self._gradual_time = None # initialize lowcmd publisher and lowstate subscriber - ChannelFactoryInitialize(0) + ChannelFactoryInitialize(1) if self.debug_mode: self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) else: diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index e6449f0..c08d8e5 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -47,10 +47,10 @@ def on_press(key): if key == 'r': start_signal = True logger_mp.info("Program start signal received.") - elif key == 'q': + elif key == 's': stop_listening() running = False - elif key == 's': + elif key == 'q': should_toggle_recording = True threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press}, daemon=True).start() @@ -172,6 +172,14 @@ if __name__ == '__main__': if args.is_use_sim: reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) reset_pose_publisher.Init() + from teleop.utils.sim_state_topic import start_sim_state_subscribe + sim_state_subscriber = start_sim_state_subscribe() + # from teleop.utils.sim_state_client import SimStateClient + # sim_state_client = SimStateClient() + # sim_state_client.Init() + # sim_state_client.SetTimeout(5.0) + # sim_state_client.WaitLeaseApplied() + # c=1 # xr mode if args.xr_mode == 'controller' and not args.debug: from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient @@ -196,12 +204,12 @@ if __name__ == '__main__': tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2)) cv2.imshow("record image", tv_resized_image) key = cv2.waitKey(1) & 0xFF - if key == ord('q'): + if key == ord('s'): stop_listening() running = False if args.is_use_sim: publish_reset_category(2, reset_pose_publisher) - elif key == ord('s'): + elif key == ord('q'): should_toggle_recording = True elif key == ord('a'): if args.is_use_sim: @@ -237,7 +245,7 @@ if __name__ == '__main__': with right_gripper_value.get_lock(): right_gripper_value.value = tele_data.right_pinch_value else: - pass + pass # high level control if args.xr_mode == 'controller' and not args.debug: @@ -381,7 +389,11 @@ if __name__ == '__main__': "qpos": current_body_action, }, } - recorder.add_item(colors=colors, depths=depths, states=states, actions=actions) + if args.is_use_sim: + sim_state = sim_state_subscriber.read_data() + recorder.add_item(colors=colors, depths=depths, states=states, actions=actions, sim_state=sim_state) + else: + recorder.add_item(colors=colors, depths=depths, states=states, actions=actions) current_time = time.time() time_elapsed = current_time - start_time @@ -401,4 +413,4 @@ if __name__ == '__main__': if args.record: recorder.close() logger_mp.info("Finally, exiting program...") - exit(0) \ No newline at end of file + exit(0) diff --git a/teleop/utils/data/-home-unitree-Code-unitree-opject-unitree_sim_isaaclab-.txt b/teleop/utils/data/-home-unitree-Code-unitree-opject-unitree_sim_isaaclab-.txt new file mode 100644 index 0000000..5e55b46 --- /dev/null +++ b/teleop/utils/data/-home-unitree-Code-unitree-opject-unitree_sim_isaaclab-.txt @@ -0,0 +1 @@ +/home/unitree/Code/unitree-opject/unitree_sim_isaaclab/data \ No newline at end of file diff --git a/teleop/utils/episode_writer.py b/teleop/utils/episode_writer.py index 5cc4830..ea59faf 100644 --- a/teleop/utils/episode_writer.py +++ b/teleop/utils/episode_writer.py @@ -71,6 +71,7 @@ class EpisodeWriter(): "left_ee": [], "right_ee": [], }, + "sim_state": "" } def text_desc(self): self.text = { @@ -113,7 +114,7 @@ class EpisodeWriter(): logger_mp.info(f"==> New episode created: {self.episode_dir}") return True # Return True if the episode is successfully created - def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None): + def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None,sim_state=None): # Increment the item ID self.item_id += 1 # Create the item data dictionary @@ -125,6 +126,7 @@ class EpisodeWriter(): 'actions': actions, 'tactiles': tactiles, 'audios': audios, + 'sim_state': sim_state, } # Enqueue the item data self.item_data_queue.put(item_data) diff --git a/teleop/utils/server_api.py b/teleop/utils/server_api.py new file mode 100644 index 0000000..98bfb4b --- /dev/null +++ b/teleop/utils/server_api.py @@ -0,0 +1,8 @@ +# service name +SERVICE_NAME = "get_sim_state" + +# api version +API_VERSION = "1.0.0.1" + +# api id +API_ID_GET_SIM_STATE = 1008 \ No newline at end of file diff --git a/teleop/utils/sim_state_client.py b/teleop/utils/sim_state_client.py new file mode 100644 index 0000000..972824b --- /dev/null +++ b/teleop/utils/sim_state_client.py @@ -0,0 +1,48 @@ +import time +import json +from unitree_sdk2py.core.channel import ChannelFactoryInitialize +from unitree_sdk2py.rpc.client import Client +from utils.server_api import SERVICE_NAME, API_VERSION, API_ID_GET_SIM_STATE + + +class SimStateClient(Client): + def __init__(self, enableLease: bool = False): + super().__init__(SERVICE_NAME, enableLease) + + def Init(self): + self._RegistApi(API_ID_GET_SIM_STATE, 0) + self._SetApiVerson(API_VERSION) + + def GetSimState_client_call(self): + c, d = self._Call(API_ID_GET_SIM_STATE, json.dumps("")) + return c, d + + + +if __name__ == "__main__": + # initialize channel factory. + ChannelFactoryInitialize(0) + + # create client + client = SimStateClient() + client.Init() + client.SetTimeout(5.0) + + # get server version + code, serverApiVersion = client.GetServerApiVersion() + print("server api version:", serverApiVersion) + + # wait lease applied + client.WaitLeaseApplied() + print("lease applied") + # test api + while True: + try: + c, d = client.GetSimState_client_call() + print("client get sim state ret:", c) + print("sim state data:", d) + except Exception as e: + print("Error getting sim state:", e) + time.sleep(1.0) + + diff --git a/teleop/utils/sim_state_topic.py b/teleop/utils/sim_state_topic.py new file mode 100644 index 0000000..dc5db54 --- /dev/null +++ b/teleop/utils/sim_state_topic.py @@ -0,0 +1,285 @@ +# Copyright (c) 2025, Unitree Robotics Co., Ltd. All Rights Reserved. +# License: Apache License, Version 2.0 +""" +Simple sim state subscriber class +Subscribe to rt/sim_state_cmd topic and write to shared memory +""" + +import threading +import time +import json +from multiprocessing import shared_memory +from typing import Any, Dict, Optional +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ + + +class SharedMemoryManager: + """Shared memory manager""" + + def __init__(self, name: str = None, size: int = 512): + """Initialize shared memory manager + + Args: + name: shared memory name, if None, create new one + size: shared memory size (bytes) + """ + self.size = size + self.lock = threading.RLock() # reentrant lock + + if name: + try: + self.shm = shared_memory.SharedMemory(name=name) + self.shm_name = name + self.created = False + except FileNotFoundError: + self.shm = shared_memory.SharedMemory(create=True, size=size) + self.shm_name = self.shm.name + self.created = True + else: + self.shm = shared_memory.SharedMemory(create=True, size=size) + self.shm_name = self.shm.name + self.created = True + + def write_data(self, data: Dict[str, Any]) -> bool: + """Write data to shared memory + + Args: + data: data to write + + Returns: + bool: write success or not + """ + try: + with self.lock: + json_str = json.dumps(data) + json_bytes = json_str.encode('utf-8') + + if len(json_bytes) > self.size - 8: # reserve 8 bytes for length and timestamp + print(f"Warning: Data too large for shared memory ({len(json_bytes)} > {self.size - 8})") + return False + + # write timestamp (4 bytes) and data length (4 bytes) + timestamp = int(time.time()) & 0xFFFFFFFF # 32-bit timestamp, use bitmask to ensure in range + self.shm.buf[0:4] = timestamp.to_bytes(4, 'little') + self.shm.buf[4:8] = len(json_bytes).to_bytes(4, 'little') + + # write data + self.shm.buf[8:8+len(json_bytes)] = json_bytes + return True + + except Exception as e: + print(f"Error writing to shared memory: {e}") + return False + + def read_data(self) -> Optional[Dict[str, Any]]: + """Read data from shared memory + + Returns: + Dict[str, Any]: read data dictionary, return None if failed + """ + try: + with self.lock: + # read timestamp and data length + timestamp = int.from_bytes(self.shm.buf[0:4], 'little') + data_len = int.from_bytes(self.shm.buf[4:8], 'little') + + if data_len == 0: + return None + + # read data + json_bytes = bytes(self.shm.buf[8:8+data_len]) + data = json.loads(json_bytes.decode('utf-8')) + data['_timestamp'] = timestamp # add timestamp information + return data + + except Exception as e: + print(f"Error reading from shared memory: {e}") + return None + + def get_name(self) -> str: + """Get shared memory name""" + return self.shm_name + + def cleanup(self): + """Clean up shared memory""" + if hasattr(self, 'shm') and self.shm: + self.shm.close() + if self.created: + try: + self.shm.unlink() + except: + pass + + def __del__(self): + """Destructor""" + self.cleanup() + + +class SimStateSubscriber: + """Simple sim state subscriber class""" + + def __init__(self, shm_name: str = "sim_state_cmd_data", shm_size: int = 3096): + """Initialize the subscriber + + Args: + shm_name: shared memory name + shm_size: shared memory size + """ + self.shm_name = shm_name + self.shm_size = shm_size + self.running = False + self.subscriber = None + self.subscribe_thread = None + self.shared_memory = None + + # initialize shared memory + self._setup_shared_memory() + + print(f"[SimStateSubscriber] Initialized with shared memory: {shm_name}") + + def _setup_shared_memory(self): + """Setup shared memory""" + try: + self.shared_memory = SharedMemoryManager(self.shm_name, self.shm_size) + print(f"[SimStateSubscriber] Shared memory setup successfully") + except Exception as e: + print(f"[SimStateSubscriber] Failed to setup shared memory: {e}") + + def _message_handler(self, msg: String_): + """Handle received messages""" + try: + # parse the received message + data = json.loads(msg.data) + # print(f"[SimStateSubscriber] Received message: {data}") + + # write to shared memory + if self.shared_memory: + self.shared_memory.write_data(data) + + except Exception as e: + print(f"[SimStateSubscriber] Error processing message: {e}") + + def _subscribe_loop(self): + """Subscribe loop thread""" + print(f"[SimStateSubscriber] Subscribe thread started") + + while self.running: + try: + time.sleep(0.001) # keep thread alive + except Exception as e: + print(f"[SimStateSubscriber] Error in subscribe loop: {e}") + time.sleep(0.01) + + print(f"[SimStateSubscriber] Subscribe thread stopped") + + def start_subscribe(self): + """Start subscribing""" + if self.running: + print(f"[SimStateSubscriber] Already running") + return + + try: + # setup subscriber + self.subscriber = ChannelSubscriber("rt/sim_state", String_) + self.subscriber.Init(self._message_handler, 10) + + self.running = True + print(f"[SimStateSubscriber] Subscriber initialized") + # start subscribe thread + self.subscribe_thread = threading.Thread(target=self._subscribe_loop) + self.subscribe_thread.daemon = True + self.subscribe_thread.start() + + print(f"[SimStateSubscriber] Started subscribing to rt/sim_state_cmd") + + except Exception as e: + print(f"[SimStateSubscriber] Failed to start subscribing: {e}") + self.running = False + + def stop_subscribe(self): + """Stop subscribing""" + if not self.running: + return + + print(f"[SimStateSubscriber] Stopping subscriber...") + self.running = False + + # wait for thread to finish + if self.subscribe_thread: + self.subscribe_thread.join(timeout=1.0) + + print(f"[SimStateSubscriber] Subscriber stopped") + + def read_data(self) -> Optional[Dict[str, Any]]: + """Read data from shared memory + + Returns: + Dict: received data, None if no data or error + """ + try: + if self.shared_memory: + return self.shared_memory.read_data() + return None + except Exception as e: + print(f"[SimStateSubscriber] Error reading data: {e}") + return None + + def is_running(self) -> bool: + """Check if subscriber is running""" + return self.running + + def __del__(self): + """Destructor""" + self.stop_subscribe() + + +# convenient functions +def create_sim_state_subscriber(shm_name: str = "sim_state_cmd_data", shm_size: int = 3096) -> SimStateSubscriber: + """Create a sim state subscriber instance + + Args: + shm_name: shared memory name + shm_size: shared memory size + + Returns: + SimStateSubscriber: subscriber instance + """ + return SimStateSubscriber(shm_name, shm_size) + + +def start_sim_state_subscribe(shm_name: str = "sim_state_cmd_data", shm_size: int = 3096) -> SimStateSubscriber: + """Start sim state subscribing + + Args: + shm_name: shared memory name + shm_size: shared memory size + + Returns: + SimStateSubscriber: started subscriber instance + """ + subscriber = create_sim_state_subscriber(shm_name, shm_size) + subscriber.start_subscribe() + return subscriber + + +# if __name__ == "__main__": +# # example usage +# print("Starting sim state subscriber...") +# ChannelFactoryInitialize(0) +# # create and start subscriber +# subscriber = start_sim_state_subscribe() + +# try: +# # keep running and check for data +# while True: +# data = subscriber.read_data() +# if data: +# print(f"Read data: {data}") +# time.sleep(1) + +# except KeyboardInterrupt: +# print("\nInterrupted by user") +# finally: +# subscriber.stop_subscribe() +# print("Subscriber stopped") \ No newline at end of file