From 9e1f1265f73c3d77280da2eee33475aa8ea79445 Mon Sep 17 00:00:00 2001 From: luismeruje <17674315+luismeruje@users.noreply.github.com> Date: Sat, 4 Nov 2023 03:40:59 +0000 Subject: [PATCH] Structure code and bug fix (#15) * Cleanup code. Exhaust readings before exiting when program receives Keyboard Interrupt. * Added tool to calculate average values from a series of measurement files and organize them into a csv file. * Replace configparser with argparser. Further restructuring. * Removed extra print. --- fnirsi_logger.py | 350 +++++++++++++++++++++++++++-------------------- requirements.txt | 4 +- settings.ini | 2 + 3 files changed, 204 insertions(+), 152 deletions(-) create mode 100644 settings.ini diff --git a/fnirsi_logger.py b/fnirsi_logger.py index 3bb1181..b252f19 100755 --- a/fnirsi_logger.py +++ b/fnirsi_logger.py @@ -5,12 +5,12 @@ import usb.core import usb.util +import argparse +from typing import Union -crc = None -try: - import crc -except ModuleNotFoundError: - print("Warning: crc package not found. crc checks will not be performed", file=sys.stderr) +#Global variables +#================ +is_fnb58_or_fnb48s = False # FNB48 # Bus 001 Device 020: ID 0483:003a STMicroelectronics FNB-48 @@ -33,7 +33,7 @@ # Bus 001 Device 003: ID 2e3c:0049 FNIRSI USB Tester VID_FNB48S = 0x2E3C PID_FNB48S = 0x0049 - +#================= def setup_crc(): if crc is None: @@ -54,10 +54,8 @@ def setup_crc(): calculator = crc.Calculator(configuration, optimized=True) return calculator.checksum - -def main(): - # Find our device - is_fnb58_or_fnb48s = False +def find_device(): + global is_fnb58_or_fnb48s dev = usb.core.find(idVendor=VID, idProduct=PID_FNB48) if dev is None: dev = usb.core.find(idVendor=VID, idProduct=PID_C1) @@ -69,57 +67,170 @@ def main(): dev = usb.core.find(idVendor=VID_FNB48S, idProduct=PID_FNB48S) if dev: is_fnb58_or_fnb48s = True + return dev - assert dev, "Device not found" +def find_hid_interface_num(dev): + # https://github.com/pyusb/pyusb/issues/76#issuecomment-118460796 + intf_hid = 0 + for cfg in dev: + for interface in cfg: + if interface.bInterfaceClass == 0x03: # HID class + return interface.bInterfaceNumber - if False: - print(dev, file=sys.stderr) + - dev.reset() +def ensure_interface_not_busy(dev, interface_num): + if dev.is_kernel_driver_active(interface_num): + try: + dev.detach_kernel_driver(interface_num) + except usb.core.USBError as e: + sys.exit(f"Could not detatch kernel driver from interface({interface_num}): {e}") + +def print_configs(dev): + for cfg in dev: + print("cfg", cfg.bConfigurationValue, file=sys.stderr) + for interface in cfg: + print( + " ", + "interface", + interface.bInterfaceNumber, + interface.bAlternateSetting, + file=sys.stderr, + ) + for ep in interface: + print(" ", "ep", ep.bEndpointAddress, file=sys.stderr) - # if dev.is_kernel_driver_active(0): - # try: - # dev.detach_kernel_driver(0) - # except usb.core.USBError as e: - # sys.exit("Could not detach kernel driver: ") - # https://github.com/pyusb/pyusb/issues/76#issuecomment-118460796 - intf_hid = 0 +def print_configs_overview(dev): for cfg in dev: - for intf in cfg: - if intf.bInterfaceClass == 0x03: # HID class - intf_hid = intf.bInterfaceNumber + for interface in cfg: + print(interface) + +#global variables for decodee +energy = 0.0 +capacity = 0.0 + +def decode(data, calculate_crc, time_interval): + global energy + global capacity + alpha = 0.9 # smoothing factor for temperature + temp_ema = None - if dev.is_kernel_driver_active(intf.bInterfaceNumber): - try: - dev.detach_kernel_driver(intf.bInterfaceNumber) - except usb.core.USBError as e: - sys.exit(f"Could not detatch kernel driver from interface({intf.bInterfaceNumber}): {e}") + # Data is 64 bytes (64 bytes of HID data minus vendor constant 0xaa) + # First byte is HID vendor constant 0xaa + # Second byte is payload type: + # 0x04 is data packet + # Other types (0x03 and maybe other ones) is unknown + # Next 4 samples each 15 bytes. 60 bytes total. + # At the end 2 bytes: + # 1 byte is semi constant with unknown purpose. + # 1 byte (last) is a 8-bit CRC checksum + + packet_type = data[1] + if packet_type != 0x04: + # ignore all non-data packets + # print("Ignoring") + return + + if calculate_crc: + actual_checksum = data[-1] + expected_checksum = calculate_crc(data[1:-1]) + if actual_checksum != expected_checksum: + print( + f"Ignoring packet of length {len(data)} with unexpected checksum. Expected: {expected_checksum:02x} Actual: {actual_checksum:02x}", + file=sys.stderr, + ) + return - usb.util.claim_interface(dev, 0) + t0 = time.time() - 4 * time_interval + for i in range(4): + offset = 2 + 15 * i + # 4 + 4 + 2 + 2 + 2 + 1 (unknown constant 1) = 15 bytes + voltage = ( + data[offset + 3] * 256 * 256 * 256 + + data[offset + 2] * 256 * 256 + + data[offset + 1] * 256 + + data[offset + 0] + ) / 100000 + current = ( + data[offset + 7] * 256 * 256 * 256 + + data[offset + 6] * 256 * 256 + + data[offset + 5] * 256 + + data[offset + 4] + ) / 100000 + dp = (data[offset + 8] + data[offset + 9] * 256) / 1000 + dn = (data[offset + 10] + data[offset + 11] * 256) / 1000 + # unknown12 = data[offset + 12] # ? constant 1 # some PD info? + # It does not look to be a sign of current. I tried reversing + # USB-C-In and USB-C-Out, which does reversed orientation of the + # blue arrow for current on device screen, but unknown12 remains 1. + # print(f"unknown{offset+12} {unknown12:02x}") + temp_C = (data[offset + 13] + data[offset + 14] * 256) / 10.0 + if temp_ema is not None: + temp_ema = temp_C * (1.0 - alpha) + temp_ema * alpha + else: + temp_ema = temp_C + power = voltage * current + # TODO(baryluk): This might be slightly inaccurate, if there is sampling jitter. + energy += power * time_interval + capacity += current * time_interval + t = t0 + i * time_interval + print( + f"{t:.3f} {i} {voltage:7.5f} {current:7.5f} {dp:5.3f} {dn:5.3f} {temp_ema:6.3f} {energy:.6f} {capacity:.6f}" + ) + # unknown62 = data[62] # data[-2] + # print(f"unknown62 {unknown:02x}") + # print() + + + + +def request_data(ep_out): + #Setup communication with power meter + ep_out.write(b"\xaa\x81" + b"\x00" * 61 + b"\x8e") + ep_out.write(b"\xaa\x82" + b"\x00" * 61 + b"\x96") + + if is_fnb58_or_fnb48s: + ep_out.write(b"\xaa\x82" + b"\x00" * 61 + b"\x96") + else: + ep_out.write(b"\xaa\x83" + b"\x00" * 61 + b"\x9e") + + +def main(args): + global is_fnb58_or_fnb48s + # At the moment only 100 sps is supported + sps = 100 + time_interval = 1.0 / sps + crc_calculator = None + if args.crc: + try: + crc_calculator = setup_crc() # can be None + except Exception as e: + print("When initializing crc module got exception: {e}, disabling crc checks", file=sys.stderr) + crc_calculator = None + + dev = find_device() + assert dev, "Device not found" + #print("Found " + ("FNB48s/FNB58" if is_fnb58_or_fnb48s else "FNB48") + " device.") + + dev.reset() + + #print_configs(dev) + #print_configs_overview(dev) + + interface_hid_num = find_hid_interface_num(dev) + ensure_interface_not_busy(dev, interface_hid_num) + + #If you check pyusb's code, this is not implemented + #usb.util.claim_interface(dev, 0) # set the active configuration. With no arguments, the first # configuration will be the active one - - if False: - for cfg in dev: - print("cfg", cfg.bConfigurationValue, file=sys.stderr) - for intf in cfg: - print( - " ", - "intf", - intf.bInterfaceNumber, - intf.bAlternateSetting, - file=sys.stderr, - ) - for ep in intf: - print(" ", "ep", ep.bEndpointAddress, file=sys.stderr) - - # dev.set_configuration(1) + dev.set_configuration() # get an endpoint instance cfg = dev.get_active_configuration() - intf = cfg[(intf_hid, 0)] + intf = cfg[(interface_hid_num, 0)] ep_out = usb.util.find_descriptor( intf, @@ -136,114 +247,53 @@ def main(): assert ep_in assert ep_out - ep_out.write(b"\xaa\x81" + b"\x00" * 61 + b"\x8e") - ep_out.write(b"\xaa\x82" + b"\x00" * 61 + b"\x96") - - if is_fnb58_or_fnb48s: - ep_out.write(b"\xaa\x82" + b"\x00" * 61 + b"\x96") - else: - ep_out.write(b"\xaa\x83" + b"\x00" * 61 + b"\x9e") - - alpha = 0.9 # smoothing factor for temperature - temp_ema = None - - # At the moment only 100 sps is supported - sps = 100 - time_interval = 1.0 / sps - - energy = 0.0 - capacity = 0.0 - - try: - calculate_crc = setup_crc() # can be None - except Exception as e: - print("When initializing crc module got exception: {e}, disabling crc checks", file=sys.stderr) - calculate_crc = None - + request_data(ep_out) + print() # Extra line to concatenation work better in gnuplot. print("timestamp sample_in_packet voltage_V current_A dp_V dn_V temp_C_ema energy_Ws capacity_As") - def decode(data): - nonlocal temp_ema, energy, capacity - - # Data is 64 bytes (64 bytes of HID data minus vendor constant 0xaa) - # First byte is HID vendor constant 0xaa - # Second byte is payload type: - # 0x04 is data packet - # Other types (0x03 and maybe other ones) is unknown - # Next 4 samples each 15 bytes. 60 bytes total. - # At the end 2 bytes: - # 1 byte is semi constant with unknown purpose. - # 1 byte (last) is a 8-bit CRC checksum - - packet_type = data[1] - if packet_type != 0x04: - # ignore all non-data packets - # print("Ignoring") - return - - if calculate_crc: - actual_checksum = data[-1] - expected_checksum = calculate_crc(data[1:-1]) - if actual_checksum != expected_checksum: - print( - f"Ignoring packet of length {len(data)} with unexpected checksum. Expected: {expected_checksum:02x} Actual: {actual_checksum:02x}", - file=sys.stderr, - ) - return - - t0 = time.time() - 4 * time_interval - for i in range(4): - offset = 2 + 15 * i - # 4 + 4 + 2 + 2 + 2 + 1 (unknown constant 1) = 15 bytes - voltage = ( - data[offset + 3] * 256 * 256 * 256 - + data[offset + 2] * 256 * 256 - + data[offset + 1] * 256 - + data[offset + 0] - ) / 100000 - current = ( - data[offset + 7] * 256 * 256 * 256 - + data[offset + 6] * 256 * 256 - + data[offset + 5] * 256 - + data[offset + 4] - ) / 100000 - dp = (data[offset + 8] + data[offset + 9] * 256) / 1000 - dn = (data[offset + 10] + data[offset + 11] * 256) / 1000 - # unknown12 = data[offset + 12] # ? constant 1 # some PD info? - # It does not look to be a sign of current. I tried reversing - # USB-C-In and USB-C-Out, which does reversed orientation of the - # blue arrow for current on device screen, but unknown12 remains 1. - # print(f"unknown{offset+12} {unknown12:02x}") - temp_C = (data[offset + 13] + data[offset + 14] * 256) / 10.0 - if temp_ema is not None: - temp_ema = temp_C * (1.0 - alpha) + temp_ema * alpha - else: - temp_ema = temp_C - power = voltage * current - # TODO(baryluk): This might be slightly inaccurate, if there is sampling jitter. - energy += power * time_interval - capacity += current * time_interval - t = t0 + i * time_interval - print( - f"{t:.3f} {i} {voltage:7.5f} {current:7.5f} {dp:5.3f} {dn:5.3f} {temp_ema:6.3f} {energy:.6f} {capacity:.6f}" - ) - # unknown62 = data[62] # data[-2] - # print(f"unknown62 {unknown:02x}") - # print() - time.sleep(0.1) refresh = 1.0 if is_fnb58_or_fnb48s else 0.003 # 1 s for FNB58 / FNB48S, 3 ms for others continue_time = time.time() + refresh - while True: - data = ep_in.read(size_or_buffer=64, timeout=1000) - # print("".join([f"{x:02x}" for x in data])) - decode(data) - - if time.time() >= continue_time: - continue_time = time.time() + refresh - ep_out.write(b"\xaa\x83" + b"\x00" * 61 + b"\x9e") - + terminate_execution = False + + while not terminate_execution: + try: + data = ep_in.read(size_or_buffer=64, timeout=5000) + # print("".join([f"{x:02x}" for x in data])) + decode(data, crc_calculator, time_interval) + + if time.time() >= continue_time: + continue_time = time.time() + refresh + ep_out.write(b"\xaa\x83" + b"\x00" * 61 + b"\x9e") + except KeyboardInterrupt: + terminate_execution = True + try: + while True: + #Exhaust data in descriptor + ep_in.read(size_or_buffer=64, timeout=1000) + except: + print("Exiting...") + + +# Argument handling +def str2bool(v: str) -> bool: + vl = v.lower() + if vl in ("yes", "true", "t", "1"): + return True + if vl in ("no", "false", "f", "0"): + return False + print("CRC flag argument type must be one of: 'true', 'yes', 't', '1', 'false', 'no', 'f', '0'. However, found '" + v + "'. Defaulting to False.", file = sys.stderr) + return False if __name__ == "__main__": - main() + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--crc", type=str2bool, help="Enable CRC checks", default=False) + args = parser.parse_args() + if args.crc: + try: + import crc + except ModuleNotFoundError: + print("Warning: crc package not found. crc checks will not be performed", file=sys.stderr) + args.crc = False + main(args) diff --git a/requirements.txt b/requirements.txt index 21ae030..2b5676c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -pyusb~=1.2.1 -crc~=1.3.0 +crc==5.0.0 +pyusb==1.2.1 diff --git a/settings.ini b/settings.ini new file mode 100644 index 0000000..8323b3b --- /dev/null +++ b/settings.ini @@ -0,0 +1,2 @@ +[DEFAULT] +CRCCheck = no