-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathUSBCAN.py
156 lines (133 loc) · 4.06 KB
/
USBCAN.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import serial
from time import sleep
import logging
import sys
import threading
# Configure the root logger at import time: records of level DEBUG and above
# are written to stdout. NOTE: importing this module therefore has a
# process-wide side effect on logging configuration.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# Named logger shared by the code in this module.
logger = logging.getLogger('comfoair-can')
class CN1FAddr:
    """Bit-packed identifier built from seven fields plus a fixed 0x1F prefix.

    Layout produced by :meth:`id` (bit 0 is the least significant bit)::

        bits  0..   SrcAddr
        bits  6..   DstAddr
        bits 12..   Address
        bit  14..   MultiMsg
        bit  15..   A8000
        bit  16..   A10000
        bits 17..   SeqNr
        bits 24..28 constant 0x1F

    No field is masked or range-checked; a value wider than the gap to the
    next field simply overlaps it.
    """

    def __init__(self, SrcAddr, DstAddr, Address, MultiMsg, A8000, A10000, SeqNr):
        """Store the raw field values exactly as given."""
        self.SrcAddr, self.DstAddr, self.Address = SrcAddr, DstAddr, Address
        self.MultiMsg = MultiMsg
        self.A8000, self.A10000 = A8000, A10000
        self.SeqNr = SeqNr

    def id(self):
        """Return the identifier as an integer with every field OR-ed in."""
        placements = (
            (self.SrcAddr, 0),
            (self.DstAddr, 6),
            (self.Address, 12),
            (self.MultiMsg, 14),
            (self.A8000, 15),
            (self.A10000, 16),
            (self.SeqNr, 17),
            (0x1F, 24),
        )
        packed = 0x0
        for value, shift in placements:
            packed |= value << shift
        return packed

    def hex(self):
        """Return the identifier as lowercase hex digits without the ``0x`` prefix."""
        text = hex(self.id())
        return text[2:]

    def bytes(self):
        """Return the identifier as bytes, most significant byte first."""
        digits = self.hex()
        return bytes.fromhex(digits)
class CANInterface:
    """Talk to a CAN bus through a serial-attached USB-CAN adapter.

    Frames on the serial line, as written by :meth:`_write_to_can`, look like::

        0xAA | 0xE0+len | id (4 bytes, least significant first) | data | 0x55

    so the end of one frame followed by the start of the next appears as the
    byte pair ``0x55 0xAA``. :meth:`read` uses that pair as its frame
    delimiter.
    """

    # Trailer byte of a frame / first byte of the delimiter pair.
    START_BYTE_1 = 0x55
    # Leader byte of a frame / second byte of the delimiter pair.
    START_BYTE_2 = 0XAA
    # Destination address placed in every outgoing identifier.
    COMFOAIR_ADDRESS = 1
    # Rolling 2-bit counter placed in outgoing identifiers (see send()).
    send_sequence_nr = 0

    def __init__(self, device, baudrate):
        """Remember the serial device and baud rate; the port is opened by open()."""
        self.device = device
        self.baudrate = baudrate
        # Held for the duration of send(); read() backs off while it is held.
        self.write_lock = threading.Lock()

    def open(self):
        """Open the serial port and send the adapter configuration packet."""
        self.sp = serial.Serial(self.device, self.baudrate)
        self._send_magic_init_packet()

    def read(self, callback, poll_interval=1):
        """Read frames forever, calling ``callback(pdid, data)`` for each one.

        ``pdid`` is bits 14..24 of the frame's 4-byte identifier and ``data``
        is a ``bytearray`` holding everything after the identifier. Frames too
        short to contain a type byte plus a 4-byte identifier (for example the
        partial bytes seen before the first delimiter) are dropped instead of
        being reported with a meaningless id.

        ``poll_interval`` is the number of seconds to sleep whenever no input
        is pending or a send is in progress. This method never returns.
        """
        while True:
            decoded = self._decode_frame(self._read_frame(poll_interval))
            if decoded is not None:
                callback(*decoded)

    def send(self, data):
        """Send ``data`` (a sequence of byte values) as one or more CAN frames.

        Payloads of up to 8 bytes go out in a single frame. Longer payloads are
        split into 7-byte pieces, each prefixed with a running index; the last
        piece has bit 0x80 set in its index. ``data`` may be a list, ``bytes``
        or ``bytearray``.
        """
        with self.write_lock:
            can_id = CN1FAddr(0x11, self.COMFOAIR_ADDRESS, 1, len(data) > 8, 0, 1, self.send_sequence_nr)
            self.send_sequence_nr = (self.send_sequence_nr + 1) & 0x3
            id_bytes = can_id.bytes()
            for payload in self._split_payload(data):
                self._write_to_can(id_bytes, payload)

    #----- private
    def _read_frame(self, poll_interval=1):
        """Block until a ``0x55 0xAA`` delimiter is seen; return the bytes before it.

        The delimiter itself is not part of the returned ``bytearray``.
        """
        frame = bytearray()
        while True:
            if self.sp.in_waiting == 0 or self.write_lock.locked():
                sleep(poll_interval)
                continue
            byte = self._get_single_byte()
            # A 0x55 may be payload or a trailer. Keep re-examining while the
            # following byte is itself 0x55, otherwise a payload that ends in
            # 0x55 (".. 55 55 AA") would swallow the trailer and merge frames.
            while byte == self.START_BYTE_1:
                follower = self._get_single_byte()
                if follower == self.START_BYTE_2:
                    return frame
                frame.append(byte)
                byte = follower
            frame.append(byte)

    @staticmethod
    def _decode_frame(frame):
        """Split a raw frame into ``(pdid, data)``; return None if it is too short.

        Byte 0 is skipped, bytes 1..4 are the identifier (least significant
        byte first) and the remainder is the payload.
        """
        if len(frame) < 5:
            return None
        can_id = int.from_bytes(frame[1:5], byteorder='little')
        return (can_id >> 14) & 0x7ff, frame[5:]

    @staticmethod
    def _split_payload(data):
        """Return the list of per-frame payloads (lists of ints) for ``data``."""
        if len(data) <= 8:
            return [list(data)]
        pieces = [list(data[start:start + 7]) for start in range(0, len(data), 7)]
        last = len(pieces) - 1
        return [
            [(index | 0x80) if index == last else index] + piece
            for index, piece in enumerate(pieces)
        ]

    def _get_single_byte(self):
        """Read one byte from the serial port and return it as an int."""
        return int.from_bytes(self.sp.read(1), byteorder='little')

    def _write_to_can(self, id, data):
        """Wrap ``id`` (4 bytes, most significant first) and ``data`` in a frame and write it."""
        send_buf = bytearray([self.START_BYTE_2, 0xE0 | len(data)])
        # The adapter expects the identifier least significant byte first.
        send_buf.extend(reversed(id))
        send_buf.extend(data)
        send_buf.append(self.START_BYTE_1)
        self.sp.write(send_buf)

    @staticmethod
    def _build_init_packet():
        """Return the 20-byte adapter configuration packet, checksum included."""
        packet = bytearray([0xAA, 0x55])
        packet.append(0x12)        # mystery byte
        packet.append(0x09)        # byte indicating CAN bus speed
        packet.append(0x02)        # frame type: extended
        packet.extend(bytes(4))    # filter (not supported)
        packet.extend(bytes(4))    # mask (not supported)
        packet.append(0x00)        # mode: normal? 0x01 would select loopback
        packet.append(0x01)        # magic byte (may have to be 0x01?)
        packet.extend(bytes(4))    # more magic bytes
        # Checksum: low byte of the sum of everything after the 2-byte header.
        # For this fixed packet the result (0x1E) equals what summing the
        # header as well and reducing modulo 255 gives, because
        # 0xAA + 0x55 == 255.
        # NOTE(review): formula taken from the public USB-CAN analyzer
        # reference implementation - confirm against the adapter in use.
        packet.append(sum(packet[2:]) & 0xFF)
        return packet

    def _send_magic_init_packet(self):
        """Write the configuration packet to the adapter and log completion."""
        self.sp.write(self._build_init_packet())
        logger.info('init config done')