Browse Source

Add rotation I-term for wrist orientation drift correction

Adds integral correction in axis-angle space to actively correct wrist
rotation drift over time. Complements the existing rotation blend
dampening with closed-loop feedback.

- rotation_to_rotvec() and rotvec_to_rotation() helpers (refactored from scale_rotation)
- --ki-rot (default 0.3) and --i-rot-clamp (default 0.3 rad) CLI args
- Rotation error computed as rotvec(target_R @ actual_R^T), same leaky integrator as position
- Magnitude-based anti-windup clamping (preserves axis direction)
- Switched I-term FK from compute_fk to compute_fk_pose for full SE(3)
- Updated periodic logging to include rotation I-term stats

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
main
Joe DiPrima 1 month ago
parent
commit
83045b652a
  1. 121
      teleop/teleop_hand_and_arm.py

121
teleop/teleop_hand_and_arm.py

@ -66,22 +66,34 @@ def parse_r3_buttons(data):
'Start': bool(btn1 & 0x04), 'Start': bool(btn1 & 0x04),
} }
def scale_rotation(R, alpha):
"""Scale a rotation matrix's angle by alpha (0=identity, 1=original). Pure numpy."""
def rotation_to_rotvec(R):
"""Convert 3x3 rotation matrix to axis-angle (rotation vector, 3-vec). Pure numpy."""
cos_theta = np.clip((np.trace(R) - 1.0) / 2.0, -1.0, 1.0) cos_theta = np.clip((np.trace(R) - 1.0) / 2.0, -1.0, 1.0)
theta = np.arccos(cos_theta) theta = np.arccos(cos_theta)
if theta < 1e-6: if theta < 1e-6:
return np.eye(3)
return np.zeros(3)
axis = np.array([ axis = np.array([
R[2, 1] - R[1, 2], R[2, 1] - R[1, 2],
R[0, 2] - R[2, 0], R[0, 2] - R[2, 0],
R[1, 0] - R[0, 1] R[1, 0] - R[0, 1]
]) / (2.0 * np.sin(theta)) ]) / (2.0 * np.sin(theta))
scaled = theta * alpha
return axis * theta
def rotvec_to_rotation(rv):
"""Convert rotation vector (3-vec) to 3x3 rotation matrix via Rodrigues. Pure numpy."""
theta = np.linalg.norm(rv)
if theta < 1e-6:
return np.eye(3)
axis = rv / theta
K = np.array([[0, -axis[2], axis[1]], K = np.array([[0, -axis[2], axis[1]],
[axis[2], 0, -axis[0]], [axis[2], 0, -axis[0]],
[-axis[1], axis[0], 0]]) [-axis[1], axis[0], 0]])
return np.eye(3) + np.sin(scaled) * K + (1.0 - np.cos(scaled)) * (K @ K)
return np.eye(3) + np.sin(theta) * K + (1.0 - np.cos(theta)) * (K @ K)
def scale_rotation(R, alpha):
"""Scale a rotation matrix's angle by alpha (0=identity, 1=original). Pure numpy."""
rv = rotation_to_rotvec(R)
return rotvec_to_rotation(rv * alpha)
# Previous button state for edge detection # Previous button state for edge detection
_r3_prev_buttons = {} _r3_prev_buttons = {}
@ -135,6 +147,9 @@ if __name__ == '__main__':
parser.add_argument('--ki', type=float, default=0.3, help='Integral gain for drift correction (0.0 = disabled)') parser.add_argument('--ki', type=float, default=0.3, help='Integral gain for drift correction (0.0 = disabled)')
parser.add_argument('--i-clamp', type=float, default=0.10, help='Anti-windup clamp for I term in meters (default: 0.10)') parser.add_argument('--i-clamp', type=float, default=0.10, help='Anti-windup clamp for I term in meters (default: 0.10)')
parser.add_argument('--i-decay', type=float, default=0.995, help='Per-frame decay factor for I term (default: 0.995)') parser.add_argument('--i-decay', type=float, default=0.995, help='Per-frame decay factor for I term (default: 0.995)')
# rotation integral drift correction
parser.add_argument('--ki-rot', type=float, default=0.3, help='Rotation integral gain (0.0 = disabled)')
parser.add_argument('--i-rot-clamp', type=float, default=0.3, help='Anti-windup clamp for rotation I term in radians (default: 0.3)')
parser.add_argument('--no-calibration', action='store_true', parser.add_argument('--no-calibration', action='store_true',
help='Disable start-time calibration (use absolute VP poses)') help='Disable start-time calibration (use absolute VP poses)')
parser.add_argument('--settle-time', type=float, default=0.5, parser.add_argument('--settle-time', type=float, default=0.5,
@ -327,6 +342,8 @@ if __name__ == '__main__':
# Integral drift correction state # Integral drift correction state
i_error_left = np.zeros(3) # Accumulated task-space position error (meters) i_error_left = np.zeros(3) # Accumulated task-space position error (meters)
i_error_right = np.zeros(3) i_error_right = np.zeros(3)
i_rot_error_left = np.zeros(3) # Accumulated rotation error (axis-angle, radians)
i_rot_error_right = np.zeros(3)
last_loop_time = time.time() last_loop_time = time.time()
i_frame_count = 0 i_frame_count = 0
@ -362,6 +379,8 @@ if __name__ == '__main__':
logger_mp.info("[main] Resetting arms to home position...") logger_mp.info("[main] Resetting arms to home position...")
i_error_left = np.zeros(3) i_error_left = np.zeros(3)
i_error_right = np.zeros(3) i_error_right = np.zeros(3)
i_rot_error_left = np.zeros(3)
i_rot_error_right = np.zeros(3)
arm_ctrl.ctrl_dual_arm(np.zeros(14), np.zeros(14)) arm_ctrl.ctrl_dual_arm(np.zeros(14), np.zeros(14))
reset_start = time.time() reset_start = time.time()
while time.time() - reset_start < 3.0: while time.time() - reset_start < 3.0:
@ -408,6 +427,8 @@ if __name__ == '__main__':
calibration_time = time.time() calibration_time = time.time()
i_error_left = np.zeros(3) i_error_left = np.zeros(3)
i_error_right = np.zeros(3) i_error_right = np.zeros(3)
i_rot_error_left = np.zeros(3)
i_rot_error_right = np.zeros(3)
logger_mp.info(f"[calibration] Re-calibrated on resume. " logger_mp.info(f"[calibration] Re-calibrated on resume. "
f"Robot L={robot_ref_left[:3,3]}, R={robot_ref_right[:3,3]}") f"Robot L={robot_ref_left[:3,3]}, R={robot_ref_right[:3,3]}")
prev_start = START prev_start = START
@ -416,6 +437,8 @@ if __name__ == '__main__':
if not START: if not START:
i_error_left = np.zeros(3) i_error_left = np.zeros(3)
i_error_right = np.zeros(3) i_error_right = np.zeros(3)
i_rot_error_left = np.zeros(3)
i_rot_error_right = np.zeros(3)
current_time = time.time() current_time = time.time()
time_elapsed = current_time - start_time time_elapsed = current_time - start_time
sleep_time = max(0, (1 / args.frequency) - time_elapsed) sleep_time = max(0, (1 / args.frequency) - time_elapsed)
@ -521,35 +544,73 @@ if __name__ == '__main__':
left_wrist_adjusted = tele_data.left_wrist_pose.copy() left_wrist_adjusted = tele_data.left_wrist_pose.copy()
right_wrist_adjusted = tele_data.right_wrist_pose.copy() right_wrist_adjusted = tele_data.right_wrist_pose.copy()
if args.ki > 0.0 and INTEGRAL_ENABLED:
# FK: where are the robot hands actually?
left_ee_pos, right_ee_pos = arm_ik.compute_fk(current_lr_arm_q)
# Error: calibrated target - actual (task-space position)
left_error = left_wrist_adjusted[:3, 3] - left_ee_pos
right_error = right_wrist_adjusted[:3, 3] - right_ee_pos
# Leaky integrator with anti-windup
i_error_left = i_error_left * args.i_decay + left_error * dt
i_error_right = i_error_right * args.i_decay + right_error * dt
i_error_left = np.clip(i_error_left, -args.i_clamp, args.i_clamp)
i_error_right = np.clip(i_error_right, -args.i_clamp, args.i_clamp)
# Bias IK target position (rotation unchanged)
left_wrist_adjusted[:3, 3] += args.ki * i_error_left
right_wrist_adjusted[:3, 3] += args.ki * i_error_right
if (args.ki > 0.0 or args.ki_rot > 0.0) and INTEGRAL_ENABLED:
# FK: where are the robot hands actually? (full SE(3) for rotation I-term)
left_ee_pose, right_ee_pose = arm_ik.compute_fk_pose(current_lr_arm_q)
left_ee_pos = left_ee_pose[:3, 3]
right_ee_pos = right_ee_pose[:3, 3]
# --- Position I-term ---
if args.ki > 0.0:
left_error = left_wrist_adjusted[:3, 3] - left_ee_pos
right_error = right_wrist_adjusted[:3, 3] - right_ee_pos
# Leaky integrator with anti-windup
i_error_left = i_error_left * args.i_decay + left_error * dt
i_error_right = i_error_right * args.i_decay + right_error * dt
i_error_left = np.clip(i_error_left, -args.i_clamp, args.i_clamp)
i_error_right = np.clip(i_error_right, -args.i_clamp, args.i_clamp)
# Bias IK target position
left_wrist_adjusted[:3, 3] += args.ki * i_error_left
right_wrist_adjusted[:3, 3] += args.ki * i_error_right
# --- Rotation I-term ---
if args.ki_rot > 0.0:
# Rotation error: target_R @ actual_R^T → how far off is actual from target
left_rot_err = rotation_to_rotvec(
left_wrist_adjusted[:3, :3] @ left_ee_pose[:3, :3].T)
right_rot_err = rotation_to_rotvec(
right_wrist_adjusted[:3, :3] @ right_ee_pose[:3, :3].T)
# Leaky integrator
i_rot_error_left = i_rot_error_left * args.i_decay + left_rot_err * dt
i_rot_error_right = i_rot_error_right * args.i_decay + right_rot_err * dt
# Anti-windup: clamp by magnitude (radians)
l_rot_mag = np.linalg.norm(i_rot_error_left)
if l_rot_mag > args.i_rot_clamp:
i_rot_error_left = i_rot_error_left * (args.i_rot_clamp / l_rot_mag)
r_rot_mag = np.linalg.norm(i_rot_error_right)
if r_rot_mag > args.i_rot_clamp:
i_rot_error_right = i_rot_error_right * (args.i_rot_clamp / r_rot_mag)
# Apply rotation correction to IK target
left_wrist_adjusted[:3, :3] = (
rotvec_to_rotation(args.ki_rot * i_rot_error_left)
@ left_wrist_adjusted[:3, :3])
right_wrist_adjusted[:3, :3] = (
rotvec_to_rotation(args.ki_rot * i_rot_error_right)
@ right_wrist_adjusted[:3, :3])
# Log every ~1 second (every 30 frames at 30Hz) # Log every ~1 second (every 30 frames at 30Hz)
i_frame_count += 1 i_frame_count += 1
if i_frame_count % 30 == 0: if i_frame_count % 30 == 0:
logger_mp.info(
f"[I-term] L_err={np.linalg.norm(left_error):.4f}m "
f"R_err={np.linalg.norm(right_error):.4f}m | "
f"L_I={np.linalg.norm(i_error_left):.4f} "
f"R_I={np.linalg.norm(i_error_right):.4f} | "
f"L_corr={np.linalg.norm(args.ki * i_error_left):.4f}m "
f"R_corr={np.linalg.norm(args.ki * i_error_right):.4f}m"
)
pos_info = ""
rot_info = ""
if args.ki > 0.0:
pos_info = (
f"pos L_err={np.linalg.norm(left_error):.4f}m "
f"R_err={np.linalg.norm(right_error):.4f}m "
f"L_I={np.linalg.norm(i_error_left):.4f} "
f"R_I={np.linalg.norm(i_error_right):.4f}")
if args.ki_rot > 0.0:
rot_info = (
f"rot L_err={np.linalg.norm(left_rot_err):.3f}rad "
f"R_err={np.linalg.norm(right_rot_err):.3f}rad "
f"L_I={np.linalg.norm(i_rot_error_left):.3f} "
f"R_I={np.linalg.norm(i_rot_error_right):.3f}")
logger_mp.info(f"[I-term] {pos_info} {rot_info}".strip())
# solve ik using motor data and adjusted wrist pose # solve ik using motor data and adjusted wrist pose
time_ik_start = time.time() time_ik_start = time.time()

Loading…
Cancel
Save