Skip to content

Commit

Permalink
Refactor to add ABANDONED message (#392)
Browse files Browse the repository at this point in the history
* Refactor to add ABANDONED message

* Include 'A' when generating invalid message sequences
  • Loading branch information
mdickinson authored Jul 10, 2021
1 parent a1d5761 commit 363017f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 18 deletions.
65 changes: 62 additions & 3 deletions traits_futures/base_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@
#: starts.
_CANCELLING_AFTER_STARTED = "cancelling_after_started"

#: Internal state corresponding to a task that was abandoned due to
#: cancellation.
_CANCELLED_ABANDONED = "cancelled_abandoned"

#: Internal state corresponding to a task that failed after cancellation.
_CANCELLED_FAILED = "cancelled_failed"

#: Internal state corresponding to a task that completed after cancellation.
_CANCELLED_COMPLETED = "cancelled_completed"

#: Mapping from each internal state to the corresponding user-visible state.
_INTERNAL_STATE_TO_STATE = {
_NOT_INITIALIZED: WAITING,
Expand All @@ -71,7 +81,9 @@
FAILED: FAILED,
_CANCELLING_BEFORE_STARTED: CANCELLING,
_CANCELLING_AFTER_STARTED: CANCELLING,
CANCELLED: CANCELLED,
_CANCELLED_ABANDONED: CANCELLED,
_CANCELLED_COMPLETED: CANCELLED,
_CANCELLED_FAILED: CANCELLED,
}

#: Internal states corresponding to completed futures.
Expand Down Expand Up @@ -223,6 +235,10 @@ def _task_sent(self, message):
If the future is already in ``CANCELLING`` state, no message is
dispatched.
Internal state:
* _CANCELLING_AFTER_STARTED -> _CANCELLING_AFTER_STARTED
* EXECUTING -> EXECUTING
Parameters
----------
message : tuple(str, object)
Expand All @@ -244,10 +260,36 @@ def _task_sent(self, message):
)
)

def _task_abandoned(self, none):
"""
Update state when the background task is abandoned due to cancellation.
Internal state:
* _CANCELLING_BEFORE_STARTED -> _CANCELLED_ABANDONED
Parameters
----------
none : NoneType
This parameter is unused.
"""
if self._internal_state == _CANCELLING_BEFORE_STARTED:
self._cancel = None
self._internal_state = _CANCELLED_ABANDONED
else:
raise _StateTransitionError(
"Unexpected 'started' message in internal state {!r}".format(
self._internal_state
)
)

def _task_started(self, none):
"""
Update state when the background task has started processing.
Internal state:
* _INITIALIZED -> EXECUTING
* _CANCELLING_BEFORE_STARTED -> _CANCELLED_AFTER_STARTED
Parameters
----------
none : NoneType
Expand All @@ -268,6 +310,10 @@ def _task_returned(self, result):
"""
Update state when background task reports completing successfully.
Internal state:
* EXECUTING -> COMPLETED
* _CANCELLING_AFTER_STARTED -> _CANCELLED_COMPLETED
Parameters
----------
result : any
Expand All @@ -279,7 +325,8 @@ def _task_returned(self, result):
self._internal_state = COMPLETED
elif self._internal_state == _CANCELLING_AFTER_STARTED:
self._cancel = None
self._internal_state = CANCELLED
self._result = result
self._internal_state = _CANCELLED_COMPLETED
else:
raise _StateTransitionError(
"Unexpected 'returned' message in internal state {!r}".format(
Expand All @@ -291,6 +338,10 @@ def _task_raised(self, exception_info):
"""
Update state when the background task reports completing with an error.
Internal state:
* EXECUTING -> FAILED
* _CANCELLING_AFTER_STARTED -> _CANCELLED_FAILED
Parameters
----------
exception_info : tuple(str, str, str)
Expand All @@ -303,7 +354,8 @@ def _task_raised(self, exception_info):
self._internal_state = FAILED
elif self._internal_state == _CANCELLING_AFTER_STARTED:
self._cancel = None
self._internal_state = CANCELLED
self._exception = exception_info
self._internal_state = _CANCELLED_FAILED
else:
raise _StateTransitionError(
"Unexpected 'raised' message in internal state {!r}".format(
Expand All @@ -317,6 +369,10 @@ def _user_cancelled(self):
A future in ``WAITING`` or ``EXECUTING`` state moves to ``CANCELLING``
state.
Internal state:
* _INITIALIZED -> _CANCELLING_BEFORE_STARTED
* EXECUTING -> _CANCELLING_AFTER_STARTED
"""
if self._internal_state == _INITIALIZED:
self._cancel()
Expand All @@ -340,6 +396,9 @@ def _executor_initialized(self, cancel):
cancel : callable
The callback to be called when the user requests cancellation.
The callback accepts no arguments, and has no return value.
Internal state:
* _NOT_INITIALIZED -> _INITIALIZED
"""
if self._internal_state == _NOT_INITIALIZED:
self._cancel = cancel
Expand Down
24 changes: 16 additions & 8 deletions traits_futures/tests/common_future_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ def dummy_cancel_callback():
# that a future might encounter. Here:
#
# * I represents the executor initializing the future
# * A represents the background task being abandoned before starting
# * S represents the background task starting
# * X represents the background task failing with an exception
# * R represents the background task returning a result
# * C represents the user cancelling.
#
# A future must always be initialized before anything else happens, and then a
# complete run must always involve "started, raised" or "started, returned" in
# that order. In addition, a single cancellation is possible at any time before
# the end of the sequence.
# complete run must always involve "abandoned", "started, raised" or "started,
# returned" in that order. In addition, a single cancellation is possible at
# any time before the end of the sequence, and abandoned can only ever occur
# following cancellation.

MESSAGE_TYPES = "IASRXC"

COMPLETE_VALID_SEQUENCES = {
"ISR",
Expand All @@ -48,6 +52,7 @@ def dummy_cancel_callback():
"ICSX",
"ISCR",
"ISCX",
"ICA",
}


Expand Down Expand Up @@ -175,7 +180,7 @@ def test_invalid_message_sequences(self):
seq[:i] + msg
for seq in valid_initial_sequences
for i in range(len(seq) + 1)
for msg in "ICRSX"
for msg in MESSAGE_TYPES
}
invalid_sequences = continuations - valid_initial_sequences

Expand Down Expand Up @@ -213,15 +218,18 @@ def send_message(self, future, message, cancel_callback):
"""Send a particular message to a future."""
if message == "I":
future._executor_initialized(cancel_callback)
elif message == "A":
future._task_abandoned(None)
elif message == "S":
future._task_started(None)
elif message == "X":
future._task_raised(self.fake_exception())
elif message == "R":
future._task_returned(23)
else:
assert message == "C"
elif message == "X":
future._task_raised(self.fake_exception())
elif message == "C":
future._user_cancelled()
else:
raise ValueError(f"message {message} not understood")

def send_message_sequence(self, messages, cancel_callback=None):
"""Create a new future, and send the given message sequence to it."""
Expand Down
18 changes: 11 additions & 7 deletions traits_futures/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#: are interpreted by the future.
SENT = "sent"

#: Control message sent when the callable is abandoned before execution.
ABANDONED = "abandoned"

#: Control message sent before we start to process the target callable.
#: The argument is always ``None``.
STARTED = "started"
Expand Down Expand Up @@ -73,7 +76,7 @@ def _dispatch_to_future(self, event):
message_type, message_arg = message
method_name = "_task_{}".format(message_type)
getattr(self.future, method_name)(message_arg)
if message_type in {RAISED, RETURNED}:
if message_type in {ABANDONED, RAISED, RETURNED}:
self.done = True


Expand Down Expand Up @@ -101,14 +104,15 @@ def __init__(self, background_task, sender, cancelled):
def __call__(self):
try:
with self._sender:
if self._cancelled():
self._sender.send((ABANDONED, None))
return

self._sender.send((STARTED, None))
try:
if self._cancelled():
result = None
else:
result = self._background_task(
self._send_custom_message, self._cancelled
)
result = self._background_task(
self._send_custom_message, self._cancelled
)
except BaseException as e:
self._sender.send((RAISED, marshal_exception(e)))
else:
Expand Down

0 comments on commit 363017f

Please # to comment.