-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathipp.py
executable file
·1132 lines (938 loc) · 35.8 KB
/
ipp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
An I++ client implemented in python using asycio async/await syntax.
"""
import socket
import sys
from enum import Enum
import time
import asyncio
import logging
import tornado
from tornado.ioloop import IOLoop
from tornado.tcpclient import TCPClient
from tornado.iostream import StreamClosedError
from dataclasses import dataclass
import math
import functools
import traceback
import numpy as np
from scipy.spatial.transform import Rotation
logger = logging.getLogger(__name__)
RECEIVE_SIZE = 1024
HOST = "10.0.0.1"
PORT = 1294
IPP_ACK_CHAR = "&"
IPP_COMPLETE_CHAR = "%"
IPP_DATA_CHAR = "#"
IPP_ERROR_CHAR = "!"
status = 0
class float3:
def __init__(self, *args):
if len(args)==0:
self.values = (0,0,0)
elif len(args)==1:
if hasattr(args[0], '__iter__'):
self.values = tuple([ i for i in args[0] ])
else:
logger.debug("failed to create values")
raise ValueError("Invalid args {} for float3".format(args))
elif len(args) == 3:
self.values = args
else:
raise ValueError("Invalid args {} for float3".format(args))
self.x = self.values[0]
self.y = self.values[1]
self.z = self.values[2]
def __array__(self, dtype=None):
if dtype:
return np.array([self.x, self.y, self.z], dtype=dtype)
else:
return np.array([self.x, self.y, self.z])
def __sub__(self, other):
""" Returns the vector difference of self and other """
if isinstance(other, float3):
subbed = float3(self.x-other.x,self.y-other.y,self.z-other.z)
elif isinstance(other, (int, float)):
subbed = float3( list(a - other for a in self) )
else:
raise ValueError("Subtraction with type {} not supported".format(type(other)))
return self.__class__(*subbed)
def __rsub__(self, other):
return self.__sub__(other)
def __add__(self, other):
""" Returns the vector addition of self and other """
if isinstance(other, float3):
added = float3(other.x+self.x,other.y+self.y,other.z+self.z)
elif isinstance(other, (int, float)):
added = float3( list(a + other for a in self) )
else:
raise ValueError("Addition with type {} not supported".format(type(other)))
return added
def __radd__(self, other):
return self.__add__(other)
def __mul__(self,other):
if isinstance(other, float3):
product = float3(other.x * self.x,other.y * self.y,other.z * self.z)
return product
elif isinstance(other, (int, float)):
product = float3(self.x*other, self.y*other, self.z*other)
return product
else:
raise ValueError("Multiplication with type {} not supported".format(type(other)))
def __rmul__(self, other):
""" Called if 4 * self for instance """
return self.__mul__(other)
def norm(self):
""" Returns the norm (length, magnitude) of the vector """
return math.sqrt(sum( x*x for x in self ))
def normalize(self):
""" Returns a normalized unit vector """
norm = self.norm()
normed = tuple( x / norm for x in self )
return self.__class__(*normed)
def __iter__(self):
for val in [self.x,self.y,self.z]:
yield val
def __repr__(self):
return str(self.values)
def __getitem__(self, item):
return self.values[item]
def inner(self, vector):
""" Returns the dot product (inner product) of self and another vector
"""
if not isinstance(vector, float3):
raise ValueError('The dot product requires another vector')
return sum(a * b for a, b in zip(self, vector))
def ToXYZString(self):
return "X(%s), Y(%s), Z(%s)" % (self.x, self.y, self.z)
@classmethod
def FromPtMeasString(cls,ptMeasString):
xyzString = ptMeasString[ptMeasString.find("PtMeas(") + 7 : ptMeasString.rfind(")")]
return cls.FromXYZString(xyzString)
@classmethod
def FromXYZString(cls, xyzString):
x = float(xyzString[xyzString.find("X(") + 2 : xyzString.find("), Y")])
y = float(xyzString[xyzString.find("Y(") + 2 : xyzString.find("), Z")])
z = float(xyzString[xyzString.find("Z(") + 2 : xyzString.rfind(")")])
return cls(x,y,z)
def ToIJKString(self):
return "IJK(%s,%s,%s)" % (self.x, self.y, self.z)
# I++ documentation mentions zxz rotation order in an example and
# experimentation has shown it work.
ORDER = 'zxz'
class Csy:
def __init__(self, x,y,z,theta,psi,phi):
"""
Parameters are in the same order as they are passed to I++'s SetCsyTransform.
Note that the order of the angles is different than the order they are passed
in to perform euler angle calculations.
"""
self.x = x
self.y = y
self.z = z
self.theta = theta
self.psi = psi
self.phi = phi
def toMatrix4(self):
r = Rotation.from_euler(ORDER, (self.phi, self.theta, self.psi), degrees=True)
# Convert rotation to 3x3 matrix
mat3 = r.as_matrix()
# Initialize empty 4x4 matrix
mat4 = np.empty((4,4))
# Copy 3x3 rotation matrix into top left of 4x4 matrix
mat4[:3,:3] = mat3
# Populate the translation portion of the 4x4 matrix with the origin
mat4[:3,3] = (self.x, self.y, self.z)
# Fill in the homogenous coordinates
mat4[3,:] = [0,0,0,1]
return mat4
def fromMatrix4(mat4):
# Extract the 3x3 rotation matrix from the 4x4 matrix
mat3 = mat4[0:3, 0:3]
# Create a rotation object with rotation matrix
r = Rotation.from_matrix(mat3)
# Convert rotation to euler angles
(phi, theta, psi) = r.as_euler(ORDER, degrees=True)
# Extract the origin from the translation components of 4x4 matrix
x = mat4[0][3]
y = mat4[1][3]
z = mat4[2][3]
return Csy(x,y,z,theta,psi,phi)
def toJSON(self):
return {
"isCsy": True,
"x": self.x,
"y": self.y,
"z": self.z,
"theta": self.theta,
"psi": self.psi,
"phi": self.phi
}
def isJSONObjectACsy(data):
"""Returns True if a dict object has the "isCsy" key. This is used
to automatically convert a deserialized JSON data structure to Csy objects."""
return type(data) == dict and data.get("isCsy", False)
def readPointData(data):
logger.debug("read point data %s" % data)
x = float(data[data.find("X(") + 2 : data.find("), Y")])
y = float(data[data.find("Y(") + 2 : data.find("), Z")])
z = float(data[data.find("Z(") + 2 : data.rfind(")")])
pt = float3(x,y,z)
return pt
async def noop(args=None):
pass
async def setEvent(event):
logger.debug("setting event")
event.set()
async def waitForEvent(event):
await event.wait()
def gotError():
raise CmmException()
async def setFutureException(fut,msg):
logger.debug("setFutureException %s" % msg)
if not fut.done():
fut.set_exception(CmmException(msg))
async def setFutureResult(fut,val):
if not fut.done():
fut.set_result(val)
async def futureWaitForCommandComplete(cmd, *args):
loop = asyncio.get_running_loop()
fut = loop.create_future()
obj = {}
callbacks = TransactionCallbacks(complete=(lambda: setFutureResult(fut,obj)), error=(lambda: setFutureException(fut,"boo")))
await cmd(*args, callbacks=callbacks)
logger.debug("awaiting fut")
r = await fut
logger.debug("Future resolved %s" % r)
async def waitForCommandComplete(cmd, *args, otherCallbacks=None):
cmdCompleted = asyncio.Event()
waitTask = asyncio.create_task(waitForEvent(cmdCompleted))
logger.debug(otherCallbacks)
if otherCallbacks:
callbacks = TransactionCallbacks(complete=(lambda: setEvent(cmdCompleted)), **otherCallbacks)
else:
callbacks = TransactionCallbacks(complete=(lambda: setEvent(cmdCompleted)))
logger.debug(callbacks.complete)
await cmd(*args, callbacks=callbacks)
await waitTask
class CmmException(Exception):
pass
class CmmExceptionUnexpectedCollision(CmmException):
pass
class CmmExceptionErrorsPresent(CmmException):
pass
class CmmExceptionAxisLimit(CmmException):
pass
class CmmExceptionUnknownCommand(CmmException):
pass
class TransactionStatus(Enum):
ERROR = -1
CREATED = 0
SENT = 1
ACK = 2
COMPLETE = 3
class TransactionCallbacks:
# def __init__(self, send, ack, complete, data, error):
def __init__(self):
self.send = []
self.ack = []
self.complete = []
self.data = []
self.error = []
class StandardTransactionCallbacks(TransactionCallbacks):
def __init__(self, future):
self.send = None
self.ack = None
self.complete = (lambda: setFutureResult(fut,data))
self.data = data
self.error = (lambda msg: setFutureException(fut,msg))
class Callbacks:
def __init__(self,transaction):
self.transaction = transaction
def send(self):
self.transaction.status = TransactionStatus.SENT
if self.transaction.custom_callbacks.send is not None:
self.transaction.custom_callbacks.send()
def ack(self):
self.transaction.status = TransactionStatus.ACK
def data(self):
self.transaction.data = TransactionStatus.SENT
def error(self):
self.transaction.error = TransactionStatus.ERROR
def complete(self):
self.transaction.complete = TransactionStatus.COMPLETE
class Transaction:
def __init__(self, tag, cmd):
self.status = TransactionStatus.CREATED
self.tag = tag
self.data_list = []
self.error_list = []
self.futures = {}
self.sendTask = None
self.command = cmd
self.fut = None
self.callbacks = TransactionCallbacks()
def register_callback(self, event, callback, once):
try:
getattr(self.callbacks, event).append({'callback': callback, 'once': once})
except AttributeError as err:
return err
def clear_callbacks(self, event=None):
if event is not None:
setattr(self.callbacks, event, [])
else:
self.callbacks = TransactionCallbacks()
def _std_event_callback(self, key):
loop = asyncio.get_running_loop()
fut = loop.create_future()
def callback(transaction, isError=False):
self.fut = None
fut.set_result(transaction)
def error_callback(transaction, isError=False):
self.fut = None
fut.set_exception(CmmException("".join(transaction.error_list)))
self.register_callback(key, callback, True)
self.register_callback('error', error_callback, True)
self.fut = fut
if self.sendCoro:
sendCoro = self.sendCoro
async def mycoro():
await sendCoro
return await fut
self.sendCoro = None
return asyncio.create_task(mycoro())
else:
return fut
def _process_event_callbacks(self, event, isError=False):
eventCallbacks = getattr(self.callbacks, event)
repeatCallbacks = []
for cb in eventCallbacks:
func = cb['callback']
func(self, isError)
if not cb['once']:
repeatCallbacks.append(cb)
setattr(self.callbacks, event, repeatCallbacks)
def send(self):
return self._std_event_callback("send")
def ack(self):
logger.debug("creating ack future for message %s", self.tag)
return self._std_event_callback("ack")
def complete(self):
logger.debug("creating complete future for message %s", self.tag)
return self._std_event_callback("complete")
def data(self):
logger.debug("creating data future for message %s", self.tag)
return self._std_event_callback("data")
def error(self):
logger.debug("creating error future for message %s", self.tag)
return self._std_event_callback("error")
def handle_send(self):
logger.debug("handling send for message %s", self.tag)
self.status = TransactionStatus.SENT
self._process_event_callbacks('send')
def handle_ack(self):
logger.debug("handling ack for message %s", self.tag)
self.status = TransactionStatus.ACK
self._process_event_callbacks('ack')
def handle_data(self, data_msg):
logger.debug("handling data for message %s", self.tag)
self.data_list.append(data_msg)
self._process_event_callbacks('data')
def handle_error(self, err_msg):
logger.debug("handling error for message %s", self.tag)
self.status = TransactionStatus.ERROR
self.error_list.append(err_msg)
self._process_event_callbacks('error', True)
def handle_complete(self):
logger.debug("handling complete for message %s", self.tag)
self.status = TransactionStatus.COMPLETE
self._process_event_callbacks('complete')
class Client:
def __init__(self, host=HOST, port=PORT):
self.host = host
self.port = port
self.tcpClient = TCPClient()
self.stream = None
self.nextTagNum = 1
self.nextEventTagNum = 1
self.transactions = {}
self.events = {}
self.buffer = ""
self.points = []
self.eventCallbacks = []
self.eventFutures = []
def is_connected(self):
return not self.stream.closed() if self.stream else False
async def connect(self):
logger.debug(tornado.version)
try:
logger.debug('connecting')
self.stream = await self.tcpClient.connect(self.host, self.port, timeout=3.0)
logger.debug('connected %s' % (self.stream,))
self.listenerTask = asyncio.create_task(self.handleMessages())
return True
except Exception as e:
logger.error("connect error %s", traceback.format_exc())
raise e
async def disconnect(self):
try:
if self.stream is not None:
self.stream.close()
except Exception as e:
logger.error("disconnect error %s", traceback.format_exc())
raise e
def sendCommand(self, command, isEvent=False):
try:
if isEvent:
tagNum = self.nextEventTagNum
tag = "E%04d" % eventTagNum
self.nextEventTagNum = self.nextEventTagNum%9999+1 # Get the next event tag between 1 - 9999
else:
tagNum = self.nextTagNum
tag = "%05d" % tagNum
self.nextTagNum = self.nextTagNum%99999+1 # Get the next tag between 1 - 99999
logger.debug("sendCommand %s, tag %s " % (command, tag))
transaction = Transaction(tag, command)
self.transactions[tag] = transaction
transaction.sendCoro = self._coro_send_command(transaction)
return transaction
except Exception as e:
logger.error(e)
raise e
async def _coro_send_command(self, transaction):
message = "%s %s\r\n" % (transaction.tag, transaction.command)
try:
await asyncio.wait_for(self.stream.write(message.encode('ascii')), 3.0)
except asyncio.TimeoutError as e:
logger.debug("Timeout!")
loop = asyncio.get_running_loop()
loop.stop()
raise e
transaction.handle_send()
async def readMessage(self):
msg = await self.stream.read_until(b"\r\n")
msg = msg.decode("ascii")
return msg
def addEventCallback(self, callback):
self.eventCallbacks.append(callback)
def removeEventCallback(self, callback):
self.eventCallbacks.remove(callback)
def manualProbe(self):
loop = asyncio.get_running_loop()
fut = loop.create_future()
def callback(msg):
try:
self.eventFutures.remove(fut)
self.removeEventCallback(callback)
fut.set_result(msg)
except Exception as e:
fut.set_exception(e)
self.addEventCallback(callback)
self.eventFutures.append(fut)
return fut
async def handleMessages(self, stopTag=None, stopKey=None):
'''
Run this in a coroutine
'''
logger.debug("started handling messages")
try:
while True:
msg = await self.readMessage()
logger.debug("handleMessage: %s" % msg)
msgTag = msg[0:5]
responseKey = msg[6]
if msgTag == "E0000":
logger.debug("Received E0000 event, calling all registered callbacks")
for callback in self.eventCallbacks:
logger.debug("Calling callback %s" % (callback,))
callback(msg[8:])
if msgTag in self.transactions:
transaction = self.transactions[msgTag]
if transaction.status != TransactionStatus.ERROR:
if responseKey == IPP_ACK_CHAR:
transaction.handle_ack()
elif responseKey == IPP_COMPLETE_CHAR:
transaction.handle_complete()
elif responseKey == IPP_DATA_CHAR:
transaction.handle_data(msg)
elif responseKey == IPP_ERROR_CHAR:
for t in self.transactions.values():
if t.fut:
t.handle_error(msg)
for f in self.eventFutures:
f.set_exception(CmmException(msg))
self.eventFutures.clear()
else:
logger.debug("%s NOT in transactions dict" % msgTag)
except StreamClosedError:
pass
'''
I++ Server Methods
'''
def StartSession(self):
return self.sendCommand("StartSession()")
def EndSession(self):
endTransaction = self.sendCommand("EndSession()")
self.nextTag = 1
self.nextEventTag = 1
return endTransaction
def StopDaemon(self, eventTag):
return self.sendCommand("StopDaemon(%s)" % eventTag)
def StopAllDaemons(self):
return self.sendCommand("StopAllDaemons()")
def AbortE(self):
'''
Fast Queue command
'''
return self.sendCommand("AbortE()", isEvent=True)
def GetErrorInfo(self, errNum=None):
return self.sendCommand("GetErrorInfo(%s)" % str(errNum or ''))
def ClearAllErrors(self):
return self.sendCommand("ClearAllErrors()")
def GetProp(self, propArr):
propsString = ", ".join(propArr)
return self.sendCommand("GetProp(%s)" % propsString)
def GetPropE(self, propArr):
'''
Fast Queue command
'''
propsString = ", ".join(propArr)
return self.sendCommand("GetPropE(%s)" % propsString, isEvent=true)
def SetProp(self, setPropString):
return self.sendCommand("SetProp(%s)" % setPropString)
def EnumProp(self, pointerString):
'''
Get the list of properties for a system object
For example, "EnumProp(Tool.PtMeasPar())" will return
the active tool's PtMeas properties list
'''
return self.sendCommand("EnumProp(%s)" % pointerString)
def EnumAllProp(self, pointerString):
'''
Get the entire tree of properties and sub-properties for a system object
'''
return self.sendCommand("EnumAllProp(%s)" % pointerString)
def GetDMEVersion(self):
return self.sendCommand("GetDMEVersion()")
'''
I++ DME Methods
'''
def Home(self):
return self.sendCommand("Home()")
def IsHomed(self):
return self.sendCommand("IsHomed()")
def EnableUser(self):
return self.sendCommand("EnableUser()")
def DisableUser(self):
return self.sendCommand("DisableUser()")
def IsUserEnabled(self):
return self.sendCommand("IsUserEnabled()")
def OnPtMeasReport(self, ptMeasFormatString):
'''
Define the information reported in the result of a PtMeas command
'''
return self.sendCommand("OnPtMeasReport(%s)" % ptMeasFormatString)
def OnMoveReportE(self, onMoveReportFormatString):
'''
Fast Queue command
Start a daemon that reports machine movement, and define
which information is sent (sequence and contents)
'''
return self.sendCommand("OnMoveReportE(%s)" % onMoveReportFormatString, isEvent=True)
def GetMachineClass(self):
return self.sendCommand("GetMachineClass()")
def GetErrStatusE(self):
'''
Fast Queue command
Response is "ErrStatus(1)" if in error
Response is "ErrStatus(0)" if ok
'''
return self.sendCommand("GetErrStatusE()", isEvent=true)
def GetXtdErrStatus(self):
'''
Response is one or more lines of status information
Could also include one or more errors
Example:
IsHomed(1)
IsUserEnabled(0)
1009: Air Pressure Out Of Range
0512: No Daemons Are Active.
'''
return self.sendCommand("GetXtdErrStatus()")
def Get(self, queryString):
'''
Query tool position
queryString example:
"X(), Y(), Z(), Tool.A(), Tool.B()"
'''
return self.sendCommand("Get(%s)" % queryString)
def GoTo(self, positionString):
'''
Move to a target position, including tool rotation
'''
return self.sendCommand("GoTo(%s)" % positionString)
async def sendPtMeas(self, ptMeasString):
await self.sendCommand("PtMeas(%s)" % ptMeasString)
def PtMeas(self, ptMeasString):
'''
Execute a single point measurement
Necessary parameters are defined by the active tool
Return format is set by OnPtMeasReport
Errors if surface not found (Error 1006: Surface not Found)
'''
cmdString = "PtMeas(%s)" % ptMeasString
return self.sendCommand(cmdString)
# See examples 7.6 and 7.7 in IDME specification for tool handling examples
def Tool(self):
'''
Select a pointer to the active tool
'''
return self.sendCommand("Tool()")
def FindTool(self, toolName):
'''
Select a pointer to a tool with a known name
'''
return self.sendCommand("FindTool(%s)" % toolName)
def FoundTool(self):
'''
Acts as pointer to tool selected by FindTool command
Default pointer is "UnDefTool"
'''
return self.sendCommand("FoundTool()")
def ChangeTool(self, toolName):
'''
Perform a tool change
'''
return self.sendCommand('ChangeTool("%s")' % toolName)
def SetTool(self, toolName):
'''
Force the server to assume a given tool is the active tool
'''
return self.sendCommand("SetTool(\"%s\")" % toolName)
def AlignTool(self, alignToolString):
'''
Orientate an alignable tool
'''
return self.sendCommand("AlignTool(%s)" % alignToolString)
def GoToPar(self):
'''
This method acts as a pointer to the GoToParameter block of the DME
'''
return self.sendCommand("GoToPar()")
def PtMeasPar(self):
'''
This method acts as a pointer to the PtMeasParameter block of the DME
'''
return self.sendCommand("PtMeasPar()")
def EnumTools(self):
'''
Returns a list of the names of available tools
'''
return self.sendCommand("enumTools()")
def GetChangeToolAction(self, toolName):
'''
Query the necessary movement to change to a given tool
'''
return self.sendCommand("GetChangeToolAction(%s)" % toolName)
def EnumToolCollection(self, collectionName):
'''
Query the names and types of tools (or child collections) belonging to a collection
'''
return self.sendCommand("EnumToolCollection(%s)" % collectionName)
def EnumAllToolCollections(self, collectionName):
'''
Recursively return all tools and sub-collections to this collection
'''
return self.sendCommand("EnumToolCollection(%s)" % collectionName)
def OpenToolCollection(self, collectionName):
'''
Make all tools in referenced collection visible, meaning a ChangeTool command can
directly use names in the collection without a path extension
'''
return self.sendCommand("EnumToolCollection(%s)" % collectionName)
def IjkAct(self):
'''
Query the meaning of server IJK() values: actual measured normal, nominal, or tool alignment
'''
return self.sendCommand("IJKAct()")
def PtMeasSelfCenter(self, ptMeasSelfCenterString):
'''
Execute a single point measurement by self-centering probing
Necessary parameters defined by the active tool
'''
return self.sendCommand("PtMeasSelfCenter(%s)" % ptMeasSelfCenterString)
def PtMeasSelfCenterLocked(self, ptMeasSelfCenterString):
'''
Execute a single point measurement by self-centering probing
without leaving a plane defined by params
Necessary parameters defined by the active tool
'''
return self.sendCommand("PtMeasSelfCenter(%s)" % ptMeasSelfCenterString)
def ReadAllTemperatures(self):
return self.sendCommand("ReadAllTemperatures()")
'''
I++ CartCMM Methods
'''
def SetCoordSystem(self, coodSystemString):
'''
Arg is one of: MachineCsy, MoveableMachineCsy, MultipleArmCsy, RotaryTableVarCsy, PartCsy
'''
return self.sendCommand("SetCoordSystem(%s)" % coodSystemString)
def GetCoordSystem(self):
'''
Query which coord sys is selected
'''
return self.sendCommand("GetCoordSystem()")
def GetCsyTransformation(self, getCsyTransformationString):
return self.sendCommand("GetCsyTransformation(%s)" % getCsyTransformationString)
def SetCsyTransformation(self, setCsyTransformationString):
return self.sendCommand("SetCsyTransformation(%s)" % setCsyTransformationString)
def SaveActiveCoordSystem(self, csyName):
return self.sendCommand("SaveActiveCoordSystem(%s)" % csyName)
def LoadCoordSystem(self, csyName):
return self.sendCommand("LoadCoordSystem(%s)" % csyName)
def DeleteCoordSystem(self, csyName):
return self.sendCommand("DeleteCoordSystem(%s)" % csyName)
def EnumCoordSystem(self, name):
return self.sendCommand("EnumCoordSystem()")
def GetNamedCsyTransformation(self, csyName):
return self.sendCommand("GetNamedCsyTransformation(%s)" % csyName)
def SaveNamedCsyTransformation(self, csyName, csyCoordsString):
return self.sendCommand("SaveNamedCsyTransformation(%s, %s)" % (csyName, csyCoordsStsring))
'''
I++ Tool Methods
'''
def ReQualify(self):
'''
Requalify active tool
'''
return self.sendCommand("ReQualify()")
def ScanPar(self):
'''
Acts as a pointer to the ScanParameter block of KTool instance
'''
return self.sendCommand("ScanPar()")
def AvrRadius(self):
'''
Return average tip radius of selected tool
'''
return self.sendCommand("AvrRadius()")
def IsAlignable(self):
'''
Query if selected tool is alignable
'''
return self.sendCommand("IsAlignable()")
def Alignment(self, alignmentVectorString):
'''
Query if selected tool is alignable
'''
return self.sendCommand("Alignment(%s)" % alignmentVectorString)
def CalcToolAlignment(self, calcToolAlignmentString):
'''
Query the alignment of selected tool
'''
return self.sendCommand("CalcToolAlignment(%s)" % calcToolAlignmentString)
def CalcToolAngles(self, calcToolAnglesString):
'''
Query the alignment of selected tool
'''
return self.sendCommand("CalcToolAngles(%s)" % calcToolAnglesString)
def UseSmallestAngleToAlignTool(self, enabled):
'''
Param is 0 or 1
When enabled with CalcToolAngles(1), attempting to rotate the tool by
180 degrees or more will produce an error
'''
return self.sendCommand("CalcToolAngles(%s)" % calcToolAnglesString)
'''
I++ Scanning Methods
'''
def OnScanReport(self, onScanReportString):
'''
Define the format of scan reports
'''
return self.sendCommand("OnScanReport(%s)" % onScanReportString)
def ScanOnCircleHint(self, displacement, form):
'''
Optimize ScanOnCircle execution by defining expected deviation from nominal of the measured circle
'''
return self.sendCommand("ScanOnCircleHint(%s, %s)" % (displacement, form))
def ScanOnCircle(self, scanOnCircleString):
'''
Perform a scanning measurement on a circular
Parameters: (Cx, Cy, Cz, Sx, Sy, Sz, i, j, k, delta, sfa, StepW)
Cx, Cy, Cz is the nominal center point of the circle
Sx, Sy, Sz is a point on the circle radius where the scan starts
i,j,k is the normal vector of the circle plane
delta is the angle to scan
sfa is the surface angle of the circle
StepW average angular distance between 2 measured points in degrees.
'''
return self.sendCommand("ScanOnCircle(%s)" % (scanOnCircleString))
def ScanOnLineHint(self, angle, form):
'''
Optimize ScanOnLine execution by defining expected deviation from nominal of the measured line
'''
return self.sendCommand("ScanOnLineHint(%s, %s)" % (angle, form))
def ScanOnLine(self, scanOnLineString):
'''
Perform a scanning measurement on a circular
Parameters: (Sx,Sy,Sz,Ex,Ey,Ez,i,j,k,StepW)
Sx, Sy, Sz defines the line start point
Ex, Ey, Ez defines the line end point
i,j,k is the surface normal vector on the line
StepW average distance between 2 measured points in mm
'''
return self.sendCommand("ScanOnLine(%s)" % (scanOnLineString))
def ScanOnCurveHint(self, deviation, minRadiusOfCurvature):
'''
Optimize ScanOnCurve execution by defining expected deviation from nominal of the measured curve
'''
return self.sendCommand("ScanOnCurveHint(%s, %s)" % (deviation, minRadiusOfCurvature))
def ScanOnCurveDensity(self, scanOnCurveDensityString):
'''
Define density of points returned from server by ScanOnCurve execution
Parameters: (Dis(),Angle(),AngleBaseLength(),AtNominals())
Dis() Maximum distance of 2 points returned
Angle() Maximum angle between the 2 segments between the last 3 points
AngleBaseLength() Baselength for calculating the Angle criteria (necessary for small point distances)
AtNominals() Boolean 0 or 1. If 1 the arguments Dis() and Angle() are ignored
Dis() or/and AtNominals() without Angle() and AngleBaseLength() also possible.
'''
return self.sendCommand("ScanOnCurveDensity(%s)" % (scanOnCurveDensityString))
def ScanOnCurve(self, scanOnCurveString):
'''
Perform a scanning measurement along a curve
Parameters: (
Closed(),
Format(X(),Y(),Z(),IJK(),tag[,pi,pj,pk[,si,sj,sk]]),
Data(
P1x,P1y,P1z,i1,j1,k1,tag1[,pi1,pj1,pk1[,si1,sj1,sk1]],
Pnx,Pny,Pnz,in,jn,jn,tagn[,pin,pjn,pkn[,sin,sjn,skn]]
)
)
Closed() Boolean 0 or 1. 1 means contour is closed
Format Defines the structure of data set send to server
X(),Y(),Z() Format definition for nominal point coordinates