diff --git a/teleop/robot_control/robot_arm.py b/teleop/robot_control/robot_arm.py index 957d7bb..0397861 100644 --- a/teleop/robot_control/robot_arm.py +++ b/teleop/robot_control/robot_arm.py @@ -32,6 +32,7 @@ class MotorState: class G1_29_LowState: def __init__(self): self.motor_state = [MotorState() for _ in range(G1_29_Num_Motors)] + self.wireless_remote = bytes(40) class G1_23_LowState: def __init__(self): @@ -146,6 +147,7 @@ class G1_29_ArmController: for id in range(G1_29_Num_Motors): lowstate.motor_state[id].q = msg.motor_state[id].q lowstate.motor_state[id].dq = msg.motor_state[id].dq + lowstate.wireless_remote = bytes(msg.wireless_remote) self.lowstate_buffer.SetData(lowstate) time.sleep(0.002) @@ -212,7 +214,14 @@ class G1_29_ArmController: def get_current_dual_arm_dq(self): '''Return current state dq of the left and right arm motors.''' return np.array([self.lowstate_buffer.GetData().motor_state[id].dq for id in G1_29_JointArmIndex]) - + + def get_wireless_remote(self): + '''Return raw wireless_remote bytes (40 bytes) from latest lowstate.''' + data = self.lowstate_buffer.GetData() + if data is not None: + return data.wireless_remote + return bytes(40) + def ctrl_dual_arm_go_home(self): '''Move both the left and right arms of the robot to their home position by setting the target joint angles (q) and torques (tau) to zero.''' logger_mp.info("[G1_29_ArmController] ctrl_dual_arm_go_home start...") diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index 9b9a5b6..0b58818 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -1,5 +1,6 @@ import time import argparse +import struct import numpy as np from multiprocessing import Value, Array, Lock import threading @@ -51,6 +52,23 @@ RESET_ARMS = False # Trigger arm reset via 'h' key # ==> manual: when READY is True, set RECORD_TOGGLE=True to transition. # --> auto : Auto-transition after saving data. +def parse_r3_buttons(data): + """Parse R3 controller button bytes from wireless_remote. Returns dict of button states.""" + if len(data) < 4: + return {} + btn1 = data[2] # R1=0x01, L1=0x02, Start=0x04, Select=0x08, R2=0x10, L2=0x20 + btn2 = data[3] # A=0x01, B=0x02, X=0x04, Y=0x08, Up=0x10, Right=0x20, Down=0x40, Left=0x80 + return { + 'A': bool(btn2 & 0x01), + 'B': bool(btn2 & 0x02), + 'X': bool(btn2 & 0x04), + 'Y': bool(btn2 & 0x08), + 'Start': bool(btn1 & 0x04), + } + +# Previous button state for edge detection +_r3_prev_buttons = {} + def on_press(key): global STOP, START, RECORD_TOGGLE, INTEGRAL_ENABLED, RESET_ARMS if key == 'r': @@ -253,21 +271,28 @@ if __name__ == '__main__': rerun_log = not args.headless) logger_mp.info("----------------------------------------------------------------") - logger_mp.info("🟢 Press [r] to start syncing the robot with your movements.") + logger_mp.info(" Keyboard | R3 Controller | Action") + logger_mp.info(" --------- --------------- ------") + logger_mp.info(" [r] | [A] | Start arm tracking") + logger_mp.info(" [q] | [B] | Stop and exit") + logger_mp.info(" [h] | [X] | Reset arms to home") + logger_mp.info(" [i] | [Y] | Toggle I-term correction") if args.record: - logger_mp.info("🟡 Press [s] to START or SAVE recording (toggle cycle).") - else: - logger_mp.info("🔵 Recording is DISABLED (run with --record to enable).") - logger_mp.info("🏠 Press [h] to reset arms to home position (without exiting).") + logger_mp.info(" [s] | [Start] | Toggle recording") if args.ki > 0.0: - logger_mp.info(f"🔧 Press [i] to toggle integral drift correction (Ki={args.ki}, clamp={args.i_clamp}m, decay={args.i_decay})") - else: - logger_mp.info("⚪ Integral correction DISABLED (--ki=0.0)") - logger_mp.info("🔴 Press [q] to stop and exit the program.") + logger_mp.info(f" I-term: Ki={args.ki}, clamp={args.i_clamp}m, decay={args.i_decay}") + logger_mp.info("----------------------------------------------------------------") logger_mp.info("⚠️ IMPORTANT: Please keep your distance and stay safe.") READY = True # now ready to (1) enter START state while not START and not STOP: # wait for start or stop signal. time.sleep(0.033) + # Poll R3 A button in wait loop (edge-detected) + r3_data = arm_ctrl.get_wireless_remote() + r3_btns = parse_r3_buttons(r3_data) + if r3_btns.get('A') and not _r3_prev_buttons.get('A'): + START = True + logger_mp.info("[R3] A pressed → START tracking") + _r3_prev_buttons = r3_btns if camera_config['head_camera']['enable_zmq'] and xr_need_local_img: head_img = img_client.get_head_frame() tv_wrapper.render_to_xr(head_img) @@ -302,6 +327,28 @@ if __name__ == '__main__': last_loop_time = time.time() continue + # Poll R3 controller buttons (edge-detected) + r3_data = arm_ctrl.get_wireless_remote() + r3_btns = parse_r3_buttons(r3_data) + if r3_btns.get('A') and not _r3_prev_buttons.get('A'): + if not START: + START = True + logger_mp.info("[R3] A pressed → START tracking") + if r3_btns.get('B') and not _r3_prev_buttons.get('B'): + START = False + STOP = True + logger_mp.info("[R3] B pressed → STOP") + if r3_btns.get('X') and not _r3_prev_buttons.get('X') and START: + RESET_ARMS = True + logger_mp.info("[R3] X pressed → Reset arms") + if r3_btns.get('Y') and not _r3_prev_buttons.get('Y'): + INTEGRAL_ENABLED = not INTEGRAL_ENABLED + logger_mp.info(f"[R3] Y pressed → Integral {'ENABLED' if INTEGRAL_ENABLED else 'DISABLED'}") + if r3_btns.get('Start') and not _r3_prev_buttons.get('Start') and START: + RECORD_TOGGLE = True + logger_mp.info("[R3] Start pressed → Toggle recording") + _r3_prev_buttons = r3_btns + # get image if camera_config['head_camera']['enable_zmq']: if args.record or xr_need_local_img: