Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Improve to_string and x25crc #748

Merged
merged 4 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 15 additions & 31 deletions DFReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,45 +127,29 @@ def set_unit_ids(self, unit_ids):
def set_mult_ids(self, mult_ids):
'''set mult IDs string from FMTU'''
self.mult_ids = mult_ids

def __str__(self):
return ("DFFormat(%s,%s,%s,%s)" %
(self.type, self.name, self.format, self.columns))

# Swiped into mavgen_python.py
def to_string(s):
'''desperate attempt to convert a string regardless of what garbage we get'''
try:
return s.decode("utf-8")
except Exception as e:
pass
try:
s2 = s.encode('utf-8', 'ignore')
x = u"%s" % s2
return s2
except Exception:
pass
# so it's a nasty one. Let's grab as many characters as we can
r = ''
while s != '':
try:
r2 = r + s[0]
s = s[1:]
r2 = r2.encode('ascii', 'ignore')
x = u"%s" % r2
r = r2
except Exception:
break
return r + '_XXX'

def null_term(str):
if isinstance(s, str):
return s
if sys.version_info[0] == 2:
# In python2 we want to return unicode for passed in unicode
return s
return s.decode(errors="backslashreplace")

def null_term(string):
'''null terminate a string'''
if isinstance(str, bytes):
str = to_string(str)
idx = str.find("\0")
if isinstance(string, bytes):
string = to_string(string)
idx = string.find("\0")
if idx != -1:
str = str[:idx]
return str
string = string[:idx]
return string


class DFMessage(object):
Expand Down Expand Up @@ -1130,7 +1114,7 @@ def init_arrays(self, progress_callback=None):
if mtype == "FMTU":
self.offset = ofs
self._parse_next()

ofs = self.data_map.find(b"\n", ofs)
if ofs == -1:
break
Expand Down
43 changes: 22 additions & 21 deletions generator/mavcrc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,38 @@
Copyright Andrew Tridgell
Released under GNU LGPL version 3 or later
'''
import sys
from builtins import object


class x25crc(object):
'''CRC-16/MCRF4XX - based on checksum.h from mavlink library'''
"""CRC-16/MCRF4XX - based on checksum.h from mavlink library"""

def __init__(self, buf=None):
self.crc = 0xffff
self.crc = 0xFFFF
if buf is not None:
if isinstance(buf, str):
self.accumulate_str(buf)
else:
self.accumulate(buf)
self.accumulate(buf)

def accumulate(self, buf):
'''add in some more bytes'''
"""add in some more bytes (it also accepts strings)"""
if sys.version_info[0] == 2:
if type(buf) is str:
buf = bytearray(buf)
elif type(buf).__name__ == 'unicode':
# we can't use the identifier unicode in python3
buf = bytearray(buf.encode())
elif type(buf) is str:
buf = buf.encode()

accum = self.crc
for b in buf:
tmp = b ^ (accum & 0xff)
tmp = (tmp ^ (tmp<<4)) & 0xFF
accum = (accum>>8) ^ (tmp<<8) ^ (tmp<<3) ^ (tmp>>4)
tmp = b ^ (accum & 0xFF)
tmp = (tmp ^ (tmp << 4)) & 0xFF
accum = (accum >> 8) ^ (tmp << 8) ^ (tmp << 3) ^ (tmp >> 4)
self.crc = accum

def accumulate_str(self, buf):
'''add in some more bytes'''
accum = self.crc
import array
bytes_array = array.array('B')
try: # if buf is bytes
bytes_array.frombytes(buf)
except TypeError: # if buf is str
bytes_array.frombytes(buf.encode())
except AttributeError: # Python < 3.2
bytes_array.fromstring(buf)
self.accumulate(bytes_array)
"""
Provided for backwards compatibility. accumulate now also works on strings.
"""
return self.accumulate(buf)
24 changes: 3 additions & 21 deletions mavutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import re
from pymavlink import mavexpression

# We want to re-export x25crc here
from pymavlink.generator.mavcrc import x25crc as x25crc

is_py3 = sys.version_info >= (3,0)
supports_type_annotations = sys.version_info >= (3,6)

Expand Down Expand Up @@ -2281,27 +2284,6 @@ def mode_string_acm(mode_number):
return mode_mapping_acm[mode_number]
return "Mode(%u)" % mode_number

class x25crc(object):
'''CRC-16/MCRF4XX - based on checksum.h from mavlink library'''
def __init__(self, buf=''):
self.crc = 0xffff
self.accumulate(buf)

def accumulate(self, buf):
'''add in some more bytes'''
byte_buf = array.array('B')
if isinstance(buf, array.array):
byte_buf.extend(buf)
else:
byte_buf.fromstring(buf)
accum = self.crc
for b in byte_buf:
tmp = b ^ (accum & 0xff)
tmp = (tmp ^ (tmp<<4)) & 0xFF
accum = (accum>>8) ^ (tmp<<8) ^ (tmp<<3) ^ (tmp>>4)
accum = accum & 0xFFFF
self.crc = accum

class MavlinkSerialPort(object):
'''an object that looks like a serial port, but
transmits using mavlink SERIAL_CONTROL packets'''
Expand Down
31 changes: 8 additions & 23 deletions tools/mavlogdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,28 +155,13 @@ def reduce_rate_msg(m, reduction_rate):

# swiped from DFReader.py
def to_string(s):
"""desperate attempt to convert a string regardless of what garbage we get"""
try:
return s.decode("utf-8")
except Exception:
pass
try:
s2 = s.encode("utf-8", "ignore")
x = u"%s" % s2
return x
except Exception:
pass
# so it's a nasty one. Let's grab as many characters as we can
r = ""
try:
for c in s:
r2 = r + c
r2 = r2.encode("ascii", "ignore")
x = u"%s" % r2
r = r2
except Exception:
pass
return r + "_XXX"
'''desperate attempt to convert a string regardless of what garbage we get'''
if isinstance(s, str):
return s
if sys.version_info[0] == 2:
# In python2 we want to return unicode for passed in unicode
return s
return s.decode(errors="backslashreplace")

def match_type(mtype, patterns):
'''return True if mtype matches pattern'''
Expand Down Expand Up @@ -259,7 +244,7 @@ def match_type(mtype, patterns):

if args.reduce_rate > 0 and reduce_rate_msg(m, args.reduce_rate):
continue

if output is not None:
if (isbin or islog) and m_type == "FMT":
output.write(m.get_msgbuf())
Expand Down