I am using Python SDK to control my xArm Lite 6. The set_servo_angle has the “wait” option. I set this option to “true” (wait=True) so that I can wait the motion to stop before executing another command relating to I/O and external serial device. My problem is that result is not consistent. Some time the program waits the robot to finish moving and some other time it does not wait. What could be the reason? My sample code is below.
#!/usr/bin/env python3
import sys
import math
import time
import queue
import datetime
import random
import traceback
import threading
from xarm import version
from xarm.wrapper import XArmAPI
from tkinter import *
from tkinter import ttk
import time
from threading import *
import serial
import struct
Create Object
root = Tk()
def threading():
# Call work function
t1=Thread(target=work)
t1.start()
def work():
try:
robot_main._Volume = int(fluid_Vol.get())
robot_main._DispenseSpd = int(fluid_DispenseSpd.get())
robot_main._AspireSpd = int(fluid_AspireSpd.get())
lb_errorStr.set('')
print("Move robot")
robot_main._robot_init()
robot_main.run()
except:
lb_errorStr.set('ERROR: please enter only numbers within the ranges')
Set geometry
root.title(“Title”)
root.geometry(“500x300”)
root.configure(background = ‘#CCCCFF’)
Create Button
btn = Button(root, text=“Start”, height=5, width=10 ,command = threading).grid(row=5,column=0)
class RobotMain(object):
“”“Robot Main Class”“”
def init(self, robot, **kwargs):
self.alive = True
self._arm = robot
self._tcp_speed = 100
self._tcp_acc = 2000
self._angle_speed = 10
self._angle_acc = 500
self._vars = {}
self._funcs = {}
# Robot init
def _robot_init(self):
self._arm.clean_warn()
self._arm.clean_error()
self._arm.motion_enable(True)
self._arm.set_mode(0)
self._arm.set_state(0)
time.sleep(1)
self._arm.register_error_warn_changed_callback(self._error_warn_changed_callback)
self._arm.register_state_changed_callback(self._state_changed_callback)
if hasattr(self._arm, 'register_count_changed_callback'):
self._arm.register_count_changed_callback(self._count_changed_callback)
def _microfluid_init(self):
self._fluid = serial.Serial(
port='/dev/ttyUSB0',
baudrate=38400,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
print(self._fluid.isOpen())
self._fluid.write('1>It500,100,2'.encode())
# Register error/warn changed callback
def _error_warn_changed_callback(self, data):
if data and data['error_code'] != 0:
self.alive = False
self.pprint('err={}, quit'.format(data['error_code']))
self._arm.release_error_warn_changed_callback(self._error_warn_changed_callback)
# Register state changed callback
def _state_changed_callback(self, data):
if data and data['state'] == 4:
self.alive = False
self.pprint('state=4, quit')
self._arm.release_state_changed_callback(self._state_changed_callback)
# Register count changed callback
def _count_changed_callback(self, data):
if self.is_alive:
self.pprint('counter val: {}'.format(data['count']))
def _check_code(self, code, label):
if not self.is_alive or code != 0:
self.alive = False
ret1 = self._arm.get_state()
ret2 = self._arm.get_err_warn_code()
self.pprint('{}, code={}, connected={}, state={}, error={}, ret1={}. ret2={}'.format(label, code, self._arm.connected, self._arm.state, self._arm.error_code, ret1, ret2))
return self.is_alive
@staticmethod
def pprint(*args, **kwargs):
try:
stack_tuple = traceback.extract_stack(limit=2)[0]
print('[{}][{}] {}'.format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), stack_tuple[1], ' '.join(map(str, args))))
except:
print(*args, **kwargs)
@property
def arm(self):
return self._arm
@property
def VARS(self):
return self._vars
@property
def FUNCS(self):
return self._funcs
@property
def is_alive(self):
if self.alive and self._arm.connected and self._arm.error_code == 0:
if self._arm.state == 5:
cnt = 0
while self._arm.state == 5 and cnt < 5:
cnt += 1
time.sleep(0.1)
return self._arm.state < 4
else:
return False
# Robot Main Run
def run(self):
try:
# test1
self._angle_speed = 15
self._angle_acc = 300
code = self._arm.set_servo_angle(angle=[18.6, -7.8, 44.4, 0.0, -37.8, 0.0], speed=self._angle_speed, mvacc=self._angle_acc, wait=True, radius=0.0)
if not self._check_code(code, 'set_servo_angle'):
return
print('step 1')
self._tcp_speed = 53
self._tcp_acc = 500
code = self._arm.set_position(*[317.6, 106.6, 259.4, 180.0, -90.0, 18.6], speed=self._tcp_speed, mvacc=self._tcp_acc, radius=0.0, wait=True)
if not self._check_code(code, 'set_position'):
return
print('step 2 -- now aspiring fluid')
# aspire the solution
text2aspire ='1>Ia100,100,10'
text2dispense = '1>Da500,,100,10'
self._fluid.write(text2aspire.encode())
code = self._arm.set_pause_time(int(self._Volume/self._AspireSpd)+1, wait=True)
if not self._check_code(code, 'set_pause_time'):
return
print('step 3 -- finish aspiring')
code = self._arm.set_position(*[317.5, 106.8, 370.7, 180.0, -90.0, 18.6], speed=self._tcp_speed, mvacc=self._tcp_acc, radius=0.0, wait=True)
if not self._check_code(code, 'set_position'):
return
print('step 4')
code = self._arm.set_pause_time(1)
if not self._check_code(code, 'set_pause_time'):
return
code = self._arm.set_servo_angle(angle=[40.9, -7.8, 44.4, 0.0, -37.8, 0.0], speed=self._angle_speed, mvacc=self._angle_acc, wait=True, radius=0.0)
if not self._check_code(code, 'set_servo_angle'):
return
print('step 5')
code = self._arm.set_pause_time(1)
if not self._check_code(code, 'set_pause_time'):
return
code = self._arm.set_position(*[253.2, 219.4, 310, 180.0, -90.0, 40.9], speed=self._tcp_speed, mvacc=self._tcp_acc, radius=0.0, wait=True)
if not self._check_code(code, 'set_position'):
return
print('step 6')
code = self._arm.set_pause_time(1)
if not self._check_code(code, 'set_pause_time'):
return
self._fluid.write(text2dispense.encode())
code = self._arm.set_pause_time(int(self._Volume/self._DispenseSpd)+1, wait=True)
if not self._check_code(code, 'set_pause_time'):
return
code = self._arm.set_position(*[253.2, 219.4, 370.7, 180.0, -90.0, 40.9], speed=self._tcp_speed, mvacc=self._tcp_acc, radius=0.0, wait=True)
if not self._check_code(code, 'set_position'):
return
print('step 7')
code = self._arm.set_pause_time(1)
if not self._check_code(code, 'set_pause_time'):
return
code = self._arm.set_servo_angle(angle=[18.6, -7.8, 44.4, 0.0, -37.8, 0.0], speed=self._angle_speed, mvacc=self._angle_acc, wait=True, radius=0.0)
if not self._check_code(code, 'set_servo_angle'):
return
print('step 8')
print('Move finished')
# self._fluid.write(b'1>Ld1')
# Aspiring fluid
except Exception as e:
self.pprint('MainException: {}'.format(e))
# self.alive = False
self._arm.release_error_warn_changed_callback(self._error_warn_changed_callback)
print('after 2')
self._arm.release_state_changed_callback(self._state_changed_callback)
if hasattr(self._arm, 'release_count_changed_callback'):
self._arm.release_count_changed_callback(self._count_changed_callback)
def new_method(self):
return True
if name == ‘main’:
RobotMain.pprint(‘xArm-Python-SDK Version:{}’.format(version.version))
arm = XArmAPI(‘192.168.1.185’, baud_checkset=False)
robot_main = RobotMain(arm)
# Execute Tkinter
root.mainloop()