双臂同步示例

这是一个演示脚本,拖动第一台机械臂,另一台机械臂跟着动。

环境要求

  • Python 3.5-3.13
  • xarm-python-sdk (pip install xarm-python-sdk)
  • 两台同型号(如两台xArm 6)且处于同一局域网的机械臂。
  • 操作系统建议: 为保证最佳的实时同步效果,推荐在打了实时补丁(real-time patch)的 Linux 系统上运行此示例。不建议在 Windows 系统上运行,因为其系统特性可能导致延迟和卡顿。

快速开始

  1. 配置IP: 编辑下面的脚本,设置 MASTER_IPSLAVE_IP
  2. 启用主机: 将主机机械臂设置为可手动移动的模式(如拖动示教)。
  3. 运行: 将下方代码保存为 Python 文件(如 dual_arm_sync.py),然后在终端中运行它:
    python dual_arm_sync.py
    
  4. 停止: 在终端中按下 Ctrl+C

完整代码

import time
from xarm.wrapper import XArmAPI


class XArmSyncController:
    def __init__(self, master_ip, slave_ip, init_speed=10):
        self.master_ip = master_ip
        self.slave_ip = slave_ip
        self.init_speed = init_speed
        self.master_arm = None
        self.slave_arm = None
        self.running = False

    def connect_arms(self):
        try:
            print(f"正在连接主机械臂: {self.master_ip}")
            self.master_arm = XArmAPI(self.master_ip)
            self.master_arm.motion_enable(enable=True)
            self.master_arm.set_mode(0)
            self.master_arm.set_state(state=0)
            print("主机械臂连接成功")

            print(f"正在连接从机械臂: {self.slave_ip}")
            self.slave_arm = XArmAPI(self.slave_ip)
            self.slave_arm.motion_enable(enable=True)
            self.slave_arm.set_mode(0)
            self.slave_arm.set_state(state=0)
            print("从机械臂连接成功")
            return True
        except Exception as e:
            print(f"机械臂连接失败: {e}")
            self.disconnect_arms()
            return False

    def _ensure_sync_before_follow(self, safety_threshold=0.2):
        _, master_angles = self.master_arm.get_servo_angle(is_real=True)
        _, slave_angles = self.slave_arm.get_servo_angle(is_real=True)

        if not master_angles or not slave_angles:
            print("安全检查失败: 无法获取角度")
            return False

        for i in range(len(master_angles)):
            if abs(master_angles[i] - slave_angles[i]) > safety_threshold:
                print("检测到位置偏差,正在自动重同步...")
                if self.slave_arm.mode != 0:
                    print("检测到从机不处于位置模式,正在自动切换...")
                    self.slave_arm.set_mode(0)
                    self.slave_arm.set_state(0)
                    time.sleep(0.1)

                code = self.slave_arm.set_servo_angle(angle=master_angles, speed=self.init_speed, wait=True)
                if code != 0:
                    print(f"重同步失败,错误码: {code}")
                    return False
                print("重同步完成")
                break
        return True

    def start_sync(self):
        if not self.connect_arms():
            return

        if not self._ensure_sync_before_follow():
            print("最终安全检查失败,无法启动")
            self.disconnect_arms()
            return

        try:
            self.slave_arm.set_mode(1)
            self.slave_arm.set_state(state=0)
            time.sleep(0.5)
            print("\n从机已切换到跟随模式,请控制主机移动,或者将主机调整为手动模式并拖动主机。按Ctrl+C停止。")
            self.running = True

            while self.running:
                code, angles = self.master_arm.get_servo_angle(is_real=True)
                if code == 0 and angles:
                    # 在伺服模式下,需要指定速度和加速度参数
                    self.slave_arm.set_servo_angle_j(angles=angles, speed=100, mvacc=2000)

                # 实时检查从机是否报错
                if self.slave_arm.error_code != 0:
                    print(f"从机报错,错误码: {self.slave_arm.error_code},自动停止跟随。")
                    break

                time.sleep(0.01)

        except KeyboardInterrupt:
            print("\n用户中断,停止同步控制")
        except Exception as e:
            print(f"\n控制过程出错: {e}")
        finally:
            self.stop_sync()

    def stop_sync(self):
        self.running = False
        self.disconnect_arms()
        print("已断开所有连接")

    def disconnect_arms(self):
        if self.master_arm and self.master_arm.connected:
            self.master_arm.disconnect()
        if self.slave_arm and self.slave_arm.connected:
            self.slave_arm.disconnect()


if __name__ == "__main__":
    # 主从机械臂IP地址
    MASTER_IP = "192.168.1.xxx"
    SLAVE_IP = "192.168.1.xxx"

    # 创建同步控制器实例
    sync_controller = XArmSyncController(MASTER_IP, SLAVE_IP, init_speed=10)
    sync_controller.start_sync()