You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

274 lines
11 KiB

#!/usr/bin/env python3
"""
Convert G1 motion capture data from joblib pickle to C++ readable formats
Converts motion sequences with joint positions, velocities, and full body kinematics
"""
import sys
import os
def convert_motion_data(pkl_file, base_output_dir=None):
    """Convert every motion sequence in a pickle file to C++ readable formats.

    The file is loaded with joblib, falling back to the standard ``pickle``
    module when joblib cannot be imported. Each top-level entry of the loaded
    mapping is treated as one motion and written into its own sub-folder of
    the output directory; a ``motion_summary.txt`` is written next to them.

    Args:
        pkl_file: Path of the pickle file to convert.
        base_output_dir: Directory that receives one folder per motion. When
            None, a folder named after the pickle file (without extension) is
            used, located next to the pickle file.

    Returns:
        A 4-tuple ``(success, motion_count, joint_count, body_count)``.
        ``success`` is True when at least one motion was converted.
        ``joint_count`` and ``body_count`` are taken from the first motion and
        are None when there are no motions. If the file cannot be loaded the
        result is ``(False, 0, None, None)`` so callers can always unpack
        four values.
    """
    load_failed = (False, 0, None, None)

    # Organised folder structure: <base_output_dir>/<motion_name>/
    pkl_name = os.path.splitext(os.path.basename(pkl_file))[0]
    if base_output_dir is None:
        base_output_dir = os.path.join(os.path.dirname(pkl_file), pkl_name)

    print(f"Converting motion data from: {pkl_file}")
    print(f"Output directory structure: {base_output_dir}/")

    # SECURITY NOTE: joblib.load and pickle.load can execute arbitrary code
    # embedded in the file. Only convert pickle files from a trusted source.
    try:
        import joblib
        data = joblib.load(pkl_file)
        print("✓ Successfully loaded with joblib")
    except ImportError:
        print("✗ joblib not available, trying pickle...")
        try:
            import pickle
            with open(pkl_file, 'rb') as f:
                data = pickle.load(f)
            print("✓ Successfully loaded with pickle")
        except Exception as e:
            print(f"✗ Failed to load: {e}")
            return load_failed
    except Exception as e:
        print(f"✗ Failed to load with joblib: {e}")
        return load_failed

    # Nothing is created on disk until the input has been loaded.
    os.makedirs(base_output_dir, exist_ok=True)

    print(f"\nFound {len(data)} motion sequences:")
    for motion_name in data.keys():
        print(f" - {motion_name}")

    # Convert each motion sequence into its own folder.
    success_count = 0
    for motion_name, motion_data in data.items():
        print(f"\nProcessing: {motion_name}")
        motion_output_dir = os.path.join(base_output_dir, motion_name)
        print(f"Creating individual folder for this motion: {motion_output_dir}")
        os.makedirs(motion_output_dir, exist_ok=True)
        if convert_single_motion(motion_name, motion_data, motion_output_dir):
            success_count += 1

    # Summary file lives in the base directory, next to the motion folders.
    create_summary_file(data, base_output_dir)

    print(f"\n✓ Successfully converted {success_count}/{len(data)} motions")
    print(f"Output files saved to: {base_output_dir}/")

    # Joint/body counts are reported from the first motion only.
    joint_count = None
    body_count = None
    if data:
        first_motion = next(iter(data.values()))
        joint_count = first_motion['joint_pos'].shape[1]
        body_count = first_motion['body_pos_w'].shape[1]

    return success_count > 0, len(data), joint_count, body_count
def convert_single_motion(motion_name, motion_data, output_dir):
    """Write one motion sequence to CSV and text files inside ``output_dir``.

    Reads the ``joint_pos``, ``joint_vel``, ``body_pos_w``, ``body_quat_w``,
    ``body_lin_vel_w`` and ``body_ang_vel_w`` entries of ``motion_data`` and
    writes one CSV per array (body arrays are flattened to one row per
    timestep), followed by ``metadata.txt`` and ``info.txt``.

    Args:
        motion_name: Label used in log messages and in the text files.
        motion_data: Mapping holding the arrays listed above; each array must
            provide ``.shape`` and ``.reshape``.
        output_dir: Existing directory that receives the files.

    Returns:
        True when every file was written. False when any step raised; the
        error is printed rather than propagated so that other motions can
        still be converted.
    """

    def component_headers(width, axes, infix=""):
        """Name flattened columns as ``body_<index>_<infix><axis>``."""
        per_body = len(axes)
        return [
            f"body_{col // per_body}_{infix}{axes[col % per_body]}"
            for col in range(width)
        ]

    try:
        # Fetch every required array first so a missing key fails before
        # anything is written.
        joint_pos = motion_data['joint_pos']
        joint_vel = motion_data['joint_vel']
        body_pos_w = motion_data['body_pos_w']
        body_quat_w = motion_data['body_quat_w']
        body_lin_vel_w = motion_data['body_lin_vel_w']
        body_ang_vel_w = motion_data['body_ang_vel_w']

        timesteps = joint_pos.shape[0]
        print(f" Timesteps: {timesteps}, Joints: {joint_pos.shape[1]}, Body parts: {body_pos_w.shape[1]}")

        # Flatten the per-body arrays to 2-D: one row per timestep.
        body_pos_2d = body_pos_w.reshape(timesteps, -1)
        body_quat_2d = body_quat_w.reshape(timesteps, -1)
        body_lin_vel_2d = body_lin_vel_w.reshape(timesteps, -1)
        body_ang_vel_2d = body_ang_vel_w.reshape(timesteps, -1)

        # (file name, 2-D array, column headers) for every CSV to write.
        # NOTE(review): quaternion columns are labelled w-first ("wxyz");
        # confirm this matches the ordering used by the data producer.
        csv_outputs = [
            ("joint_pos.csv", joint_pos,
             [f"joint_{i}" for i in range(joint_pos.shape[1])]),
            ("joint_vel.csv", joint_vel,
             [f"joint_vel_{i}" for i in range(joint_vel.shape[1])]),
            ("body_pos.csv", body_pos_2d,
             component_headers(body_pos_2d.shape[1], "xyz")),
            ("body_quat.csv", body_quat_2d,
             component_headers(body_quat_2d.shape[1], "wxyz")),
            ("body_lin_vel.csv", body_lin_vel_2d,
             component_headers(body_lin_vel_2d.shape[1], "xyz", "vel_")),
            ("body_ang_vel.csv", body_ang_vel_2d,
             component_headers(body_ang_vel_2d.shape[1], "xyz", "angvel_")),
        ]
        for csv_name, values, headers in csv_outputs:
            save_array_as_csv(values, os.path.join(output_dir, csv_name), headers)

        # Text side files: metadata summary and detailed per-array info.
        save_metadata(motion_name, motion_data, os.path.join(output_dir, "metadata.txt"))
        save_motion_info(motion_name, motion_data, os.path.join(output_dir, "info.txt"))

        # Report the real number of files: all CSVs plus the two text files.
        files_written = len(csv_outputs) + 2
        print(f" ✓ Saved {files_written} files for {motion_name} (joints + full body kinematics)")
        return True
    except Exception as e:
        # Deliberate per-motion boundary: report and let the caller continue.
        print(f" ✗ Error processing {motion_name}: {e}")
        return False
def save_array_as_csv(array, filename, headers=None):
    """Save 2-D numeric data as a CSV file with a header row.

    Args:
        array: 2-D data to write, one output line per row. May be a NumPy
            array or any sequence of row sequences (e.g. a list of lists).
        filename: Path of the CSV file; an existing file is overwritten.
        headers: Optional list of column names. When None or empty, names
            ``col_0``, ``col_1``, ... are generated from the column count.

    Every value is written in fixed-point notation with six decimals.
    """
    if headers:
        header_names = list(headers)
    else:
        # Prefer the array's own shape; fall back to the first row's length
        # for plain nested sequences.
        shape = getattr(array, "shape", None)
        if shape is not None:
            column_count = shape[1]
        else:
            column_count = len(array[0]) if len(array) else 0
        header_names = [f"col_{i}" for i in range(column_count)]

    with open(filename, 'w') as f:
        f.write(",".join(header_names) + "\n")
        # One buffered writelines call instead of a write per row.
        f.writelines(
            ",".join(f"{val:.6f}" for val in row) + "\n"
            for row in array
        )
def save_metadata(motion_name, motion_data, filename):
    """Write a short metadata report for one motion to ``filename``.

    The report contains a title, the ``_body_indexes`` and
    ``time_step_total`` entries when present in ``motion_data``, and one
    line with shape and dtype for every entry that has a ``shape`` attribute.
    """
    with open(filename, 'w') as out:
        # Title block
        out.writelines([
            f"Metadata for: {motion_name}\n",
            "=" * 30 + "\n\n",
        ])

        # Optional entries are emitted only when the key exists.
        has_body_indexes = '_body_indexes' in motion_data
        if has_body_indexes:
            out.write("Body part indexes:\n")
            out.write(f"{motion_data['_body_indexes']}\n\n")

        has_total = 'time_step_total' in motion_data
        if has_total:
            out.write(f"Total timesteps: {motion_data['time_step_total']}\n\n")

        # One summary line per array-like entry.
        out.write("Data arrays summary:\n")
        out.writelines(
            f" {key}: {value.shape} ({value.dtype})\n"
            for key, value in motion_data.items()
            if hasattr(value, 'shape')
        )
def save_motion_info(motion_name, motion_data, filename):
    """Write a detailed per-entry report for one motion to ``filename``.

    Entries with a ``shape`` attribute are described by shape, dtype and,
    when non-empty, their value range and first five flattened values.
    Any other entry is written as its plain value. A blank line follows
    each entry.
    """
    with open(filename, 'w') as out:
        out.write(f"Motion Information: {motion_name}\n")
        out.write("=" * 50 + "\n\n")

        for key, value in motion_data.items():
            out.write(f"{key}:\n")

            if not hasattr(value, 'shape'):
                # Scalar / non-array entry: print it verbatim.
                out.write(f" Value: {value}\n")
            else:
                out.write(f" Shape: {value.shape}\n")
                out.write(f" Dtype: {value.dtype}\n")
                if value.size > 0:
                    # Statistics are computed over the flattened values.
                    flattened = value.flatten()
                    lowest = flattened.min()
                    highest = flattened.max()
                    out.write(f" Range: [{lowest:.3f}, {highest:.3f}]\n")
                    out.write(f" Sample: {flattened[:5]}\n")

            out.write("\n")
def create_summary_file(data, output_dir):
    """Write ``motion_summary.txt`` into ``output_dir``.

    The summary lists the total number of motions in ``data`` and, for each
    motion, its timestep count and joint count (from ``joint_pos``) and its
    body-part count (from ``body_pos_w``).
    """
    target = os.path.join(output_dir, "motion_summary.txt")
    with open(target, 'w') as out:
        # Header
        out.writelines([
            "G1 Motion Capture Data Summary\n",
            "=" * 40 + "\n\n",
            f"Total motion sequences: {len(data)}\n\n",
        ])

        # Per-motion details, one paragraph each.
        out.write("Detailed motion list:\n")
        for name, motion in data.items():
            positions = motion['joint_pos']
            out.write(f" {name}:\n")
            out.write(f" Timesteps: {positions.shape[0]}\n")
            out.write(f" Joints: {positions.shape[1]}\n")
            out.write(f" Body parts: {motion['body_pos_w'].shape[1]}\n")
            out.write("\n")
def main():
    """Command-line entry point: convert one motion pickle and report the result.

    Reads the pickle path and an optional output directory from ``sys.argv``,
    runs ``convert_motion_data`` and prints a summary of what was written.

    Returns:
        Process exit status: 0 on success, 1 when the input file is missing
        or the conversion failed, 2 when the script is called without
        arguments (usage is printed).
    """
    if len(sys.argv) < 2:
        print("Usage: python3 convert_motions.py <pkl_file> [output_base_dir]")
        print("Examples:")
        print(" python3 convert_motions.py bones_072925_test.pkl")
        print(" python3 convert_motions.py bones_072925_test.pkl custom_output/")
        print("")
        print("Default output structure: <pkl_dir>/{pkl_name}/{motion_name}/")
        return 2

    pkl_file = sys.argv[1]
    output_base_dir = sys.argv[2] if len(sys.argv) > 2 else None

    if not os.path.exists(pkl_file):
        print(f"Error: File not found: {pkl_file}")
        return 1

    # Directory shown in the messages below. With no explicit output
    # directory this mirrors the converter's default: a folder named after
    # the pickle file, placed next to it.
    pkl_name = os.path.splitext(os.path.basename(pkl_file))[0]
    if output_base_dir is not None:
        output_dir = output_base_dir
    else:
        output_dir = os.path.join(os.path.dirname(pkl_file), pkl_name)
    # Joining with "" guarantees exactly one trailing path separator.
    output_dir_display = os.path.join(output_dir, "")

    print("G1 Motion Data Converter")
    print("========================")

    result = convert_motion_data(pkl_file, output_base_dir)
    # Accept a bare boolean result as well as the usual 4-tuple so that an
    # early failure is reported instead of raising on unpacking.
    if not isinstance(result, tuple):
        result = (bool(result), 0, None, None)
    success, motion_count, joint_count, body_count = result

    if not success:
        print("\n✗ Conversion failed")
        return 1

    print("\n✓ Conversion completed successfully!")
    print("\nExtracted data for each motion:")
    print(f"- Joint positions & velocities ({joint_count} joints)")
    print(f"- Body positions in world coordinates ({body_count} body parts)")
    print("- Body orientations (quaternions)")
    print("- Body linear & angular velocities")
    print("- Metadata and body part indices")
    print("\nNext steps:")
    print("1. Build C++ reader: make motion_data_reader")
    print(f"2. Test reading: ./bin/motion_data_reader {output_dir_display}")
    print("3. Use full kinematic data in your G1 control programs")
    print("\nFile structure created:")
    print(output_dir_display)
    print("├── [motion_name_1]/")
    print("│ ├── joint_pos.csv")
    print("│ ├── joint_vel.csv")
    print("│ ├── body_pos.csv")
    print("│ ├── body_quat.csv")
    print("│ ├── body_lin_vel.csv")
    print("│ ├── body_ang_vel.csv")
    print("│ ├── metadata.txt")
    print("│ └── info.txt")
    print("├── [motion_name_2]/")
    print("├── motion_summary.txt")
    print(f"└── ... ({motion_count} motion folders total)")
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and build scripts can detect failure.
    sys.exit(main())