Skip to content

Update black, ruff & mypy #1929

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 3 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions can/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
from typing import Any, Dict

__all__ = [
"VALID_INTERFACES",
"ASCReader",
"ASCWriter",
"AsyncBufferedReader",
"BitTiming",
"BitTimingFd",
"BLFReader",
"BLFWriter",
"BitTiming",
"BitTimingFd",
"BufferedReader",
"Bus",
"BusABC",
"BusState",
"CSVReader",
"CSVWriter",
"CanError",
"CanInitializationError",
"CanInterfaceNotImplementedError",
Expand All @@ -30,30 +33,27 @@
"CanTimeoutError",
"CanutilsLogReader",
"CanutilsLogWriter",
"CSVReader",
"CSVWriter",
"CyclicSendTaskABC",
"LimitedDurationCyclicSendTaskABC",
"Listener",
"Logger",
"LogReader",
"ModifiableCyclicTaskABC",
"Message",
"MessageSync",
"Logger",
"MF4Reader",
"MF4Writer",
"Message",
"MessageSync",
"ModifiableCyclicTaskABC",
"Notifier",
"Printer",
"RedirectReader",
"RestartableCyclicTaskABC",
"SizedRotatingLogger",
"SqliteReader",
"SqliteWriter",
"ThreadSafeBus",
"TRCFileVersion",
"TRCReader",
"TRCWriter",
"VALID_INTERFACES",
"ThreadSafeBus",
"bit_timing",
"broadcastmanager",
"bus",
Expand All @@ -64,8 +64,8 @@
"interfaces",
"io",
"listener",
"logconvert",
"log",
"logconvert",
"logger",
"message",
"notifier",
Expand Down
40 changes: 19 additions & 21 deletions can/bit_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def from_bitrate_and_segments(
if the arguments are invalid.
"""
try:
brp = int(round(f_clock / (bitrate * (1 + tseg1 + tseg2))))
brp = round(f_clock / (bitrate * (1 + tseg1 + tseg2)))
except ZeroDivisionError:
raise ValueError("Invalid inputs") from None

Expand Down Expand Up @@ -232,15 +232,15 @@ def iterate_from_sample_point(
raise ValueError(f"sample_point (={sample_point}) must not be below 50%.")

for brp in range(1, 65):
nbt = round(int(f_clock / (bitrate * brp)))
nbt = int(f_clock / (bitrate * brp))
if nbt < 8:
break

effective_bitrate = f_clock / (nbt * brp)
if abs(effective_bitrate - bitrate) > bitrate / 256:
continue

tseg1 = int(round(sample_point / 100 * nbt)) - 1
tseg1 = round(sample_point / 100 * nbt) - 1
# limit tseg1, so tseg2 is at least 1 TQ
tseg1 = min(tseg1, nbt - 2)

Expand Down Expand Up @@ -312,7 +312,7 @@ def f_clock(self) -> int:
@property
def bitrate(self) -> int:
"""Bitrate in bits/s."""
return int(round(self.f_clock / (self.nbt * self.brp)))
return round(self.f_clock / (self.nbt * self.brp))

@property
def brp(self) -> int:
Expand All @@ -322,7 +322,7 @@ def brp(self) -> int:
@property
def tq(self) -> int:
"""Time quantum in nanoseconds"""
return int(round(self.brp / self.f_clock * 1e9))
return round(self.brp / self.f_clock * 1e9)

@property
def nbt(self) -> int:
Expand Down Expand Up @@ -433,7 +433,7 @@ def recreate_with_f_clock(self, f_clock: int) -> "BitTiming":
"f_clock change failed because of sample point discrepancy."
)
# adapt synchronization jump width, so it has the same size relative to bit time as self
sjw = int(round(self.sjw / self.nbt * bt.nbt))
sjw = round(self.sjw / self.nbt * bt.nbt)
sjw = max(1, min(4, bt.tseg2, sjw))
bt._data["sjw"] = sjw # pylint: disable=protected-access
bt._data["nof_samples"] = self.nof_samples # pylint: disable=protected-access
Expand All @@ -458,7 +458,7 @@ def __repr__(self) -> str:
return f"can.{self.__class__.__name__}({args})"

def __getitem__(self, key: str) -> int:
return cast(int, self._data.__getitem__(key))
return cast("int", self._data.__getitem__(key))

def __len__(self) -> int:
return self._data.__len__()
Expand Down Expand Up @@ -716,10 +716,8 @@ def from_bitrate_and_segments( # pylint: disable=too-many-arguments
if the arguments are invalid.
"""
try:
nom_brp = int(round(f_clock / (nom_bitrate * (1 + nom_tseg1 + nom_tseg2))))
data_brp = int(
round(f_clock / (data_bitrate * (1 + data_tseg1 + data_tseg2)))
)
nom_brp = round(f_clock / (nom_bitrate * (1 + nom_tseg1 + nom_tseg2)))
data_brp = round(f_clock / (data_bitrate * (1 + data_tseg1 + data_tseg2)))
except ZeroDivisionError:
raise ValueError("Invalid inputs.") from None

Expand Down Expand Up @@ -787,15 +785,15 @@ def iterate_from_sample_point(
sync_seg = 1

for nom_brp in range(1, 257):
nbt = round(int(f_clock / (nom_bitrate * nom_brp)))
nbt = int(f_clock / (nom_bitrate * nom_brp))
if nbt < 1:
break

effective_nom_bitrate = f_clock / (nbt * nom_brp)
if abs(effective_nom_bitrate - nom_bitrate) > nom_bitrate / 256:
continue

nom_tseg1 = int(round(nom_sample_point / 100 * nbt)) - 1
nom_tseg1 = round(nom_sample_point / 100 * nbt) - 1
# limit tseg1, so tseg2 is at least 2 TQ
nom_tseg1 = min(nom_tseg1, nbt - sync_seg - 2)
nom_tseg2 = nbt - nom_tseg1 - 1
Expand All @@ -811,7 +809,7 @@ def iterate_from_sample_point(
if abs(effective_data_bitrate - data_bitrate) > data_bitrate / 256:
continue

data_tseg1 = int(round(data_sample_point / 100 * dbt)) - 1
data_tseg1 = round(data_sample_point / 100 * dbt) - 1
# limit tseg1, so tseg2 is at least 2 TQ
data_tseg1 = min(data_tseg1, dbt - sync_seg - 2)
data_tseg2 = dbt - data_tseg1 - 1
Expand Down Expand Up @@ -923,7 +921,7 @@ def f_clock(self) -> int:
@property
def nom_bitrate(self) -> int:
"""Nominal (arbitration phase) bitrate."""
return int(round(self.f_clock / (self.nbt * self.nom_brp)))
return round(self.f_clock / (self.nbt * self.nom_brp))

@property
def nom_brp(self) -> int:
Expand All @@ -933,7 +931,7 @@ def nom_brp(self) -> int:
@property
def nom_tq(self) -> int:
"""Nominal time quantum in nanoseconds"""
return int(round(self.nom_brp / self.f_clock * 1e9))
return round(self.nom_brp / self.f_clock * 1e9)

@property
def nbt(self) -> int:
Expand Down Expand Up @@ -969,7 +967,7 @@ def nom_sample_point(self) -> float:
@property
def data_bitrate(self) -> int:
"""Bitrate of the data phase in bit/s."""
return int(round(self.f_clock / (self.dbt * self.data_brp)))
return round(self.f_clock / (self.dbt * self.data_brp))

@property
def data_brp(self) -> int:
Expand All @@ -979,7 +977,7 @@ def data_brp(self) -> int:
@property
def data_tq(self) -> int:
"""Data time quantum in nanoseconds"""
return int(round(self.data_brp / self.f_clock * 1e9))
return round(self.data_brp / self.f_clock * 1e9)

@property
def dbt(self) -> int:
Expand Down Expand Up @@ -1106,10 +1104,10 @@ def recreate_with_f_clock(self, f_clock: int) -> "BitTimingFd":
"f_clock change failed because of sample point discrepancy."
)
# adapt synchronization jump width, so it has the same size relative to bit time as self
nom_sjw = int(round(self.nom_sjw / self.nbt * bt.nbt))
nom_sjw = round(self.nom_sjw / self.nbt * bt.nbt)
nom_sjw = max(1, min(bt.nom_tseg2, nom_sjw))
bt._data["nom_sjw"] = nom_sjw # pylint: disable=protected-access
data_sjw = int(round(self.data_sjw / self.dbt * bt.dbt))
data_sjw = round(self.data_sjw / self.dbt * bt.dbt)
data_sjw = max(1, min(bt.data_tseg2, data_sjw))
bt._data["data_sjw"] = data_sjw # pylint: disable=protected-access
bt._validate() # pylint: disable=protected-access
Expand Down Expand Up @@ -1138,7 +1136,7 @@ def __repr__(self) -> str:
return f"can.{self.__class__.__name__}({args})"

def __getitem__(self, key: str) -> int:
return cast(int, self._data.__getitem__(key))
return cast("int", self._data.__getitem__(key))

def __len__(self) -> int:
return self._data.__len__()
Expand Down
6 changes: 3 additions & 3 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def create_timer(self) -> _Pywin32Event:
):
event = self.win32event.CreateWaitableTimer(None, False, None)

return cast(_Pywin32Event, event)
return cast("_Pywin32Event", event)

def set_timer(self, event: _Pywin32Event, period_ms: int) -> None:
self.win32event.SetWaitableTimer(event.handle, 0, period_ms, None, None, False)
Expand Down Expand Up @@ -121,12 +121,12 @@ def __init__(
# Take the Arbitration ID of the first element
self.arbitration_id = messages[0].arbitration_id
self.period = period
self.period_ns = int(round(period * 1e9))
self.period_ns = round(period * 1e9)
self.messages = messages

@staticmethod
def _check_and_convert_messages(
messages: Union[Sequence[Message], Message]
messages: Union[Sequence[Message], Message],
) -> Tuple[Message, ...]:
"""Helper function to convert a Message or Sequence of messages into a
tuple, and raises an error when the given value is invalid.
Expand Down
4 changes: 2 additions & 2 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def send_periodic(

# Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later
task = cast(
_SelfRemovingCyclicTask,
"_SelfRemovingCyclicTask",
self._send_periodic_internal(
msgs, period, duration, autostart, modifier_callback
),
Expand Down Expand Up @@ -452,7 +452,7 @@ def _matches_filters(self, msg: Message) -> bool:
for _filter in self._filters:
# check if this filter even applies to the message
if "extended" in _filter:
_filter = cast(can.typechecking.CanFilterExtended, _filter)
_filter = cast("can.typechecking.CanFilterExtended", _filter)
if _filter["extended"] != msg.is_extended_id:
continue

Expand Down
2 changes: 1 addition & 1 deletion can/ctypesutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

log = logging.getLogger("can.ctypesutil")

__all__ = ["CLibrary", "HANDLE", "PHANDLE", "HRESULT"]
__all__ = ["HANDLE", "HRESULT", "PHANDLE", "CLibrary"]

if sys.platform == "win32":
_LibBase = ctypes.WinDLL
Expand Down
4 changes: 2 additions & 2 deletions can/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _get_class_for_interface(interface: str) -> Type[BusABC]:
f"'{interface}': {e}"
) from None

return cast(Type[BusABC], bus_class)
return cast("Type[BusABC]", bus_class)


@util.deprecated_args_alias(
Expand Down Expand Up @@ -138,7 +138,7 @@ def Bus( # noqa: N802


def detect_available_configs(
interfaces: Union[None, str, Iterable[str]] = None
interfaces: Union[None, str, Iterable[str]] = None,
) -> List[AutoDetectedConfig]:
"""Detect all configurations/channels that the interfaces could
currently connect with.
Expand Down
3 changes: 1 addition & 2 deletions can/interfaces/ics_neovi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""
"""
""" """

__all__ = [
"ICSApiError",
Expand Down
8 changes: 4 additions & 4 deletions can/interfaces/ixxat/canlib_vcinpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
from .exceptions import *

__all__ = [
"VCITimeout",
"VCIError",
"IXXATBus",
"VCIBusOffError",
"VCIDeviceNotFoundError",
"IXXATBus",
"VCIError",
"VCITimeout",
"vciFormatError",
]

Expand Down Expand Up @@ -890,7 +890,7 @@ def __init__(
self._count = int(duration / period) if duration else 0

self._msg = structures.CANCYCLICTXMSG()
self._msg.wCycleTime = int(round(period * resolution))
self._msg.wCycleTime = round(period * resolution)
self._msg.dwMsgId = self.messages[0].arbitration_id
self._msg.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA
self._msg.uMsgInfo.Bits.ext = 1 if self.messages[0].is_extended_id else 0
Expand Down
8 changes: 4 additions & 4 deletions can/interfaces/ixxat/canlib_vcinpl2.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
from .exceptions import *

__all__ = [
"VCITimeout",
"VCIError",
"IXXATBus",
"VCIBusOffError",
"VCIDeviceNotFoundError",
"IXXATBus",
"VCIError",
"VCITimeout",
"vciFormatError",
]

Expand Down Expand Up @@ -1010,7 +1010,7 @@ def __init__(
self._count = int(duration / period) if duration else 0

self._msg = structures.CANCYCLICTXMSG2()
self._msg.wCycleTime = int(round(period * resolution))
self._msg.wCycleTime = round(period * resolution)
self._msg.dwMsgId = self.messages[0].arbitration_id
self._msg.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA
self._msg.uMsgInfo.Bits.ext = 1 if self.messages[0].is_extended_id else 0
Expand Down
6 changes: 3 additions & 3 deletions can/interfaces/ixxat/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
)

__all__ = [
"VCITimeout",
"VCIError",
"VCIRxQueueEmptyError",
"VCIBusOffError",
"VCIDeviceNotFoundError",
"VCIError",
"VCIRxQueueEmptyError",
"VCITimeout",
]


Expand Down
14 changes: 7 additions & 7 deletions can/interfaces/ixxat/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,13 @@ class CANBTP(ctypes.Structure):
]

def __str__(self):
return "dwMode=%d, dwBPS=%d, wTS1=%d, wTS2=%d, wSJW=%d, wTDO=%d" % (
self.dwMode,
self.dwBPS,
self.wTS1,
self.wTS2,
self.wSJW,
self.wTDO,
return (
f"dwMode={self.dwMode:d}, "
f"dwBPS={self.dwBPS:d}, "
f"wTS1={self.wTS1:d}, "
f"wTS2={self.wTS2:d}, "
f"wSJW={self.wSJW:d}, "
f"wTDO={self.wTDO:d}"
)


Expand Down
3 changes: 1 addition & 2 deletions can/interfaces/kvaser/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""
"""
""" """

__all__ = [
"CANLIBInitializationError",
Expand Down
Loading
Loading