Browse Source

update episode_writer ,add sim_state

main
wei.li 9 months ago
parent
commit
7c1455c7fb
  1. 2
      teleop/robot_control/robot_arm.py
  2. 22
      teleop/teleop_hand_and_arm.py
  3. 1
      teleop/utils/data/-home-unitree-Code-unitree-opject-unitree_sim_isaaclab-.txt
  4. 4
      teleop/utils/episode_writer.py
  5. 8
      teleop/utils/server_api.py
  6. 48
      teleop/utils/sim_state_client.py
  7. 285
      teleop/utils/sim_state_topic.py

2
teleop/robot_control/robot_arm.py

@ -81,7 +81,7 @@ class G1_29_ArmController:
self._gradual_time = None
# initialize lowcmd publisher and lowstate subscriber
ChannelFactoryInitialize(0)
ChannelFactoryInitialize(1)
if self.debug_mode:
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd)
else:

22
teleop/teleop_hand_and_arm.py

@ -47,10 +47,10 @@ def on_press(key):
if key == 'r':
start_signal = True
logger_mp.info("Program start signal received.")
elif key == 'q':
elif key == 's':
stop_listening()
running = False
elif key == 's':
elif key == 'q':
should_toggle_recording = True
threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press}, daemon=True).start()
@ -172,6 +172,14 @@ if __name__ == '__main__':
if args.is_use_sim:
reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_)
reset_pose_publisher.Init()
from teleop.utils.sim_state_topic import start_sim_state_subscribe
sim_state_subscriber = start_sim_state_subscribe()
# from teleop.utils.sim_state_client import SimStateClient
# sim_state_client = SimStateClient()
# sim_state_client.Init()
# sim_state_client.SetTimeout(5.0)
# sim_state_client.WaitLeaseApplied()
# c=1
# xr mode
if args.xr_mode == 'controller' and not args.debug:
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient
@ -196,12 +204,12 @@ if __name__ == '__main__':
tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2))
cv2.imshow("record image", tv_resized_image)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
if key == ord('s'):
stop_listening()
running = False
if args.is_use_sim:
publish_reset_category(2, reset_pose_publisher)
elif key == ord('s'):
elif key == ord('q'):
should_toggle_recording = True
elif key == ord('a'):
if args.is_use_sim:
@ -381,7 +389,11 @@ if __name__ == '__main__':
"qpos": current_body_action,
},
}
recorder.add_item(colors=colors, depths=depths, states=states, actions=actions)
if args.is_use_sim:
sim_state = sim_state_subscriber.read_data()
recorder.add_item(colors=colors, depths=depths, states=states, actions=actions, sim_state=sim_state)
else:
recorder.add_item(colors=colors, depths=depths, states=states, actions=actions)
current_time = time.time()
time_elapsed = current_time - start_time

1
teleop/utils/data/-home-unitree-Code-unitree-opject-unitree_sim_isaaclab-.txt

@ -0,0 +1 @@
/home/unitree/Code/unitree-opject/unitree_sim_isaaclab/data

4
teleop/utils/episode_writer.py

@ -71,6 +71,7 @@ class EpisodeWriter():
"left_ee": [],
"right_ee": [],
},
"sim_state": ""
}
def text_desc(self):
self.text = {
@ -113,7 +114,7 @@ class EpisodeWriter():
logger_mp.info(f"==> New episode created: {self.episode_dir}")
return True # Return True if the episode is successfully created
def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None):
def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None,sim_state=None):
# Increment the item ID
self.item_id += 1
# Create the item data dictionary
@ -125,6 +126,7 @@ class EpisodeWriter():
'actions': actions,
'tactiles': tactiles,
'audios': audios,
'sim_state': sim_state,
}
# Enqueue the item data
self.item_data_queue.put(item_data)

8
teleop/utils/server_api.py

@ -0,0 +1,8 @@
# service name
SERVICE_NAME = "get_sim_state"
# api version
API_VERSION = "1.0.0.1"
# api id
API_ID_GET_SIM_STATE = 1008

48
teleop/utils/sim_state_client.py

@ -0,0 +1,48 @@
import time
import json
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
from unitree_sdk2py.rpc.client import Client
from utils.server_api import SERVICE_NAME, API_VERSION, API_ID_GET_SIM_STATE
class SimStateClient(Client):
def __init__(self, enableLease: bool = False):
super().__init__(SERVICE_NAME, enableLease)
def Init(self):
self._RegistApi(API_ID_GET_SIM_STATE, 0)
self._SetApiVerson(API_VERSION)
def GetSimState_client_call(self):
c, d = self._Call(API_ID_GET_SIM_STATE, json.dumps(""))
return c, d
if __name__ == "__main__":
# initialize channel factory.
ChannelFactoryInitialize(0)
# create client
client = SimStateClient()
client.Init()
client.SetTimeout(5.0)
# get server version
code, serverApiVersion = client.GetServerApiVersion()
print("server api version:", serverApiVersion)
# wait lease applied
client.WaitLeaseApplied()
print("lease applied")
# test api
while True:
try:
c, d = client.GetSimState_client_call()
print("client get sim state ret:", c)
print("sim state data:", d)
except Exception as e:
print("Error getting sim state:", e)
time.sleep(1.0)

285
teleop/utils/sim_state_topic.py

@ -0,0 +1,285 @@
# Copyright (c) 2025, Unitree Robotics Co., Ltd. All Rights Reserved.
# License: Apache License, Version 2.0
"""
Simple sim state subscriber class
Subscribe to rt/sim_state_cmd topic and write to shared memory
"""
import threading
import time
import json
from multiprocessing import shared_memory
from typing import Any, Dict, Optional
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_
class SharedMemoryManager:
"""Shared memory manager"""
def __init__(self, name: str = None, size: int = 512):
"""Initialize shared memory manager
Args:
name: shared memory name, if None, create new one
size: shared memory size (bytes)
"""
self.size = size
self.lock = threading.RLock() # reentrant lock
if name:
try:
self.shm = shared_memory.SharedMemory(name=name)
self.shm_name = name
self.created = False
except FileNotFoundError:
self.shm = shared_memory.SharedMemory(create=True, size=size)
self.shm_name = self.shm.name
self.created = True
else:
self.shm = shared_memory.SharedMemory(create=True, size=size)
self.shm_name = self.shm.name
self.created = True
def write_data(self, data: Dict[str, Any]) -> bool:
"""Write data to shared memory
Args:
data: data to write
Returns:
bool: write success or not
"""
try:
with self.lock:
json_str = json.dumps(data)
json_bytes = json_str.encode('utf-8')
if len(json_bytes) > self.size - 8: # reserve 8 bytes for length and timestamp
print(f"Warning: Data too large for shared memory ({len(json_bytes)} > {self.size - 8})")
return False
# write timestamp (4 bytes) and data length (4 bytes)
timestamp = int(time.time()) & 0xFFFFFFFF # 32-bit timestamp, use bitmask to ensure in range
self.shm.buf[0:4] = timestamp.to_bytes(4, 'little')
self.shm.buf[4:8] = len(json_bytes).to_bytes(4, 'little')
# write data
self.shm.buf[8:8+len(json_bytes)] = json_bytes
return True
except Exception as e:
print(f"Error writing to shared memory: {e}")
return False
def read_data(self) -> Optional[Dict[str, Any]]:
"""Read data from shared memory
Returns:
Dict[str, Any]: read data dictionary, return None if failed
"""
try:
with self.lock:
# read timestamp and data length
timestamp = int.from_bytes(self.shm.buf[0:4], 'little')
data_len = int.from_bytes(self.shm.buf[4:8], 'little')
if data_len == 0:
return None
# read data
json_bytes = bytes(self.shm.buf[8:8+data_len])
data = json.loads(json_bytes.decode('utf-8'))
data['_timestamp'] = timestamp # add timestamp information
return data
except Exception as e:
print(f"Error reading from shared memory: {e}")
return None
def get_name(self) -> str:
"""Get shared memory name"""
return self.shm_name
def cleanup(self):
"""Clean up shared memory"""
if hasattr(self, 'shm') and self.shm:
self.shm.close()
if self.created:
try:
self.shm.unlink()
except:
pass
def __del__(self):
"""Destructor"""
self.cleanup()
class SimStateSubscriber:
"""Simple sim state subscriber class"""
def __init__(self, shm_name: str = "sim_state_cmd_data", shm_size: int = 3096):
"""Initialize the subscriber
Args:
shm_name: shared memory name
shm_size: shared memory size
"""
self.shm_name = shm_name
self.shm_size = shm_size
self.running = False
self.subscriber = None
self.subscribe_thread = None
self.shared_memory = None
# initialize shared memory
self._setup_shared_memory()
print(f"[SimStateSubscriber] Initialized with shared memory: {shm_name}")
def _setup_shared_memory(self):
"""Setup shared memory"""
try:
self.shared_memory = SharedMemoryManager(self.shm_name, self.shm_size)
print(f"[SimStateSubscriber] Shared memory setup successfully")
except Exception as e:
print(f"[SimStateSubscriber] Failed to setup shared memory: {e}")
def _message_handler(self, msg: String_):
"""Handle received messages"""
try:
# parse the received message
data = json.loads(msg.data)
# print(f"[SimStateSubscriber] Received message: {data}")
# write to shared memory
if self.shared_memory:
self.shared_memory.write_data(data)
except Exception as e:
print(f"[SimStateSubscriber] Error processing message: {e}")
def _subscribe_loop(self):
"""Subscribe loop thread"""
print(f"[SimStateSubscriber] Subscribe thread started")
while self.running:
try:
time.sleep(0.001) # keep thread alive
except Exception as e:
print(f"[SimStateSubscriber] Error in subscribe loop: {e}")
time.sleep(0.01)
print(f"[SimStateSubscriber] Subscribe thread stopped")
def start_subscribe(self):
"""Start subscribing"""
if self.running:
print(f"[SimStateSubscriber] Already running")
return
try:
# setup subscriber
self.subscriber = ChannelSubscriber("rt/sim_state", String_)
self.subscriber.Init(self._message_handler, 10)
self.running = True
print(f"[SimStateSubscriber] Subscriber initialized")
# start subscribe thread
self.subscribe_thread = threading.Thread(target=self._subscribe_loop)
self.subscribe_thread.daemon = True
self.subscribe_thread.start()
print(f"[SimStateSubscriber] Started subscribing to rt/sim_state_cmd")
except Exception as e:
print(f"[SimStateSubscriber] Failed to start subscribing: {e}")
self.running = False
def stop_subscribe(self):
"""Stop subscribing"""
if not self.running:
return
print(f"[SimStateSubscriber] Stopping subscriber...")
self.running = False
# wait for thread to finish
if self.subscribe_thread:
self.subscribe_thread.join(timeout=1.0)
print(f"[SimStateSubscriber] Subscriber stopped")
def read_data(self) -> Optional[Dict[str, Any]]:
"""Read data from shared memory
Returns:
Dict: received data, None if no data or error
"""
try:
if self.shared_memory:
return self.shared_memory.read_data()
return None
except Exception as e:
print(f"[SimStateSubscriber] Error reading data: {e}")
return None
def is_running(self) -> bool:
"""Check if subscriber is running"""
return self.running
def __del__(self):
"""Destructor"""
self.stop_subscribe()
# convenient functions
def create_sim_state_subscriber(shm_name: str = "sim_state_cmd_data", shm_size: int = 3096) -> SimStateSubscriber:
"""Create a sim state subscriber instance
Args:
shm_name: shared memory name
shm_size: shared memory size
Returns:
SimStateSubscriber: subscriber instance
"""
return SimStateSubscriber(shm_name, shm_size)
def start_sim_state_subscribe(shm_name: str = "sim_state_cmd_data", shm_size: int = 3096) -> SimStateSubscriber:
"""Start sim state subscribing
Args:
shm_name: shared memory name
shm_size: shared memory size
Returns:
SimStateSubscriber: started subscriber instance
"""
subscriber = create_sim_state_subscriber(shm_name, shm_size)
subscriber.start_subscribe()
return subscriber
# if __name__ == "__main__":
# # example usage
# print("Starting sim state subscriber...")
# ChannelFactoryInitialize(0)
# # create and start subscriber
# subscriber = start_sim_state_subscribe()
# try:
# # keep running and check for data
# while True:
# data = subscriber.read_data()
# if data:
# print(f"Read data: {data}")
# time.sleep(1)
# except KeyboardInterrupt:
# print("\nInterrupted by user")
# finally:
# subscriber.stop_subscribe()
# print("Subscriber stopped")
Loading…
Cancel
Save