diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 319fb0f53..0dd2cffa9 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -117,12 +117,13 @@ def __init__( :raises ValueError: If the given messages are invalid """ messages = self._check_and_convert_messages(messages) - - # Take the Arbitration ID of the first element - self.arbitration_id = messages[0].arbitration_id + self.msgs_len = len(messages) + self.messages = messages + # Take the Arbitration ID of each message and put them into a list + self.arbitration_id = [self.messages[idx].arbitration_id for idx in range(self.msgs_len)] self.period = period self.period_ns = int(round(period * 1e9)) - self.messages = messages + self.msg_index = 0 @staticmethod def _check_and_convert_messages( @@ -131,8 +132,7 @@ def _check_and_convert_messages( """Helper function to convert a Message or Sequence of messages into a tuple, and raises an error when the given value is invalid. - Performs error checking to ensure that all Messages have the same - arbitration ID and channel. + Performs error checking to ensure that all Messages have the same channel. Should be called when the cyclic task is initialized. @@ -147,12 +147,6 @@ def _check_and_convert_messages( raise ValueError("Must be at least a list or tuple of length 1") messages = tuple(messages) - all_same_id = all( - message.arbitration_id == messages[0].arbitration_id for message in messages - ) - if not all_same_id: - raise ValueError("All Arbitration IDs should be the same") - all_same_channel = all( message.channel == messages[0].channel for message in messages ) @@ -205,16 +199,17 @@ def _check_modified_messages(self, messages: Tuple[Message, ...]) -> None: :raises ValueError: If the given messages are invalid """ - if len(self.messages) != len(messages): + if self.msgs_len != len(messages): raise ValueError( "The number of new cyclic messages to be sent must be equal to " "the number of messages originally specified for this task" ) - if self.arbitration_id != messages[0].arbitration_id: - raise ValueError( - "The arbitration ID of new cyclic messages cannot be changed " - "from when the task was created" - ) + for idx in range(self.msgs_len): + if self.arbitration_id[idx] != messages[idx].arbitration_id: + raise ValueError( + "The arbitration ID of new cyclic messages cannot be changed " + "from when the task was created" + ) def modify_data(self, messages: Union[Sequence[Message], Message]) -> None: """Update the contents of the periodically sent messages, without @@ -236,6 +231,68 @@ def modify_data(self, messages: Union[Sequence[Message], Message]) -> None: self.messages = messages +class VariableRateCyclicTaskABC(CyclicSendTaskABC, abc.ABC): + """A Cyclic task that supports a group period and intra-message period.""" + def _check_and_apply_period_intra( + self, period_intra: Optional[float] + ) -> None: + """ + Helper function that checks if the given period_intra is valid and applies the + variable rate attributes to be used in the cyclic task. + + :param period_intra: + The period in seconds to send intra-message. + + :raises ValueError: If the given period_intra is invalid + """ + self._is_variable_rate = False + self._run_cnt_msgs = [0] + self._run_cnt_max = 1 + self._run_cnt = 0 + + if period_intra is not None: + if not isinstance(period_intra, float): + raise ValueError("period_intra must be a float") + if period_intra <= 0: + raise ValueError("period_intra must be greater than 0") + if self.msgs_len <= 1: + raise ValueError("period_intra can only be used with multiple messages") + if period_intra*self.msgs_len >= self.period: + raise ValueError("period_intra per intra-message must be less than period") + period_ms = int(round(self.period * 1000, 0)) + period_intra_ms = int(round(period_intra * 1000, 0)) + (_run_period_ms, msg_cnts, group_cnts) = self._find_gcd(period_ms, period_intra_ms) + self._is_variable_rate = True + self._run_cnt_msgs = [i*msg_cnts for i in range(self.msgs_len)] + self._run_cnt_max = group_cnts + self._run_cnt = 0 + # Override period, period_ms, and period_ns to be the variable period + self.period = _run_period_ms / 1000 + self.period_ms = _run_period_ms + self.period_ns = _run_period_ms * 1000000 + + @staticmethod + def _find_gcd( + period_ms: int, + period_intra_ms: int, + ) -> Tuple[int, int, int]: + """ + Helper function that finds the greatest common divisor between period_ms and period_intra_ms. + + :returns: + Tuple of (gcd_ms, m_steps, n_steps) + * gcd_ms: greatest common divisor in milliseconds + * m_steps: number of steps to send intra-message + * n_steps: number of steps to send message group + """ + gcd_ms = min(period_ms, period_intra_ms) + while gcd_ms > 1: + if period_ms % gcd_ms == 0 and period_intra_ms % gcd_ms == 0: + break + gcd_ms -= 1 + m_steps = int(period_intra_ms / gcd_ms) + n_steps = int(period_ms / gcd_ms) + return (gcd_ms, m_steps, n_steps) class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC): """A Cyclic send task that supports switches send frequency after a set time.""" @@ -265,7 +322,7 @@ def __init__( class ThreadBasedCyclicSendTask( - LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC + LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, VariableRateCyclicTaskABC ): """Fallback cyclic send task using daemon thread.""" @@ -279,6 +336,7 @@ def __init__( on_error: Optional[Callable[[Exception], bool]] = None, autostart: bool = True, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> None: """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`. @@ -302,8 +360,20 @@ def __init__( self.thread: Optional[threading.Thread] = None self.on_error = on_error self.modifier_callback = modifier_callback + self._check_and_apply_period_intra(period_intra) - self.period_ms = int(round(period * 1000, 0)) + if USE_WINDOWS_EVENTS: + if not self._is_variable_rate: + self.period_ms = int(round(period * 1000, 0)) + try: + self.event = win32event.CreateWaitableTimerEx( + None, + None, + win32event.CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, + win32event.TIMER_ALL_ACCESS, + ) + except (AttributeError, OSError, pywintypes.error): + self.event = win32event.CreateWaitableTimer(None, False, None) self.event: Optional[_Pywin32Event] = None if PYWIN32: @@ -349,7 +419,7 @@ def start(self) -> None: self.thread.start() def _run(self) -> None: - msg_index = 0 + self.msg_index = 0 msg_due_time_ns = time.perf_counter_ns() if self.event and PYWIN32: @@ -357,9 +427,30 @@ def _run(self) -> None: PYWIN32.wait_0(self.event) while not self.stopped: + msg_send = (self._run_cnt in self._run_cnt_msgs) if self._is_variable_rate else True if self.end_time is not None and time.perf_counter() >= self.end_time: self.stop() break + if msg_send: + # Prevent calling bus.send from multiple threads + with self.send_lock: + try: + if self.modifier_callback is not None: + self.modifier_callback(self.messages[self.msg_index]) + self.bus.send(self.messages[self.msg_index]) + except Exception as exc: # pylint: disable=broad-except + log.exception(exc) + + # stop if `on_error` callback was not given + if self.on_error is None: + self.stop() + raise exc + + # stop if `on_error` returns False + if not self.on_error(exc): + self.stop() + break + self.msg_index = (self.msg_index + 1) % self.msgs_len try: if self.modifier_callback is not None: @@ -383,6 +474,11 @@ def _run(self) -> None: if not self.event: msg_due_time_ns += self.period_ns + if USE_WINDOWS_EVENTS: + win32event.WaitForSingleObject( + self.event.handle, + win32event.INFINITE, + ) msg_index = (msg_index + 1) % len(self.messages) if self.event and PYWIN32: @@ -392,3 +488,6 @@ def _run(self) -> None: delay_ns = msg_due_time_ns - time.perf_counter_ns() if delay_ns > 0: time.sleep(delay_ns / NANOSECONDS_IN_SECOND) + + if self._is_variable_rate: + self._run_cnt = (self._run_cnt + 1) % self._run_cnt_max diff --git a/can/bus.py b/can/bus.py index a12808ab6..20a0498d3 100644 --- a/can/bus.py +++ b/can/bus.py @@ -217,6 +217,7 @@ def send_periodic( store_task: bool = True, autostart: bool = True, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -246,6 +247,10 @@ def send_periodic( Function which should be used to modify each message's data before sending. The callback modifies the :attr:`~can.Message.data` of the message and returns ``None``. + :param period_intra: + Period in seconds between each message when sending multiple messages + in a sequence. If not provided, the period will be used for each + message. :return: A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the task's @@ -278,7 +283,7 @@ def send_periodic( task = cast( _SelfRemovingCyclicTask, self._send_periodic_internal( - msgs, period, duration, autostart, modifier_callback + msgs, period, duration, autostart, modifier_callback, period_intra ), ) # we wrap the task's stop method to also remove it from the Bus's list of tasks @@ -308,6 +313,7 @@ def _send_periodic_internal( duration: Optional[float] = None, autostart: bool = True, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Default implementation of periodic message sending using threading. @@ -320,10 +326,19 @@ def _send_periodic_internal( :param duration: The duration between sending each message at the given rate. If no duration is provided, the task will continue indefinitely. + :param modifier_callback: + Function which should be used to modify each message's data before + sending. The callback modifies the :attr:`~can.Message.data` of the + message and returns ``None``. + :param period_intra: + Period in seconds between each message when sending multiple messages + in a sequence. If not provided, the period will be used for each + message. :param autostart: If True (the default) the sending task will immediately start after creation. Otherwise, the task has to be started by calling the tasks :meth:`~can.RestartableCyclicTaskABC.start` method on it. + :return: A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the @@ -342,6 +357,7 @@ def _send_periodic_internal( duration=duration, autostart=autostart, modifier_callback=modifier_callback, + period_intra=period_intra, ) return task diff --git a/can/interfaces/socketcan/socketcan.py b/can/interfaces/socketcan/socketcan.py index 40da0d094..f3c77f523 100644 --- a/can/interfaces/socketcan/socketcan.py +++ b/can/interfaces/socketcan/socketcan.py @@ -23,6 +23,7 @@ LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, + VariableRateCyclicTaskABC, ) from can.interfaces.socketcan import constants from can.interfaces.socketcan.utils import find_available_interfaces, pack_filters @@ -310,7 +311,7 @@ def _compose_arbitration_id(message: Message) -> int: class CyclicSendTask( - LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC + LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, VariableRateCyclicTaskABC ): """ A SocketCAN cyclic send task supports: @@ -327,6 +328,7 @@ def __init__( messages: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, + period_intra: Optional[float] = None, autostart: bool = True, ) -> None: """Construct and :meth:`~start` a task. @@ -347,7 +349,6 @@ def __init__( # - self.period # - self.duration super().__init__(messages, period, duration) - self.bcm_socket = bcm_socket self.task_id = task_id if autostart: @@ -819,6 +820,7 @@ def _send_periodic_internal( duration: Optional[float] = None, autostart: bool = True, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -864,7 +866,7 @@ def _send_periodic_internal( bcm_socket = self._get_bcm_socket(msgs_channel or self.channel) task_id = self._get_next_task_id() task = CyclicSendTask( - bcm_socket, task_id, msgs, period, duration, autostart=autostart + bcm_socket, task_id, msgs, period, duration, period_intra, autostart=autostart ) return task