-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmini_arm.py
151 lines (117 loc) · 4.92 KB
/
mini_arm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import time
import serial
import threading
import numpy as np
__version__ = '0.1.0'
__author__ = 'Jonathan Shulgach'
class MiniArmClient(object):
""" A class that serves as a client for the MiniArm robot arm and acts like a driver
Parameters
----------
name (str) : Name of the client
port (str) : Serial port to connect to
baudrate (int) : Baudrate for the serial connection
visualize (bool) : Enable/disable visualization
verbose (bool) : Enable/disable verbose output
"""
def __init__(self, name='MiniArmClient', port='COM3', baudrate=9600, command_delimiter=';', verbose=False):
self.name = name
self.command_delimiter = command_delimiter
self.verbose = verbose
# Setup serial communication
self.s = serial.Serial(port, baudrate, timeout=1)
self.connected = True if self.s.is_open else False
# Print out any bytes currently in the buffer
data = self.get_buffer()
# Set the debug mode to match verbose
#self.send_message(f"debug:{self.verbose}")
self.home() # Set to home position upon successful connection
# Move to a new pose
self.send_message('set_pose:[0.135, 0.000, 0.22]')
#self.send_message('set_delta_pose:[0.0,0.0,-0.02]')
self.logger(f"{self.name} initialized\n")
def logger(self, *argv, warning=False):
""" Robust logging function to track events"""
msg = ''.join(argv)
if warning: msg = '(Warning) ' + msg
print("[{:.3f}][{}] {}".format(time.monotonic(), self.name, msg))
def send_message(self, message):
"""Sends a message to the Pico over serial. Add the command terminator if missing.
Parameters:
-----------
message (str): The message to send.
"""
if self.s and self.s.is_open:
if not message.endswith(self.command_delimiter):
message += self.command_delimiter # Add terminator character to the message
try:
self.s.write(message.encode())
if self.verbose:
print(f"Sent message to Pico: {message}")
time.sleep(0.01) # Mandatory 10ms delay to prevent buffer overflow
except serial.SerialException as e:
print(f"Error sending message: {e}")
else:
print("Serial connection not available or not open.")
def get_buffer(self):
"""Read all available bytes from the serial connection and return them.
Return:
-------
str : The buffer contents
"""
if self.s and self.s.is_open:
try:
if self.s.in_waiting > 0:
msg = ""
while self.s.in_waiting > 0:
msg += self.s.readline().decode().strip()
time.sleep(0.01) # Mandatory 10ms delay to prevent buffer overflow
return msg
except serial.SerialException as e:
print(f"Error reading message: {e}")
def get_current_pose(self):
""" Returns the current pose of the robot arm
Message received from robot with 'get_pose' command is:
'[10262.801][Robot]Current pose: cords: [x: 0.13500, y: 0.00000, z: 0.21500]angles: [Roll: 0.00000, Pitch: 0.00000, Yaw:0.00000]tool: 0'
Return:
----------
np.array : A numpy array representing the current pose of the robot arm [x, y, z, roll, pitch, yaw]
"""
# Get the pose
self.send_message("get_pose;")
# Read all data from the buffer
data = self.get_buffer()
# Extract the pose from the data
if data:
try:
pose = data.split('cords: ')[1].split('angles: ')[0].strip().replace('[', '').replace(']', '').split(',')
pose = [float(p.split(':')[1]) for p in pose]
return np.array(pose)
except Exception as e:
print(f"Error extracting pose: {e}")
return np.zeros((6,))
else:
print("No pose data to read from")
return None
def home(self):
""" Sends the robot arm to the home position"""
self.send_message("home;")
def send_ctrl_c(self):
""" Sends the Ctrl+C command to the robot arm"""
self.s.write(b'\x03')
time.sleep(0.01)
def send_ctrl_d(self):
""" Sends the Ctrl+D command to the robot arm"""
self.s.write(b'\x04')
time.sleep(0.01)
def disconnect(self):
""" A function that stops the client and closes the serial connection"""
self.s.close()
self.logger("Serial connection closed...")
def set_debug_mode(self, mode):
""" Set the debug mode of the robot arm
Parameters:
-----------
mode (bool, str): The debug mode to set (true/false or 'on'/'off'
"""
self.send_message(f"debug:{mode}")