Skip to content

Commit 4454e3a

Browse files
committedOct 15, 2019
better CAN comm abstraction
1 parent 43adad3 commit 4454e3a

File tree

1 file changed

+143
-114
lines changed

1 file changed

+143
-114
lines changed
 

‎python/uds.py

+143-114
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,117 @@ class InvalidSubFunctioneError(Exception):
267267
0x93: 'voltage too low',
268268
}
269269

270+
class IsoTpMessage():
271+
def __init__(self, can_tx_queue: Queue, can_rx_queue: Queue, timeout: float, debug: bool=False):
272+
self.can_tx_queue = can_tx_queue
273+
self.can_rx_queue = can_rx_queue
274+
self.timeout = timeout
275+
self.debug = debug
276+
277+
def send(self, dat: bytes) -> None:
278+
self.tx_dat = dat
279+
self.tx_len = len(dat)
280+
self.tx_idx = 0
281+
self.tx_done = False
282+
283+
if self.debug: print(f"ISO-TP: REQUEST - {hexlify(self.tx_dat)}")
284+
self._tx_first_frame()
285+
286+
def _tx_first_frame(self) -> None:
287+
if self.tx_len < 8:
288+
# single frame (send all bytes)
289+
if self.debug: print("ISO-TP: TX - single frame")
290+
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(8, b"\x00")
291+
self.tx_done = True
292+
else:
293+
# first frame (send first 6 bytes)
294+
if self.debug: print("ISO-TP: TX - first frame")
295+
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00")
296+
self.can_tx_queue.put(msg)
297+
298+
def recv(self) -> bytes:
299+
self.rx_dat = b""
300+
self.rx_len = 0
301+
self.rx_idx = 0
302+
self.rx_done = False
303+
304+
try:
305+
while True:
306+
self._isotp_rx_next()
307+
if self.tx_done and self.rx_done:
308+
return self.rx_dat
309+
except Empty:
310+
raise MessageTimeoutError("timeout waiting for response")
311+
finally:
312+
if self.debug: print(f"ISO-TP: RESPONSE - {hexlify(self.rx_dat)}")
313+
314+
def _isotp_rx_next(self) -> None:
315+
rx_data = self.can_rx_queue.get(block=True, timeout=self.timeout)
316+
317+
# single rx_frame
318+
if rx_data[0] >> 4 == 0x0:
319+
self.rx_len = rx_data[0] & 0xFF
320+
self.rx_dat = rx_data[1:1+self.rx_len]
321+
self.rx_idx = 0
322+
self.rx_done = True
323+
if self.debug: print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}")
324+
return
325+
326+
# first rx_frame
327+
if rx_data[0] >> 4 == 0x1:
328+
self.rx_len = ((rx_data[0] & 0x0F) << 8) + rx_data[1]
329+
self.rx_dat = rx_data[2:]
330+
self.rx_idx = 0
331+
self.rx_done = False
332+
if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}")
333+
if self.debug: print(f"ISO-TP: TX - flow control continue")
334+
# send flow control message (send all bytes)
335+
msg = b"\x30\x00\x00".ljust(8, b"\x00")
336+
self.can_tx_queue.put(msg)
337+
return
338+
339+
# consecutive rx frame
340+
if rx_data[0] >> 4 == 0x2:
341+
assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame"
342+
self.rx_idx += 1
343+
assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index"
344+
rx_size = self.rx_len - len(self.rx_dat)
345+
self.rx_dat += rx_data[1:1+min(rx_size, 7)]
346+
if self.rx_len == len(self.rx_dat):
347+
self.rx_done = True
348+
if self.debug: print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}")
349+
return
350+
351+
# flow control
352+
if rx_data[0] >> 4 == 0x3:
353+
assert self.tx_done == False, "isotp - rx: flow control with no active frame"
354+
assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort"
355+
assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid"
356+
if rx_data[0] == 0x30:
357+
if self.debug: print("ISO-TP: RX - flow control continue")
358+
delay_ts = rx_data[2] & 0x7F
359+
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
360+
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
361+
# first frame = 6 bytes, each consecutive frame = 7 bytes
362+
start = 6 + self.tx_idx * 7
363+
count = rx_data[1]
364+
end = start + count * 7 if count > 0 else self.tx_len
365+
for i in range(start, end, 7):
366+
if delay_ts > 0 and i > start:
367+
delay_s = delay_ts / delay_div
368+
if self.debug: print(f"ISO-TP: TX - delay - seconds={delay_s}")
369+
time.sleep(delay_s)
370+
self.tx_idx += 1
371+
# consecutive tx frames
372+
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00")
373+
self.can_tx_queue.put(msg)
374+
if end >= self.tx_len:
375+
self.tx_done = True
376+
if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}")
377+
elif rx_data[0] == 0x31:
378+
# wait (do nothing until next flow control message)
379+
if self.debug: print("ISO-TP: TX - flow control wait")
380+
270381
class UdsClient():
271382
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: int=10, debug: bool=False):
272383
self.panda = panda
@@ -282,20 +393,17 @@ def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout:
282393
else:
283394
raise ValueError("invalid tx_addr: {}".format(tx_addr))
284395

285-
self.tx_queue = Queue()
286-
self.rx_queue = Queue()
396+
self.can_tx_queue = Queue()
397+
self.can_rx_queue = Queue()
287398
self.timeout = timeout
288399
self.debug = debug
289400

290-
self.can_reader_t = Thread(target=self._isotp_thread, args=(self.debug,))
291-
self.can_reader_t.daemon = True
292-
self.can_reader_t.start()
401+
self.can_thread = Thread(target=self._can_thread, args=(self.debug,))
402+
self.can_thread.daemon = True
403+
self.can_thread.start()
293404

294-
def _isotp_thread(self, debug: bool=False):
405+
def _can_thread(self, debug: bool=False):
295406
try:
296-
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
297-
tx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
298-
299407
# allow all output
300408
self.panda.set_safety_mode(0x1337)
301409
# clear tx buffer
@@ -304,96 +412,23 @@ def _isotp_thread(self, debug: bool=False):
304412
self.panda.can_clear(0xFFFF)
305413

306414
while True:
307-
messages = self.panda.can_recv()
308-
for rx_addr, rx_ts, rx_data, rx_bus in messages:
415+
# send
416+
while not self.can_tx_queue.empty():
417+
msg = self.can_tx_queue.get(block=False)
418+
if debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg)))
419+
self.panda.can_send(self.tx_addr, msg, self.bus)
420+
421+
# receive
422+
msgs = self.panda.can_recv()
423+
for rx_addr, rx_ts, rx_data, rx_bus in msgs:
309424
if rx_bus != self.bus or rx_addr != self.rx_addr or len(rx_data) == 0:
310425
continue
311-
312-
if (debug): print("R: {} {}".format(hex(rx_addr), hexlify(rx_data)))
313-
if rx_data[0] >> 4 == 0x0:
314-
# single rx_frame
315-
rx_frame["size"] = rx_data[0] & 0xFF
316-
rx_frame["data"] = rx_data[1:1+rx_frame["size"]]
317-
rx_frame["idx"] = 0
318-
rx_frame["done"] = True
319-
self.rx_queue.put(rx_frame["data"])
320-
elif rx_data[0] >> 4 == 0x1:
321-
# first rx_frame
322-
rx_frame["size"] = ((rx_data[0] & 0x0F) << 8) + rx_data[1]
323-
rx_frame["data"] = rx_data[2:]
324-
rx_frame["idx"] = 0
325-
rx_frame["done"] = False
326-
# send flow control message (send all bytes)
327-
msg = b"\x30\x00\x00".ljust(8, b"\x00")
328-
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
329-
self.panda.can_send(self.tx_addr, msg, self.bus)
330-
elif rx_data[0] >> 4 == 0x2:
331-
# consecutive rx frame
332-
assert rx_frame["done"] == False, "rx: no active frame"
333-
# validate frame index
334-
rx_frame["idx"] += 1
335-
assert rx_frame["idx"] & 0xF == rx_data[0] & 0xF, "rx: invalid consecutive frame index"
336-
rx_size = rx_frame["size"] - len(rx_frame["data"])
337-
rx_frame["data"] += rx_data[1:1+min(rx_size, 7)]
338-
if rx_frame["size"] == len(rx_frame["data"]):
339-
rx_frame["done"] = True
340-
self.rx_queue.put(rx_frame["data"])
341-
elif rx_data[0] >> 4 == 0x3:
342-
# flow control
343-
if tx_frame["done"] != False:
344-
tx_frame["done"] = True
345-
self.rx_queue.put(b"\x7F\xFF\xFFtx: no active frame")
346-
if rx_data[0] == 0x32:
347-
# 0x32 = overflow/abort
348-
tx_frame["done"] = True
349-
self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - overflow/abort")
350-
if rx_data[0] != 0x30 and rx_data[0] != 0x31:
351-
# 0x30 = continue
352-
# 0x31 = wait
353-
tx_frame["done"] = True
354-
self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - invalid transfer state indicator")
355-
if rx_data[0] == 0x30:
356-
delay_ts = rx_data[2] & 0x7F
357-
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
358-
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
359-
# first frame = 6 bytes, each consecutive frame = 7 bytes
360-
start = 6 + tx_frame["idx"] * 7
361-
count = rx_data[1]
362-
end = start + count * 7 if count > 0 else tx_frame["size"]
363-
for i in range(start, end, 7):
364-
if delay_ts > 0 and i > start:
365-
if (debug): print("D: {}".format(delay_ts / delay_div))
366-
time.sleep(delay_ts / delay_div)
367-
tx_frame["idx"] += 1
368-
# consecutive tx frames
369-
msg = (bytes([0x20 | (tx_frame["idx"] & 0xF)]) + tx_frame["data"][i:i+7]).ljust(8, b"\x00")
370-
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
371-
self.panda.can_send(self.tx_addr, msg, self.bus)
372-
if end >= tx_frame["size"]:
373-
tx_frame["done"] = True
374-
375-
if not self.tx_queue.empty():
376-
req = self.tx_queue.get(block=False)
377-
# reset rx and tx frames
378-
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
379-
tx_frame = {"size": len(req), "data": req, "idx": 0, "done": False}
380-
if tx_frame["size"] < 8:
381-
# single frame
382-
tx_frame["done"] = True
383-
msg = (bytes([tx_frame["size"]]) + tx_frame["data"]).ljust(8, b"\x00")
384-
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
385-
self.panda.can_send(self.tx_addr, msg, self.bus)
386-
else:
387-
# first rx_frame
388-
tx_frame["done"] = False
389-
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, b"\x00")
390-
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
391-
self.panda.can_send(self.tx_addr, msg, self.bus)
426+
if debug: print("CAN-RX: {} - {}".format(hex(self.rx_addr), hexlify(rx_data)))
427+
self.can_rx_queue.put(rx_data)
392428
else:
393429
time.sleep(0.01)
394430
finally:
395431
self.panda.close()
396-
self.rx_queue.put(None)
397432

398433
# generic uds request
399434
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
@@ -402,16 +437,12 @@ def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data:
402437
req += bytes([subfunction])
403438
if data is not None:
404439
req += data
405-
self.tx_queue.put(req)
406440

441+
# send request, wait for response
442+
isotp_msg = IsoTpMessage(self.can_tx_queue, self.can_rx_queue, self.timeout, self.debug)
443+
isotp_msg.send(req)
407444
while True:
408-
try:
409-
resp = self.rx_queue.get(block=True, timeout=self.timeout)
410-
except Empty:
411-
raise MessageTimeoutError("timeout waiting for response")
412-
if resp is None:
413-
raise MessageTimeoutError("timeout waiting for response")
414-
445+
resp = isotp_msg.recv()
415446
resp_sid = resp[0] if len(resp) > 0 else None
416447

417448
# negative response
@@ -428,24 +459,22 @@ def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data:
428459
error_desc = resp[3:]
429460
# wait for another message if response pending
430461
if error_code == 0x78:
431-
time.sleep(0.1)
432462
continue
433463
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
434-
break
435464

436-
# positive response
437-
if service_type+0x40 != resp_sid:
438-
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
439-
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
465+
# positive response
466+
if service_type+0x40 != resp_sid:
467+
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
468+
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
440469

441-
if subfunction is not None:
442-
resp_sfn = resp[1] if len(resp) > 1 else None
443-
if subfunction != resp_sfn:
444-
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
445-
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn)))
470+
if subfunction is not None:
471+
resp_sfn = resp[1] if len(resp) > 1 else None
472+
if subfunction != resp_sfn:
473+
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
474+
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn)))
446475

447-
# return data (exclude service id and sub-function id)
448-
return resp[(1 if subfunction is None else 2):]
476+
# return data (exclude service id and sub-function id)
477+
return resp[(1 if subfunction is None else 2):]
449478

450479
# services
451480
def diagnostic_session_control(self, session_type: SESSION_TYPE):

0 commit comments

Comments
 (0)