You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

224 lines
7.8 KiB

"""Builders for ZMQ wire-format messages on the 'command', 'planner', and 'pose' topics.
Message layout: [topic_bytes][1024-byte JSON header][packed binary payload].
The header describes field names, dtypes, and shapes so the receiver can
deserialize without out-of-band schema knowledge.
"""
import json
import struct
from typing import Sequence
import numpy as np
HEADER_SIZE = 1280
def _build_header(fields: list, version: int = 1, count: int = 1) -> bytes:
header = {
"v": version,
"endian": "le",
"count": count,
"fields": fields,
}
header_json = json.dumps(header, separators=(",", ":")).encode("utf-8")
if len(header_json) > HEADER_SIZE:
raise ValueError(f"Header too large: {len(header_json)} > {HEADER_SIZE}")
return header_json.ljust(HEADER_SIZE, b"\x00")
def build_command_message(
start: bool, stop: bool, planner: bool, delta_heading: float | None = None
) -> bytes:
"""
Assemble a 'command' topic message:
- start: u8 (1=start control)
- stop: u8 (1=stop control)
- planner: u8 (1=planner mode, 0=streamed motion)
- delta_heading: f32 (optional, yaw relative to heading command in radians)
Returns: bytes ready to send via socket.send()
"""
fields = [
{"name": "start", "dtype": "u8", "shape": [1]},
{"name": "stop", "dtype": "u8", "shape": [1]},
{"name": "planner", "dtype": "u8", "shape": [1]},
]
payload = b"".join(
(
struct.pack("B", 1 if start else 0),
struct.pack("B", 1 if stop else 0),
struct.pack("B", 1 if planner else 0),
)
)
if delta_heading is not None:
# Append delta_heading field to header and payload
fields.append({"name": "delta_heading", "dtype": "f32", "shape": [1]})
payload += struct.pack("<f", float(delta_heading))
header = _build_header(fields, version=1, count=1)
return b"command" + header + payload
def build_planner_message(
mode: int,
movement: Sequence[float],
facing: Sequence[float],
speed: float = -1.0,
height: float = -1.0,
upper_body_position: Sequence[float] | None = None,
upper_body_velocity: Sequence[float] | None = None,
left_hand_position: Sequence[float] | None = None,
right_hand_position: Sequence[float] | None = None,
vr_3pt_position: Sequence[float] | None = None,
vr_3pt_orientation: Sequence[float] | None = None,
vr_3pt_compliance: Sequence[float] | None = None,
) -> bytes:
"""
Assemble a 'planner' topic message:
- mode: i32 (LocomotionMode enum)
- movement: f32[3] (x,y,z)
- facing: f32[3] (x,y,z)
- speed: f32 (optional, -1 for default)
- height: f32 (optional, -1 for default)
Returns: bytes ready to send via socket.send()
"""
if len(movement) != 3:
raise ValueError("movement must have length 3")
if len(facing) != 3:
raise ValueError("facing must have length 3")
fields = [
{"name": "mode", "dtype": "i32", "shape": [1]},
{"name": "movement", "dtype": "f32", "shape": [3]},
{"name": "facing", "dtype": "f32", "shape": [3]},
{"name": "speed", "dtype": "f32", "shape": [1]},
{"name": "height", "dtype": "f32", "shape": [1]},
]
payload = b"".join(
(
struct.pack("<i", int(mode)),
struct.pack("<fff", float(movement[0]), float(movement[1]), float(movement[2])),
struct.pack("<fff", float(facing[0]), float(facing[1]), float(facing[2])),
struct.pack("<f", float(speed)),
struct.pack("<f", float(height)),
)
)
# Add upper body position and velocity to payload, optionally
if upper_body_position is not None:
fields.append(
{"name": "upper_body_position", "dtype": "f32", "shape": [len(upper_body_position)]}
)
for value in upper_body_position:
payload += struct.pack("<f", float(value))
if upper_body_velocity is not None:
fields.append(
{"name": "upper_body_velocity", "dtype": "f32", "shape": [len(upper_body_velocity)]}
)
for value in upper_body_velocity:
payload += struct.pack("<f", float(value))
if left_hand_position is not None:
fields.append(
{"name": "left_hand_joints", "dtype": "f32", "shape": [len(left_hand_position)]}
)
for value in left_hand_position:
payload += struct.pack("<f", float(value))
if right_hand_position is not None:
fields.append(
{"name": "right_hand_joints", "dtype": "f32", "shape": [len(right_hand_position)]}
)
for value in right_hand_position:
payload += struct.pack("<f", float(value))
if vr_3pt_position is not None:
fields.append({"name": "vr_position", "dtype": "f32", "shape": [len(vr_3pt_position)]})
for value in vr_3pt_position:
payload += struct.pack("<f", float(value))
if vr_3pt_orientation is not None:
fields.append(
{"name": "vr_orientation", "dtype": "f32", "shape": [len(vr_3pt_orientation)]}
)
for value in vr_3pt_orientation:
payload += struct.pack("<f", float(value))
if vr_3pt_compliance is not None:
fields.append({"name": "vr_compliance", "dtype": "f32", "shape": [len(vr_3pt_compliance)]})
for value in vr_3pt_compliance:
payload += struct.pack("<f", float(value))
header = _build_header(fields, version=1, count=1)
return b"planner" + header + payload
def pack_pose_message(pose_data: dict, topic: str = "pose", version: int = 3) -> bytes:
"""
Pack pose/action data into ZMQ message format:
[topic_prefix][1024-byte JSON header][concatenated binary fields]
This is a general-purpose function for packing numpy arrays into ZMQ messages.
Supports protocol versions 3 and 4.
Args:
pose_data: Dictionary containing numpy arrays to send
topic: Topic prefix string (default: "pose")
version: Protocol version (default: 3). Version 4 includes "count" field.
Returns:
Packed message as bytes
Example:
>>> data = {
... "token_state": np.array([1.0, 2.0], dtype=np.float32),
... "frame_index": np.array([0], dtype=np.int64)
... }
>>> msg = pack_pose_message(data, topic="pose", version=4)
"""
# Build fields list from pose_data
fields = []
binary_data = []
for key, value in pose_data.items():
if isinstance(value, np.ndarray):
# Determine dtype string
if value.dtype == np.float32:
dtype_str = "f32"
elif value.dtype == np.float64:
dtype_str = "f64"
elif value.dtype == np.int32:
dtype_str = "i32"
elif value.dtype == np.int64:
dtype_str = "i64"
elif value.dtype == bool:
dtype_str = "bool"
else:
# Default to f32, cast if needed
dtype_str = "f32"
value = value.astype(np.float32)
fields.append({"name": key, "dtype": dtype_str, "shape": list(value.shape)})
# Ensure contiguous and little-endian
if not value.flags["C_CONTIGUOUS"]:
value = np.ascontiguousarray(value)
if value.dtype.byteorder == ">":
value = value.astype(value.dtype.newbyteorder("<"))
binary_data.append(value.tobytes())
# Build header using common utility
header_bytes = _build_header(fields, version=version, count=1)
# Pack message: [topic][1024-byte header][binary data]
topic_bytes = topic.encode("utf-8")
data_bytes = b"".join(binary_data)
packed_message = topic_bytes + header_bytes + data_bytes
return packed_message