Dual-Arm Sync Example

This is a demo script. When you move the first arm, the other arm follows.

Requirements

  • Python 3.5-3.13
  • xarm-python-sdk (pip install xarm-python-sdk)
  • Two robotic arm of the same model (e.g., two xArm 6) on the same network.
  • OS Recommendation: For best performance, it is recommended to run this script on a Linux system with a real-time patch. Running on Windows is not advised due to potential real-time performance issues.

Quick Start

  1. Configure IPs: Edit the script below and set MASTER_IP and SLAVE_IP.
  2. Enable Master Arm: Put the master arm in a mode where it can be moved manually (e.g., drag-and-teach).
  3. Run: Save the code below as a Python file (e.g., dual_arm_sync.py) and run it from your terminal:
    python dual_arm_sync.py
    
  4. Stop: Press Ctrl+C in the terminal.

Full Code

import time
from xarm.wrapper import XArmAPI

class XArmSyncController:
    def __init__(self, master_ip, slave_ip, init_speed=10):
        self.master_ip = master_ip
        self.slave_ip = slave_ip
        self.init_speed = init_speed
        self.master_arm = None
        self.slave_arm = None
        self.running = False

    def connect_arms(self):
        try:
            print(f"Connecting to master arm: {self.master_ip}")
            self.master_arm = XArmAPI(self.master_ip)
            self.master_arm.motion_enable(enable=True)
            self.master_arm.set_mode(0)
            self.master_arm.set_state(state=0)
            print("Master arm connected successfully.")

            print(f"Connecting to slave arm: {self.slave_ip}")
            self.slave_arm = XArmAPI(self.slave_ip)
            self.slave_arm.motion_enable(enable=True)
            self.slave_arm.set_mode(0)
            self.slave_arm.set_state(state=0)
            print("Slave arm connected successfully.")
            return True
        except Exception as e:
            print(f"Failed to connect to arms: {e}")
            self.disconnect_arms()
            return False

    def _ensure_sync_before_follow(self, safety_threshold=0.2):
        _, master_angles = self.master_arm.get_servo_angle(is_real=True)
        _, slave_angles = self.slave_arm.get_servo_angle(is_real=True)

        if not master_angles or not slave_angles:
            print("Safety check failed: Could not get arm angles.")
            return False

        for i in range(len(master_angles)):
            if abs(master_angles[i] - slave_angles[i]) > safety_threshold:
                print("Warning: Position deviation detected. Auto-syncing...")
                if self.slave_arm.mode != 0:
                    self.slave_arm.set_mode(0)
                    self.slave_arm.set_state(0)
                    time.sleep(0.1)

                code = self.slave_arm.set_servo_angle(angle=master_angles, speed=self.init_speed, wait=True)
                if code != 0:
                    print(f"Re-sync failed with error code: {code}")
                    return False
                print("Re-sync completed.")
                break
        return True

    def start_sync(self):
        if not self.connect_arms():
            return

        if not self._ensure_sync_before_follow():
            print("Final safety check failed. Aborting sync.")
            self.disconnect_arms()
            return

        try:
            self.slave_arm.set_mode(1)
            self.slave_arm.set_state(state=0)
            time.sleep(0.5)
            print("\nSlave arm is now in follow mode. Move the master arm. Press Ctrl+C to stop.")
            self.running = True

            while self.running:
                code, angles = self.master_arm.get_servo_angle(is_real=True)
                if code == 0 and angles:
                    # Speed and acceleration parameters are required in servo mode
                    self.slave_arm.set_servo_angle_j(angles=angles, speed=100, mvacc=2000)

                # Check for errors on the slave arm in real-time
                if self.slave_arm.error_code != 0:
                    print(f"Slave arm error detected. Error code: {self.slave_arm.error_code}. Stopping sync.")
                    break
                
                time.sleep(0.01)

        except KeyboardInterrupt:
            print("\nUser interrupted. Stopping sync control.")
        except Exception as e:
            print(f"\nAn error occurred during sync: {e}")
        finally:
            self.stop_sync()

    def stop_sync(self):
        self.running = False
        self.disconnect_arms()
        print("All connections have been disconnected.")

    def disconnect_arms(self):
        if self.master_arm and self.master_arm.connected:
            self.master_arm.disconnect()
        if self.slave_arm and self.slave_arm.connected:
            self.slave_arm.disconnect()

if __name__ == "__main__":
    MASTER_IP = "192.168.1.xxx"
    SLAVE_IP = "192.168.1.xxx"
    
    sync_controller = XArmSyncController(MASTER_IP, SLAVE_IP, init_speed=10)
    sync_controller.start_sync()