diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ddb3028c..ab14b6b6a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,6 @@ jobs: os: [ubuntu-latest, macos-latest, windows-latest] experimental: [false] python-version: [ - "3.8", "3.9", "3.10", "3.11", @@ -81,9 +80,6 @@ jobs: run: | python -m pip install --upgrade pip pip install -e .[lint] - - name: mypy 3.8 - run: | - mypy --python-version 3.8 . - name: mypy 3.9 run: | mypy --python-version 3.9 . diff --git a/README.rst b/README.rst index d2f05b2c1..3c185f6cb 100644 --- a/README.rst +++ b/README.rst @@ -3,7 +3,7 @@ python-can |pypi| |conda| |python_implementation| |downloads| |downloads_monthly| -|docs| |github-actions| |coverage| |mergify| |formatter| +|docs| |github-actions| |coverage| |formatter| .. |pypi| image:: https://img.shields.io/pypi/v/python-can.svg :target: https://pypi.python.org/pypi/python-can/ @@ -41,10 +41,6 @@ python-can :target: https://coveralls.io/github/hardbyte/python-can?branch=develop :alt: Test coverage reports on Coveralls.io -.. |mergify| image:: https://img.shields.io/endpoint.svg?url=https://api.mergify.com/v1/badges/hardbyte/python-can&style=flat - :target: https://mergify.io - :alt: Mergify Status - The **C**\ ontroller **A**\ rea **N**\ etwork is a bus standard designed to allow microcontrollers and devices to communicate with each other. It has priority based bus arbitration and reliable deterministic @@ -64,6 +60,7 @@ Library Version Python 3.x 2.7+, 3.5+ 4.0+ 3.7+ 4.3+ 3.8+ + 4.6+ 3.9+ ============================== =========== diff --git a/can/__init__.py b/can/__init__.py index 1d4b7f0cf..b1bd636c1 100644 --- a/can/__init__.py +++ b/can/__init__.py @@ -8,7 +8,7 @@ import contextlib import logging from importlib.metadata import PackageNotFoundError, version -from typing import Any, Dict +from typing import Any __all__ = [ "VALID_INTERFACES", @@ -130,4 +130,4 @@ log = logging.getLogger("can") -rc: Dict[str, Any] = {} +rc: dict[str, Any] = {} diff --git a/can/_entry_points.py b/can/_entry_points.py index 6842e3c1a..e8ce92d7c 100644 --- a/can/_entry_points.py +++ b/can/_entry_points.py @@ -2,7 +2,7 @@ import sys from dataclasses import dataclass from importlib.metadata import entry_points -from typing import Any, List +from typing import Any @dataclass @@ -20,14 +20,14 @@ def load(self) -> Any: # "Compatibility Note". if sys.version_info >= (3, 10): - def read_entry_points(group: str) -> List[_EntryPoint]: + def read_entry_points(group: str) -> list[_EntryPoint]: return [ _EntryPoint(ep.name, ep.module, ep.attr) for ep in entry_points(group=group) ] else: - def read_entry_points(group: str) -> List[_EntryPoint]: + def read_entry_points(group: str) -> list[_EntryPoint]: return [ _EntryPoint(ep.name, *ep.value.split(":", maxsplit=1)) for ep in entry_points().get(group, []) diff --git a/can/bit_timing.py b/can/bit_timing.py index feba8b6d2..4b0074472 100644 --- a/can/bit_timing.py +++ b/can/bit_timing.py @@ -1,6 +1,7 @@ # pylint: disable=too-many-lines import math -from typing import TYPE_CHECKING, Iterator, List, Mapping, cast +from collections.abc import Iterator, Mapping +from typing import TYPE_CHECKING, cast if TYPE_CHECKING: from can.typechecking import BitTimingDict, BitTimingFdDict @@ -286,7 +287,7 @@ def from_sample_point( if sample_point < 50.0: raise ValueError(f"sample_point (={sample_point}) must not be below 50%.") - possible_solutions: List[BitTiming] = list( + possible_solutions: list[BitTiming] = list( cls.iterate_from_sample_point(f_clock, bitrate, sample_point) ) @@ -874,7 +875,7 @@ def from_sample_point( f"data_sample_point (={data_sample_point}) must not be below 50%." ) - possible_solutions: List[BitTimingFd] = list( + possible_solutions: list[BitTimingFd] = list( cls.iterate_from_sample_point( f_clock, nom_bitrate, diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 19dca8fbc..b2bc28e76 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -12,13 +12,12 @@ import threading import time import warnings +from collections.abc import Sequence from typing import ( TYPE_CHECKING, Callable, Final, Optional, - Sequence, - Tuple, Union, cast, ) @@ -127,7 +126,7 @@ def __init__( @staticmethod def _check_and_convert_messages( messages: Union[Sequence[Message], Message], - ) -> Tuple[Message, ...]: + ) -> tuple[Message, ...]: """Helper function to convert a Message or Sequence of messages into a tuple, and raises an error when the given value is invalid. @@ -194,7 +193,7 @@ def start(self) -> None: class ModifiableCyclicTaskABC(CyclicSendTaskABC, abc.ABC): - def _check_modified_messages(self, messages: Tuple[Message, ...]) -> None: + def _check_modified_messages(self, messages: tuple[Message, ...]) -> None: """Helper function to perform error checking when modifying the data in the cyclic task. diff --git a/can/bus.py b/can/bus.py index d186e2d05..0d031a18b 100644 --- a/can/bus.py +++ b/can/bus.py @@ -6,18 +6,14 @@ import logging import threading from abc import ABC, ABCMeta, abstractmethod +from collections.abc import Iterator, Sequence from enum import Enum, auto from time import time from types import TracebackType from typing import ( Any, Callable, - Iterator, - List, Optional, - Sequence, - Tuple, - Type, Union, cast, ) @@ -97,7 +93,7 @@ def __init__( :raises ~can.exceptions.CanInitializationError: If the bus cannot be initialized """ - self._periodic_tasks: List[_SelfRemovingCyclicTask] = [] + self._periodic_tasks: list[_SelfRemovingCyclicTask] = [] self.set_filters(can_filters) # Flip the class default value when the constructor finishes. That # usually means the derived class constructor was also successful, @@ -147,7 +143,7 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]: def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: """ Read a message from the bus and tell whether it was filtered. This methods may be called by :meth:`~can.BusABC.recv` @@ -491,7 +487,7 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[Type[BaseException]], + exc_type: Optional[type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: @@ -529,7 +525,7 @@ def protocol(self) -> CanProtocol: return self._can_protocol @staticmethod - def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: + def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]: """Detect all configurations/channels that this interface could currently connect with. diff --git a/can/ctypesutil.py b/can/ctypesutil.py index a80dc3194..8336941be 100644 --- a/can/ctypesutil.py +++ b/can/ctypesutil.py @@ -5,7 +5,7 @@ import ctypes import logging import sys -from typing import Any, Callable, Optional, Tuple, Union +from typing import Any, Callable, Optional, Union log = logging.getLogger("can.ctypesutil") @@ -32,7 +32,7 @@ def map_symbol( self, func_name: str, restype: Any = None, - argtypes: Tuple[Any, ...] = (), + argtypes: tuple[Any, ...] = (), errcheck: Optional[Callable[..., Any]] = None, ) -> Any: """ diff --git a/can/exceptions.py b/can/exceptions.py index ca2d6c5a3..8abc75147 100644 --- a/can/exceptions.py +++ b/can/exceptions.py @@ -15,14 +15,9 @@ :class:`ValueError`. This should always be documented for the function at hand. """ -import sys +from collections.abc import Generator from contextlib import contextmanager -from typing import Optional, Type - -if sys.version_info >= (3, 9): - from collections.abc import Generator -else: - from typing import Generator +from typing import Optional class CanError(Exception): @@ -114,7 +109,7 @@ class CanTimeoutError(CanError, TimeoutError): @contextmanager def error_check( error_message: Optional[str] = None, - exception_type: Type[CanError] = CanOperationError, + exception_type: type[CanError] = CanOperationError, ) -> Generator[None, None, None]: """Catches any exceptions and turns them into the new type while preserving the stack trace.""" try: diff --git a/can/interface.py b/can/interface.py index f7a8ecc02..e017b9514 100644 --- a/can/interface.py +++ b/can/interface.py @@ -6,7 +6,8 @@ import importlib import logging -from typing import Any, Iterable, List, Optional, Type, Union, cast +from collections.abc import Iterable +from typing import Any, Optional, Union, cast from . import util from .bus import BusABC @@ -18,7 +19,7 @@ log_autodetect = log.getChild("detect_available_configs") -def _get_class_for_interface(interface: str) -> Type[BusABC]: +def _get_class_for_interface(interface: str) -> type[BusABC]: """ Returns the main bus class for the given interface. @@ -52,7 +53,7 @@ def _get_class_for_interface(interface: str) -> Type[BusABC]: f"'{interface}': {e}" ) from None - return cast("Type[BusABC]", bus_class) + return cast("type[BusABC]", bus_class) @util.deprecated_args_alias( @@ -139,7 +140,7 @@ def Bus( # noqa: N802 def detect_available_configs( interfaces: Union[None, str, Iterable[str]] = None, -) -> List[AutoDetectedConfig]: +) -> list[AutoDetectedConfig]: """Detect all configurations/channels that the interfaces could currently connect with. diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index f220d28e5..1b401639a 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -2,8 +2,6 @@ Interfaces contain low level implementations that interact with CAN hardware. """ -from typing import Dict, Tuple - from can._entry_points import read_entry_points __all__ = [ @@ -35,7 +33,7 @@ ] # interface_name => (module, classname) -BACKENDS: Dict[str, Tuple[str, str]] = { +BACKENDS: dict[str, tuple[str, str]] = { "kvaser": ("can.interfaces.kvaser", "KvaserBus"), "socketcan": ("can.interfaces.socketcan", "SocketcanBus"), "serial": ("can.interfaces.serial.serial_can", "SerialBus"), diff --git a/can/interfaces/canalystii.py b/can/interfaces/canalystii.py index 2fef19497..d85211130 100644 --- a/can/interfaces/canalystii.py +++ b/can/interfaces/canalystii.py @@ -1,8 +1,9 @@ import logging import time from collections import deque +from collections.abc import Sequence from ctypes import c_ubyte -from typing import Any, Deque, Dict, Optional, Sequence, Tuple, Union +from typing import Any, Optional, Union import canalystii as driver @@ -26,7 +27,7 @@ def __init__( timing: Optional[Union[BitTiming, BitTimingFd]] = None, can_filters: Optional[CanFilters] = None, rx_queue_size: Optional[int] = None, - **kwargs: Dict[str, Any], + **kwargs: dict[str, Any], ): """ @@ -68,7 +69,7 @@ def __init__( self.channels = list(channel) self.channel_info = f"CANalyst-II: device {device}, channels {self.channels}" - self.rx_queue: Deque[Tuple[int, driver.Message]] = deque(maxlen=rx_queue_size) + self.rx_queue: deque[tuple[int, driver.Message]] = deque(maxlen=rx_queue_size) self.device = driver.CanalystDevice(device_index=device) self._can_protocol = CanProtocol.CAN_20 @@ -129,7 +130,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: if timeout is not None and not send_result: raise CanTimeoutError(f"Send timed out after {timeout} seconds") - def _recv_from_queue(self) -> Tuple[Message, bool]: + def _recv_from_queue(self) -> tuple[Message, bool]: """Return a message from the internal receive queue""" channel, raw_msg = self.rx_queue.popleft() @@ -166,7 +167,7 @@ def poll_received_messages(self) -> None: def _recv_internal( self, timeout: Optional[float] = None - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: """ :param timeout: float in seconds diff --git a/can/interfaces/etas/__init__.py b/can/interfaces/etas/__init__.py index 2bfcbf427..9d4d0bd2a 100644 --- a/can/interfaces/etas/__init__.py +++ b/can/interfaces/etas/__init__.py @@ -1,5 +1,5 @@ import time -from typing import Dict, List, Optional, Tuple +from typing import Optional import can from can.exceptions import CanInitializationError @@ -16,7 +16,7 @@ def __init__( bitrate: int = 1000000, fd: bool = True, data_bitrate: int = 2000000, - **kwargs: Dict[str, any], + **kwargs: dict[str, any], ): self.receive_own_messages = receive_own_messages self._can_protocol = can.CanProtocol.CAN_FD if fd else can.CanProtocol.CAN_20 @@ -122,7 +122,7 @@ def __init__( def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[can.Message], bool]: + ) -> tuple[Optional[can.Message], bool]: ociMsgs = (ctypes.POINTER(OCI_CANMessageEx) * 1)() ociMsg = OCI_CANMessageEx() ociMsgs[0] = ctypes.pointer(ociMsg) @@ -295,12 +295,12 @@ def state(self, new_state: can.BusState) -> None: raise NotImplementedError("Setting state is not implemented.") @staticmethod - def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: + def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]: nodeRange = CSI_NodeRange(CSI_NODE_MIN, CSI_NODE_MAX) tree = ctypes.POINTER(CSI_Tree)() CSI_CreateProtocolTree(ctypes.c_char_p(b""), nodeRange, ctypes.byref(tree)) - nodes: List[Dict[str, str]] = [] + nodes: list[dict[str, str]] = [] def _findNodes(tree, prefix): uri = f"{prefix}/{tree.contents.item.uriName.decode()}" diff --git a/can/interfaces/gs_usb.py b/can/interfaces/gs_usb.py index 6268350ee..4ab541f43 100644 --- a/can/interfaces/gs_usb.py +++ b/can/interfaces/gs_usb.py @@ -1,5 +1,5 @@ import logging -from typing import Optional, Tuple +from typing import Optional import usb from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG @@ -119,7 +119,7 @@ def send(self, msg: can.Message, timeout: Optional[float] = None): def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[can.Message], bool]: + ) -> tuple[Optional[can.Message], bool]: """ Read a message from the bus and tell whether it was filtered. This methods may be called by :meth:`~can.BusABC.recv` diff --git a/can/interfaces/iscan.py b/can/interfaces/iscan.py index be0b0dae8..79b4f754d 100644 --- a/can/interfaces/iscan.py +++ b/can/interfaces/iscan.py @@ -5,7 +5,7 @@ import ctypes import logging import time -from typing import Optional, Tuple, Union +from typing import Optional, Union from can import ( BusABC, @@ -117,7 +117,7 @@ def __init__( def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: raw_msg = MessageExStruct() end_time = time.time() + timeout if timeout is not None else None while True: diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index a1693aeed..e6ad25d57 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -1,4 +1,5 @@ -from typing import Callable, List, Optional, Sequence, Union +from collections.abc import Sequence +from typing import Callable, Optional, Union import can.interfaces.ixxat.canlib_vcinpl as vcinpl import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2 @@ -174,5 +175,5 @@ def state(self) -> BusState: return self.bus.state @staticmethod - def _detect_available_configs() -> List[AutoDetectedConfig]: + def _detect_available_configs() -> list[AutoDetectedConfig]: return vcinpl._detect_available_configs() diff --git a/can/interfaces/ixxat/canlib_vcinpl.py b/can/interfaces/ixxat/canlib_vcinpl.py index 567dd0cd9..ba8f1870b 100644 --- a/can/interfaces/ixxat/canlib_vcinpl.py +++ b/can/interfaces/ixxat/canlib_vcinpl.py @@ -14,7 +14,8 @@ import logging import sys import warnings -from typing import Callable, List, Optional, Sequence, Tuple, Union +from collections.abc import Sequence +from typing import Callable, Optional, Union from can import ( BusABC, @@ -64,7 +65,7 @@ def __vciFormatErrorExtended( - library_instance: CLibrary, function: Callable, vret: int, args: Tuple + library_instance: CLibrary, function: Callable, vret: int, args: tuple ): """Format a VCI error and attach failed function, decoded HRESULT and arguments :param CLibrary library_instance: @@ -961,7 +962,7 @@ def get_ixxat_hwids(): return hwids -def _detect_available_configs() -> List[AutoDetectedConfig]: +def _detect_available_configs() -> list[AutoDetectedConfig]: config_list = [] # list in wich to store the resulting bus kwargs # used to detect HWID diff --git a/can/interfaces/ixxat/canlib_vcinpl2.py b/can/interfaces/ixxat/canlib_vcinpl2.py index 79ad34c4f..f9ac5346b 100644 --- a/can/interfaces/ixxat/canlib_vcinpl2.py +++ b/can/interfaces/ixxat/canlib_vcinpl2.py @@ -15,7 +15,8 @@ import sys import time import warnings -from typing import Callable, Optional, Sequence, Tuple, Union +from collections.abc import Sequence +from typing import Callable, Optional, Union from can import ( BusABC, @@ -62,7 +63,7 @@ def __vciFormatErrorExtended( - library_instance: CLibrary, function: Callable, vret: int, args: Tuple + library_instance: CLibrary, function: Callable, vret: int, args: tuple ): """Format a VCI error and attach failed function, decoded HRESULT and arguments :param CLibrary library_instance: diff --git a/can/interfaces/nican.py b/can/interfaces/nican.py index 8a2efade7..1abf0b35f 100644 --- a/can/interfaces/nican.py +++ b/can/interfaces/nican.py @@ -16,7 +16,7 @@ import ctypes import logging import sys -from typing import Optional, Tuple, Type +from typing import Optional import can.typechecking from can import ( @@ -112,7 +112,7 @@ def check_status( result: int, function, arguments, - error_class: Type[NicanError] = NicanOperationError, + error_class: type[NicanError] = NicanOperationError, ) -> int: if result > 0: logger.warning(get_error_message(result)) @@ -281,7 +281,7 @@ def __init__( def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: """ Read a message from a NI-CAN bus. diff --git a/can/interfaces/nixnet.py b/can/interfaces/nixnet.py index 2b3cbd69a..c723d1f52 100644 --- a/can/interfaces/nixnet.py +++ b/can/interfaces/nixnet.py @@ -14,7 +14,7 @@ import warnings from queue import SimpleQueue from types import ModuleType -from typing import Any, List, Optional, Tuple, Union +from typing import Any, Optional, Union import can.typechecking from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message @@ -203,7 +203,7 @@ def fd(self) -> bool: def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: end_time = time.perf_counter() + timeout if timeout is not None else None while True: @@ -327,7 +327,7 @@ def shutdown(self) -> None: self._session_receive.close() @staticmethod - def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: + def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]: configs = [] try: diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index d0372a83c..ef3b23e3b 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -6,7 +6,7 @@ import platform import time import warnings -from typing import Any, List, Optional, Tuple, Union +from typing import Any, Optional, Union from packaging import version @@ -502,7 +502,7 @@ def set_device_number(self, device_number): def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: end_time = time.time() + timeout if timeout is not None else None while True: @@ -726,7 +726,7 @@ def _detect_available_configs(): res, value = library_handle.GetValue(PCAN_NONEBUS, PCAN_ATTACHED_CHANNELS) if res != PCAN_ERROR_OK: return interfaces - channel_information: List[TPCANChannelInformation] = list(value) + channel_information: list[TPCANChannelInformation] = list(value) for channel in channel_information: # find channel name in PCAN_CHANNEL_NAMES by value channel_name = next( diff --git a/can/interfaces/serial/serial_can.py b/can/interfaces/serial/serial_can.py index 476cbd624..9fe715267 100644 --- a/can/interfaces/serial/serial_can.py +++ b/can/interfaces/serial/serial_can.py @@ -10,7 +10,7 @@ import io import logging import struct -from typing import Any, List, Optional, Tuple +from typing import Any, Optional from can import ( BusABC, @@ -38,7 +38,7 @@ from serial.tools.list_ports import comports as list_comports except ImportError: # If unavailable on some platform, just return nothing - def list_comports() -> List[Any]: + def list_comports() -> list[Any]: return [] @@ -160,7 +160,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: """ Read a message from the serial device. @@ -229,7 +229,7 @@ def fileno(self) -> int: raise CanOperationError("Cannot fetch fileno") from exception @staticmethod - def _detect_available_configs() -> List[AutoDetectedConfig]: + def _detect_available_configs() -> list[AutoDetectedConfig]: return [ {"interface": "serial", "channel": port.device} for port in list_comports() ] diff --git a/can/interfaces/slcan.py b/can/interfaces/slcan.py index ddd2bec23..c51b298cc 100644 --- a/can/interfaces/slcan.py +++ b/can/interfaces/slcan.py @@ -7,7 +7,7 @@ import time import warnings from queue import SimpleQueue -from typing import Any, Optional, Tuple, Union, cast +from typing import Any, Optional, Union, cast from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message, typechecking from can.exceptions import ( @@ -230,7 +230,7 @@ def close(self) -> None: def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: canId = None remote = False extended = False @@ -315,7 +315,7 @@ def fileno(self) -> int: def get_version( self, timeout: Optional[float] - ) -> Tuple[Optional[int], Optional[int]]: + ) -> tuple[Optional[int], Optional[int]]: """Get HW and SW version of the slcan interface. :param timeout: diff --git a/can/interfaces/socketcan/socketcan.py b/can/interfaces/socketcan/socketcan.py index a5819dcb5..30b75108a 100644 --- a/can/interfaces/socketcan/socketcan.py +++ b/can/interfaces/socketcan/socketcan.py @@ -15,7 +15,8 @@ import threading import time import warnings -from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, Union +from collections.abc import Sequence +from typing import Callable, Optional, Union import can from can import BusABC, CanProtocol, Message @@ -50,13 +51,13 @@ # Setup BCM struct def bcm_header_factory( - fields: List[Tuple[str, Union[Type[ctypes.c_uint32], Type[ctypes.c_long]]]], + fields: list[tuple[str, Union[type[ctypes.c_uint32], type[ctypes.c_long]]]], alignment: int = 8, ): curr_stride = 0 - results: List[ - Tuple[ - str, Union[Type[ctypes.c_uint8], Type[ctypes.c_uint32], Type[ctypes.c_long]] + results: list[ + tuple[ + str, Union[type[ctypes.c_uint8], type[ctypes.c_uint32], type[ctypes.c_long]] ] ] = [] pad_index = 0 @@ -292,7 +293,7 @@ def build_bcm_transmit_header( # Note `TX_COUNTEVT` creates the message TX_EXPIRED when count expires flags |= constants.TX_COUNTEVT - def split_time(value: float) -> Tuple[int, int]: + def split_time(value: float) -> tuple[int, int]: """Given seconds as a float, return whole seconds and microseconds""" seconds = int(value) microseconds = int(1e6 * (value - seconds)) @@ -326,7 +327,7 @@ def is_frame_fd(frame: bytes): return len(frame) == constants.CANFD_MTU -def dissect_can_frame(frame: bytes) -> Tuple[int, int, int, bytes]: +def dissect_can_frame(frame: bytes) -> tuple[int, int, int, bytes]: can_id, data_len, flags, len8_dlc = CAN_FRAME_HEADER_STRUCT.unpack_from(frame) if data_len not in can.util.CAN_FD_DLC: @@ -739,7 +740,7 @@ def __init__( self.socket = create_socket() self.channel = channel self.channel_info = f"socketcan channel '{channel}'" - self._bcm_sockets: Dict[str, socket.socket] = {} + self._bcm_sockets: dict[str, socket.socket] = {} self._is_filtered = False self._task_id = 0 self._task_id_guard = threading.Lock() @@ -819,7 +820,7 @@ def shutdown(self) -> None: def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: try: # get all sockets that are ready (can be a list with a single value # being self.socket or an empty list if self.socket is not ready) @@ -992,7 +993,7 @@ def fileno(self) -> int: return self.socket.fileno() @staticmethod - def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: + def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]: return [ {"interface": "socketcan", "channel": channel} for channel in find_available_interfaces() diff --git a/can/interfaces/socketcan/utils.py b/can/interfaces/socketcan/utils.py index 1505c6cf8..80dcb203f 100644 --- a/can/interfaces/socketcan/utils.py +++ b/can/interfaces/socketcan/utils.py @@ -9,7 +9,7 @@ import struct import subprocess import sys -from typing import List, Optional, cast +from typing import Optional, cast from can import typechecking from can.interfaces.socketcan.constants import CAN_EFF_FLAG @@ -39,7 +39,7 @@ def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes return struct.pack(can_filter_fmt, *filter_data) -def find_available_interfaces() -> List[str]: +def find_available_interfaces() -> list[str]: """Returns the names of all open can/vcan interfaces The function calls the ``ip link list`` command. If the lookup fails, an error diff --git a/can/interfaces/socketcand/socketcand.py b/can/interfaces/socketcand/socketcand.py index 7a2cc6fd0..3ecda082b 100644 --- a/can/interfaces/socketcand/socketcand.py +++ b/can/interfaces/socketcand/socketcand.py @@ -17,7 +17,6 @@ import urllib.parse as urlparselib import xml.etree.ElementTree as ET from collections import deque -from typing import List import can @@ -27,7 +26,7 @@ DEFAULT_SOCKETCAND_DISCOVERY_PORT = 42000 -def detect_beacon(timeout_ms: int = 3100) -> List[can.typechecking.AutoDetectedConfig]: +def detect_beacon(timeout_ms: int = 3100) -> list[can.typechecking.AutoDetectedConfig]: """ Detects socketcand servers @@ -340,7 +339,7 @@ def shutdown(self): self.__socket.close() @staticmethod - def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: + def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]: try: return detect_beacon() except Exception as e: diff --git a/can/interfaces/systec/exceptions.py b/can/interfaces/systec/exceptions.py index dcd94bdbf..8768b412a 100644 --- a/can/interfaces/systec/exceptions.py +++ b/can/interfaces/systec/exceptions.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -from typing import Dict from can import CanError @@ -22,7 +21,7 @@ def __init__(self, result, func, arguments): @property @abstractmethod - def _error_message_mapping(self) -> Dict[ReturnCode, str]: ... + def _error_message_mapping(self) -> dict[ReturnCode, str]: ... class UcanError(UcanException): @@ -51,7 +50,7 @@ class UcanError(UcanException): } @property - def _error_message_mapping(self) -> Dict[ReturnCode, str]: + def _error_message_mapping(self) -> dict[ReturnCode, str]: return UcanError._ERROR_MESSAGES @@ -77,7 +76,7 @@ class UcanCmdError(UcanException): } @property - def _error_message_mapping(self) -> Dict[ReturnCode, str]: + def _error_message_mapping(self) -> dict[ReturnCode, str]: return UcanCmdError._ERROR_MESSAGES @@ -102,5 +101,5 @@ class UcanWarning(UcanException): } @property - def _error_message_mapping(self) -> Dict[ReturnCode, str]: + def _error_message_mapping(self) -> dict[ReturnCode, str]: return UcanWarning._ERROR_MESSAGES diff --git a/can/interfaces/udp_multicast/bus.py b/can/interfaces/udp_multicast/bus.py index dd114278c..31744c2b8 100644 --- a/can/interfaces/udp_multicast/bus.py +++ b/can/interfaces/udp_multicast/bus.py @@ -5,7 +5,7 @@ import struct import time import warnings -from typing import List, Optional, Tuple, Union +from typing import Optional, Union import can from can import BusABC, CanProtocol @@ -26,8 +26,8 @@ # see socket.getaddrinfo() -IPv4_ADDRESS_INFO = Tuple[str, int] # address, port -IPv6_ADDRESS_INFO = Tuple[str, int, int, int] # address, port, flowinfo, scope_id +IPv4_ADDRESS_INFO = tuple[str, int] # address, port +IPv6_ADDRESS_INFO = tuple[str, int, int, int] # address, port, flowinfo, scope_id IP_ADDRESS_INFO = Union[IPv4_ADDRESS_INFO, IPv6_ADDRESS_INFO] # Additional constants for the interaction with Unix kernels @@ -172,7 +172,7 @@ def shutdown(self) -> None: self._multicast.shutdown() @staticmethod - def _detect_available_configs() -> List[AutoDetectedConfig]: + def _detect_available_configs() -> list[AutoDetectedConfig]: if hasattr(socket, "CMSG_SPACE"): return [ { @@ -341,7 +341,7 @@ def send(self, data: bytes, timeout: Optional[float] = None) -> None: def recv( self, timeout: Optional[float] = None - ) -> Optional[Tuple[bytes, IP_ADDRESS_INFO, float]]: + ) -> Optional[tuple[bytes, IP_ADDRESS_INFO, float]]: """ Receive up to **max_buffer** bytes. diff --git a/can/interfaces/udp_multicast/utils.py b/can/interfaces/udp_multicast/utils.py index 35a0df185..5c9454ed4 100644 --- a/can/interfaces/udp_multicast/utils.py +++ b/can/interfaces/udp_multicast/utils.py @@ -2,7 +2,7 @@ Defines common functions. """ -from typing import Any, Dict, Optional +from typing import Any, Optional from can import CanInterfaceNotImplementedError, Message from can.typechecking import ReadableBytesLike @@ -44,7 +44,7 @@ def pack_message(message: Message) -> bytes: def unpack_message( data: ReadableBytesLike, - replace: Optional[Dict[str, Any]] = None, + replace: Optional[dict[str, Any]] = None, check: bool = False, ) -> Message: """Unpack a can.Message from a msgpack byte blob. diff --git a/can/interfaces/usb2can/serial_selector.py b/can/interfaces/usb2can/serial_selector.py index de29f03fe..9f3b7185e 100644 --- a/can/interfaces/usb2can/serial_selector.py +++ b/can/interfaces/usb2can/serial_selector.py @@ -1,7 +1,6 @@ """ """ import logging -from typing import List log = logging.getLogger("can.usb2can") @@ -43,7 +42,7 @@ def WMIDateStringToDate(dtmDate) -> str: return strDateTime -def find_serial_devices(serial_matcher: str = "") -> List[str]: +def find_serial_devices(serial_matcher: str = "") -> list[str]: """ Finds a list of USB devices where the serial number (partially) matches the given string. diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index d51d74529..986f52002 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -10,17 +10,13 @@ import os import time import warnings +from collections.abc import Iterator, Sequence from types import ModuleType from typing import ( Any, Callable, - Dict, - Iterator, - List, NamedTuple, Optional, - Sequence, - Tuple, Union, cast, ) @@ -204,8 +200,8 @@ def __init__( is_fd = isinstance(timing, BitTimingFd) if timing else fd self.mask = 0 - self.channel_masks: Dict[int, int] = {} - self.index_to_channel: Dict[int, int] = {} + self.channel_masks: dict[int, int] = {} + self.index_to_channel: dict[int, int] = {} self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 self._listen_only = listen_only @@ -383,7 +379,7 @@ def _find_global_channel_idx( channel: int, serial: Optional[int], app_name: Optional[str], - channel_configs: List["VectorChannelConfig"], + channel_configs: list["VectorChannelConfig"], ) -> int: if serial is not None: serial_found = False @@ -439,7 +435,7 @@ def _has_init_access(self, channel: int) -> bool: return bool(self.permission_mask & self.channel_masks[channel]) def _read_bus_params( - self, channel_index: int, vcc_list: List["VectorChannelConfig"] + self, channel_index: int, vcc_list: list["VectorChannelConfig"] ) -> "VectorBusParams": for vcc in vcc_list: if vcc.channel_index == channel_index: @@ -712,7 +708,7 @@ def _apply_filters(self, filters: Optional[CanFilters]) -> None: def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: end_time = time.time() + timeout if timeout is not None else None while True: @@ -986,7 +982,7 @@ def reset(self) -> None: ) @staticmethod - def _detect_available_configs() -> List[AutoDetectedConfig]: + def _detect_available_configs() -> list[AutoDetectedConfig]: configs = [] channel_configs = get_channel_configs() LOG.info("Found %d channels", len(channel_configs)) @@ -1037,7 +1033,7 @@ def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None: @staticmethod def get_application_config( app_name: str, app_channel: int - ) -> Tuple[Union[int, xldefine.XL_HardwareType], int, int]: + ) -> tuple[Union[int, xldefine.XL_HardwareType], int, int]: """Retrieve information for an application in Vector Hardware Configuration. :param app_name: @@ -1243,14 +1239,14 @@ def _read_bus_params_from_c_struct( ) -def get_channel_configs() -> List[VectorChannelConfig]: +def get_channel_configs() -> list[VectorChannelConfig]: """Read channel properties from Vector XL API.""" try: driver_config = _get_xl_driver_config() except VectorError: return [] - channel_list: List[VectorChannelConfig] = [] + channel_list: list[VectorChannelConfig] = [] for i in range(driver_config.channelCount): xlcc: xlclass.XLchannelConfig = driver_config.channel[i] vcc = VectorChannelConfig( diff --git a/can/interfaces/virtual.py b/can/interfaces/virtual.py index 62ad0cfe3..aa858913e 100644 --- a/can/interfaces/virtual.py +++ b/can/interfaces/virtual.py @@ -12,7 +12,7 @@ from copy import deepcopy from random import randint from threading import RLock -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Optional from can import CanOperationError from can.bus import BusABC, CanProtocol @@ -25,7 +25,7 @@ # Channels are lists of queues, one for each connection if TYPE_CHECKING: # https://mypy.readthedocs.io/en/stable/runtime_troubles.html#using-classes-that-are-generic-in-stubs-but-not-at-runtime - channels: Dict[Optional[Any], List[queue.Queue[Message]]] = {} + channels: dict[Optional[Any], list[queue.Queue[Message]]] = {} else: channels = {} channels_lock = RLock() @@ -125,7 +125,7 @@ def _check_if_open(self) -> None: def _recv_internal( self, timeout: Optional[float] - ) -> Tuple[Optional[Message], bool]: + ) -> tuple[Optional[Message], bool]: self._check_if_open() try: msg = self.queue.get(block=True, timeout=timeout) @@ -168,7 +168,7 @@ def shutdown(self) -> None: del channels[self.channel_id] @staticmethod - def _detect_available_configs() -> List[AutoDetectedConfig]: + def _detect_available_configs() -> list[AutoDetectedConfig]: """ Returns all currently used channels as well as one other currently unused channel. diff --git a/can/io/asc.py b/can/io/asc.py index deb7d429e..0bea823fd 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -8,8 +8,9 @@ import logging import re +from collections.abc import Generator from datetime import datetime -from typing import Any, Dict, Final, Generator, Optional, TextIO, Union +from typing import Any, Final, Optional, TextIO, Union from ..message import Message from ..typechecking import StringPathLike @@ -153,7 +154,7 @@ def _datetime_to_timestamp(datetime_string: str) -> float: raise ValueError(f"Incompatible datetime string {datetime_string}") - def _extract_can_id(self, str_can_id: str, msg_kwargs: Dict[str, Any]) -> None: + def _extract_can_id(self, str_can_id: str, msg_kwargs: dict[str, Any]) -> None: if str_can_id[-1:].lower() == "x": msg_kwargs["is_extended_id"] = True can_id = int(str_can_id[0:-1], self._converted_base) @@ -169,7 +170,7 @@ def _check_base(base: str) -> int: return BASE_DEC if base == "dec" else BASE_HEX def _process_data_string( - self, data_str: str, data_length: int, msg_kwargs: Dict[str, Any] + self, data_str: str, data_length: int, msg_kwargs: dict[str, Any] ) -> None: frame = bytearray() data = data_str.split() @@ -178,7 +179,7 @@ def _process_data_string( msg_kwargs["data"] = frame def _process_classic_can_frame( - self, line: str, msg_kwargs: Dict[str, Any] + self, line: str, msg_kwargs: dict[str, Any] ) -> Message: # CAN error frame if line.strip()[0:10].lower() == "errorframe": @@ -213,7 +214,7 @@ def _process_classic_can_frame( return Message(**msg_kwargs) - def _process_fd_can_frame(self, line: str, msg_kwargs: Dict[str, Any]) -> Message: + def _process_fd_can_frame(self, line: str, msg_kwargs: dict[str, Any]) -> Message: channel, direction, rest_of_message = line.split(None, 2) # See ASCWriter msg_kwargs["channel"] = int(channel) - 1 @@ -285,7 +286,7 @@ def __iter__(self) -> Generator[Message, None, None]: # J1939 message or some other unsupported event continue - msg_kwargs: Dict[str, Union[float, bool, int]] = {} + msg_kwargs: dict[str, Union[float, bool, int]] = {} try: _timestamp, channel, rest_of_message = line.split(None, 2) timestamp = float(_timestamp) + self.start_time diff --git a/can/io/blf.py b/can/io/blf.py index 139b44285..0368134ad 100644 --- a/can/io/blf.py +++ b/can/io/blf.py @@ -17,15 +17,16 @@ import struct import time import zlib +from collections.abc import Generator from decimal import Decimal -from typing import Any, BinaryIO, Generator, List, Optional, Tuple, Union, cast +from typing import Any, BinaryIO, Optional, Union, cast from ..message import Message from ..typechecking import StringPathLike from ..util import channel2int, dlc2len, len2dlc from .generic import BinaryIOMessageReader, FileIOMessageWriter -TSystemTime = Tuple[int, int, int, int, int, int, int, int] +TSystemTime = tuple[int, int, int, int, int, int, int, int] class BLFParseError(Exception): @@ -419,7 +420,7 @@ def __init__( assert self.file is not None self.channel = channel self.compression_level = compression_level - self._buffer: List[bytes] = [] + self._buffer: list[bytes] = [] self._buffer_size = 0 # If max container size is located in kwargs, then update the instance if kwargs.get("max_container_size", False): diff --git a/can/io/canutils.py b/can/io/canutils.py index cc978d4f2..e83c21926 100644 --- a/can/io/canutils.py +++ b/can/io/canutils.py @@ -5,7 +5,8 @@ """ import logging -from typing import Any, Generator, TextIO, Union +from collections.abc import Generator +from typing import Any, TextIO, Union from can.message import Message diff --git a/can/io/csv.py b/can/io/csv.py index 2abaeb70e..dcc7996f7 100644 --- a/can/io/csv.py +++ b/can/io/csv.py @@ -10,7 +10,8 @@ """ from base64 import b64decode, b64encode -from typing import Any, Generator, TextIO, Union +from collections.abc import Generator +from typing import Any, TextIO, Union from can.message import Message diff --git a/can/io/generic.py b/can/io/generic.py index 93840f807..72d3e219e 100644 --- a/can/io/generic.py +++ b/can/io/generic.py @@ -3,16 +3,15 @@ import gzip import locale from abc import ABCMeta +from collections.abc import Iterable +from contextlib import AbstractContextManager from types import TracebackType from typing import ( Any, BinaryIO, - ContextManager, - Iterable, Literal, Optional, TextIO, - Type, Union, cast, ) @@ -24,7 +23,7 @@ from ..message import Message -class BaseIOHandler(ContextManager, metaclass=ABCMeta): +class BaseIOHandler(AbstractContextManager): """A generic file handler that can be used for reading and writing. Can be used as a context manager. @@ -74,7 +73,7 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[Type[BaseException]], + exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> Literal[False]: diff --git a/can/io/logger.py b/can/io/logger.py index 359aae4ac..f9f029759 100644 --- a/can/io/logger.py +++ b/can/io/logger.py @@ -12,13 +12,9 @@ Any, Callable, ClassVar, - Dict, Final, Literal, Optional, - Set, - Tuple, - Type, cast, ) @@ -43,7 +39,7 @@ #: A map of file suffixes to their corresponding #: :class:`can.io.generic.MessageWriter` class -MESSAGE_WRITERS: Final[Dict[str, Type[MessageWriter]]] = { +MESSAGE_WRITERS: Final[dict[str, type[MessageWriter]]] = { ".asc": ASCWriter, ".blf": BLFWriter, ".csv": CSVWriter, @@ -66,7 +62,7 @@ def _update_writer_plugins() -> None: MESSAGE_WRITERS[entry_point.key] = writer_class -def _get_logger_for_suffix(suffix: str) -> Type[MessageWriter]: +def _get_logger_for_suffix(suffix: str) -> type[MessageWriter]: try: return MESSAGE_WRITERS[suffix] except KeyError: @@ -77,7 +73,7 @@ def _get_logger_for_suffix(suffix: str) -> Type[MessageWriter]: def _compress( filename: StringPathLike, **kwargs: Any -) -> Tuple[Type[MessageWriter], FileLike]: +) -> tuple[type[MessageWriter], FileLike]: """ Return the suffix and io object of the decompressed file. File will automatically recompress upon close. @@ -171,7 +167,7 @@ class BaseRotatingLogger(MessageWriter, ABC): Subclasses must set the `_writer` attribute upon initialization. """ - _supported_formats: ClassVar[Set[str]] = set() + _supported_formats: ClassVar[set[str]] = set() #: If this attribute is set to a callable, the :meth:`~BaseRotatingLogger.rotation_filename` #: method delegates to this callable. The parameters passed to the callable are @@ -290,7 +286,7 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[Type[BaseException]], + exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> Literal[False]: @@ -347,7 +343,7 @@ class SizedRotatingLogger(BaseRotatingLogger): :meth:`~can.Listener.stop` is called. """ - _supported_formats: ClassVar[Set[str]] = {".asc", ".blf", ".csv", ".log", ".txt"} + _supported_formats: ClassVar[set[str]] = {".asc", ".blf", ".csv", ".log", ".txt"} def __init__( self, diff --git a/can/io/mf4.py b/can/io/mf4.py index 9efa1d83d..557d882e1 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -8,11 +8,12 @@ import abc import heapq import logging +from collections.abc import Generator, Iterator from datetime import datetime from hashlib import md5 from io import BufferedIOBase, BytesIO from pathlib import Path -from typing import Any, BinaryIO, Dict, Generator, Iterator, List, Optional, Union, cast +from typing import Any, BinaryIO, Optional, Union, cast from ..message import Message from ..typechecking import StringPathLike @@ -339,7 +340,7 @@ def __iter__(self) -> Generator[Message, None, None]: for i in range(len(data)): data_length = int(data["CAN_DataFrame.DataLength"][i]) - kv: Dict[str, Any] = { + kv: dict[str, Any] = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "arbitration_id": int(data["CAN_DataFrame.ID"][i]) & 0x1FFFFFFF, "data": data["CAN_DataFrame.DataBytes"][i][ @@ -377,7 +378,7 @@ def __iter__(self) -> Generator[Message, None, None]: names = data.samples[0].dtype.names for i in range(len(data)): - kv: Dict[str, Any] = { + kv: dict[str, Any] = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "is_error_frame": True, } @@ -428,7 +429,7 @@ def __iter__(self) -> Generator[Message, None, None]: names = data.samples[0].dtype.names for i in range(len(data)): - kv: Dict[str, Any] = { + kv: dict[str, Any] = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "arbitration_id": int(data["CAN_RemoteFrame.ID"][i]) & 0x1FFFFFFF, @@ -474,7 +475,7 @@ def __init__( def __iter__(self) -> Iterator[Message]: # To handle messages split over multiple channel groups, create a single iterator per # channel group and merge these iterators into a single iterator using heapq. - iterators: List[FrameIterator] = [] + iterators: list[FrameIterator] = [] for group_index, group in enumerate(self._mdf.groups): channel_group: ChannelGroup = group.channel_group diff --git a/can/io/player.py b/can/io/player.py index 214112164..2451eab41 100644 --- a/can/io/player.py +++ b/can/io/player.py @@ -7,14 +7,10 @@ import gzip import pathlib import time +from collections.abc import Generator, Iterable from typing import ( Any, - Dict, Final, - Generator, - Iterable, - Tuple, - Type, Union, ) @@ -32,7 +28,7 @@ #: A map of file suffixes to their corresponding #: :class:`can.io.generic.MessageReader` class -MESSAGE_READERS: Final[Dict[str, Type[MessageReader]]] = { +MESSAGE_READERS: Final[dict[str, type[MessageReader]]] = { ".asc": ASCReader, ".blf": BLFReader, ".csv": CSVReader, @@ -54,7 +50,7 @@ def _update_reader_plugins() -> None: MESSAGE_READERS[entry_point.key] = reader_class -def _get_logger_for_suffix(suffix: str) -> Type[MessageReader]: +def _get_logger_for_suffix(suffix: str) -> type[MessageReader]: """Find MessageReader class for given suffix.""" try: return MESSAGE_READERS[suffix] @@ -64,7 +60,7 @@ def _get_logger_for_suffix(suffix: str) -> Type[MessageReader]: def _decompress( filename: StringPathLike, -) -> Tuple[Type[MessageReader], Union[str, FileLike]]: +) -> tuple[type[MessageReader], Union[str, FileLike]]: """ Return the suffix and io object of the decompressed file. """ diff --git a/can/io/sqlite.py b/can/io/sqlite.py index 43fd761e9..686e2d038 100644 --- a/can/io/sqlite.py +++ b/can/io/sqlite.py @@ -8,7 +8,8 @@ import sqlite3 import threading import time -from typing import Any, Generator +from collections.abc import Generator +from typing import Any from can.listener import BufferedReader from can.message import Message diff --git a/can/io/trc.py b/can/io/trc.py index 2dbe3763c..a07a53a4d 100644 --- a/can/io/trc.py +++ b/can/io/trc.py @@ -9,9 +9,10 @@ import logging import os +from collections.abc import Generator from datetime import datetime, timedelta, timezone from enum import Enum -from typing import Any, Callable, Dict, Generator, Optional, TextIO, Tuple, Union +from typing import Any, Callable, Optional, TextIO, Union from ..message import Message from ..typechecking import StringPathLike @@ -56,13 +57,13 @@ def __init__( super().__init__(file, mode="r") self.file_version = TRCFileVersion.UNKNOWN self._start_time: float = 0 - self.columns: Dict[str, int] = {} + self.columns: dict[str, int] = {} self._num_columns = -1 if not self.file: raise ValueError("The given file cannot be None") - self._parse_cols: Callable[[Tuple[str, ...]], Optional[Message]] = ( + self._parse_cols: Callable[[tuple[str, ...]], Optional[Message]] = ( lambda x: None ) @@ -140,7 +141,7 @@ def _extract_header(self): return line - def _parse_msg_v1_0(self, cols: Tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v1_0(self, cols: tuple[str, ...]) -> Optional[Message]: arbit_id = cols[2] if arbit_id == "FFFFFFFF": logger.info("TRCReader: Dropping bus info line") @@ -155,7 +156,7 @@ def _parse_msg_v1_0(self, cols: Tuple[str, ...]) -> Optional[Message]: msg.data = bytearray([int(cols[i + 4], 16) for i in range(msg.dlc)]) return msg - def _parse_msg_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]: arbit_id = cols[3] msg = Message() @@ -168,7 +169,7 @@ def _parse_msg_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]: msg.is_rx = cols[2] == "Rx" return msg - def _parse_msg_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]: arbit_id = cols[4] msg = Message() @@ -181,7 +182,7 @@ def _parse_msg_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]: msg.is_rx = cols[3] == "Rx" return msg - def _parse_msg_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]: type_ = cols[self.columns["T"]] bus = self.columns.get("B", None) @@ -208,7 +209,7 @@ def _parse_msg_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]: return msg - def _parse_cols_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]: + def _parse_cols_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]: dtype = cols[2] if dtype in ("Tx", "Rx"): return self._parse_msg_v1_1(cols) @@ -216,7 +217,7 @@ def _parse_cols_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_cols_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]: + def _parse_cols_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]: dtype = cols[3] if dtype in ("Tx", "Rx"): return self._parse_msg_v1_3(cols) @@ -224,7 +225,7 @@ def _parse_cols_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_cols_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]: + def _parse_cols_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]: dtype = cols[self.columns["T"]] if dtype in {"DT", "FD", "FB", "FE", "BI"}: return self._parse_msg_v2_x(cols) diff --git a/can/listener.py b/can/listener.py index 1f19c3acd..1e95fa990 100644 --- a/can/listener.py +++ b/can/listener.py @@ -6,8 +6,9 @@ import sys import warnings from abc import ABCMeta, abstractmethod +from collections.abc import AsyncIterator from queue import Empty, SimpleQueue -from typing import Any, AsyncIterator, Optional +from typing import Any, Optional from can.bus import BusABC from can.message import Message diff --git a/can/logger.py b/can/logger.py index 4167558d8..9c1134257 100644 --- a/can/logger.py +++ b/can/logger.py @@ -2,15 +2,12 @@ import errno import re import sys +from collections.abc import Sequence from datetime import datetime from typing import ( TYPE_CHECKING, Any, - Dict, - List, Optional, - Sequence, - Tuple, Union, ) @@ -111,7 +108,7 @@ def _create_bus(parsed_args: argparse.Namespace, **kwargs: Any) -> can.BusABC: logging_level_names = ["critical", "error", "warning", "info", "debug", "subdebug"] can.set_logging_level(logging_level_names[min(5, parsed_args.verbosity)]) - config: Dict[str, Any] = {"single_handle": True, **kwargs} + config: dict[str, Any] = {"single_handle": True, **kwargs} if parsed_args.interface: config["interface"] = parsed_args.interface if parsed_args.bitrate: @@ -140,7 +137,7 @@ def __call__( raise argparse.ArgumentError(None, "Invalid filter argument") print(f"Adding filter(s): {values}") - can_filters: List[CanFilter] = [] + can_filters: list[CanFilter] = [] for filt in values: if ":" in filt: @@ -169,7 +166,7 @@ def __call__( if not isinstance(values, list): raise argparse.ArgumentError(None, "Invalid --timing argument") - timing_dict: Dict[str, int] = {} + timing_dict: dict[str, int] = {} for arg in values: try: key, value_string = arg.split("=") @@ -193,19 +190,19 @@ def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs: if not re.match(r"^--[a-zA-Z][a-zA-Z0-9\-]*=\S*?$", arg): raise ValueError(f"Parsing argument {arg} failed") - def _split_arg(_arg: str) -> Tuple[str, str]: + def _split_arg(_arg: str) -> tuple[str, str]: left, right = _arg.split("=", 1) return left.lstrip("-").replace("-", "_"), right - args: Dict[str, Union[str, int, float, bool]] = {} + args: dict[str, Union[str, int, float, bool]] = {} for key, string_val in map(_split_arg, unknown_args): args[key] = cast_from_string(string_val) return args def _parse_logger_args( - args: List[str], -) -> Tuple[argparse.Namespace, TAdditionalCliArgs]: + args: list[str], +) -> tuple[argparse.Namespace, TAdditionalCliArgs]: """Parse command line arguments for logger script.""" parser = argparse.ArgumentParser( diff --git a/can/notifier.py b/can/notifier.py index 088f0802e..237c874da 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -7,7 +7,8 @@ import logging import threading import time -from typing import Any, Awaitable, Callable, Iterable, List, Optional, Union +from collections.abc import Awaitable, Iterable +from typing import Any, Callable, Optional, Union from can.bus import BusABC from can.listener import Listener @@ -21,7 +22,7 @@ class Notifier: def __init__( self, - bus: Union[BusABC, List[BusABC]], + bus: Union[BusABC, list[BusABC]], listeners: Iterable[MessageRecipient], timeout: float = 1.0, loop: Optional[asyncio.AbstractEventLoop] = None, @@ -43,7 +44,7 @@ def __init__( :param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`. :param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in. """ - self.listeners: List[MessageRecipient] = list(listeners) + self.listeners: list[MessageRecipient] = list(listeners) self.bus = bus self.timeout = timeout self._loop = loop @@ -54,7 +55,7 @@ def __init__( self._running = True self._lock = threading.Lock() - self._readers: List[Union[int, threading.Thread]] = [] + self._readers: list[Union[int, threading.Thread]] = [] buses = self.bus if isinstance(self.bus, list) else [self.bus] for each_bus in buses: self.add_bus(each_bus) diff --git a/can/player.py b/can/player.py index 9deb0c51f..38b76a331 100644 --- a/can/player.py +++ b/can/player.py @@ -16,7 +16,7 @@ from .logger import _create_base_argument_parser, _create_bus, _parse_additional_config if TYPE_CHECKING: - from typing import Iterable + from collections.abc import Iterable from can import Message diff --git a/can/typechecking.py b/can/typechecking.py index d993d6bd3..36343ddaa 100644 --- a/can/typechecking.py +++ b/can/typechecking.py @@ -50,13 +50,13 @@ class CanFilterExtended(TypedDict): StringPathLike = typing.Union[str, "os.PathLike[str]"] AcceptedIOType = typing.Union[FileLike, StringPathLike] -BusConfig = typing.NewType("BusConfig", typing.Dict[str, typing.Any]) +BusConfig = typing.NewType("BusConfig", dict[str, typing.Any]) # Used by CLI scripts -TAdditionalCliArgs: TypeAlias = typing.Dict[str, typing.Union[str, int, float, bool]] -TDataStructs: TypeAlias = typing.Dict[ - typing.Union[int, typing.Tuple[int, ...]], - typing.Union[struct.Struct, typing.Tuple, None], +TAdditionalCliArgs: TypeAlias = dict[str, typing.Union[str, int, float, bool]] +TDataStructs: TypeAlias = dict[ + typing.Union[int, tuple[int, ...]], + typing.Union[struct.Struct, tuple, None], ] diff --git a/can/util.py b/can/util.py index bb835b846..2f32dda8e 100644 --- a/can/util.py +++ b/can/util.py @@ -12,15 +12,13 @@ import platform import re import warnings +from collections.abc import Iterable from configparser import ConfigParser from time import get_clock_info, perf_counter, time from typing import ( Any, Callable, - Dict, - Iterable, Optional, - Tuple, TypeVar, Union, cast, @@ -53,7 +51,7 @@ def load_file_config( path: Optional[typechecking.AcceptedIOType] = None, section: str = "default" -) -> Dict[str, str]: +) -> dict[str, str]: """ Loads configuration from file with following content:: @@ -77,7 +75,7 @@ def load_file_config( else: config.read(path) - _config: Dict[str, str] = {} + _config: dict[str, str] = {} if config.has_section(section): _config.update(config.items(section)) @@ -85,7 +83,7 @@ def load_file_config( return _config -def load_environment_config(context: Optional[str] = None) -> Dict[str, str]: +def load_environment_config(context: Optional[str] = None) -> dict[str, str]: """ Loads config dict from environmental variables (if set): @@ -111,7 +109,7 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]: context_suffix = f"_{context}" if context else "" can_config_key = f"CAN_CONFIG{context_suffix}" - config: Dict[str, str] = json.loads(os.environ.get(can_config_key, "{}")) + config: dict[str, str] = json.loads(os.environ.get(can_config_key, "{}")) for key, val in mapper.items(): config_option = os.environ.get(val + context_suffix, None) @@ -123,7 +121,7 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]: def load_config( path: Optional[typechecking.AcceptedIOType] = None, - config: Optional[Dict[str, Any]] = None, + config: Optional[dict[str, Any]] = None, context: Optional[str] = None, ) -> typechecking.BusConfig: """ @@ -178,7 +176,7 @@ def load_config( # Use the given dict for default values config_sources = cast( - "Iterable[Union[Dict[str, Any], Callable[[Any], Dict[str, Any]]]]", + "Iterable[Union[dict[str, Any], Callable[[Any], dict[str, Any]]]]", [ given_config, can.rc, @@ -212,7 +210,7 @@ def load_config( return bus_config -def _create_bus_config(config: Dict[str, Any]) -> typechecking.BusConfig: +def _create_bus_config(config: dict[str, Any]) -> typechecking.BusConfig: """Validates some config values, performs compatibility mappings and creates specific structures (e.g. for bit timings). @@ -254,7 +252,7 @@ def _create_bus_config(config: Dict[str, Any]) -> typechecking.BusConfig: return cast("typechecking.BusConfig", config) -def _dict2timing(data: Dict[str, Any]) -> Union[BitTiming, BitTimingFd, None]: +def _dict2timing(data: dict[str, Any]) -> Union[BitTiming, BitTimingFd, None]: """Try to instantiate a :class:`~can.BitTiming` or :class:`~can.BitTimingFd` from a dictionary. Return `None` if not possible.""" @@ -396,8 +394,8 @@ def _rename_kwargs( func_name: str, start: str, end: Optional[str], - kwargs: Dict[str, Any], - aliases: Dict[str, Optional[str]], + kwargs: dict[str, Any], + aliases: dict[str, Optional[str]], ) -> None: """Helper function for `deprecated_args_alias`""" for alias, new in aliases.items(): @@ -468,7 +466,7 @@ def check_or_adjust_timing_clock(timing: T2, valid_clocks: Iterable[int]) -> T2: ) from None -def time_perfcounter_correlation() -> Tuple[float, float]: +def time_perfcounter_correlation() -> tuple[float, float]: """Get the `perf_counter` value nearest to when time.time() is updated Computed if the default timer used by `time.time` on this platform has a resolution diff --git a/can/viewer.py b/can/viewer.py index 57b3d6f7d..3eed727ab 100644 --- a/can/viewer.py +++ b/can/viewer.py @@ -27,7 +27,6 @@ import struct import sys import time -from typing import Dict, List, Tuple from can import __version__ from can.logger import ( @@ -161,7 +160,7 @@ def run(self): # Unpack the data and then convert it into SI-units @staticmethod - def unpack_data(cmd: int, cmd_to_struct: Dict, data: bytes) -> List[float]: + def unpack_data(cmd: int, cmd_to_struct: dict, data: bytes) -> list[float]: if not cmd_to_struct or not data: # These messages do not contain a data package return [] @@ -390,8 +389,8 @@ def _fill_text(self, text, width, indent): def _parse_viewer_args( - args: List[str], -) -> Tuple[argparse.Namespace, TDataStructs, TAdditionalCliArgs]: + args: list[str], +) -> tuple[argparse.Namespace, TDataStructs, TAdditionalCliArgs]: # Parse command line arguments parser = argparse.ArgumentParser( "python -m can.viewer", @@ -524,7 +523,7 @@ def _parse_viewer_args( key, fmt = int(tmp[0], base=16), tmp[1] # The scaling - scaling: List[float] = [] + scaling: list[float] = [] for t in tmp[2:]: # First try to convert to int, if that fails, then convert to a float try: diff --git a/pyproject.toml b/pyproject.toml index c65275559..3d1f8a3c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "typing_extensions>=3.10.0.0", "msgpack~=1.1.0", ] -requires-python = ">=3.8" +requires-python = ">=3.9" license = { text = "LGPL v3" } classifiers = [ "Development Status :: 5 - Production/Stable", @@ -29,7 +29,6 @@ classifiers = [ "Operating System :: Microsoft :: Windows", "Operating System :: POSIX :: Linux", "Programming Language :: Python", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -61,7 +60,7 @@ changelog = "https://github.com/hardbyte/python-can/blob/develop/CHANGELOG.md" [project.optional-dependencies] lint = [ "pylint==3.2.*", - "ruff==0.10.0", + "ruff==0.11.0", "black==25.1.*", "mypy==1.15.*", ] diff --git a/test/contextmanager_test.py b/test/contextmanager_test.py index 3adb1e7c6..fe87f33b0 100644 --- a/test/contextmanager_test.py +++ b/test/contextmanager_test.py @@ -17,9 +17,10 @@ def setUp(self): ) def test_open_buses(self): - with can.Bus(interface="virtual") as bus_send, can.Bus( - interface="virtual" - ) as bus_recv: + with ( + can.Bus(interface="virtual") as bus_send, + can.Bus(interface="virtual") as bus_recv, + ): bus_send.send(self.msg_send) msg_recv = bus_recv.recv() @@ -27,9 +28,10 @@ def test_open_buses(self): self.assertTrue(msg_recv) def test_use_closed_bus(self): - with can.Bus(interface="virtual") as bus_send, can.Bus( - interface="virtual" - ) as bus_recv: + with ( + can.Bus(interface="virtual") as bus_send, + can.Bus(interface="virtual") as bus_recv, + ): bus_send.send(self.msg_send) # Receiving a frame after bus has been closed should raise a CanException